The trend in modern computer systems is toward increased numbers of processing units: more cores on a single die, multiple processors in a single machine, and machines linked into larger and larger clusters. As a consequence, complex concurrent applications, which are difficult to develop, are becoming the norm. The Erlang [1] language can simplify the task of writing distributed concurrent applications. Although a system can be developed entirely in Erlang, interaction with non-Erlang systems is often a necessity. This article will show how Erlang can exchange messages with the outside world via CORBA and DDS.
Erlang was developed over two decades ago by Ericsson for the implementation of large-scale telephony systems, and it is now used by companies [2] such as Amazon, Yahoo!, and Facebook, and in open-source applications such as CouchDB [3], a database engine, and RabbitMQ [4], an implementation of the AMQP messaging protocol [5].
Erlang is a functional language, relying strongly on pattern matching and recursion. Erlang processes are cheap to create, and use message passing, rather than shared memory, for communication. As there is no shared state, computations do not need to block on synchronization primitives which would guard that state, allowing an Erlang system to scale well with the number of available processors.
Another major strength of the Erlang language is fault-tolerance. Processes can be linked and if a process fails, all processes that are linked to it, even if running on a different physical machine, are notified. Action can then be taken to restart the process elsewhere, or to fail-over to alternative behavior. Running code can also be upgraded on the fly, without having to stop the system as a whole. These features allow Erlang-based systems to run continuously for years, without downtime.
In this article, we will use TAO [6] and OpenDDS [7] as the CORBA and DDS implementations, respectively. The code that accompanies this article has been tested under 64-bit Windows 7 with Visual Studio 2010, 32-bit GNU/Linux with GCC 4.3.2, and 64-bit GNU/Linux with GCC 4.4.3 (TAO and DDS compiled as 64-bit, with a 64-bit Erlang distribution). Although Erlang concepts will be described as needed, a full tutorial is beyond the scope of this article. For more information, please consult the several books that have been published [8, 9, 10], and the web sites that have been created [11, 12, 13], that delve deeply into the language and its use.
The Erlang distribution includes a CORBA ORB called Orber as part of the Open Telecom Platform (OTP). OTP is a collection of libraries and procedures for producing distributed applications. While Orber tutorials can be found at [14] and [15], we will extend the example presented in the Middleware News Brief Multi-Language CORBA Development with C++ (TAO), Java (JacORB), Perl (opalORB), and C# (IIOP.NET) [16] to include Erlang.
In that article, servers were created in each language to provide an object that implements the Math interface, as follows:
// CORBA/Erlang/Math.idl
module MathModule {
interface Math
{
long Add(in long x, in long y);
};
};
Clients were then created in each language, and tests run to demonstrate that clients in any language can connect to servers in any language. For this test, we will only reference the C++ client and server in order to demonstrate interoperability between Orber and other ORBs.
We begin by compiling the IDL file with the Erlang compiler.
erlc Math.idl
The files MathModule.hrl, MathModule_Math.erl, MathModule_Math.hrl,
oe_Math.erl, and oe_Math.hrl are created, which include client stubs and other
CORBA infrastructure. We can also create server stubs by running the compiler again, and
specifying a back-end template:
erlc +"{be,erl_template}" Math.idl
This creates the file MathModule_Math_impl.erl. In this file, the skeleton for the Add
method is created as:
'Add'(State, X, Y) ->
{reply, OE_Reply, State}.
In Erlang, variables begin with uppercase letters, but function names must be atoms (named constants, which normally begin with a lowercase letter).
Enclosing Add in single quotes makes it an atom despite its leading uppercase letter. The variables X
and Y are mapped from CORBA longs to Erlang integers — see [17] for
the complete CORBA type mapping. We now implement
the function as a sum of X and Y, assigning the result to OE_Reply. Within a function,
expressions in an expression list are separated by commas and executed sequentially. Here, the last expression is a
3-tuple (denoted by curly braces) to return from the function, with the first element of the tuple the atom reply, the second
the variable OE_Reply, and the third the variable State that was passed into the function.
'Add'(State, X, Y) ->
OE_Reply = X + Y,
{reply, OE_Reply, State}.
We can now create server and client tests. Create a file named crb.erl. Code is arranged into modules,
as specified by the -module() declaration. Functions within a module that are to be used outside of the
module are specified in a list (denoted by square brackets) in an -export() declaration. The name of each
function to export is followed by a slash, and then the arity (number of parameters) of the function. Here, we export
the client and server test functions. Each top-level form in Erlang, including these declarations, is terminated by
a period followed by whitespace. Comments begin with %; by convention, %% is used for comments on a line of their own.
%% CORBA/Erlang/crb.erl
-module(crb).
-export([client_test/1, server_test/1]).
Next, we create a helper function, start_orber(), to initialize a single-node instance of Orber.
start_orber(Name) ->
mnesia:start(),
corba:orb_init([{domain, Name},
{orber_debug_level, 10},
{iiop_port, 0} ]),
orber:install([node()],
[{ifr_storage_type, ram_copies},
{nameservice_storage_type, ram_copies}]),
orber:start().
This function takes the name of the domain as an argument. Orber instances within the same domain communicate via the Erlang distribution protocol, but communication between instances in different domains is by the OMG's GIOP. Attempting to use GIOP between a client and server in the same domain will cause an OBJECT_NOT_EXIST CORBA exception to be raised.
start_orber() begins by calling mnesia:start() which starts the Mnesia database [18].
Orber uses Mnesia to store internal data, where the database can be maintained fully in RAM, as indicated by the
ram_copies options to orber:install(). If database persistence is desired, a Mnesia
schema must first be created, and disc_copies specified, as described in [14].
For our purposes, a RAM-only database is sufficient.
Next, corba:orb_init() is called to initialize the ORB. (In Erlang, functions that have been exported from modules
are called by prefixing the function name by the module they reside in, followed by a colon.)
A large number of options [19]
can be set, but here only the domain, debug level, and port are specified. A value of 0 for the port causes a random, unused
port to be chosen. If the iiop_port option is not provided, the port defaults to 4001. A fixed port prevents
multiple Orber instances from starting on one machine, as multiple sockets cannot listen on the same port simultaneously.
These options are presented as a list (in square brackets) of tuples.
Finally, orber:install() is called to configure Orber to use the current node and in-memory tables, and
orber:start() runs the ORB.
We now define an additional helper function to read the stringified IOR of a server from a file, and return it as a string.
file:read_file() returns a 2-tuple. Upon success, the first element is the atom ok, and the
second element is a binary data object which is the contents of the file. In Erlang, strings are lists of
characters so erlang:binary_to_list() is called to convert the binary object to a string.
readIOR(FileName) ->
{ok, Binary} = file:read_file(FileName),
erlang:binary_to_list(Binary).
We can now write the client test.
client_test(["ior", IORFile, "add", Xp, Yp]) ->
start_orber("client"),
{X, _} = string:to_integer(Xp),
{Y, _} = string:to_integer(Yp),
oe_Math:oe_register(),
Obj = corba:string_to_object(readIOR(IORFile)),
Res = 'MathModule_Math':'Add'(Obj, X, Y),
io:format("Sum: ~p~n", [Res]),
init:stop().
This function accepts a list of strings as its single parameter, where the first and third elements are "ior" and "add", respectively.
The function first starts Orber in the client domain, and then converts the X and Y values from their initial
string representation to integers via the string:to_integer() function. The second element of the
2-tuple returned by string:to_integer() is a list of unconverted text — here, all text will be
converted and the list empty, so we use an underscore to match against the tuple element that we do not care to receive.
The Math interface is
registered in the Interface Repository by the call to oe_Math:oe_register(), and the stringified
IOR is converted to an object reference via corba:string_to_object().
Calls to CORBA methods are via the syntax Module:Method(ObjectReference, Parameters), here as
Res = 'MathModule_Math':'Add'(Obj, X, Y). The result is printed to standard output
by io:format(). The test ends with a call to init:stop() which shuts down the currently-running
Erlang node.
Before running the test, the environment variable ERL_ROOT must be set to the root of the Erlang installation.
A typical installation path under 64-bit Windows 7 is C:\Program Files (x86)\erl5.8.2.
The script run_test.pl in the CORBA/Test directory runs the client test with
a command-line (entered on a single line) similar to:
<ERL_ROOT>/bin/erl -pa ../Erlang -noshell
-run crb client_test ior server.ior add 5 7
The -pa argument adds the ../Erlang directory to the start of the module
search path. The -noshell argument runs Erlang without starting an interactive shell.
-run <module> <function> <arguments> is used to invoke the client_test()
function in the crb module, with the arguments ior server.ior add 5 7
passed as a list of strings. These match the arguments ["ior", IORFile, "add", Xp, Yp] of client_test(). The complete list of arguments
that can be passed to erl can be found here [20].
Note that arguments beginning with a dash are directed to erl and not passed as
function arguments, so ior and add are used instead of -ior and -add as is
done with the ORBs in the other programming languages. Save for this difference, the run_test.pl
used here is the same as in the aforementioned Middleware News Brief.
We next write the server test, which is simpler than the client test, as follows:
server_test([IORFile]) ->
start_orber("server"),
Obj = 'MathModule_Math':oe_create(),
writeIOR(IORFile, corba:object_to_string(Obj)).
The server accepts one parameter, IORFile, the name of the file to which it will write the server's IOR.
Orber is started in the server domain, the Math server begins execution with oe_create(), the
returned object reference is stringified, and written to the file via the writeIOR helper function,
defined as:
writeIOR(FileName, IOR) ->
{ok, FileDesc} = file:open(FileName, [write]),
file:write(FileDesc, IOR),
file:close(FileDesc).
As with the client test, the server test is executed by run_test.pl in an analogous way:
<ERL_ROOT>/bin/erl -pa ../Erlang -noshell
-run crb server_test erlang.ior
After compiling all of the .erl files, MathModule_Math.erl, oe_Math.erl,
MathModule_Math_impl.erl and crb.erl, with erlc, running CORBA/Test/run_test.pl
produces output such as:
Starting TAO server
Starting orber server
Running test: TAO server, TAO client
84+81 => Expected: 165 Actual: 165 => success
Running test: TAO server, orber client
89+9 => Expected: 98 Actual: 98 => success
Running test: orber server, TAO client
35+16 => Expected: 51 Actual: 51 => success
Running test: orber server, orber client
98+3 => Expected: 101 Actual: 101 => success
Stopping TAO server
Stopping orber server
The output is now similar to that of the prior MNB, showing that Orber can interoperate with TAO.
Unlike CORBA, Erlang does not have direct support for DDS, but can interoperate with C/C++, the implementation language of OpenDDS. We will create a "C Node" [21], an Erlang node implemented in C (C++, in our case), to act as a gateway between Erlang and DDS.
We will use a variant of the DDS Messenger sample [22] for our purposes, with the following IDL:
// DDS/CPP/DDS_IDL/Messenger.idl
module Messenger {
#pragma DCPS_DATA_TYPE "Messenger::Message"
struct Message {
string msg;
long id;
};
};
We wish to create an Erlang subscriber which receives Messenger samples, as well as an Erlang publisher which sends Messenger samples. We will begin by defining a message protocol between the Erlang processes and the C++ gateway.
We need one message to support publishing from Erlang, three to support subscription, and one for Gateway termination. We will define them as tuples, with the first element an atom indicating the message type.
| { publish, { <string>, <long> } } | Send the associated tuple as the DDS sample. |
| { subscribe, <pid> } | Register the given process ID as a recipient of DDS samples. |
| { unsubscribe, <pid> } | Remove the given process ID from the recipients of DDS samples. |
| { message, <string>, <long> } | Sent by the gateway to each subscribed process when a DDS sample is received. |
| { shutdown } | Terminate the gateway. |
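On the gateway side, the leading atom of each tuple can drive a simple dispatch. The following is a minimal sketch, assuming the atom has already been decoded into a std::string. The names GatewayRequest, ClassifyRequest, and DemoClassify are hypothetical, introduced only for illustration, and are not part of the gateway source.

```cpp
#include <cassert>
#include <string>

// Hypothetical names for illustration; not part of the gateway source.
enum class GatewayRequest { Publish, Subscribe, Unsubscribe, Shutdown, Unknown };

// Map the leading atom of an incoming tuple to a request kind.
GatewayRequest ClassifyRequest(const std::string &atom) {
  if (atom == "publish")     return GatewayRequest::Publish;
  if (atom == "subscribe")   return GatewayRequest::Subscribe;
  if (atom == "unsubscribe") return GatewayRequest::Unsubscribe;
  if (atom == "shutdown")    return GatewayRequest::Shutdown;
  return GatewayRequest::Unknown;  // anything else is not a gateway request
}

// Small self-check that can be called from main().
void DemoClassify() {
  assert(ClassifyRequest("publish") == GatewayRequest::Publish);
  assert(ClassifyRequest("shutdown") == GatewayRequest::Shutdown);
  assert(ClassifyRequest("hello") == GatewayRequest::Unknown);
}
```

Keeping the atom-to-request mapping in one function means the rest of the gateway can switch on an enumeration rather than compare strings in several places.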
In our example, we will publish samples with IDs counting down from 10 to 0; when the ID reaches 0,
the test will end. We begin the file dds.erl with the module and export declarations below. We will use
the same Erlang source file for both the publisher and subscriber tests, so the necessary functions for both tests
are exported.
%% DDS/Erlang/dds.erl
-module(dds).
-export([publisher_test/1, subscriber_test/1, init_stop/0]).
The publisher_test() function calls the internal publisher_test_loop() function
with three arguments: the node name of the gateway, the message to send, and an initial count of 10.
As before, arguments from the test script are passed as strings, so the node name of the gateway is converted
to an atom before being passed to publisher_test_loop().
publisher_test([Gateway, Message]) ->
publisher_test_loop(list_to_atom(Gateway), Message, 10).
The function publisher_test_loop() is:
publisher_test_loop(Gateway, Message, Count) ->
if
Count >= 0 ->
io:format("[Erlang ~p] Publish: msg='~p' id=~p~n",
[node(), Message, Count]),
{ any, Gateway } ! { publish, { Message, Count } },
wait(1000),
publisher_test_loop(Gateway, Message, Count-1);
true ->
{ any, Gateway } ! { shutdown },
io:format("[Erlang ~p]: done~n", [node()]),
init_stop()
end.
Each element of an Erlang if expression is a guard (boolean expression), followed by an arrow, followed
by a sequence of expressions. Guards are evaluated in the order presented — the first guard that is found that
evaluates to true has its associated expression list evaluated, and the if statement terminates.
The first guard above is true when the value of Count is greater than, or equal to, zero.
When that
condition occurs, text is printed to the console showing the message to be published, and a publish
message is sent to the gateway node. The tuple on the left of the exclamation point is the destination, here a {Name, Node} pair that addresses
a registered process on the given node; the tuple on the right is the message, and the exclamation point is the send operator.
After the message is sent, execution pauses for one second before continuing. The code then proceeds by recursively
calling publisher_test_loop() with the message count reduced by one. Looping by tail recursion is
a common pattern in Erlang: the state of the computation, modified as necessary during the current iteration,
is passed as arguments to the next call of the looping function.
If the first guard of the if statement fails, the next, and, in this case, last, guard is evaluated.
The value true is always true, so, here, acts as an else clause to the if. If this guard
is reached, then the count has dropped below zero. In that case, the shutdown message is sent to the
gateway, an indication of completion is printed to the console, and the Erlang node terminated. Exiting the function
would normally be performed by simply not performing another tail-recursive call, but for the purposes of the
test we desire the entire Erlang node to shut down.
The subscriber test begins with a call to the subscriber_test() function. First, the subscribe
message is sent to the gateway, to inform the gateway that the current process is interested in receiving DDS
samples — the function self() returns the process ID of the current process. The
subscriber_test_loop() function is then called to continue the test.
subscriber_test([Gateway]) ->
{ any, list_to_atom(Gateway) } ! { subscribe, self() },
subscriber_test_loop(list_to_atom(Gateway)).
The subscriber_test_loop() provides a selective receive. Messages, when sent to a process,
are stored in a mailbox for later retrieval. The receive...end expression performs a pattern-match
against the messages in the mailbox, returning the first that matches. It is good practice to
provide a wildcard match against unexpected messages, as otherwise, messages that are not extracted from the mailbox will
remain and continue to consume memory.
subscriber_test_loop(Gateway) ->
receive
{ message, _, 0 } ->
{ any, Gateway } ! { shutdown },
io:format("[Erlang ~p]: done~n", [node()]),
init_stop();
{ message, Msg, Id } ->
io:format("[Erlang ~p] Received: msg='~p' id=~p~n",
[node(), Msg, Id]),
subscriber_test_loop(Gateway);
Any ->
io:format(
"[Erlang ~p]: Received unknown message ~p~n",
[node(), Any]),
subscriber_test_loop(Gateway)
end.
The first pattern matched against is { message, _, 0 }, where a matching message consists
of a 3-tuple with the atom message in the first position, the value 0 in the third position,
and anything in the second position. As a message with ID 0 is considered the termination message, if this
is received, the shutdown message is sent to the gateway, a status indication is printed
to the console, and, as before for the purpose of the test, the Erlang node is shut down.
The second pattern matched against is { message, Msg, Id }. This message is similar in structure
to the previous, but, if the matching process has proceeded this far, then the ID value cannot be zero — so we
bind the variables Msg and Id to the message text and count respectively. We then
display a status message to the screen indicating that the incoming message was received, and then tail-recurse,
waiting for the next message to arrive.
The final pattern causes any unknown messages to be accepted and discarded.
In the above, two helper functions were used: init_stop() and wait(). init_stop()
is simply:
init_stop() -> init:stop().
where the supplied stop() function in the init module is invoked. wait() is
implemented as:
wait(MS) ->
receive
after MS -> true
end.
The receive...end expression blocks by default, but adding an after clause gives it a timeout, so that
an action can be taken if no message arrives within the interval. Here, no message patterns are given, so the after clause, with a delay in milliseconds, simply
exits the receive...end once the specified time has elapsed.
The gateway consists of three logical pieces: a DDS publisher, a DDS subscriber, and an Erlang node to bridge Erlang and C++. We will begin by developing the Erlang node.
Erlang provides the Erlang Interface C-API [23] for developing an Erlang node in C or C++
(a "C Node" [21]). Before generating project files with MPC [24] and compiling
the code, be sure to set the environment variable ERLI_ROOT to point to the directory containing the root of the Erlang interface
headers and library files. A typical installation path under 64-bit Windows 7 is
C:\Program Files (x86)\erl5.8.2\lib\erl_interface-3.7.2.
In the file ErlangNode.h, begin as follows:
// DDS/CPP/Gateway/ErlangNode.h
#include "XBuff.h"
#include "Runnable.h"
class ErlangNode : public Runnable {
protected:
std::string _shortName;
std::string _secretCookie;
int _port;
int _fd;
The short name of the node is used as the node's address, and corresponds to the Gateway parameter in the Erlang
code above. Erlang nodes must share the same secret cookie value in order to communicate, so the Erlang nodes and C node
must use the same cookie. A C node opens a TCP socket for communication to Erlang, so the socket's port and file descriptor are
maintained.
ErlangNode is designed as a base class for easy implementation of C nodes. As such, two methods to
override are provided. OnMessage() will be called when an Erlang message arrives, and OnIdle()
will be repeatedly called when there are no incoming messages to process, so other work can be performed.
protected:
virtual bool OnMessage(erlang_pid /*from*/,
XBuff& /*buff*/) { return true; }
virtual bool OnIdle() { return true; }
The Listen() method opens a socket on the given port.
int Listen(int port) {
int listen_fd;
struct sockaddr_in addr;
int on = 1;
if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
return (-1);
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR,
(char *)&on, sizeof(on));
memset((void*)&addr, 0, (size_t) sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(listen_fd, (struct sockaddr*) &addr,
sizeof(addr)) < 0)
return (-1);
listen(listen_fd, 5);
return listen_fd;
}
The Send() method sends an Erlang message, stored in an XBuff, to a specified recipient.
void Send(erlang_pid to, XBuff &buff) {
if (ei_send(_fd, &to, buff.get()->buff,
buff.get()->index) < 0)
throw EIException("ei_send failed",
erl_errno);
}
The constructor stores the node name, cookie, and port for later use.
ErlangNode(std::string shortName, std::string secretCookie,
int port) : _shortName(shortName),
_secretCookie(secretCookie), _port(port) {}
The main work is performed in the Svc() method. We first initialize the ei_cnode
structure. For most API functions, erl_errno is set when an error has occurred.
void *Svc() {
try {
ei_cnode ec;
int n=0;
if (ei_connect_init(&ec, _shortName.c_str(),
_secretCookie.c_str(), n++) < 0)
throw EIException("ei_connect_init failed",
erl_errno);
Next, we create a socket for listening.
int listen;
if ((listen = Listen(_port)) <= 0)
throw EIException("Listen failed", errno);
A daemon process, epmd, runs on each Erlang host, and the port that we are listening on
must be registered with it.
if (ei_publish(&ec, _port) == -1)
throw EIException("ei_publish failed",
erl_errno);
We now wait for communication with an Erlang node to be established.
ErlConnect conn;
if ((_fd = ei_accept(&ec, listen, &conn))
== ERL_ERROR)
throw EIException("ei_accept failed",
erl_errno);
Once connected, the main loop executes. The function ei_xreceive_msg_tmo() is called to receive Erlang
messages. If the return value from this function is ERL_MSG, and the message type is ERL_REG_SEND or ERL_SEND, a message
has arrived, so OnMessage() is called to process it. If the return value is ERL_ERROR, an
error or timeout has occurred, so OnIdle() is called. For our purposes, other return values and message
types can be ignored.
If OnMessage() or OnIdle() returns true, the loop continues; otherwise the loop, and with it the Erlang node,
exits.
while (true) {
erlang_msg msg;
XBuff buff(false);
int rcv = ei_xreceive_msg_tmo(_fd, &msg,
buff.get(), 10); // 10 ms
if (rcv == ERL_MSG) {
if ((msg.msgtype == ERL_REG_SEND) ||
(msg.msgtype == ERL_SEND)) {
if (!OnMessage(msg.from, buff))
break;
}
// ignore other message types
}
else if (rcv == ERL_ERROR) {
if (!OnIdle())
break;
}
// ignore other ei_xreceive_msg_tmo return values
}
}
catch (std::exception &e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
return 0;
}
};
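The control flow of this loop, a fixed loop that delegates to overridable hooks, can be exercised without a socket. The following is a simplified sketch, assuming a scripted list of poll results in place of ei_xreceive_msg_tmo(). The names PollResult, LoopBase, CountingLoop, and DemoLoop are hypothetical stand-ins, and, unlike the real loop, this one also ends when the script runs out.

```cpp
#include <cassert>
#include <deque>
#include <string>

// Hypothetical stand-in for one poll of the connection:
// either a message arrived or the poll timed out.
struct PollResult {
  bool hasMessage;
  std::string payload;
};

// Simplified analogue of ErlangNode: the loop is fixed, the hooks vary.
class LoopBase {
public:
  virtual ~LoopBase() {}
  // Consume scripted poll results instead of reading from a socket.
  void Run(std::deque<PollResult> script) {
    while (!script.empty()) {
      PollResult r = script.front();
      script.pop_front();
      if (r.hasMessage) {
        if (!OnMessage(r.payload)) break;  // hook asked the loop to stop
      } else {
        if (!OnIdle()) break;              // hook asked the loop to stop
      }
    }
  }
protected:
  virtual bool OnMessage(const std::string &) { return true; }
  virtual bool OnIdle() { return true; }
};

// Example subclass: count events and stop when "shutdown" arrives.
class CountingLoop : public LoopBase {
public:
  int messages = 0;
  int idles = 0;
protected:
  bool OnMessage(const std::string &payload) override {
    ++messages;
    return payload != "shutdown";
  }
  bool OnIdle() override { ++idles; return true; }
};

// Small self-check that can be called from main().
void DemoLoop() {
  CountingLoop loop;
  loop.Run({{true, "publish"}, {false, ""}, {true, "shutdown"}, {true, "publish"}});
  assert(loop.messages == 2);  // the message after "shutdown" is never seen
  assert(loop.idles == 1);
}
```

Returning false from either hook is the only way for a subclass to end the loop, which is how a shutdown message can terminate the node.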
We wish the Erlang node to run independently of the main thread of the gateway, so we define the
interface Runnable to represent code which can be executed in its own thread. Although not needed by our
example, this is beneficial as it allows multiple Erlang nodes to be hosted independently within a single C++ application.
// DDS/CPP/Gateway/Runnable.h
#include <ace/Thread.h>
class Runnable {
public:
// virtual destructor, as Runner deletes through a Runnable pointer
virtual ~Runnable() {}
virtual void *Svc() = 0;
};
Class Runner executes code that is Runnable in the context of an ACE_Thread. The Start() method begins
execution, while Wait() blocks until the code being executed terminates. A more fully-featured class
would implement cancellation as well, but that is not needed for our example.
class Runner {
ACE_hthread_t _thread;
Runnable *_runnable;
static void *Run(void *p) {
Runnable *runnable =
(reinterpret_cast<Runner *>(p))->_runnable;
void *rtn = runnable->Svc();
delete runnable;
return rtn;
}
public:
Runner(Runnable *runnable) : _thread(0),
_runnable(runnable) {}
void Start() {
ACE_Thread::spawn(reinterpret_cast<ACE_THR_FUNC>(Run),
this, THR_NEW_LWP|THR_JOINABLE, 0, &_thread);
}
void Wait() {
if (_thread != 0)
ACE_Thread::join(_thread);
}
};
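For comparison, the same run-in-a-thread idea can be sketched with the C++11 standard library instead of ACE. This is an illustrative alternative, not the article's implementation: the names Task, ThreadRunner, SetFlag, and DemoRunner are hypothetical, and, unlike Runner, ownership of the task stays with the runner rather than being released inside the thread.

```cpp
#include <cassert>
#include <memory>
#include <thread>
#include <utility>

// Same role as Runnable; the virtual destructor makes deletion
// through a base-class pointer well defined.
class Task {
public:
  virtual ~Task() {}
  virtual void Svc() = 0;
};

// Hypothetical std::thread-based counterpart of Runner.
class ThreadRunner {
  std::unique_ptr<Task> _task;
  std::thread _thread;
public:
  explicit ThreadRunner(std::unique_ptr<Task> task) : _task(std::move(task)) {}
  ~ThreadRunner() { Wait(); }  // never destroy a thread that is still joinable
  void Start() {
    Task *t = _task.get();
    _thread = std::thread([t] { t->Svc(); });
  }
  void Wait() {
    if (_thread.joinable())
      _thread.join();
  }
};

// Example task that records that it ran.
class SetFlag : public Task {
  bool &_flag;
public:
  explicit SetFlag(bool &flag) : _flag(flag) {}
  void Svc() override { _flag = true; }
};

// Small self-check that can be called from main().
void DemoRunner() {
  bool ran = false;
  ThreadRunner r(std::unique_ptr<Task>(new SetFlag(ran)));
  r.Start();
  r.Wait();      // join() makes the write to 'ran' visible here
  assert(ran);
}
```

On some platforms the program must be linked with the threading library (for example, -pthread with GCC).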
Erlang messages are packed into ei_x_buff structures — class XBuff provides a thin C++ wrapper
around the life-cycle management, encoding and decoding functions to simplify the use of ei_x_buff. The class XBuff
itself creates and destroys an ei_x_buff, plus provides access to encoder and decoder objects.
// DDS/CPP/Gateway/XBuff.h
class XBuff {
ei_x_buff _buff;
public:
XBuff(bool initWithVersion) {
if (initWithVersion)
ei_x_new_with_version(&_buff);
else
ei_x_new(&_buff);
}
~XBuff() {
ei_x_free(&_buff);
}
ei_x_buff *get() { return &_buff; }
XBuffDecoder GetDecoder() { return XBuffDecoder(_buff); }
XBuffEncoder GetEncoder() { return XBuffEncoder(_buff); }
};
For our example, only a select few types must be encoded: atoms, longs, strings, and tuples.
class XBuffEncoder {
ei_x_buff &_buff;
XBuffEncoder(ei_x_buff &buff) : _buff(buff) {}
public:
void SetTupleHeader(int arity) {
if (ei_x_encode_tuple_header(&_buff, arity) < 0)
throw EIEncodeException(
"ei_x_encode_tuple_header failed");
}
void SetAtom(std::string atom) {
if (ei_x_encode_atom(&_buff, atom.c_str()) < 0)
throw EIEncodeException("ei_x_encode_atom failed");
}
void SetLong(long l) {
if (ei_x_encode_long(&_buff, l) < 0)
throw EIEncodeException("ei_x_encode_long failed");
}
void SetString(std::string str) {
if (ei_x_encode_string(&_buff, str.c_str()) < 0)
throw EIEncodeException("ei_x_encode_string failed");
}
// more types
friend class XBuff;
};
These types, in addition to process IDs, must be decoded.
class XBuffDecoder {
ei_x_buff &_buff;
int _offset;
XBuffDecoder(ei_x_buff &buff) : _buff(buff), _offset(0) {}
void GetType(int &type, int &size) {
if (ei_get_type(_buff.buff, &_offset, &type, &size) < 0)
throw EIDecodeException("ei_get_type failed");
}
public:
int GetVersion() {
int version;
if (ei_decode_version(_buff.buff, &_offset, &version) < 0)
throw EIDecodeException("ei_decode_version failed");
return version;
}
int GetTupleHeader() {
int arity;
if (ei_decode_tuple_header(
_buff.buff, &_offset, &arity) < 0)
throw EIDecodeException(
"ei_decode_tuple_header failed");
return arity;
}
std::string GetAtom() {
char atom[MAXATOMLEN+1];
if (ei_decode_atom(_buff.buff, &_offset, atom) < 0)
throw EIDecodeException("ei_decode_atom failed");
return atom;
}
std::string GetString() {
int type, size;
GetType(type, size);
char *p = new char[size+1];
if (p == NULL)
throw EIDecodeException("ei_malloc failed");
if (ei_decode_string(_buff.buff, &_offset, p) < 0) {
delete [] p;
throw EIDecodeException("ei_decode_string failed");
}
std::string s = p;
delete [] p;
return s;
}
erlang_pid GetPID() {
erlang_pid pid;
if (ei_decode_pid(_buff.buff, &_offset, &pid) < 0)
throw EIDecodeException("ei_decode_pid failed");
return pid;
}
long GetLong() {
long l;
if (ei_decode_long(_buff.buff, &_offset, &l) < 0)
throw EIDecodeException("ei_decode_long failed");
return l;
}
// more data types
friend class XBuff;
};
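To give a feel for what ends up in the buffer, the following sketch hand-encodes the tuple { message, Msg, Id } using tag values from the Erlang external term format. It is a simplified illustration, not the erl_interface implementation: the real ei_x_encode_* functions may choose other tags (for instance, a more compact integer form for small values, or a different atom tag in newer releases), and all function names here are hypothetical. Strings are assumed to be shorter than 65536 bytes.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Tag values from the Erlang external term format.
const std::uint8_t kVersionMagic  = 131;
const std::uint8_t kIntegerExt    = 98;   // 32-bit signed, big-endian
const std::uint8_t kAtomExt       = 100;  // 16-bit length, then the name
const std::uint8_t kSmallTupleExt = 104;  // 8-bit arity
const std::uint8_t kStringExt     = 107;  // 16-bit length, then the bytes

typedef std::vector<std::uint8_t> Bytes;

// Append a 16-bit big-endian length.
void PutU16(Bytes &out, std::uint16_t v) {
  out.push_back(static_cast<std::uint8_t>(v >> 8));
  out.push_back(static_cast<std::uint8_t>(v & 0xFF));
}

// Append a tag, a 16-bit length, and the text (used for atoms and strings).
void PutText(Bytes &out, std::uint8_t tag, const std::string &s) {
  out.push_back(tag);
  PutU16(out, static_cast<std::uint16_t>(s.size()));
  out.insert(out.end(), s.begin(), s.end());
}

// Append a 32-bit signed integer in big-endian order.
void PutInt32(Bytes &out, std::int32_t v) {
  std::uint32_t u = static_cast<std::uint32_t>(v);
  out.push_back(kIntegerExt);
  for (int shift = 24; shift >= 0; shift -= 8)
    out.push_back(static_cast<std::uint8_t>((u >> shift) & 0xFF));
}

// Hypothetical helper: encode { message, Msg, Id } with a version header.
Bytes EncodeMessageTuple(const std::string &msg, std::int32_t id) {
  Bytes out;
  out.push_back(kVersionMagic);
  out.push_back(kSmallTupleExt);
  out.push_back(3);                    // arity of the tuple
  PutText(out, kAtomExt, "message");
  PutText(out, kStringExt, msg);
  PutInt32(out, id);
  return out;
}

// Small self-check that can be called from main().
void DemoEncode() {
  Bytes b = EncodeMessageTuple("hi", 7);
  assert(b.size() == 23);
  assert(b[0] == kVersionMagic && b[1] == kSmallTupleExt && b[2] == 3);
}
```

The sequence of calls mirrors how XBuffEncoder is meant to be used: a tuple header first, followed by each element in order.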
We now turn to the main gateway application in main.cpp. We use an ACE_Arg_Shifter to
parse the command-line arguments.
// DDS/CPP/Gateway/main.cpp
void GetArgs(int argc, char *argv[],
std::string &sname, std::string &secretcookie, int &port) {
ACE_Arg_Shifter arg_shifter(argc, argv);
while (arg_shifter.is_anything_left()) {
const ACE_TCHAR *currentArg = 0;
if ((currentArg = arg_shifter.get_the_parameter(
ACE_TEXT("-sname"))) != 0) {
sname = currentArg;
arg_shifter.consume_arg();
}
else if ((currentArg = arg_shifter.get_the_parameter(
ACE_TEXT("-setcookie"))) != 0) {
secretcookie = currentArg;
arg_shifter.consume_arg();
}
else if ((currentArg = arg_shifter.get_the_parameter(
ACE_TEXT("-port"))) != 0) {
port = ACE_OS::atoi(currentArg);
arg_shifter.consume_arg();
}
else
arg_shifter.ignore_arg();
}
}
int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) {
try {
std::string sname("alice"), secretcookie("secretcookie");
int port = 8001;
GetArgs(argc, argv, sname, secretcookie, port);
We now implement standard DDS publisher and subscriber infrastructure:
DDS::DomainParticipantFactory_var dpf =
TheParticipantFactoryWithArgs(argc, argv);
// create domain participant
DDS::DomainParticipant_var participant =
dpf->create_participant(42,
PARTICIPANT_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == participant)
throw MyException("create_participant failed");
// register type
Messenger::MessageTypeSupport_var ts =
new Messenger::MessageTypeSupportImpl();
if (ts->register_type(participant.in(), "") !=
DDS::RETCODE_OK)
throw MyException("register_type failed");
// create topic
CORBA::String_var type_name = ts->get_type_name();
DDS::Topic_var topic =
participant->create_topic("MyTopic",
type_name.in(),
TOPIC_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == topic)
throw MyException("create_topic failed");
// create publisher
DDS::Publisher_var publisher =
participant->create_publisher(PUBLISHER_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == publisher)
throw MyException("create_publisher failed");
// create subscriber
DDS::Subscriber_var subscriber =
participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == subscriber)
throw MyException("create_subscriber failed");
OpenDDS, as of this writing, allows both the publisher and subscriber to share the same transport.
// create and attach the transport
OpenDDS::DCPS::TransportImpl_rch transport_impl =
TheTransportFactory->create_transport_impl(
OpenDDS::DCPS::DEFAULT_SIMPLE_TCP_ID,
OpenDDS::DCPS::AUTO_CONFIG);
if (transport_impl->attach(publisher.in()) !=
OpenDDS::DCPS::ATTACH_OK)
throw MyException(
"transport creation for the publisher failed");
if (transport_impl->attach(subscriber.in()) !=
OpenDDS::DCPS::ATTACH_OK)
throw MyException(
"transport creation for the subscriber failed");
We finish the publisher code by creating a DataWriter.
// create and narrow datawriter
DDS::DataWriter_var writer =
publisher->create_datawriter(topic.in(),
DATAWRITER_QOS_DEFAULT,
DDS::DataWriterListener::_nil(),
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == writer)
throw MyException("create_datawriter failed");
Messenger::MessageDataWriter_var message_writer =
Messenger::MessageDataWriter::_narrow(writer.in());
if (0 == message_writer)
throw MyException("writer _narrow failed");
We complete the subscriber code by creating a DataReader, and associated listener.
// create a common message queue
MessageQueue<MessageType> messageQueue;
// create and narrow datareader, assigning listener
DDS::DataReaderListener_var listener(new
DataReaderListenerImpl(messageQueue));
DDS::DataReader_var reader =
subscriber->create_datareader(topic.in(),
DATAREADER_QOS_DEFAULT,
listener.in(),
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == reader)
throw MyException("create_datareader failed");
A MessageQueue<T> is a thread-safe list to hold DDS samples as they are received. The list
itself is implemented by using an ACE_Guard<T> to protect methods of a std::list<T>:
// DDS/CPP/Gateway/MessageQueue.h
#include <list>
#include <ace/Thread_Mutex.h>
#include <ace/Guard_T.h>
template<typename T>
class MessageQueue {
std::list<T> _q;
ACE_Thread_Mutex lock_;
public:
void Insert(T t) {
ACE_Guard<ACE_Thread_Mutex> guard(lock_);
_q.push_back(t);
}
T Remove() {
ACE_Guard<ACE_Thread_Mutex> guard(lock_);
T front = _q.front();
_q.pop_front();
return front;
}
bool empty() {
ACE_Guard<ACE_Thread_Mutex> guard(lock_);
return _q.empty();
}
void clear() {
ACE_Guard<ACE_Thread_Mutex> guard(lock_);
return _q.clear();
}
};
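Note that Remove() assumes a non-empty queue, and that calling empty() and then Remove() takes the lock twice. As far as the code shown here goes, that is safe because only one thread removes elements. A minimal sketch of a variant that checks and removes under a single lock follows. It is not part of the article's code: SafeQueue and TryRemove() are hypothetical names, and std::mutex (C++11) stands in for ACE_Thread_Mutex so that the sketch is self-contained.

```cpp
#include <cassert>
#include <list>
#include <mutex>

// Hypothetical variant of MessageQueue<T>; std::mutex stands in for
// ACE_Thread_Mutex so the sketch compiles without ACE.
template <typename T>
class SafeQueue {
  std::list<T> q_;
  mutable std::mutex lock_;
public:
  void Insert(const T& t) {
    std::lock_guard<std::mutex> guard(lock_);
    q_.push_back(t);
  }
  // Moves the oldest element into 'out' and returns true, or returns
  // false (leaving 'out' untouched) if the queue is empty. The emptiness
  // check and the removal happen under the same lock.
  bool TryRemove(T& out) {
    std::lock_guard<std::mutex> guard(lock_);
    if (q_.empty())
      return false;
    out = q_.front();
    q_.pop_front();
    return true;
  }
  bool empty() const {
    std::lock_guard<std::mutex> guard(lock_);
    return q_.empty();
  }
};

// Usage: drain whatever is queued without a separate empty() call.
inline int ExampleDrain() {
  SafeQueue<int> q;
  q.Insert(7);
  q.Insert(8);
  int item = 0, count = 0;
  while (q.TryRemove(item))
    ++count;
  assert(q.empty());
  return count;
}
```

With this shape, a consumer loop such as the one in OnIdle() could be written as a single while (queue.TryRemove(m)) loop.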
The MessageQueue<T> stores smart pointer-wrapped DDS samples.
// DDS/CPP/Gateway/defs.h
typedef std::tr1::shared_ptr<Messenger::Message> MessageType;
Messages are added to the MessageQueue by the DataReader listener's on_data_available() method.
// DDS/CPP/Gateway/DataReaderListenerImpl.cpp
void DataReaderListenerImpl::on_data_available(
DDS::DataReader_ptr reader)
ACE_THROW_SPEC((CORBA::SystemException))
{
Messenger::MessageDataReader_var reader_i =
Messenger::MessageDataReader::_narrow(reader);
if (CORBA::is_nil(reader_i.in())) {
ACE_ERROR((LM_ERROR,
ACE_TEXT("ERROR: %N:%l: on_data_available() -")
ACE_TEXT(" _narrow failed!\n")));
ACE_OS::exit(-1);
}
MessageType message(new Messenger::Message);
DDS::SampleInfo info;
DDS::ReturnCode_t error =
reader_i->take_next_sample(*message, info);
if (error == DDS::RETCODE_OK) {
if (info.valid_data)
_messageQueue.Insert(message);
} else {
ACE_ERROR((LM_ERROR,
ACE_TEXT("ERROR: %N:%l: on_data_available() -")
ACE_TEXT(" take_next_sample failed!\n")));
}
}
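The listener above takes one sample per callback. A listener could instead keep taking samples until the reader reports that none remain, so that a burst of samples is handled in one callback. The sketch below shows only the shape of such a loop, using stand-in types: FakeReader, Sample, SampleInfo, and DrainReader() are hypothetical and replace Messenger::MessageDataReader, Messenger::Message, and DDS::SampleInfo, while the RETCODE_* enumerators mimic DDS::RETCODE_OK and DDS::RETCODE_NO_DATA.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <utility>
#include <vector>

enum ReturnCode { RETCODE_OK, RETCODE_NO_DATA };

struct Sample { std::string msg; long id; };
struct SampleInfo { bool valid_data; };

// Stand-in for a typed DataReader: hands out queued samples one at a time.
class FakeReader {
  std::deque<std::pair<Sample, bool> > pending_;
public:
  void Push(const Sample& s, bool valid) {
    pending_.push_back(std::make_pair(s, valid));
  }
  ReturnCode take_next_sample(Sample& s, SampleInfo& info) {
    if (pending_.empty())
      return RETCODE_NO_DATA;
    s = pending_.front().first;
    info.valid_data = pending_.front().second;
    pending_.pop_front();
    return RETCODE_OK;
  }
};

typedef std::shared_ptr<Sample> SamplePtr;

// Takes samples until the reader has none left, keeping only those with
// valid data. Returns the number of samples kept.
std::size_t DrainReader(FakeReader& reader, std::vector<SamplePtr>& out) {
  std::size_t kept = 0;
  for (;;) {
    SamplePtr sample(new Sample());
    SampleInfo info = { false };
    if (reader.take_next_sample(*sample, info) != RETCODE_OK)
      break;  // nothing left to take
    if (info.valid_data) {
      out.push_back(sample);
      ++kept;
    }
  }
  return kept;
}

// Usage: two valid samples and one without data yield two kept samples.
inline void ExampleDrainReader() {
  FakeReader reader;
  Sample a = { "one", 1 }, b = { "", 0 }, c = { "two", 2 };
  reader.Push(a, true);
  reader.Push(b, false);
  reader.Push(c, true);
  std::vector<SamplePtr> out;
  assert(DrainReader(reader, out) == 2);
}
```

Allocating a fresh sample on each iteration mirrors the listener above, where each queued element owns its own Messenger::Message.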
We complete ACE_TMAIN() by running the DDSGateway Erlang node, waiting for its
termination, and cleaning up when done.
// DDS/CPP/Gateway/main.cpp
Runner r(new DDSGateway(sname, secretcookie, port,
message_writer.in(), messageQueue));
r.Start();
r.Wait();
// clean up
participant->delete_contained_entities();
dpf->delete_participant(participant.in());
TheTransportFactory->release();
TheServiceParticipant->shutdown();
} catch (const CORBA::Exception& e) {
e._tao_print_exception("Exception caught in main():");
return -1;
} catch (const std::exception& e) {
std::cerr << "Exception caught in main(): " << e.what()
<< std::endl;
return -1;
}
return 0;
}
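In the listing above, the cleanup calls sit inside the try block, so they are skipped if an exception propagates out of r.Start() or r.Wait(). One way to run cleanup on every exit path is a small scope guard. The sketch below is generic C++11 and not part of the article's code: ScopeExit is a hypothetical name, and whether the OpenDDS shutdown sequence is suitable for running from a destructor is an assumption that would need to be checked.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <utility>

// Runs the supplied callable when the guard goes out of scope, whether
// the scope is left normally or by an exception. The callable must not
// throw, because it is invoked from a destructor.
class ScopeExit {
  std::function<void()> fn_;
public:
  explicit ScopeExit(std::function<void()> fn) : fn_(std::move(fn)) {}
  ~ScopeExit() {
    if (fn_)
      fn_();
  }
  ScopeExit(const ScopeExit&) = delete;
  ScopeExit& operator=(const ScopeExit&) = delete;
};

// Usage: the counter is incremented even though the block exits by
// exception, because the guard is destroyed during stack unwinding.
inline int ExampleCleanupOnThrow() {
  int cleaned = 0;
  try {
    ScopeExit guard([&cleaned]() { ++cleaned; });
    throw std::runtime_error("simulated failure");
  } catch (const std::exception&) {
    assert(cleaned == 1);
  }
  return cleaned;
}
```

In the gateway, the callable would wrap the calls that follow the "clean up" comment, and the guard would be declared once the participant has been created.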
Now that the infrastructure is in place, we can implement DDSGateway, the core of the gateway process.
DDSGateway maintains a reference to the Message DataWriter, allowing it to receive Erlang messages and
publish them as DDS ones. DDSGateway also maintains a reference to the MessageQueue<T> from
the DDS subscriber, and a collection of Erlang process IDs, allowing it to receive messages from the DDS
subscriber, and send them as Erlang ones. The constructor also sets the Erlang node name, cookie, and listening port.
// DDS/CPP/Gateway/DDSGateway.h
class DDSGateway : public ErlangNode {
Messenger::MessageDataWriter_var _messageWriter;
std::set<erlang_pid> _subscribers;
MessageQueue<MessageType> &_messageQueue;
public:
DDSGateway(std::string shortName, std::string secretCookie,
int port, Messenger::MessageDataWriter_ptr messageWriter,
MessageQueue<MessageType> &messageQueue) :
ErlangNode(shortName, secretCookie, port),
_messageWriter(messageWriter),
_messageQueue(messageQueue) {}
virtual bool OnMessage(erlang_pid /*from*/, XBuff& /*buff*/);
virtual bool OnIdle();
};
Implementation of DDSGateway is straightforward, as we only need to override OnMessage()
and OnIdle(). In OnMessage(), the first part of every incoming message is decoded the
same way: a version header, then a tuple header, followed by an atom indicating the type of message being sent.
// DDS/CPP/Gateway/DDSGateway.cpp
bool DDSGateway::OnMessage(erlang_pid /*from*/, XBuff &buff) {
// Process these messages:
// { publish, { <string>, <long> } }
// { subscribe, pid }
// { unsubscribe, pid }
// { shutdown }
XBuffDecoder d = buff.GetDecoder();
d.GetVersion();
d.GetTupleHeader();
std::string cmd = d.GetAtom();
If the message is shutdown, we terminate the Erlang node by returning false.
// on shutdown, exit
if (cmd == "shutdown")
return false;
If the message is publish, we extract the message and ID values, assign them to a DDS
Messenger::Message structure, and publish it as a DDS sample.
// publish
if (cmd == "publish") {
d.GetTupleHeader();
Messenger::Message message;
message.msg = d.GetString().c_str();
message.id = d.GetLong();
DDS::ReturnCode_t err =
_messageWriter->write(message, DDS::HANDLE_NIL);
if (err!=DDS::RETCODE_OK)
throw MyException("DDS write failed");
}
Finally, if a subscribe or unsubscribe message is received, then the supplied PID
is added or removed from the subscriber collection, respectively.
// subscribe
if (cmd == "subscribe")
_subscribers.insert(d.GetPID());
// unsubscribe
if (cmd == "unsubscribe")
_subscribers.erase(d.GetPID());
return true;
}
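The GetVersion(), GetTupleHeader(), and GetAtom() calls at the start of OnMessage() follow the layout of the Erlang external term format: a version byte (131), then one tag byte per term. The sketch below decodes that prefix by hand for a message such as {shutdown}. It illustrates the wire layout only, on the assumption that the gateway's decoder (whose internals are not shown in this section) reads the same fields. DecodeCommand() and Command are hypothetical names, and only small tuples and the atom encodings named in the comments are handled.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

struct Command {
  int arity;         // number of elements in the outer tuple
  std::string name;  // first element, expected to be an atom
};

namespace {
// Bounds-checked byte access.
unsigned char ByteAt(const std::vector<unsigned char>& buf, std::size_t i) {
  if (i >= buf.size())
    throw std::runtime_error("truncated term");
  return buf[i];
}
}  // namespace

// Decodes the version byte, a small-tuple header, and the leading atom.
Command DecodeCommand(const std::vector<unsigned char>& buf) {
  std::size_t i = 0;
  if (ByteAt(buf, i++) != 131)
    throw std::runtime_error("bad version byte");
  if (ByteAt(buf, i++) != 104)  // SMALL_TUPLE_EXT
    throw std::runtime_error("expected a small tuple");
  Command cmd;
  cmd.arity = ByteAt(buf, i++);
  const unsigned char tag = ByteAt(buf, i++);
  std::size_t len = 0;
  if (tag == 100 || tag == 118) {         // ATOM_EXT, ATOM_UTF8_EXT
    len = ByteAt(buf, i) * 256u + ByteAt(buf, i + 1);  // big-endian length
    i += 2;
  } else if (tag == 115 || tag == 119) {  // SMALL_ATOM_EXT, SMALL_ATOM_UTF8_EXT
    len = ByteAt(buf, i++);
  } else {
    throw std::runtime_error("expected an atom");
  }
  if (len > buf.size() - i)
    throw std::runtime_error("truncated atom");
  cmd.name.assign(buf.begin() + i, buf.begin() + i + len);
  return cmd;
}

// Usage: the encoded form of the 1-tuple {shutdown}.
inline void ExampleDecodeShutdown() {
  const unsigned char raw[] = { 131, 104, 1, 100, 0, 8,
                                's', 'h', 'u', 't', 'd', 'o', 'w', 'n' };
  std::vector<unsigned char> buf(raw, raw + sizeof raw);
  Command cmd = DecodeCommand(buf);
  assert(cmd.arity == 1 && cmd.name == "shutdown");
}
```

The remaining elements of a publish, subscribe, or unsubscribe message would be decoded from the bytes that follow the atom.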
In OnIdle(), we handle any DDS samples that have accumulated in the MessageQueue<T>.
If there are no Erlang subscribers, the pending samples are simply discarded. As an extension, quality of service criteria could be
applied here, such as maintaining a history of samples to send to late-joining subscribers, mirroring the
DDS DURABILITY policy [22].
bool DDSGateway::OnIdle() {
// if there are no subscribers,
// discard pending messages and return
if (_subscribers.empty()) {
_messageQueue.clear();
return true;
}
If there is at least one subscriber, send all pending messages to each subscriber. The subscriber
list cannot change while the loop executes, as OnMessage() will not be called again until OnIdle()
returns. The MessageQueue<T>, however, can change, so repeatedly removing DDS samples until the queue is (at least momentarily)
empty will ensure that all samples are handled properly.
// otherwise, send each waiting message to all subscribers
while (!_messageQueue.empty()) {
MessageType m = _messageQueue.Remove();
for (std::set<erlang_pid>::iterator
subscriber = _subscribers.begin();
subscriber!=_subscribers.end(); subscriber++) {
Each DDS sample is formed into an Erlang 3-tuple containing the atom message, the message text, and
the ID value. The message is then sent to the subscriber in question.
XBuff rtn(true);
XBuffEncoder e=rtn.GetEncoder();
e.SetTupleHeader(3);
e.SetAtom("message");
e.SetString(m->msg.in());
e.SetLong(m->id);
Send(*subscriber, rtn);
}
}
return true;
}
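The history extension suggested for OnIdle() could be sketched as a bounded buffer of recent samples that is replayed to a subscriber when it joins. This is a standalone illustration, not part of the gateway: History, Record(), and Replay() are hypothetical names, Sample stands in for Messenger::Message, and the history depth is an assumed parameter.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

struct Sample { std::string msg; long id; };

// Keeps the most recent 'depth' samples for late-joining subscribers.
class History {
  std::deque<Sample> recent_;
  std::size_t depth_;
public:
  explicit History(std::size_t depth) : depth_(depth) {}
  // Remembers a sample, discarding the oldest once the depth is reached.
  void Record(const Sample& s) {
    if (depth_ == 0)
      return;
    if (recent_.size() == depth_)
      recent_.pop_front();
    recent_.push_back(s);
    assert(recent_.size() <= depth_);
  }
  // Samples a late-joining subscriber should receive, oldest first.
  std::vector<Sample> Replay() const {
    return std::vector<Sample>(recent_.begin(), recent_.end());
  }
};

// Usage: with a depth of 2, only the last two of three samples remain.
inline std::size_t ExampleHistory() {
  History history(2);
  Sample a = { "a", 1 }, b = { "b", 2 }, c = { "c", 3 };
  history.Record(a);
  history.Record(b);
  history.Record(c);
  return history.Replay().size();
}
```

Under these assumptions, OnIdle() would call Record() for each sample before the no-subscriber check, and the subscribe branch of OnMessage() would send each element of Replay() to the new PID.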
This completes the implementation of the gateway. After compiling all C++ code, as well as dds.erl, we can now run
the publisher and subscriber tests. As with the CORBA tests, we will use Perl test runners, but these will be
based on the test runners in the OpenDDS distribution. The output from the tests below has been abbreviated somewhat
for ease of discussion — additional log messages will be displayed by the test runners.
To execute the publisher test, run DDS/Test/run_test_ep.pl. The test script performs the actions described
below.
The Erlang Port Mapper Daemon, epmd, must be running
in order for the gateway to register its listening socket. To ensure that an instance is started, we run an Erlang node that
executes the init_stop() function and so terminates immediately. Although the node has shut down,
epmd remains running. The test runs a command similar to:
<ERL_ROOT>/bin/erl -sname dummy -setcookie secretcookie
-pa ../Erlang -noshell -run dds init_stop
Next, we start the DCPSInfoRepo:
<DDS_ROOT>\bin\.\DCPSInfoRepo.EXE -ORBDebugLevel 10
-ORBLogFile DCPSInfoRepo.log -o repo.ior
Next, we start the gateway as Erlang node alice:
..\CPP\Gateway\.\gateway.EXE -ORBSvcConf tcp.conf -sname alice
-setcookie secretcookie -port 8001
Next, we start a C++ DDS subscriber that subscribes to the topic that is being published. As the code is substantially similar to the subscriber-side of the gateway, it is not described here but is present in the code archive.
..\CPP\Subscriber\.\subscriber.EXE -ORBSvcConf tcp.conf
Finally, we start the Erlang publisher_test() function itself in Erlang node bob. We pass the address
of the gateway (here, alice@oci1373) and the message to publish as arguments to publisher_test().
<ERL_ROOT>/bin/erl -sname bob -setcookie secretcookie
-pa ../Erlang -noshell
-run dds publisher_test alice@oci1373 Message
As the test runs, the Erlang node prints each message as it is sent, the gateway publishes the messages as DDS samples, and the C++ subscriber prints them as they are received.
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=10
[Subscriber] Received: msg='Message' id=10
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=9
[Subscriber] Received: msg='Message' id=9
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=8
[Subscriber] Received: msg='Message' id=8
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=7
[Subscriber] Received: msg='Message' id=7
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=6
[Subscriber] Received: msg='Message' id=6
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=5
[Subscriber] Received: msg='Message' id=5
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=4
[Subscriber] Received: msg='Message' id=4
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=3
[Subscriber] Received: msg='Message' id=3
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=2
[Subscriber] Received: msg='Message' id=2
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=1
[Subscriber] Received: msg='Message' id=1
[Erlang bob@oci1373]: done
The subscriber test starts similarly. When the test script DDS/Test/run_test_es.pl is executed,
an Erlang node is run, executing the init_stop() function, to ensure that
epmd has started. Next, the DCPSInfoRepo is started, as well as the gateway on node alice.
<ERL_ROOT>/bin/erl -sname dummy -setcookie secretcookie
-pa ../Erlang -noshell -run dds init_stop
<DDS_ROOT>\bin\.\DCPSInfoRepo.EXE -ORBDebugLevel 10
-ORBLogFile DCPSInfoRepo.log -o repo.ior
..\CPP\Gateway\.\gateway.EXE -ORBSvcConf tcp.conf -sname alice
-setcookie secretcookie -port 8001
Erlang node bob is now started to run the subscriber_test() function, which takes the address
of the gateway as an argument.
<ERL_ROOT>/bin/erl -sname bob -setcookie secretcookie
-pa ../Erlang -noshell -run dds subscriber_test alice@oci1373
Finally, we start a C++ DDS publisher that publishes Message samples. As the code is
substantially similar to the publisher-side of the gateway, it is not described here
but is present in the code archive.
..\CPP\Publisher\.\publisher.EXE -ORBSvcConf tcp.conf
As the test runs, the C++ publisher prints each message as it is published, the gateway forwards the received DDS samples as Erlang messages, and the Erlang subscriber prints them as they are received.
[Publisher] Publish: msg='Message' id=10
[Erlang bob@oci1373] Received: msg='"Message"' id=10
[Publisher] Publish: msg='Message' id=9
[Erlang bob@oci1373] Received: msg='"Message"' id=9
[Publisher] Publish: msg='Message' id=8
[Erlang bob@oci1373] Received: msg='"Message"' id=8
[Publisher] Publish: msg='Message' id=7
[Erlang bob@oci1373] Received: msg='"Message"' id=7
[Publisher] Publish: msg='Message' id=6
[Erlang bob@oci1373] Received: msg='"Message"' id=6
[Publisher] Publish: msg='Message' id=5
[Erlang bob@oci1373] Received: msg='"Message"' id=5
[Publisher] Publish: msg='Message' id=4
[Erlang bob@oci1373] Received: msg='"Message"' id=4
[Publisher] Publish: msg='Message' id=3
[Erlang bob@oci1373] Received: msg='"Message"' id=3
[Publisher] Publish: msg='Message' id=2
[Erlang bob@oci1373] Received: msg='"Message"' id=2
[Publisher] Publish: msg='Message' id=1
[Erlang bob@oci1373] Received: msg='"Message"' id=1
[Publisher] Publish: msg='Message' id=0
[Erlang bob@oci1373]: done
As shown in this article, CORBA and DDS can be used with systems written in Erlang. While CORBA has direct support in the Erlang distribution, DDS must be used via the external code interface. By using the framework presented here, supporting different CORBA interfaces and DDS sample types is straightforward.
[1] Open-source Erlang - White Paper
http://erlang.org/white_paper.html
[2] Where is Erlang used and why?
http://stackoverflow.com/questions/1636455/where-is-erlang-used-and-why
[3] The CouchDB Project
http://couchdb.apache.org/
[4] RabbitMQ
http://www.rabbitmq.com/
[5] Advanced Message Queuing Protocol
http://www.amqp.org/
[6] The ACE ORB (TAO)
http://www.theaceorb.com/
[7] OpenDDS
http://www.opendds.org/
[8] Armstrong. Programming Erlang, Software for a Concurrent World. Pragmatic Bookshelf, 2007.
[9] Cesarini, Thompson. Erlang Programming. O'Reilly, 2009.
[10] Logan, Merritt, Carlsson. Erlang and OTP in Action. Manning, 2011.
[11] www.trapexit.org
http://www.trapexit.org/
[12] Open Source Erlang
http://www.erlang.org/
[13] erldocs.com
http://erldocs.com/
[14] Installing Orber
http://www.erlang.org/doc/apps/orber/ch_install.html
[15] Orber Examples
http://erlang.mirror.su.se/documentation/doc-5.4.8/lib/orber-3.6/doc/html/ch_example.html
[16] Multi-Language CORBA Development with C++ (TAO), Java (JacORB), Perl (opalORB), and C# (IIOP.NET)
http://mnb.ociweb.com/mnb/MiddlewareNewsBrief-200904.html
[17] OMG IDL to Erlang Mapping
http://www.erlang.org/doc/apps/orber/ch_idl_to_erlang_mapping.html
[18] [Mnesia] Introduction
http://www.erlang.org/doc/apps/mnesia/Mnesia_chap1.html
[19] Orber options
http://www.erlang.org/doc/apps/orber/ch_install.html#config
[20] erl
http://www.erlang.org/doc/man/erl.html
[21] C Nodes
http://www.erlang.org/doc/tutorial/cnode.html
[22] OpenDDS Developer's Guide
http://download.ociweb.com/OpenDDS/OpenDDS-latest.pdf
[23] ei_connect
http://www.erlang.org/doc/man/ei_connect.html
[24] MPC
http://www.ociweb.com/products/mpc
Copyright ©2011 Object Computing, Inc. All rights reserved.
OMG, CORBA, IIOP, and all OMG marks and logos are trademarks or registered
trademarks of Object Management Group, Inc. in the United States and/or
other countries.
Java and all Java-based marks are trademarks or registered trademarks of Sun
Microsystems, Inc. in the United States and/or other countries.
.NET, C#, and .NET-based marks are trademarks or registered trademarks of Microsoft
Corporation in the United States and/or other countries.