IHBNW3GI2XB6KAWUYRLL6KDOBUNUOU3N7RRLS6BFNW6SE7ZDHLWQC ULS4X3VORQZFBSNPXN76UFH2PWS2MVBA64CJQS3IR4YUHIKEF6GQC 4NDLPBWK3HHQ53RB3R3NHXFJGSZDIBNDHWGU5V7KPBKJUTGICKFAC 55WLMLEEVBRSTAFRZ5RGF7TOGUF5OPVCPA2TMHAQK45OUO7PA3YQC YWCRGWVDIMCPXBQFM23MKCYZCXJJSD733NJEPB6WUU4G4BSREILAC YDZ3R5NY7II7WX4Q5O2UQ72TMCABDPPRLEMTTIBH5DGKOLAOPOPAC PFC6VDDZ55HDVMZYGJJOBUA4PTKDYIOBV7GKHT2FCWUJGXHXAPCAC MONVI5STEDKY5ALVMEXJJXDUX6XQRKTFLP7BBNOQML3VSJEM2JAAC -record(state, {handler, db, connections=0, activeOut = #{}, activeIn= #{}}).
dial(Pid, Addr) ->[HostStr, PortStr] = string:split(Addr, ":"),Host = inet:getaddr(HostStr, inet),Port = list_to_integer(PortStr),gen_server:cast(Pid, {dialPeer, [Host, Port]}).
dial(Host, Port) ->{ok, Sock} = gen_tcp:connect(Host, Port, [binary, {packet, 0}]),io:format("Connection established (~p:~p)~n", [Host, Port]),[{binary, Bin}, _ ] = examples:post_request(),gen_tcp:send(Sock, Bin),timer:sleep(1000),[{binary, Bin2}, _ ] = examples:cancel_request(),gen_tcp:send(Sock, Bin2),timer:sleep(1000),[{binary, Bin3}, _ ] = examples:channel_time_range_request(),gen_tcp:send(Sock, Bin3),timer:sleep(1000),gen_tcp:close(Sock).
join(Pid, Chan) when is_list(Chan) ->gen_server:cast(Pid, {channelJoin, Chan}).
start_server(Host, Port) ->
part(Pid, Chan) when is_list(Chan) ->gen_server:cast(Pid, {channelPart, Chan}).channelList(Pid) ->gen_server:call(Pid, {channelList}).%%%%%%%%%%%%%%%%%% gen_server %%%%%%%%%%%%%%%%%%%% {ok, P} = peer:start_link("localhost", 3133). peer:join(P, "default"). peer:channelList(P).start_link(Host, Port) ->gen_server:start_link({local, cabalPeer}, peer, [{listener, [Host, Port]}], []).init(Args) ->process_flag(trap_exit, true),[Host, Port] = proplists:get_value(listener, Args),
Handler = spawn(fun() -> handle_messages(State) end),server_loop(Handler, Lis).
% TODO: supervise these processesHandler = spawn_link(fun() -> event_loop(State) end),ListenProc = spawn_link(fun() -> listen_loop(Handler, Lis) end),{ok, [{listenProc, ListenProc}, {eventLoop, Handler}]}.
server_loop(Handler, Lis) ->
handle_call({channelList}, From, State) ->EL = proplists:get_value(eventLoop, State),EL ! {channelList, From}, % reply via event loop{noreply, State}.handle_cast({dialPeer, [Host, Port]}, State) ->{ok, Sock} = gen_tcp:connect(Host, Port, [binary, {packet, 0}]),io:format("Connection established (~p:~p)~n", [Host, Port]),EL = proplists:get_value(eventLoop, State),EL ! {newPeer, Sock},{noreply, State};handle_cast({channelJoin, Chan}, State) ->io:format("joining ~p~n", [Chan]),EL = proplists:get_value(eventLoop, State),EL ! {channelJoin, Chan},{noreply, State};handle_cast({channelPart, Chan}, State) ->io:format("leaving ~p~n", [Chan]),EL = proplists:get_value(eventLoop, State),EL ! {channelPart, Chan},{noreply, State};handle_cast(Msg, State) ->io:format("Unexpected cast: ~p~n", [Msg]),{noreply, State}.handle_info(Msg, State) ->io:format("Unexpected message: ~p~n", [Msg]),{noreply, State}.terminate(TermReason, _State) ->io:format("terminating peer: ~p~n", [TermReason]),%% LisProc = proplists:get_value(listenProc, State),%% unlink(LisProc),%% EventLoopProc = proplists:get_value(eventLoop, State),%% unlink(EventLoopProc),ok.code_change(_OldVsn, State, _Extra) ->%% no change planned. The functionm is there for the behaviour,%% but will not be used.{ok, State}.%%%%%%%%%%%%%%% private %%%%%%%%%%%%%%%listen_loop(Handler, Lis) ->
% read incoming messagescase gen_tcp:recv(Conn, 0) of{ok, Data} ->{ok, Msgs} = wire:split_messages(Data),lists:foreach(fun(Chunk) ->{ok, Msg} = wire:decode(Chunk),Handler ! {incomingMsg, Conn, Msg}end, Msgs),decode_loop(Handler, Conn);{error, closed} ->io:format("Connection closed ~p~n", [Conn])end.
%% read incoming messagescase gen_tcp:recv(Conn, 0) of{ok, Data} ->{ok, Msgs} = wire:split_messages(Data),EachMsg = fun(Chunk) ->{ok, Msg} = wire:decode(Chunk),Handler ! {incomingMsg, Conn, Msg}end,lists:foreach(EachMsg, Msgs),decode_loop(Handler, Conn);{error, closed} ->io:format("Connection closed ~p~n", [Conn])end.
{newPeer, Peer} ->EachChan = fun(Chan, AccActiveOut) ->ReqId = send_channel_state_request(Peer, Chan, true),{ReqId, maps:put(ReqId, {sent, Peer}, AccActiveOut)}end,{ChanReqs, NewActiveOut} = lists:mapfoldl(EachChan, ActiveOut, maps:keys(Chans)),NewConns = maps:put(Peer, ChanReqs, Peers),NewState = State#state{connections = NewConns, activeOut = NewActiveOut},event_loop(NewState);{channelJoin, Chan} ->case maps:is_key(Chan, Chans) oftrue -> % already joinedevent_loop(State);false ->%% send requests to each connectionEachPeer = fun(Peer, AccActives) ->%% TODO: Post = posts:encode_join(Key, Chan),{ok, ReqId} = send_channel_state_request(Peer, Chan, true),{ReqId, maps:put(ReqId, {sent, Peer}, AccActives)}end,{ReqIds, NewActiveOut} = lists:mapfoldl(EachPeer, ActiveOut, maps:keys(Peers)),NewChans = maps:put(Chan, ReqIds, Chans),NewState = State#state{channels = NewChans, activeOut = NewActiveOut},event_loop(NewState)end;{channelPart, Chan} ->case maps:is_key(Chan, Chans) offalse -> % not even joinedevent_loop(State);true ->{ReqIds, NewChans} = maps:take(Chan, Chans),EachReq = fun(ReqId, AccActives) ->%% TODO: Post = posts:encode_part(Key, Chan),{{sent, Conn}, NewActive} = maps:take(ReqId, AccActives),CancelReqId = send_cancel_request(Conn, ReqId),{ok, maps:put(CancelReqId, {sent, Conn}, NewActive)}end,{_OKs, NewActiveOut} = lists:mapfoldl(EachReq, ActiveOut, ReqIds),NewState = State#state{channels = NewChans, activeOut = NewActiveOut},event_loop(NewState)end;{channelList, From} ->gen_server:reply(From, maps:keys(Chans)),event_loop(State);
NewState = State#state{activeIn = maps:put(ReqId, {received, Conn, Msg}, ActiveIn)},io:format("Received incoming RequestId:~p from ~p~n", [ReqIdStr, Conn]),handle_messages(NewState)
NewState = State#state{activeIn = maps:put(ReqId, {received, Peer, Msg}, ActiveIn)},io:format("Received incoming RequestId:~p from ~p~n", [ReqIdStr, Peer]),event_loop(NewState)