Tuesday, February 22, 2011

Converting Ekam to C++0x

I converted Ekam to C++0x. As always, all code is at:


Note that Ekam now requires a C++0x compiler. Specifically, it needs GCC 4.6, which is not officially released yet. I didn't have much trouble compiling and using the latest snapshot, but I realize that it is probably more work than most people want to do. Hopefully 4.6 will be officially released soon.


When writing Ekam, with no company style guide to stop me, I have found myself developing a very specific and unique style of C++ programming with a heavy reliance on RAII. Some features of this style:

  • I never use operator new directly (much less malloc()), but instead use a wrapper which initializes an OwnedPtr. This class is like scoped_ptr in that it wraps a pointer and automatically deletes it when the OwnedPtr is destroyed. However, unlike scoped_ptr, there is no way to release a pointer from an OwnedPtr except by transferring it to another OwnedPtr. Thus, the only way that an object pointed to by an OwnedPtr could ever be leaked (i.e. become unreachable without being reclaimed) is if you constructed an OwnedPtr cycle. This is actually quite hard to do by accident -- much harder than creating a cycle of regular pointers.
  • Ekam is heavily event-driven. Any function call which starts an asynchronous operation returns an OwnedPtr<AsyncOperation>. Deleting this object cancels the operation.
  • All OS handles (e.g. file descriptors) are wrapped in objects that automatically close them.

These features turn out to work extremely well together.

A common problem in multi-tasking C++ code (whether based on threads or events) is that cancellation is very difficult. Typically, an asynchronous operation calls some callback at some future time, and the caller is expected to ensure that the callback's context is still valid at the time that it is called. If you're lucky, the operation can be canceled by calling some separate cancel() function. However, it's often the case that this function simply causes the callback to complete sooner, because it's considered too easy to leak memory if an expected callback is never called. So, you still have to wait for the callback.

So what happens if you really just want to kill off an entire large, complex chunk of your program all at once? It turns out this is something I need to do in Ekam. If a build action is in progress and one of its inputs changes, the action should be immediately halted. But actions can involve arbitrary code that can get fairly complex. What can Ekam do about it?

Well, with the style I've been using, cancellation is actually quite easy. Because all allocated objects must be anchored to another object via an OwnedPtr, if you delete a high-level object, you can be pretty sure that all the objects underneath will be cleanly deleted. And because asynchronous operations are themselves represented using objects, and deleting those objects cancels the corresponding operations, it's nearly impossible to accidentally leave an operation running after its context has been deleted.
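To make the idea concrete, here is a minimal sketch of cancellation by destruction. It is not Ekam's code: EventLoop, Registration and BuildAction are hypothetical names, and std::unique_ptr stands in for OwnedPtr. The point is only that the pending operation is an object owned by its context, so destroying the context unregisters the callback.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <utility>

// Hypothetical stand-in for an event manager: it only tracks which
// callbacks are currently registered.
class EventLoop {
public:
  class Registration;

  // Registers a callback. Destroying the returned object unregisters it.
  std::unique_ptr<Registration> onEvent(std::function<void()> callback);

  // Fires every callback that is still registered.
  void fireAll() {
    // Work on a copy so that a callback may destroy registrations.
    auto snapshot = callbacks;
    for (auto& entry : snapshot) {
      if (callbacks.count(entry.first)) entry.second();
    }
  }

  std::size_t pendingCount() const { return callbacks.size(); }

private:
  std::map<int, std::function<void()>> callbacks;
  int nextId = 0;
};

class EventLoop::Registration {
public:
  Registration(EventLoop* loop, int id) : loop(loop), id(id) {}
  ~Registration() { loop->callbacks.erase(id); }  // cancel on destruction
  Registration(const Registration&) = delete;
  Registration& operator=(const Registration&) = delete;

private:
  EventLoop* loop;
  int id;
};

inline std::unique_ptr<EventLoop::Registration> EventLoop::onEvent(
    std::function<void()> callback) {
  int id = nextId++;
  callbacks[id] = std::move(callback);
  return std::unique_ptr<Registration>(new Registration(this, id));
}

// A high-level object that owns its pending operation. Destroying the
// BuildAction destroys the Registration, which cancels the callback.
class BuildAction {
public:
  BuildAction(EventLoop* loop, int* completions)
      : pending(loop->onEvent([completions]() { ++*completions; })) {}

private:
  std::unique_ptr<EventLoop::Registration> pending;
};
```

If a BuildAction goes out of scope before the loop fires, its callback is simply gone; nothing has to remember to call a separate cancel function.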

Problem: OwnedPtr transferral

So what does this have to do with C++0x? Well, there are some parts of my style that turn out to be a bit awkward.

Transferring an OwnedPtr to another OwnedPtr looked like this:

OwnedPtr<MyObject> ptr1, ptr2;

Looks fine, but this means that the way to pass ownership into a function call is by passing a pointer to an OwnedPtr, getting a little weird:

void Foo::takeOwnership(OwnedPtr<Bar>* barToAdopt) {


Foo foo;
OwnedPtr<Bar> bar;

Returning an OwnedPtr is even more awkward:

void Foo::releaseOwnership(OwnedPtr<Bar>* output) {


OwnedPtr<Bar> bar;

Furthermore, the way to allocate an owned object was through a method of OwnedPtr itself, which was kind of weird to call:

OwnedPtr<Bar> bar;
bar.allocate(constructorParam1, constructorParam2);

This turned out to be particularly ugly when allocating a subclass:

OwnedPtr<BarSub> barSub;
barSub.allocate(constructorParam1, constructorParam2);
OwnedPtr<Bar> bar;

So I made a shortcut for that:

OwnedPtr<Bar> bar;
    constructorParam1, constructorParam2);

Still, dealing with OwnedPtrs remained difficult. They just didn't flow right with the rest of the language.

Rvalue references

This is all solved by C++0x's new "rvalue references" feature. When a function takes an "rvalue reference" as a parameter, it only accepts references to values which are safe to clobber, either because the value is an unnamed temporary (which will be destroyed immediately when the function returns) or because the caller has explicitly indicated that it's OK to clobber the value.
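As a small illustration of that binding rule (the function name consume is made up for this example), an rvalue-reference parameter accepts an unnamed temporary directly, but a named value only after the caller marks it with std::move():

```cpp
#include <cassert>
#include <string>
#include <utility>

// Takes an rvalue reference: the caller has promised that `source`
// may be clobbered, so its contents are moved out rather than copied.
inline std::string consume(std::string&& source) {
  std::string taken = std::move(source);
  return taken;
}

inline bool demoRvalueBinding() {
  std::string named = "named value";

  // An unnamed temporary binds to std::string&& directly.
  std::string a = consume(std::string("temporary"));

  // A named value must be marked explicitly with std::move().
  std::string b = consume(std::move(named));

  // consume(named);  // would not compile: `named` is an lvalue

  return a == "temporary" && b == "named value";
}
```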

Most of the literature on rvalue references talks about how they can be used to avoid unnecessary copies and to implement "perfect forwarding". These are nice, but what I really want is to implement a type that can only be moved, not copied. OwnedPtrs explicitly prohibit copying, since this would lead to double-deletion. However, moving an OwnedPtr is perfectly safe. By implementing move semantics using rvalue references, I was able to make it possible to pass OwnedPtrs around using natural syntax, without any risk of unexpected ownership stealing (as with the old auto_ptr).

Now the code samples look like this:

// Transferring ownership.
OwnedPtr<MyObject> ptr1, ptr2;
ptr1 = ptr2.release();

// Passing ownership to a method.
void Foo::takeOwnership(OwnedPtr<Bar> bar) {
  this->bar = bar.release();
}

Foo foo;
OwnedPtr<Bar> bar;
foo.takeOwnership(bar.release());

// Returning ownership from a method.
OwnedPtr<Bar> Foo::releaseOwnership() {
  return this->bar.release();
}

OwnedPtr<Bar> bar = foo.releaseOwnership();

// Allocating an object.
OwnedPtr<Bar> bar = newOwned<Bar>(
    constructorParam1, constructorParam2);

// Allocating a subclass.
OwnedPtr<Bar> bar = newOwned<BarSub>(
    constructorParam1, constructorParam2);

So much nicer! Notice that the release() method is always used in contexts where ownership is being transferred away from a named OwnedPtr. This makes it very clear what is going on and avoids accidents. Notice also that release() is NOT needed if the OwnedPtr is an unnamed temporary, which allows complex expressions to be written relatively naturally.
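For readers who want to see how a move-only pointer like this can be put together, here is a simplified sketch. It is an assumption about one plausible design, not Ekam's actual OwnedPtr: in particular, having release() return an rvalue reference to the pointer itself is a guess that happens to make the samples above compile. Bar, BarSub and Foo are toy types for the demonstration.

```cpp
#include <cassert>
#include <utility>

// Simplified, hypothetical OwnedPtr: movable but not copyable.
template <typename T>
class OwnedPtr {
public:
  OwnedPtr() : ptr(nullptr) {}
  ~OwnedPtr() { delete ptr; }

  OwnedPtr(const OwnedPtr&) = delete;
  OwnedPtr& operator=(const OwnedPtr&) = delete;

  OwnedPtr(OwnedPtr&& other) : ptr(other.ptr) { other.ptr = nullptr; }

  // Converting move, e.g. OwnedPtr<BarSub> to OwnedPtr<Bar>.
  // The base class needs a virtual destructor for this to be safe.
  template <typename U>
  OwnedPtr(OwnedPtr<U>&& other) : ptr(other.ptr) { other.ptr = nullptr; }

  OwnedPtr& operator=(OwnedPtr&& other) {
    if (this != &other) {
      delete ptr;
      ptr = other.ptr;
      other.ptr = nullptr;
    }
    return *this;
  }

  // Marks this named pointer as safe to clobber. Ownership moves only
  // when the result is used to construct or assign another OwnedPtr.
  OwnedPtr&& release() { return std::move(*this); }

  T* get() const { return ptr; }
  T* operator->() const { return ptr; }

private:
  explicit OwnedPtr(T* p) : ptr(p) {}
  T* ptr;

  template <typename U> friend class OwnedPtr;
  template <typename U, typename... Args>
  friend OwnedPtr<U> newOwned(Args&&... args);
};

// The only way to create an owned object, so callers never write `new`.
template <typename T, typename... Args>
OwnedPtr<T> newOwned(Args&&... args) {
  return OwnedPtr<T>(new T(std::forward<Args>(args)...));
}

// Toy types for the demonstration.
struct Bar {
  explicit Bar(int v) : value(v) {}
  virtual ~Bar() {}
  int value;
};

struct BarSub : Bar {
  explicit BarSub(int v) : Bar(v * 2) {}
};

class Foo {
public:
  void takeOwnership(OwnedPtr<Bar> b) { bar = b.release(); }
  OwnedPtr<Bar> releaseOwnership() { return bar.release(); }

private:
  OwnedPtr<Bar> bar;
};
```

Because the copy constructor is deleted, passing a named OwnedPtr without release() fails to compile, which is what prevents the silent ownership stealing that auto_ptr allowed.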

Problem: Callbacks

While working better than typical callback-based systems, my style for asynchronous operations in Ekam was still fundamentally based on callbacks. This typically involved a lot of boilerplate. For example, here is some code to implement an asynchronous read, based on the EventManager interface which provides asynchronous notification of readability:

class ReadCallback {
public:
  virtual ~ReadCallback();
  virtual void done(size_t actual);
  virtual void error(int number);
};

OwnedPtr<AsyncOperation> readAsync(
    EventManager* eventManager,
    int fd, void* buffer, size_t size,
    ReadCallback* callback) {
  class ReadOperation: public EventManager::IoCallback,
                       public AsyncOperation {
  public:
    ReadOperation(int fd, void* buffer, size_t size,
                  ReadCallback* callback)
        : fd(fd), buffer(buffer), size(size),
          callback(callback) {}
    ~ReadOperation() {}

    OwnedPtr<AsyncOperation> inner;

    // implements IoCallback
    virtual void ready() {
      ssize_t n = read(fd, buffer, size);
      if (n < 0) {
        callback->error(errno);
      } else {
        callback->done(n);
      }
    }

    int fd;
    void* buffer;
    size_t size;
    ReadCallback* callback;
  };

  OwnedPtr<ReadOperation> result =
      newOwned<ReadOperation>(
        fd, buffer, size, callback);
  result->inner = eventManager->onReadable(fd, result.get());
  return result.release();
}

That's a lot of code to do something pretty trivial. Additionally, the fact that callbacks transfer control from lower-level objects to higher-level ones causes some problems:

  • Exceptions can't be used, because they would propagate in the wrong direction.
  • When the callback returns, the calling object may have been destroyed. Detecting this situation is hard, and delaying destruction if needed is harder. Most callback callers are lucky enough not to have anything else to do after the call, but this isn't always the case.

C++0x introduces lambdas. Using them, I implemented E-style promises. Here's what the new code looks like:

Promise<size_t> readAsync(
    EventManager* eventManager,
    int fd, void* buffer, size_t size) {
  return eventManager->when(eventManager->onReadable(fd))(
    [=](Void) -> size_t {
      ssize_t n = read(fd, buffer, size);
      if (n < 0) {
        throw OsError("read", errno);
      } else {
        return n;
      }
    });
}

Isn't that pretty? It does all the same things as the previous code sample, but with so much less code. Here's another example which calls the above:

Promise<size_t> readPromise = readAsync(
    eventManager, fd, buffer, size);
Promise<void> pendingOp =
      [=](size_t actual) {
        // Copy to stdout.
        write(STDOUT_FILENO, buffer, actual);
      }, [](MaybeException error) {
        try {
          // Force exception to be rethrown.
        } catch (const OsError& e) {
          fprintf(stderr, "%s\n", e.what());

Some points:

  • The return value of when() is another promise, for the result of the lambda.
  • The lambda can return another promise instead of a value. In this case the new promise will replace the old one.
  • You can pass multiple promises to when(). The lambda will be called when all have completed.
  • If you give two lambdas to when(), the second one is called in case of exceptions. Otherwise, exceptions simply propagate to the promise returned by when().
  • Promise callbacks are never executed synchronously; they always go through an event queue. Therefore, the body of a promise callback can delete objects without worrying that they are in use up the stack.
  • when() takes ownership of all of its arguments (using rvalue reference "move" semantics). You can actually pass things other than promises to it; they will simply be passed through to the callback. This is useful for making sure state required by the callback is not destroyed in the meantime.
  • If you destroy a promise without passing it to when(), whatever asynchronous operation it was bound to is canceled. Even if the promise was already fulfilled and the callback is simply sitting on the event queue, it will be removed and will never be called.
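Two of the points above, callbacks always going through an event queue and destruction canceling even an already-queued callback, can be shown with a very small sketch. This is not the promise implementation itself (which needs far more template machinery); EventQueue and Pending are hypothetical names, and std::unique_ptr again stands in for OwnedPtr.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical event queue: callbacks run only from runAll(), never
// synchronously from the code that scheduled them.
class EventQueue {
public:
  // Handle for a queued callback. Destroying it cancels the callback,
  // even though the entry is already sitting on the queue.
  class Pending {
  public:
    ~Pending() { *alive = false; }
    Pending(const Pending&) = delete;
    Pending& operator=(const Pending&) = delete;

  private:
    friend class EventQueue;
    explicit Pending(std::shared_ptr<bool> flag) : alive(std::move(flag)) {}
    std::shared_ptr<bool> alive;
  };

  std::unique_ptr<Pending> schedule(std::function<void()> callback) {
    auto alive = std::make_shared<bool>(true);
    queue.push_back(Entry{alive, std::move(callback)});
    return std::unique_ptr<Pending>(new Pending(alive));
  }

  // Runs queued callbacks in order, skipping any that were canceled.
  void runAll() {
    while (!queue.empty()) {
      Entry entry = std::move(queue.front());
      queue.pop_front();
      if (*entry.alive) entry.callback();
    }
  }

private:
  struct Entry {
    std::shared_ptr<bool> alive;
    std::function<void()> callback;
  };
  std::deque<Entry> queue;
};
```

Since nothing runs until control returns to the queue, a callback can safely delete the object that scheduled it; there is no frame of that object left on the stack.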

Having refactored all of my code to use promises, I do find them quite a bit easier to use. For example, it turns out that much of the complication in using Linux's inotify interface, which I whined about a few months ago, completely went away when I started using promises, because I didn't need to worry about callbacks interfering with each other.


C++ is still a horribly over-complicated language, and C++0x only makes that worse. The implementation of promises is a ridiculous mess of template magic that is pretty inscrutable. However, for those who deeply understand C++, C++0x provides some very powerful features. I'm pretty happy with the results.

Tuesday, February 1, 2011

Streaming Protocol Buffers

This weekend I implemented a new protobuf feature. It happens to be something that would be very helpful to me in implementing Captain Proto, but I suspect it would also prove useful to many other users.

The code (for C++; I haven't done Java or Python yet) is at:


The text below is copied from my announcement to the mailing list.


Probably the biggest deficiency in the open source protocol buffers libraries today is a lack of built-in support for handling streams of messages. True, it's not too hard for users to support it manually, by prefixing each message with its size. However, this is awkward, and typically requires users to reach into the low-level CodedInputStream/CodedOutputStream classes and do a lot of work manually.
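For reference, the manual approach looks roughly like the sketch below. To stay self-contained it does not use the protobuf library at all: messages are treated as already-serialized byte strings, the function names are invented for this example, and the size prefix is a base-128 varint, which is one common choice of prefix rather than anything the library mandates.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Appends `value` as a base-128 varint, low-order bits first.
inline void appendVarint(std::uint64_t value, std::string* out) {
  while (value >= 0x80) {
    out->push_back(static_cast<char>((value & 0x7F) | 0x80));
    value >>= 7;
  }
  out->push_back(static_cast<char>(value));
}

// Appends one already-serialized message, prefixed with its size.
inline void appendDelimited(const std::string& message, std::string* out) {
  appendVarint(message.size(), out);
  out->append(message);
}

// Splits a stream back into messages. Returns false if the stream is
// malformed or ends in the middle of a message.
inline bool splitDelimited(const std::string& stream,
                           std::vector<std::string>* messages) {
  std::size_t pos = 0;
  while (pos < stream.size()) {
    std::uint64_t size = 0;
    int shift = 0;
    while (true) {
      if (pos >= stream.size() || shift > 63) return false;
      std::uint8_t byte = static_cast<std::uint8_t>(stream[pos++]);
      size |= static_cast<std::uint64_t>(byte & 0x7F) << shift;
      if ((byte & 0x80) == 0) break;
      shift += 7;
    }
    if (size > stream.size() - pos) return false;  // truncated message
    messages->push_back(stream.substr(pos, static_cast<std::size_t>(size)));
    pos += static_cast<std::size_t>(size);
  }
  return true;
}
```

Even this toy version shows the awkwardness: every user ends up rewriting the same framing loop, and it says nothing about which message type each frame contains.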

Furthermore, many users want to handle streams of heterogeneous message types. We tell them to wrap their messages in an outer type using the "union" pattern. But this is kind of ugly and has unnecessary overhead.

These problems never really came up in our internal usage, because inside Google we have an RPC system and other utility code which builds on top of Protocol Buffers and provides appropriate abstraction. While we'd like to open source this code, a lot of it is large, somewhat messy, and highly interdependent with unrelated parts of our environment, and no one has had the time to rewrite it all cleanly (as we did with protocol buffers itself).

Proposed solution: Generated Visitors

I've been wanting to fix this for some time now, but didn't really have a good idea how. CodedInputStream is annoyingly low-level, but I couldn't think of much better an interface for reading a stream of messages off the wire.

A couple weeks ago, though, I realized that I had been failing to consider how new kinds of code generation could help this problem. I was trying to think of solutions that would go into the protobuf base library, not solutions that were generated by the protocol compiler.

So then it became pretty clear: A protobuf message definition can also be interpreted as a definition for a streaming protocol. Each field in the message is a kind of item in the stream.

// A stream of Foo and Bar messages, and also strings.
message MyStream {
  // Enables generation of streaming classes.
  option generate_visitors = true;

  repeated Foo foo = 1;
  repeated Bar bar = 2;
  repeated string baz = 3;
}

All we need to do is generate code appropriate for treating MyStream as a stream, rather than one big message.

My approach is to generate two interfaces, each with two provided implementations. The interfaces are "Visitor" and "Guide". MyStream::Visitor looks like this:

class MyStream::Visitor {
 public:
  virtual ~Visitor();

  virtual void VisitFoo(const Foo& foo);
  virtual void VisitBar(const Bar& bar);
  virtual void VisitBaz(const std::string& baz);
};

The Visitor class has two standard implementations: "Writer" and "Filler". MyStream::Writer writes the visited fields to a CodedOutputStream, using the same wire format as would be used to encode MyStream as one big message. MyStream::Filler fills in a MyStream message object with the visited values.

Meanwhile, Guides are objects that drive Visitors.

class MyStream::Guide {
 public:
  virtual ~Guide();

  // Call the methods of the visitor on the Guide's data.
  virtual void Accept(MyStream::Visitor* visitor) = 0;

  // Just fill in a message object directly rather than
  // use a visitor.
  virtual void Fill(MyStream* message) = 0;
};

The two standard implementations of Guide are "Reader" and "Walker". MyStream::Reader reads items from a CodedInputStream and passes them to the visitor. MyStream::Walker walks over a MyStream message object and passes all the fields to the visitor.

To handle a stream of messages, simply attach a Reader to your own Visitor implementation. Your visitor's methods will then be called as each item is parsed, kind of like "SAX" XML parsing, but type-safe.
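The shape of the pattern can be tried out without the code generator. The sketch below is hand-written and simplified: the Foo and Bar message types are replaced by a plain int field, the classes are not nested inside MyStream, and BazCounter is a made-up example of a user's visitor. Walker and Filler play the roles described above; a Reader would drive the same Visitor interface from the wire instead of from an in-memory message.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Simplified stand-in for the generated message class.
struct MyStream {
  std::vector<int> foo;
  std::vector<std::string> baz;
};

// One Visit method per field. The default implementations ignore the
// item, so a subclass overrides only what it cares about.
class Visitor {
public:
  virtual ~Visitor() {}
  virtual void VisitFoo(int) {}
  virtual void VisitBaz(const std::string&) {}
};

// Plays the role of MyStream::Walker: drives a Visitor from a message.
class Walker {
public:
  explicit Walker(const MyStream& message) : message(message) {}

  void Accept(Visitor* visitor) const {
    for (int foo : message.foo) visitor->VisitFoo(foo);
    for (const std::string& baz : message.baz) visitor->VisitBaz(baz);
  }

private:
  const MyStream& message;
};

// Plays the role of MyStream::Filler: rebuilds a message from visits.
class Filler : public Visitor {
public:
  explicit Filler(MyStream* target) : target(target) {}
  void VisitFoo(int foo) override { target->foo.push_back(foo); }
  void VisitBaz(const std::string& baz) override {
    target->baz.push_back(baz);
  }

private:
  MyStream* target;
};

// A user-defined visitor that only cares about the string items.
class BazCounter : public Visitor {
public:
  void VisitBaz(const std::string&) override { ++count; }
  int count = 0;
};
```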

Nonblocking I/O

The "Reader" type declared above is based on blocking I/O, but many users would prefer a non-blocking approach. I'm less sure how to handle this, but my thought was that we could provide a utility class like:

class NonblockingHelper {
 public:
  template <typename MessageType>
  NonblockingHelper(typename MessageType::Visitor* visitor);

  // Push data into the buffer.  If the data completes any
  // fields, they will be passed to the underlying visitor.
  // Any left-over data is remembered for the next call.
  void PushData(void* data, int size);
};

With this, you can use whatever non-blocking I/O mechanism you want, and just have to push the data into the NonblockingHelper, which will take care of calling the Visitor as necessary.
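The buffering behavior described for PushData can be sketched independently of protobuf. In this toy version, which is an illustration and not the proposed class, each record is framed as a single length byte followed by that many payload bytes, and a std::function callback stands in for the Visitor. ChunkedRecordParser is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Hypothetical push-style parser. Records are framed as one length byte
// followed by that many payload bytes (a stand-in for real field parsing).
class ChunkedRecordParser {
public:
  explicit ChunkedRecordParser(
      std::function<void(const std::string&)> callback)
      : onRecord(std::move(callback)) {}

  // Push data into the buffer. Every record completed by this data is
  // passed to the callback; left-over bytes wait for the next call.
  void PushData(const void* data, std::size_t size) {
    buffer.append(static_cast<const char*>(data), size);
    std::size_t pos = 0;
    while (pos < buffer.size()) {
      std::size_t length = static_cast<unsigned char>(buffer[pos]);
      if (buffer.size() - pos - 1 < length) break;  // record incomplete
      onRecord(buffer.substr(pos + 1, length));
      pos += 1 + length;
    }
    buffer.erase(0, pos);  // keep only the unconsumed bytes
  }

private:
  std::function<void(const std::string&)> onRecord;
  std::string buffer;
};
```

The caller is free to feed it whatever a non-blocking read happened to return, including chunks that end in the middle of a record.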