Just for fun, I thought I’d show a tiny Erlang app (or library) I started for a project I was going to write in Erlang, but which got cancelled before I could finish up all the infrastructure pieces.
elib_ping, as I call it, enables services running on Erlang nodes to find each other without having to configure them at start time. Of course, this works as long as they’re all running on the same subnet in, I assume, the same data center.
The idea I was working from was that in a given distributed system, you might have dozens of Erlang nodes, all hosting applications of specific types. For instance, one node might be a “worker” node for putting data into a database, and another might be a “reader” node for getting data out of a database. (Think of this as something of a super specialized grid or map/reduce system, but don’t think too much about that: this is just a contrived example scenario.)
What I wanted to do was have all the worker nodes find the reader nodes, and start sending messages to those nodes indicating that the worker was ready to do some work.
What I did NOT want to do was have to pre-configure each node with the location of all the other nodes. Instead, I wanted each node to build up a map of all the others, and then send messages to them according to the work a given node can do. Although not shown in any code presented below, I envisioned the ping application calling net_adm:ping() when it discovered a node, thus registering that node in the nodes() list, which carries with it a lot of functionality, such as heartbeat monitoring.
Because this is my first little article dealing with Erlang, I’ll provide a simple outline of the parts of an Erlang application in case you’re unfamiliar with such things, and then proceed with the code samples making up the initial pass at the idea.
In general, simple Erlang applications are built on a few supplementary files that, taken with the main functional modules, make up an “application” that can be loaded into an Erlang VM and made use of by other applications.
To make this work, you need the following:
an application descriptor file containing metadata describing the modules making up the application,
a main application callback module used to start the application, and containing any useful “interface” functions you’d like to be the main API of the application,
a supervisor callback module, tasked with monitoring the application’s running processes and keeping them going if they fail,
and at least one module making up the actual application itself, which is, quite often, a gen_server callback module of some sort.
With all that said, let’s get the basic boilerplate part of the application out of the way before moving on to the stuff that makes this work.
The application file describes the modules making up the application and the roles those modules play:
{application, ping, [
    {description, "Node Discoverer"},
    {vsn, "1.0"},
    {modules, [ping_serv, ping_sup, ping_app, ping]},
    {registered, [ping_serv, ping_sup, ping_event]},
    {applications, [kernel, stdlib]},
    {mod, {ping_app, []}},
    {start_phases, []}
]}.
The application definition is really just a series of key/value pairs, or properties. When the application is started via:
application:start(ping)
OTP will call the start function (start/2) on the ping_app module named by the mod property. The registered property notes which processes will have registered names, and the modules property notes all the modules participating in the application. OTP uses this data for, among other things, figuring out what to do if you need to live-upgrade the code.
The main application callback module is nothing more than boilerplate:
-module(ping_app).
-behaviour(application).
-export([start/2,stop/1]).
start(_Type, _StartArgs) ->
    ping_sup:start_link().
stop(_State) ->
    ok.
As you can see, the start function is responsible for starting the supervisor. Simple and straightforward.
The supervisor is also not much more than boilerplate:
-module(ping_sup).
-behaviour(supervisor).
-export([start_link/0,start_link/1]).
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
    supervisor:start_link({local, ?SERVER}, ?MODULE, []).
start_link(Args) ->
    supervisor:start_link({local, ?SERVER}, ?MODULE, Args).
init([]) ->
    PingServ = {tag1, {ping_serv,start_link,[]},
                permanent,2000,worker,[ping_serv]},
    PingEvent = {tag2, {gen_event,start_link,[{local,ping_event}]},
                 permanent,2000,worker,dynamic},
    {ok, {{one_for_one,3,10}, [PingEvent, PingServ]}}.
The init function is responsible (in this case) for defining the two processes the supervisor will monitor:
The Ping Server process, which actually does the work of sending and receiving pings to and from other Erlang nodes,
and the Ping Event process, with which you register handlers so that your application can be notified of ping events.
So far, so good.
The functional interface part is not part of the OTP system, but it’s a common convention to have a module named after your application that contains the functions needed to access the application’s functionality, start and stop convenience functions, and so on.
-module(ping).
-export([start/0,stop/0,make/0,set_status/1,set_type/1,add_handler/2]).
start() ->
    case node() of
        'nonode@nohost' ->
            node_name_not_defined;
        _ ->
            application:start(sasl),
            application:start(ping)
    end.
stop() ->
    application:stop(ping).
make() ->
    make:all([load]).
set_status(Status) ->
    when_running(fun() -> ping_serv:set_status(Status) end).
set_type(Type) ->
    when_running(fun() -> ping_serv:set_type(Type) end).
add_handler(Module, Params) ->
    when_running(fun() ->
        gen_event:add_sup_handler(ping_event, Module, Params) end).
%% Internal functions
when_running(Fun) ->
    case whereis(ping_serv) of
        undefined ->
            ping_app_not_started;
        _Pid ->
            Fun()
    end.
The start function invokes application:start for you and makes sure that all the applications it depends on (in this case, sasl) are also started. I’ve added a make function for interactive development, and an add_handler function to register callbacks for handling ping events.
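As an aside, more recent OTP releases offer application:ensure_all_started/1, which starts an application together with everything listed under its applications key. Here is a minimal sketch of a start function built on it. It assumes sasl has been added to the applications list in the descriptor file above, and ping_boot is just a made-up module name for illustration, not part of the app as I wrote it.

```erlang
-module(ping_boot).
-export([start/0, start/1]).

%% Start ping and its dependencies, unless this node has no name.
start() ->
    start(node()).

%% Taking the node name as an argument makes the check easy to
%% try out from a shell that was started without -sname or -name.
start('nonode@nohost') ->
    {error, node_name_not_defined};
start(_NodeName) ->
    %% Returns {ok, StartedApps} or {error, Reason}.
    application:ensure_all_started(ping).
```

Calling ping_boot:start() on an unnamed node returns the error tuple without touching the application controller.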
Whenever the ping server gets a ping packet from somewhere else, it sends a message to (a.k.a. notifies) the ping_event process, so to make use of the ping server, your specific application will need to register a handler.
The set_status and set_type functions are conveniences for configuring the ping server itself. The idea is to set a status (running, paused, busy, sick, etc.) and a type (reader, worker, logger, etc.) so that other nodes can recognize both what the sending node can do, and whether or not it’s ready to do it.
The main functionality is a gen_server which uses timeouts as a way to generate timed events for sending out packets.
Here’s the code:
-module(ping_serv).
-behaviour(gen_server).
-define(INFO(Fmt, Args),
        error_logger:info_msg("INFO: [~p:~p] " ++ Fmt ++ "~n",
                              [?MODULE, ?LINE | Args])).
-define(ERROR(Fmt, Args),
        error_logger:error_msg("ERROR: [~p:~p] " ++ Fmt ++ "~n",
                               [?MODULE, ?LINE | Args])).
-define(SERVER, ?MODULE).
-define(PULSE, 5000).
%% Should allow these to be configured.
-define(PORT, 40000).
-define(MULTICAST_ADDR, {239, 10, 11, 12}).
-define(LOCAL_ADDR, {0,0,0,0}). % neither "localhost" nor {127,0,0,1} works
-export([start/0,start_link/0,stop/0,set_status/1,set_type/1]).
-export([init/1,handle_call/3,handle_cast/2,
         handle_info/2,code_change/3,terminate/2]).
-record(state, {send, recv, type, status, pids=[]}).
%% API
start() ->
    gen_server:start({local, ?SERVER}, ?MODULE, [], []).
start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
stop() ->
    gen_server:call(?SERVER, stop).
set_status(Status) ->
    gen_server:cast(?SERVER, {status, Status}).
set_type(Type) ->
    gen_server:cast(?SERVER, {type, Type}).
%% gen_server
init([]) ->
    ?INFO("spawning send socket", []),
    SendSocket = make_send_socket(),
    RecvSocket = make_recv_socket(),
    ?INFO("sockets: ~p ~p", [SendSocket, RecvSocket]),
    {ok, #state{send=SendSocket, recv=RecvSocket}, ?PULSE}.
handle_call(stop, _From, State) ->
    %% Reply 'stopped' to the caller and exit with reason 'normal'.
    {stop, normal, stopped, State};
handle_call(_Request, _From, State) ->
    {reply, ok, State}.
handle_cast({status, Status}, State) ->
    NewState = State#state{status=Status},
    {noreply, NewState};
handle_cast({type, Type}, State) ->
    NewState = State#state{type=Type},
    {noreply, NewState};
handle_cast(_Request, State) ->
    {noreply, State}.
handle_info(timeout, #state{send=Send}=State) ->
    Packet = make_packet(State),
    case gen_udp:send(Send, ?MULTICAST_ADDR, ?PORT, Packet) of
        ok ->
            ok;
        {error, Reason} ->
            ?ERROR("~p unable to send packet (~p)", [node(), Reason])
    end,
    {noreply, State, ?PULSE};
handle_info({udp, _Socket, _IP, _InPortNo, Packet}, State) ->
    %% The decoded term is the whole presence tuple, not just a node name.
    Presence = binary_to_term(Packet),
    gen_event:notify(ping_event, Presence),
    {noreply, State, ?PULSE};
handle_info(Info, State) ->
    ?INFO("info msg: ~p", [Info]),
    {noreply, State, ?PULSE}.
code_change(_OldVersion, State, _Extra) ->
    {ok, State}.
terminate(Reason, #state{send=undefined, recv=undefined}) ->
    ?INFO("terminating (~p)", [Reason]),
    ok;
terminate(Reason, State) ->
    ?INFO("terminating (~p)", [Reason]),
    gen_udp:close(State#state.send),
    gen_udp:close(State#state.recv),
    ok.
%% Internal Functions
make_recv_socket() ->
    Opts = [ { active, true },
             { ip, ?MULTICAST_ADDR },
             { add_membership, { ?MULTICAST_ADDR, ?LOCAL_ADDR } },
             { multicast_loop, true },
             { reuseaddr, true },
             binary ],
    { ok, Socket } = gen_udp:open (?PORT, Opts),
    Socket.
make_send_socket() ->
    Options = [ { ip, ?LOCAL_ADDR },
                { multicast_ttl, 255 },
                { multicast_loop, true } ],
    {ok, Socket} = gen_udp:open(0, Options),
    Socket.
make_packet(State) ->
    %% inet:gethostname/0 returns {ok, Hostname}, so unwrap it first.
    {ok, Host} = inet:gethostname(),
    Packet = {presence, [
        {node, node()},
        {host, Host},
        {type, State#state.type},
        {status, State#state.status}]},
    term_to_binary(Packet).
There’s lots to cover, but I’ll try to be brief. I’m going to assume that if you’re reading this, you’re familiar with the callback nature of the gen_server OTP mechanism. The code contains some constants, definitions and exports, the module’s API, the gen_server callbacks, and internal functions.
The server works by constructing a send socket and a receive socket via gen_udp with settings I discovered in some code for a similar project called, I think, nodefinder on Google Code. Once the sockets are created, they’re added to a state record and returned from the gen_server’s init function along with a timeout (named ?PULSE here). When the timeout expires without any other message having arrived, the gen_server receives a timeout message in handle_info. After matching the appropriate handle_info clause, the gen_server sends out a packet on the send socket, and then returns another ?PULSE timeout.
In effect, we have a timer loop, of sorts.
So far so good.
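One caveat with this loop: a gen_server timeout only fires if no message arrives within the interval, so every incoming packet restarts the clock, and on a chatty network the pulse can drift or stall. A steadier beat can be had with erlang:send_after/3. What follows is a minimal stand-alone sketch of that alternative, not what ping_serv above does. The module name pulse_demo, the pulse message, and the count/1 helper are all made up for illustration.

```erlang
-module(pulse_demo).
-behaviour(gen_server).
-export([start_link/1, count/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link(IntervalMs) ->
    gen_server:start_link(?MODULE, IntervalMs, []).

%% How many pulses have fired so far.
count(Pid) ->
    gen_server:call(Pid, count).

init(IntervalMs) ->
    %% Schedule the first pulse as an ordinary message to ourselves.
    erlang:send_after(IntervalMs, self(), pulse),
    {ok, {IntervalMs, 0}}.

handle_call(count, _From, {_IntervalMs, N} = State) ->
    {reply, N, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(pulse, {IntervalMs, N}) ->
    %% This is where ping_serv would send its presence packet.
    erlang:send_after(IntervalMs, self(), pulse),
    {noreply, {IntervalMs, N + 1}};
handle_info(_Other, State) ->
    %% Unrelated messages no longer disturb the schedule.
    {noreply, State}.
```

Because the next pulse is scheduled explicitly, none of the callbacks needs to return a timeout value.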
If there are any other similar listeners out on the network (other ping_serv instances running on other nodes), they’ll get that packet.
When a packet arrives across the multicast interface, the receive socket (opened in active mode) delivers it as a message to the ping_serv process, where it is matched against one of the handle_info clauses. The packet is decoded and then passed to ping_event via an asynchronous notify.
If there are any handlers registered with ping_event, they’ll get the packet and can do something interesting with it, such as call net_adm:ping(), or keep a local state of the metadata in the packet. Or do nothing.
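To make that concrete, here is a rough sketch of what such a handler might look like. It assumes the events arriving at ping_event are the decoded {presence, Props} tuples described in this article. The module name ping_connect_handler and its list-of-seen-nodes state are my own invention for illustration.

```erlang
-module(ping_connect_handler).
-behaviour(gen_event).
-export([init/1, handle_event/2, handle_call/2, handle_info/2,
         terminate/2, code_change/3]).

%% The handler state is simply the list of nodes seen so far.
init(_Params) ->
    {ok, []}.

handle_event({presence, Props}, Seen) ->
    case proplists:get_value(node, Props) of
        undefined ->
            {ok, Seen};
        Node when Node =:= node() ->
            %% Our own packet, looped back by multicast_loop.
            {ok, Seen};
        Node ->
            case lists:member(Node, Seen) of
                true ->
                    {ok, Seen};
                false ->
                    %% Returns pong or pang; a pong adds Node to nodes().
                    _ = net_adm:ping(Node),
                    {ok, [Node | Seen]}
            end
    end;
handle_event(_Other, Seen) ->
    {ok, Seen}.

handle_call(seen, Seen) ->
    {ok, Seen, Seen};
handle_call(_Request, Seen) ->
    {ok, ok, Seen}.

handle_info(_Info, Seen) ->
    {ok, Seen}.

terminate(_Reason, _Seen) ->
    ok.

code_change(_OldVsn, Seen, _Extra) ->
    {ok, Seen}.
```

With the ping application running, it would be registered with ping:add_handler(ping_connect_handler, []).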
The packet itself:
{presence, [
    {node, node()},
    {host, Host},   % the hostname string obtained from inet:gethostname()
    {type, State#state.type},
    {status, State#state.status}]}
is not all that interesting. I’ve modeled it here as a “presence” packet, named after the similar concept in XMPP. It’s a property list containing:
node:
The node name of the sender, which you can pass to net_adm:ping(). It also acts as, among other things, a “process-id” if you want to do some sort of failover scheme.
host:
The host on which the node is running, mostly just for tourist information, but also to facilitate some sort of balancing, or to help identify which server went down if you no longer get any presence packets from it.
type:
A type, which amounts to the purpose of the node sending out the packet. Examples, as mentioned above, would be “reader” or “worker” or what have you.
status:
A status, which represents the state of the node, such as paused, running, busy.
These are really quite arbitrary, but I hope this gives you an idea how to do this sort of thing.
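As a small illustration of working with these properties, the sketch below builds a presence packet, decodes it defensively, and checks whether the sender is a node of a given type that is ready for work. The module presence_demo and its function names are hypothetical, and treating running as the "ready" status is just an assumption for the example.

```erlang
-module(presence_demo).
-export([encode/3, decode/1, is_ready/2]).

%% Build a presence packet like the one ping_serv sends
%% (the host field is left out to keep the example short).
encode(Node, Type, Status) ->
    term_to_binary({presence, [{node, Node},
                               {type, Type},
                               {status, Status}]}).

%% Decode a packet off the wire, tolerating garbage.
decode(Bin) when is_binary(Bin) ->
    try binary_to_term(Bin) of
        {presence, Props} when is_list(Props) ->
            {ok, Props};
        _Other ->
            {error, not_presence}
    catch
        error:badarg ->
            {error, garbled}
    end.

%% True when the sender has the wanted type and reports 'running'.
is_ready(Props, Type) ->
    proplists:get_value(type, Props) =:= Type andalso
        proplists:get_value(status, Props) =:= running.
```

A handler could call decode/1 and is_ready/2 to decide, say, which reader nodes a worker should announce itself to.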
I wouldn’t recommend this as a main transport for valuable data, but it works well for heartbeat-style applications in which a few dropped or garbled messages really don’t matter.