merge(stomp.erl): Integrate at //net/stomp_erl
This currently has no build configuration.
This commit is contained in:
commit
1a281d3bb9
9 changed files with 6870 additions and 0 deletions
10
net/stomp_erl/.gitignore
vendored
Normal file
10
net/stomp_erl/.gitignore
vendored
Normal file
|
@ -0,0 +1,10 @@
|
|||
.eunit
|
||||
deps
|
||||
*.o
|
||||
*.beam
|
||||
*.plt
|
||||
erl_crash.dump
|
||||
ebin
|
||||
rel/example_project
|
||||
.concrete/DEV_MODE
|
||||
.rebar
|
8
net/stomp_erl/Makefile
Normal file
8
net/stomp_erl/Makefile
Normal file
|
@ -0,0 +1,8 @@
|
|||
PROJECT = stomp
|
||||
PROJECT_DESCRIPTION = STOMP client for Erlang
|
||||
PROJECT_VERSION = 0.1.0
|
||||
|
||||
# Whitespace to be used when creating files from templates.
|
||||
SP = 4
|
||||
|
||||
include erlang.mk
|
78
net/stomp_erl/README.md
Normal file
78
net/stomp_erl/README.md
Normal file
|
@ -0,0 +1,78 @@
|
|||
STOMP on Erlang
|
||||
===============
|
||||
|
||||
`stomp.erl` is a simple Erlang client for the [STOMP protocol][] in version 1.2.
|
||||
|
||||
Currently only subscribing to queues is supported.
|
||||
|
||||
It provides an application called `stomp` which takes configuration of the form:
|
||||
|
||||
```erlang
|
||||
[{stomp, #{host => "stomp-server.somedomain.sexy", % required
|
||||
port => 61613, % optional
|
||||
login => <<"someuser">>, % optional
|
||||
passcode => <<"hunter2>>, % optional
|
||||
}}].
|
||||
```
|
||||
|
||||
## Types
|
||||
|
||||
The following types are used in `stomp.erl`, you can include them from
|
||||
`stomp.hrl`:
|
||||
|
||||
```erlang
|
||||
%% Client ack modes, refer to the STOMP protocol documentation
|
||||
-type ack_mode() :: client | client_individual | auto.
|
||||
|
||||
%% Subscriptions are enumerated from 0
|
||||
-type sub_id() :: integer().
|
||||
|
||||
%% Message IDs (for acknowledgements) are simple strings. They are
|
||||
%% extracted from the 'ack' field of the header in client or client-individual
|
||||
%% mode, and from the 'message-id' field in auto mode.
|
||||
-type message_id() :: binary().
|
||||
|
||||
%% A STOMP message as received from a queue subscription
|
||||
-record(stomp_msg, { headers :: #{ binary() => binary() },
|
||||
body :: binary() }.
|
||||
-type stomp_msg() :: #stomp_msg{}.
|
||||
```
|
||||
|
||||
Once the application starts it will register a process under the name
|
||||
`stomp_worker` and expose the following API:
|
||||
|
||||
## Subscribing to a queue
|
||||
|
||||
```erlang
|
||||
%% Subscribe to a destination, receive the subscription ID
|
||||
-spec subscribe(binary(), % Destination (e.g. <<"/queue/lizards">>)
|
||||
ack_mode(), % Client-acknowledgement mode
|
||||
-> {ok, sub_id()}.
|
||||
```
|
||||
|
||||
This synchronous call subscribes to a message queue. The `stomp_worker` will
|
||||
link itself to the caller and forward received messages as
|
||||
`{msg, sub_id(), stomp_msg()}`.
|
||||
|
||||
Depending on the acknowledgement mode specified on connecting, the subscriber
|
||||
may have to acknowledge receival of messages.
|
||||
|
||||
## Acknowledging messages
|
||||
|
||||
```erlang
|
||||
%% Acknowledge a message ID.
|
||||
%% This is not required in auto mode. In client mode it will acknowledge the
|
||||
%% received messages up to the ID specified. In client-individual mode every
|
||||
%% single message has to be acknowledged.
|
||||
-spec ack(sub_id(), message_id()) -> ok.
|
||||
|
||||
%% Explicitly "unacknowledge" a message
|
||||
-spec nack(sub_id(), message_id()) -> ok.
|
||||
```
|
||||
|
||||
Both of these calls are asynchronous and will return immediately. Note that in
|
||||
the case of the `stomp_worker` crashing before a message acknowledgement is
|
||||
handled, the message *may* be delivered again. Your consumer needs to be able to
|
||||
handle this.
|
||||
|
||||
[STOMP protocol]: https://stomp.github.io/stomp-specification-1.2.html
|
6519
net/stomp_erl/erlang.mk
vendored
Normal file
6519
net/stomp_erl/erlang.mk
vendored
Normal file
File diff suppressed because it is too large
Load diff
22
net/stomp_erl/include/stomp.hrl
Normal file
22
net/stomp_erl/include/stomp.hrl
Normal file
|
@ -0,0 +1,22 @@
|
|||
%% Client ack modes, refer to the STOMP protocol documentation
|
||||
-type ack_mode() :: client | client_individual | auto.
|
||||
|
||||
%% Subscriptions are enumerated from 0
|
||||
-type sub_id() :: integer().
|
||||
|
||||
%% Message IDs (for acknowledgements) are simple strings. They are
|
||||
%% extracted from the 'ack' field of the header in client or client-individual
|
||||
%% mode, and from the 'message-id' field in auto mode.
|
||||
-type message_id() :: binary().
|
||||
|
||||
%% A destination can be a queue, or something else.
|
||||
%% Example: <<"/queue/lizards">>
|
||||
-type destination() :: binary().
|
||||
|
||||
%% A STOMP message as received from a queue subscription
|
||||
-record(stomp_msg, { headers :: #{ binary() => binary() },
|
||||
body :: binary() }).
|
||||
-type stomp_msg() :: #stomp_msg{}.
|
||||
|
||||
%% STOMP frame components
|
||||
-type headers() :: #{binary() => binary()}.
|
7
net/stomp_erl/src/stomp.app.src
Normal file
7
net/stomp_erl/src/stomp.app.src
Normal file
|
@ -0,0 +1,7 @@
|
|||
{application, stomp, [{description, "STOMP client for Erlang"},
|
||||
{vsn, "0.1.0"},
|
||||
{modules, [stomp_app, stomp_sup, stomp_worker]},
|
||||
{registered, [stomp_worker]},
|
||||
{env, []},
|
||||
{applications, [kernel, stdlib]},
|
||||
{mod, {stomp_app, []}}]}.
|
11
net/stomp_erl/src/stomp_app.erl
Normal file
11
net/stomp_erl/src/stomp_app.erl
Normal file
|
@ -0,0 +1,11 @@
|
|||
-module(stomp_app).
|
||||
-behaviour(application).
|
||||
|
||||
-export([start/2]).
|
||||
-export([stop/1]).
|
||||
|
||||
start(_Type, _Args) ->
|
||||
stomp_sup:start_link().
|
||||
|
||||
stop(_State) ->
|
||||
ok.
|
22
net/stomp_erl/src/stomp_sup.erl
Normal file
22
net/stomp_erl/src/stomp_sup.erl
Normal file
|
@ -0,0 +1,22 @@
|
|||
-module(stomp_sup).
|
||||
-behaviour(supervisor).
|
||||
|
||||
-export([start_link/0]).
|
||||
-export([init/1]).
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
init([]) ->
|
||||
Procs = [stomp_spec()],
|
||||
{ok, {{one_for_one, 1, 5}, Procs}}.
|
||||
|
||||
%% Private
|
||||
|
||||
stomp_spec() ->
|
||||
#{id => stomp_proc,
|
||||
start => {stomp_worker, start_link, []},
|
||||
restart => permanent,
|
||||
shutdown => 5000,
|
||||
type => worker,
|
||||
module => [stomp_worker]}.
|
193
net/stomp_erl/src/stomp_worker.erl
Normal file
193
net/stomp_erl/src/stomp_worker.erl
Normal file
|
@ -0,0 +1,193 @@
|
|||
-module(stomp_worker).
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API.
|
||||
-export([start_link/0]).
|
||||
|
||||
%% gen_server.
|
||||
-export([init/1]).
|
||||
-export([handle_call/3]).
|
||||
-export([handle_cast/2]).
|
||||
-export([handle_info/2]).
|
||||
-export([terminate/2]).
|
||||
-export([code_change/3]).
|
||||
|
||||
%% Testing
|
||||
-compile(export_all).
|
||||
|
||||
-include("stomp.hrl").
|
||||
|
||||
%% State of a stomp_worker
|
||||
-record(state, {connection :: port(),
|
||||
next_sub :: sub_id(),
|
||||
subscriptions :: #{ destination() => sub_id() },
|
||||
subscribers :: #{ destination() => pid() }
|
||||
}).
|
||||
-type state() :: #state{}.
|
||||
|
||||
%% API implementation
|
||||
|
||||
-spec start_link() -> {ok, pid()}.
|
||||
start_link() ->
|
||||
{ok, Pid} = gen_server:start_link(?MODULE, [], []),
|
||||
register(?MODULE, Pid),
|
||||
{ok, Pid}.
|
||||
|
||||
%% gen_server implementation
|
||||
|
||||
-spec init(any()) -> {ok, state()}.
|
||||
init(_Args) ->
|
||||
%% Fetch configuration from app config
|
||||
{ok, Host} = application:get_env(stomp, host),
|
||||
Port = application:get_env(stomp, port, 61613),
|
||||
Login = application:get_env(stomp, login),
|
||||
Pass = application:get_env(stomp, passcode),
|
||||
|
||||
%% Catch exit signals from linked processes (subscribers dying)
|
||||
process_flag(trap_exit, true),
|
||||
|
||||
%% Establish connection
|
||||
{ok, Conn} = connect(Host, Port, Login, Pass),
|
||||
|
||||
{ok, #state{connection = Conn,
|
||||
next_sub = 0,
|
||||
subscriptions = #{},
|
||||
subscribers = #{}}}.
|
||||
|
||||
%% Handle subscription calls
|
||||
handle_call({subscribe, Dest, Ack}, From, State) ->
|
||||
%% Subscribe to new destination
|
||||
SubId = State#state.next_sub,
|
||||
ok = subscribe(State#state.connection, SubId, Dest, Ack),
|
||||
|
||||
%% Add subscription and subscriber to state
|
||||
Subscriptions = maps:put(SubId, Dest, State#state.subscriptions),
|
||||
Subscribers = maps:put(SubId, From, State#state.subscribers),
|
||||
NextSub = SubId + 1,
|
||||
NewState = State#state{subscriptions = Subscriptions,
|
||||
subscribers = Subscribers,
|
||||
next_sub = NextSub },
|
||||
|
||||
{reply, {ok, SubId}, NewState};
|
||||
handle_call(_Req, _From, State) ->
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_info({tcp, Conn, Frame}, State) when Conn =:= State#state.connection ->
|
||||
handle_frame(Frame, State);
|
||||
handle_info(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% Unused gen_server callbacks
|
||||
|
||||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%% Private functions
|
||||
|
||||
-spec connect(list(), integer(), any(), any()) -> {ok, port()}.
|
||||
connect(Host, Port, Login, Pass) ->
|
||||
%% STOMP CONNECT frame
|
||||
Connect = connect_frame(Host, Login, Pass),
|
||||
|
||||
%% TODO: Configurable buffer size
|
||||
%% Frames larger than the user-level buffer will be truncated, so it should
|
||||
%% never be smaller than the largest expected messages.
|
||||
{ok, Socket} = gen_tcp:connect(Host, Port, [binary,
|
||||
{packet, line},
|
||||
{line_delimiter, $\0},
|
||||
{buffer, 262144}]),
|
||||
|
||||
ok = gen_tcp:send(Socket, Connect),
|
||||
{ok, Socket}.
|
||||
|
||||
-spec subscribe(port(), sub_id(), destination(), ack_mode()) -> ok.
|
||||
subscribe(Socket, Id, Queue, Ack) ->
|
||||
{ok, SubscribeFrame} = subscribe_frame(Id, Queue, Ack),
|
||||
gen_tcp:send(Socket, SubscribeFrame).
|
||||
|
||||
%%% Parsing STOMP frames
|
||||
|
||||
handle_frame(<<"MESSAGE", "\n", _Frame/binary>>, State) ->
|
||||
{noreply, State};
|
||||
handle_frame(Frame, State) ->
|
||||
io:format("Received unknown frame ~p", [Frame]),
|
||||
{noreply, State}.
|
||||
|
||||
|
||||
%% Parse out headers into a map
|
||||
-spec parse_headers(binary()) -> headers().
|
||||
parse_headers(HeadersBin) ->
|
||||
Headers = binary:split(HeadersBin, <<"\n">>, [global]),
|
||||
ToPairs = fun(H, M) -> [K,V | []] = binary:split(H, <<":">>),
|
||||
maps:put(K, V, M)
|
||||
end,
|
||||
{ok, lists:mapfoldl(ToPairs, #{}, Headers)}.
|
||||
|
||||
%%% Making STOMP protocol frames
|
||||
|
||||
%% Format a header
|
||||
-spec format_header({binary(), binary()}) -> binary().
|
||||
format_header({Key, Val}) ->
|
||||
<<Key/binary, ":", Val/binary, "\n">>.
|
||||
|
||||
%% Build a single STOMP frame
|
||||
-spec make_frame(binary(),
|
||||
headers(),
|
||||
binary())
|
||||
-> {ok, iolist()}.
|
||||
make_frame(Command, HeaderMap, Body) ->
|
||||
Headers = lists:map(fun format_header/1, maps:to_list(HeaderMap)),
|
||||
Frame = [Command, <<"\n">>, Headers, <<"\n">>, Body, <<0>>],
|
||||
{ok, Frame}.
|
||||
|
||||
%%% Default frames
|
||||
|
||||
-spec connect_frame(list(), any(), any()) -> iolist().
|
||||
connect_frame(Host, {ok, Login}, {ok, Pass}) ->
|
||||
make_frame(<<"CONNECT">>,
|
||||
#{<<"accept-version">> => <<"1.2">>,
|
||||
<<"host">> => Host,
|
||||
<<"login">> => Login,
|
||||
<<"passcode">> => Pass,
|
||||
<<"heart-beat">> => <<"0,5000">>},
|
||||
[]);
|
||||
connect_frame(Host, _Login, _Pass) ->
|
||||
make_frame(<<"CONNECT">>,
|
||||
#{<<"accept-version">> => <<"1.2">>,
|
||||
<<"host">> => Host,
|
||||
%% Expect a server heartbeat every 5 seconds, let the server
|
||||
%% expect one every 10. We don't actually check this and just
|
||||
%% echo server heartbeats.
|
||||
%% TODO: For now the server is told not to expect replies due to
|
||||
%% a weird behaviour.
|
||||
<<"heart-beat">> => <<"0,5000">>},
|
||||
[]).
|
||||
|
||||
|
||||
-spec subscribe_frame(sub_id(), destination(), ack_mode()) -> iolist().
|
||||
subscribe_frame(Id, Queue, Ack) ->
|
||||
make_frame(<<"SUBSCRIBE">>,
|
||||
#{<<"id">> => integer_to_binary(Id),
|
||||
<<"destination">> => Queue,
|
||||
<<"ack">> => ack_mode_to_binary(Ack)},
|
||||
[]).
|
||||
|
||||
-spec ack_mode_to_binary(ack_mode()) -> binary().
|
||||
ack_mode_to_binary(AckMode) ->
|
||||
case AckMode of
|
||||
auto -> <<"auto">>;
|
||||
client -> <<"client">>;
|
||||
client_individual -> <<"client-individual">>
|
||||
end.
|
||||
|
||||
%% -spec ack_frame(binary()) -> iolist().
|
||||
%% ack_frame(MessageID) ->
|
||||
%% make_frame(<<"ACK">>,
|
||||
%% [{"id", MessageID}],
|
||||
%% []).
|
Loading…
Reference in a new issue