Applications that use the Data Distribution Service (DDS) typically have two elements in common:
Boilerplate code:
The sequence of steps to initialize the DDS framework,
and to create and destroy domain participants, is the same from project to project.
Simplifying the application's skeleton code shortens development time.
Knowledge of IDL and C++:
DDS implementations such as OpenDDS [1] are
written in C++ and require structures used as data samples to be described in
the Object Management Group's Interface Definition Language (IDL). Allowing a
developer to write code which uses DDS in their language of choice, rather than in
IDL and C++, can lead to a shorter learning curve and wider use of DDS as a
technology.
Parts I and II of this article explore one approach to making DDS simpler to use and accessible from a wider array of development languages. We shall use OpenDDS as our DDS implementation, with the following goals:
Create a C++-based wrapper for OpenDDS so that application code consists solely of publish and subscribe calls.
Provide a means to automatically generate these wrappers from user-defined data types (in Part II).
Provide a layer which exposes the generated code to the .NET world.
Demonstrate how the .NET interface to OpenDDS can be used by code written in arbitrary .NET languages.
While this article focuses on a particular wrapping of OpenDDS functionality that is made available for software using the .NET Framework, this same technique is applicable in a wider sense — code generators could be written to encapsulate OpenDDS for other target languages and environments of choice.
While OpenDDS is described in detail in [2] and elsewhere, a short review of the major elements is beneficial.
Given the definition of a data sample expressed in IDL, the OpenDDS IDL compiler tool chain generates several code elements. The elements of interest for this article include the following:
A structure that provides a C++ representation of the member variables of the data sample.
An associated type support class which provides operations such as duplication,
narrowing, and registration of the type in a DomainParticipant.
Specializations of DataReader and DataWriter for the
data sample.
Data samples are not used in isolation — they are the data representation of a topic,
associated with a DDS domain. In OpenDDS, given a numeric DomainId
that represents a DDS domain, the application creates a DomainParticipant for that DomainId. The generated
type support class for the data sample is used to register the type with the
DomainParticipant, and, given a topic name, the create_topic() method of
the DomainParticipant associates the type with the topic.
Through the factory TheTransportFactory the application creates a transport, such
as SimpleTcp or multicast, used for the transmission of data samples.
If an application publishes data samples, the DomainParticipant is used to
create a DDS Publisher. The Publisher is attached to a transport,
and then used to create a DataWriter for a particular topic. The DataWriter
is narrowed to a DataWriter specific for the data sample type (as generated by
the OpenDDS IDL compiler tool chain), and its write() method is used to publish a data sample.
If an application consumes data samples, the DomainParticipant is used to
create a DDS Subscriber. The Subscriber is attached to a transport,
and then used to create a DataReader for a particular topic. The DataReader
is narrowed to a DataReader specific for the data sample type (as generated by
the OpenDDS IDL compiler tool chain), and the on_data_available() method of a listener associated
with the DataReader is called as data samples arrive.
Upon application termination, all entities derived from a given DomainParticipant
must be deleted before the DomainParticipant itself is. When all DomainParticipants
have been freed, TheTransportFactory must be released, and TheServiceParticipant
shut down.
We wish to distill the complexity described in the previous section into a simple
publish and subscribe interface, both for conceptual understanding
and later code generation. Features of the wrapper will be highlighted
here — the full implementation is available in the Common library of the
code archive that
accompanies this article.
To begin the wrapper, we will create a C++ class, DDSBase, with
two goals in mind. The first is to manage initialization and cleanup — these
will be performed in the constructor and destructor of the class, respectively,
as per the RAII programming idiom [3]. The
other is to keep track of the various DDS entities — when an entity is to be
created, a list is first consulted to see if the entity has been created before. If
it has, it is retrieved and a reference to it returned to the caller.
Otherwise, the entity is created, added to the list, and a reference returned to
the caller.
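Both responsibilities can be illustrated without OpenDDS. The following is a minimal sketch under stated assumptions, not the article's implementation: FakeParticipant and ParticipantCache are hypothetical names, and std::shared_ptr stands in for the CORBA _var types used by the real wrapper.

```cpp
#include <map>
#include <memory>
#include <utility>

// Hypothetical stand-in for a DDS entity; it is not an OpenDDS type.
struct FakeParticipant {
    explicit FakeParticipant(int id) : domainId(id) {}
    int domainId;
};

// Setup belongs in the constructor, teardown in the destructor, and
// Get() creates an entity only the first time it is requested.
class ParticipantCache {
    std::map<int, std::shared_ptr<FakeParticipant> > cache_;
    int created_;
public:
    ParticipantCache() : created_(0) {}  // framework initialization would go here

    ~ParticipantCache() {
        // Release every cached entity before any framework shutdown.
        cache_.clear();
    }

    std::shared_ptr<FakeParticipant> Get(int domainId) {
        auto it = cache_.find(domainId);
        if (it == cache_.end()) {
            // Not seen before: create it and remember it.
            it = cache_.insert(std::make_pair(
                     domainId, std::make_shared<FakeParticipant>(domainId))).first;
            ++created_;
        }
        return it->second;
    }

    int CreatedCount() const { return created_; }
};
```

Calling Get(42) twice on the same ParticipantCache returns the same object and leaves CreatedCount() at one, which is the behavior the wrapper relies on for each kind of DDS entity.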
In the code below, the TheParticipantFactoryWithArgs() function
is called in the constructor of DDSBase to initialize OpenDDS.
A std::map is used to store DomainParticipants that
have been created, and the contents of this map are freed in the destructor of
DDSBase. The destructor also frees TheTransportFactory
and TheServiceParticipant.
// Common/DDSBase.h
class DDSBase {
typedef std::map<::DDS::DomainId_t, DDS::DomainParticipant_var>
DomainParticipantMap;
DDS::DomainParticipantFactory_var dpf_;
DomainParticipantMap participantMap_;
public:
DDSBase(int argc, ACE_TCHAR *argv[]) {
dpf_ = TheParticipantFactoryWithArgs(argc, argv);
}
~DDSBase() {
for (DomainParticipantMap::iterator it=participantMap_.begin();
it!=participantMap_.end(); it++) {
it->second->delete_contained_entities();
dpf_->delete_participant(it->second.in());
}
TheTransportFactory->release();
TheServiceParticipant->shutdown();
}
Next, we provide a method to retrieve a DomainParticipant, given
a DomainId. As per the pattern described previously, if a matching participant
already exists in the participant map, it is returned, else a new one is created.
The DomainParticipant is
returned as a _ptr instead of a _var because the lifetime
of the participant is managed by this class.
We will assume that the default quality of service (QoS) for each DDS entity,
such as the DomainParticipant here, is sufficient for our purposes.
Adding QoS policies would be straightforward (for example, by passing a list of QoS policies to the method,
where the list becomes part of the map's key to distinguish otherwise identical
entities), but the exercise is left to the reader.
protected:
DDS::DomainParticipant_ptr GetParticipant(::DDS::DomainId_t domainId) {
DomainParticipantMap::iterator it=participantMap_.find(domainId);
if (it==participantMap_.end()) {
::DDS::DomainParticipant_ptr participant = dpf_->create_participant(
domainId,
PARTICIPANT_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == participant)
throw DDSException("DDSBase::GetParticipant(): create_participant failed");
participantMap_[domainId] = participant;
}
return ::DDS::DomainParticipant::_duplicate(participantMap_[domainId].in());
}
This method both searches for a previously created
DomainParticipant and creates one if none is found, as described above. The
creation of DataReaders and DataWriters, however, is
split into two stages. The CreateReader() and CreateWriter()
methods represent the "create if missing" aspect, while the search for a previously
created entity is performed elsewhere.
We also define a class, DDSException, a subclass of std::exception,
to represent errors in the process.
// Common/DDSBase.h
class DDSException : public std::exception {
public:
DDSException(const char *const& _What) : std::exception(_What) {}
};
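The std::exception constructor that accepts a message string is a Microsoft extension, so the class above compiles only with Visual C++. Where portability matters, a variant along the following lines could be used instead. This is a sketch only: PortableDDSException and RequireNonNull are hypothetical names that do not appear in the code archive.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical portable variant; std::runtime_error stores the message
// and returns it from what() on any conforming compiler.
class PortableDDSException : public std::runtime_error {
public:
    explicit PortableDDSException(const std::string& what)
        : std::runtime_error(what) {}
};

// Illustration-only helper: throws when a creation step returned null.
inline void RequireNonNull(const void* entity, const char* message) {
    if (entity == 0)
        throw PortableDDSException(message);
}
```

Because the class still derives from std::exception, code that catches std::exception and reads what() continues to work unchanged.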
Continuing with class DDSBase, we want methods that can create
any DataReader or DataWriter, so they must be templatized. As
several otherwise unrelated types are needed, they can be collected into a TypeTraits
class, identified by a single template parameter that is representative of the
collection. This technique is also demonstrated in the code of an earlier Middleware
News Brief. [4]
In the file TypeTraits.h we define a templated struct, TypeTraits,
to reference the various DDS entities associated with a given data sample structure. Unlike
in the aforementioned MNB, however, we define this template via a macro to reduce the amount of
code that needs to be replicated later in the application.
// TypeTraits.h
template <typename DDS_STRUCT_T>
struct TypeTraits
{
};
#define DEFINE_TYPETRAITS(TYPE) \
template <> \
struct TypeTraits<TYPE> \
{ \
typedef TYPE##TypeSupport TypeSupport; \
typedef TYPE##TypeSupportImpl TypeSupportImpl; \
typedef TYPE##DataWriter Writer; \
typedef TYPE##DataReader Reader; \
};
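To see how the token pasting in the macro resolves, consider the following self-contained sketch. The Demo namespace and its types are hypothetical stand-ins for what an IDL compiler would generate, and DemoTraits and DEFINE_DEMO_TRAITS are renamed copies of the article's template and macro so that the example compiles on its own.

```cpp
#include <string>

// Hypothetical stand-ins for the types an IDL compiler would generate.
namespace Demo {
    struct Quote            { double price; };
    struct QuoteTypeSupport { static std::string Name() { return "QuoteTypeSupport"; } };
    struct QuoteDataWriter  { static std::string Name() { return "QuoteDataWriter"; } };
    struct QuoteDataReader  { static std::string Name() { return "QuoteDataReader"; } };
}

// The primary template is intentionally empty; only registered types are usable.
template <typename T>
struct DemoTraits {};

// The ## operator pastes the suffix onto the last token of TYPE, so
// Demo::Quote yields Demo::QuoteTypeSupport, Demo::QuoteDataWriter, and so on.
#define DEFINE_DEMO_TRAITS(TYPE)                \
    template <>                                 \
    struct DemoTraits<TYPE> {                   \
        typedef TYPE##TypeSupport TypeSupport;  \
        typedef TYPE##DataWriter  Writer;       \
        typedef TYPE##DataReader  Reader;       \
    };

DEFINE_DEMO_TRAITS(Demo::Quote)

// A generic function needs only the sample type to reach the related types.
template <typename T>
std::string WriterNameFor() {
    return DemoTraits<T>::Writer::Name();
}
```

With this in place, WriterNameFor<Demo::Quote>() resolves to Demo::QuoteDataWriter without the caller ever naming that type.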
Returning to DDSBase, we can now define CreateWriter()
and CreateReader() methods with just one template parameter which then
provides access to the additional types. The return from CreateWriter()
is a wrapper around a type-specific
DataWriter for the given domain and topic.
template <typename TStructure>
Writer<TStructure> CreateWriter(::DDS::DomainId_t domainId,
const char *topic_name) {
We obtain the DomainParticipant for the domain, creating it if needed.
DDS::DomainParticipant_var participant = GetParticipant(domainId);
Next, we use the type support class to register the type in the domain, and to provide the type name for topic creation. Upon failure, an exception will be thrown.
// Create Topic
typename TypeTraits<TStructure>::TypeSupport::_var_type ts =
new typename TypeTraits<TStructure>::TypeSupportImpl();
if (ts->register_type(participant, "") != DDS::RETCODE_OK)
throw DDSException("DDSBase::CreateWriter(): register_type failed");
CORBA::String_var type_name = ts->get_type_name();
DDS::Topic_var topic =
participant->create_topic(topic_name,
type_name.in(),
TOPIC_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == topic)
throw DDSException("DDSBase::CreateWriter(): create_topic failed");
The Publisher is then created, with a default quality of service.
// Create Publisher
DDS::Publisher_var publisher =
participant->create_publisher(PUBLISHER_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == publisher)
throw DDSException("DDSBase::CreateWriter(): create_publisher failed");
We now create a transport that uses the SimpleTcp protocol, and
attach it to the Publisher. Hard-coding a particular transport
simplifies the wrapper's interface, but the choice could be parameterized if needed.
As SimpleTcp provides reliable sample delivery, it is an appropriate choice.
// Initialize and attach Transport
OpenDDS::DCPS::TransportImpl_rch transport_impl =
TheTransportFactory->create_transport_impl(
OpenDDS::DCPS::DEFAULT_SIMPLE_TCP_ID,
OpenDDS::DCPS::AUTO_CONFIG);
if (transport_impl->attach(publisher.in()) != OpenDDS::DCPS::ATTACH_OK)
throw DDSException("DDSBase::CreateWriter(): transport_impl->attach failed");
Finally, we create a generic DataWriter and wrap it in a
Writer, whose constructor narrows it to the desired type. The wrapper
is then returned to the caller.
// Create DataWriter
DDS::DataWriter_var writer =
publisher->create_datawriter(topic.in(),
DATAWRITER_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == writer)
throw DDSException("DDSBase::CreateWriter(): create_datawriter failed");
return Writer<TStructure>(writer.in());
}
The CreateReader() method completes the DDSBase
class. As with CreateWriter(), we want this function to be generic.
Two template parameters are needed: the key for the TypeTraits, and
an additional type representing a listener for the DataReader,
which will be described later in this article.
template <typename TStructure, typename TListener>
Reader<TStructure, TListener> CreateReader(
::DDS::DomainId_t domainId, const char *topic_name,
TListener *listener) {
The code to obtain a DomainParticipant and to register the type is the same
as in CreateWriter().
DDS::DomainParticipant_var participant = GetParticipant(domainId);
// Create Topic
typename TypeTraits<TStructure>::TypeSupport::_var_type ts =
new typename TypeTraits<TStructure>::TypeSupportImpl();
if (ts->register_type(participant, "") != DDS::RETCODE_OK)
throw DDSException("DDSBase::CreateReader(): register_type failed");
CORBA::String_var type_name = ts->get_type_name();
DDS::Topic_var topic =
participant->create_topic(topic_name,
type_name.in(),
TOPIC_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == topic)
throw DDSException("DDSBase::CreateReader(): create_topic failed");
We now perform actions in parallel to those in CreateWriter(), but
as a subscriber — create a Subscriber, and create and attach the
SimpleTcp transport.
// Create Subscriber
DDS::Subscriber_var subscriber =
participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == subscriber)
throw DDSException("DDSBase::CreateReader(): create_subscriber failed");
// Initialize and attach Transport
OpenDDS::DCPS::TransportImpl_rch transport_impl =
TheTransportFactory->create_transport_impl(
OpenDDS::DCPS::DEFAULT_SIMPLE_TCP_ID,
OpenDDS::DCPS::AUTO_CONFIG);
OpenDDS::DCPS::AttachStatus status = transport_impl->attach(subscriber.in());
if (status != OpenDDS::DCPS::ATTACH_OK)
throw DDSException("DDSBase::CreateReader(): transport_impl->attach failed");
We create a DataReader, specifying the listener and a non-default
quality of service. The RELIABILITY QoS policy is set to RELIABLE, instead of
the default of BEST_EFFORT. Later, we will use wait_for_acknowledgments()
in the publisher to ensure that all published data has been received, and this operation
requires a RELIABLE DataReader. As of this writing (OpenDDS v2.1.3),
the OpenDDS Developer's Guide shows the Messenger application using
wait_for_acknowledgments() with a BEST_EFFORT DataReader,
which is not correct.
// Create DataReader - set RELIABLE reliability
DDS::DataReaderQos dr_qos;
if (subscriber->get_default_datareader_qos(dr_qos) != ::DDS::RETCODE_OK)
throw DDSException("DDSBase::CreateReader(): get_default_datareader_qos failed");
dr_qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
DDS::DataReader_var reader =
subscriber->create_datareader(topic.in(),
dr_qos,
listener,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == reader)
throw DDSException("DDSBase::CreateReader(): create_datareader failed");
Finally, we return a wrapped, narrowed DataReader to the caller.
return Reader<TStructure, TListener>(reader, listener);
}
};
As with CreateReader() and CreateWriter(), at this
level we also want the listener for the DataReader to be generic, so
it too is templatized and has a data processing method which must be overridden in
a subclass.
The DataReaderListenerImplBase class also provides empty implementations
for various listener methods that are not needed by the wrapper.
// Common/DataReaderListenerImplBase.h
template <typename TDataReader, typename TStructure>
class DataReaderListenerImplBase
: public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> {
public:
virtual void on_requested_deadline_missed(
DDS::DataReader_ptr,
const DDS::RequestedDeadlineMissedStatus&) {}
virtual void on_requested_incompatible_qos(
DDS::DataReader_ptr,
const DDS::RequestedIncompatibleQosStatus&) {}
virtual void on_sample_rejected(
DDS::DataReader_ptr,
const DDS::SampleRejectedStatus&) {}
virtual void on_liveliness_changed(
DDS::DataReader_ptr,
const DDS::LivelinessChangedStatus&) {}
virtual void on_subscription_matched(
DDS::DataReader_ptr,
const DDS::SubscriptionMatchedStatus&) {}
virtual void on_sample_lost(
DDS::DataReader_ptr,
const DDS::SampleLostStatus&) {}
The data sample processing is handled by on_data_available(). If
a data sample is successfully received from OpenDDS, and valid_data is true,
it is passed to Process()
to be consumed by the application. The
valid_data flag is true only when data is associated with the sample, as the
sample that was taken may only represent a change of instance state, and not
actual data.
The sample processing proceeds until take_next_sample()
returns DDS::RETCODE_NO_DATA, or an error has occurred. Only samples
containing valid data will be processed — for instance, since
valid_data is false for a disposed sample, disposed samples
will be ignored. A while loop is used as performance is improved if all samples
currently available are taken, rather than taking just one sample on each invocation
of on_data_available.
virtual void Process(const TStructure &) = 0;
virtual void on_data_available(DDS::DataReader_ptr reader) {
typename TDataReader::_var_type reader_i = TDataReader::_narrow(reader);
// if an exception is thrown here, it is consumed by the transport thread,
// so ignore narrowing errors at this time
if (0 == reader_i)
return;
TStructure sample;
DDS::SampleInfo info;
DDS::ReturnCode_t status = reader_i->take_next_sample(sample, info);
while (status == DDS::RETCODE_OK) {
if (info.valid_data)
Process(sample);
status = reader_i->take_next_sample(sample, info);
}
}
};
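The take loop can be exercised in isolation with a stand-in reader. The sketch below is illustrative only: FakeReader, ReturnCode, SampleInfo, and DrainValidSamples are hypothetical names that mimic the shape of the DDS calls and are not OpenDDS types.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins; they mimic only the shape of the DDS calls.
enum ReturnCode { RETCODE_OK, RETCODE_NO_DATA };
struct SampleInfo { bool valid_data; };

struct FakeReader {
    std::vector<std::pair<std::string, bool> > queue;  // (payload, valid_data)
    std::size_t next;
    FakeReader() : next(0) {}

    // Hands out queued samples one at a time, then reports "no data".
    ReturnCode take_next_sample(std::string& sample, SampleInfo& info) {
        if (next >= queue.size())
            return RETCODE_NO_DATA;
        sample = queue[next].first;
        info.valid_data = queue[next].second;
        ++next;
        return RETCODE_OK;
    }
};

// Take every available sample in one callback, forwarding only those
// that carry data; state-only notifications are skipped.
inline std::vector<std::string> DrainValidSamples(FakeReader& reader) {
    std::vector<std::string> processed;
    std::string sample;
    SampleInfo info;
    while (reader.take_next_sample(sample, info) == RETCODE_OK) {
        if (info.valid_data)
            processed.push_back(sample);
    }
    return processed;
}
```

If the queue holds a valid sample, a sample whose valid_data flag is false, and another valid sample, a single call returns the two valid payloads in order, and a second call returns nothing because the queue has been drained.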
CreateReader() and CreateWriter() return wrappers
for the corresponding DDS entities. These wrappers maintain a reference to the
contained entity, and provide a minimal interface for interacting with the
wrapped entity, to keep things simple.
As the Writer wrapper for a DataWriter is specific
to the type of DataWriter corresponding to the data sample type being
written, it must be parameterized with the appropriate TypeTraits
key.
// Common/Writer.h
template <typename TStructure>
class Writer {
typedef typename TypeTraits<TStructure>::Writer WriterType;
typename WriterType::_var_type writer_i;
public:
Writer() : writer_i(0) {}
Writer(DDS::DataWriter_ptr writer) :
writer_i(WriterType::_narrow(writer)) {} // _narrow already returns a new reference
For aid in testing, helper functions are used to manage publication. These
methods are based
on ones used in various OpenDDS tests that are part of the OpenDDS software
distribution. WaitForSubscriber() does not
return until at least one
subscriber has been matched with this publisher — this allows a publisher
to block until a subscriber is available to receive the data samples.
The code waits on a "publication matched" condition, where the wait continues
until at least a match count of one is achieved.
bool WaitForSubscriber() {
// Block until Subscriber is available
DDS::StatusCondition_var condition = writer_i->get_statuscondition();
condition->set_enabled_statuses(DDS::PUBLICATION_MATCHED_STATUS);
DDS::WaitSet_var ws = new DDS::WaitSet;
ws->attach_condition(condition);
DDS::ConditionSeq conditions;
DDS::PublicationMatchedStatus matches = { 0, 0, 0, 0, 0 };
DDS::Duration_t timeout = { 30, 0 }; // 30 seconds
do {
if (ws->wait(conditions, timeout) != DDS::RETCODE_OK)
return false;
if (writer_i->get_publication_matched_status(matches) != ::DDS::RETCODE_OK)
return false;
} while (matches.current_count < 1);
ws->detach_condition(condition);
return true;
}
Conversely, WaitForAcknowledgements() can be used for clean
termination of a publisher to ensure that all data samples have been received
by a subscriber.
bool WaitForAcknowledgements() {
// Wait for samples to be acknowledged
DDS::Duration_t timeout = { 30, 0 }; // 30 seconds
return (writer_i->wait_for_acknowledgments(timeout) == DDS::RETCODE_OK);
}
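Finally, Write() is used to publish a data sample. The code shown passes
DDS::HANDLE_NIL as the instance handle, leaving the DataWriter to identify the
instance from the sample's key; calling register_instance() first to obtain an
instance handle can improve publication performance.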
DDS::ReturnCode_t Write(const TStructure &s) {
return writer_i->write(s, ::DDS::HANDLE_NIL);
}
};
The structure of the Reader wrapper for a DataReader
is analogous to that of Writer. It also stores the specific
DataReader type, so it must be templatized.
// Common/Reader.h
template <typename TStructure, typename TListener>
class Reader {
typedef typename TypeTraits<TStructure>::Reader ReaderType;
typename ReaderType::_var_type reader_i;
std::tr1::shared_ptr<TListener> listener_;
public:
Reader() {}
Reader(DDS::DataReader_ptr reader, TListener *listener) :
reader_i(ReaderType::_narrow(reader)), // _narrow already returns a new reference
listener_(listener) {}
TListener *GetListener() { return listener_.get(); }
Reader provides a helper function, WaitForPublisherToComplete(),
to ensure that a publisher has disconnected: the code loops until the number of
publishers currently matched with the DataReader (the current_count of the
subscription-matched status) is zero.
bool WaitForPublisherToComplete() {
// Block until Publisher completes
DDS::StatusCondition_var condition = reader_i->get_statuscondition();
condition->set_enabled_statuses(DDS::SUBSCRIPTION_MATCHED_STATUS);
DDS::WaitSet_var ws = new DDS::WaitSet;
ws->attach_condition(condition);
DDS::ConditionSeq conditions;
DDS::SubscriptionMatchedStatus matches = { 0, 0, 0, 0, 0 };
DDS::Duration_t timeout = { 30, 0 }; // 30 seconds
do {
if (ws->wait(conditions, timeout) != DDS::RETCODE_OK)
return false;
if (reader_i->get_subscription_matched_status(matches) != DDS::RETCODE_OK)
return false;
} while (matches.current_count > 0);
ws->detach_condition(condition);
return true;
}
};
Although Writer provided a Write() method, no
corresponding Read() method is needed — data samples are
retrieved by the listener that was associated with the contained
DataReader.
One more generic class is part of the wrapper — a generic container
for Reader and Writer objects. The EntityMap
stores entities based on their associated domain and topic. This class
manages a map of maps — given a DomainId, a map is retrieved
which associates topics with entities. If the desired entity is not found,
a generic Create() function is called to instantiate it, before
it is added to the map and returned to the caller.
// Common/EntityMap.h
template <typename TEntity>
class EntityMap {
typedef std::map<std::string, TEntity> TTopicEntityMap;
typedef std::map<::DDS::DomainId_t, TTopicEntityMap> TDomainEntityMap;
TDomainEntityMap map_;
protected:
virtual TEntity Create(DDSBase *ddsBase, ::DDS::DomainId_t domainId,
const char *topic_name) = 0;
public:
TEntity Get(DDSBase *ddsBase,::DDS::DomainId_t domainId, const char *topic_name) {
typename TDomainEntityMap::iterator itDomain = map_.find(domainId);
if (itDomain == map_.end())
// domain entry not found, so create topic map and entity
map_[domainId][topic_name] = Create(ddsBase, domainId, topic_name);
else {
typename TTopicEntityMap::iterator itTopic = map_[domainId].find(topic_name);
if (itTopic == map_[domainId].end())
// domain found, but topic not found, so create entity
map_[domainId][topic_name] = Create(ddsBase, domainId, topic_name);
}
return map_[domainId][topic_name];
}
};
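The same two-level lookup can be tried without any DDS types. The following sketch assumes nothing beyond the standard library: TwoLevelCache and LabelCache are hypothetical names, and the "entity" is just a string label so that the number of creations can be observed.

```cpp
#include <map>
#include <string>
#include <utility>

// Sketch of a lookup keyed by domain and then by topic.
template <typename TEntity>
class TwoLevelCache {
    typedef std::map<std::string, TEntity> TopicMap;
    std::map<int, TopicMap> map_;
protected:
    // Subclasses decide how a missing entity is built.
    virtual TEntity Create(int domainId, const std::string& topic) = 0;
public:
    virtual ~TwoLevelCache() {}

    TEntity Get(int domainId, const std::string& topic) {
        TopicMap& topics = map_[domainId];  // creates the inner map on first use
        typename TopicMap::iterator it = topics.find(topic);
        if (it == topics.end())
            it = topics.insert(std::make_pair(topic, Create(domainId, topic))).first;
        return it->second;
    }
};

// Hypothetical subclass that labels each entity and counts creations.
class LabelCache : public TwoLevelCache<std::string> {
    int created_;
    virtual std::string Create(int domainId, const std::string& topic) {
        ++created_;
        return std::to_string(domainId) + "/" + topic;
    }
public:
    LabelCache() : created_(0) {}
    int CreatedCount() const { return created_; }
};
```

Requesting the same domain and topic twice invokes Create() once, while a new topic in a known domain, or a known topic in a new domain, each trigger one further creation.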
Now that the generic wrapper has been described, we can see how to use it in a
specific case — that of the Messenger application as described in the
OpenDDS Developer's Guide [5].
As defined in Messenger.idl in the Messenger_IDL
project, the structure representing the Message
type of data sample is as follows:
// Messenger_IDL/Messenger.idl
module Messenger {
#pragma DCPS_DATA_TYPE "Messenger::Message"
#pragma DCPS_DATA_KEY "Messenger::Message subject_id"
struct Message {
string from;
string subject;
long subject_id;
string text;
long count;
};
};
After compiling Messenger.idl with the OpenDDS IDL compiler tool chain,
a number of files are produced that contain various generated entities.
MessengerC.h contains a definition of struct Message,
in the Messenger namespace, which is an IDL-to-C++ mapping
of the Message IDL structure. The Message structure
is as follows:
// MessengerC.h
namespace Messenger
{
...
struct Message
{
typedef Message_var _var_type;
typedef Message_out _out_type;
TAO::String_Manager from;
TAO::String_Manager subject;
::CORBA::Long subject_id;
TAO::String_Manager text;
::CORBA::Long count;
};
...
}
The file MessengerTypeSupportC.h provides definitions of
classes MessageTypeSupport, MessageDataReader, and
MessageDataWriter, also in the Messenger
namespace. Class MessageTypeSupport provides
the type support for the Message structure, and
MessageDataReader and MessageDataWriter
provide Message-based specializations of the DDS entities
DataReader and DataWriter respectively. It is
these types that we will use with the generic wrapper to provide
a simplified publish and subscribe interface for Messenger
data samples.
To provide a concrete wrapper, we create the project Messenger_CPP_DDSImpLib,
containing the files generated by the OpenDDS IDL compiler tool chain from Messenger.idl, plus DDSImpl.h
and DDSImpl.cpp. As our objective is to provide a simple interface
for applications that use the .NET Framework, we begin by defining a .NET
version of the Message structure, and functions to convert from the unmanaged
C++ representation to the managed, C++/CLI one. In DDSImpl.h,
we define the class MessageNet, with corresponding .NET types.
For clarity, we also maintain a parallel namespace/class representation.
// Messenger_CPP_DDSImpLib/DDSImpl.h
namespace MessengerNet {
public value class MessageNet
{
public:
System::String^ from;
System::String^ subject;
System::Int32 subject_id;
System::String^ text;
System::Int32 count;
};
}
TypeTraits must be defined for the Message structure,
and it is done with one line of code:
DEFINE_TYPETRAITS(Messenger::Message)
We add prototypes for the functions to convert from the Messenger
to the MessengerNet types, and vice versa.
// Messenger_CPP_DDSImpLib/DDSImpl.h
Messenger::Message Convert(const gcroot<MessengerNet::MessageNet> &sampleParam);
gcroot<MessengerNet::MessageNet> Convert(const Messenger::Message &sample);
These conversion functions are implemented in DDSImpl.cpp and are
simple copies from one representation to the other.
// Messenger_CPP_DDSImpLib/DDSImpl.cpp
Messenger::Message Convert(const gcroot<MessengerNet::MessageNet> &sampleNetParam) {
MessengerNet::MessageNet sampleNet = (MessengerNet::MessageNet)sampleNetParam;
Messenger::Message sample;
sample.subject_id = sampleNet.subject_id;
sample.from = Convert(sampleNet.from);
sample.subject = Convert(sampleNet.subject);
sample.text = Convert(sampleNet.text);
sample.count = sampleNet.count;
return sample;
}
gcroot<MessengerNet::MessageNet> Convert(const Messenger::Message &sample) {
MessengerNet::MessageNet sampleNet;
sampleNet.subject_id = sample.subject_id;
sampleNet.from = Convert(sample.from);
sampleNet.subject = Convert(sample.subject);
sampleNet.text = Convert(sample.text);
sampleNet.count = sample.count;
return sampleNet;
}
The string conversion functions are available in the Common library
in StringConvert.[h,cpp].
We now implement subclasses of the generic C++ wrapper classes, specialized
for the Messenger type. First, we subclass DataReaderListenerImplBase
such that the processing of a data sample causes it to be fired as a .NET event.
For more information on this technique, or for the use of OpenDDS with .NET in
general, see [6]. The Process() method is
overridden from the base class to post the data sample as an event, and an AddHandler()
method is provided to allow a .NET event handler to be attached to the listener. That is,
when the event is posted, the handler is invoked with the event as a parameter. The
event support classes are implemented in Common.cpp in the Common
library.
// Messenger_CPP_DDSImpLib/DDSImpl.h
class MessengerMessageDataReaderListenerImpl :
public DataReaderListenerImplBase<Messenger::MessageDataReader, Messenger::Message> {
gcroot<EventManager<MessengerNet::MessageNet>^> eventManager_;
public:
MessengerMessageDataReaderListenerImpl(
gcroot<EventManager<MessengerNet::MessageNet>^> eventManager) :
eventManager_(eventManager) {}
void Process(const Messenger::Message &sample) {
eventManager_->Process(eventManager_,
gcnew ProcessEventArgs<MessengerNet::MessageNet>(Convert(sample)));
}
void AddHandler(gcroot<EventManager<MessengerNet::MessageNet>::ProcessEventHandler^> handler) {
eventManager_->Process += handler;
}
};
Next, we implement subclasses of the EntityMap class to manage
instances of MessageDataReader and MessageDataWriter.
The map lookup logic is provided in the base class, but the Create()
methods are overridden to instantiate the correct type.
// Messenger_CPP_DDSImpLib/DDSImpl.h
typedef Writer<Messenger::Message> MessengerMessageDataWriter;
typedef Reader<Messenger::Message, MessengerMessageDataReaderListenerImpl>
MessengerMessageDataReader;
class MessengerMessageDataWriterMap : public EntityMap<MessengerMessageDataWriter> {
virtual MessengerMessageDataWriter Create(DDSBase *ddsBase,
::DDS::DomainId_t domainId, const char *topic_name) {
return ddsBase->CreateWriter<Messenger::Message>(domainId, topic_name);
}
};
class MessengerMessageDataReaderMap : public EntityMap<MessengerMessageDataReader> {
virtual MessengerMessageDataReader Create(DDSBase *ddsBase,
::DDS::DomainId_t domainId, const char *topic_name) {
return ddsBase->CreateReader<Messenger::Message>(domainId, topic_name,
new MessengerMessageDataReaderListenerImpl(
gcnew EventManager<MessengerNet::MessageNet>()));
}
};
We now create a subclass of DDSBase that is specialized for
the Messenger type. The implementation of each of the methods
is a one-liner delegation to the appropriate contained entity.
// Messenger_CPP_DDSImpLib/DDSImpl.h
class DDSImpl : public DDSBase {
MessengerMessageDataWriterMap mapMessengerMessageDataWriter_;
MessengerMessageDataReaderMap mapMessengerMessageDataReader_;
public:
DDSImpl(int argc, ACE_TCHAR *argv[]) : DDSBase(argc, argv) {}
DDS::ReturnCode_t Publish(::DDS::DomainId_t domainId, const char *topic_name,
gcroot<MessengerNet::MessageNet> sample) {
return mapMessengerMessageDataWriter_.Get(
this, domainId, topic_name).Write(Convert(sample));
}
bool MessengerMessageWaitForSubscriber(::DDS::DomainId_t domainId,
const char *topic_name) {
return mapMessengerMessageDataWriter_.Get(
this, domainId, topic_name).WaitForSubscriber();
}
bool MessengerMessageWaitForAcknowledgements(::DDS::DomainId_t domainId,
const char *topic_name) {
return mapMessengerMessageDataWriter_.Get(
this, domainId, topic_name).WaitForAcknowledgements();
}
void Subscribe(::DDS::DomainId_t domainId, const char *topic_name,
gcroot<EventManager<MessengerNet::MessageNet>::ProcessEventHandler^> handler) {
mapMessengerMessageDataReader_.Get(
this, domainId, topic_name).GetListener()->AddHandler(handler);
}
bool MessengerMessageWaitForPublisherToComplete(::DDS::DomainId_t domainId,
const char *topic_name) {
return mapMessengerMessageDataReader_.Get(
this, domainId, topic_name).WaitForPublisherToComplete();
}
};
One more level of indirection is needed. The class DDSImpl provides
a simple publish/subscribe interface for C++, but the method parameters are not
compatible with the .NET Framework Common Type System [7].
A .NET ref class DDSNet wraps DDSImpl and provides the required
interface.
// Messenger_CPP_DDSImpLib/DDSImpl.h
public ref class DDSNet {
DDSImpl *pDDSImpl;
public:
DDSNet() {
// convert .NET arguments to standard argc/argv
int argc;
char **argv;
GetArguments(argc, argv);
pDDSImpl = new DDSImpl(argc, argv);
}
~DDSNet() {
delete pDDSImpl;
}
int Publish(int domainId, System::String^ topic_name, MessengerNet::MessageNet sample) {
NET_RETHROW_EXCEPTION(return pDDSImpl->Publish(domainId, \
ConvertToString(topic_name).c_str(), sample));
}
bool MessengerMessageWaitForSubscriber(int domainId, System::String^ topic_name) {
NET_RETHROW_EXCEPTION(return pDDSImpl->MessengerMessageWaitForSubscriber(domainId, \
ConvertToString(topic_name).c_str()));
}
bool MessengerMessageWaitForAcknowledgements(int domainId, System::String^ topic_name) {
NET_RETHROW_EXCEPTION(return pDDSImpl->MessengerMessageWaitForAcknowledgements(domainId, \
ConvertToString(topic_name).c_str()));
}
void Subscribe(int domainId, System::String^ topic_name,
EventManager<MessengerNet::MessageNet>::ProcessEventHandler^ handler) {
NET_RETHROW_EXCEPTION(pDDSImpl->Subscribe(domainId, \
ConvertToString(topic_name).c_str(), handler));
}
bool MessengerMessageWaitForPublisherToComplete(int domainId, System::String^ topic_name) {
NET_RETHROW_EXCEPTION(return pDDSImpl->MessengerMessageWaitForPublisherToComplete(domainId, \
ConvertToString(topic_name).c_str()));
}
};
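The DDSNet constructor relies on a helper, GetArguments, whose body is not shown in this section. As a rough illustration of what such a conversion involves, the sketch below builds an argc/argv pair from a list of strings in standard C++. The ArgvHolder class and its members are hypothetical names introduced here, not part of the article's code; the real helper would first have to obtain the arguments from the .NET runtime.

```cpp
#include <string>
#include <vector>

// Hypothetical helper (not from the article's code): owns the argument strings
// and exposes them as the argc/argv pair that a C-style initializer expects.
class ArgvHolder {
    std::vector<std::string> storage_;  // owns the character data
    std::vector<char*> pointers_;       // argv view into storage_, null-terminated
public:
    explicit ArgvHolder(const std::vector<std::string>& args) : storage_(args) {
        pointers_.reserve(storage_.size() + 1);
        for (std::string& s : storage_) {
            pointers_.push_back(&s[0]);  // writable pointer into the owned string
        }
        pointers_.push_back(nullptr);    // argv[argc] is conventionally null
    }

    // Non-copyable: a copy's pointers_ would still refer to the source's storage_.
    ArgvHolder(const ArgvHolder&) = delete;
    ArgvHolder& operator=(const ArgvHolder&) = delete;

    int argc() const { return static_cast<int>(storage_.size()); }
    char** argv() { return pointers_.data(); }
};
```

An ArgvHolder must outlive any code that keeps the argv pointers, since the character data lives in the holder. In a wrapper like the one above, that would mean keeping it alive at least until the underlying initialization call has returned.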
NET_RETHROW_EXCEPTION is a macro that catches exceptions raised by the
unmanaged code and rethrows them as managed .NET exceptions. It is defined as follows:
// Common/NetException.h
#define NET_RETHROW_EXCEPTION(x) \
try { \
x; \
} \
catch(const std::exception &e) { \
throw gcnew DDSNetException(gcnew System::String(e.what())); \
}
and:
// Common/NetException.cpp
public ref class DDSNetException : public Exception {
public:
DDSNetException(String^ what) : Exception(what) {}
};
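The same boundary-translation idea can be sketched in portable C++. The example below is an illustration under stated assumptions, not the article's implementation: BoundaryException is a hypothetical stand-in for the managed DDSNetException, and a function template replaces the macro so that the wrapped call's return value passes through without writing return inside a macro argument.

```cpp
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for the managed DDSNetException. It deliberately does
// not derive from std::exception, so the translation is visible to callers.
struct BoundaryException {
    std::string what;
};

// Runs `call` and converts any std::exception into a BoundaryException.
// The callable's return value (or void) is forwarded to the caller.
template <typename Call>
auto translate_exceptions(Call&& call) -> decltype(call()) {
    try {
        return std::forward<Call>(call)();
    } catch (const std::exception& e) {
        throw BoundaryException{e.what()};
    }
}

// Hypothetical wrapped operation: fails when given an empty topic name.
inline int publish_checked(const std::string& topic_name) {
    if (topic_name.empty()) {
        throw std::invalid_argument("topic name must not be empty");
    }
    return 0;
}
```

A caller would wrap each boundary-crossing call in a lambda, for example translate_exceptions([&] { return publish_checked(topic); }). Like the macro, this sketch translates only exceptions derived from std::exception; anything else propagates unchanged.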
The diagram below illustrates the relationships between the classes that were developed above.
Now that the .NET interface has been established, as a test, we can replicate
the behavior of the C++ Messenger sample in C#. In the
Messenger_CS_Publisher project, we create the file
Publisher.cs, and begin a standard console-mode C# application.
Although this application does not use COM interop, Main() is
marked with [STAThread], which selects the single-threaded apartment
COM threading model. The Visual Studio project generator emits this
attribute by default.
// Messenger_CS_Publisher/Publisher.cs
using System;
using System.Collections.Generic;
namespace Publisher
{
static class Program
{
[STAThread]
static void Main()
{
try
{
We instantiate an object of the DDSNet class.
DDSNet dds = new DDSNet();
We then wait for a subscriber on the "Movie Discussion List" topic in domain 42.
dds.MessengerMessageWaitForSubscriber(42, "Movie Discussion List");
Once a subscriber is found, we publish reviews in the same manner as the Messenger
example.
for (int i = 0; i < 10; i++) {
MessengerNet.MessageNet messageNet;
messageNet.subject_id=99;
messageNet.from = "Comic Book Guy";
messageNet.subject = "Review";
messageNet.text = "Worst. Movie. Ever.";
messageNet.count = i;
dds.Publish(42, "Movie Discussion List", messageNet);
}
We then wait for the subscriber to receive the messages, free the DDSNet instance,
and exit.
dds.MessengerMessageWaitForAcknowledgements(42, "Movie Discussion List");
dds.Dispose();
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
}
The publisher is thus reduced from the many lines of code in the C++ example to three calls: two helper methods that synchronize the publisher with the subscriber for test purposes, and one method that publishes a data sample.
The subscriber is just as straightforward. In the
Messenger_CS_Subscriber project, we create the file
Subscriber.cs. We implement a class with a method that is called
when a data sample arrives; in this case, it prints the contents of the
sample to the console.
// Messenger_CS_Subscriber/Subscriber.cs
using System;
using System.Collections.Generic;
namespace Subscriber
{
public class Print {
public void MessengerNetMessageNetEventHandler(Object sender,
ProcessEventArgs<MessengerNet.MessageNet> args) {
Console.WriteLine("MessageNetEventHandler: subject = {0}",
args.Sample.subject);
Console.WriteLine("MessageNetEventHandler: subject_id = {0}",
args.Sample.subject_id);
Console.WriteLine("MessageNetEventHandler: from = {0}",
args.Sample.from);
Console.WriteLine("MessageNetEventHandler: count = {0}",
args.Sample.count);
Console.WriteLine("MessageNetEventHandler: text = {0}",
args.Sample.text);
}
};
Next, we begin the Program class as before, and instantiate an object of
type DDSNet.
static class Program
{
[STAThread]
static void Main()
{
try
{
DDSNet dds = new DDSNet();
We establish the MessengerNetMessageNetEventHandler() method as
the subscriber's listener, for the "Movie Discussion List" on domain 42.
dds.Subscribe(42, "Movie Discussion List",
new EventManager<MessengerNet.MessageNet>.ProcessEventHandler(
new Print().MessengerNetMessageNetEventHandler));
We allow the subscriber to process samples until the publisher exits, free the
DDSNet instance, and exit.
dds.MessengerMessageWaitForPublisherToComplete(42,
"Movie Discussion List");
dds.Dispose();
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
}
The subscriber consists of one helper method to align the subscriber with the publisher for test purposes, and one method to establish the subscription itself.
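The shape of this interface, in which a handler is registered for a domain and topic and then invoked once per published sample, can be pictured with a small in-process sketch in standard C++. Everything here is an assumption made for illustration: LocalBus and Message are hypothetical names, and delivery is a direct function call, whereas the wrapper in this article delivers samples through OpenDDS data writers, data readers and listeners.

```cpp
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sample type mirroring the fields printed by the subscriber.
struct Message {
    int subject_id;
    std::string from;
    std::string subject;
    std::string text;
    int count;
};

// In-process stand-in for the publish/subscribe facade (illustration only;
// it performs no networking and does not use OpenDDS).
class LocalBus {
public:
    using Handler = std::function<void(const Message&)>;

    // Register a handler for one (domain, topic) pair.
    void Subscribe(int domainId, const std::string& topic_name, Handler handler) {
        handlers_[std::make_pair(domainId, topic_name)].push_back(std::move(handler));
    }

    // Deliver the sample to every handler registered for the pair and
    // return the number of handlers that received it.
    int Publish(int domainId, const std::string& topic_name, const Message& sample) const {
        auto it = handlers_.find(std::make_pair(domainId, topic_name));
        if (it == handlers_.end()) {
            return 0;
        }
        for (const Handler& h : it->second) {
            h(sample);
        }
        return static_cast<int>(it->second.size());
    }

private:
    std::map<std::pair<int, std::string>, std::vector<Handler>> handlers_;
};
```

Keying the handlers on the (domain, topic) pair mirrors how the C# code names both values in every call. A sample published on a different domain or topic reaches no handler.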
Running run_test_Messenger.pl from the Test
subdirectory starts the DCPSInfoRepo and the C# publisher and subscriber, and demonstrates
that the output matches that of the Messenger sample in the OpenDDS Developer's Guide.
As demonstrated in this article, the OpenDDS interface can be reduced to a simple publish/subscribe API. Writing the concrete classes for each data sample type by hand is rote but burdensome. Part II [8] of this article will show how these classes can be generated automatically from a user-defined data sample type, making OpenDDS even easier to use with .NET.
[1] OpenDDS
http://www.opendds.org/
[2] Introduction to OpenDDS
http://www.opendds.org/Article-Intro.html
[3] The RAII Programming Idiom
http://www.hackcraft.net/raii/
[4] Using QuickFAST and OpenDDS for a Low Latency Market Data Feed
http://mnb.ociweb.com/mnb/MiddlewareNewsBrief-201001.html
[5] OpenDDS Developer's Guide
http://downloads.ociweb.com/OpenDDS/OpenDDS-latest.pdf
[6] Using TAO and OpenDDS with .NET, Part II
http://mnb.ociweb.com/mnb/MiddlewareNewsBrief-200902.html
[7] Common Type System
http://msdn.microsoft.com/en-us/library/zcx1eb1e%28VS.90%29.aspx
[8] Code Generation with OpenDDS, Part II
http://mnb.ociweb.com/mnb/MiddlewareNewsBrief-201007.html
Copyright ©2010 Object Computing, Inc. All rights reserved.
OMG, CORBA, IIOP, and all OMG marks and logos are trademarks or registered
trademarks of Object Management Group, Inc. in the United States and/or
other countries.
Java and all Java-based marks are trademarks or registered trademarks of Sun
Microsystems, Inc. in the United States and/or other countries.
.NET, C#, and .NET-based marks are trademarks or registered trademarks of Microsoft
Corporation in the United States and/or other countries.