Tuesday, February 1, 2011

Streaming Protocol Buffers

This weekend I implemented a new protobuf feature. It happens to be something that would be very helpful to me in implementing Captain Proto, but I suspect it would also prove useful to many other users.

The code (for C++; I haven't done Java or Python yet) is at:


The text below is copied from my announcement to the mailing list.


Probably the biggest deficiency in the open source protocol buffers libraries today is a lack of built-in support for handling streams of messages. True, it's not too hard for users to support it manually, by prefixing each message with its size. However, this is awkward, and typically requires users to reach into the low-level CodedInputStream/CodedOutputStream classes and do a lot of work manually.
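To make the manual approach concrete, here is a rough sketch of size-prefixed framing using only the standard library. The payload strings stand in for already-serialized messages, and the helper names (AppendVarint32, ReadVarint32, AppendFrame, ReadFrames) are made up for this illustration; with the real library you would do the equivalent work through CodedOutputStream and CodedInputStream.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Appends `value` to `out` as a base-128 varint, the encoding protobuf
// uses for sizes.
void AppendVarint32(std::uint32_t value, std::string* out) {
  while (value >= 0x80) {
    out->push_back(static_cast<char>((value & 0x7F) | 0x80));
    value >>= 7;
  }
  out->push_back(static_cast<char>(value));
}

// Reads a varint starting at *pos and advances *pos past it.
// Returns false if the input is truncated or the varint exceeds 5 bytes.
bool ReadVarint32(const std::string& in, std::size_t* pos,
                  std::uint32_t* value) {
  std::uint32_t result = 0;
  std::size_t p = *pos;
  for (int shift = 0; shift < 35; shift += 7) {
    if (p >= in.size()) return false;
    const std::uint8_t byte = static_cast<std::uint8_t>(in[p++]);
    result |= static_cast<std::uint32_t>(byte & 0x7F) << shift;
    if ((byte & 0x80) == 0) {
      *pos = p;
      *value = result;
      return true;
    }
  }
  return false;
}

// Writes one frame: the payload size as a varint, then the payload bytes.
void AppendFrame(const std::string& payload, std::string* out) {
  AppendVarint32(static_cast<std::uint32_t>(payload.size()), out);
  out->append(payload);
}

// Splits `stream` back into payloads. Returns false if the stream ends in
// the middle of a frame or contains a malformed size prefix.
bool ReadFrames(const std::string& stream, std::vector<std::string>* frames) {
  std::size_t pos = 0;
  while (pos < stream.size()) {
    std::uint32_t size = 0;
    if (!ReadVarint32(stream, &pos, &size)) return false;
    if (size > stream.size() - pos) return false;
    frames->push_back(stream.substr(pos, size));
    pos += size;
  }
  return true;
}
```

Even in this toy form, the caller has to own the framing, the bounds checks, and the truncation handling, which is the awkwardness described above.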

Furthermore, many users want to handle streams of heterogeneous message types. We tell them to wrap their messages in an outer type using the "union" pattern. But this is kind of ugly and has unnecessary overhead.

These problems never really came up in our internal usage, because inside Google we have an RPC system and other utility code that build on top of Protocol Buffers and provide the appropriate abstractions. While we'd like to open source this code, a lot of it is large, somewhat messy, and highly interdependent with unrelated parts of our environment, and no one has had the time to rewrite it all cleanly (as we did with Protocol Buffers itself).

Proposed solution: Generated Visitors

I've been wanting to fix this for some time now, but didn't really have a good idea how. CodedInputStream is annoyingly low-level, but I couldn't think of a much better interface for reading a stream of messages off the wire.

A couple of weeks ago, though, I realized that I had been failing to consider how new kinds of code generation could help with this problem. I was trying to think of solutions that would go into the protobuf base library, not solutions that were generated by the protocol compiler.

So then it became pretty clear: A protobuf message definition can also be interpreted as a definition for a streaming protocol. Each field in the message is a kind of item in the stream.

// A stream of Foo and Bar messages, and also strings.
message MyStream {
  // Enables generation of streaming classes.
  option generate_visitors = true;

  repeated Foo foo = 1;
  repeated Bar bar = 2;
  repeated string baz = 3;
}

All we need to do is generate code appropriate for treating MyStream as a stream, rather than one big message.

My approach is to generate two interfaces, each with two provided implementations. The interfaces are "Visitor" and "Guide". MyStream::Visitor looks like this:

class MyStream::Visitor {
 public:
  virtual ~Visitor();

  virtual void VisitFoo(const Foo& foo);
  virtual void VisitBar(const Bar& bar);
  virtual void VisitBaz(const std::string& baz);
};

The Visitor class has two standard implementations: "Writer" and "Filler". MyStream::Writer writes the visited fields to a CodedOutputStream, using the same wire format as would be used to encode MyStream as one big message. MyStream::Filler fills in a MyStream message object with the visited values.
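As a rough illustration of what "the same wire format" means here, the sketch below writes one stream item as a single field record: a tag, a length, and the payload. The function names are hypothetical, and the payload string stands in for either a string value or an already-serialized Foo or Bar; this is not the generated Writer code itself. Embedded messages and strings both use wire type 2 (length-delimited), and a repeated field is encoded as repeated records, so concatenating such records yields bytes that also parse as one MyStream message.

```cpp
#include <cstdint>
#include <string>

// Appends `value` to `out` as a base-128 varint.
void AppendVarint32(std::uint32_t value, std::string* out) {
  while (value >= 0x80) {
    out->push_back(static_cast<char>((value & 0x7F) | 0x80));
    value >>= 7;
  }
  out->push_back(static_cast<char>(value));
}

// Appends one length-delimited field record for `field_number`.
// For MyStream, field 1 is foo, field 2 is bar, and field 3 is baz.
void AppendLengthDelimitedField(std::uint32_t field_number,
                                const std::string& payload,
                                std::string* out) {
  const std::uint32_t kWireTypeLengthDelimited = 2;
  AppendVarint32((field_number << 3) | kWireTypeLengthDelimited, out);
  AppendVarint32(static_cast<std::uint32_t>(payload.size()), out);
  out->append(payload);
}
```

For example, writing the baz value "hi" produces the four bytes 0x1A 0x02 'h' 'i', where 0x1A is (3 << 3) | 2.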

Meanwhile, Guides are objects that drive Visitors.

class MyStream::Guide {
 public:
  virtual ~Guide();

  // Call the methods of the visitor on the Guide's data.
  virtual void Accept(MyStream::Visitor* visitor) = 0;

  // Just fill in a message object directly rather than
  // use a visitor.
  virtual void Fill(MyStream* message) = 0;
};

The two standard implementations of Guide are "Reader" and "Walker". MyStream::Reader reads items from a CodedInputStream and passes them to the visitor. MyStream::Walker walks over a MyStream message object and passes all the fields to the visitor.

To handle a stream of messages, simply attach a Reader to your own Visitor implementation. Your visitor's methods will then be called as each item is parsed, kind of like "SAX" XML parsing, but type-safe.
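Here is a minimal hand-written sketch of how these pieces might fit together. Everything in it is a stand-in: the Foo, Bar, and MyStream structs replace the generated message classes, and MyStreamVisitor, MyStreamWalker, and MyStreamFiller only approximate the proposed nested classes, so none of this is the actual generated code. BazCollector plays the role of a user-written visitor that overrides just the method it cares about.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-ins for the generated message classes.
struct Foo { int id = 0; };
struct Bar { std::string name; };
struct MyStream {
  std::vector<Foo> foo;
  std::vector<Bar> bar;
  std::vector<std::string> baz;
};

// Approximates MyStream::Visitor: one method per field, all no-ops by
// default so subclasses override only what they need.
class MyStreamVisitor {
 public:
  virtual ~MyStreamVisitor() {}
  virtual void VisitFoo(const Foo&) {}
  virtual void VisitBar(const Bar&) {}
  virtual void VisitBaz(const std::string&) {}
};

// Approximates MyStream::Walker: a guide that feeds the fields of an
// in-memory message to a visitor, one field at a time.
class MyStreamWalker {
 public:
  explicit MyStreamWalker(const MyStream& message) : message_(message) {}

  void Accept(MyStreamVisitor* visitor) {
    for (const Foo& foo : message_.foo) visitor->VisitFoo(foo);
    for (const Bar& bar : message_.bar) visitor->VisitBar(bar);
    for (const std::string& baz : message_.baz) visitor->VisitBaz(baz);
  }

 private:
  const MyStream& message_;
};

// Approximates MyStream::Filler: a visitor that copies every visited
// item into a message object.
class MyStreamFiller : public MyStreamVisitor {
 public:
  explicit MyStreamFiller(MyStream* message) : message_(message) {}

  void VisitFoo(const Foo& foo) override { message_->foo.push_back(foo); }
  void VisitBar(const Bar& bar) override { message_->bar.push_back(bar); }
  void VisitBaz(const std::string& baz) override {
    message_->baz.push_back(baz);
  }

 private:
  MyStream* message_;
};

// A user-written visitor that collects only the string items.
class BazCollector : public MyStreamVisitor {
 public:
  void VisitBaz(const std::string& baz) override { seen.push_back(baz); }

  std::vector<std::string> seen;
};
```

Passing a BazCollector to the walker's Accept() collects the strings and ignores everything else; passing a MyStreamFiller rebuilds a copy of the message. A Reader would drive the same visitors, except that it would presumably deliver items in the order they arrive on the wire rather than field by field.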

Nonblocking I/O

The "Reader" type declared above is based on blocking I/O, but many users would prefer a non-blocking approach. I'm less sure how to handle this, but my thought was that we could provide a utility class like:

class NonblockingHelper {
 public:
  template <typename MessageType>
  NonblockingHelper(typename MessageType::Visitor* visitor);

  // Push data into the buffer.  If the data completes any
  // fields, they will be passed to the underlying visitor.
  // Any left-over data is remembered for the next call.
  void PushData(void* data, int size);
};

With this, you can use whatever non-blocking I/O mechanism you want, and just have to push the data into the NonblockingHelper, which will take care of calling the Visitor as necessary.
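To show the buffering idea, here is a simplified sketch under several assumptions of my own: the class name NonblockingHelperSketch is hypothetical, a std::function callback stands in for the generated Visitor, and only length-delimited fields (wire type 2, which covers embedded messages and strings) are handled. Error handling is minimal; in particular, an overlong varint is treated as incomplete data rather than reported.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>

class NonblockingHelperSketch {
 public:
  // Receives the field number and raw payload of each completed field.
  typedef std::function<void(std::uint32_t, const std::string&)> Callback;

  explicit NonblockingHelperSketch(Callback callback)
      : callback_(std::move(callback)) {}

  // Buffers the new bytes, then delivers every field that is now complete.
  // Left-over bytes stay buffered for the next call. Returns false if a
  // field with an unsupported wire type is at the front of the buffer.
  bool PushData(const void* data, std::size_t size) {
    buffer_.append(static_cast<const char*>(data), size);
    std::size_t consumed = 0;
    bool ok = true;
    for (;;) {
      std::size_t p = consumed;
      std::uint32_t tag = 0;
      std::uint32_t length = 0;
      if (!ReadVarint32(&p, &tag)) break;      // tag not complete yet
      if ((tag & 7) != 2) { ok = false; break; }
      if (!ReadVarint32(&p, &length)) break;   // length not complete yet
      if (length > buffer_.size() - p) break;  // payload not complete yet
      callback_(tag >> 3, buffer_.substr(p, length));
      consumed = p + length;
    }
    buffer_.erase(0, consumed);
    return ok;
  }

  std::size_t buffered_bytes() const { return buffer_.size(); }

 private:
  // Reads a varint at *pos, advancing *pos past the bytes it examined.
  // Returns false if the buffer ends before the varint is complete.
  bool ReadVarint32(std::size_t* pos, std::uint32_t* value) const {
    std::uint32_t result = 0;
    for (int shift = 0; shift < 35 && *pos < buffer_.size(); shift += 7) {
      const std::uint8_t byte = static_cast<std::uint8_t>(buffer_[(*pos)++]);
      result |= static_cast<std::uint32_t>(byte & 0x7F) << shift;
      if ((byte & 0x80) == 0) {
        *value = result;
        return true;
      }
    }
    return false;
  }

  Callback callback_;
  std::string buffer_;
};
```

The caller can feed it chunks of any size, for example whatever a non-blocking read() happened to return, and the callback fires only once a whole field has arrived. A real helper would decode each payload into a Foo or Bar and call the matching Visitor method instead of handing over raw bytes.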
