For security traders, having up-to-date and accurate information has always been critical. The financial industry was an early adopter of electronic communication to distribute market data going back to the days of the stock ticker, and continues to push the limits of the technology today.
In recent years, the volume of market data generated by exchanges around the world has skyrocketed. Not only is the amount of data increasing, but so are the types of data being tracked and published. The global nature of the financial world means that traders now have to watch multiple exchanges, where a few years ago they might have monitored only one or two. Trading firms and exchanges are finding a need to process messages at increasing rates, increases that are not matched by improvements in processor speed or network throughput. This requires innovative ways to cope with the growth.
The exchanges have helped by adopting a standard way to compress market data, called FAST (FIX Adapted for STreaming). Dedicated lines from vendors ensure high bandwidth and reliability. However, at the receiving end, the market data still has to be decoded, filtered, distributed for analysis, and sent to the trader or trading system before it can result in a trade. This time lapse from feed to trade is called latency. It cannot be avoided, but shrewd trading companies try to minimize it to achieve a timing advantage in the market. A trading company's market data infrastructure needs to handle a high volume of data from multiple sources in a cost-effective, reliable manner with the lowest possible latency. The market data infrastructure must optimize its use of available bandwidth and disseminate data with a minimum of overhead.
Open-source tools such as QuickFAST and OpenDDS can make low-latency, high-volume data distribution a reality. QuickFAST is an open-source implementation of the FAST protocol, an efficient data compression protocol created by the FIX Protocol Limited organization. OpenDDS is an implementation of the Object Management Group's Data Distribution Service for Real-Time Systems (DDS) specification for low-latency publish/subscribe data dissemination. By using open-source software on the critical path, trading companies can gain continuous insight into their latency and make plans to reduce it.
This paper discusses the testing that OCI did to explore the combination of an open source decoder (QuickFAST) with an open source publish and subscribe distribution service (OpenDDS) using commodity hardware. OCI has put together a typical system that uses these open source tools to accept incoming market data and distribute it throughout the local office to the locations where it will be most useful.
We tested this system on two SGI Intel Xeon W5590 3.33GHz servers connected via a Voltaire QDR InfiniBand switch and running SUSE Linux Enterprise Server 11. These servers were provided by SGI.
The test results depended greatly on how complex the messages were and how much processing we did on each message. For simple messages, similar to the ARCA messages published by the New York Stock Exchange, we published each FAST message as one OpenDDS sample. For complex messages, similar to the messages published by the Chicago Mercantile Exchange (CME), we split up the message's MarketDataEntries sequence and published each security's data as a separate OpenDDS sample.
For our simple, ARCA-like messages, we have observed QuickFAST decode times under 500 nanoseconds, and latencies in publishing across OpenDDS between hosts of about 40 microseconds. For complex, CME-like messages, we have observed QuickFAST decode times under 2.5 microseconds, and latencies in publishing across OpenDDS between hosts of about 65 microseconds.
As always, your mileage may vary. The attached source code should enable you to run these examples, and others, in your own environment to compare results.
QuickFAST is an open-source native C++ implementation of the FAST protocol. FAST (FIX Adapted for STreaming) was developed by FIX Protocol Limited as a way to reduce the bandwidth and network latency required to distribute market data without incurring excessive CPU costs. It is being widely adopted in the financial industry. FAST applications exchange compression instructions out-of-band to take advantage of the repetitive nature of financial information.
FAST participants exchange out-of-band templates that describe how messages are compressed across the wire. A FAST template for a message defines the fields contained in the message, field instructions for each field, and field operators for each field instruction. For example, a field operator may define the field as a constant-valued field, which means that the field's contents are not even sent across the wire since the field's value is encoded in the template. Or, a field operator may define the field as a copied field, meaning that if the field's value is not present then the value is assumed to be the same as the previous message's value. Examples of FAST templates appear later in this article. These are simple examples, and the FAST protocol enables a great deal more control over the compression behavior than what we show here.
A QuickFAST application implements the QuickFAST ValueMessageBuilder to receive a callback from QuickFAST for each FAST message and each FAST message field it receives, as the diagram illustrates. An application may, if it wishes, act on the contents of a message field before the entire FAST message has been decoded.
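The value of per-field callbacks is that a decision can be made mid-message. The sketch below uses a hypothetical, simplified interface (`FieldCallbacks` with `startMessage`, `addField`, and `endMessage`) rather than QuickFAST's actual ValueMessageBuilder signatures; it flags a message the moment its SecurityId matches a watched security, before later fields such as Price have arrived.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical, simplified callback interface. It only mimics the idea of a
// QuickFAST ValueMessageBuilder and does not reproduce its real signatures.
class FieldCallbacks {
public:
  virtual ~FieldCallbacks() = default;
  virtual void startMessage(const std::string& templateName) = 0;
  virtual void addField(const std::string& fieldName, std::uint64_t value) = 0;
  virtual void endMessage() = 0;
};

// Marks a message as interesting as soon as its SecurityId field arrives,
// before the remaining fields of that message have been decoded.
class EarlySecurityFilter : public FieldCallbacks {
public:
  explicit EarlySecurityFilter(std::uint64_t watchedSecurity)
      : watched_(watchedSecurity) {}

  void startMessage(const std::string&) override { interesting_ = false; }

  void addField(const std::string& fieldName, std::uint64_t value) override {
    if (fieldName == "SecurityId" && value == watched_) {
      interesting_ = true;  // decision made mid-message
      ++earlyMatches_;
    }
  }

  void endMessage() override {
    if (interesting_) ++matchedMessages_;
  }

  bool interesting() const { return interesting_; }
  int earlyMatches() const { return earlyMatches_; }
  int matchedMessages() const { return matchedMessages_; }

private:
  std::uint64_t watched_;
  bool interesting_ = false;
  int earlyMatches_ = 0;
  int matchedMessages_ = 0;
};
```

In a real application, the early decision could be used to skip work for securities nobody is watching.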
OpenDDS is an open-source publish-and-subscribe data dissemination framework that implements the OMG Data Distribution Service (DDS) specification. DDS is appropriate for distributed computing systems in which the primary objective of the participants is the quick and efficient dissemination of application data rather than access to shared services. The set of suppliers and/or consumers of application data may not be known at design time, and may change through the execution lifetime of the application. Dissemination of financial market data is a good match for OpenDDS's capabilities. The following diagram illustrates basic OpenDDS communication on three topics:
OpenDDS applications use type-safe interfaces to communicate application-defined structs directly between publishing and subscribing processes. On the publishing side, the application creates a type-specific DataWriter to write data on a topic; on the subscribing side, the application creates a DataReader for the same type, subscribing on the same topic. Data samples are written from a DataWriter directly to a DataReader; there is no intervening broker for each sample written. The DataReader only receives data published on its requested topics.
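To make the type-safety point concrete, here is a toy, single-process sketch. `ToyTopic`, `ToyWriter`, `QuoteSample`, and `NewSample` are hypothetical names, and the code has none of OpenDDS's discovery, transport, or QoS; it only shows that a writer bound to `ToyTopic<QuoteSample>` accepts `QuoteSample` values and hands them straight to that topic's readers, with no broker in between.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy illustration of type-safe topics (hypothetical; not OpenDDS).
template <typename T>
class ToyTopic {
public:
  explicit ToyTopic(std::string name) : name_(std::move(name)) {}
  const std::string& name() const { return name_; }

  // A reader subscribes with a callback that only accepts T.
  void attachReader(std::function<void(const T&)> onData) {
    readers_.push_back(std::move(onData));
  }

  // Hands the sample directly to each reader of this topic.
  void deliver(const T& sample) const {
    for (const auto& reader : readers_) reader(sample);
  }

private:
  std::string name_;
  std::vector<std::function<void(const T&)>> readers_;
};

// A writer is bound to exactly one sample type at compile time.
template <typename T>
class ToyWriter {
public:
  explicit ToyWriter(const ToyTopic<T>& topic) : topic_(topic) {}
  void write(const T& sample) const { topic_.deliver(sample); }

private:
  const ToyTopic<T>& topic_;
};

struct QuoteSample { unsigned long Price; };
struct NewSample { unsigned long OrderId; };
```

Constructing `ToyWriter<QuoteSample>` from a `ToyTopic<NewSample>` does not compile, which is the kind of compile-time check the typed interfaces are meant to provide.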
OpenDDS also supports several Quality-of-Service (QoS) policies to configure attributes such as reliability, persistence, the behavior of late-joining subscribers, etc.
For more information on OpenDDS, see the OpenDDS web site and this introductory article.
In our example, we will simulate a market data feed, starting from the exchange and ending at a final data consumer which may be a trader's workstation, an algorithmic trading engine, a history database, or something else. We will send FAST data from our simulated exchange via UDP multicast, use QuickFAST to convert each FAST message into a C++ application struct understood by OpenDDS, and publish the resulting OpenDDS sample to any interested subscribers.
We will run two examples. First, we will demonstrate with a simple data set that is somewhat similar to ARCA data. Then, we will show a more complex data set that is somewhat similar to CME data. Because actual market data is copyrighted, we cannot provide actual ARCA or CME data, so we will simulate it. The diagram illustrates the example's topology:
First, we define the format of our simple FAST messages with a FAST template definition. In the real world, such a template is usually defined by the exchange and made available to the exchange's clients. The template defines how data is compressed across FAST, and its definition depends on domain knowledge of the data being exchanged. The FAST protocol gains its efficiency by this out-of-band exchange of the compression template.
In this template file, we will define a total of four messages: "Quote", "New", "Change", and "Remove". In addition, for our own example, we will define a "Done" message to indicate to the QuickFAST consumer when the message stream has stopped. We will send the FAST messages from a multicasting process to simulate the topology that a trading application would likely encounter; the "Done" message tells the multicast receiver when the message stream has completed. The "Quote" template follows:
<?xml version="1.0"?>
<templates xmlns="http://www.fixprotocol.org/ns/fast/td/1.1">
<template name="Quote" id="1" xmlns="http://www.fixprotocol.org/ns/fast/td/1.1">
<uInt16 name="MessageType" id="100">
<constant value="11"/>
</uInt16>
<uInt32 name="SequenceNumber" id="200">
<increment/>
</uInt32>
<uInt32 name="Timestamp" id="300">
<copy/>
</uInt32>
<uInt32 name="OrderId" id="400">
<copy/>
</uInt32>
<uInt32 name="Volume" id="500">
<copy/>
</uInt32>
<uInt32 name="Price" id="600">
<copy/>
</uInt32>
<uInt8 name="ExchangeId" id="700">
<copy/>
</uInt8>
<uInt16 name="SecurityId" id="800">
<copy/>
</uInt16>
<uInt8 name="SecurityType" id="900">
<copy/>
</uInt8>
<uInt8 name="SessionId" id="1000">
<copy/>
</uInt8>
</template>
You can see that the template has an identifier and ten field instructions. The first field instruction is for a MessageType field, which is an unsigned 16-bit integer with a field operator that indicates that the field has a constant value of 11. Since it has a constant value, the MessageType field is never actually sent across the wire, because its value can be determined by knowing the template's identifier. This is one of the ways in which FAST uses out-of-band knowledge of the data to minimize the amount of data sent over the wire.
The second field instruction is an unsigned 32-bit integer for the field named SequenceNumber. SequenceNumber has an "increment" field operator, which is another way of conserving bandwidth. When several Quote messages are encoded in the same UDP packet, the SequenceNumber value is only provided for the first message in the packet. On the receiving side, QuickFAST deduces the SequenceNumber for subsequent messages by incrementing its last known SequenceNumber.
The third field instruction is an unsigned 32-bit integer for the field named Timestamp. Timestamp has a "copy" field operator, which means that if the Timestamp field is not in a message, the value from the previously processed message should be copied. Thus, if several consecutive messages have the same Timestamp value, the Timestamp value is only sent across the wire once.
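The three operators discussed so far can be sketched as a tiny decoding function. This is a simplified illustration, not QuickFAST code: the names `Op`, `FieldInstruction`, `PreviousValues`, and `decodeField` are hypothetical, an omitted field is modeled with `std::optional` rather than FAST's presence map, and initial values and nullable fields are ignored. Previous values are kept in one map keyed by field name.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical, simplified model of three FAST field operators.
enum class Op { Constant, Copy, Increment };

struct FieldInstruction {
  std::string name;
  Op op;
  std::uint32_t constantValue = 0;  // only meaningful for Op::Constant
};

// Previously seen values, keyed by field name (shared by all templates).
using PreviousValues = std::map<std::string, std::uint32_t>;

// Decodes one field. `onWire` is empty when the encoder left the field out.
std::uint32_t decodeField(const FieldInstruction& fi,
                          std::optional<std::uint32_t> onWire,
                          PreviousValues& previous) {
  if (fi.op == Op::Constant) return fi.constantValue;  // never transmitted
  if (onWire) {                                        // explicit value wins
    previous[fi.name] = *onWire;                       // and is remembered
    return *onWire;
  }
  auto it = previous.find(fi.name);
  if (it == previous.end())
    throw std::runtime_error("no previous value for " + fi.name);
  if (fi.op == Op::Increment) ++it->second;            // previous + 1
  return it->second;                                   // Copy: reuse previous
}
```

For example, after a SequenceNumber of 500 is sent explicitly, two messages that omit it decode as 501 and 502, and a Timestamp omitted after 93000 decodes as 93000 again.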
As you can see, one of the primary FAST compression techniques is to remember a value sent in a previous message instead of sending it across the wire. Those values are stored in what the FAST specification refers to as a Dictionary. By default, there is one global dictionary, and that dictionary contains a map of values using the field instruction name as a key. For example, the map has an entry for the Timestamp field's last value, and any message that has a Timestamp field can use that value. However, FAST template authors may take control over the scope of the dictionary by specifying a dictionary per template, per application type, or a user-defined dictionary. Template authors have a great deal of discretion over the behavior of the dictionary.
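Dictionary scope amounts to a choice of lookup key. The hypothetical `ScopedDictionary` below models only two of the scopes mentioned above: global (the key is the field name) and per-template (the key combines template name and field name). The "Template::Field" key format is an assumption made for illustration. With a global dictionary, a Timestamp stored while decoding a Quote is visible when decoding a New message; with per-template scope it is not.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical sketch: dictionary scope modeled as a choice of lookup key.
enum class DictScope { Global, Template };

class ScopedDictionary {
public:
  explicit ScopedDictionary(DictScope scope) : scope_(scope) {}

  void store(const std::string& templateName, const std::string& field,
             std::uint32_t value) {
    values_[key(templateName, field)] = value;
  }

  // Returns false if no previous value is visible in this scope.
  bool lookup(const std::string& templateName, const std::string& field,
              std::uint32_t& out) const {
    auto it = values_.find(key(templateName, field));
    if (it == values_.end()) return false;
    out = it->second;
    return true;
  }

  void reset() { values_.clear(); }  // e.g. when a new packet starts

private:
  // Global scope: field name alone. Template scope: "Template::Field".
  std::string key(const std::string& templateName,
                  const std::string& field) const {
    return scope_ == DictScope::Global ? field : templateName + "::" + field;
  }

  DictScope scope_;
  std::map<std::string, std::uint32_t> values_;
};
```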
In the rest of the template file, we will show definitions for the "New", "Change", "Remove", and "Done" messages:
<template name="New" id="2" xmlns="http://www.fixprotocol.org/ns/fast/td/1.1">
<uInt16 name="MessageType" id="100">
<constant value="12"/>
</uInt16>
<uInt32 name="SequenceNumber" id="200">
<increment/>
</uInt32>
<uInt32 name="Timestamp" id="300">
<copy/>
</uInt32>
<uInt32 name="OrderId" id="400">
<copy/>
</uInt32>
<uInt32 name="Volume" id="500">
<copy/>
</uInt32>
<uInt32 name="Price" id="600">
<copy/>
</uInt32>
<uInt8 name="ExchangeId" id="700">
<copy/>
</uInt8>
<uInt16 name="SecurityId" id="800">
<copy/>
</uInt16>
<uInt8 name="SecurityType" id="900">
<copy/>
</uInt8>
<uInt8 name="SessionId" id="1000">
<copy/>
</uInt8>
<uInt8 name="Side" id="1100">
<copy/>
</uInt8>
</template>
<template name="Change" id="3" xmlns="http://www.fixprotocol.org/ns/fast/td/1.1">
<uInt16 name="MessageType" id="100">
<constant value="13"/>
</uInt16>
<uInt32 name="SequenceNumber" id="200">
<increment/>
</uInt32>
<uInt32 name="Timestamp" id="300">
<copy/>
</uInt32>
<uInt32 name="OrderId" id="400">
<copy/>
</uInt32>
<uInt32 name="Volume" id="500">
<copy/>
</uInt32>
<uInt32 name="Price" id="600">
<copy/>
</uInt32>
<uInt8 name="ExchangeId" id="700">
<copy/>
</uInt8>
<uInt16 name="SecurityId" id="800">
<copy/>
</uInt16>
<uInt8 name="SecurityType" id="900">
<copy/>
</uInt8>
<uInt8 name="SessionId" id="1000">
<copy/>
</uInt8>
<uInt8 name="Side" id="1100">
<copy/>
</uInt8>
</template>
<template name="Remove" id="4" xmlns="http://www.fixprotocol.org/ns/fast/td/1.1">
<uInt16 name="MessageType" id="100">
<constant value="14"/>
</uInt16>
<uInt32 name="SequenceNumber" id="200">
<increment/>
</uInt32>
<uInt32 name="Timestamp" id="300">
<copy/>
</uInt32>
<uInt32 name="OrderId" id="400">
<copy/>
</uInt32>
<uInt8 name="ExchangeId" id="700">
<copy/>
</uInt8>
<uInt16 name="SecurityId" id="800">
<copy/>
</uInt16>
<uInt8 name="SecurityType" id="900">
<copy/>
</uInt8>
<uInt8 name="SessionId" id="1000">
<copy/>
</uInt8>
<uInt8 name="Side" id="1100">
<copy/>
</uInt8>
</template>
<template name="Done" id="99" xmlns="http://www.fixprotocol.org/ns/fast/td/1.1">
<uInt16 name="MessageType" id="100">
<constant value="99"/>
</uInt16>
</template>
</templates>
Next, we need to generate test data to send from our simulated exchange. We will use the QuickFAST Encoder to do that. We have created an encoder executable for the SimpleTemplates.xml file called SimpleGenerator. The source code is in the src/Generators directory.
The basic steps to encode FAST messages with QuickFAST are in the following code snippet. The code in the src/Generators directory is structured a bit differently; there is a layer of abstraction added to avoid code duplication between the Simple and Complex examples, and each example has a MessagePopulator class that generates test messages containing simulated data. The simplified version is here:
QuickFAST::Codecs::XMLTemplateParser parser;
QuickFAST::Codecs::TemplateRegistryPtr templateRegistry =
parser.parse("./data/SimpleTemplates.xml");
QuickFAST::Codecs::Encoder encoder(templateRegistry);
// For each message...
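// Template id of the message being encoded, e.g. 1 for "Quote" in SimpleTemplates.xml
QuickFAST::template_id_t templateId = 1;
QuickFAST::Messages::FieldSet message(20); // allocate space for 20 message fields
// ... populate the message's fields here (the MessagePopulator classes do this) ...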
// Destination for encoded message; this destination
// encodes the message into a string
QuickFAST::Codecs::DataDestinationString encodingDestination;
encoder.encodeMessage(encodingDestination,
templateId,
message);
// Repeat for each message
To generate FAST messages based on the SimpleTemplates.xml template file, execute the SimpleGenerator as follows:
./bin/SimpleGenerator -t ./data/SimpleTemplates.xml -o ./data/simple30000.dat -n 30000
where ./data/simple30000.dat is the generated FAST data file, and 30000 is the number of FAST messages created in that data file. You can list the command-line arguments of the SimpleGenerator via
./bin/SimpleGenerator -?
By default, the SimpleGenerator puts no more than 1024 bytes in each message packet. That packet size is configurable with the "-p" command-line option. The purpose of the message packet is to indicate which group of messages will be multicast in the same UDP multicast packet, which means that the Encoder's dictionary does not need to be reset until the packet is full. Since packets can be lost over UDP multicast, each new packet starts the decoder off with an empty dictionary. The dictionary plays a significant role in FAST compression, so we want to put as many messages into each packet as we can before we reset the dictionary.
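The packet-filling rule can be sketched as a simple grouping loop. The hypothetical `groupIntoPackets` function assumes each message's encoded size is already known. In practice a FAST message's encoded size depends on the dictionary state (the first message of a packet cannot rely on remembered values), so a real generator would likely have to re-encode the message that starts a new packet.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical sketch: groups encoded messages into packets of at most
// maxBytes. Each new packet is where the dictionaries would be reset.
// A single message larger than maxBytes ends up alone in its own packet.
std::vector<std::vector<std::string>> groupIntoPackets(
    const std::vector<std::string>& encodedMessages, std::size_t maxBytes) {
  std::vector<std::vector<std::string>> packets;
  std::size_t used = 0;
  for (const auto& msg : encodedMessages) {
    if (packets.empty() || used + msg.size() > maxBytes) {
      packets.emplace_back();  // start a new packet
      used = 0;
    }
    packets.back().push_back(msg);
    used += msg.size();
  }
  return packets;
}
```

With the default limit of 1024 bytes, three 400-byte messages would fill one packet with two messages and start a second packet for the third.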
To indicate the start and end of a packet, the SimpleGenerator writes the length of the packet as a header preceding each set of FAST messages. The Multicaster process below uses that header to determine how many bytes are contained in the packet. Aside from reading the packet length, the Multicaster does not perform any processing on the FAST messages; it merely removes the length header and forwards the raw bytes to the Publishing process. The message length header is simply used to frame each multicast packet (we have to know when one packet ends and another begins), but it does not come into play in the decoding of the message.
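Reading length-prefixed packets might look like the sketch below. The article does not specify the header layout, so the 4-byte big-endian length used here is purely an assumption, as are the helper names `frame` and `splitFramedPackets`. As with the Multicaster, the header is used only for framing; the payload bytes are passed through untouched.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical framing: [4-byte big-endian length][payload].
std::string frame(const std::string& payload) {
  std::string out;
  const std::uint32_t len = static_cast<std::uint32_t>(payload.size());
  for (int shift = 24; shift >= 0; shift -= 8)
    out.push_back(static_cast<char>((len >> shift) & 0xFF));
  return out + payload;
}

// Splits a buffer of framed records back into raw payloads.
std::vector<std::string> splitFramedPackets(const std::string& data) {
  std::vector<std::string> packets;
  std::size_t pos = 0;
  while (pos < data.size()) {
    if (data.size() - pos < 4) throw std::runtime_error("truncated header");
    std::uint32_t len = 0;
    for (int i = 0; i < 4; ++i)
      len = (len << 8) | static_cast<unsigned char>(data[pos + i]);
    pos += 4;
    if (data.size() - pos < len) throw std::runtime_error("truncated packet");
    packets.push_back(data.substr(pos, len));  // header stripped
    pos += len;
  }
  return packets;
}
```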
The Multicaster is a simple process that reads in our FAST-generated data file, packet by packet, and multicasts each packet using Boost Asio. The Multicaster can be executed as follows. However, you will want to start the OpenDDS publisher and subscriber first (see below) so the multicast FAST messages actually have somewhere to go.
./bin/Multicaster -f ./data/simple30000.dat -a 224.1.2.133 -p 13014 -s 1000
The "-f ./data/simple30000.dat" arguments indicate the source FAST data file, which is the data file we generated above. The "-a 224.1.2.133" and "-p 13014" arguments indicate the multicast address and port, and are optional.
The "-s 1000" argument is the number of microseconds to sleep between multicast sends. Since UDP multicast is unreliable, it's not difficult to throw enough messages at it to cause messages to be dropped. I have noticed dropped packets on data sets larger than 8000 messages. A small sleep slows the send down enough to enable all of the messages to reach their destinations successfully. In an exchange, a typical configuration is to multicast the same data feed on two multicast addresses and let the decoding side arbitrate between the two feeds to ensure that it does not miss any messages. Even with that setup, it's valuable to test using realistic data rates to ensure that the packet drop rate on each multicast feed is not problematic.
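Arbitration between two copies of a feed is commonly done by sequence number. The hypothetical `FeedArbitrator` below applies the simplest possible rule: process the first copy of each sequence number from either feed, drop later copies, and count sequence numbers that were skipped. It does not buffer out-of-order arrivals, which a production arbitrator would likely need to do.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical A/B feed arbitration by sequence number. The first copy of
// each sequence number wins; a copy that arrives after a higher sequence
// number has been accepted is discarded (no reordering buffer).
class FeedArbitrator {
public:
  explicit FeedArbitrator(std::uint32_t firstExpected = 1)
      : next_(firstExpected) {}

  // Returns true if this message should be processed.
  bool accept(std::uint32_t seq) {
    if (seq < next_) return false;              // duplicate or stale copy
    if (seq > next_) skipped_ += seq - next_;   // gap not filled so far
    next_ = seq + 1;
    return true;
  }

  std::uint64_t skipped() const { return skipped_; }

private:
  std::uint32_t next_;
  std::uint64_t skipped_ = 0;
};
```

Each incoming message from either multicast group is passed through `accept`, and only messages for which it returns true go on to the decoder.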
You can list the command-line arguments of the Multicaster via
./bin/Multicaster -?
The Multicaster code is in src/Multicaster/main.cpp.
The Decoder/Publisher process receives multicast FAST message packets, decodes them into QuickFAST messages, converts each decoded message into a C++ struct that can be published by OpenDDS, and publishes it to interested subscribers. The publisher's source code is in src/Publishers/SimplePublisher.cpp.
For the OpenDDS publish/subscribe portion of the application, we define an IDL file with OpenDDS data types that map to the FAST message types defined in the FAST template. That IDL file is located in idl/Simple.idl.
We show the "Quote" data type below as an example. You can see how the "Quote" data type's fields map to the FAST message fields from the FAST template above. There are analogous IDL structs for the "New", "Change", and "Remove" data types.
module MiddlewareNewsBrief
{
#pragma DCPS_DATA_TYPE "MiddlewareNewsBrief::Quote"
struct Quote
{
unsigned short MessageType;
unsigned long SequenceNumber;
unsigned long Timestamp;
unsigned long OrderId;
unsigned long Volume;
unsigned long Price;
unsigned short SecurityId;
char ExchangeId;
char SecurityType;
char SessionId;
};
}; // module MiddlewareNewsBrief
There are three parts to the Decoder/Publisher. First, it receives multicast FAST messages and decodes them. To do that, we must give the Decoder the same FAST template file as the encoding process so each side understands what to provide and what to expect. We also provide a MessageBuilder to the decoder to process the decoded message. The MessageBuilder is application-specific; our example's MessageBuilder is a C++ template-based builder that takes each FAST message field and uses it to populate a C++ struct for publication across OpenDDS.
The code below illustrates how to set up the QuickFAST decoder to decode FAST messages and communicate with a MessageBuilder. Our MessageBuilder's details have not yet been presented.
std::string multicastAddress = "224.1.2.133"; // default; normally read from the command line
size_t multicastPort = 13014;                  // default; normally read from the command line
QuickFAST::Codecs::XMLTemplateParser parser;
QuickFAST::Codecs::TemplateRegistryPtr templateRegistry =
parser.parse("./data/SimpleTemplates.xml");
// The multicast decoder listens on the multicast address:port
QuickFAST::Codecs::MulticastDecoder decoder(templateRegistry,
multicastAddress,
"0.0.0.0",
multicastPort);
// MessageBuilder created for this example; will give more detail later
MiddlewareNewsBrief::CompositeMessageBuilder<QuickFAST::uint16> builder("MessageType");
// Hand the MessageBuilder to the Decoder
decoder.start(builder);
// Run the MulticastDecoder in one separate thread
decoder.run(1,false);
// Example builder will receive a "Done" message when the
// FAST stream is completely received
while (!builder.isDone()) {
boost::this_thread::sleep(boost::posix_time::seconds(1));
}
decoder.stop();
decoder.joinThreads();
Our CompositeMessageBuilder uses the "MessageType" field of each FAST message to determine what kind of C++ struct to create from each message, and forwards each message field to a specialized builder that populates a matching C++ struct. For example, a FAST "Quote" message causes a "MiddlewareNewsBrief::Quote" struct to be created and populated, as illustrated by the diagram. Our MessageBuilder then sends that C++ struct to an OpenDDS DataWriter that writes the struct across OpenDDS.
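Stripped of the QuickFAST and OpenDDS details, the dispatch idea is a map from MessageType value to handler. The `CompositeDispatcher` class below is a hypothetical stand-in written for illustration, not the CompositeMessageBuilder from the example's source tree: it takes an already-known MessageType value plus a message object and forwards the message to whichever handler was registered for that value.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <stdexcept>
#include <utility>

// Hypothetical stand-in for a composite builder: routes each message to the
// handler registered for its MessageType value.
template <typename Key, typename Message>
class CompositeDispatcher {
public:
  using Handler = std::function<void(const Message&)>;

  void addBuilder(Key key, Handler handler) {
    handlers_[key] = std::move(handler);
  }

  // Throws if no handler was registered for this MessageType.
  void dispatch(Key key, const Message& message) const {
    auto it = handlers_.find(key);
    if (it == handlers_.end())
      throw std::out_of_range("no builder registered for this MessageType");
    it->second(message);
  }

private:
  std::map<Key, Handler> handlers_;
};
```

Registering handlers under 11, 12, 13, and 14 would mirror the constant MessageType values of the Quote, New, Change, and Remove templates.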
The following code shows how the Quote message's fields are wired into the Quote message builder and attached to the CompositeMessageBuilder. The other message types are configured similarly.
// MessageBuilder created for this example; decides what type
// of C++ struct to populate based on the MessageType field
MiddlewareNewsBrief::CompositeMessageBuilder<QuickFAST::uint16>
builder("MessageType");
// Create a StructMessageBuilder for the Quote message type;
// The quoteDataWriter enables the builder to publish its Quote structs
// over OpenDDS; we will describe it later
MiddlewareNewsBrief::StructMessageBuilder<Quote>* quoteBuilder =
new MiddlewareNewsBrief::StructMessageBuilder<Quote>
(*(quoteDataWriter.get()));
boost::shared_ptr<MiddlewareNewsBrief::MessageBuilderBase>
quoteBuilderPtr(quoteBuilder);
// Add the Quote builder to the Composite; "11" is the value of the MessageType
// field for a FAST "Quote" message.
builder.addBuilder(11,quoteBuilderPtr);
// Also have StructMessageBuilders for New, Remove, Change messages
// Wire in all of the fields of the Quote struct, mapping each FAST field
// to a struct field
quoteBuilder->addFieldPopulator(
"MessageType",
100,
&MiddlewareNewsBrief::Quote::MessageType,
MiddlewareNewsBrief::UINT16());
quoteBuilder->addFieldPopulator(
"SequenceNumber",
200,
&MiddlewareNewsBrief::Quote::SequenceNumber,
MiddlewareNewsBrief::UINT32());
quoteBuilder->addFieldPopulator(
"Timestamp",
300,
&MiddlewareNewsBrief::Quote::Timestamp,
MiddlewareNewsBrief::UINT32());
quoteBuilder->addFieldPopulator(
"OrderId",
400,
&MiddlewareNewsBrief::Quote::OrderId,
MiddlewareNewsBrief::UINT32());
quoteBuilder->addFieldPopulator(
"Volume",
500,
&MiddlewareNewsBrief::Quote::Volume,
MiddlewareNewsBrief::UINT32());
quoteBuilder->addFieldPopulator(
"Price",
600,
&MiddlewareNewsBrief::Quote::Price,
MiddlewareNewsBrief::UINT32());
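quoteBuilder->addFieldPopulator(
"ExchangeId",
700,
&MiddlewareNewsBrief::Quote::ExchangeId,
MiddlewareNewsBrief::UINT8());
quoteBuilder->addFieldPopulator(
"SecurityId",
800,
&MiddlewareNewsBrief::Quote::SecurityId,
MiddlewareNewsBrief::UINT16());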
quoteBuilder->addFieldPopulator(
"SecurityType",
900,
&MiddlewareNewsBrief::Quote::SecurityType,
MiddlewareNewsBrief::UINT8());
quoteBuilder->addFieldPopulator(
"SessionId",
1000,
&MiddlewareNewsBrief::Quote::SessionId,
MiddlewareNewsBrief::UINT8());
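The addFieldPopulator calls above bind each FAST field id to a struct member through a C++ pointer-to-member. A minimal sketch of that technique follows. `StructPopulator` and `QuoteLite` are hypothetical; unlike the example's builder, this version takes no field name or type tag and simply narrows a 64-bit decoded value to the member's type.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>

// Hypothetical sketch of pointer-to-member field population.
template <typename Struct>
class StructPopulator {
public:
  // Binds a FAST field id to a data member; Member is deduced from the pointer.
  template <typename Member>
  void addFieldPopulator(std::uint32_t fieldId, Member Struct::*member) {
    setters_[fieldId] = [member](Struct& s, std::uint64_t value) {
      s.*member = static_cast<Member>(value);  // narrow to the member's type
    };
  }

  // Called once per decoded field; unregistered field ids are ignored.
  void setField(Struct& s, std::uint32_t fieldId, std::uint64_t value) const {
    auto it = setters_.find(fieldId);
    if (it != setters_.end()) it->second(s, value);
  }

private:
  std::map<std::uint32_t, std::function<void(Struct&, std::uint64_t)>> setters_;
};

// Trimmed-down, hypothetical stand-in for the IDL-generated Quote struct.
struct QuoteLite {
  std::uint16_t MessageType;
  std::uint32_t Price;
  char SessionId;
};
```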
The final configuration step of the Decoder/Publisher is the OpenDDS initialization and configuration. For each FAST message received, we publish a C++ struct over OpenDDS to any interested subscriber. The subscriber then echoes the C++ struct sample back to the original publishing process for accurate performance measurement. When we run the example, we will use the UDP unicast transport to avoid TCP backpressure issues, which affect the latency measurement numbers. Note that this is where we define the quoteDataWriter object that we passed to the StructMessageBuilder<Quote> above.
const int DOMAIN_ID = 8675309;
// Initialize, and create a DomainParticipantFactory
DDS::DomainParticipantFactory_var factory =
TheParticipantFactoryWithArgs(argc, argv);
// Create the DomainParticipant
DDS::DomainParticipant_var participant =
factory->create_participant(DOMAIN_ID,
PARTICIPANT_QOS_DEFAULT,
0,
0);
if (participant == 0)
{
std::cerr << "create_participant failed." << std::endl;
return -1;
}
// Create a publisher for the topics
const int TRANSPORT_IMPL_ID = 1;
DDS::Publisher_var publisher =
MiddlewareNewsBrief::DDSUtil::create_publisher(participant.in(),
TRANSPORT_IMPL_ID,
"Primary");
// Create a subscriber for the echo of the topics
const int TRANSPORT_IMPL_ID_2 = 2;
DDS::Subscriber_var subscriber =
MiddlewareNewsBrief::DDSUtil::create_subscriber(participant.in(),
TRANSPORT_IMPL_ID_2,
"Secondary");
// Initialize the DoneToken manager, which publishes a "done" token
MiddlewareNewsBrief::DoneTokenManager doneToken;
doneToken.initWriter(participant,publisher,subscriber);
//
// DataWriters, one per message type
//
typedef MiddlewareNewsBrief::Quote Quote;
boost::shared_ptr<MiddlewareNewsBrief::DataWriter<Quote> >
quoteDataWriter(new MiddlewareNewsBrief::DataWriter<Quote>());
quoteDataWriter->init(participant,
publisher,
subscriber,
&MiddlewareNewsBrief::Quote::Timestamp);
// Similar code for New, Change, Remove messages
The OpenDDS subscribing process subscribes to the four topics published by the OpenDDS publishing process. One topic publishes all Quote messages, one topic publishes all New messages, one publishes all Change messages, and one publishes all Remove messages. It is not necessary to map one topic to one DDS type, but that is what we have chosen to do in this simple example. Later, we will look at a more complex example where we use a topic to represent each security being published. The diagram illustrates how OpenDDS disseminates data on multiple topics:
The SimpleSubscriber code is located in src/Subscribers/SimpleSubscriber.cpp.
The core of the SimpleSubscriber code is as follows:
const int DOMAIN_ID = 8675309;
//
// OpenDDS Init
//
// Initialize, and create a DomainParticipantFactory
DDS::DomainParticipantFactory_var factory =
TheParticipantFactoryWithArgs(argc, argv);
// Create the DomainParticipant
DDS::DomainParticipant_var participant =
factory->create_participant(DOMAIN_ID,
PARTICIPANT_QOS_DEFAULT,
0,
0);
if (participant == 0)
{
std::cerr << "create_participant failed." << std::endl;
return -1;
}
// Create a subscriber for the topics
const int TRANSPORT_IMPL_ID = 1;
DDS::Subscriber_var subscriber =
MiddlewareNewsBrief::DDSUtil::create_subscriber(participant.in(),
TRANSPORT_IMPL_ID,
"Primary");
// Create a publisher for the echo of the topics
const int TRANSPORT_IMPL_ID_2 = 2;
DDS::Publisher_var publisher =
MiddlewareNewsBrief::DDSUtil::create_publisher(participant.in(),
TRANSPORT_IMPL_ID_2,
"Secondary");
// Initialize the DoneToken manager, which publishes a "done" token
MiddlewareNewsBrief::DoneTokenManager doneToken;
doneToken.initReader(participant,publisher,subscriber);
//
// DataReaders, one per message type
//
MiddlewareNewsBrief::DataReader<MiddlewareNewsBrief::Quote> quoteDR;
quoteDR.init(participant,publisher,subscriber,
&MiddlewareNewsBrief::Quote::Timestamp);
MiddlewareNewsBrief::DataReader<MiddlewareNewsBrief::New> newDR;
newDR.init(participant,publisher,subscriber,
&MiddlewareNewsBrief::New::Timestamp);
MiddlewareNewsBrief::DataReader<MiddlewareNewsBrief::Change> changeDR;
changeDR.init(participant,publisher,subscriber,
&MiddlewareNewsBrief::Change::Timestamp);
MiddlewareNewsBrief::DataReader<MiddlewareNewsBrief::Remove> removeDR;
removeDR.init(participant,publisher,subscriber,
&MiddlewareNewsBrief::Remove::Timestamp);
std::cout << "Finished subscribing; entering event loop" << std::endl;
//
// Process events
//
do
{
boost::this_thread::sleep(boost::posix_time::seconds(1));
}
while(!doneToken.isDone());
We run the example in four steps. First, we start an OpenDDS DCPSInfoRepo repository process. This process connects DataWriters and DataReaders together, but does not come into play on each write of a data sample:
export INFOREPO_HOST=<host where DCPSInfoRepo process is executed>
./bin/run_inforepo
Next, we start a subscribing process:
export INFOREPO_HOST=<host where DCPSInfoRepo process is executed>
./bin/run_dcps_exe SimpleSubscriber -tr udp
The run_dcps_exe script is a utility script that configures several OpenDDS command-line arguments. The "-tr udp" arguments tell that script to use the OpenDDS UDP unicast transport. Other valid values are "tcp" and "mcast".
As with all of the example processes, passing an "-?" argument on the command line prints the usage message.
Next, we start the decoding/publishing process:
export INFOREPO_HOST=<host where DCPSInfoRepo process is executed>
./bin/run_dcps_exe SimplePublisher -t ./data/SimpleTemplates.xml -tr udp
The "-t ./data/SimpleTemplates.xml" arguments provide the FAST template file for the QuickFAST decoder. Both the QuickFAST encoder and decoder must have the same FAST template file. The SimplePublisher, by default, listens on the multicast address and port of 224.1.2.133:13014.
Finally, we start a multicasting process to send encoded FAST messages to the SimplePublisher. Recall that we generated a FAST data file above with the SimpleGenerator. We will read that file now to send FAST message packets to the SimplePublisher. The Multicaster multicasts on address:port 224.1.2.133:13014 by default.
./bin/Multicaster -f ./data/simple30000.dat -s 1000
The SimplePublisher process should quickly print the message
Writing Done token
which indicates that all FAST messages have been received from the Multicaster, decoded, converted to C++ structs, and published through OpenDDS. The subscriber then echoes each sample back to the publisher, but the subscriber sleeps a bit between each echoed write so the measurement of round trip latency is not CPU bound. When the example completes, the publisher's output should look like this:
Elapsed time from first start time to last end time: 2849393

name                             count     sum     mean  std_dev  recursions  rsum  rmean  rstd_dev
Echo<Remove>::on_data_availa()    7500  577690   77.025   15.790           0
Echo<Change>::on_data_availa()    7500  595248   79.366   14.779           0
Echo<New>::on_data_availa()       7500  619719   82.629   15.468           0
Echo<Quote>::on_data_availa()     7500  629871   83.983   16.630           0
<Remove>::endMessage()            7500   15594    2.079    0.281           0
<Change>::endMessage()            7500   16423    2.190    0.398           0
<New>::endMessage()               7500   16273    2.170    0.382           0
<Quote>::endMessage()             7500   16370    2.183    0.457           0
decoder.decode(source, builder)      1  440779
As mentioned above, this example was run across two SGI Intel Xeon W5590 3.33GHz systems running SUSE Linux Enterprise Server 11. These results indicate that 30,000 FAST messages (7500 each of Quote, New, Change, and Remove) were decoded and published across OpenDDS in 440,779 microseconds. The *::endMessage() entries show the time to both decode a QuickFAST message and convert it to a struct for publication across OpenDDS; those average about 2.1 to 2.2 microseconds. The Echo<*>::on_data_available() entries (truncated to on_data_availa() in the output) show the round-trip latency to publish a message across OpenDDS and then echo the sample back to the publisher, averaging 77 to 84 microseconds. We divide those round-trip numbers in half to get a measurement of OpenDDS latency, which is approximately 40 microseconds.
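The round-trip arithmetic above can be sketched in a few lines of C++. This is a minimal illustration, not the example's actual profiling code: the LatencyStats type is hypothetical, it uses Welford's online algorithm to accumulate count, sum, mean, and standard deviation in a single pass, and it assumes a population standard deviation and equal outbound and echo path times when halving the mean round trip.

```cpp
#include <cmath>
#include <cstdint>

// Hypothetical running statistics over round-trip latency samples
// (microseconds). Welford's online algorithm updates the mean and the
// sum of squared deviations without storing every sample.
struct LatencyStats {
  std::uint64_t count = 0;
  double sum = 0.0;
  double mean = 0.0;
  double m2 = 0.0;  // sum of squared deviations from the running mean

  void add(double sample_us) {
    ++count;
    sum += sample_us;
    const double delta = sample_us - mean;
    mean += delta / static_cast<double>(count);
    m2 += delta * (sample_us - mean);
  }

  // Population standard deviation; whether the profiler uses the
  // population or sample form is an assumption here.
  double std_dev() const {
    return count > 0 ? std::sqrt(m2 / static_cast<double>(count)) : 0.0;
  }

  // One-way estimate: half the mean round trip, assuming the publish
  // path and the echo path take the same time.
  double one_way_estimate() const { return mean / 2.0; }
};
```

For instance, the Echo<Remove> row above has a sum of 577,690 over 7,500 samples, a mean of about 77.0 microseconds, and therefore a one-way estimate of about 38.5 microseconds.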
There are also simpler ways to run the publisher and subscriber. First, you can read the FAST data file directly into the publisher, skipping the Multicaster step:
./bin/run_dcps_exe SimpleSubscriber -tr udp
./bin/run_dcps_exe SimplePublisher -t ./data/SimpleTemplates.xml -f ./data/simple30000.dat -tr udp
Also, you may use the "-noecho" argument to disable echoing each sample back to the publisher. The example then runs much faster, but the latency measurement will not be accurate if the two processes are on different hosts:
./bin/run_dcps_exe SimpleSubscriber -tr udp -noecho
./bin/run_dcps_exe SimplePublisher -t ./data/SimpleTemplates.xml -f ./data/simple30000.dat -tr udp -noecho
Finally, you can measure QuickFAST decoding performance by itself by using an empty MessageBuilder with the SimplePublisher:
./bin/run_dcps_exe SimplePublisher -t ./data/SimpleTemplates.xml -f ./data/simple30000.dat -tr udp -emptyBuilder
We have observed QuickFAST decoding times under 500 nanoseconds for ARCA-sized data:
name                              count    sum   mean  std_dev  recursions  rsum  rmean  rstd_dev
EmptyBuilder::endMessage          30001  13455  0.448    0.506           0
decoder.decode(source, builder)       1  18006
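The idea behind the -emptyBuilder option can be illustrated with a builder that discards everything it receives, so that timing the decode loop measures the decoder with almost no conversion overhead. The sketch below is hypothetical: FieldSink, EmptySink, CollectingSink, and decodeAll are illustrative names, not QuickFAST's MessageBuilder API, and the toy "decoder" replays already-decoded fields instead of parsing FAST bytes.

```cpp
#include <chrono>
#include <cstdint>
#include <utility>
#include <vector>

// One decoded field: (FIX tag, integer value). Hypothetical, for illustration.
using Field = std::pair<std::uint32_t, std::int64_t>;
using ToyMessage = std::vector<Field>;

// Hypothetical builder interface; NOT QuickFAST's MessageBuilder API.
class FieldSink {
 public:
  virtual ~FieldSink() = default;
  virtual void addField(const Field& field) = 0;
  virtual void endMessage() = 0;
};

// "Empty" sink: discards every field and only counts messages.
class EmptySink : public FieldSink {
 public:
  void addField(const Field&) override {}
  void endMessage() override { ++messages; }
  std::uint64_t messages = 0;
};

// Sink that does real work: copies each message's fields into a container,
// standing in for conversion to a C++ struct.
class CollectingSink : public FieldSink {
 public:
  void addField(const Field& field) override { current_.push_back(field); }
  void endMessage() override {
    messages.push_back(std::move(current_));
    current_.clear();  // return the moved-from vector to a known empty state
  }
  std::vector<ToyMessage> messages;

 private:
  ToyMessage current_;
};

// Toy "decoder": replays pre-decoded messages into the sink and returns
// the elapsed wall-clock time in microseconds.
double decodeAll(const std::vector<ToyMessage>& input, FieldSink& sink) {
  const auto start = std::chrono::steady_clock::now();
  for (const ToyMessage& msg : input) {
    for (const Field& field : msg) sink.addField(field);
    sink.endMessage();
  }
  const auto stop = std::chrono::steady_clock::now();
  return std::chrono::duration<double, std::micro>(stop - start).count();
}
```

Comparing the time measured with an EmptySink against the time measured with a sink that does real work gives a rough idea of how much of the per-message cost is conversion rather than decoding.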
In our second example, we will simulate a market data feed using more complex FIX-based data with many more fields and repeating groups. We will call these the "Complex" examples. Our FAST template file defines two message types: QuoteRequest and MarketData.
This example differs from the "Simple" example in two key ways. First, the data is more complex, so each FAST message takes longer to decode, convert into a C++ struct, and publish across OpenDDS. Second, and more importantly, we map the FAST messages to OpenDDS topics differently. By default, the ComplexGenerator that we use to generate test data produces data for 100 securities, and a single FAST message can contain data for several different securities in its MDEntries group. In our processing, we split those FAST messages up on a per-security basis and publish the data for each security on a different OpenDDS topic. As a result, we have 100 OpenDDS MarketData topics and 100 OpenDDS QuoteRequest topics, and each security's information is published on the correct topic.
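The per-security split described above might look roughly like the following sketch. The structs are trimmed-down, hypothetical mirrors of the IDL shown later in this section, and both splitBySecurity and the topic naming scheme in topicNameFor are assumptions made for illustration; the actual ComplexPublisher code may organize this differently.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Simplified, hypothetical mirrors of the IDL structs (few fields kept).
struct MarketDataEntry {
  std::uint32_t securityID = 0;
  double mdEntryPx = 0.0;
  std::int32_t mdEntrySize = 0;
};

struct MarketData {
  std::uint32_t securityID = 0;  // DDS key: one security per sample
  std::uint32_t msgSeqNum = 0;
  std::vector<MarketDataEntry> mdEntries;
};

// Splits one decoded FAST MarketData message into one sample per security.
// Entries keep their original order within each security; the samples
// themselves come out ordered by security ID (std::map ordering).
std::vector<MarketData> splitBySecurity(const MarketData& fastMsg) {
  std::map<std::uint32_t, MarketData> bySecurity;
  for (const MarketDataEntry& entry : fastMsg.mdEntries) {
    MarketData& sample = bySecurity[entry.securityID];
    sample.securityID = entry.securityID;
    sample.msgSeqNum = fastMsg.msgSeqNum;
    sample.mdEntries.push_back(entry);
  }
  std::vector<MarketData> out;
  out.reserve(bySecurity.size());
  for (auto& kv : bySecurity) out.push_back(std::move(kv.second));
  return out;
}

// Hypothetical per-security topic name, e.g. "MarketData_7".
std::string topicNameFor(const std::string& type, std::uint32_t securityID) {
  return type + "_" + std::to_string(securityID);
}
```

Each returned sample would then be written on the topic named for its security, which is why a FAST message with entries for several securities turns into several OpenDDS writes.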
For illustration, the MarketData message template from the file data/ComplexTemplates.xml is shown below:
<?xml version="1.0" encoding="UTF-8"?>
<templates xmlns="http://www.fixprotocol.org/ns/fast/td/1.1">
  <template name="MarketData" id="1" reset="Y" dictionary="1"
            xmlns="http://www.fixprotocol.org/ns/fast/td/1.1">
    <string name="ApplVerID" id="1128">
      <constant value="1.0"/>
    </string>
    <string name="MessageType" id="35">
      <constant value="X"/>
    </string>
    <string name="SenderCompID" id="49">
      <constant value="Test Exchange"/>
    </string>
    <uInt32 name="MsgSeqNum" id="34"/>
    <uInt32 name="SendingTime" id="52"/>
    <uInt32 name="TradeDate" id="75"/>
    <sequence name="MDEntries">
      <length name="NoMDEntries" id="268"/>
      <uInt32 name="MDUpdateAction" id="279">
        <copy value="1"/>
      </uInt32>
      <uInt32 name="MDPriceLevel" id="1023" presence="optional">
        <default value="1"/>
      </uInt32>
      <string name="MDEntryType" id="269">
        <copy value="0"/>
      </string>
      <uInt32 name="OpenCloseSettleFlag" id="286" presence="optional">
      </uInt32>
      <uInt32 name="SecurityIDSource" id="22">
        <constant value="9"/>
      </uInt32>
      <uInt32 name="SecurityID" id="48">
        <copy/>
      </uInt32>
      <uInt32 name="RptSeq" id="83">
        <increment/>
      </uInt32>
      <decimal name="MDEntryPx" id="270">
        <exponent>
          <default value="0"/>
        </exponent>
        <mantissa>
          <delta/>
        </mantissa>
      </decimal>
      <uInt32 name="MDEntryTime" id="273">
        <copy/>
      </uInt32>
      <int32 name="MDEntrySize" id="271" presence="optional">
        <delta/>
      </int32>
      <uInt32 name="NumberOfOrders" id="346" presence="optional">
        <delta/>
      </uInt32>
      <string name="TradingSessionID" id="336" presence="optional">
        <default value="2"/>
      </string>
      <decimal name="NetChgPrevDay" id="451" presence="optional">
        <exponent>
          <default/>
        </exponent>
        <mantissa>
          <delta/>
        </mantissa>
      </decimal>
      <uInt32 name="TradeVolume" id="1020" presence="optional">
        <default/>
      </uInt32>
      <string name="TradeCondition" id="277" presence="optional">
        <default/>
      </string>
      <string name="TickDirection" id="274" presence="optional">
        <default/>
      </string>
      <string name="QuoteCondition" id="276" presence="optional">
        <default/>
      </string>
      <uInt32 name="AggressorSide" id="5797" presence="optional">
        <default/>
      </uInt32>
      <string name="MatchEventIndicator" id="5799" presence="optional">
        <default value="1"/>
      </string>
    </sequence>
  </template>
  <!-- QuoteRequest template omitted from this excerpt -->
</templates>
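The operators in this template (constant, copy, increment, delta, and default) tell the decoder how to reconstruct each field from what was transmitted and from the previous value stored in the dictionary. The following is a simplified sketch of those rules for integer fields, assuming a per-field FieldState dictionary entry. It ignores presence-map and stop-bit encoding, nullable encoding of optional fields, and several error cases, and its function names are hypothetical rather than QuickFAST's API. Constant fields are never transmitted, so they need no function.

```cpp
#include <cstdint>
#include <optional>

// Per-field dictionary entry: the previous value, if one has been assigned.
struct FieldState {
  std::optional<std::int64_t> previous;
};

// In each function, `wire` is the value transmitted for the field in the
// current message, or std::nullopt if it was not sent.

// <copy/>: a transmitted value replaces the stored one; otherwise the stored
// value (or `initial`, when nothing is stored yet) is reused.
std::int64_t applyCopy(FieldState& st, std::optional<std::int64_t> wire,
                       std::int64_t initial) {
  const std::int64_t v = wire ? *wire : st.previous.value_or(initial);
  st.previous = v;
  return v;
}

// <increment/>: if nothing is transmitted, the value is previous + 1
// (or `initial`, when nothing is stored yet).
std::int64_t applyIncrement(FieldState& st, std::optional<std::int64_t> wire,
                            std::int64_t initial) {
  std::int64_t v;
  if (wire) {
    v = *wire;
  } else {
    v = st.previous ? *st.previous + 1 : initial;
  }
  st.previous = v;
  return v;
}

// <delta/>: a delta is always transmitted and added to the previous value
// (or to `base`, 0 by default, when nothing is stored yet).
std::int64_t applyDelta(FieldState& st, std::int64_t wireDelta,
                        std::int64_t base = 0) {
  const std::int64_t v = st.previous.value_or(base) + wireDelta;
  st.previous = v;
  return v;
}

// <default value="..."/>: if nothing is transmitted, the template's default
// value is used. The dictionary is not involved.
std::int64_t applyDefault(std::optional<std::int64_t> wire,
                          std::int64_t defaultValue) {
  return wire ? *wire : defaultValue;
}
```

In the template above, RptSeq uses increment, SecurityID uses copy, the MDEntryPx mantissa uses delta, and MDPriceLevel uses default with a value of 1, so consecutive entries for the same security can often be sent with very few bytes.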
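The corresponding IDL for this FAST message is in the file idl/Complex.idl, shown below. You can see how each FAST field maps to a field in the IDL structs, and how the MDEntries repeating group maps to the IDL MarketDataEntries sequence. The MarketData struct also carries a securityID field, declared as its DCPS key, because after splitting each MarketData sample holds entries for a single security: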
module MiddlewareNewsBrief
{
  typedef sequence<octet> Octets;
#pragma DCPS_DATA_TYPE "MiddlewareNewsBrief::MarketDataEntry"
#pragma DCPS_DATA_KEY "MiddlewareNewsBrief::MarketDataEntry securityID"
  struct MarketDataEntry
  {
    unsigned long mdUpdateAction;
    unsigned long mdPriceLevel;
    string mdEntryType;
    unsigned long openCloseSettleFlag;
    unsigned long securityIDSource;
    unsigned long securityID;
    unsigned long rptSeq;
    double mdEntryPx;
    unsigned long mdEntryTime; // timestamp
    long mdEntrySize;
    unsigned long numberOfOrders;
    string tradingSessionID;
    double netChgPrevDay;
    unsigned long tradeVolume;
    string tradeCondition;
    string tickDirection;
    string quoteCondition;
    unsigned long aggressorSide;
    string matchEventIndicator;
  };
  typedef sequence<MarketDataEntry> MarketDataEntries;
#pragma DCPS_DATA_TYPE "MiddlewareNewsBrief::MarketData"
#pragma DCPS_DATA_KEY "MiddlewareNewsBrief::MarketData securityID"
  struct MarketData
  {
    unsigned long securityID;
    string applVersionID;
    string messageType;
    string senderCompID;
    unsigned long msgSeqNum;
    unsigned long sendingTime; // timestamp
    unsigned long tradeDate;
    MarketDataEntries mdEntries;
  };
};
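FAST transmits MDEntryPx and NetChgPrevDay as decimals, that is, as an exponent and a mantissa whose value is mantissa * 10^exponent, while the IDL above declares both fields as double. A conversion along the lines of this sketch is one plausible way to fill those fields; the function name is hypothetical, and converting to binary floating point can introduce small rounding errors.

```cpp
#include <cmath>
#include <cstdint>

// Converts a FAST decimal (exponent, mantissa) to a double:
//   value = mantissa * 10^exponent
// For negative exponents, dividing by a positive power of ten keeps results
// such as 12345 * 10^-2 as close as possible to 123.45.
double fastDecimalToDouble(std::int32_t exponent, std::int64_t mantissa) {
  const double m = static_cast<double>(mantissa);
  if (exponent >= 0) {
    return m * std::pow(10.0, exponent);
  }
  return m / std::pow(10.0, -exponent);
}
```

With the template's default exponent of 0, the mantissa passes through unchanged.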
The OpenDDS publisher-side code is in src/Publishers/ComplexPublisher.cpp; the subscriber-side code is in src/ComplexSubscriber.cpp.
Again, we need to generate FAST messages based on the ComplexTemplates.xml template file. We execute the ComplexGenerator as follows:
./bin/ComplexGenerator -t ./data/ComplexTemplates.xml -o ./data/complex30000.dat -n 30000
where ./data/complex30000.dat is the generated FAST data file, and 30000 is the number of FAST messages created in that data file. By default, the ComplexGenerator generates FAST messages with an average MarketDataEntries sequence length of 3, and many messages have more than one security in the MDEntries sequence.
We can take control over that behavior in a couple of ways. First, we can change the average sequence length via
./bin/ComplexGenerator -seq <average sequence length> ...
We can also decide that each FAST message will only have information about one security in its MarketDataEntries sequence:
./bin/ComplexGenerator -nosplit ...
Both options affect decoder performance and OpenDDS publishing latency: larger messages take longer to decode and write, and in our example a FAST message containing multiple securities maps to multiple OpenDDS writes instead of one.
As always, you can find out all of the supported command-line arguments via
./bin/ComplexGenerator -?
We run the example in the same four steps. First, we start an OpenDDS DCPSInfoRepo repository process. This process connects DataWriters and DataReaders together, but it is not involved in individual data sample writes:
export INFOREPO_HOST=<host where DCPSInfoRepo process is executed>
./bin/run_inforepo
Next, we start a subscribing process using the UDP transport:
export INFOREPO_HOST=<host where DCPSInfoRepo process is executed>
./bin/run_dcps_exe ComplexSubscriber -tr udp
Next, we start the decoding/publishing process, also using the UDP transport:
export INFOREPO_HOST=<host where DCPSInfoRepo process is executed>
./bin/run_dcps_exe ComplexPublisher -t ./data/ComplexTemplates.xml -tr udp
By default, the ComplexPublisher listens on the multicast address and port 224.1.2.133:13014.
Finally, we start a multicasting process to send encoded FAST messages to the ComplexPublisher. Recall that the Multicaster multicasts on address:port 224.1.2.133:13014 by default.
./bin/Multicaster -f ./data/complex30000.dat -s 1000
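Both publishers receive the Multicaster's packets by listening on the multicast group 224.1.2.133, port 13014. As a rough sketch of what that involves at the socket level, the following uses plain POSIX sockets (Linux/BSD). The actual publishers may use a different networking layer, parseMulticastEndpoint and joinMulticastGroup are hypothetical helpers, and the group is joined on the default interface, which may not be the one you want on a multi-homed host.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cstdint>
#include <cstdlib>
#include <optional>
#include <string>

// A parsed multicast group and UDP port.
struct Endpoint {
  in_addr group{};
  std::uint16_t port = 0;
};

// Parses "a.b.c.d:port"; accepts it only if the address is IPv4 multicast
// (224.0.0.0/4) and the port is in 1..65535.
std::optional<Endpoint> parseMulticastEndpoint(const std::string& text) {
  const auto colon = text.rfind(':');
  if (colon == std::string::npos) return std::nullopt;

  Endpoint ep;
  const std::string host = text.substr(0, colon);
  if (inet_pton(AF_INET, host.c_str(), &ep.group) != 1) return std::nullopt;
  if ((ntohl(ep.group.s_addr) & 0xF0000000u) != 0xE0000000u) return std::nullopt;

  const std::string portText = text.substr(colon + 1);
  if (portText.empty()) return std::nullopt;
  char* end = nullptr;
  const unsigned long port = std::strtoul(portText.c_str(), &end, 10);
  if (*end != '\0' || port == 0 || port > 65535) return std::nullopt;
  ep.port = static_cast<std::uint16_t>(port);
  return ep;
}

// Opens a UDP socket bound to the port and joins the group on the default
// interface. Returns the descriptor, or -1 on failure.
int joinMulticastGroup(const Endpoint& ep) {
  const int fd = socket(AF_INET, SOCK_DGRAM, 0);
  if (fd < 0) return -1;

  const int reuse = 1;  // best effort: let several listeners share the port
  setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof reuse);

  sockaddr_in local{};
  local.sin_family = AF_INET;
  local.sin_port = htons(ep.port);
  local.sin_addr.s_addr = htonl(INADDR_ANY);

  ip_mreq mreq{};
  mreq.imr_multiaddr = ep.group;
  mreq.imr_interface.s_addr = htonl(INADDR_ANY);

  if (bind(fd, reinterpret_cast<sockaddr*>(&local), sizeof local) < 0 ||
      setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof mreq) < 0) {
    close(fd);
    return -1;
  }
  return fd;
}
```

A caller would then read datagrams with recv() on the returned descriptor and hand each one to the FAST decoder.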
The ComplexPublisher process should quickly print the message
Writing Done token
which indicates that all FAST messages have been received from the Multicaster. The publisher converts each FAST message into one or more DDS samples and publishes them to the subscriber. The subscriber then echoes each sample back to the publisher, sleeping briefly between echoed writes so that the round-trip latency measurement is not CPU bound. When the example completes, the publisher's output should look like this:
Elapsed time from first start time to last end time: 4496354
name                                   count      sum     mean  std_dev  recursions  rsum  rmean  rstd_dev
Echo<QuoteRequest>::on_data_availa()     300    36772  122.573   25.739           0
Echo<MarketData>::on_data_availa()     41700  5419738  129.970   32.030           0
<MarketData>::endMessage()             41700   982462   23.560   11.711           0
<QuoteRequest>::endMessage()             300     1755    5.850    1.768           0
decoder.decode(source, builder)            1  1220888
This example was also run across two SGI Intel Xeon W5590 3.33GHz systems. These results indicate that 30,000 FAST messages were decoded into 42,000 C++ structs (41,700 MarketData and 300 QuoteRequest) and published across OpenDDS as 42,000 writes in 1,220,888 microseconds. The *::endMessage() entries show the time to both decode a QuickFAST message and convert it to each struct for publication across OpenDDS; as you can see, handling the MarketData structs (about 23.6 microseconds each) is more time consuming than handling the QuoteRequest structs (about 5.9 microseconds each), due to the one-to-many mapping of FAST messages to structs. The Echo<*>::on_data_available() entries (truncated to on_data_availa() in the output) show the round-trip latency to publish each struct across OpenDDS and then echo the sample back to the publisher. We divide those round-trip numbers in half to get a measurement of OpenDDS latency, which is approximately 65 microseconds.
As before, you can measure QuickFAST decoding performance by itself by using an empty MessageBuilder, this time with the ComplexPublisher:
./bin/run_dcps_exe ComplexPublisher -t ./data/ComplexTemplates.xml -f ./data/complex30000.dat -tr udp -emptyBuilder
We have demonstrated that open source products such as QuickFAST and OpenDDS can be used to decode and disseminate market data quickly and efficiently. We have measured the behavior and performance of QuickFAST and OpenDDS for a low-latency market data feed, showing QuickFAST decoding latencies under 500 nanoseconds and OpenDDS publishing latencies as low as 40 microseconds on high-quality commodity hardware using UDP over Infiniband. This suggests that latency could be reduced further by using native Infiniband communication across the network, and OpenDDS has a pluggable transport framework designed for exactly this kind of performance tuning.
Not only does the combination of QuickFAST and OpenDDS perform well, but it does so at a significant cost savings compared to proprietary software, and it gives the user a level of control over and visibility into the code that proprietary software does not offer. Savings realized from the use of open source software can be applied to other areas, such as better hardware. QuickFAST and OpenDDS are professionally developed and commercially supported by OCI, a full-service software engineering, open source product, and training company.
Object Computing, Inc. (OCI) is the leading provider of object-oriented technology training in the Midwest. Thousands of students participate in our training program every year. Targeted toward software engineers and the development community, our extensive program of over 50 hands-on workshops is delivered to corporations and individuals throughout the U.S. and internationally. OCI's Education Services include Private and Public Training and Lab Rentals. For more information visit ocitraining.com.
OCI offers downloads and commercial support for a variety of middleware technologies.
Copyright ©2010 Object Computing, Inc. All rights reserved.
OMG, CORBA, IIOP, and all OMG marks and logos are trademarks or registered trademarks of Object Management Group, Inc. in the United States and/or other countries.
Java and all Java-based marks are trademarks or registered trademarks of Sun Microsystems, Inc. in the United States and/or other countries.
.NET, C#, and .NET-based marks are trademarks or registered trademarks of Microsoft Corporation in the United States and/or other countries.