ubKey = maps:get(public, KeyPair),gen_server:reply(From, {ok, PubKey}),event_loop(State);
NewState = handle_system_messages(nodePubKey, {From}, State),event_loop(NewState);{peerList, From} ->NewState = handle_system_messages(peerList, {From}, State),event_loop(NewState);{channelsList, From} ->NewState = handle_system_messages(channelsList, {From}, State),event_loop(NewState);{stateChange, Chan} ->NewState = handle_system_messages(stateChange, {Chan}, State),event_loop(NewState);
%% handle disconnects by removing active requets{ #{ lastSeenAt := LastSeen,bytesTransmitted := {Sent, Received}},NewPeers} = maps:take(Peer, Peers),io:format("[Peer:~p] lost | LastSeen: ~p | Sent: ~p | Received: ~p~n",[Peer, LastSeen, human_bytesize(Sent), human_bytesize(Received)]),%% fetch all request IDs for this peerFilterReqs = fun(_Chan, ChanReqs, MatchedReqs) ->MatchRequests = fun(Req, Matched) ->case Req of{_, ReqId, Peer} ->maps:put(ReqId, true, Matched);_Other -> % other requestsMatchedendend,NewMatched = lists:foldl(MatchRequests, MatchedReqs, ChanReqs),NewMatchedend,PeersRequestIds = maps:fold(FilterReqs, #{}, Chans),%% create new versions of active in and out without thoseRmActives = fun(ReqId, _) -> not maps:is_key(ReqId, PeersRequestIds) end,NewIn = maps:filter(RmActives, ActiveIn),NewOut = maps:filter(RmActives, ActiveOut),%% now mutate all channelsRmRequests = fun(ChanName, AccChans) ->{ChanReqs, NewChanAcc} = maps:take(ChanName, AccChans),FilterReqsOfPeer = fun(Req) ->case Req of{Direction, ReqId, Peer} ->io:format("[DEBUG Peer:~p] dropping ~p request ~p~n",[Peer, Direction, hex:bin_to_hexstr(ReqId)]),false;_Other -> % other requeststrueendend,NewReqs = lists:filter(FilterReqsOfPeer, ChanReqs),maps:put(ChanName, NewReqs, NewChanAcc)end,NewChans = lists:foldl(RmRequests, Chans, maps:keys(Chans)),event_loop(State#state{peers = NewPeers,channels = NewChans,activeOut = NewOut,activeIn = NewIn});
NewState = handle_peer_messages(peerLost, {Peer}, State),event_loop(NewState);
%% For each channel we are in, send our usual requests and update the state mapsEachChan = fun(Chan, {AccNewReqs, AccChannels, PeerSent}) ->{ok, {StateReqId, StateMsg, StateSize}} = send_channel_state_request(Peer, Chan, true),End = 0,Start = os:system_time(1000) - (12*60*60 * 1000),{ok, {TimeReqId, TimeMsg, TimeSize}} = send_channel_time_range_request(Peer, Chan, Start, End, 200),ReqsForChan = maps:get(Chan, AccChannels, [])++ [{sent, StateReqId, Peer}]++ [{sent, TimeReqId, Peer}],{AccNewReqs ++ [{StateReqId, StateMsg}, {TimeReqId, TimeMsg}],maps:update(Chan, ReqsForChan, AccChannels),PeerSent + StateSize + TimeSize
NewState = handle_peer_messages(peerNew, {Peer}, State),event_loop(NewState);
}end,{NewRequests, NewChans, PeerSent} = lists:foldl(EachChan, {[], Chans, 0}, maps:keys(Chans)),
%% Channel messages{setOwnNick, From, Nick} ->NewState = handle_channel_messages(setOwnNick, {From, Nick}, State),event_loop(NewState);{channelsJoin, Chan} ->NewState = handle_channel_messages(channelsJoin, {Chan}, State),event_loop(NewState);{channelsLeave, Chan} ->NewState = handle_channel_messages(channelsLeave, {Chan}, State),event_loop(NewState);{channelsSetTopic, From, Chan, Topic} ->NewState = handle_channel_messages(channelsSetTopic, {From, Chan, Topic}, State),event_loop(NewState);{channelsMembers, From, Chan} ->NewState = handle_channel_messages(channelsMembers, {From, Chan}, State),event_loop(NewState);
%% Update the helper map for active outgoing reqsNewOut = lists:foldl(fun({ReqId, Msg}, AccActiveOut) ->maps:put(ReqId, {Peer, Msg}, AccActiveOut)end, ActiveOut, NewRequests),{ok, {Addr, Port}} = inet:peername(Peer),PeerMetadata = #{address => {Addr, Port},lastSeenAt => os:system_time(1000),bytesTransmitted => {PeerSent, 0}},NewState = State#state{peers = maps:put(Peer, PeerMetadata, Peers),channels = NewChans,activeOut = NewOut},io:format("[Peer] Tracking new connection ~p ~n", [Peer]),%Handler = proplists:get_value(eventLoop, State),Handler = self(),spawn(fun() -> decode_loop(Handler, Peer) end),
%% Database messages{readTextsFromChannel, From, Chan} ->NewState = handle_database_messages(readTextsFromChannel, {From, Chan}, State),event_loop(NewState);{writeTextToChannel, From, Chan, Text} ->NewState = handle_database_messages(writeTextToChannel, {From, Chan, Text}, State),
{peerList, From} ->EachPeer = fun(PeerId, PeerMeta, Acc) ->Acc ++ [{PeerId, PeerMeta}]end,PeerList = maps:fold(EachPeer, [], Peers),gen_server:reply(From, PeerList),event_loop(State);
%% Network messages{incomingMsg, Peer, Msg, MsgSize} ->NewState = handle_network_messages(incomingMsg, {Peer, Msg, MsgSize}, State),event_loop(NewState);
{setOwnNick, From, Nick} ->{ok, Links} = db:get_channel_heads(Db, null),Bin = posts:encode(KeyPair, Links, {info, {name, Nick}}),{ok, _, PostHash} = db:save_post(Db, Bin),gen_server:reply(From, ok),%% sent out to (all?) open channel state requestsF = fun(ReqId, {Peer, [Header, _]}, AccPeers) ->case proplists:get_value(type, Header) of5 ->{ok, {_, _, Size}} = send_hash_response(Peer, ReqId, [PostHash]),update_peer_sent(AccPeers, Peer, Size);_ -> AccPeersendend,SentPeers = maps:fold(F, Peers, ActiveIn),event_loop(State#state{peers = SentPeers});
%% Unhandled messagesOther ->io:format("Unhandled message in event_loop: ~p~n", [Other]),event_loop(State)end.
{readTextsFromChannel, From, Chan} ->{ok, Texts} = db:get_texts_for_channel(Db, Chan),gen_server:reply(From, {ok, Texts}),event_loop(State);
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% Message Handler Functions - Modular Design %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% Handle system-level messageshandle_system_messages(nodePubKey, {From}, State = #state{keyPair = KeyPair}) ->PubKey = maps:get(public, KeyPair),gen_server:reply(From, {ok, PubKey}),State;handle_system_messages(peerList, {From}, State = #state{peers = Peers}) ->EachPeer = fun(PeerId, PeerMeta, Acc) ->Acc ++ [{PeerId, PeerMeta}]end,PeerList = maps:fold(EachPeer, [], Peers),gen_server:reply(From, PeerList),State;handle_system_messages(channelsList, {From}, State = #state{channels = Chans}) ->gen_server:reply(From, {ok, maps:keys(Chans)}),State;handle_system_messages(stateChange, {Chan}, State) ->io:format("[DEBUG] state change of ~p~n", [Chan]),State;handle_system_messages(Message, Args, State) ->io:format("Unhandled system message: ~p with args ~p~n", [Message, Args]),State.
{writeTextToChannel, From, Chan, Text} ->case maps:is_key(Chan, Chans) offalse ->gen_server:reply(From, {error, notInChannel}),event_loop(State);true ->{ok, Links} = db:get_channel_heads(Db, Chan),Bin = posts:encode(KeyPair, Links, {text, Chan, Text}),{ok, _, PostHash} = db:save_post(Db, Bin),%% find incoming channel time range (type:4) requests which want this postF = fun({Direction, ReqId, Peer}, AccPeers) ->case Direction ofreceived ->{Peer, [Header, _]} = maps:get(ReqId, ActiveIn),case proplists:get_value(msgType, Header) of4 ->{ok, {_, _, Size}}= send_hash_response(Peer, ReqId, [PostHash]),update_peer_sent(AccPeers, Peer, Size);_ -> AccPeersend;sent -> AccPeersendend,SentPeers = lists:foldl(F, Peers, maps:get(Chan, Chans)),io:format("[Wrote] #~p: ~p~n", [Chan, Text]),gen_server:reply(From, ok),event_loop(State#state{peers = SentPeers})end;
%% Handle peer-related messageshandle_peer_messages(peerLost, {Peer}, State = #state{peers = Peers, channels = Chans, activeIn = ActiveIn, activeOut = ActiveOut}) ->%% handle disconnects by removing active requests{ #{ lastSeenAt := LastSeen,bytesTransmitted := {Sent, Received}},NewPeers} = maps:take(Peer, Peers),io:format("[Peer:~p] lost | LastSeen: ~p | Sent: ~p | Received: ~p~n",[Peer, LastSeen, human_bytesize(Sent), human_bytesize(Received)]),%% fetch all request IDs for this peerFilterReqs = fun(_Chan, ChanReqs, MatchedReqs) ->MatchRequests = fun(Req, Matched) ->case Req of{_, ReqId, Peer} ->maps:put(ReqId, true, Matched);_Other -> % other requestsMatchedendend,NewMatched = lists:foldl(MatchRequests, MatchedReqs, ChanReqs),NewMatchedend,PeersRequestIds = maps:fold(FilterReqs, #{}, Chans),%% create new versions of active in and out without thoseRmActives = fun(ReqId, _) -> not maps:is_key(ReqId, PeersRequestIds) end,NewIn = maps:filter(RmActives, ActiveIn),NewOut = maps:filter(RmActives, ActiveOut),%% now mutate all channelsRmRequests = fun(ChanName, AccChans) ->{ChanReqs, NewChanAcc} = maps:take(ChanName, AccChans),FilterReqsOfPeer = fun(Req) ->case Req of{Direction, ReqId, Peer} ->io:format("[DEBUG Peer:~p] dropping ~p request ~p~n",[Peer, Direction, hex:bin_to_hexstr(ReqId)]),false;_Other -> % other requeststrueend
{channelsSetTopic, From, Chan, Topic} ->case maps:is_key(Chan, Chans) offalse -> % not even joinedgen_server:reply(From, {error, notInChannel}),event_loop(State);true ->{ok, Links} = db:get_channel_heads(Db, Chan),Bin = posts:encode(KeyPair, Links, {topic, Chan, Topic}),{ok, _, PostHash} = db:save_post(Db, Bin),SentPeers = lists:foldl(fun({received, ReqId, Peer}, AccPeers) ->
end,NewReqs = lists:filter(FilterReqsOfPeer, ChanReqs),maps:put(ChanName, NewReqs, NewChanAcc)end,NewChans = lists:foldl(RmRequests, Chans, maps:keys(Chans)),State#state{peers = NewPeers,channels = NewChans,activeOut = NewOut,activeIn = NewIn};handle_peer_messages(peerNew, {Peer}, State = #state{channels = Chans, peers = Peers, activeOut = ActiveOut, db = Db, keyPair = KeyPair}) ->%% For each channel we are in, send our usual requests and update the state mapsEachChan = fun(Chan, {AccNewReqs, AccChannels, PeerSent}) ->{ok, {StateReqId, StateMsg, StateSize}} = send_channel_state_request(Peer, Chan, true),End = 0,Start = os:system_time(1000) - (12*60*60 * 1000),{ok, {TimeReqId, TimeMsg, TimeSize}} = send_channel_time_range_request(Peer, Chan, Start, End, 200),ReqsForChan = maps:get(Chan, AccChannels, [])++ [{sent, StateReqId, Peer}]++ [{sent, TimeReqId, Peer}],{AccNewReqs ++ [{StateReqId, StateMsg}, {TimeReqId, TimeMsg}],maps:update(Chan, ReqsForChan, AccChannels),PeerSent + StateSize + TimeSize}end,{NewRequests, NewChans, PeerSent} = lists:foldl(EachChan, {[], Chans, 0}, maps:keys(Chans)),%% Update the helper map for active outgoing reqsNewOut = lists:foldl(fun({ReqId, Msg}, AccActiveOut) ->maps:put(ReqId, {Peer, Msg}, AccActiveOut)end, ActiveOut, NewRequests),{ok, {Addr, Port}} = inet:peername(Peer),PeerMetadata = #{address => {Addr, Port},lastSeenAt => os:system_time(1000),bytesTransmitted => {PeerSent, 0}},NewState = State#state{peers = maps:put(Peer, PeerMetadata, Peers),channels = NewChans,activeOut = NewOut},io:format("[Peer] Tracking new connection ~p ~n", [Peer]),%Handler = proplists:get_value(eventLoop, State),Handler = self(),spawn(fun() -> decode_loop(Handler, Peer) end),NewState;handle_peer_messages(Message, Args, State) ->io:format("Unhandled peer message: ~p with args ~p~n", [Message, Args]),State.%% Handle channel-related messageshandle_channel_messages(setOwnNick, {From, Nick}, State = #state{db = Db, keyPair = KeyPair, activeIn = ActiveIn, peers = Peers}) ->{ok, Links} = db:get_channel_heads(Db, null),Bin = posts:encode(KeyPair, Links, {info, {name, Nick}}),{ok, _, PostHash} = db:save_post(Db, Bin),gen_server:reply(From, ok),%% sent out to (all?) open channel state requestsF = fun(ReqId, {Peer, [Header, _]}, AccPeers) ->case proplists:get_value(type, Header) of5 ->{ok, {_, _, Size}} = send_hash_response(Peer, ReqId, [PostHash]),update_peer_sent(AccPeers, Peer, Size);_ -> AccPeersendend,SentPeers = maps:fold(F, Peers, ActiveIn),State#state{peers = SentPeers};handle_channel_messages(channelsJoin, {Chan}, State = #state{channels = Chans, db = Db, keyPair = KeyPair, peers = Peers, activeOut = ActiveOut}) ->case maps:is_key(Chan, Chans) oftrue -> % already joinedState;false ->{ok, Links} = db:get_channel_heads(Db, Chan),Bin = posts:encode(KeyPair, Links, {join, Chan}),{ok, _, _PostHash} = db:save_post(Db, Bin),ok = db:channels_join(Db, Chan),self() ! {stateChange, Chan},%% send requests to each connectionEachPeer = fun(Peer, {AccPeers, AccActiveOuts}) ->%% TODO: handle send fail{ok, {ReqId, Msg, Size}} = send_channel_state_request(Peer, Chan, true),{{sent, ReqId, Peer},{maps:put(ReqId, {Peer, Msg}, AccActiveOuts),update_peer_sent(AccPeers, Peer, Size)}}end,{Reqs, {NewPeers, NewActiveOut}} = lists:mapfoldl(EachPeer, {Peers, ActiveOut}, maps:keys(Peers)),State#state{channels = maps:put(Chan, Reqs, Chans),peers = NewPeers,activeOut = NewActiveOut}end;handle_channel_messages(channelsLeave, {Chan}, State = #state{channels = Chans, db = Db, keyPair = KeyPair, peers = Peers, activeOut = ActiveOut}) ->case maps:is_key(Chan, Chans) offalse -> % not even joinedState;true ->{ok, Links} = db:get_channel_heads(Db, Chan),Bin = posts:encode(KeyPair, Links, {leave, Chan}),{ok, _Id, _Hash} = db:save_post(Db, Bin),ok = db:channels_leave(Db, Chan),self() ! {stateChange, Chan},%% drop the channel and all requests we had for it{ChanRequests, NewChans} = maps:take(Chan, Chans),EachReq = fun({_Direction, ReqId, Peer}, {AccActiveOuts, AccPeers}) ->{{Peer, _}, NewActiveOuts} = maps:take(ReqId, AccActiveOuts),%% TODO: handle send fail{ok, {_, _, Size}} = send_cancel_request(Peer, ReqId),{NewActiveOuts,update_peer_sent(AccPeers, Peer, Size)}end,{NewActiveOut, NewPeers} = lists:foldl(EachReq, {ActiveOut, Peers}, ChanRequests),State#state{channels = NewChans,activeOut = NewActiveOut,peers = NewPeers}end;handle_channel_messages(channelsSetTopic, {From, Chan, Topic}, State = #state{channels = Chans, db = Db, keyPair = KeyPair, peers = Peers, activeIn = ActiveIn}) ->case maps:is_key(Chan, Chans) offalse -> % not even joinedgen_server:reply(From, {error, notInChannel}),State;true ->{ok, Links} = db:get_channel_heads(Db, Chan),Bin = posts:encode(KeyPair, Links, {topic, Chan, Topic}),{ok, _, PostHash} = db:save_post(Db, Bin),SentPeers = lists:foldl(fun({received, ReqId, Peer}, AccPeers) ->
{channelsMembers, From, Chan} ->case maps:is_key(Chan, Chans) offalse -> % already joinedgen_server:reply(From, {error, notInChannel});true ->Result = db:get_channel_members(Db, Chan),gen_server:reply(From, Result)end,event_loop(State);
handle_channel_messages(channelsMembers, {From, Chan}, State = #state{channels = Chans, db = Db}) ->case maps:is_key(Chan, Chans) offalse -> % already joinedgen_server:reply(From, {error, notInChannel});true ->Result = db:get_channel_members(Db, Chan),gen_server:reply(From, Result)end,State;
{channelsJoin, Chan} ->case maps:is_key(Chan, Chans) oftrue -> % already joinedevent_loop(State);false ->{ok, Links} = db:get_channel_heads(Db, Chan),Bin = posts:encode(KeyPair, Links, {join, Chan}),{ok, _, _PostHash} = db:save_post(Db, Bin),ok = db:channels_join(Db, Chan),self() ! {stateChange, Chan},%% send requests to each connectionEachPeer = fun(Peer, {AccPeers, AccActiveOuts}) ->%% TODO: handle send fail{ok, {ReqId, Msg, Size}} = send_channel_state_request(Peer, Chan, true),{{sent, ReqId, Peer},{maps:put(ReqId, {Peer, Msg}, AccActiveOuts),update_peer_sent(AccPeers, Peer, Size)}}end,{Reqs, {NewPeers, NewActiveOut}} = lists:mapfoldl(EachPeer, {Peers, ActiveOut}, maps:keys(Peers)),event_loop(State#state{channels = maps:put(Chan, Reqs, Chans),peers = NewPeers,activeOut = NewActiveOut})end;
handle_channel_messages(Message, Args, State) ->io:format("Unhandled channel message: ~p with args ~p~n", [Message, Args]),State.%% Handle database-related messageshandle_database_messages(readTextsFromChannel, {From, Chan}, State = #state{db = Db}) ->{ok, Texts} = db:get_texts_for_channel(Db, Chan),gen_server:reply(From, {ok, Texts}),State;handle_database_messages(writeTextToChannel, {From, Chan, Text}, State = #state{channels = Chans, db = Db, keyPair = KeyPair, peers = Peers, activeIn = ActiveIn}) ->case maps:is_key(Chan, Chans) offalse ->gen_server:reply(From, {error, notInChannel}),State;true ->{ok, Links} = db:get_channel_heads(Db, Chan),Bin = posts:encode(KeyPair, Links, {text, Chan, Text}),{ok, _, PostHash} = db:save_post(Db, Bin),%% find incoming channel time range (type:4) requests which want this postF = fun({Direction, ReqId, Peer}, AccPeers) ->case Direction ofreceived ->{Peer, [Header, _]} = maps:get(ReqId, ActiveIn),case proplists:get_value(msgType, Header) of4 ->{ok, {_, _, Size}}= send_hash_response(Peer, ReqId, [PostHash]),update_peer_sent(AccPeers, Peer, Size);_ -> AccPeersend;sent -> AccPeersendend,SentPeers = lists:foldl(F, Peers, maps:get(Chan, Chans)),io:format("[Wrote] #~p: ~p~n", [Chan, Text]),gen_server:reply(From, ok),State#state{peers = SentPeers}end;
{channelsLeave, Chan} ->case maps:is_key(Chan, Chans) offalse -> % not even joinedevent_loop(State);true ->{ok, Links} = db:get_channel_heads(Db, Chan),Bin = posts:encode(KeyPair, Links, {leave, Chan}),{ok, _Id, _Hash} = db:save_post(Db, Bin),ok = db:channels_leave(Db, Chan),self() ! {stateChange, Chan},%% drop the channel and all requests we had for it{ChanRequests, NewChans} = maps:take(Chan, Chans),EachReq = fun({_Direction, ReqId, Peer}, {AccActiveOuts, AccPeers}) ->{{Peer, _}, NewActiveOuts} = maps:take(ReqId, AccActiveOuts),%% TODO: handle send fail{ok, {_, _, Size}} = send_cancel_request(Peer, ReqId),{NewActiveOuts,update_peer_sent(AccPeers, Peer, Size)}end,{NewActiveOut, NewPeers} = lists:foldl(EachReq, {ActiveOut, Peers}, ChanRequests),event_loop(State#state{channels = NewChans,activeOut = NewActiveOut,peers = NewPeers})end;
handle_database_messages(Message, Args, State) ->io:format("Unhandled database message: ~p with args ~p~n", [Message, Args]),State.
{channelsList, From} ->gen_server:reply(From, {ok, maps:keys(Chans)}),event_loop(State);
%% Handle network-related messages (incoming messages processing)handle_network_messages(incomingMsg, {Peer, Msg, MsgSize}, State) ->handle_incoming_message(State, Peer, Msg, MsgSize);
{stateChange, Chan} ->io:format("[DEBUG] state change of ~p~n", [Chan]),event_loop(State);
handle_network_messages(Message, Args, State) ->io:format("Unhandled network message: ~p with args ~p~n", [Message, Args]),State.