encode_channel_list_response(Header, Channels) when is_list(Channels) ->[{requestId, RequestId}, {circuitId, CircuitId}] = Header,ChannelsBin = lists:map(fun(Chan) ->BinChan = unicode:characters_to_binary(Chan),ChanLen = encode_varint(byte_size(BinChan)),[ChanLen, BinChan]end, Channels),length_encode_fields([<<7>>, CircuitId, RequestId, ChannelsBin, 0]).
-record(state, {db, connections=#{}, channels=#{}, activeOut=#{}, activeIn=#{}}).
%% db: SqliteHandle%%%% #channels{%% #foo => [%% {sent, ReqId, PeerX},%% {received, ReqId, PeerX},%% ]%% }%%%% #peers{%% PeerX => #{%% lastSeenAt => UnixTs,%% bytesTransmitted => [Sent, Received],%% %% TODO:%% address => "so.me:port",%% pubKey => ?%% }%% }%%%%%% helpers to not skim all channels to handle messages%% #activeIn{ ReqId => {PeerX, Msg} }%% #activeOut{ ReqId => {PeerX, Msg} }-record(state, {db, peers=#{}, channels=#{}, activeOut=#{}, activeIn=#{}}).
{ok, Sock} = gen_tcp:connect(Host, Port, [binary, {packet, 0}]),io:format("Connection established (~p:~p)~n", [Host, Port]),EL = proplists:get_value(eventLoop, State),EL ! {newPeer, Sock},
case gen_tcp:connect(Host, Port, [binary, {packet, 0}, {active, false}]) of{error, Reason} ->io:format("[Peer] Unable to connect to ~p:~p - ~p~n", [Host, Port, Reason]);{ok, Sock} ->io:format("[Peer] Connecting to ~p:~p~n", [Host, Port]),Handler = proplists:get_value(eventLoop, State),spawn_link(fun() -> decode_loop(Handler, Sock) end),EL = proplists:get_value(eventLoop, State),EL ! {newPeer, Sock}end,
{newPeer, Peer} ->EachChan = fun(Chan, AccActiveOut) ->ReqId = send_channel_state_request(Peer, Chan, true),{ReqId, maps:put(ReqId, {sent, Peer}, AccActiveOut)}
{peerLost, Peer} ->%% handle disconnects by removing active requets{ #{ lastSeenAt := LastSeen,bytesTransmitted := [Sent, Received]},NewPeers} = maps:take(Peer, Peers),io:format("[Peer:~p] lost | LastSeen: ~p | Sent: ~p | Received: ~p~n", [Peer, LastSeen, human_bytesize(Sent), human_bytesize(Received)]),%% fetch all request IDs for this peerFilterReqs = fun(_Chan, ChanReqs, MatchedReqs) ->MatchRequests = fun(Req, Matched) ->case Req of{_, ReqId, Peer} ->maps:put(ReqId, true, Matched);_Other -> % other requestsMatchedendend,NewMatched = lists:foldl(MatchRequests, MatchedReqs, ChanReqs),NewMatchedend,PeersRequestIds = maps:fold(FilterReqs, #{}, Chans),%% create new versions of active in and out without thoseFilterReqs = fun(ReqId) -> not maps:is_key(ReqId, PeersRequestIds) end,NewIn = lists:filter(FilterReqs, ActiveIn),NewOut = lists:filter(FilterReqs, ActiveOut),%% now mutate all channelsRmRequests = fun(ChanName, AccChans) ->ChanReqs = maps:take(ChanName, AccChans),FilterReqsOfPeer = fun(Req) ->case Req of{Direction, ReqId, Peer} ->io:format("[DEBUG Peer:~p] dropping ~p request ~p~n", [Peer, Direction, hex:binary_to_hexstr(ReqId)]),false;_Other -> % other requeststrueendend,NewChan = lists:filter(FilterReqsOfPeer, ChanReqs),NewChanend,NewChans = lists:foldl(RmRequests, Chans, maps:keys(Chans)),NewState = State#state{peers = NewPeers,channels = NewChans,activeOut = NewOut,activeIn = NewIn},event_loop(NewState);{peerNew, Peer} ->%% For each channel we are in, send our usual requests and update the state mapsEachChan = fun(Chan, {AccChannels, PeerSent}) ->{ok, {ReqId, Msg, Size}} = send_channel_state_request(Peer, Chan, true),ReqsForChan = maps:get(Chan, AccChannels, []),{{ReqId, Msg},{maps:update(Chan, ReqsForChan ++ [{sent, ReqId, Peer}]),PeerSent + Size}}
{ChanReqs, NewActiveOut} = lists:mapfoldl(EachChan, ActiveOut, maps:keys(Chans)),NewConns = maps:put(Peer, ChanReqs, Peers),NewState = State#state{connections = NewConns, activeOut = NewActiveOut},
{NewRequests, {NewChans, PeerSent}} = lists:mapfoldl(EachChan, {Chans, 0}, maps:keys(Chans)),%% Update the helper map for active outgoing onesNewOut = lists:foldl(fun({ReqId, Msg}, AccActiveOut) ->map:put(ReqId, {Peer, Msg}, AccActiveOut)end, ActiveOut, NewRequests),{ok, {Addr, Port}} = inet:peername(Peer),PeerMetadata = #{address => io_lib:format("~p:~p", [Addr, Port]),lastSeenAt => os:system_time(1000),bytesTransmitted => [PeerSent, 0]},NewState = State#state{peers = maps:put(Peer, PeerMetadata, Peers),channels = NewChans,activeOut = NewOut},
{ok, ReqId} = send_channel_state_request(Peer, Chan, true),{ReqId, maps:put(ReqId, {sent, Peer}, AccActives)}
%% TODO: push post into open requests%% TODO: handle send fail%% TODO: handle size{ok, {ReqId, Msg, Size}} = send_channel_state_request(Peer, Chan, true),{{sent, ReqId, Peer},{maps:put(ReqId, {Peer, Msg}, AccActiveOuts),update_peer_sent(AccPeers, Peer, Size)}}
{ReqIds, NewActiveOut} = lists:mapfoldl(EachPeer, ActiveOut, maps:keys(Peers)),NewChans = maps:put(Chan, ReqIds, Chans),NewState = State#state{channels = NewChans, activeOut = NewActiveOut},event_loop(NewState)
{Reqs, {NewPeers, NewActiveOut}} = lists:mapfoldl(EachPeer, {Peers, ActiveOut}, maps:keys(Peers)),event_loop(State#state{channels = maps:put(Chan, Reqs, Chans),peers = NewPeers,activeOut = NewActiveOut})
{{sent, Conn}, NewActive} = maps:take(ReqId, AccActives),CancelReqId = send_cancel_request(Conn, ReqId),{ok, maps:put(CancelReqId, {sent, Conn}, NewActive)}
%% TODO: push post into open requests{{Peer, _Msg}, NewActiveOuts} = maps:take(ReqId, AccActiveOuts),%% TODO: handle send fail{ok, {CancelReqId, Msg, Size}} = send_cancel_request(Peer, ReqId),{maps:put(CancelReqId, {Peer, Msg}, NewActiveOuts),update_peer_sent(AccPeers, Peer, Size)}
{_OKs, NewActiveOut} = lists:mapfoldl(EachReq, ActiveOut, ReqIds),NewState = State#state{channels = NewChans, activeOut = NewActiveOut},event_loop(NewState)
{NewActiveOut, NewPeers} = lists:foldl(EachReq, {ActiveOut, Peers}, ChanRequests),event_loop(State#state{channels = NewChans,activeOut = NewActiveOut,peers = NewPeers})
{newOutRequest, ReqId, Conn} ->UpdatedOut = maps:put(ReqId, {sent, Conn}, ActiveOut),NewState = State#state{activeOut = UpdatedOut},event_loop(NewState);{incomingMsg, Peer, Msg} ->
{incomingMsg, Peer, Msg, MsgSize} ->
PostReqId = send_post_request(Peer, Hashes),UpdatedOut = maps:put(PostReqId, {sent, Peer}, ActiveOut),NewState = State#state{activeOut = UpdatedOut},event_loop(NewState);
{ok, {PostReqId, _PostReqMsg, Size}} = send_post_request(Peer, Hashes),UpdatedOut = maps:put(PostReqId, Peer, ActiveOut),%% update loop stateevent_loop(State#state{peers = update_peer_sent(Peers, Peer, Size),activeOut = UpdatedOut});
false ->NewState = State#state{activeIn = maps:put(ReqId, {received, Peer, Msg}, ActiveIn)},io:format("Received incoming RequestId:~p from ~p~n", [ReqIdStr, Peer]),event_loop(NewState)
false -> %% incoming requestcase MsgType of%% 0, 1 and 7 are responses [hash, post, channel list]2 -> % post reqerlang:error("TODO post req");3 -> % cancel reqerlang:error("TODO cancel req");4 -> % channel time reqerlang:error("TODO channel time req");5 -> % channel state reqRequestedChan = proplists:get_value(channel, Body),case maps:take(RequestedChan, Chans) oferror -> ok; % ignore{ChanReqs, TmpChans} ->%% TODO: send recent channel state hashesio:format("[TEMP] Received incoming state req for ~p from ~p~n", [RequestedChan, Peer]),NewChanReqs = ChanReqs ++ [{received, ReqId, Peer}],event_loop(State#state{peers = update_peer_received(Peers, Peer, MsgSize),channels = maps:put(RequestedChan, NewChanReqs, TmpChans),activeIn = maps:put(ReqId, {Peer, Msg}, ActiveIn)})end;6 -> % channel list requestChanList = maps:keys(Chans),send_channel_list_response(Peer, ReqId, ChanList)end
Message = wire:encode_cancel_request(Header, CancelId),ok = gen_tcp:send(Peer, Message),io:format("DEBUG: sending cancel for request(~p) to ~p~n", [hex:bin_to_hexstr(CancelId), Peer]),ReqId.
Binary = wire:encode_cancel_request(Header, CancelId),send_binary_to_peer(Peer, Binary).
Message = wire:encode_channel_state_request(Header, Channel, Future),ok = gen_tcp:send(Peer, Message),io:format("DEBUG: sending channel state request(~p) for ~p to ~p~n", [hex:bin_to_hexstr(ReqId),Channel, Peer]),ReqId.
Binary = wire:encode_channel_state_request(Header, Channel, Future),send_binary_to_peer(Peer, Binary).
{ttl, 0}],Message = wire:encode_post_request(Header, Hashes),ok = gen_tcp:send(Peer, Message),io:format("DEBUG: sending post request(~p) for ~p hashes to ~p~n", [hex:bin_to_hexstr(ReqId),length(Hashes), Peer]),ReqId.
{ttl, 3}],Binary = wire:encode_post_request(Header, Hashes),send_binary_to_peer(Peer, Binary).send_channel_list_response(Peer, ReqId, Channels) ->Header = [{requestId, ReqId},{circuitId, <<0,0,0,0>>}],Binary = wire:encode_channel_list_response(Header, Channels),send_binary_to_peer(Peer, Binary).%%%%%%%%%%%% helpers %%%%%%%%%%%%send_binary_to_peer(Peer, Binary) ->io:format("[DEBUG] sending ~p to peer ~p~n", [Binary, Peer]),BinarySize = byte_size(Binary),{MsgLen, Payload} = wire:decode_varint(Binary),case byte_size(Payload) =:= MsgLen offalse ->erlang:error("unexpected message size");true ->{ok, Msg} = wire:decode(Payload),case gen_tcp:send(Peer, Binary) ofok ->[Header, _] = Msg,ReqId = proplists:get_value(requestId, Header),{ok, {ReqId, Msg, BinarySize}};{error, Reason} ->{failedToSend, Reason}endend.update_peer_sent(Peers, Peer, Size) when is_map(Peers) and is_integer(Size) ->PeerMeta = maps:get(Peer, Peers),[Tx, Rx] = maps:get(bytesTransmitted, PeerMeta),NewPeers = Peers#{Peer => maps:update(bytesTransmitted, [Tx+Size, Rx], PeerMeta)},NewPeers.update_peer_received(Peers, Peer, Size) when is_map(Peers) and is_integer(Size) ->PeerMeta = maps:get(Peer, Peers),[Tx, Rx] = maps:get(bytesTransmitted, PeerMeta),Peers#{Peer => PeerMeta#{bytesTransmitted => [Tx, Rx+Size],lastSeenAt => os:system_time(1000)}}.human_bytesize(Size) -> human_bytesize(Size, ["b","kb","mb","gb","tb","pb", "eb"]).human_bytesize(S, [_|[_|_] = L]) when S >= 1024 -> human_bytesize(S/1024, L);human_bytesize(S, [M|_]) ->io_lib:format("~.2f ~s", [float(S), M]).