A new user interface for you! Read more...

File 0002-TNonblockingServer-TLibEventTransport.patch of Package thrift

Index: thrift-0.9.0/lib/cpp/Makefile.am
===================================================================
--- thrift-0.9.0.orig/lib/cpp/Makefile.am	2012-10-12 02:58:06.000000000 +0200
+++ thrift-0.9.0/lib/cpp/Makefile.am	2014-12-30 15:17:58.000000000 +0100
@@ -181,7 +181,8 @@
                          src/thrift/transport/TTransportUtils.h \
                          src/thrift/transport/TBufferTransports.h \
                          src/thrift/transport/TShortReadTransport.h \
-                         src/thrift/transport/TZlibTransport.h
+                         src/thrift/transport/TZlibTransport.h \
+                         src/thrift/transport/TLibEventTransport.h
 
 include_serverdir = $(include_thriftdir)/server
 include_server_HEADERS = \
Index: thrift-0.9.0/lib/cpp/Makefile.in
===================================================================
--- thrift-0.9.0.orig/lib/cpp/Makefile.in	2012-10-12 02:59:49.000000000 +0200
+++ thrift-0.9.0/lib/cpp/Makefile.in	2014-12-30 15:17:58.000000000 +0100
@@ -618,7 +618,8 @@
                          src/thrift/transport/TTransportUtils.h \
                          src/thrift/transport/TBufferTransports.h \
                          src/thrift/transport/TShortReadTransport.h \
-                         src/thrift/transport/TZlibTransport.h
+                         src/thrift/transport/TZlibTransport.h \
+                         src/thrift/transport/TLibEventTransport.h
 
 include_serverdir = $(include_thriftdir)/server
 include_server_HEADERS = \
Index: thrift-0.9.0/lib/cpp/src/thrift/server/TNonblockingServer.cpp
===================================================================
--- thrift-0.9.0.orig/lib/cpp/src/thrift/server/TNonblockingServer.cpp	2012-10-12 02:58:06.000000000 +0200
+++ thrift-0.9.0/lib/cpp/src/thrift/server/TNonblockingServer.cpp	2014-12-30 15:48:09.000000000 +0100
@@ -25,10 +25,13 @@
 
 #include "TNonblockingServer.h"
 #include <thrift/concurrency/Exception.h>
-#include <thrift/transport/TSocket.h>
+#include <thrift/transport/TLibEventTransport.h>
 #include <thrift/concurrency/PlatformThreadFactory.h>
 
+#include <boost/bind.hpp>
+
 #include <iostream>
+#include <queue>
 
 #ifdef HAVE_SYS_SOCKET_H
 #include <sys/socket.h>
@@ -76,13 +79,6 @@
 using apache::thrift::transport::TTransportException;
 using boost::shared_ptr;
 
-/// Three states for sockets: recv frame size, recv data, and send mode
-enum TSocketState {
-  SOCKET_RECV_FRAMING,
-  SOCKET_RECV,
-  SOCKET_SEND
-};
-
 /**
  * Five states for the nonblocking server:
  *  1) initialize
@@ -93,7 +89,6 @@
  */
 enum TAppState {
   APP_INIT,
-  APP_READ_FRAME_SIZE,
   APP_READ_REQUEST,
   APP_WAIT_TASK,
   APP_SEND_RESULT,
@@ -116,7 +111,7 @@
   boost::shared_ptr<TProcessor> processor_;
 
   /// Object wrapping network socket
-  boost::shared_ptr<TSocket> tSocket_;
+  boost::shared_ptr<TLibEventTransport> tSocket_;
 
   /// Libevent object
   struct event event_;
@@ -124,8 +119,8 @@
   /// Libevent flags
   short eventFlags_;
 
-  /// Socket mode
-  TSocketState socketState_;
+  /// Tells whether the frame size was read completely.
+  bool readFrameSize_;
 
   /// Application state
   TAppState appState_;
@@ -142,15 +137,12 @@
   /// Read buffer size
   uint32_t readBufferSize_;
 
-  /// Write buffer
-  uint8_t* writeBuffer_;
-
-  /// Write buffer size
-  uint32_t writeBufferSize_;
-
   /// How far through writing are we?
   uint32_t writeBufferPos_;
 
+  /// Write position for external events
+  uint externalWriteBufferPos_;
+
   /// Largest size of write buffer seen since buffer was constructed
   size_t largestWriteBufferSize_;
 
@@ -169,6 +161,9 @@
   /// Transport that processor writes to
   boost::shared_ptr<TMemoryBuffer> outputTransport_;
 
+  /// Transport that processes writes from an external libevent source.
+  boost::shared_ptr<TMemoryBuffer> externalTransport_;
+
   /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
   boost::shared_ptr<TTransport> factoryInputTransport_;
   boost::shared_ptr<TTransport> factoryOutputTransport_;
@@ -185,6 +180,16 @@
   /// Thrift call context, if any
   void *connectionContext_;
 
+  /// Node for queuing write requests from internal and external sources.
+  typedef std::pair<bool, uint32_t> WriteRequest;
+
+  /// Queue with write requests from  outputTransport_ and externalTransport_.
+  std::queue<WriteRequest> writeRequests_;
+
+  /// Queues a write request for internal or external buffer and notifies
+  /// libevent.
+  void scheduleWrite(bool externalBuffer, uint32_t len);
+
   /// Go into read mode
   void setRead() {
     setFlags(EV_READ | EV_PERSIST);
@@ -195,6 +200,11 @@
     setFlags(EV_WRITE | EV_PERSIST);
   }
 
+  /// Resets write mode after all write requests were send.
+  void clearWrite() {
+    clearFlags(EV_WRITE);
+  }
+
   /// Set socket idle
   void setIdle() {
     setFlags(0);
@@ -208,12 +218,37 @@
   void setFlags(short eventFlags);
 
   /**
+   * Clear event flags for this connection.
+   *
+   * @param eventFlags flags we pass to libevent for the connection.
+   */
+  void clearFlags(short eventFlags);
+
+  /**
    * Libevent handler called (via our static wrapper) when the connection
    * socket had something happen.  Rather than use the flags libevent passed,
    * we use the connection state to determine whether we need to read or
    * write the socket.
    */
-  void workSocket();
+  void workSocket(short ev_type);
+
+  /**
+   * Reads frame size and message from socket.
+   */
+  bool readSocket();
+
+  /**
+   * Writes as many queued requests as possible
+   */
+  bool writeSocket();
+
+  /**
+   * Sets the frame size for a message that was written into a TMemoryBuffer.
+   *
+   * @param buf      buffer including a new message at the end
+   * @param msgSize  Size of the message including frame header
+   */
+  void setFrameSize(TMemoryBuffer& buf, uint32_t msgSize);
 
   /// Close this connection and free or reset its resources.
   void close();
@@ -236,7 +271,17 @@
     inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
     outputTransport_.reset(new TMemoryBuffer(
                                     server_->getWriteBufferDefaultSize()));
-    tSocket_.reset(new TSocket());
+    externalTransport_.reset(new TMemoryBuffer(
+                                    server_->getWriteBufferDefaultSize()));
+
+    boost::shared_ptr<TProtocol> externalProtocol =
+        server_->getOutputProtocolFactory()->getProtocol(externalTransport_);
+
+    tSocket_.reset(new TLibEventTransport(
+        externalProtocol,
+        boost::bind(&TConnection::beginExternalWriteEvent, this),
+        boost::bind(&TConnection::endExternalWriteEvent, this),
+        boost::bind(&TConnection::close, this)));
     init(socket, ioThread, addr, addrLen);
   }
 
@@ -244,6 +289,18 @@
     std::free(readBuffer_);
   }
 
+  /**
+   * Callback for TLibEventTransport to write 4 bytes for the frame header
+   * into externalTransport_.
+   */
+  void beginExternalWriteEvent();
+
+  /**
+   * Callback for TLibEventTransport to calculate the frame size and notifiy
+   * libevent for writting.
+   */
+  void endExternalWriteEvent();
+
  /**
    * Check buffers against any size limits and shrink it if exceeded.
    *
@@ -267,13 +324,13 @@
    * C-callable event handler for connection events.  Provides a callback
    * that libevent can understand which invokes connection_->workSocket().
    *
-   * @param fd the descriptor the event occurred on.
-   * @param which the flags associated with the event.
-   * @param v void* callback arg where we placed TConnection's "this".
+   * @param fd      the descriptor the event occurred on.
+   * @param ev_type the flags associated with the event.
+   * @param v       void* callback arg where we placed TConnection's "this".
    */
-  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
+  static void eventHandler(evutil_socket_t fd, short ev_type, void* v) {
     assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
-    ((TConnection*)v)->workSocket();
+    ((TConnection*)v)->workSocket(ev_type);
   }
 
   /**
@@ -394,6 +451,7 @@
                                            socklen_t addrLen) {
   tSocket_->setSocketFD(socket);
   tSocket_->setCachedAddress(addr, addrLen);
+  tSocket_->setEventBase(*(ioThread->getEventBase()));
 
   ioThread_ = ioThread;
   server_ = ioThread->getServer();
@@ -403,12 +461,9 @@
   readBufferPos_ = 0;
   readWant_ = 0;
 
-  writeBuffer_ = NULL;
-  writeBufferSize_ = 0;
-  writeBufferPos_ = 0;
   largestWriteBufferSize_ = 0;
 
-  socketState_ = SOCKET_RECV_FRAMING;
+  readFrameSize_ = true;
   callsForResize_ = 0;
 
   // get input/transports
@@ -436,12 +491,29 @@
   processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
 }
 
-void TNonblockingServer::TConnection::workSocket() {
-  int got=0, left=0, sent=0;
+void TNonblockingServer::TConnection::workSocket(short ev_type) {
+
+  if (ev_type & EV_READ) {
+    if (!readSocket()) {
+        return;
+    }
+  }
+
+  if (ev_type & EV_WRITE) {
+    if (!writeSocket()) {
+        return;
+    }
+  }
+
+  if (ev_type & ~(EV_READ | EV_WRITE))
+    GlobalOutput.printf("TConnection::workSocket: unexpected event type %d",
+                        ev_type & ~(EV_READ | EV_WRITE));
+}
+
+bool TNonblockingServer::TConnection::readSocket() {
   uint32_t fetch = 0;
 
-  switch (socketState_) {
-  case SOCKET_RECV_FRAMING:
+  if (readFrameSize_) {
     union {
       uint8_t buf[sizeof(uint32_t)];
       uint32_t size;
@@ -457,20 +529,22 @@
       if (fetch == 0) {
         // Whenever we get here it means a remote disconnect
         close();
-        return;
+
+        return false;
       }
       readBufferPos_ += fetch;
-    } catch (TTransportException& te) {
-      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+    } catch(TTransportException& te) {
+      GlobalOutput.printf("TConnection::readSocket(): %s", te.what());
       close();
 
-      return;
+      return false;
     }
 
     if (readBufferPos_ < sizeof(framing.size)) {
       // more needed before frame size is known -- save what we have so far
       readWant_ = framing.size;
-      return;
+
+      return true;
     }
 
     readWant_ = ntohl(framing.size);
@@ -483,84 +557,154 @@
                           readWant_, server_->getMaxFrameSize(),
                           tSocket_->getSocketInfo().c_str());
       close();
-      return;
-    }
-    // size known; now get the rest of the frame
-    transition();
-    return;
 
-  case SOCKET_RECV:
-    // It is an error to be in this state if we already have all the data
-    assert(readBufferPos_ < readWant_);
-
-    try {
-      // Read from the socket
-      fetch = readWant_ - readBufferPos_;
-      got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
+      return false;
     }
-    catch (TTransportException& te) {
-      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
-      close();
 
-      return;
-    }
+    readFrameSize_ = false;
 
-    if (got > 0) {
-      // Move along in the buffer
-      readBufferPos_ += got;
-
-      // Check that we did not overdo it
-      assert(readBufferPos_ <= readWant_);
-
-      // We are done reading, move onto the next state
-      if (readBufferPos_ == readWant_) {
-        transition();
+    // We just read the request length
+    // Double the buffer size until it is big enough
+    if (readWant_ > readBufferSize_) {
+      if (readBufferSize_ == 0) {
+        readBufferSize_ = 1;
       }
-      return;
+      uint32_t newSize = readBufferSize_;
+      while (readWant_ > newSize) {
+        newSize *= 2;
+      }
+
+      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
+      if (newBuffer == NULL) {
+        // nothing else to be done...
+        throw std::bad_alloc();
+      }
+      readBuffer_ = newBuffer;
+      readBufferSize_ = newSize;
     }
 
-    // Whenever we get down here it means a remote disconnect
+    readBufferPos_= 0;
+  }
+
+  // size known; now get the rest of the frame
+
+  // It is an error to be in this state if we already have all the data
+  assert(readBufferPos_ < readWant_);
+
+  uint32_t got=0;
+
+  try {
+    // Read from the socket
+    fetch = readWant_ - readBufferPos_;
+    got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
+  }
+  catch (TTransportException& te) {
+    GlobalOutput.printf("TConnection::readSocket(): %s", te.what());
     close();
 
-    return;
+    return false;
+  }
+
+  if (got > 0) {
+    // Move along in the buffer
+    readBufferPos_ += got;
 
-  case SOCKET_SEND:
-    // Should never have position past size
-    assert(writeBufferPos_ <= writeBufferSize_);
+    // Check that we did not overdo it
+    assert(readBufferPos_ <= readWant_);
+
+    // We are done reading, move onto the next state
+    if (readBufferPos_ == readWant_) {
+      transition();
+    }
+
+    return true;
+  }
+
+  // Whenever we get down here it means a remote disconnect
+  close();
+
+  return false;
+}
+
+bool TNonblockingServer::TConnection::writeSocket() {
+  while(writeRequests_.empty() == false) {
+    WriteRequest& writeReq = writeRequests_.front();
+
+    TMemoryBuffer* transport = writeReq.first ? externalTransport_.get() :
+        outputTransport_.get();
+
+    uint8_t* writeBuffer;
+    uint32_t writeBufferSize;
+
+    transport->getBuffer(&writeBuffer, &writeBufferSize);
+    transport->borrow(0, &writeBufferSize);
+
+    uint32_t writeSize = writeReq.second;
+    if(writeBufferSize < writeReq.second) {
+      GlobalOutput.printf("WARNING: Write request size %u > %u buffer size.\n",
+                          writeReq.second, writeBufferSize);
+      writeSize = writeBufferSize;
+    }
 
     // If there is no data to send, then let us move on
-    if (writeBufferPos_ == writeBufferSize_) {
+    if (writeBufferSize == 0) {
       GlobalOutput("WARNING: Send state with no data to send\n");
-      transition();
-      return;
+      writeRequests_.pop();
+      continue;
     }
 
+    if (writeBufferSize > largestWriteBufferSize_) {
+      largestWriteBufferSize_ = writeBufferSize;
+    }
+
+    uint32_t sent = 0;
+
     try {
-      left = writeBufferSize_ - writeBufferPos_;
-      sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
+      sent = tSocket_->write_partial(writeBuffer, writeSize);
     }
-    catch (TTransportException& te) {
-      GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
+    catch(TTransportException& te) {
+      GlobalOutput.printf("TConnection::writeSocket(): %s ", te.what());
       close();
-      return;
+
+      return false;
     }
 
-    writeBufferPos_ += sent;
+    // We are done!
+    if (sent == writeBufferSize) {
+      transport->consume(sent);
+      writeRequests_.pop();
+    }
+    else if (sent > 0) {
+      // Message was not  written completely.
+      transport->consume(sent);
 
-    // Did we overdo it?
-    assert(writeBufferPos_ <= writeBufferSize_);
+      // Adjust size in write request.
+      writeReq.second -= sent;
 
-    // We are done!
-    if (writeBufferPos_ == writeBufferSize_) {
-      transition();
+      return true;
     }
+  }
 
-    return;
+  // All queued requests were written, clear write flag.
+  clearWrite();
 
-  default:
-    GlobalOutput.printf("Unexpected Socket State %d", socketState_);
-    assert(0);
+  // Verify that there's no incomplete message in the output buffers.
+  if(outputTransport_->peek() || externalTransport_->peek())
+    return true;
+
+  // it's now safe to perform buffer size housekeeping.
+  if (server_->getResizeBufferEveryN() > 0
+      && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
+    checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
+                            server_->getIdleWriteBufferLimit());
+    callsForResize_ = 0;
   }
+
+  // Reset write buffer position to the beginning of the buffer.
+  outputTransport_->resetBuffer();
+  externalTransport_->resetBuffer();
+
+  return true;
 }
 
 /**
@@ -580,9 +724,9 @@
     // We are done reading the request, package the read buffer into transport
     // and get back some data from the dispatch function
     inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
-    outputTransport_->resetBuffer();
     // Prepend four bytes of blank space to the buffer so we can
     // write the frame size there later.
+    writeBufferPos_ = outputTransport_->writeEnd();
     outputTransport_->getWritePtr(4);
     outputTransport_->wroteBytes(4);
 
@@ -615,10 +759,10 @@
       return;
     } else {
       try {
-	if (serverEventHandler_ != NULL) {
-	    serverEventHandler_->processContext(connectionContext_,
-						getTSocket());
-	}
+        if (serverEventHandler_ != NULL) {
+            serverEventHandler_->processContext(connectionContext_,
+                                                getTSocket());
+        }
         // Invoke the processor
         processor_->process(inputProtocol_, outputProtocol_,
                             connectionContext_);
@@ -646,107 +790,36 @@
     // the writeBuffer_
 
   case APP_WAIT_TASK:
+  {
     // We have now finished processing a task and the result has been written
     // into the outputTransport_, so we grab its contents and place them into
     // the writeBuffer_ for actual writing by the libevent thread
 
     server_->decrementActiveProcessors();
-    // Get the result of the operation
-    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
-
-    // If the function call generated return data, then move into the send
-    // state and get going
-    // 4 bytes were reserved for frame size
-    if (writeBufferSize_ > 4) {
-
-      // Move into write state
-      writeBufferPos_ = 0;
-      socketState_ = SOCKET_SEND;
-
-      // Put the frame size into the write buffer
-      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
-      memcpy(writeBuffer_, &frameSize, 4);
-
-      // Socket into write mode
-      appState_ = APP_SEND_RESULT;
-      setWrite();
 
-      // Try to work the socket immediately
-      // workSocket();
-
-      return;
-    }
-
-    // In this case, the request was oneway and we should fall through
-    // right back into the read frame header state
-    goto LABEL_APP_INIT;
-
-  case APP_SEND_RESULT:
-    // it's now safe to perform buffer size housekeeping.
-    if (writeBufferSize_ > largestWriteBufferSize_) {
-      largestWriteBufferSize_ = writeBufferSize_;
-    }
-    if (server_->getResizeBufferEveryN() > 0
-        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
-      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
-                              server_->getIdleWriteBufferLimit());
-      callsForResize_ = 0;
+    // If the processor did not write a response we have to remove again
+    // the 4 bytes from the frame size.
+    uint32_t writePos = outputTransport_->writeEnd();
+    if (writePos == writeBufferPos_ + 4)
+      outputTransport_->revertLastWrite(4);
+    else {
+      uint32_t size = writePos - writeBufferPos_;
+      setFrameSize(*outputTransport_, size);
+      scheduleWrite(false, size);
     }
 
     // N.B.: We also intentionally fall through here into the INIT state!
-
-  LABEL_APP_INIT:
+  }
   case APP_INIT:
 
-    // Clear write buffer variables
-    writeBuffer_ = NULL;
-    writeBufferPos_ = 0;
-    writeBufferSize_ = 0;
-
     // Into read4 state we go
-    socketState_ = SOCKET_RECV_FRAMING;
-    appState_ = APP_READ_FRAME_SIZE;
+    readFrameSize_ = true;
+    appState_ = APP_READ_REQUEST;
 
     readBufferPos_ = 0;
 
     // Register read event
     setRead();
-
-    // Try to work the socket right away
-    // workSocket();
-
-    return;
-
-  case APP_READ_FRAME_SIZE:
-    // We just read the request length
-    // Double the buffer size until it is big enough
-    if (readWant_ > readBufferSize_) {
-      if (readBufferSize_ == 0) {
-        readBufferSize_ = 1;
-      }
-      uint32_t newSize = readBufferSize_;
-      while (readWant_ > newSize) {
-        newSize *= 2;
-      }
-
-      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
-      if (newBuffer == NULL) {
-        // nothing else to be done...
-        throw std::bad_alloc();
-      }
-      readBuffer_ = newBuffer;
-      readBufferSize_ = newSize;
-    }
-
-    readBufferPos_= 0;
-
-    // Move into read request state
-    socketState_ = SOCKET_RECV;
-    appState_ = APP_READ_REQUEST;
-
-    // Work the socket right away
-    // workSocket();
-
     return;
 
   case APP_CLOSE_CONNECTION:
@@ -760,9 +833,59 @@
   }
 }
 
+void TNonblockingServer::TConnection::beginExternalWriteEvent() {
+  // Prepend four bytes of blank space to the buffer so we can
+  // write the frame size there later.
+  externalWriteBufferPos_ = externalTransport_->writeEnd();
+  externalTransport_->getWritePtr(4);
+  externalTransport_->wroteBytes(4);
+}
+
+void TNonblockingServer::TConnection::endExternalWriteEvent() {
+  // If the external function did not write anything we have to remove again
+  // the 4 bytes from the frame size.
+  uint32_t writePos = externalTransport_->writeEnd();
+
+  if (writePos == externalWriteBufferPos_ + 4)
+    externalTransport_->revertLastWrite(4);
+  else {
+    uint32_t size = writePos - externalWriteBufferPos_;
+    setFrameSize(*externalTransport_, size);
+
+    scheduleWrite(true, size);
+  }
+}
+
+void TNonblockingServer::TConnection::setFrameSize(
+    TMemoryBuffer& buf, uint32_t size) {
+  uint8_t* begin = buf.getWritePtr(0) - size;
+
+  // Put the frame size into the write buffer (subtract 4 bytes from
+  // for frame size header).
+  int32_t frameSize = (int32_t)htonl(size - 4);
+  memcpy(begin, &frameSize, 4);
+}
+
+void TNonblockingServer::TConnection::scheduleWrite(
+    bool externalBuffer, uint32_t len) {
+  if( writeRequests_.empty() ) {
+    writeRequests_.push(WriteRequest(externalBuffer, len));
+    setWrite();
+    return;
+  }
+
+  WriteRequest& req = writeRequests_.back();
+
+  // We can extend the last write request if the buffer is the same.
+  if(externalBuffer == req.first)
+    req.second += len;
+  else
+    writeRequests_.push(WriteRequest(externalBuffer, len));
+}
+
 void TNonblockingServer::TConnection::setFlags(short eventFlags) {
   // Catch the do nothing case
-  if (eventFlags_ == eventFlags) {
+  if ((eventFlags_ | eventFlags) == eventFlags_) {
     return;
   }
 
@@ -775,7 +898,7 @@
   }
 
   // Update in memory structure
-  eventFlags_ = eventFlags;
+  eventFlags_ |= eventFlags;
 
   // Do not call event_set if there are no flags
   if (!eventFlags_) {
@@ -819,11 +942,44 @@
   }
 }
 
+void TNonblockingServer::TConnection::clearFlags(short eventFlags) {
+  if ((~eventFlags & eventFlags_) == eventFlags_)
+    return;
+
+  short newFlags = ~eventFlags & eventFlags_;
+
+  eventFlags_ = 0;
+  if (event_del(&event_) == -1) {
+    GlobalOutput("TConnection::setFlags event_del");
+    return;
+  }
+
+  setFlags(newFlags);
+}
+
 /**
  * Closes a connection
  */
 void TNonblockingServer::TConnection::close() {
-  // Delete the registered libevent
+  // Close the socket
+  tSocket_->close();
+
+  // close any factory produced transports
+  factoryInputTransport_->close();
+  factoryOutputTransport_->close();
+
+  // Give this object back to the server that owns it
+  processor_.reset();
+
+  // Remove all queued write requests
+  while(writeRequests_.empty() == false)
+    writeRequests_.pop();
+
+  // Reset write buffer position to the beginning of the buffer.
+  outputTransport_->resetBuffer();
+  externalTransport_->resetBuffer();
+
+  // Remove event from the set of monitored events
   if (event_del(&event_) == -1) {
     GlobalOutput.perror("TConnection::close() event_del", errno);
   }
@@ -831,16 +987,9 @@
   if (serverEventHandler_ != NULL) {
     serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
   }
-  ioThread_ = NULL;
-
-  // Close the socket
-  tSocket_->close();
 
-  // close any factory produced transports
-  factoryInputTransport_->close();
-  factoryOutputTransport_->close();
+  ioThread_ = NULL;
 
-  // Give this object back to the server that owns it
   server_->returnConnection(this);
 }
 
@@ -856,6 +1005,7 @@
   if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
     // just start over
     outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
+    externalTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
     largestWriteBufferSize_ = 0;
   }
 }
@@ -1122,7 +1272,7 @@
 void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
   threadManager_ = threadManager;
   if (threadManager != NULL) {
-    threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
+    threadManager->setExpireCallback(boost::bind(&TNonblockingServer::expireClose, this, _1));
     threadPoolProcessing_ = true;
   } else {
     threadPoolProcessing_ = false;
Index: thrift-0.9.0/lib/cpp/src/thrift/transport/TBufferTransports.cpp
===================================================================
--- thrift-0.9.0.orig/lib/cpp/src/thrift/transport/TBufferTransports.cpp	2012-10-12 02:58:06.000000000 +0200
+++ thrift-0.9.0/lib/cpp/src/thrift/transport/TBufferTransports.cpp	2014-12-30 15:17:58.000000000 +0100
@@ -374,6 +374,14 @@
   wBase_ += len;
 }
 
+void TMemoryBuffer::revertLastWrite(uint32_t len)
+{
+  if (wBase_ - len < rBase_)
+    throw TTransportException("Can't reset write pointer before read pointer");
+
+  wBase_ -= len;
+}
+
 const uint8_t* TMemoryBuffer::borrowSlow(uint8_t* buf, uint32_t* len) {
   (void) buf;
   rBound_ = wBase_;
Index: thrift-0.9.0/lib/cpp/src/thrift/transport/TBufferTransports.h
===================================================================
--- thrift-0.9.0.orig/lib/cpp/src/thrift/transport/TBufferTransports.h	2012-10-12 02:58:06.000000000 +0200
+++ thrift-0.9.0/lib/cpp/src/thrift/transport/TBufferTransports.h	2014-12-30 15:17:58.000000000 +0100
@@ -683,6 +683,9 @@
   // that had been provided by getWritePtr().
   void wroteBytes(uint32_t len);
 
+  // Resets the write pointer by len bytes if the were not yet read.
+  void revertLastWrite(uint32_t len);
+
   /*
    * TVirtualTransport provides a default implementation of readAll().
    * We want to use the TBufferBase version instead.
Index: thrift-0.9.0/lib/cpp/src/thrift/transport/TLibEventTransport.h
===================================================================
--- /dev/null	1970-01-01 00:00:00.000000000 +0000
+++ thrift-0.9.0/lib/cpp/src/thrift/transport/TLibEventTransport.h	2014-12-30 15:48:09.000000000 +0100
@@ -0,0 +1,126 @@
+#ifndef _THRIFT_TRANSPORT_TLIB_EVENTTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TLIB_EVENTTRANSPORT_H_ 1
+
+#include "TSocket.h"
+#include "TBufferTransports.h"
+
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+
+struct event_base;
+
+namespace apache { namespace thrift {
+namespace protocol {
+  class TProtocol;
+}
+namespace transport {
+
+/**
+ * TLibEventTransport is used by TNonblockingServer to integrate events from
+ * an external libevent source.
+ *
+ * To support asynchronous communication between a thrift client and server the
+ * following pattern is required:
+ * - all request in a Thrift service must be defined as oneway
+ * - an additional "callback" service is required to send asynchronous
+ *   responses and updates from server to client. The "callback" service is
+ *   implemented by the client. Here as well all request have to be oneway.
+ *   Since all request are oneway they can share the same thrift connection.
+ * - In the server the callback client object must be initialized in the
+ *   handler factory with the protocol from  TConnectionInfo. For
+ *   TLibEventTransport one has to use TLibEventTransport::getProtocol().
+ * - In the client one has to create an own thread to handle the callbacks.
+ *   One only has to call process of the generated "callback" processor with
+ *   the protocols created for the server connection.
+ *
+ * For an external event beginWriteEvent() must be called before using the
+ * the callback object to write 4 bytes for the frame size into the output
+ * transport. After using the callback object endWriteEvent() must be called
+ * to calculate and set the frame size and notify libevent for writting.
+ */
+class TLibEventTransport : public TSocket
+{
+public:
+  typedef boost::function<void ()>      EventCb;
+
+  /*!
+   * TLibEventTransport is created by TNonblockingServer when a new client
+   * connection is accepted.
+   *
+   * @param protocol      protocol for the callback object
+   * @param beginEventCb  callback to start sending an asynchronous event
+   * @param endEventCb    callback to finish sending an asynchronous event
+   * @param closeEventCb  callback to close transport and deregister from
+   *                      libevent mainloop
+   */
+  TLibEventTransport(boost::shared_ptr<protocol::TProtocol>& protocol,
+                     EventCb beginEventCb,
+                     EventCb endEventCb,
+                     EventCb closeEventCb) :
+    eventBase_(0),
+    protocol_(protocol),
+    beginWriteEventCb_(beginEventCb),
+    endWriteEventCb_(endEventCb),
+    closeEventCb_(closeEventCb) {}
+
+  /**
+   * @returns event_base used by this transport.
+   */
+  event_base& getEventBase() const {
+    assert(eventBase_);
+    return *eventBase_;
+  }
+
+  /*!
+   * Setting correct event_base from TNonblockingServer.
+   *
+   * @param ev            event_base used by TNonblockingServer
+   */
+  void setEventBase(event_base& ev) {
+    eventBase_ = &ev;
+  }
+
+  /**
+   * @returns the protocol for the callback object.
+   */
+  boost::shared_ptr<protocol::TProtocol>& getProtocol() {
+    return protocol_;
+  }
+
+  /**
+   * Writes 4 bytes for the frame header into the output transport.
+   *
+   * Must be called before using a request from the callback object.
+   */
+  void beginWriteEvent() {
+    beginWriteEventCb_();
+  }
+
+  /**
+   * Calculates the frame size and notifies libevent for writting.
+   *
+   * Must be called after using a request from the callback object.
+   */
+  void endWriteEvent() {
+    endWriteEventCb_();
+  }
+
+  /**
+   * Closes the socket and deregister from libevent main loop.
+   */
+  void closeTransport() {
+      closeEventCb_();
+  }
+
+private:
+  event_base* eventBase_;
+  boost::shared_ptr<protocol::TProtocol>  protocol_;
+  EventCb beginWriteEventCb_;
+  EventCb endWriteEventCb_;
+  EventCb closeEventCb_;
+};
+
+}}} // apache::thrift::transport
+
+
+#endif // _THRIFT_TRANSPORT_TLIB_EVENTTRANSPORT_H_