Implement connecting and subscribing
This commit is contained in:
parent
0d608be29c
commit
e3fdb8c8c7
6 changed files with 201 additions and 5 deletions
|
@ -9,7 +9,7 @@ It provides an application called `stomp` which takes configuration of the form:
|
||||||
|
|
||||||
```erlang
|
```erlang
|
||||||
[{stomp, #{host => "stomp-server.somedomain.sexy", % required
|
[{stomp, #{host => "stomp-server.somedomain.sexy", % required
|
||||||
port => 61613, % required
|
port => 61613, % optional
|
||||||
login => <<"someuser">>, % optional
|
login => <<"someuser">>, % optional
|
||||||
passcode => <<"hunter2>>, % optional
|
passcode => <<"hunter2>>, % optional
|
||||||
}}].
|
}}].
|
||||||
|
@ -47,12 +47,11 @@ Once the application starts it will register a process under the name
|
||||||
%% Subscribe to a destination, receive the subscription ID
|
%% Subscribe to a destination, receive the subscription ID
|
||||||
-spec subscribe(binary(), % Destination (e.g. <<"/queue/lizards">>)
|
-spec subscribe(binary(), % Destination (e.g. <<"/queue/lizards">>)
|
||||||
ack_mode(), % Client-acknowledgement mode
|
ack_mode(), % Client-acknowledgement mode
|
||||||
pid()) % PID of the process that wants to receive messages
|
|
||||||
-> {ok, sub_id()}.
|
-> {ok, sub_id()}.
|
||||||
```
|
```
|
||||||
|
|
||||||
This synchronous call subscribes to a message queue. The `stomp_worker` will
|
This synchronous call subscribes to a message queue. The `stomp_worker` will
|
||||||
link itself to the PID and forward received messages as
|
link itself to the caller and forward received messages as
|
||||||
`{msg, sub_id(), stomp_msg()}`.
|
`{msg, sub_id(), stomp_msg()}`.
|
||||||
|
|
||||||
Depending on the acknowledgement mode specified on connecting, the subscriber
|
Depending on the acknowledgement mode specified on connecting, the subscriber
|
||||||
|
|
19
include/stomp.hrl
Normal file
19
include/stomp.hrl
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
%% Client ack modes, refer to the STOMP protocol documentation
|
||||||
|
-type ack_mode() :: client | client_individual | auto.
|
||||||
|
|
||||||
|
%% Subscriptions are enumerated from 0
|
||||||
|
-type sub_id() :: integer().
|
||||||
|
|
||||||
|
%% Message IDs (for acknowledgements) are simple strings. They are
|
||||||
|
%% extracted from the 'ack' field of the header in client or client-individual
|
||||||
|
%% mode, and from the 'message-id' field in auto mode.
|
||||||
|
-type message_id() :: binary().
|
||||||
|
|
||||||
|
%% A destination can be a queue, or something else.
|
||||||
|
%% Example: <<"/queue/lizards">>
|
||||||
|
-type destination() :: binary().
|
||||||
|
|
||||||
|
%% A STOMP message as received from a queue subscription
|
||||||
|
-record(stomp_msg, { headers :: #{ binary() => binary() },
|
||||||
|
body :: binary() }).
|
||||||
|
-type stomp_msg() :: #stomp_msg{}.
|
7
src/stomp.app.src
Normal file
7
src/stomp.app.src
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
{application, stomp, [{description, "STOMP client for Erlang"},
|
||||||
|
{vsn, "0.1.0"},
|
||||||
|
{modules, [stomp_app, stomp_sup, stomp_worker]},
|
||||||
|
{registered, [stomp_worker]},
|
||||||
|
{env, []},
|
||||||
|
{applications, [kernel, stdlib]},
|
||||||
|
{mod, {stomp_app, []}}].
|
|
@ -5,7 +5,7 @@
|
||||||
-export([stop/1]).
|
-export([stop/1]).
|
||||||
|
|
||||||
start(_Type, _Args) ->
|
start(_Type, _Args) ->
|
||||||
stomp.erl_sup:start_link().
|
stomp_sup:start_link().
|
||||||
|
|
||||||
stop(_State) ->
|
stop(_State) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -19,4 +19,4 @@ stomp_spec() ->
|
||||||
restart => permanent,
|
restart => permanent,
|
||||||
shutdown => 5000,
|
shutdown => 5000,
|
||||||
type => worker,
|
type => worker,
|
||||||
module => [stomp_worker]}
|
module => [stomp_worker]}.
|
||||||
|
|
171
src/stomp_worker.erl
Normal file
171
src/stomp_worker.erl
Normal file
|
@ -0,0 +1,171 @@
|
||||||
|
-module(stomp_worker).
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
%% API.
|
||||||
|
-export([start_link/0]).
|
||||||
|
|
||||||
|
%% gen_server.
|
||||||
|
-export([init/1]).
|
||||||
|
-export([handle_call/3]).
|
||||||
|
-export([handle_cast/2]).
|
||||||
|
-export([handle_info/2]).
|
||||||
|
-export([terminate/2]).
|
||||||
|
-export([code_change/3]).
|
||||||
|
|
||||||
|
-include("stomp.hrl").
|
||||||
|
|
||||||
|
%% State of a stomp_worker
|
||||||
|
-record(state, {connection :: port(),
|
||||||
|
next_sub :: sub_id(),
|
||||||
|
subscriptions :: #{ destination() => sub_id() },
|
||||||
|
subscribers :: #{ destination() => pid() }
|
||||||
|
}).
|
||||||
|
-type state() :: #state{}.
|
||||||
|
|
||||||
|
%% API implementation
|
||||||
|
|
||||||
|
-spec start_link() -> {ok, pid()}.
|
||||||
|
start_link() ->
|
||||||
|
{ok, Pid} = gen_server:start_link(?MODULE, [], []),
|
||||||
|
register(?MODULE, Pid),
|
||||||
|
{ok, Pid}.
|
||||||
|
|
||||||
|
%% gen_server implementation
|
||||||
|
|
||||||
|
-spec init(any()) -> {ok, state()}.
|
||||||
|
init(_Args) ->
|
||||||
|
%% Fetch configuration from app config
|
||||||
|
{ok, Host} = application:get_env(stomp, host),
|
||||||
|
Port = application:get_env(stomp, port, 61613),
|
||||||
|
Login = application:get_env(stomp, login),
|
||||||
|
Pass = application:get_env(stomp, passcode),
|
||||||
|
|
||||||
|
%% Catch exit signals from linked processes (subscribers dying)
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
|
||||||
|
%% Establish connection
|
||||||
|
{ok, Conn} = connect(Host, Port, Login, Pass),
|
||||||
|
|
||||||
|
{ok, #state{connection = Conn,
|
||||||
|
next_sub = 0,
|
||||||
|
subscriptions = #{},
|
||||||
|
subscribers = #{}}}.
|
||||||
|
|
||||||
|
%% Handle subscription calls
|
||||||
|
handle_call({subscribe, Dest, Ack}, From, State) ->
|
||||||
|
%% Subscribe to new destination
|
||||||
|
SubId = State#state.next_sub,
|
||||||
|
ok = subscribe(State#state.connection, SubId, Dest, Ack),
|
||||||
|
|
||||||
|
%% Add subscription and subscriber to state
|
||||||
|
Subscriptions = maps:put(SubId, Dest, State#state.subscriptions),
|
||||||
|
Subscribers = maps:put(SubId, From, State#state.subscribers),
|
||||||
|
NextSub = SubId + 1,
|
||||||
|
NewState = State#state{subscriptions = Subscriptions,
|
||||||
|
subscribers = Subscribers,
|
||||||
|
next_sub = NextSub },
|
||||||
|
|
||||||
|
{reply, {ok, SubId}, NewState};
|
||||||
|
handle_call(_Req, _From, State) ->
|
||||||
|
{reply, ignored, State}.
|
||||||
|
|
||||||
|
%% Unused gen_server callbacks
|
||||||
|
|
||||||
|
handle_info(_Msg, State) ->
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_cast(_Msg, State) ->
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
terminate(_Reason, _State) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
%% Private functions
|
||||||
|
|
||||||
|
-spec connect(list(), integer(), any(), any()) -> {ok, port()}.
|
||||||
|
connect(Host, Port, Login, Pass) ->
|
||||||
|
%% STOMP CONNECT frame
|
||||||
|
Connect = connect_frame(Host, Login, Pass),
|
||||||
|
|
||||||
|
%% TODO: Configurable buffer size
|
||||||
|
%% Frames larger than the user-level buffer will be truncated, so it should
|
||||||
|
%% never be smaller than the largest expected messages.
|
||||||
|
{ok, Socket} = gen_tcp:connect(Host, Port, [binary,
|
||||||
|
{packet, line},
|
||||||
|
{line_delimiter, $\0},
|
||||||
|
{buffer, 262144}]),
|
||||||
|
|
||||||
|
ok = gen_tcp:send(Socket, Connect),
|
||||||
|
{ok, Socket}.
|
||||||
|
|
||||||
|
-spec subscribe(port(), sub_id(), destination(), ack_mode()) -> ok.
|
||||||
|
subscribe(Socket, Id, Queue, Ack) ->
|
||||||
|
{ok, SubscribeFrame} = subscribe_frame(Id, Queue, Ack),
|
||||||
|
gen_tcp:send(Socket, SubscribeFrame).
|
||||||
|
|
||||||
|
|
||||||
|
%%% Making STOMP protocol frames
|
||||||
|
|
||||||
|
%% Format a header
|
||||||
|
-spec format_header({binary(), binary()}) -> binary().
|
||||||
|
format_header({Key, Val}) ->
|
||||||
|
<<Key, ":", Val, "\n">>.
|
||||||
|
|
||||||
|
%% Build a single STOMP frame
|
||||||
|
-spec make_frame(binary(),
|
||||||
|
list({binary(), binary()}),
|
||||||
|
binary())
|
||||||
|
-> {ok, iolist()}.
|
||||||
|
make_frame(Command, HeaderMap, Body) ->
|
||||||
|
Headers = lists:map(fun format_header/1, HeaderMap),
|
||||||
|
Frame = [Command, <<"\n">>, Headers, <<"\n">>, Body, <<0>>],
|
||||||
|
{ok, Frame}.
|
||||||
|
|
||||||
|
%%% Default frames
|
||||||
|
|
||||||
|
-spec connect_frame(list(), any(), any()) -> iolist().
|
||||||
|
connect_frame(Host, {ok, Login}, {ok, Pass}) ->
|
||||||
|
make_frame(<<"CONNECT">>,
|
||||||
|
[{"accept-version", "1.2"},
|
||||||
|
{"host", Host},
|
||||||
|
{"login", Login},
|
||||||
|
{"passcode", Pass},
|
||||||
|
{"heart-beat", "0,5000"}],
|
||||||
|
[]);
|
||||||
|
connect_frame(Host, _Login, _Pass) ->
|
||||||
|
make_frame(<<"CONNECT">>,
|
||||||
|
[{"accept-version", "1.2"},
|
||||||
|
{"host", Host},
|
||||||
|
%% Expect a server heartbeat every 5 seconds, let the server
|
||||||
|
%% expect one every 10. We don't actually check this and just
|
||||||
|
%% echo server heartbeats.
|
||||||
|
%% TODO: For now the server is told not to expect replies due to
|
||||||
|
%% a weird behaviour.
|
||||||
|
{"heart-beat", "0,5000"}],
|
||||||
|
[]).
|
||||||
|
|
||||||
|
|
||||||
|
-spec subscribe_frame(sub_id(), destination(), ack_mode()) -> iolist().
|
||||||
|
subscribe_frame(Id, Queue, Ack) ->
|
||||||
|
make_frame(<<"SUBSCRIBE">>,
|
||||||
|
[{"id", integer_to_binary(Id)},
|
||||||
|
{"destination", Queue},
|
||||||
|
{"ack", ack_mode_to_binary(Ack)}],
|
||||||
|
[]).
|
||||||
|
|
||||||
|
-spec ack_mode_to_binary(ack_mode()) -> binary().
|
||||||
|
ack_mode_to_binary(AckMode) ->
|
||||||
|
case AckMode of
|
||||||
|
auto -> <<"auto">>;
|
||||||
|
client -> <<"client">>;
|
||||||
|
client_individual -> <<"client-individual">>
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% -spec ack_frame(binary()) -> iolist().
|
||||||
|
%% ack_frame(MessageID) ->
|
||||||
|
%% make_frame(<<"ACK">>,
|
||||||
|
%% [{"id", MessageID}],
|
||||||
|
%% []).
|
Loading…
Add table
Reference in a new issue