Comparing OpenDDS and ZeroMQ Usage and Performance

by
Don Busch, Principal Software Engineer and Partner
Object Computing, Inc. (OCI)

Introduction

High-performance messaging middleware is an increasingly important infrastructure component of any distributed application. A well-designed messaging infrastructure can minimize dependencies between application components, enabling evolution of the software as requirements change and as new components are integrated.

There are many such messaging middleware systems. One is OpenDDS (www.opendds.org), OCI's open source implementation of the Object Management Group (OMG) Data Distribution Service (DDS) specification. DDS supports high-performance publish-subscribe messaging through type-safe interfaces over custom, high-speed transports with configurable Quality-of-Service guarantees. DDS-based solutions have been successfully applied to many thorny middleware problems.

The second messaging middleware product mentioned in the title of this article is ZeroMQ (www.zeromq.org). ZeroMQ is a middleware product with a different focus than OpenDDS. While OpenDDS is type-safe and emphasizes publish-subscribe behavior, ZeroMQ is a lightweight socket-like message queueing layer that sends raw message buffers from sender to receiver. OpenDDS can also send raw buffers from sender to receiver, but that is not its target usage.

The event that drove the research and testing in this article was a conversation with a prospective customer who had an existing .NET-based application and was planning to integrate either OpenDDS or ZeroMQ as messaging middleware underneath it. The customer planned to stream .NET objects into a raw buffer and send that buffer across the messaging middleware.

Before we ran the tests, we wondered what the tradeoffs would be, in terms of ease-of-use and performance, of using OpenDDS to do that as opposed to ZeroMQ. As we said above, sending a raw buffer through OpenDDS is not its best use case, but it's worthwhile to do the comparison to characterize relative performance. The raw buffer test is definitely going to favor ZeroMQ since that's what it is designed for. In a subsequent test, we'll send typed C++ data through both OpenDDS and ZeroMQ to find out how much overhead is added to each test by strong typing. That test is closer to OpenDDS's intended use. We'll test OpenDDS using a regular IDL struct, and we'll test ZeroMQ with a C++ struct using both Boost Serialization and Google Protocol Buffers to simulate a likely application developer use case. The use of strongly typed data will slow the ZeroMQ test cases down, but also makes them more realistic for a wider range of applications. Until we test, we are not sure whether OpenDDS or ZeroMQ will be faster for the strongly typed tests.

Finally, as a baseline, we'll test with Boost.Asio. Boost.Asio is a thin layer on top of sockets. We are interested in determining how ZeroMQ's performance compares to Boost.Asio's performance, and how ease-of-use compares as well. In other words, what does ZeroMQ provide in ease-of-use over Boost.Asio, and what do you pay in performance?

In the raw buffer test cases, we expect Boost.Asio to have the best performance, followed by ZeroMQ, and lastly OpenDDS. In the typed C++ data tests, we're not sure what kind of performance to expect until we run the tests -- but we expect to find out how efficient OpenDDS is at sending strongly typed data across the wire.

One item worth noting is that we do not claim to be ZeroMQ experts. We have used references and documentation on the ZeroMQ web site to learn how to set up our ZeroMQ tests. We have built all relevant software with debugging turned off to enable apples-to-apples performance comparisons. Since the source code for all of the tests is available at the end of the article, feel free to download it, run the tests, and make modifications as you see fit. See the README files in the root directory and the bin directory for more information.

To summarize, our performance tests are as follows:

Performance

These tests were executed on an old laptop with a 1.6 GHz Intel Pentium 4 and 2 GB RAM running Windows XP. We're not interested in raw numbers so much as we're interested in comparisons, so the speed of the test machine is not very important. We could have run most of these tests on a much faster Linux workstation, but we wouldn't have been able to run the .NET tests with Microsoft's compiler and runtime.

OpenDDS Raw Buffer Test

OpenDDS applications use a "topic" as a rendezvous point. In other words, OpenDDS publications and subscriptions find each other based on nothing more than a topic name. A federation-capable daemon, the OpenDDS InfoRepo, associates publications and subscriptions, enabling the publication and subscription applications to be coded without any knowledge of the other side's endpoints. This architecture differs from that of the ZeroMQ and Boost.Asio examples, as we will see later.
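To make the idea of a topic-based rendezvous concrete, here is a toy, in-memory sketch. It is not the OpenDDS API and says nothing about how the InfoRepo is implemented; the class name TopicRegistry, its member functions, and the endpoint strings used with it are all hypothetical. It only illustrates that a publisher can look up its subscribers by topic name alone.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Toy stand-in for a rendezvous service. Hypothetical; NOT the OpenDDS API.
class TopicRegistry {
public:
  // A subscriber registers interest in a topic and reports where it can be reached.
  void add_subscription(const std::string& topic, const std::string& endpoint) {
    subscriptions_[topic].push_back(endpoint);
  }

  // A publisher asks by topic name only and receives the matching endpoints.
  std::vector<std::string> match(const std::string& topic) const {
    std::map<std::string, std::vector<std::string> >::const_iterator it =
        subscriptions_.find(topic);
    if (it == subscriptions_.end()) {
      return std::vector<std::string>();  // nobody subscribed to this topic
    }
    return it->second;
  }

private:
  std::map<std::string, std::vector<std::string> > subscriptions_;
};
```

In this sketch the publisher never sees an address on its command line; it only names the topic. The ZeroMQ and Boost.Asio tests, by contrast, pass endpoints explicitly.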

The OpenDDS raw buffer test code simply publishes a raw data buffer on an OpenDDS topic. The code is in opendds/idl/Raw.idl, opendds/cpp/RawPublisher.cpp, and opendds/cpp/RawSubscriber.cpp. We define an IDL struct that simply contains a raw octet buffer, as shown below:

module MiddlewareNewsBrief
{
  const string RAW_BUFFER_TOPIC_NAME = "MiddlewareNewsBrief::RawBuffer";
  const string RAW_BUFFER_TOPIC_TYPE = "MiddlewareNewsBrief::RawBuffer";
#pragma DCPS_DATA_TYPE "MiddlewareNewsBrief::RawBuffer"
  typedef sequence<octet> BufferType;
  struct RawBuffer
  {
    BufferType buffer;
  };
};

An octet is a raw byte, and a sequence of octets represents a raw byte array that can grow and shrink. To use this, we'll write an OpenDDS publishing process and an OpenDDS subscribing process to create raw octet buffers and write them across the wire. The publisher side publishes a data sample containing a raw buffer; the subscriber receives it, and publishes an echo sample back to the publisher. We measure total latency for each sample by measuring the time it takes to make that round trip. Then we send the next sample.
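The latency arithmetic behind this round-trip measurement can be sketched as follows. This is an illustration rather than the test code itself: the function name and parameters are hypothetical, and it assumes the OpenDDS test uses the same formula that appears in the ZeroMQ and Boost.Asio listings later in this article (total elapsed time divided by twice the round-trip count, then by the number of subscribers).

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper (not part of the attached test code): estimates the
// average one-way latency from the total elapsed time of an echo test.
//   elapsed_us      - total time for all round trips, in microseconds
//   roundtrip_count - number of publish/echo round trips performed
//   num_subscribers - number of subscribers that echoed each message
double average_one_way_latency_us(double elapsed_us,
                                  std::size_t roundtrip_count,
                                  std::size_t num_subscribers)
{
  assert(roundtrip_count > 0 && num_subscribers > 0);

  // Each round trip covers two one-way hops (out and back), hence the 2.0.
  double per_hop = elapsed_us / (static_cast<double>(roundtrip_count) * 2.0);

  // The later listings also divide by the subscriber count.
  return per_hop / static_cast<double>(num_subscribers);
}
```

For example, 1000 round trips to a single subscriber that take 370,000 microseconds in total work out to 185 microseconds per one-way hop.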

We won't show the OpenDDS publishing and subscribing code here simply because we have a lot of other code to show, and you can see other examples of OpenDDS publishing or subscribing code by examining the attached code files or by browsing through this introductory OpenDDS article.

To execute the test, we first run the OpenDDS daemon process, the DCPSInfoRepo. This process is the rendezvous point for publications and subscriptions, enabling them to find each other based on the topic name. In this particular command-line, the DCPSInfoRepo writes out its reference to a file called repo.ior. We could also tell the DCPSInfoRepo to listen on a particular endpoint, as documented in this article. Obviously, we're running this particular test (and all of the others) on one host because we're not interested in measuring the latency of the network itself.

Note that the MNB_ROOT environment variable must point to the directory where the Middleware News Brief code has been downloaded and unzipped. See the README file in that directory for more information on setting up the environment.

We run the DCPSInfoRepo daemon as follows:

  %DDS_ROOT%\bin\DCPSInfoRepo -ORBSvcConf %MNB_ROOT%\config\lib_tcp.conf -o repo.ior

We then run an OpenDDS Raw Buffer subscribing process:

  %MNB_ROOT%\bin\OpenDdsRawSubscriber \
      -ORBSvcConf %MNB_ROOT%\config\lib_tcp.conf \
      -DCPSConfigFile %MNB_ROOT%\config\tcp_conf.ini \
      -DCPSInfoRepo file://repo.ior

Finally, we run an OpenDDS Raw Buffer publishing process to publish one message to our interested subscriber, wait for an echo of that message back from that subscriber, and repeat that loop 999 more times.

  %MNB_ROOT%\bin\OpenDdsRawPublisher \
      -ORBSvcConf %MNB_ROOT%\config\lib_tcp.conf \
      -DCPSConfigFile %MNB_ROOT%\config\tcp_conf.ini \
      -DCPSInfoRepo file://repo.ior -num 1000

In the publisher's window, we should see output that looks something like this, measuring the average one-way latency of each of the 1000 messages:

  Number of messages is 1000
  Messages size is 1000
  Writing...


  Average latency in milliseconds: 0.185


So, that's our approximate baseline for OpenDDS — 0.185 milliseconds, or 185 microseconds. Repeated runs of the test will show different values, so you will want to run the test several times to get a clearer picture of the average latency.

We can run more than one subscribing process at a time simply by launching extra subscribers. We also need to tell the publishing process how many subscribers to expect so that it knows how many echoed samples it should expect:

  %MNB_ROOT%\bin\OpenDdsRawSubscriber \
      -ORBSvcConf %MNB_ROOT%\config\lib_tcp.conf \
      -DCPSConfigFile %MNB_ROOT%\config\tcp_conf.ini \
      -DCPSInfoRepo file://repo.ior

  %MNB_ROOT%\bin\OpenDdsRawSubscriber \
      -ORBSvcConf %MNB_ROOT%\config\lib_tcp.conf \
      -DCPSConfigFile %MNB_ROOT%\config\tcp_conf.ini \
      -DCPSInfoRepo file://repo.ior

  %MNB_ROOT%\bin\OpenDdsRawSubscriber \
      -ORBSvcConf %MNB_ROOT%\config\lib_tcp.conf \
      -DCPSConfigFile %MNB_ROOT%\config\tcp_conf.ini \
      -DCPSInfoRepo file://repo.ior


  %MNB_ROOT%\bin\OpenDdsRawPublisher \
      -ORBSvcConf %MNB_ROOT%\config\lib_tcp.conf \
      -DCPSConfigFile %MNB_ROOT%\config\tcp_conf.ini \
      -DCPSInfoRepo file://repo.ior -num 1000 \
      -ns 3

Note that the OpenDDS test also has a Perl script that can launch the DCPSInfoRepo, the publishing process and subscribing process for us:

  perl %MNB_ROOT%\bin\opendds_test.pl OpenDdsRaw -num 1000

That Perl script can also launch more than one subscribing process, enabling performance testing of 1-to-N cases:

  perl %MNB_ROOT%\bin\opendds_test.pl OpenDdsRaw -num 1000 -num-subs 3

We can also run the OpenDDS example with a multicast transport by using the script's -tr mcast command-line argument. An OpenDDS application's transport can be configured via a file.

  perl %MNB_ROOT%\bin\opendds_test.pl OpenDdsRaw -num 1000 -tr mcast

ZeroMQ Raw Buffer Test

ZeroMQ (www.zeromq.org) is designed as a rather thin message queueing layer over sockets, so sending raw buffers is what it does best. Our ZeroMQ test measures latency the same way as the OpenDDS test does, by publishing a buffer, waiting for an echo, and then repeating that process.

ZeroMQ does not use a daemon process to associate publications and subscriptions. In fact, it doesn't use topics for association purposes at all. In ZeroMQ applications, we manually pass the endpoint of the subscription into the publication's process as we would in a socket application.

The ZeroMQ raw buffer test code is in zeromq/cpp/ZeromqPublisher.cpp and zeromq/cpp/ZeromqSubscriber.cpp.

The core of the publishing process code is shown below. We'll use ZeroMQ's ZMQ_PUB and ZMQ_SUB capabilities to come as close as we can to a publish-subscribe API. You can see from the first for loop below that ZeroMQ does allow us to attach many transports to the same ZeroMQ socket object, so we only have to publish the buffer once to send it to all connected subscribers.

int main (int argc, char *argv [])
{
  try {
    if (argc < 5) {
      std::cout << "usage: ZeromqPublisher <message-size> <roundtrip-count>  <bind-to-sub> [<connect-to-pub>]+\n"
                << "e.g.:  ZeromqPublisher 1000 10000 tcp://eth0:54321 tcp://spider:54322 tcp://spider:54323\n"
                << "          use a literal IP address and port for each endpoint\n"
                << std::endl;
      return 1;  // not enough arguments to continue
    }
    size_t message_size = atoi (argv [1]);
    size_t roundtrip_count = atoi (argv [2]);
    const char* bind_to_sub = argv[3];

Here, we create our ZeroMQ sockets. We specify the outgoing socket as a publishing socket and the incoming socket as a subscribing socket.

    zmq::context_t ctx (1, 1);
    zmq::socket_t pub(ctx,ZMQ_PUB);
    zmq::socket_t sub(ctx,ZMQ_SUB);
    sub.setsockopt(ZMQ_SUBSCRIBE,"DataFeed\x00",9);
    sub.bind(bind_to_sub);

We have to provide an endpoint for each subscribing process on the command-line. Here, we iterate through those endpoints and connect to each.

    size_t num_subscribers = 0;
    for (int argi = 4; argi < argc; ++argi) {
      pub.connect(argv[argi]);
      ++num_subscribers;
    }

Note that we prepend a "topic" name, "DataFeed", onto each message buffer. ZeroMQ uses the topic names like message filters, enabling a subscriber to instruct ZeroMQ to filter out messages with "topic" names it does not wish to receive.

    zmq::message_t msg(message_size);
    memset(msg.data(), 0, message_size);
    memcpy(msg.data(), "DataFeed\x00", 9);

    // Cast the size_t values so they match the %d format specifiers
    printf("Entering send loop -- %d messages, size = %d\n",
           (int) roundtrip_count,
           (int) message_size);

    void* watch = zmq_stopwatch_start ();

We loop, publishing a message and waiting for its echo. Note that we only call send once to send the message to all connected subscribers.

    for (size_t i = 0; i != roundtrip_count; i++) {
      pub.send(msg);  
      for (size_t jj = 0; jj < num_subscribers; ++jj) {
        // Wait for echoed message from each subscriber
        sub.recv(&msg);
        if (msg.size() != message_size) {
          std::cout << "Message of incorrect size received: " << msg.size() 
                    << std::endl;
          return -1;
        }
      }
    }

    unsigned long elapsed = zmq_stopwatch_stop (watch);
    double latency = (double) elapsed / (roundtrip_count * 2.0) 
                     / (double)num_subscribers;

    printf ("message size: %d [B]\n", (int) message_size);
    printf ("roundtrip count: %d\n", (int) roundtrip_count);
    printf ("\n\naverage latency: %.3f [us]\n\n\n", (double) latency);

    return 0;
  } catch (std::exception &e) {
    std::cout << "An error occurred: " << e.what() << std::endl;
    return 1;
  }
}

As we mentioned, the line memcpy(msg.data(), "DataFeed\x00", 9); is the "topic" that we're publishing upon. ZeroMQ simulates topics with a message filtering mechanism; the subscribing process indicates that it only wants messages prepended with the filter string "DataFeed". It is the responsibility of the publisher to encode the filter string on the front of the message buffer, and it is the responsibility of the subscriber to skip over the filter string on its received buffer.
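The prefix convention described above can be sketched without any ZeroMQ calls. The helper names below (make_message, matches_filter, extract_payload) are hypothetical and are not part of ZeroMQ or of the attached test code. The sketch assumes, as in the listing above, that the topic string is followed by a single NUL byte and then the payload; unlike the listing, the filter here is compared without its trailing NUL.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helpers illustrating prefix-based "topics"; not ZeroMQ API calls.

// Publisher side: build a buffer holding the topic, a NUL byte, then the payload.
std::vector<char> make_message(const std::string& topic,
                               const std::string& payload)
{
  std::vector<char> buf(topic.begin(), topic.end());
  buf.push_back('\0');
  buf.insert(buf.end(), payload.begin(), payload.end());
  return buf;
}

// Mimics a subscription filter: true when the buffer starts with the filter bytes.
bool matches_filter(const std::vector<char>& buf, const std::string& filter)
{
  return buf.size() >= filter.size() &&
         std::equal(filter.begin(), filter.end(), buf.begin());
}

// Subscriber side: skip the topic and its NUL terminator to recover the payload.
std::string extract_payload(const std::vector<char>& buf)
{
  std::size_t i = 0;
  while (i < buf.size() && buf[i] != '\0') {
    ++i;
  }
  if (i == buf.size()) {
    return std::string();  // no terminator found, so no payload to extract
  }
  return std::string(buf.begin() + i + 1, buf.end());
}
```

The point of the sketch is that the "topic" is just bytes at the front of the buffer, so both sides must agree on where the topic ends and the payload begins.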

The core of the subscribing process code is shown below, with the echo of the buffer back to the publisher for round-trip latency measurement.

#include "zmq.hpp"
#include <cstdio>    // printf
#include <cstdlib>   // atoi
#include <iostream>

int main (int argc, char *argv [])
{
  try {
    if (argc != 5) {
      std::cout << "usage: ZeromqSubscriber <message-size> <roundtrip-count> <bind-to-sub> <connect-to-pub>\n"
                << "e.g.:  ZeromqSubscriber 1000 10000 tcp://eth0:54322 tcp://spider:54321\n"
                << "          on Windows, use a literal IP address and port for each endpoint\n"
                << std::endl;
        return 1;
    }

    size_t message_size = atoi (argv [1]);
    size_t roundtrip_count = atoi (argv [2]);
    const char* bind_to_sub = argv [3];
    const char* connect_to_pub = argv [4];

    zmq::context_t ctx (1, 1);
    zmq::socket_t pub (ctx, ZMQ_PUB);
    pub.connect(connect_to_pub);

Note the subscription to the "DataFeed" topic, which tells ZeroMQ to only pass through message buffers that start with the string "DataFeed". It is the subscribing process's responsibility to skip over this topic string to extract any message content, as we'll see in a later example.

    zmq::socket_t sub (ctx, ZMQ_SUB);
    sub.setsockopt(ZMQ_SUBSCRIBE,"DataFeed\x00",9);
    sub.bind(bind_to_sub);

    // Cast the size_t values so they match the %d format specifiers
    printf("Entering recv loop -- %d messages, size = %d\n",
           (int) roundtrip_count,
           (int) message_size);

    for (size_t i = 0; i != roundtrip_count; i++) {
      zmq::message_t msg;
      sub.recv (&msg);

      // Echo it back
      pub.send(msg,0);
    }
    printf("Finished receiving messages\n");
    
    return 0;

  } catch (std::exception &e) {
    std::cout << "An error occurred: " << e.what() << std::endl;
    return 1;
  }
}

To run this test case, we start a ZeroMQ subscribing process and a ZeroMQ publishing process, as follows:

  %MNB_ROOT%\bin\ZeromqSubscriber  1000 1000 \
      tcp://10.201.200.72:54322 \
      tcp://10.201.200.72:54321

  %MNB_ROOT%\bin\ZeromqPublisher  1000 1000 \
      tcp://10.201.200.72:54321 \
      tcp://10.201.200.72:54322

In the publisher's window, we should see output that looks something like this, measuring the average one-way latency of each of the 1000 messages:

  Entering send loop -- 1000 messages, size = 1000
  message size: 1000 [B]
  roundtrip count: 1000


  average latency: 170.336 [us]


The average latency of about 170 microseconds compares to an average latency with the OpenDDS raw buffer test of 185 microseconds. However, you really need to run each test several times to get a clear picture of the relative performance. It's also a good idea to shut down as many processes as possible on the test computer.

One thing that you can see while running the ZeroMQ test is that we need to supply ZeroMQ with endpoints (an IP address and a port) for its sockets. OpenDDS handles that internally, and only needs a topic name to associate a publication and a subscription. The difference becomes more apparent when we run several ZeroMQ subscribers:

  %MNB_ROOT%\bin\ZeromqSubscriber  1000 1000 \
      tcp://10.201.200.72:54322 \
      tcp://10.201.200.72:54321

  %MNB_ROOT%\bin\ZeromqSubscriber  1000 1000 \
      tcp://10.201.200.72:54323 \
      tcp://10.201.200.72:54321

  %MNB_ROOT%\bin\ZeromqSubscriber  1000 1000 \
      tcp://10.201.200.72:54324 \
      tcp://10.201.200.72:54321


  %MNB_ROOT%\bin\ZeromqPublisher  1000 1000 \
      tcp://10.201.200.72:54321 \
      tcp://10.201.200.72:54322 \
      tcp://10.201.200.72:54323 \
      tcp://10.201.200.72:54324

Notice how the publishing process needs each subscribing process's endpoint to associate correctly with each subscriber. Contrast that with the OpenDDS test, where we merely run as many subscribing processes as we want and let OpenDDS make the associations for us. So we're seeing some ease-of-use differences between the two middleware products, and that's without even touching on OpenDDS's extensive Quality-of-Service support.

Again, each test also has a Perl script that can launch the publishing process and subscribing process for us:

  perl %MNB_ROOT%\bin\zeromq_test.pl Zeromq 1000 1000

That Perl script can also launch more than one subscribing process, enabling performance testing of 1-to-N cases:

  perl %MNB_ROOT%\bin\zeromq_test.pl Zeromq 1000 1000 -num-subs 3

Boost.Asio Raw Buffer Test

Boost.Asio (www.boost.org) is a C++ network programming framework providing a thin, object-oriented layer over sockets. Boost.Asio's layer is, or at least should be, thinner than ZeroMQ's. The purpose of testing Boost.Asio is to give us a baseline and get an idea of how much processing ZeroMQ and OpenDDS add to basic socket communication.

Like ZeroMQ, our Boost.Asio example does not use a daemon process to associate publications and subscriptions. Again, we manually pass the endpoint of the subscription into the publication's process, as we would in any other socket application.

The Boost.Asio raw buffer test code is in boostasio/cpp/BoostPublisher.cpp and boostasio/cpp/BoostSubscriber.cpp.

The Boost.Asio publishing process code is shown below. Note that we have to manage the one-to-many relationship from the publisher to the subscribers ourselves. Also note that this code is limited to TCP; we can change the transport for both the OpenDDS and ZeroMQ examples by passing in different command-line options.

#include <cstdio>    // printf
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <vector>
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>

#include "Profiler.h"

using boost::asio::ip::tcp;

int main(int argc, char* argv[])
{
  try
  {
    if (argc < 5)
    {
      std::cerr << "Usage: BoostPublisher <message-size> <num-messages> [<host> <port>]+\n";
      return 1;
    }

    size_t message_size = std::atoi(argv[1]);
    size_t num_messages = std::atoi(argv[2]);
    const size_t used_args = 3;

    boost::asio::io_service io_service;

    size_t num_subscribers = (argc - used_args) / 2;    
    std::vector<boost::shared_ptr<tcp::socket> > subscribers;
    subscribers.reserve(num_subscribers);    

We resolve each subscriber's endpoint.

    for (size_t i=0; i < num_subscribers; ++i)
    {
      tcp::resolver resolver(io_service);
      tcp::resolver::query query(tcp::v4(), argv[i*2+used_args], argv[i*2+used_args+1]);
      tcp::resolver::iterator iterator = resolver.resolve(query);

      boost::shared_ptr<tcp::socket> s(new tcp::socket(io_service));      
      subscribers.push_back(s);
      s->connect(*iterator);
    }

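    // Cast the size_t values so they match the %d format specifiers
    printf("Sending %d messages of size %d to %d subscribers\n",
           (int) num_messages,
           (int) message_size,
           (int) num_subscribers);

    // Array deleters ensure the buffers are released with delete[]
    boost::shared_ptr<char> request(new char[message_size],
                                    boost::checked_array_deleter<char>());
    boost::shared_ptr<char> reply(new char[message_size],
                                  boost::checked_array_deleter<char>());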

    MIDDLEWARENEWSBRIEF_PROFILER_TIME_TYPE start = 
      MIDDLEWARENEWSBRIEF_PROFILER_GET_TIME;

    for (size_t i=0; i < num_messages; ++i) {

      memset(request.get(), 0, message_size);
      memset(reply.get(), 0, message_size);

      // Two loops here simulate pub/sub behavior,
      // where we publish to all subscribers before
      // looking for an echoed sample coming back

We must manually publish to each attached subscriber endpoint.

      for (size_t jj = 0; jj < num_subscribers; ++jj) {
        boost::asio::write(*(subscribers[jj]), boost::asio::buffer(request.get(), message_size));
      }   

      for (size_t jj = 0; jj < num_subscribers; ++jj) {
        size_t reply_length = boost::asio::read(*(subscribers[jj]),
            boost::asio::buffer(reply.get(), message_size));
        if (reply_length != message_size)
        {
          std::cerr << "Message reply size mismatch; expected " 
                    << message_size << ", received " << reply_length << std::endl;
          return -1;
        }
      }
    }

    MIDDLEWARENEWSBRIEF_PROFILER_TIME_TYPE finish = 
      MIDDLEWARENEWSBRIEF_PROFILER_GET_TIME;

    MIDDLEWARENEWSBRIEF_PROFILER_TIME_TYPE elapsed =
      MIDDLEWARENEWSBRIEF_PROFILER_DIFF(finish,start);

    double latency = (double) elapsed / (num_messages * 2.0) / (double)(num_subscribers);
    printf("\n\nAverage latency in %s: %.3f\n\n\n", 
           MIDDLEWARENEWSBRIEF_PROFILER_TIME_UNITS,
           latency);
    printf("Finished\n");
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  return 0;
}

The subscribing process code is shown below, with the echo of the buffer back to the publisher for round-trip latency measurement. The bulk of the subscriber logic takes place in a Server class, which is not shown but is available in the attached source code in boostasio/cpp/Server[.h|.cpp]. The Server class is a class that we've written to manage the socket sessions between the publisher and the subscribers.

#include "Server.h"
#include <cstdio>    // printf
#include <cstdlib>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;

int main(int argc, char* argv[])
{
  try
  {
    if (argc != 4)
    {
      std::cerr << "Usage: BoostSubscriber <message-size> <num-messages> <port>\n";
      return 1;
    }

    boost::asio::io_service io_service;

    size_t message_size = std::atoi(argv[1]);
    size_t num_messages = std::atoi(argv[2]);

    Server s(io_service, std::atoi(argv[3]), num_messages);

    printf("Waiting; running for %d messages\n", (int) num_messages);

    io_service.run();
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  return 0;
}

To run the test, run a publishing process and a subscribing process:

  %MNB_ROOT%\bin\BoostSubscriber  1000 1000 54421

  %MNB_ROOT%\bin\BoostPublisher  1000 1000 10.201.200.72 54421

In the publisher's window, we should see output that looks something like this, measuring the average one-way latency of each of the 1000 messages:

  Sending 1000 messages of size 1000 to 1 subscribers


  Average latency in milliseconds: 0.075


  Finished

The average latency of about 75 microseconds compares to an average latency with the OpenDDS raw buffer test of 185 microseconds and the ZeroMQ raw buffer test of about 170 microseconds. Again, though, you really need to run each test several times to get a clear picture of the relative performance.

As with the ZeroMQ test, we need to supply the Boost.Asio test with endpoints for its sockets. OpenDDS, of course, handles that internally, and only needs a topic name to associate a publication and a subscription. As before, the difference is more obvious with several subscribers:

  %MNB_ROOT%\bin\BoostSubscriber  1000 1000 54421

  %MNB_ROOT%\bin\BoostSubscriber  1000 1000 54422

  %MNB_ROOT%\bin\BoostSubscriber  1000 1000 54423


  %MNB_ROOT%\bin\BoostPublisher  1000 1000 10.201.200.72 54421 \
                                           10.201.200.72 54422 \
                                           10.201.200.72 54423

Notice how the Boost.Asio publishing process also needs each subscribing process's endpoint to associate correctly with each subscriber.
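The endpoint bookkeeping that the publisher's command line implies can be sketched as a small helper. The names Endpoint and parse_endpoints are hypothetical and do not appear in the attached code; the sketch only mirrors the (argc - used_args) / 2 arithmetic in the publisher listing above, where each subscriber contributes one host argument and one port argument.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

typedef std::pair<std::string, std::string> Endpoint;  // (host, port)

// Hypothetical helper: collects the <host> <port> pairs that follow the first
// `used_args` entries of `args`. A trailing unpaired argument is ignored.
std::vector<Endpoint> parse_endpoints(const std::vector<std::string>& args,
                                      std::size_t used_args)
{
  std::vector<Endpoint> endpoints;
  if (args.size() <= used_args) {
    return endpoints;  // no endpoint arguments were supplied
  }

  // Two arguments per subscriber, as in the publisher listing.
  std::size_t num_subscribers = (args.size() - used_args) / 2;
  endpoints.reserve(num_subscribers);
  for (std::size_t i = 0; i < num_subscribers; ++i) {
    endpoints.push_back(Endpoint(args[used_args + 2 * i],
                                 args[used_args + 2 * i + 1]));
  }
  assert(endpoints.size() == num_subscribers);
  return endpoints;
}
```

With the three-subscriber command line above, a helper like this would yield three (host, port) pairs, one for each socket the publisher must connect and write to.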

As usual, each test also has a Perl script that can launch the publishing process and subscribing process for us:

  perl %MNB_ROOT%\bin\boost_test.pl Boost 1000 1000

That Perl script can also launch more than one subscribing process, enabling performance testing of 1-to-N cases:

  perl %MNB_ROOT%\bin\boost_test.pl Boost 1000 1000 -num-subs 3

.NET Streaming Tests

For our next round of tests, we use C# to stream a couple of .NET objects into a raw buffer and send them across the wire. These tests were driven by a customer who was exploring the use of ZeroMQ to stream .NET objects from a publisher to a subscriber. We wanted to determine if we could do the same thing with OpenDDS and compare performance and usability. As a baseline, we examined streaming .NET objects across Boost.Asio as well.

We will stream instances of the following .NET types across the wire in our tests. These files are in the common/csharp directory.

using System;

namespace MiddlewareNewsBrief
{
   [Serializable]
   public class MarketDataEntry
   {
      public uint mdUpdateAction = 0;
      public uint mdPriceLevel = 0;
      public String mdEntryType = "";
      public uint openCloseSettleFlag = 0;
      public uint securityIDSource = 0;
      public uint securityID = 0;
      public uint rptSeq = 0;
      public double mdEntryPx = 0.0;
      public uint mdEntryTime = 0;
      public uint mdEntrySize = 0;
      public uint numberOfOrders = 0;
      public String tradingSessionID = "";
      public double netChgPrevDay = 0.0;
      public uint tradeVolume = 0;
      public String tradeCondition = "";
      public String tickDirection = "";
      public String quoteCondition = "";
      public uint aggressorSide = 0;
      public String matchEventIndicator = "";

      public static MarketDataEntry createTestData()
      {
         // implementation omitted for brevity
      }
   }

   [Serializable]
   public class MarketData
   {
      public uint securityID = 0;
      public String applVersionID = "";
      public String messageType = "";
      public String senderCompID = "";
      public uint msgSeqNum = 0;
      public uint sendingTime = 0;
      public uint tradeDate = 0;

      public bool isEcho = false;
      public uint counter = 0;

      public MarketDataEntry[] mdEntries = new MarketDataEntry[0];

      public static MarketData createTestData()
      {
         // implementation omitted for brevity
      }

   }

   [Serializable]
   public class RelatedSym
   {
      public String symbol = "";
      public ulong orderQuantity = 0;
      public uint side = 0;
      public ulong transactTime = 0;
      public uint quoteType = 0;
      public uint securityID = 0;
      public uint securityIDSource = 0;

      public static RelatedSym createTestData()
      {
         // implementation omitted for brevity
      }
   }

   [Serializable]
   public class QuoteRequest
   {
      public uint securityID = 0;
      public String applVersionID = "";
      public String messageType = "";
      public String senderCompID = "";
      public uint msgSeqNum = 0;
      public uint sendingTime = 0;
      public String quoteReqID = "";

      public bool isEcho = false;
      public uint counter = 0;

      public RelatedSym[] related = new RelatedSym[0];

      public static QuoteRequest createTestData()
      {
         // implementation omitted for brevity
      }
   }
}

ZeroMQ .NET Tests

ZeroMQ has a set of .NET bindings, so streaming .NET objects is fairly straightforward. We do have one complicating factor, though. We publish our data on two topics, a "MarketData" topic and a "QuoteRequest" topic. In ZeroMQ, we indicate these by prepending message filter strings to the front of each buffer. So we have to be careful to stream the .NET objects into the buffer at a location after each message filter string.

The publishing process creates its .NET objects, and sets an "echo" flag to false and a counter on each object. It streams the objects into a buffer and sends them to the subscriber. On the subscriber side, the subscriber deserializes the objects, checks the counter and the "echo" flag, changes the "echo" flag to true, reserializes the objects, and echoes them back. The publisher takes the message reply, deserializes the objects again, and checks the echo flag and the counter. These steps ensure that the serialization was successful, i.e. that we can stream and resurrect the .NET objects successfully after sending them through ZeroMQ.
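The echo check described above can be sketched in a few lines of C++, independent of .NET serialization and of ZeroMQ. The struct EchoHeader and both functions are hypothetical stand-ins for the isEcho and counter fields of the test objects. How the subscriber validates the counter is not spelled out in the text, so this sketch assumes it compares the counter against a value it expects.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the two fields the tests use for verification.
struct EchoHeader {
  bool isEcho;
  std::uint32_t counter;
};

// Subscriber side (assumed behavior): accept only a fresh, non-echo message
// with the expected counter, then mark the reply as an echo.
bool make_echo(const EchoHeader& received,
               std::uint32_t expected_counter,
               EchoHeader& reply)
{
  if (received.isEcho || received.counter != expected_counter) {
    return false;
  }
  reply = received;
  reply.isEcho = true;
  return true;
}

// Publisher side: a reply is valid if it is flagged as an echo and carries
// the counter of the message that was just sent.
bool is_valid_echo(const EchoHeader& reply, std::uint32_t sent_counter)
{
  return reply.isEcho && reply.counter == sent_counter;
}
```

Because the flag and counter travel inside the serialized object, a passing check on the publisher side indicates that the object survived serialization, transport, and deserialization in both directions.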

The ZeroMQ publisher code, located in zeromq/csharp/ZmqTypedPublisher.cs, is shown below:

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
using MiddlewareNewsBrief;

class ZmqTypedPublisher
{
   static unsafe int Main(string[] args)
   {
      const String QUOTE_TOPIC = "QuoteRequest";
      const String MARKET_DATA_TOPIC = "MarketData";

      if (args.Length < 3)
      {
         Console.Out.WriteLine("usage: ZmqTypedPublisher <roundtrip-count> " +
              "<bind-to-sub> [<connect-to-pub>]+\n");
         Console.Out.WriteLine(" e.g.: ZmqTypedPublisher 10000 tcp://10.201.200.72:54321 tcp://10.201.200.72:54322\n");
         return 1;
      }

      int roundtripCount = Convert.ToInt32(args[0]);
      String bind_to_sub = args[1];

We publish "MarketData" and "QuoteRequest" messages; the subscriber also uses the topic filter string to determine the message type.

      //  Initialise 0MQ infrastructure
      ZMQ.Context ctx = new ZMQ.Context(1, 1, 0);
      ZMQ.Socket pub = ctx.Socket(ZMQ.PUB);
      ZMQ.Socket sub = ctx.Socket(ZMQ.SUB);
      sub.SetSockOpt(ZMQ.SUBSCRIBE, MARKET_DATA_TOPIC);
      sub.SetSockOpt(ZMQ.SUBSCRIBE, QUOTE_TOPIC);

      Console.Out.WriteLine("Binding to " + bind_to_sub);
      sub.Bind(bind_to_sub);

We connect to each subscriber endpoint.

      int num_subscribers = 0;
      for (int i = 2; i < args.Length; ++i)
      {
         Console.Out.WriteLine("Connecting to " + args[i]);
         pub.Connect(args[i]);
         ++num_subscribers;
      }

      // Create two messages to send, and stream each message
      // to a byte array
      IFormatter formatter = new BinaryFormatter();
      ASCIIEncoding encoding = new System.Text.ASCIIEncoding();

      MarketData md = MarketData.createTestData();
      QuoteRequest qr = QuoteRequest.createTestData();

      Console.Out.WriteLine("Sending messages -- " + num_subscribers + " subscribers, "
         + roundtripCount + " messages");

      //  Start measuring the time.
      System.Diagnostics.Stopwatch watch;
      watch = new Stopwatch();
      watch.Start();

      //  Start sending messages.
      for (uint i = 0; i < roundtripCount; i++)
      {

Every tenth message is a QuoteRequest; the others are MarketData messages.

         // Send 90% MarketData messages
         if (i % 10 == 5)
         {
            qr.isEcho = false;
            qr.counter = i;
            byte[] quoteMsg = serialize(qr, QUOTE_TOPIC, formatter, encoding);
            pub.Send(quoteMsg);
         }
         else
         {
            md.isEcho = false;
            md.counter = i;
            byte[] mdMsg = serialize(md, MARKET_DATA_TOPIC, formatter, encoding);
            pub.Send(mdMsg);
         }

We wait for an echoed message from each subscriber.

         byte[] echoMsg;
         for (int jj = 0; jj < num_subscribers; ++jj)
         {
            sub.Recv(out echoMsg);

            // Get the "Topic" from the front of the byte array
            int topicEndIndex = Array.IndexOf(echoMsg, (byte)'\x00');
            String topic = new String(encoding.GetChars(echoMsg, 0, topicEndIndex));

            // Deserialize the echo, which should be the same message
            object echo = deserialize(echoMsg, topic, formatter);

            if (topic.Equals(MARKET_DATA_TOPIC))
            {
               MarketData mdEcho = (MarketData)echo;
               Debug.Assert(mdEcho.isEcho == true, "Subscriber forgot to set isEcho flag to true");
               Debug.Assert(mdEcho.counter == i, "Counter mismatch in subscriber's reply");
            }
            else if (topic.Equals(QUOTE_TOPIC))
            {
               QuoteRequest qrEcho = (QuoteRequest)echo;
               Debug.Assert(qrEcho.isEcho == true, "Subscriber forgot to set isEcho flag to true");
               Debug.Assert(qrEcho.counter == i, "Counter mismatch in subscriber's reply");
            }
            else
            {
               Console.Out.WriteLine("ERROR: received topic " + topic);
               return -1;
            }
         }
      }

      //  Stop measuring the time.
      watch.Stop();
      Int64 elapsedTime = watch.ElapsedTicks;

      //  Print out the test parameters.
      Console.Out.WriteLine("roundtrip count: " + roundtripCount);

      //  Compute and print out the latency.
      double latency = (double)(elapsedTime) / roundtripCount / 2 *
          1000000 / Stopwatch.Frequency / (double)num_subscribers;
      Console.Out.WriteLine("\n\nYour average latency is {0} [us]\n\n",
          latency.ToString("f2"));

      return 0;
    } 

The serialization functions are below. We first write the topic name, followed by a null terminator, to the stream, and then the serialized object itself. The .NET MemoryStream tracks its own position internally, so the object lands directly after the topic without any manual offset bookkeeping.

    static byte[] serialize(object obj, String topic, IFormatter formatter, ASCIIEncoding encoding)
    {
       MemoryStream stream = new MemoryStream();

       // "topic" for ZeroMQ
       stream.Write(encoding.GetBytes(topic + '\x00'),
                    0,
                    topic.Length + 1);

       formatter.Serialize(stream, obj);
       stream.Close();
       return stream.ToArray();
    }

To deserialize, we must seek past the topic name and its null terminator. The caller extracts the topic name from the front of the buffer and passes it in; for an echo, it should match what we published. The subscriber extracts the topic name the same way before deserializing the objects.

    static object deserialize(byte[] msg, String topic, IFormatter formatter)
    {
       MemoryStream stream = new MemoryStream(msg);

       // Seek past "topic" for ZeroMQ
       stream.Seek(topic.Length + 1, SeekOrigin.Begin);
       object obj = formatter.Deserialize(stream);
       stream.Close();
       return obj;
    }
}
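The framing used by serialize and deserialize above (topic name, null terminator, then the serialized object) can be sketched independently of .NET. The following is a minimal illustration in standard C++ rather than C#; the names frame_message, split_message, and frame_example are hypothetical, and the payload is treated as opaque bytes standing in for the BinaryFormatter output. Unlike the test code, the sketch reports a missing terminator instead of assuming one is present.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

typedef std::vector<std::uint8_t> Bytes;

// Build "<topic>\0<payload>". The topic must not contain a null byte,
// because the receiver treats the first null as the end of the topic.
Bytes frame_message(const std::string& topic, const Bytes& payload)
{
    Bytes msg(topic.begin(), topic.end());
    msg.push_back(0);  // null terminator
    msg.insert(msg.end(), payload.begin(), payload.end());
    return msg;
}

// Split a framed message into its topic and payload.
// Returns false (leaving the outputs untouched) if no terminator is found.
bool split_message(const Bytes& msg, std::string& topic, Bytes& payload)
{
    Bytes::const_iterator end_of_topic =
        std::find(msg.begin(), msg.end(), std::uint8_t(0));
    if (end_of_topic == msg.end()) {
        return false;
    }
    topic.assign(msg.begin(), end_of_topic);
    payload.assign(end_of_topic + 1, msg.end());
    return true;
}

// Round trip using one of the topic names from the test.
void frame_example()
{
    const Bytes payload = {0x01, 0x00, 0x03};  // payload may contain nulls
    const Bytes msg = frame_message("MarketData", payload);

    std::string topic;
    Bytes body;
    assert(split_message(msg, topic, body));
    assert(topic == "MarketData");
    assert(body == payload);
}
```

Because only the first null byte is treated as the terminator, null bytes inside the serialized object do not confuse the split.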

The subscribing process subscribes to the "MarketData" and "QuoteRequest" filters and loops to receive messages. For each message, it checks the message's filter string value, skips over the filter string, deserializes the rest of the message into the appropriate .NET instance, and checks the "echo" flag and the counter. It then sets the "echo" flag to true and reserializes the .NET instances to echo them back across the wire to the publisher.
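ZeroMQ subscription filters are byte-prefix matches against the front of the message, which is why the topic name has to be the first thing in the buffer. The sketch below models that matching rule in standard C++ without using the ZeroMQ library; has_prefix and is_delivered are hypothetical names chosen for illustration, not ZeroMQ API calls.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

typedef std::vector<std::uint8_t> Bytes;

// True if the message starts with the bytes of `filter`.
// An empty filter matches every message.
bool has_prefix(const Bytes& msg, const std::string& filter)
{
    if (filter.size() > msg.size()) {
        return false;
    }
    return std::equal(filter.begin(), filter.end(), msg.begin(),
                      [](char f, std::uint8_t m) {
                          return static_cast<std::uint8_t>(f) == m;
                      });
}

// True if the message matches at least one subscription.
// With no subscriptions at all, nothing is delivered.
bool is_delivered(const Bytes& msg, const std::vector<std::string>& filters)
{
    for (std::size_t i = 0; i < filters.size(); ++i) {
        if (has_prefix(msg, filters[i])) {
            return true;
        }
    }
    return false;
}

// A framed MarketData message is delivered only to a matching subscription.
void filter_example()
{
    const Bytes msg = {'M', 'a', 'r', 'k', 'e', 't', 'D', 'a', 't', 'a', 0, 42};
    std::vector<std::string> filters;
    filters.push_back("QuoteRequest");
    assert(!is_delivered(msg, filters));
    filters.push_back("MarketData");
    assert(is_delivered(msg, filters));
}
```

One consequence of prefix matching is that a filter such as "Market" would also match "MarketData" messages, so topic names that share a prefix need care.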

The subscriber's code, located in zeromq/csharp/ZmqTypedSubscriber.cs, is shown below:

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
using MiddlewareNewsBrief;

class ZmqTypedSubscriber
{
    static unsafe int Main(string[] args)
    {
        const String QUOTE_TOPIC = "QuoteRequest";
        const String MARKET_DATA_TOPIC = "MarketData";

        if (args.Length != 3)
        {
           Console.Out.WriteLine("usage: ZmqTypedSubscriber <roundtrip-count> " +
                "<bind-to-sub> <connect-to-pub>\n");
           Console.Out.WriteLine(" e.g.: ZmqTypedSubscriber 10000 " +
                "tcp://10.201.200.72:54322 tcp://10.201.200.72:54321\n");
           return 1;
        }

        int roundtripCount = Convert.ToInt32(args[0]);
        String bind_to_sub = args[1];
        String connect_to_pub = args[2];

        //  Initialise 0MQ infrastructure
        ZMQ.Context ctx = new ZMQ.Context(1, 1, 0);
        
        ZMQ.Socket pub = ctx.Socket(ZMQ.PUB);
        pub.Connect(connect_to_pub);

The subscriber subscribes to "MarketData" and "QuoteRequest".

        ZMQ.Socket sub = ctx.Socket(ZMQ.SUB);
        sub.SetSockOpt(ZMQ.SUBSCRIBE, QUOTE_TOPIC);
        sub.SetSockOpt(ZMQ.SUBSCRIBE, MARKET_DATA_TOPIC);
        sub.Bind(bind_to_sub);

        ASCIIEncoding encoding = new System.Text.ASCIIEncoding();
        IFormatter formatter = new BinaryFormatter();

        Console.Out.WriteLine("Entering recv loop -- " 
           + roundtripCount + " messages");
        for (int i = 0; i < roundtripCount; i++)
        {
            byte[] msg;
            sub.Recv(out msg);

Here, we detect the full topic name by assuming that it is null-terminated. We save the topic name and seek past it to deserialize the QuoteRequest or MarketData object, whichever it might be. The topic name tells us the data type of the serialized object.

            // Get the "Topic" from the front of the byte array
            int topicEndIndex = Array.IndexOf(msg, (byte)'\x00');
            String topic = new String(encoding.GetChars(msg, 0, topicEndIndex));

            MemoryStream inStream = new MemoryStream(msg);
            inStream.Seek(topic.Length + 1, SeekOrigin.Begin);

            object obj = formatter.Deserialize(inStream);
            inStream.Close();

            if (topic.Equals(MARKET_DATA_TOPIC))
            {
               MarketData md = (MarketData)obj;
               if (md.isEcho == true) { Console.Out.WriteLine("Subscriber received echo sample"); }
               if (md.counter != i) { Console.Out.WriteLine("Counter mismatch"); }
               md.isEcho = true;
            }
            else if (topic.Equals(QUOTE_TOPIC))
            {
               QuoteRequest qr = (QuoteRequest)obj;
               if (qr.isEcho == true) { Console.Out.WriteLine("Subscriber received echo sample"); }
               if (qr.counter != i) { Console.Out.WriteLine("Counter mismatch"); }
               qr.isEcho = true;
            }
            else
            {
               Console.Out.WriteLine("ERROR: received topic " + topic);
               return -1;
            }

            MemoryStream outStream = new MemoryStream();

To echo the object back, we first stream the topic name, then the object.

            // "topic" for ZeroMQ
            outStream.Write(encoding.GetBytes(topic + '\x00'),
                            0,
                            topic.Length + 1);

            formatter.Serialize(outStream, obj);
            outStream.Close();
            msg = outStream.ToArray();

            // Echo it back
            pub.Send(msg);   
        }
        Console.Out.WriteLine("Finished receiving messages");

        return 0;
    }
}

We'll make our lives easier by running the test with the provided Perl script, as follows:

  perl %MNB_ROOT%\bin\zeromq_test.pl ZeromqTypedDotnet 1000 

We should see output that looks like this:

  Entering recv loop -- 1000 messages
  Binding to tcp://10.201.200.72:54321
  Connecting to tcp://10.201.200.72:54322
  Sending messages -- 1 subscribers, 1000 messages
  Finished receiving messages
  roundtrip count: 1000


  Your average latency is 537.33 [us]


So our average latency for 1000 messages is about 537 microseconds. You'll want to run the test several times, as the latency numbers can vary based on other processes currently executing.
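To make the arithmetic behind that number explicit, here is the publisher's latency formula restated as a small standard C++ function. The function name and the tick values in the example are hypothetical: the 10 MHz timer frequency and the elapsed tick count were chosen only so that the result reproduces the 537.33 figure above, and are not measurements from the test.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

// One-way latency in microseconds, following the publisher's formula:
// elapsed / roundtrips / 2 * 1,000,000 / frequency / subscribers.
double average_latency_us(std::int64_t elapsed_ticks,
                          std::int64_t ticks_per_second,
                          std::uint32_t roundtrip_count,
                          std::uint32_t num_subscribers)
{
    // Average timer ticks spent on one send-and-echo iteration.
    const double ticks_per_roundtrip =
        static_cast<double>(elapsed_ticks) / roundtrip_count;
    // A round trip is two one-way trips.
    const double one_way_ticks = ticks_per_roundtrip / 2.0;
    // Convert ticks to microseconds.
    const double one_way_us = one_way_ticks * 1000000.0 / ticks_per_second;
    // Each iteration waits for one echo per subscriber.
    return one_way_us / num_subscribers;
}

void latency_example()
{
    // Hypothetical inputs: 10 MHz timer, 10,746,600 ticks, 1000 round trips.
    const double us = average_latency_us(10746600, 10000000, 1000, 1);
    assert(std::fabs(us - 537.33) < 1e-6);
}
```

The division by two assumes the outbound and return trips take equal time, and the division by the subscriber count spreads each iteration's cost across the echoes it waited for.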

As with all of our Perl scripts, we can tell the script to launch more than one subscriber:

  perl %MNB_ROOT%\bin\zeromq_test.pl ZeromqTypedDotnet 1000 -num-subs 3

OpenDDS .NET Tests

Since neither OpenDDS nor Boost.Asio supplies .NET bindings, we'll have to roll our own. We'll use a small C++/CLI layer as an intermediary between the C# code and our C++ middleware. In the end, we'll combine C#, C++/CLI, and standard C++ in the same executable. For more information on combining .NET and OpenDDS, please see Charles Calkins' fine series of Middleware News Brief articles, especially this one.

The code for the C++/CLI layer between .NET and OpenDDS is in the opendds/managed_cpp directory. Since this article is not a tutorial on integrating .NET and OpenDDS, I'll show only the header file for the interface between the two, plus the write method. For the rest, please see the attached code or the article referenced above.

Our C++/CLI interface between C# and OpenDDS, located in opendds/managed_cpp/OpenDdsPubSubProxy.h and opendds/managed_cpp/OpenDdsPubSubProxy.cpp, is as follows:

// .NET
using namespace System;
#include <vcclr.h>

namespace MiddlewareNewsBrief
{
class OpenDdsPubSubUtil;

public enum class ProcessType {
  PUBLISHER_PROCESS,
  SUBSCRIBER_PROCESS
};

// wrap OpenDDS interaction with a .NET class
public ref class OpenDdsPubSubProxy
{

We delegate to an unmanaged C++ class (not shown, but available in the files opendds/managed_cpp/OpenDdsPubSubUtil.h and opendds/managed_cpp/OpenDdsPubSubUtil.cpp) that forms the bridge into OpenDDS.

  // can't put an unmanaged thing in a ref class, but a 
  // pointer to an unmanaged thing is okay
  
  OpenDdsPubSubUtil* impl_;
public:
  OpenDdsPubSubProxy(ProcessType ptype);
  ~OpenDdsPubSubProxy();

  // Returns an integer handle for writing to a particular topic
  int createDataWriter(String^ topic_name);

Note the use of pin_ptr in the write method. We're passing a managed array into unmanaged C++, so we must pin it to its current memory location; otherwise the .NET garbage collector would be free to move it while the unmanaged code is still using the pointer.

  void write(int data_writer_handle, array<unsigned char> ^managed_buffer)
  {
    pin_ptr<unsigned char> pinned_managed_buffer = &managed_buffer[0];  

    MiddlewareNewsBrief::BufferType destination;
    destination.replace(managed_buffer->Length,
                        managed_buffer->Length,
                        &pinned_managed_buffer[0],
                        false);

    impl_->write(data_writer_handle, destination);
  }

  void writeDone();
  bool isDone();
  void fini();

This event handler is called when an OpenDDS sample is received. The OpenDDS on_data_available callback is mapped to this .NET event, which the C# code subscribes to.

  delegate void EventHandler(Object^ sender, 
                             int data_writer_handle, 
                             array<unsigned char> ^managed_buffer);
  EventHandler^ handler_;
  event EventHandler^ ProcessNotification
  {
    void add(EventHandler^ p) { handler_ +=p; }
    void remove(EventHandler^ p) { handler_ -=p; }
    void raise(Object^ obj, int data_writer_handle, array<unsigned char> ^managed_buffer) {
      if (handler_!=nullptr) { 
        handler_(obj, data_writer_handle, managed_buffer);
      }
    }
  };
};

} // namespace

This simple C++/CLI API exposes the ability to create a DataWriter on a topic, to write data samples, and to attach an event handler to receive callbacks when echoed data is received.

The publishing code is in opendds/csharp/TypedDotnetPublisher.cs, and is shown below:

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
using System.Threading;
using MiddlewareNewsBrief;

public class TypedDotnetPublisher
{
   OpenDdsPubSubProxy proxy_;
   uint echo_count_ = 0;
   uint message_count_ = 0;

   int marketDataWriterHandle_ = -1;
   int quoteRequestDataWriterHandle_ = -1;
   IFormatter formatter_ = new BinaryFormatter();

   static unsafe int Main(string[] args)
   {
      TypedDotnetPublisher me = new TypedDotnetPublisher();
      return me.Run(args);
   }

   public TypedDotnetPublisher()
   {
   }

   int Run(string[] args)
   {
      const String QUOTE_TOPIC = "QuoteRequest";
      const String MARKET_DATA_TOPIC = "MarketData";

      int roundtripCount = 10000;
      int num_subscribers = 1;

      for (int i = 0; i + 1 < args.Length; ++i)  // each flag is followed by a value
      {
         if (args[i].Equals("-num"))
         {
            roundtripCount = Convert.ToInt32(args[i + 1]);
         } 
         else if (args[i].Equals("-ns"))
         {
            num_subscribers = Convert.ToInt32(args[i + 1]);
         }
      }

      Console.WriteLine(num_subscribers + " Subscribers, " + roundtripCount + " messages");

We create our DataWriters and attach callbacks through the C++/CLI OpenDdsPubSubProxy.

      this.proxy_ = new OpenDdsPubSubProxy(MiddlewareNewsBrief.ProcessType.PUBLISHER_PROCESS);
      this.proxy_.ProcessNotification += new OpenDdsPubSubProxy.EventHandler(OnEchoReceived);

      this.marketDataWriterHandle_ = this.proxy_.createDataWriter(MARKET_DATA_TOPIC);
      this.quoteRequestDataWriterHandle_ = this.proxy_.createDataWriter(QUOTE_TOPIC);

      System.Threading.Thread.Sleep(1000);
      
      Console.Out.WriteLine("Sending messages -- " + num_subscribers + " subscribers, "
         + roundtripCount + " messages");

      // Create two messages to send
      MarketData md = MarketData.createTestData();
      QuoteRequest qr = QuoteRequest.createTestData();

      //  Start measuring the time.
      System.Diagnostics.Stopwatch watch;
      watch = new Stopwatch();
      watch.Start();

      //  Start sending messages.
      for (uint i = 0; i < roundtripCount; i++)
      {
         this.message_count_ = i;
         

We take either a MarketData or a QuoteRequest, serialize it into the buffer, and write the buffer across the wire.

         // Send 90% MarketData messages
         if (i % 10 == 5)
         {
            qr.isEcho = false;
            qr.counter = i;
            byte[] quoteMsg = serialize(qr, formatter_);

            proxy_.write(this.quoteRequestDataWriterHandle_, quoteMsg);
         }
         else
         {
            md.isEcho = false;
            md.counter = i;
            byte[] mdMsg = serialize(md, formatter_);

            proxy_.write(this.marketDataWriterHandle_, mdMsg);
         }

Here, we wait for an echoed sample from each subscriber.

         // Wait for echo from each subscriber
         for (int jj = 0; jj < num_subscribers; ++jj)
         {
            this.TakeEcho();
         }
      }

      //  Stop measuring the time.
      watch.Stop();
      Int64 elapsedTime = watch.ElapsedTicks;

      this.proxy_.writeDone();

      //  Print out the test parameters.
      Console.Out.WriteLine("roundtrip count: " + roundtripCount);

      //  Compute and print out the latency.
      double latency = (double)(elapsedTime) / roundtripCount / 2 *
          1000000 / Stopwatch.Frequency / (double)num_subscribers;
      Console.Out.WriteLine("\n\nYour average latency is {0} [us]\n\n",
          latency.ToString("f2"));

      this.proxy_.fini();
      return 0;
   }

This callback is triggered when a subscriber echoes a sample back. We know the type of the object from the DDS DataReader's topic, which the C++/CLI code (not shown) maps to the integer handle of the corresponding DataWriter. We use that handle to cast the deserialized object to the correct type.

   // invoked when a DDS sample arrives
   void OnEchoReceived(object parent, int data_writer_handle, byte[] buffer)
   {
      // deserialize the buffer into a Quote or MarketData object

      MemoryStream stream = new MemoryStream(buffer);
      object obj = this.formatter_.Deserialize(stream);
      stream.Close();

      if (data_writer_handle == this.quoteRequestDataWriterHandle_)
      {
         QuoteRequest qr = (QuoteRequest)obj;
         Debug.Assert(qr.isEcho == true, "Subscriber forgot to set isEcho flag to true");
         Debug.Assert(qr.counter == this.message_count_, "Counter mismatch in subscriber's reply");
      }
      else if (data_writer_handle == this.marketDataWriterHandle_)
      {
         MarketData md = (MarketData)obj;
         Debug.Assert(md.isEcho == true, "Subscriber forgot to set isEcho flag to true");
         Debug.Assert(md.counter == this.message_count_, "Counter mismatch in subscriber's reply");
      }
      else
      {
         Console.WriteLine("OnEchoReceived: no data_writer_handle for " + data_writer_handle);
         return;
      }
    
      lock (this)
      {
         ++this.echo_count_;
         Monitor.PulseAll(this);
      }
    }

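We use the echo count as a counting semaphore: OnEchoReceived increments it for each echo that arrives, and TakeEcho blocks until at least one echo is available and then consumes it.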

    void TakeEcho()
    {
      lock (this)
      {
         while (this.echo_count_ == 0) {
            Monitor.Wait(this);
         }
         --this.echo_count_;
      }
    }
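The same lock, wait, and pulse pattern can be expressed in standard C++ with a mutex and a condition variable. The sketch below is an analogue of OnEchoReceived and TakeEcho, not a translation of the test code; the class name EchoCounter and its post, take, and pending methods are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Counting semaphore: post() records an arrival, take() consumes one.
class EchoCounter
{
public:
    EchoCounter() : count_(0) {}

    // Called by the receive callback for each echo that arrives.
    void post()
    {
        std::lock_guard<std::mutex> guard(mutex_);
        ++count_;
        ready_.notify_all();  // wake any thread blocked in take()
    }

    // Called by the sender; blocks until an echo is available.
    void take()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (count_ == 0) {  // loop guards against spurious wakeups
            ready_.wait(lock);
        }
        --count_;
    }

    // Number of echoes received but not yet consumed.
    unsigned pending() const
    {
        std::lock_guard<std::mutex> guard(mutex_);
        return count_;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable ready_;
    unsigned count_;
};

// Single-threaded walk-through: two echoes arrive, then both are consumed.
void echo_counter_example()
{
    EchoCounter echoes;
    echoes.post();
    echoes.post();
    echoes.take();
    echoes.take();
    assert(echoes.pending() == 0);
}
```

Because the count is kept rather than a simple flag, an echo that arrives before the sender starts waiting is not lost, which matters when several subscribers reply at nearly the same time.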

Our serialization and deserialization are the same as in the ZeroMQ test, except that we don't have to manage a filter string on the front of the buffer.

    static byte[] serialize(object obj, IFormatter formatter)
    {
       MemoryStream stream = new MemoryStream();
       formatter.Serialize(stream, obj);
       stream.Close();
       return stream.ToArray();
    }

    static object deserialize(byte[] msg, IFormatter formatter)
    {
       // No topic prefix to skip: the DDS topic identifies the data type
       MemoryStream stream = new MemoryStream(msg);
       object obj = formatter.Deserialize(stream);
       stream.Close();
       return obj;
    }
}

The subscribing code is in opendds/csharp/TypedDotnetSubscriber.cs, and is shown below:

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using MiddlewareNewsBrief;

class TypedDotnetSubscriber
{
   OpenDdsPubSubProxy proxy_;

   int marketDataWriterHandle_ = -1;
   int quoteRequestDataWriterHandle_ = -1;
   IFormatter formatter_ = new BinaryFormatter();

   int message_count_ = 0;

   static int Main(string[] args)
   {
      TypedDotnetSubscriber me = new TypedDotnetSubscriber();
      return me.Run(args);
   }

   public TypedDotnetSubscriber()
   {
   }

   int Run(string[] args)
   {
      const String QUOTE_TOPIC = "QuoteRequest";
      const String MARKET_DATA_TOPIC = "MarketData";

We create data writers and attach a callback. The data writers are used to echo samples back to the publisher.

      // Initialize OpenDDS; publisher transport Id is 2 for subscriber process
      this.proxy_ = new OpenDdsPubSubProxy(MiddlewareNewsBrief.ProcessType.SUBSCRIBER_PROCESS);
      this.proxy_.ProcessNotification += new OpenDdsPubSubProxy.EventHandler(OnWriteReceived);

      this.marketDataWriterHandle_ = this.proxy_.createDataWriter(MARKET_DATA_TOPIC);
      this.quoteRequestDataWriterHandle_ = this.proxy_.createDataWriter(QUOTE_TOPIC);

      Console.Out.WriteLine("Ready to receive...");

      // Receive messages, and echo them back
      while (!this.proxy_.isDone())
      {
         System.Threading.Thread.Sleep(1000);
      }

      Console.Out.WriteLine("Done...");

      this.proxy_.fini();
      return 0;
   }

This callback is triggered for each sample received. We determine the data type of the object by the name of the topic on which the sample was received; our C++ code (not shown) maps that topic name to a data writer handle, which is what we use here. The key piece of information here is that knowledge of the topic indicates the data type.

   // invoked when a DDS sample arrives
   void OnWriteReceived(object parent, int data_writer_handle, byte[] buffer)
   {
      // deserialize the buffer into a Quote or MarketData object

      MemoryStream stream = new MemoryStream(buffer);

      object obj = this.formatter_.Deserialize(stream);
      stream.Close();
      if (data_writer_handle == this.quoteRequestDataWriterHandle_)
      {
         // Verify the correct data type, and set the "echo" flag
         QuoteRequest qr = (QuoteRequest)obj;
         Debug.Assert(qr.isEcho == false, "Subscriber received echo sample");
         Debug.Assert(qr.counter == this.message_count_, "Message count mismatch");
         qr.isEcho = true;
      }
      else if (data_writer_handle == this.marketDataWriterHandle_)
      {
         // Verify the correct data type, and set the "echo" flag
         MarketData md = (MarketData)obj;
         Debug.Assert(md.isEcho == false, "Subscriber received echo sample");
         Debug.Assert(md.counter == this.message_count_, "Message count mismatch");
         md.isEcho = true;
      }
      else
      {
         Console.WriteLine("OnWriteReceived: no data_writer_handle for " + data_writer_handle);
         return;
      }

      ++this.message_count_;

      // Re-serialize the message and echo it back
      
      MemoryStream outStream = new MemoryStream();
      this.formatter_.Serialize(outStream, obj);
      outStream.Close();

      this.proxy_.write(data_writer_handle, outStream.ToArray());
   }
}
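The article does not show how the C++ layer associates topic names with data writer handles. As a rough illustration of the idea only, and not the OpenDdsPubSubUtil implementation, the sketch below keeps a two-way mapping in standard C++; WriterRegistry and its methods are hypothetical names. It returns -1 for an unknown topic, mirroring the initial value of the handle fields in the C# classes.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical registry: one integer handle per topic name.
class WriterRegistry
{
public:
    // Returns the handle for `topic`, creating one on first use.
    int create_writer(const std::string& topic)
    {
        std::map<std::string, int>::const_iterator found =
            handle_by_topic_.find(topic);
        if (found != handle_by_topic_.end()) {
            return found->second;
        }
        const int handle = static_cast<int>(topics_.size());
        topics_.push_back(topic);
        handle_by_topic_[topic] = handle;
        return handle;
    }

    // Handle for the topic a sample arrived on, or -1 if unknown.
    int handle_for(const std::string& topic) const
    {
        std::map<std::string, int>::const_iterator found =
            handle_by_topic_.find(topic);
        return found == handle_by_topic_.end() ? -1 : found->second;
    }

    // Topic name for a handle, or an empty string if the handle is invalid.
    std::string topic_for(int handle) const
    {
        if (handle < 0 || handle >= static_cast<int>(topics_.size())) {
            return std::string();
        }
        return topics_[handle];
    }

private:
    std::vector<std::string> topics_;
    std::map<std::string, int> handle_by_topic_;
};

// The handle received in a callback identifies the topic, hence the type.
void registry_example()
{
    WriterRegistry registry;
    const int md = registry.create_writer("MarketData");
    const int qr = registry.create_writer("QuoteRequest");
    assert(md != qr);
    assert(registry.handle_for("QuoteRequest") == qr);
    assert(registry.topic_for(md) == "MarketData");
}
```

With a mapping of this kind, the callback only needs to compare an integer to decide which type to cast to, so no type tag has to travel inside the buffer.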

We'll make our lives easier by running the test with the provided Perl script, as follows:

  perl %MNB_ROOT%\bin\opendds_test.pl  OpenddsTypedDotnet -num 1000 

We should see output that looks like this:

  Ready to receive...
  1 Subscribers, 1000 messages
  Sending messages -- 1 subscribers, 1000 messages
  roundtrip count: 1000


  Your average latency is 630.37 [us]


  Done...

So our average latency for 1000 messages is about 630 microseconds. You'll want to run the test several times, as the latency numbers can vary based on other processes currently executing. We keep repeating this, but it's an important point.

As with all of our Perl scripts, we can tell the script to launch more than one subscriber:

  perl %MNB_ROOT%\bin\opendds_test.pl  OpenddsTypedDotnet -num 1000 -num-subs 3

Boost.Asio .NET Tests

To stream .NET objects across Boost.Asio, we again need a C++/CLI layer to act as an intermediary between C# and standard C++. The API between Boost.Asio and C# is split into a publisher side and a subscriber side. The publisher's interface is located in boostasio/managed_cpp/BoostPubUtil[.h|.cpp], and the header file is shown below. It is similar to the OpenDDS C++/CLI interface: it contains methods to write data samples and to attach callbacks for received samples.

class Session;
#include <vector>
#include <cstdlib>
#include <cstring>
#include <boost/asio.hpp>
using boost::asio::ip::tcp;

// .NET
using namespace System;
#include <vcclr.h>

namespace MiddlewareNewsBrief
{
class BoostPubUtil;

// wrap Boost.Asio interaction with a .NET class
public ref class BoostPubProxy
{

The BoostPubUtil class (below) is unmanaged C++.

  // can't put an unmanaged thing in a ref class, but a 
  // pointer to an unmanaged thing is okay
  BoostPubUtil* impl_;

public:
  BoostPubProxy();
  ~BoostPubProxy();

  boost::uint32_t get_num_subscribers();
  boost::uint32_t get_num_messages();
  void write(array<unsigned char> ^managed_buffer);

This event handler is similar to the event handler for the OpenDDS C# example.

  delegate void EventHandler(Object^ sender, 
                             array<unsigned char> ^managed_buffer);
  EventHandler^ handler_;
  event EventHandler^ ProcessNotification
  {
    void add(EventHandler^ p) { handler_ +=p; }
    void remove(EventHandler^ p) { handler_ -=p; }
    void raise(Object^ obj, array<unsigned char> ^managed_buffer) {
      if (handler_!=nullptr) { 
        handler_(obj, managed_buffer);
      }
    }
  };
};
  
class BoostPubUtil
{
public:
  BoostPubUtil(int argc,
               char** argv,
               gcroot<BoostPubProxy^> managed_parent);

  size_t get_num_messages() const { return this->num_messages_; }
  size_t get_num_subscribers() const { return this->num_subscribers_; }
  void write(unsigned char* buffer, size_t message_size);
  
private:

This is a backpointer to the C++/CLI BoostPubProxy class; we use this to call through to the event handler when a buffer is received.

  gcroot<BoostPubProxy^> managed_parent_;
  const size_t num_messages_;
  size_t num_subscribers_;

  boost::asio::io_service io_service_;
  std::vector<boost::shared_ptr<tcp::socket> > subscribers_;
};

} // namespace

The C# application code uses our BoostPubProxy class to write a buffer, attach a callback to receive echoed buffers from the subscriber, and find out the number of subscribers attached and the number of messages sent.

The publisher code is located in boostasio/csharp/BoostTypedDotNetPublisher.cs, and is shown below:

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
using System.Threading;
using MiddlewareNewsBrief;

public class BoostTypedDotnetPublisher
{
   BoostPubProxy proxy_;
   uint echo_count_ = 0;
   uint message_count_ = 0;

   IFormatter formatter_ = new BinaryFormatter();
   ASCIIEncoding encoding_ = new System.Text.ASCIIEncoding();

   const String QUOTE_TOPIC = "QuoteRequest";
   const String MARKET_DATA_TOPIC = "MarketData";

   static unsafe int Main(string[] args)
   {
      BoostTypedDotnetPublisher me = new BoostTypedDotnetPublisher();
      return me.Run(args);
   }

   public BoostTypedDotnetPublisher()
   {
   }

   int Run(string[] args)
   {
      if (args.Length < 3)
      {
         Console.Out.WriteLine("usage: BoostTypedDotnetPublisher <roundtrip-count> " +
              "[<subscriber host> <subscriber port>]+\n");
         Console.Out.WriteLine(" e.g.: BoostTypedDotnetPublisher 10000 localhost 54321 localhost 54322\n");
         return 1;
      }

We create our bridge to C++/CLI, and attach a callback.

      // Initialize the Boost.Asio publisher proxy
      this.proxy_ = new BoostPubProxy();
      this.proxy_.ProcessNotification += new BoostPubProxy.EventHandler(OnEchoReceived);

      // Create two messages to send, and stream each message
      // to a byte array

      MarketData md = MarketData.createTestData();
      QuoteRequest qr = QuoteRequest.createTestData();

      Console.Out.WriteLine("Sending messages -- " + this.proxy_.get_num_subscribers() 
         + " subscribers, " + this.proxy_.get_num_messages() + " messages");

      uint roundtripCount = this.proxy_.get_num_messages();
      uint num_subscribers = this.proxy_.get_num_subscribers();

      //  Start measuring the time.
      System.Diagnostics.Stopwatch watch;
      watch = new Stopwatch();
      watch.Start();

      //  Start sending messages.
      for (uint i = 0; i < roundtripCount; i++)
      {
         this.message_count_ = i;
         

We serialize and write the MarketData or QuoteRequest object.

         // Send 90% MarketData messages
         if (i % 10 == 5)
         {
            qr.isEcho = false;
            qr.counter = i;
            byte[] quoteMsg = serialize(qr, QUOTE_TOPIC, this.formatter_, this.encoding_);
            this.proxy_.write(quoteMsg);
         }
         else
         {
            md.isEcho = false;
            md.counter = i;
            byte[] mdMsg = serialize(md, MARKET_DATA_TOPIC, this.formatter_, this.encoding_);
            this.proxy_.write(mdMsg);
         }

         // Wait for echo from each subscriber
         for (int jj = 0; jj < num_subscribers; ++jj)
         {
            this.TakeEcho();
         }
      }

      //  Stop measuring the time.
      watch.Stop();
      Int64 elapsedTime = watch.ElapsedTicks;

      //  Print out the test parameters.
      Console.Out.WriteLine("roundtrip count: " + roundtripCount);

      //  Compute and print out the latency.
      double latency = (double)(elapsedTime) / roundtripCount / 2 *
          1000000 / Stopwatch.Frequency / (double)(num_subscribers);
      Console.Out.WriteLine("\n\nYour average latency is {0} [us]\n\n",
          latency.ToString("f2"));

      return 0;
   }

This is the callback for the echoed data; we deserialize the buffer and cast it to the correct type. You can see that we have borrowed the ZeroMQ technique of prepending a "topic" name to the buffer to help us figure out what the buffer contains.

   // invoked when a sample arrives
   void OnEchoReceived(object parent, byte[] msg)
   {
      // deserialize the buffer into a Quote or MarketData object

      // Get the "Topic" from the front of the byte array
      int topicEndIndex = Array.IndexOf(msg, (byte)'\x00');
      String topic = new String(encoding_.GetChars(msg, 0, topicEndIndex));

      object obj = deserialize(msg, topic, this.formatter_);

      if (topic.Equals(MARKET_DATA_TOPIC))
      {
         MarketData md = (MarketData)obj;
         Debug.Assert(md.isEcho == true, "Subscriber forgot to set isEcho flag to true");
         Debug.Assert(md.counter == this.message_count_, "Counter mismatch in subscriber's reply");
      }
      else if (topic.Equals(QUOTE_TOPIC))
      {
         QuoteRequest qr = (QuoteRequest)obj;
         Debug.Assert(qr.isEcho == true, "Subscriber forgot to set isEcho flag to true");
         Debug.Assert(qr.counter == this.message_count_, "Counter mismatch in subscriber's reply");
      }
      else
      {
         Console.Out.WriteLine("ERROR: received topic " + topic);
         return;
      }

      lock (this)
      {
         ++this.echo_count_;
         Monitor.PulseAll(this);
      }
   }

   void TakeEcho()
   {
      lock (this)
      {
         while (this.echo_count_ == 0) {
            Monitor.Wait(this);
         }
         --this.echo_count_;
      }
    }

This is the serialization code. Note that we prepend the "topic" name to the buffer so that OnEchoReceived can determine what the buffer contains.

    static byte[] serialize(object obj, String topic, IFormatter formatter, ASCIIEncoding encoding)
    {
       MemoryStream stream = new MemoryStream();

       // "topic" for Boost.Asio
       stream.Write(encoding.GetBytes(topic + '\x00'),
                    0,
                    topic.Length + 1);

       formatter.Serialize(stream, obj);
       stream.Close();
       return stream.ToArray();
    }

    static object deserialize(byte[] msg, String topic, IFormatter formatter)
    {
       MemoryStream stream = new MemoryStream(msg);

       // Seek past "topic" for Boost.Asio
       stream.Seek(topic.Length + 1, SeekOrigin.Begin);
       object obj = formatter.Deserialize(stream);
       stream.Close();
       return obj;
    }
}

The subscriber code is located in boostasio/csharp/BoostTypedDotNetSubscriber.cs, and is shown below. There is also a BoostSubProxy C++/CLI bridge to Boost.Asio, available in boostasio/managed_cpp/BoostSubUtil[.h|.cpp]. However, it is similar to the BoostPubUtil class that we've just shown, so we won't show it here.

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using MiddlewareNewsBrief;

class BoostTypedDotnetSubscriber
{
   BoostSubProxy proxy_;
   IFormatter formatter_ = new BinaryFormatter();
   ASCIIEncoding encoding_ = new System.Text.ASCIIEncoding();

   int message_count_ = 0;

   const String QUOTE_TOPIC = "QuoteRequest";
   const String MARKET_DATA_TOPIC = "MarketData";

   static unsafe int Main(string[] args)
   {
      BoostTypedDotnetSubscriber me = new BoostTypedDotnetSubscriber();
      me.Run(args);
      return 0;
   }

   public BoostTypedDotnetSubscriber()
   {
   }

   int Run(string[] args)
   {
      // Create the Boost.Asio subscriber proxy and register the receive callback
      this.proxy_ = new BoostSubProxy();
      this.proxy_.ProcessNotification += new BoostSubProxy.EventHandler(OnWriteReceived);

      Console.Out.WriteLine("Ready to receive...");

      // Receive messages, and echo them back
      this.proxy_.run();

      Console.Out.WriteLine("Done...");
      return 0;
   }

This is the callback for the received data; we deserialize the buffer and cast it to the correct type, as we do in the publisher. Again, you can see that we have borrowed the ZeroMQ technique of attaching a "topic" name on the front of the buffer.

   // invoked when a message arrives from Boost.Asio
   void OnWriteReceived(object sender, byte[] msg, ulong sessionPtr)
   {
      // deserialize the buffer into a Quote or MarketData object

      // Get the "Topic" from the front of the byte array
      int topicEndIndex = Array.IndexOf(msg, (byte)'\x00');
      String topic = new String(this.encoding_.GetChars(msg, 0, topicEndIndex));

      MemoryStream inStream = new MemoryStream(msg);
      inStream.Seek(topic.Length + 1, SeekOrigin.Begin);

      object obj = this.formatter_.Deserialize(inStream);
      inStream.Close();

      if (topic.Equals(MARKET_DATA_TOPIC))
      {
         MarketData md = (MarketData)obj;
         Debug.Assert(md.isEcho == false, "Subscriber received echo sample");
         Debug.Assert(md.counter == this.message_count_, "Message count mismatch");
         md.isEcho = true;
      }
      else if (topic.Equals(QUOTE_TOPIC))
      {
         QuoteRequest qr = (QuoteRequest)obj;
         Debug.Assert(qr.isEcho == false, "Subscriber received echo sample");
         Debug.Assert(qr.counter == this.message_count_, "Message count mismatch");
         qr.isEcho = true;
      }
      else
      {
         Console.Out.WriteLine("ERROR: received topic " + topic);
         return;
      }

      ++this.message_count_;

      // Re-serialize the message and echo it back

      MemoryStream outStream = new MemoryStream();

      // "topic" for Boost.Asio
      outStream.Write(new System.Text.ASCIIEncoding().GetBytes(topic + '\x00'),
                      0,
                      topic.Length + 1);

      this.formatter_.Serialize(outStream, obj);
      outStream.Close();
      msg = outStream.ToArray();

      this.proxy_.echo(msg, sessionPtr);
   }
}

We'll run the test with the provided Perl script, as follows:

  perl %MNB_ROOT%\bin\boost_test.pl BoostTypedDotnet 1000 

We should see output that looks like this:

  Ready to receive...
  Waiting; running for 1000 messages
  Sending messages -- 1 subscribers, 1000 messages
  Done...
  roundtrip count: 1000


  Your average latency is 413.15 [us]


So our average latency for 1000 messages is about 413 microseconds. As always, you'll want to run the test several times, as the latency numbers can vary based on other processes currently executing.

As with all of our Perl scripts, we can tell the script to launch more than one subscriber:

  perl %MNB_ROOT%\bin\boost_test.pl BoostTypedDotnet 1000 -num-subs 3

The latency numbers for the .NET examples are all considerably higher than the numbers for the raw buffer examples, which is no surprise: serializing and deserializing .NET objects is expensive. As we would expect, the Boost.Asio example was the fastest, followed by ZeroMQ, followed by OpenDDS.

Strongly Typed C++ Tests

However, these tests don't really hit OpenDDS's sweet spot. OpenDDS functions best with strongly typed C++ objects. In our set of examples, we'll use OpenDDS to send several objects across the wire that we define in OpenDDS's Interface Definition Language (IDL). To compare OpenDDS with similarly behaving ZeroMQ and Boost.Asio examples, we'll look at a couple of schemes for serializing C++ objects into a raw buffer to send across ZeroMQ and Boost.Asio. Then, we'll compare performance to see how much overhead the serialization adds to the ZeroMQ and Boost.Asio examples as opposed to using OpenDDS's internal serialization capability. In other words, if you're sending C++ instances across publish-subscribe middleware, are the performance advantages of ZeroMQ and/or Boost.Asio eliminated by the overhead of object serialization?

OpenDDS Strongly Typed Test

For the OpenDDS strongly typed C++ example, we define several IDL structs that match the .NET data types we've used in previous examples. The IDL is available at opendds/idl/MarketData.idl, and is shown below:

module MiddlewareNewsBrief
{
  typedef sequence<octet> Octets;

#pragma DCPS_DATA_TYPE "MiddlewareNewsBrief::MarketDataEntry"
#pragma DCPS_DATA_KEY  "MiddlewareNewsBrief::MarketDataEntry securityID"
  struct MarketDataEntry
  {
    unsigned long mdUpdateAction;
    unsigned long mdPriceLevel;
    string        mdEntryType;
    unsigned long openCloseSettleFlag;
    unsigned long securityIDSource;
    unsigned long securityID;
    unsigned long rptSeq;
    double        mdEntryPx;
    unsigned long mdEntryTime;   // timestamp
    long          mdEntrySize;
    unsigned long numberOfOrders;
    string        tradingSessionID;
    double        netChgPrevDay;
    unsigned long tradeVolume;
    string        tradeCondition;
    string        tickDirection;
    string        quoteCondition;
    unsigned long aggressorSide;
    string        matchEventIndicator;
  };
  typedef sequence<MarketDataEntry> MarketDataEntries;

#pragma DCPS_DATA_TYPE "MiddlewareNewsBrief::MarketData"
#pragma DCPS_DATA_KEY  "MiddlewareNewsBrief::MarketData securityID"
    struct MarketData
    {
      boolean            is_echo;
      unsigned long      counter;
      
      unsigned long      securityID;

      string             applVersionID;
      string             messageType;
      string             senderCompID;
      unsigned long      msgSeqNum;
      unsigned long      sendingTime;   // timestamp
      unsigned long      tradeDate;

      MarketDataEntries  mdEntries;
    };

#pragma DCPS_DATA_TYPE "MiddlewareNewsBrief::RelatedSym"
#pragma DCPS_DATA_KEY  "MiddlewareNewsBrief::RelatedSym securityID"
    struct RelatedSym
    {
      string             symbol;
      unsigned long long orderQuantity;
      unsigned long      side;
      unsigned long long transactTime;  // timestamp
      unsigned long      quoteType;
      unsigned long      securityID;
      unsigned long      securityIDSource;
    };
    typedef sequence<RelatedSym> RelatedSyms;

#pragma DCPS_DATA_TYPE "MiddlewareNewsBrief::QuoteRequest"
#pragma DCPS_DATA_KEY  "MiddlewareNewsBrief::QuoteRequest securityID"
    struct QuoteRequest
    {
      boolean            is_echo;
      unsigned long      counter;

      unsigned long      securityID;

      string             applVersionID;
      string             messageType;
      string             senderCompID;
      unsigned long      msgSeqNum;
      unsigned long      sendingTime;   // timestamp
      string             quoteReqID;

      RelatedSyms        related;
    };

};

The OpenDDS publisher code is in opendds/cpp/TypedPublisher.cpp. It uses a DDS utility library written for this Middleware News Brief. That library is not shown here, but is available in the attached code archive in the opendds/cpp directory. The publisher is shown below:

MiddlewareNewsBrief::MarketData* createMarketData();
MiddlewareNewsBrief::MarketDataEntry* createMarketDataEntry();
MiddlewareNewsBrief::QuoteRequest* createQuoteRequest();
MiddlewareNewsBrief::RelatedSym* createRelatedSym();

int main(int argc, char** argv)
{
  const int DOMAIN_ID = 8675309;

  size_t numMessages = 10000;
  size_t numSubscribers = 1;

  // Command-Line Arguments
  //
  //    -num numMessages : Number of messages to write
  //    -ns numSubscribers : number of subscribers
  //
  int i = 0;
  while (i < argc) {
    if (std::string(argv[i]) == "-num" && (i+1) < argc) {
      numMessages = boost::lexical_cast<size_t>(argv[++i]);
    } else if (std::string(argv[i]) == "-ns" && (i+1) < argc) {
       numSubscribers = boost::lexical_cast<size_t>(argv[++i]);
    } else if (std::string(argv[i]) == "-?" ) {
      usage();
      return 0;
    }
    ++i;
  }

  std::cout << "Number of messages is " << numMessages << std::endl;

  try
  {
    //
    // OpenDDS Init
    //

    // Initialize, and create a DomainParticipantFactory
    DDS::DomainParticipantFactory_var factory = 
      TheParticipantFactoryWithArgs(argc, argv);

    //  Create the DomainParticipant
    DDS::DomainParticipant_var participant = 
      factory->create_participant(DOMAIN_ID,
                                  PARTICIPANT_QOS_DEFAULT,
                                  0,
                                  0);
    if (participant == 0) 
    {
      std::cerr << "create_participant failed." << std::endl;
      return -1;
    }

    // Create a publisher for the topics
    const int TRANSPORT_IMPL_ID = 1;
    DDS::Publisher_var publisher = 
      MiddlewareNewsBrief::DDSUtil::create_publisher(participant.in(),
                                                     TRANSPORT_IMPL_ID,
                                                     "Primary");

    // Create a subscriber for the Echo of the two topics
    const int TRANSPORT_IMPL_ID_2 = 2;
    DDS::Subscriber_var subscriber = 
      MiddlewareNewsBrief::DDSUtil::create_subscriber(participant.in(),
                                                      TRANSPORT_IMPL_ID_2,
                                                      "Secondary");

    // Initialize the DoneToken manager, which publishes a "done" token
    MiddlewareNewsBrief::DoneTokenManager doneToken;
    doneToken.initWriter(participant,publisher,subscriber);

The code below creates our two DataWriters for the "MarketData" and "QuoteRequest" topics.

    //
    // DataWriters, one per message type
    //

    typedef MiddlewareNewsBrief::MarketData MarketData;
    boost::shared_ptr<MiddlewareNewsBrief::DataWriter<MarketData> > 
      mdDataWriter(new MiddlewareNewsBrief::DataWriter<MarketData>());
    mdDataWriter->init(participant, 
                       publisher, 
                       subscriber);
    mdDataWriter->sleepUsecBetweenWrites(0);

    typedef MiddlewareNewsBrief::QuoteRequest QuoteRequest;
    boost::shared_ptr<MiddlewareNewsBrief::DataWriter<QuoteRequest> > 
      qrDataWriter(new MiddlewareNewsBrief::DataWriter<QuoteRequest>());
    qrDataWriter->init(participant, 
                        publisher, 
                        subscriber);
    qrDataWriter->sleepUsecBetweenWrites(0);

    boost::this_thread::sleep(boost::posix_time::seconds(3));

We create a MarketData and a QuoteRequest object for publication. We do not need to serialize these objects; OpenDDS takes care of that.

    MiddlewareNewsBrief::MarketData_var md = createMarketData();
    MiddlewareNewsBrief::QuoteRequest_var qr = createQuoteRequest();

    std::cout << "Writing..." << std::endl;

    MIDDLEWARENEWSBRIEF_PROFILER_TIME_TYPE start = 
      MIDDLEWARENEWSBRIEF_PROFILER_GET_TIME;

    for (size_t i = 0; i < numMessages; ++i) 
    {

In the loop, we write either a MarketData or a QuoteRequest and wait for the echo from the subscriber.

      if (i % 5)
      {
        qr->is_echo = false;
        qr->counter = i;

        qrDataWriter->onMessage(qr
  #ifdef MIDDLEWARENEWSBRIEF_PROFILER_ENABLE
                                ,0
  #endif
                                );
        for (size_t jj = 0; jj < numSubscribers; ++jj) {      
          qrDataWriter->recv_echo();
        }
      }
      else
      {
        md->is_echo = false;
        md->counter = i;

        mdDataWriter->onMessage(md
  #ifdef MIDDLEWARENEWSBRIEF_PROFILER_ENABLE
                                ,0
  #endif
                                );
        for (size_t jj = 0; jj < numSubscribers; ++jj) {      
          mdDataWriter->recv_echo();
        }
      }
    }

    MIDDLEWARENEWSBRIEF_PROFILER_TIME_TYPE end = 
      MIDDLEWARENEWSBRIEF_PROFILER_GET_TIME;

    double latency = (double)(end-start) / ((double)numMessages * 2.0)  / (double)numSubscribers;
    std::cout << "\n\nAverage latency in " 
              << MIDDLEWARENEWSBRIEF_PROFILER_TIME_UNITS
              << ": " << latency << "\n\n" << std::endl;

    doneToken.writeDone("done");

    // Give the samples time to make it to the subscribers
    DDS::Duration_t duration;
    duration.sec = DDS::DURATION_INFINITE_SEC;
    duration.nanosec = DDS::DURATION_INFINITE_NSEC;
    publisher->wait_for_acknowledgments(duration);

    // OpenDDS Cleanup
    doneToken.fini(publisher.in());
    if (publisher != 0) 
    {
      publisher->delete_contained_entities();
      participant->delete_publisher(publisher.in());
      publisher = DDS::Publisher::_nil();
    }
    if (participant != 0) 
    {
      participant->delete_contained_entities();
    }
    if (factory != 0) 
    {
      factory->delete_participant(participant.in ());
    }
    TheTransportFactory->release();
    TheServiceParticipant->shutdown();  
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << std::endl;
    return -1;
  }
  catch (CORBA::Exception& e) 
  {
    std::cerr << "Exception: " << e << std::endl;
    return -1;
  }
  return 0;
}

The OpenDDS subscriber code is in opendds/cpp/TypedSubscriber.cpp, and is shown below:

int main(int argc, char** argv)
{
  const int DOMAIN_ID = 8675309;

  try
  {
    //
    // OpenDDS Init
    //

    // Initialize, and create a DomainParticipantFactory
    DDS::DomainParticipantFactory_var factory = 
      TheParticipantFactoryWithArgs(argc, argv);

    //  Create the DomainParticipant
    DDS::DomainParticipant_var participant = 
      factory->create_participant(DOMAIN_ID,
                                  PARTICIPANT_QOS_DEFAULT,
                                  0,
                                  0);
    if (participant == 0) 
    {
      std::cerr << "create_participant failed." << std::endl;
      return -1;
    }

    // Create a subscriber for the topics
    const int TRANSPORT_IMPL_ID = 1;
    DDS::Subscriber_var subscriber = 
      MiddlewareNewsBrief::DDSUtil::create_subscriber(participant.in(),
                                                      TRANSPORT_IMPL_ID,
                                                      "Primary");
    
    // Create a publisher for the echo of the topics
    const int TRANSPORT_IMPL_ID_2= 2;
    DDS::Publisher_var publisher = 
      MiddlewareNewsBrief::DDSUtil::create_publisher(participant.in(),
                                                     TRANSPORT_IMPL_ID_2,
                                                     "Secondary");


    // Initialize the DoneToken manager, which publishes a "done" token
    MiddlewareNewsBrief::DoneTokenManager doneToken;
    doneToken.initReader(participant,publisher,subscriber);

We create DataReaders for "MarketData" and "QuoteRequest."

    //
    // DataReaders, one per message type
    //

    MiddlewareNewsBrief::DataReader<MiddlewareNewsBrief::MarketData> mdDR;
    mdDR.init(participant,publisher,subscriber);

    MiddlewareNewsBrief::DataReader<MiddlewareNewsBrief::QuoteRequest> qrDR;
    qrDR.init(participant,publisher,subscriber);

    printf("Finished subscribing; entering event loop\n");

We loop, waiting for events; the MiddlewareNewsBrief::DataReader template that we wrote for this article receives the DDS samples and echoes them back. The details of that class are available in the opendds/cpp directory.

    //
    // Process events
    //
    do 
    {
      boost::this_thread::sleep(boost::posix_time::seconds(1));
    } 
    while(!doneToken.isDone());

    doneToken.do_echo();

    // OpenDDS Cleanup
    doneToken.fini(publisher.in());
    if (participant != 0) 
    {
      participant->delete_contained_entities();
    }
    if (factory != 0) 
    {
      factory->delete_participant(participant.in ());
    }
    TheTransportFactory->release();
    TheServiceParticipant->shutdown();  
  }
  catch (std::exception& e)
  {
    std::cerr << e.what() << std::endl;
    return -1;
  }
  catch (CORBA::Exception& e) 
  {
    std::cerr << e << std::endl;
    return -1;
  }
  return 0;
}

We run the example with our opendds_test.pl Perl script:

  perl %MNB_ROOT%\bin\opendds_test.pl OpenDdsTyped -num 1000

The output should look something like this:

  Finished subscribing; entering event loop
  Number of messages is 1000
  Writing...


  Average latency in milliseconds: 0.205


The average latency for a typed OpenDDS message is 205 microseconds. As always, repeated runs of the test will show different values, so run the test repeatedly to get a clearer picture of the average latency. Repeated runs show that the performance penalty over the OpenDDS raw buffer test is quite small, on the order of 20 microseconds. Next, we will find out how ZeroMQ and Boost.Asio compare when we serialize C++ objects into the buffer before writing across the wire.
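The latency arithmetic used by the publisher, and the advice to average over repeated runs, can be sketched with a couple of small helpers. This is only an illustration: the names average_one_way_latency, RunSummary, and summarize_runs are hypothetical and do not appear in the article's code archive.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

// Hypothetical helper mirroring the arithmetic in TypedPublisher.cpp:
// each message makes two one-way trips (out and echo), and the publisher
// waits for one echo per subscriber.
double average_one_way_latency(double elapsed,
                               std::size_t num_messages,
                               std::size_t num_subscribers)
{
  assert(num_messages > 0 && num_subscribers > 0);
  return elapsed / (static_cast<double>(num_messages) * 2.0)
                 / static_cast<double>(num_subscribers);
}

// Hypothetical summary of the latencies reported by several test runs.
struct RunSummary
{
  double mean;
  double min;
  double max;
};

RunSummary summarize_runs(const std::vector<double>& latencies)
{
  assert(!latencies.empty());
  RunSummary s;
  s.mean = std::accumulate(latencies.begin(), latencies.end(), 0.0)
           / static_cast<double>(latencies.size());
  s.min = *std::min_element(latencies.begin(), latencies.end());
  s.max = *std::max_element(latencies.begin(), latencies.end());
  return s;
}
```

As a worked example of the formula, a hypothetical total of 410 milliseconds for 1000 round trips to a single subscriber corresponds to 0.205 milliseconds per one-way trip.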

ZeroMQ Strongly Typed C++ Test

We'll use the Boost Serialization library to serialize our MarketData and QuoteRequest C++ objects into a raw buffer and deserialize them on the other side, and we'll use these types for both the ZeroMQ and the Boost.Asio tests. The following C++ code, available in the common/cpp directory, shows our MarketData and MarketDataEntry types and the Boost Serialization template function, called serialize, that we use to serialize them. We have also defined QuoteRequest and RelatedSym types, but those are not shown.

namespace MiddlewareNewsBrief
{
  struct CommonTypes_Export MarketDataEntry
  {
    friend class boost::serialization::access;

    boost::uint32_t mdUpdateAction;
    boost::uint32_t mdPriceLevel;
    std::string mdEntryType;
    boost::uint32_t openCloseSettleFlag;
    boost::uint32_t securityIDSource;
    boost::uint32_t securityID;
    boost::uint32_t rptSeq;
    double mdEntryPx;
    boost::uint32_t mdEntryTime;
    boost::uint32_t mdEntrySize;
    boost::uint32_t numberOfOrders;
    std::string tradingSessionID;
    double netChgPrevDay;
    boost::uint32_t tradeVolume;
    std::string tradeCondition;
    std::string tickDirection;
    std::string quoteCondition;
    boost::uint32_t aggressorSide;
    std::string matchEventIndicator;

    static MarketDataEntry createTestData();

  private:
    template<class Archive>
    void serialize(Archive& ar, const unsigned int version)
    {
      ar & mdUpdateAction;
      ar & mdPriceLevel;
      ar & mdEntryType;
      ar & openCloseSettleFlag;
      ar & securityID;
      ar & securityIDSource;
      ar & rptSeq;
      ar & mdEntryPx;
      ar & mdEntryTime;
      ar & mdEntrySize;
      ar & numberOfOrders;
      ar & tradingSessionID;
      ar & netChgPrevDay;
      ar & tradeVolume;
      ar & tradeCondition;
      ar & tickDirection;
      ar & quoteCondition;
      ar & aggressorSide;
      ar & matchEventIndicator;
    }  
  };

  struct CommonTypes_Export MarketData
  {
    friend class boost::serialization::access;

    static const std::string TOPIC;

    boost::uint32_t securityID;
    std::string applVersionID;
    std::string messageType;
    std::string senderCompID;
    boost::uint32_t msgSeqNum;
    boost::uint32_t sendingTime;
    boost::uint32_t tradeDate;

    bool isEcho;
    boost::uint32_t counter;

    std::vector<MiddlewareNewsBrief::MarketDataEntry> mdEntries;

    static MarketData createTestData();

  private:
    template<class Archive>
    void serialize(Archive& ar, const unsigned int version)
    {
        ar & securityID;
        ar & applVersionID;
        ar & messageType;
        ar & senderCompID;
        ar & msgSeqNum;
        ar & sendingTime;
        ar & tradeDate;
        
        ar & isEcho;
        ar & counter;

        ar & mdEntries;
    }  
  };
}

Our ZeroMQ publisher creates instances of these types, uses Boost Serialization to serialize them into a buffer, and sends that buffer across the wire. The subscriber then deserializes the buffer, checks some flags, and reserializes to echo the buffer back. The behavior is the same as the .NET examples; we've simply eliminated the use of .NET. The publisher code, located in zeromq/cpp/ZeromqTypedPublisher.cpp, is shown below. Note that we still have to prepend the "topic" name, i.e. the topic filter string, to the front of the buffer, being careful to serialize our C++ object to the point in the buffer after the filter string:

int main (int argc, char *argv [])
{
  try {
    if (argc < 4) {
      std::cout << "usage: ZeromqTypedPublisher <roundtrip-count> <bind-to-sub> [<connect-to-pub>]+\n"
                << "e.g.:  ZeromqTypedPublisher  10000 tcp://eth0:54321 tcp://spider:54322 tcp://spider:54323\n"
                << "          use a literal IP address and port for each endpoint\n"
                << std::endl;
      return 1;  // not enough arguments; argv[1] and argv[2] may not exist
    }
    size_t roundtrip_count = atoi (argv [1]);
    const char* bind_to_sub = argv[2];

We publish "MarketData" and "QuoteRequest" messages and subscribe to their echoes; the subscriber also uses the topic filter string to determine the message type.

    zmq::context_t ctx (1, 1);
    zmq::socket_t pub(ctx,ZMQ_PUB);
    zmq::socket_t sub(ctx,ZMQ_SUB);
    sub.setsockopt(ZMQ_SUBSCRIBE,"MarketData\x00",11);
    sub.setsockopt(ZMQ_SUBSCRIBE,"QuoteRequest\x00",13);
    sub.bind(bind_to_sub);

We connect to each subscriber endpoint.

    size_t num_subscribers = 0;
    for (int argi = 3; argi < argc; ++argi) {
      pub.connect(argv[argi]);
      ++num_subscribers;
    }

    printf("Entering send loop -- %d messages\n", (int) roundtrip_count);

    MiddlewareNewsBrief::MarketData md = 
      MiddlewareNewsBrief::MarketData::createTestData();
    MiddlewareNewsBrief::QuoteRequest qr = 
      MiddlewareNewsBrief::QuoteRequest::createTestData();

    void* watch = zmq_stopwatch_start ();

    for (size_t i = 0; i < roundtrip_count; i++) 
    {
      std::ostringstream request(std::ios::binary);
      boost::archive::binary_oarchive oa(request);

      // Send 9 MarketDatas for each QuoteRequest (every tenth message is a QuoteRequest)
      std::string topic_name;

We serialize either a MarketData or a QuoteRequest. The MiddlewareNewsBrief::stream method (not shown) is a template method that takes an object, an archive, and an index, sets the index, and streams the object into the Boost Serialization archive, which in this case is a binary archive. The MiddlewareNewsBrief::stream method is in common/cpp/Functions_T.h.

      if (i % 10 == 5) 
      {
        topic_name = MiddlewareNewsBrief::QuoteRequest::TOPIC;
        MiddlewareNewsBrief::stream(qr,oa,i);
      }
      else 
      {
        topic_name = MiddlewareNewsBrief::MarketData::TOPIC;
        MiddlewareNewsBrief::stream(md,oa,i);
      }

      size_t request_length = request.str().length() + topic_name.length() + 1;
      zmq::message_t msg(request_length);
      memset(msg.data(),0,msg.size());
      memcpy(msg.data(),topic_name.c_str(),topic_name.length());
      memcpy(reinterpret_cast<char*>(msg.data()) + topic_name.length() + 1,
             request.str().data(),request.str().length());

      pub.send(msg);

      for (size_t jj = 0; jj < num_subscribers; ++jj)
      {
        // Wait for echoed message from each subscriber
        zmq::message_t reply_msg;
        sub.recv(&reply_msg);

        char* reply_buffer = reinterpret_cast<char*>(reply_msg.data());
        
        // "topic" name should be first N characters, null-terminated
        std::string reply_topic_name(reply_buffer);

Once we receive the echoed buffer from the subscriber, we skip past the topic name and put the rest of the buffer in a std::istringstream before we hand it off to be deserialized.

        std::istringstream reply_stream(
          std::string(reply_buffer + reply_topic_name.length() + 1,
                      reply_msg.size() - (reply_topic_name.length() + 1)),
          std::ios::binary);
        boost::archive::binary_iarchive ia(reply_stream);

We deserialize the echoed object. The MiddlewareNewsBrief::check method (not shown) is a template method that takes an empty object, an archive, and an index, deserializes the buffer into the object, and checks the object's index. The MiddlewareNewsBrief::check method is also in common/cpp/Functions_T.h.

        if (reply_topic_name == MiddlewareNewsBrief::MarketData::TOPIC) 
        {
          MiddlewareNewsBrief::MarketData mdReply;
          if (MiddlewareNewsBrief::check(mdReply,ia,i) != 0) { return -1; }
        } 
        else if (reply_topic_name == MiddlewareNewsBrief::QuoteRequest::TOPIC)
        {
          MiddlewareNewsBrief::QuoteRequest qrReply;
          if (MiddlewareNewsBrief::check(qrReply,ia,i) != 0) { return -1; }
        }
        else 
        {
          std::cerr << "Received invalid topic name: " << reply_topic_name.c_str() << std::endl;
          return -1;
        }
      }
    }

    unsigned long elapsed = zmq_stopwatch_stop (watch);
    double latency = (double) elapsed / (roundtrip_count * 2.0) / (double)num_subscribers;

    printf ("roundtrip count: %d\n", (int) roundtrip_count);
    printf ("\n\naverage latency: %.3f [us]\n\n\n", (double) latency);
  
    return 0;
  } catch (std::exception& e) {
    std::cout << "An error occurred: " << e.what() << std::endl;
    return 1;
  }
}

Note that we used a boost::archive::binary_oarchive for performance reasons. The data serialized into a binary archive is NOT interoperable, however — it cannot be sent reliably between a publisher and a subscriber running on two different platforms, such as Windows and Linux. For interoperability between platforms, use a boost::archive::text_oarchive. Keep in mind, however, that the boost::archive::text_oarchive is much slower than the boost::archive::binary_oarchive. We chose the binary form to demonstrate the performance of ZeroMQ and Boost.Asio in the best possible light for this test. Note that by using OpenDDS you do not need to make this choice; OpenDDS's native serialization safely writes between participants running on different platforms.
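One reason a native binary image of an object does not travel safely between platforms is byte order (differing integer sizes are another). The sketch below is not how Boost Serialization or OpenDDS encode data; it is a standard-library-only illustration, with the hypothetical helper names put_u32_be and get_u32_be, of writing a 32-bit value in a fixed byte order so that any reader recovers the same value regardless of the host's native layout.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Append a 32-bit value to the buffer most-significant byte first,
// independent of the byte order of the machine running this code.
void put_u32_be(std::string& buf, std::uint32_t v)
{
  for (int shift = 24; shift >= 0; shift -= 8) {
    buf.push_back(static_cast<char>((v >> shift) & 0xFFu));
  }
}

// Read a 32-bit big-endian value that starts at the given offset.
std::uint32_t get_u32_be(const std::string& buf, std::size_t offset)
{
  assert(offset + 4 <= buf.size());
  std::uint32_t v = 0;
  for (std::size_t i = 0; i < 4; ++i) {
    v = (v << 8) | static_cast<unsigned char>(buf[offset + i]);
  }
  return v;
}
```

A portable format pays for this kind of per-field work on every message, which is consistent with the tradeoff described above between the text and binary archives.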

The subscriber code, located in zeromq/cpp/ZeromqTypedSubscriber.cpp, is shown below:

int main (int argc, char *argv [])
{
  try {
    if (argc != 4) {
      std::cout << "usage: ZeromqTypedSubscriber <roundtrip-count> <bind-to-sub> <connect-to-pub>\n"
                << "e.g.:  ZeromqTypedSubscriber 10000 tcp://eth0:54322 tcp://spider:54321\n"
                << "          use a literal IP address and port for each endpoint\n"
                << std::endl;
        return 1;
    }
    size_t roundtrip_count = atoi (argv [1]);
    const char* bind_to_sub = argv [2];
    const char* connect_to_pub = argv [3];

    zmq::context_t ctx (1, 1);
    zmq::socket_t pub (ctx, ZMQ_PUB);
    pub.connect(connect_to_pub);

We subscribe to "MarketData" and "QuoteRequest" messages; the subscriber also uses this topic filter string to determine the message type.

    zmq::socket_t sub (ctx, ZMQ_SUB);
    sub.setsockopt(ZMQ_SUBSCRIBE,"MarketData\x00",11);
    sub.setsockopt(ZMQ_SUBSCRIBE,"QuoteRequest\x00",13);
    sub.bind(bind_to_sub);

    boost::detail::atomic_count msg_counter(0);

    printf("Entering recv loop -- %d messages\n", 
           (int) roundtrip_count);

    for (size_t i = 0; i < roundtrip_count; i++) {
      zmq::message_t msg;
      sub.recv (&msg);

      char* msg_buffer = reinterpret_cast<char*>(msg.data());
      
      // "topic" name should be first N characters, null-terminated
      std::string topic_name(msg_buffer);

We take the request and reply buffers and create binary Boost Serialization archives for each.

      std::istringstream request(
        std::string(msg_buffer + topic_name.length() + 1,
                    msg.size() - (topic_name.length() + 1)),
        std::ios::binary);
      boost::archive::binary_iarchive ia(request);

      std::ostringstream reply(std::ios::binary);
      boost::archive::binary_oarchive oa(reply);

We deserialize and then reserialize the objects. The MiddlewareNewsBrief::check_and_restream method (not shown) is another template method that takes an empty object, an input archive, an output archive, and an index, deserializes the input buffer into the object, checks the object's index, and reserializes the object into the output buffer. The MiddlewareNewsBrief::check_and_restream method is also in common/cpp/Functions_T.h.

      if (topic_name == MiddlewareNewsBrief::MarketData::TOPIC) 
      {
        MiddlewareNewsBrief::MarketData md;
        MiddlewareNewsBrief::check_and_restream(md,ia,oa,msg_counter);
      } 
      else if (topic_name == MiddlewareNewsBrief::QuoteRequest::TOPIC)
      {
        MiddlewareNewsBrief::QuoteRequest qr;
        MiddlewareNewsBrief::check_and_restream(qr,ia,oa,msg_counter);
      }
      else 
      {
        std::cerr << "Received invalid topic name: " << topic_name.c_str() << std::endl;
        return -1;
      }

We echo the objects back to the publisher.

      // Echo it back
      size_t reply_length = reply.str().length() + topic_name.length() + 1;
      zmq::message_t reply_msg(reply_length);
      memset(reply_msg.data(),0,reply_msg.size());
      memcpy(reply_msg.data(),topic_name.c_str(),topic_name.length());
      memcpy(reinterpret_cast<char*>(reply_msg.data()) + topic_name.length() + 1,reply.str().data(),reply.str().length());

      pub.send(reply_msg,0);

      ++msg_counter;
    }
    printf("Finished receiving messages\n");
    
    return 0;

  } catch (std::exception& e) {
    std::cout << "An error occurred: " << e.what() << std::endl;
    return 1;
  }
}
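The stream, check, and check_and_restream helpers used in the listings above live in common/cpp/Functions_T.h and are not reproduced in this article. The following is only a sketch of what helpers of that shape might look like, based on the descriptions given here. The return types, the isEcho handling, and the Sample, ToyOArchive, and ToyIArchive types are assumptions made so the example compiles with the standard library alone; they are not the article's actual code, and the toy archives merely stand in for the Boost Serialization archives.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <sstream>
#include <string>

namespace sketch {

// Stand-in message type; the real MarketData and QuoteRequest have more fields.
struct Sample {
  bool isEcho;
  std::uint32_t counter;
  std::string symbol;   // assumed to contain no whitespace in this toy format
};

// Toy text archives standing in for the Boost binary archives.
struct ToyOArchive {
  std::ostringstream os;
  ToyOArchive& operator<<(const Sample& s) {
    os << s.isEcho << ' ' << s.counter << ' ' << s.symbol << ' ';
    return *this;
  }
};

struct ToyIArchive {
  std::istringstream is;
  explicit ToyIArchive(const std::string& data) : is(data) {}
  ToyIArchive& operator>>(Sample& s) {
    is >> s.isEcho >> s.counter >> s.symbol;
    return *this;
  }
};

// Set the index on the object, then stream it into the output archive.
template <class T, class OArchive>
void stream(T& obj, OArchive& oa, std::size_t index) {
  assert(index <= 0xFFFFFFFFu);
  obj.counter = static_cast<std::uint32_t>(index);
  oa << obj;
}

// Deserialize into obj and verify its index; 0 means success (assumed convention).
template <class T, class IArchive>
int check(T& obj, IArchive& ia, std::size_t index) {
  ia >> obj;
  return (obj.counter == index) ? 0 : -1;
}

// Deserialize, verify the index, and reserialize for the echo.
template <class T, class IArchive, class OArchive>
int check_and_restream(T& obj, IArchive& ia, OArchive& oa, std::size_t index) {
  if (check(obj, ia, index) != 0) return -1;
  obj.isEcho = true;   // assumption: mark as echo, as the .NET subscriber does
  oa << obj;
  return 0;
}

} // namespace sketch
```

Because the helpers are templates over the archive type, the same functions could in principle be reused by both the ZeroMQ and Boost.Asio tests, which matches how the article describes them.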

Note again that we manually extract the "topic" name from the ZeroMQ buffer, and we must be careful to manually skip over it before we attempt to deserialize the objects.
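The framing convention can be captured in a few standard-library-only helpers. This is a sketch rather than code from the archive: the names frame, unframe, and matches_filter are hypothetical. The matches_filter function mimics the byte-wise prefix comparison that a ZeroMQ SUB socket applies to its subscription filters, which is why the listings include the trailing null byte in the filter length.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Build a message: topic bytes, one null terminator, then the payload.
std::string frame(const std::string& topic, const std::string& payload)
{
  assert(topic.find('\0') == std::string::npos);  // topic must not embed a null
  std::string msg(topic);
  msg.push_back('\0');
  msg.append(payload);
  return msg;
}

// Split a framed message into topic and payload.
// Returns false if the message contains no null terminator.
bool unframe(const std::string& msg, std::string& topic, std::string& payload)
{
  const std::size_t end = msg.find('\0');
  if (end == std::string::npos) {
    return false;
  }
  topic.assign(msg, 0, end);
  payload.assign(msg, end + 1, std::string::npos);  // payload may contain nulls
  return true;
}

// Byte-wise prefix test: true when the message starts with the filter.
bool matches_filter(const std::string& msg, const std::string& filter)
{
  return msg.compare(0, filter.size(), filter) == 0;
}
```

Including the null terminator in the filter means that a subscription to "MarketData" would not also match a hypothetical topic such as "MarketDataSnapshot".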

We'll run the test with the provided Perl script, as follows:

  perl %MNB_ROOT%\bin\zeromq_test.pl ZeromqTyped 1000 

We should see output that looks like this:

  Entering recv loop -- 1000 messages
  Entering send loop -- 1000 messages
  Finished receiving messages
  roundtrip count: 1000


  average latency: 577.619 [us]


So our average latency for 1000 messages is about 577 microseconds.

You'll notice that the latency is quite a bit higher than the OpenDDS latency. Adding the serialization overwhelms the raw buffer performance advantage of ZeroMQ. We think it's a fair comparison: if an application wants to send strongly typed C++ data across the wire over ZeroMQ, Boost Serialization is a reasonably likely choice. Recall that we chose binary serialization over text serialization due to the significant performance advantage of binary serialization, even though it prevents us from interoperating across different platforms. Later, we'll look at another example where we use Google Protocol Buffers to serialize our C++ objects before sending them through ZeroMQ and see if that helps the comparison.

Boost.Asio Strongly Typed C++ Test

The Boost.Asio example uses the same C++ types as we used above in the ZeroMQ strongly typed test.

The publisher code, located in boostasio/cpp/BoostTypedPublisher.cpp, is below:

using boost::asio::ip::tcp;

int main(int argc, char* argv[])
{
  try
  {
    if (argc < 4)
    {
      std::cerr << "Usage: BoostTypedPublisher <num-messages> [<host> <port>]+\n";
      return 1;
    }
    size_t num_messages = std::atoi(argv[1]);
    const size_t used_args = 2;

    boost::asio::io_service io_service;

    size_t num_subscribers = (argc - used_args) / 2;    
    std::vector<boost::shared_ptr<tcp::socket> > subscribers;
    subscribers.reserve(num_subscribers);

We resolve each subscription's endpoint.

    for (size_t i=0; i < num_subscribers; ++i)
    {
      tcp::resolver resolver(io_service);
      tcp::resolver::query query(tcp::v4(), argv[i*2+used_args], argv[i*2+used_args+1]);
      tcp::resolver::iterator iterator = resolver.resolve(query);

      boost::shared_ptr<tcp::socket> s(new tcp::socket(io_service));      
      subscribers.push_back(s);
      s->connect(*iterator);
    }

    // Cast explicitly: %d expects an int, but these values are size_t
    printf("Sending %d messages to %d subscribers\n",
           (int) num_messages,
           (int) num_subscribers);

    MiddlewareNewsBrief::MarketData md = 
      MiddlewareNewsBrief::MarketData::createTestData();
    MiddlewareNewsBrief::QuoteRequest qr = 
      MiddlewareNewsBrief::QuoteRequest::createTestData();

    MIDDLEWARENEWSBRIEF_PROFILER_TIME_TYPE start = 
      MIDDLEWARENEWSBRIEF_PROFILER_GET_TIME;

    const size_t MAX_REPLY_BUFFER_SIZE = 1024;
    // shared_array releases the buffer with delete[] (requires <boost/shared_array.hpp>)
    boost::shared_array<char> reply_buffer(new char[MAX_REPLY_BUFFER_SIZE]);

    for (size_t i=0; i < num_messages; ++i) 
    {

We use the Boost Serialization archive classes just as we did in the ZeroMQ example.

      std::ostringstream request(std::ios::binary);
      boost::archive::binary_oarchive oa(request);

We serialize either a MarketData or a QuoteRequest. The MiddlewareNewsBrief::stream method is the same as in the ZeroMQ example.

      // Send one QuoteRequest for every nine MarketData messages
      if (i % 10 == 5) 
      {
        oa << MiddlewareNewsBrief::QuoteRequest::TOPIC;
        MiddlewareNewsBrief::stream(qr,oa,i);
      }
      else 
      {
        oa << MiddlewareNewsBrief::MarketData::TOPIC;
        MiddlewareNewsBrief::stream(md,oa,i);
      }

We write to each attached subscription.

      // Two loops here simulate pub/sub behavior, 
      // where we publish to all subscribers before
      // looking for an echoed sample coming back

      for (size_t jj = 0; jj < num_subscribers; ++jj) {
        boost::asio::write(*(subscribers[jj]), 
                           boost::asio::buffer(request.str().data(), request.str().size()));
      }   

      // Reply is same size as request for this test
      size_t reply_buffer_size = request.str().size();
      if (reply_buffer_size > MAX_REPLY_BUFFER_SIZE) 
      {
        std::cerr << "Maximum reply buffer size exceeded: " << reply_buffer_size << std::endl;
        return -1;
      }

We wait for each subscription's reply, and pull the topic name out of the reply.

      for (size_t jj = 0; jj < num_subscribers; ++jj) 
      {
        memset(reply_buffer.get(),0,reply_buffer_size);
        size_t reply_length = boost::asio::read(*(subscribers[jj]),
            boost::asio::buffer(reply_buffer.get(), reply_buffer_size));

        std::istringstream reply_stream(std::string(reply_buffer.get(),reply_length),
                                        std::ios::binary);
        boost::archive::binary_iarchive ia(reply_stream);

        std::string topic_name;
        ia >> topic_name;

We deserialize the echoed reply object. The MiddlewareNewsBrief::check method is the same as in the ZeroMQ example.

        if (topic_name == MiddlewareNewsBrief::MarketData::TOPIC) 
        {
          MiddlewareNewsBrief::MarketData mdReply;
          if (MiddlewareNewsBrief::check(mdReply,ia,i) != 0) { return -1; }
        } 
        else if (topic_name == MiddlewareNewsBrief::QuoteRequest::TOPIC)
        {
          MiddlewareNewsBrief::QuoteRequest qrReply;
          if (MiddlewareNewsBrief::check(qrReply,ia,i) != 0) { return -1; }
        }
        else 
        {
          std::cerr << "Received invalid topic name: " << topic_name.c_str() << std::endl;
          return -1;
        }
      }
    }

    MIDDLEWARENEWSBRIEF_PROFILER_TIME_TYPE finish = 
      MIDDLEWARENEWSBRIEF_PROFILER_GET_TIME;

    MIDDLEWARENEWSBRIEF_PROFILER_TIME_TYPE elapsed =
      MIDDLEWARENEWSBRIEF_PROFILER_DIFF(finish,start);

    double latency = (double) elapsed / (num_messages * 2.0) / (double)(num_subscribers);
    printf("\n\nAverage latency in %s: %.3f\n\n\n", 
           MIDDLEWARENEWSBRIEF_PROFILER_TIME_UNITS,
           latency);
    printf("Finished\n");
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  return 0;
}
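The latency figure printed by the publisher can be restated as a small function. This is a sketch of the same arithmetic, not code from the test suite; the name `average_latency` and the zero-count guard are additions for illustration.

```cpp
#include <cstddef>

// One-way latency estimate as computed by the publisher: the total elapsed
// time is divided by two legs per round trip (request and echoed reply),
// and then by the number of subscribers that echo each message.
// The result is in whatever time unit `elapsed` is expressed in.
double average_latency(double elapsed,
                       std::size_t num_messages,
                       std::size_t num_subscribers) {
  if (num_messages == 0 || num_subscribers == 0) {
    return 0.0;  // guard added for this sketch; avoids dividing by zero
  }
  return elapsed / (static_cast<double>(num_messages) * 2.0)
                 / static_cast<double>(num_subscribers);
}
```

As a worked example with a hypothetical elapsed time, 792 milliseconds for 1000 messages and one subscriber gives 792 / 2000 / 1 = 0.396 milliseconds, or 396 microseconds.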

The subscriber code, located in boostasio/cpp/BoostTypedSubscriber.cpp, is below. The Server and Session classes were written for this Middleware News Brief, and can be found in the boostasio/cpp directory, but are not shown.

using boost::asio::ip::tcp;

void on_data_available(char* buffer, 
                       size_t num_bytes, 
                       Session* session, 
                       boost::detail::atomic_count* msg_counter);

int main(int argc, char* argv[])
{
  try
  {
    if (argc != 3)
    {
      std::cerr << "Usage: BoostTypedSubscriber <num-messages> <port>\n";
      return 1;
    }

    boost::asio::io_service io_service;
    boost::detail::atomic_count msg_counter(0);

    size_t num_messages = std::atoi(argv[1]);

The Server class was also used in the Boost.Asio Raw Buffer test. We bind the on_data_available function as a callback function for Boost.Asio.

    Server s(io_service, 
             std::atoi(argv[2]), 
             num_messages, 
             boost::bind(&on_data_available,_1,_2,_3,&msg_counter));

    printf("Waiting; running for %d messages\n", (int) num_messages);

    io_service.run();
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  return 0;
}

This callback is triggered for each received buffer.

void 
on_data_available(char* buffer, size_t num_bytes, Session* session, boost::detail::atomic_count* msg_counter)
{
  std::istringstream request(std::string(buffer,num_bytes),std::ios::binary);
  boost::archive::binary_iarchive ia(request);

  std::string topic_name;
  ia >> topic_name;

  std::ostringstream reply(std::ios::binary);
  boost::archive::binary_oarchive oa(reply);
  oa << topic_name;

  size_t counter = *msg_counter;
  ++(*msg_counter);

We deserialize either a MarketData or a QuoteRequest. The MiddlewareNewsBrief::check_and_restream method is the same as in the ZeroMQ example.

  if (topic_name == MiddlewareNewsBrief::MarketData::TOPIC) 
  {
    MiddlewareNewsBrief::MarketData md;
    MiddlewareNewsBrief::check_and_restream(md,ia,oa,counter);
  } 
  else if (topic_name == MiddlewareNewsBrief::QuoteRequest::TOPIC)
  {
    MiddlewareNewsBrief::QuoteRequest qr;
    MiddlewareNewsBrief::check_and_restream(qr,ia,oa,counter);
  }
  else 
  {
    std::cerr << "Received invalid topic name: " << topic_name.c_str() << std::endl;
    return;
  }

We echo the objects back to the publisher.

  session->write(reply.str().data(), reply.str().size());
}

We'll run the test with the provided Perl script, as follows:

  perl %MNB_ROOT%\bin\boost_test.pl BoostTyped 1000 

We should see output that looks like this:

  Waiting; running for 1000 messages
  Sending 1000 messages to 1 subscribers


  Average latency in milliseconds: 0.396


  Finished

So our average latency for 1000 messages is about 396 microseconds.

You'll notice that this is quite a bit faster than the ZeroMQ version of this test, but is significantly slower than the OpenDDS version, where we use OpenDDS's ability to specify types in IDL and let OpenDDS handle the serialization for us. The takeaway is that serialization adds a lot of overhead, and can overwhelm any gain in performance from using a lighter-weight library.

ZeroMQ Google Protocol Buffers Test

Finally, we'll use Google Protocol Buffers to serialize our C++ objects and deserialize them. The theory is that Google Protocol Buffers might be more efficient at serialization, and thus will make for a better comparison between OpenDDS and ZeroMQ.

We specify our data types in an IDL-like language that is part of the Google Protocol Buffers specification. Our data types are defined in zeromq/cpp/MarketData.proto, and shown below:


message pbMarketDataEntry
{
  required uint32        mdUpdateAction = 1;
  required uint32        mdPriceLevel = 2;
  required string        mdEntryType = 3;
  required uint32        openCloseSettleFlag = 4;
  required uint32        securityIDSource= 5;
  required uint32        securityID = 6;
  required uint32        rptSeq = 7;
  required double        mdEntryPx = 8;
  required uint32        mdEntryTime = 9;   // timestamp
  required int32         mdEntrySize = 10;
  required uint32        numberOfOrders = 11;
  required string        tradingSessionID = 12;
  required double        netChgPrevDay = 13;
  required uint32        tradeVolume = 14;
  required string        tradeCondition = 15;
  required string        tickDirection = 16;
  required string        quoteCondition = 17;
  required uint32        aggressorSide = 18;
  required string        matchEventIndicator = 19;
};

message pbMarketData
{
  required bool        is_echo = 1;
  required uint32      counter = 2;
      
  required uint32      securityID = 3;

  required string      applVersionID = 4;
  required string      messageType = 5;
  required string      senderCompID = 6;
  required uint32      msgSeqNum = 7;
  required uint32      sendingTime = 8;   // timestamp
  required uint32      tradeDate = 9;

  repeated pbMarketDataEntry  mdEntries = 10;
};

message pbRelatedSym
{
  required string      symbol = 1;
  required uint64      orderQuantity = 2;
  required uint32      side = 3;
  required uint64      transactTime = 4;  // timestamp
  required uint32      quoteType = 5;
  required uint32      securityID = 6;
  required uint32      securityIDSource = 7;
};

message pbQuoteRequest
{
  required bool          is_echo = 1;
  required uint32        counter = 2;

  required uint32        securityID = 3;

  required string        applVersionID = 4;
  required string        messageType = 5;
  required string        senderCompID = 6;
  required uint32        msgSeqNum = 7;
  required uint32        sendingTime = 8;   // timestamp
  required string        quoteReqID = 9;

  repeated pbRelatedSym related = 10;
};

Our ZeroMQ publisher and subscriber are virtually the same as before except that they create instances of these Google Protocol Buffers types and use Protocol Buffers for serialization and deserialization. Our build script compiles the MarketData.proto file with the Google Protocol Buffers compiler, as shown below:

  protoc --cpp_out=. MarketData.proto

Our ZeroMQ Protocol Buffers publisher is located in zeromq/cpp/ZeromqTypedProtobufPublisher.cpp, and is shown below:

pbMarketData createMarketData();
pbQuoteRequest createQuoteRequest();

const std::string MARKET_DATA_TOPIC = "MarketData";
const std::string QUOTE_REQUEST_TOPIC = "QuoteRequest";

int main (int argc, char *argv [])
{
  try {
    if (argc < 4) {
      std::cout << "usage: ZeromqTypedProtobufPublisher <roundtrip-count>  <bind-to-sub> [<connect-to-pub>]+\n"
                << "e.g.:  ZeromqTypedProtobufPublisher  10000 tcp://eth0:54321 tcp://spider:54322 tcp://spider:54323\n"
                << "          use a literal IP address and port for each endpoint\n"
                << std::endl;
      return 1;
    }
    size_t roundtrip_count = atoi (argv [1]);
    const char* bind_to_sub = argv[2];

We publish "MarketData" and "QuoteRequest" messages, as before.

    zmq::context_t ctx (1, 1);
    zmq::socket_t pub(ctx,ZMQ_PUB);
    zmq::socket_t sub(ctx,ZMQ_SUB);
    sub.setsockopt(ZMQ_SUBSCRIBE,"MarketData\x00",11);
    sub.setsockopt(ZMQ_SUBSCRIBE,"QuoteRequest\x00",13);
    sub.bind(bind_to_sub);

We connect to each subscription endpoint, as before.

    size_t num_subscribers = 0;
    for (int argi = 3; argi < argc; ++argi) {
      pub.connect(argv[argi]);
      ++num_subscribers;
    }

    printf("Entering send loop -- %d messages\n", (int) roundtrip_count);

    pbMarketData md = createMarketData();
    pbQuoteRequest qr = createQuoteRequest();

    void* watch = zmq_stopwatch_start ();

    for (size_t i = 0; i < roundtrip_count; i++) 
    {

We set a google::protobuf::Message pointer to point to either a pbMarketData or a pbQuoteRequest object.

      // Send one QuoteRequest for every nine MarketData messages
      const std::string* topic_name;
      google::protobuf::Message* payload;
      if (i % 10 == 5) 
      {
        topic_name = &QUOTE_REQUEST_TOPIC;
        qr.set_is_echo(false);
        qr.set_counter(i);
        payload = &qr;
      }
      else 
      {
        topic_name = &MARKET_DATA_TOPIC;
        md.set_is_echo(false);
        md.set_counter(i);
        payload = &md;
      }

We serialize the Google Protocol Buffers object into a byte array. Note that we still prepend a topic filter string to the front of the buffer, and we must be careful to serialize to the buffer after that string.

      size_t request_length = payload->ByteSize() + topic_name->length() + 1;
      zmq::message_t msg(request_length);
      memset(msg.data(),0,msg.size());
      memcpy(msg.data(),topic_name->c_str(),topic_name->length());
      payload->SerializeToArray(reinterpret_cast<char*>(msg.data())+topic_name->length()+1,payload->ByteSize());

      pub.send(msg);

      for (size_t jj = 0; jj < num_subscribers; ++jj) 
      {

We wait for the echoed reply, and extract the topic name after we have it.

        // Wait for echoed message from each subscriber
        zmq::message_t reply_msg;
        sub.recv(&reply_msg);

        char* reply_buffer = reinterpret_cast<char*>(reply_msg.data());
        
        // "topic" name should be first N characters, null-terminated
        std::string reply_topic_name(reply_buffer);

The ParseFromArray method is a Google Protocol Buffers method that deserializes the raw buffer into an object. We use the topic name string that we prepended to the buffer to figure out which type the buffer contains, and then deserialize it into either a pbMarketData or a pbQuoteRequest object.

        if (reply_topic_name == MARKET_DATA_TOPIC) 
        {
          pbMarketData mdReply;
          mdReply.ParseFromArray(reply_buffer + reply_topic_name.length() + 1,
                                 reply_msg.size() - (reply_topic_name.length() + 1));

          if (mdReply.is_echo() == false || mdReply.counter() != i)
          {
            std::cerr << "MarketData reply isEcho or counter mismatch" << std::endl;
          }
        } 
        else if (reply_topic_name == QUOTE_REQUEST_TOPIC)
        {
          pbQuoteRequest qrReply;
          qrReply.ParseFromArray(reply_buffer + reply_topic_name.length() + 1,
                                 reply_msg.size() - (reply_topic_name.length() + 1));

          if (qrReply.is_echo() == false || qrReply.counter() != i)
          {
            std::cerr << "QuoteRequest reply isEcho or counter mismatch" << std::endl;
          }
        }
        else 
        {
          std::cerr << "Received invalid topic name: " << reply_topic_name.c_str() << std::endl;
          return -1;
        }
      }
    }

    unsigned long elapsed = zmq_stopwatch_stop (watch);
    double latency = (double) elapsed / (roundtrip_count * 2.0) / (double)num_subscribers;

    printf ("roundtrip count: %d\n", (int) roundtrip_count);
    printf ("\n\naverage latency: %.3f [us]\n\n\n", (double) latency);
  
    return 0;
  } catch (std::exception& e) {
    std::cout << "An error occurred: " << e.what() << std::endl;
    return 1;
  }
}
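The subscription calls in this listing, such as `sub.setsockopt(ZMQ_SUBSCRIBE,"MarketData\x00",11)`, rely on ZeroMQ's prefix matching: a message is delivered when its leading bytes equal the subscription filter. The sketch below models that rule with standard C++ only; `matches_filter` is a hypothetical name for illustration and is not a ZeroMQ function.

```cpp
#include <cstddef>
#include <cstring>

// Model of a ZeroMQ SUB-socket filter: the message passes when its first
// filter_size bytes are identical to the filter. An empty filter matches
// every message.
bool matches_filter(const void* message, std::size_t message_size,
                    const void* filter, std::size_t filter_size) {
  if (filter_size > message_size) {
    return false;  // message is too short to contain the whole filter
  }
  return filter_size == 0 ||
         std::memcmp(message, filter, filter_size) == 0;
}
```

Including the null terminator in the filter (length 11 rather than 10 for "MarketData") means that a message whose topic merely begins with "MarketData", such as a hypothetical "MarketDataExtra", would not match.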

The subscriber code, located in zeromq/cpp/ZeromqTypedProtobufSubscriber.cpp, is shown below:

const std::string MARKET_DATA_TOPIC = "MarketData";
const std::string QUOTE_REQUEST_TOPIC = "QuoteRequest";

int main (int argc, char *argv [])
{
  try {
    if (argc != 4) {
      std::cout << "usage: ZeromqTypedProtobufSubscriber <roundtrip-count> <bind-to-sub> <connect-to-pub>\n"
                << "e.g.:  ZeromqTypedProtobufSubscriber 10000 tcp://eth0:54322 tcp://spider:54321\n"
                << "          use a literal IP address and port for each endpoint\n"
                << std::endl;
        return 1;
    }
    size_t roundtrip_count = atoi (argv [1]);
    const char* bind_to_sub = argv [2];
    const char* connect_to_pub = argv [3];

    zmq::context_t ctx (1, 1);
    zmq::socket_t pub (ctx, ZMQ_PUB);
    pub.connect(connect_to_pub);

We subscribe to "MarketData" and "QuoteRequest" messages, as before.

    zmq::socket_t sub (ctx, ZMQ_SUB);
    sub.setsockopt(ZMQ_SUBSCRIBE,"MarketData\x00",11);
    sub.setsockopt(ZMQ_SUBSCRIBE,"QuoteRequest\x00",13);
    sub.bind(bind_to_sub);

    boost::detail::atomic_count msg_counter(0);

    printf("Entering recv loop -- %d messages\n", 
           (int) roundtrip_count);

    pbMarketData md;
    pbQuoteRequest qr;
    for (size_t i = 0; i < roundtrip_count; i++) {
      zmq::message_t msg;

We wait for a message from the publisher, and pull out the topic name once we have it.

      sub.recv (&msg);

      char* msg_buffer = reinterpret_cast<char*>(msg.data());
      
      // "topic" name should be first N characters, null-terminated
      std::string topic_name(msg_buffer);
      google::protobuf::Message* payload;

Again, the ParseFromArray method is the Google Protocol Buffers method that deserializes the raw buffer into an object. Note that, as we do with all ZeroMQ and Boost.Asio examples, we use the topic name to determine the type. We manually skip over the topic name before deserializing the object.

      if (topic_name == MARKET_DATA_TOPIC) 
      {
        md.ParseFromArray(msg_buffer + topic_name.length() + 1,
                          msg.size() - (topic_name.length() + 1));

        if (md.is_echo() == true || md.counter() != i)
        {
          std::cerr << "MarketData request isEcho or counter mismatch" << std::endl;
        }
        md.set_is_echo(true);
        payload = &md;
      } 
      else if (topic_name == QUOTE_REQUEST_TOPIC)
      {
        qr.ParseFromArray(msg_buffer + topic_name.length() + 1,
                          msg.size() - (topic_name.length() + 1));

        if (qr.is_echo() == true || qr.counter() != i)
        {
          std::cerr << "QuoteRequest request isEcho or counter mismatch" << std::endl;
        }
        qr.set_is_echo(true);
        payload = &qr;
      }
      else 
      {
        std::cerr << "Received invalid topic name: " << topic_name.c_str() << std::endl;
        return -1;
      }

We echo the reserialized object back to the publisher, prepending the topic name to the front of the buffer.

      // Echo it back
      size_t reply_length = payload->ByteSize() + topic_name.length() + 1;
      zmq::message_t reply_msg(reply_length);
      memset(reply_msg.data(),0,reply_msg.size());
      memcpy(reply_msg.data(),topic_name.c_str(),topic_name.length());
      payload->SerializeToArray(reinterpret_cast<char*>(reply_msg.data())+topic_name.length()+1,payload->ByteSize());

      pub.send(reply_msg,0);

      ++msg_counter;
    }
    printf("Finished receiving messages\n");
    
    return 0;

  } catch (std::exception& e) {
    std::cout << "An error occurred: " << e.what() << std::endl;
    return 1;
  }
}
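Both listings build the outgoing buffer the same way: the topic name, a null byte, and then the serialized payload. That layout can be sketched without ZeroMQ or Protocol Buffers as follows. The function name `prepend_topic` is hypothetical, and a `std::vector<char>` stands in for the `zmq::message_t` used in the test code.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Build "<topic>\0<payload>" in a single contiguous buffer.
std::vector<char> prepend_topic(const std::string& topic,
                                const char* payload,
                                std::size_t payload_size) {
  // Zero-fill the buffer so the byte after the topic is already the
  // null terminator (the listings do this with memset).
  std::vector<char> buffer(topic.size() + 1 + payload_size, '\0');

  // Copy the topic name to the front of the buffer.
  std::copy(topic.begin(), topic.end(), buffer.begin());

  // Copy the payload immediately after the terminator.
  if (payload_size > 0) {
    std::copy(payload, payload + payload_size,
              buffer.begin() + topic.size() + 1);
  }
  return buffer;
}
```

In the test code, the payload bytes are written directly into the message by SerializeToArray at the same offset, which avoids the extra copy made in this sketch.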

We'll run the test with the provided Perl script, as follows:

  perl %MNB_ROOT%\bin\zeromq_test.pl ZeromqTypedProtobuf 1000 

We should see output that looks like this:

  Entering recv loop -- 1000 messages
  Entering send loop -- 1000 messages
  Finished receiving messages
  roundtrip count: 1000


  average latency: 216.139 [us]


So our average latency for 1000 messages is about 216 microseconds.

This is much faster than the ZeroMQ example using Boost Serialization, so there's definitely a gain to be had from using a library such as Google Protocol Buffers. However, there doesn't appear to be a performance advantage here over using OpenDDS with its IDL compilation and strongly typed data facilities. OpenDDS latency came in at around 205 microseconds. If you plan to use some type of interface definition language and a compiler to generate code from that language, you may as well use something like OpenDDS where all of the pieces are fully integrated.

Performance Summary

The following table summarizes the performance, in terms of latency, of our various test cases. As previously stated, if you want to duplicate these tests, be sure to run each test several times with as few other processes running as possible. Repeated tests can show widely different numbers. The numbers presented here are essentially 80th percentile numbers gathered from several test runs.

 Test case                                                  Average latency
 OpenDDS Raw Buffer                                         185 usec
 ZeroMQ Raw Buffer                                          170 usec
 Boost.Asio Raw Buffer                                       75 usec
 OpenDDS .NET Object streamed through a Raw Buffer          630 usec
 ZeroMQ .NET Object streamed through a Raw Buffer           537 usec
 Boost.Asio .NET Object streamed through a Raw Buffer       413 usec
 OpenDDS Strongly Typed Data                                205 usec
 ZeroMQ Strongly Typed Data with Boost Serialization        577 usec
 Boost.Asio Strongly Typed Data with Boost Serialization    396 usec
 ZeroMQ Strongly Typed Data with Google Protocol Buffers    216 usec
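One way to read the table is to compare each strongly typed result with the raw buffer result for the same middleware. The sketch below performs that arithmetic on the latencies listed above; the function name `percent_overhead` is an illustrative choice, and the comparison is only a rough guide given the run-to-run variation already noted.

```cpp
// Relative cost of the typed test over the raw buffer test, in percent.
double percent_overhead(double typed_usec, double raw_usec) {
  return (typed_usec - raw_usec) / raw_usec * 100.0;
}

// Applied to the table (values in microseconds):
//   OpenDDS typed vs. raw:                   percent_overhead(205.0, 185.0)  -> about 11%
//   ZeroMQ + Protocol Buffers vs. raw:       percent_overhead(216.0, 170.0)  -> about 27%
//   ZeroMQ + Boost Serialization vs. raw:    percent_overhead(577.0, 170.0)  -> about 239%
//   Boost.Asio + Boost Serialization vs. raw: percent_overhead(396.0, 75.0)  -> 428%
```

By this measure, the choice of serialization has a larger effect on the typed results than the choice of transport library.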

Summary

We have demonstrated that OpenDDS, ZeroMQ, and Boost.Asio can all be used successfully to send various types of data — raw buffers, .NET objects, and typed C++ data — from a publisher to a subscriber; however, ease of use and performance are significantly impacted by the choice of middleware.

Of the three, OpenDDS contains the most complete set of publish-and-subscribe capabilities in that topics and endpoints are managed by the middleware, extensive Quality-of-Service attributes are available to configure common publish-subscribe middleware behaviors, and transports (such as TCP/IP, UDP unicast, and reliable and unreliable UDP multicast) can be configured completely outside of code. We can start as many publishers and subscribers as we want on a topic without the need to manually configure endpoints on the publisher or subscriber. Responsibilities of the middleware layer can be kept separated from the application code.

Boost.Asio has the lightest-weight interface over sockets. Our Boost.Asio example supports only TCP/IP; other Boost.Asio-supported transports can be used, but the application developer would be responsible for the additional coding effort. However, for a simple, straightforward application that doesn't want to do much more than send raw buffers over TCP/IP, Boost.Asio has the lowest latency.

ZeroMQ sits in between OpenDDS and Boost.Asio in terms of both performance and usage. ZeroMQ is a lightweight layer over sockets, although it does provide mechanisms for configuring various transports (such as a reliable multicast transport based on Pragmatic Reliable Multicast), and it also provides additional message queuing facilities that are not discussed in this article.

For the case of sending typed C++ data over publish-subscribe middleware, OpenDDS excels. OpenDDS's IDL compiler generates code that serializes IDL-generated C++ objects very efficiently, adding little overhead when compared to sending raw buffers over OpenDDS, and matching or exceeding the combination of ZeroMQ and Google Protocol Buffers while providing a higher-level publish-subscribe interface.

OpenDDS is professionally developed and commercially supported by OCI, a full-service software engineering, open source product and training company.
