make_test_args() ->make_test_args("unnamed").
-module(transport).-behavior(gen_server).-export([start_link/1, stop/1, listener_port/1, get_address/1]).-export([register_handler/2, dial/3, send/3]).-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).-record(state, {% Socket for accepting connectionslisten_socket,% Port we're listening onlisten_port,% Node's keypair for encryptionkey_pair,% PID of event handler to notifyevent_handler,% Map of Socket -> {IP, Port}socket_to_peer = #{},% Map of {IP, Port} -> Socketpeer_to_socket = #{}}).%% Public API%% Start the transport server%% Args:%% - {listen_addr, {IP, Port}}: Address to listen on%% - {key_pair, KeyPair}: Node's keypair for encryption%% - {event_handler, HandlerPid}: PID of event handler to process eventsstart_link(Args) ->gen_server:start_link(?MODULE, Args, []).%% Stop the transport serverstop(Pid) ->gen_server:call(Pid, stop).%% Register a process to handle connection eventsregister_handler(Pid, HandlerPid) ->gen_server:call(Pid, {register_handler, HandlerPid}).%% Connect to a remote peerdial(Pid, Host, Port) ->gen_server:call(Pid, {dial, Host, Port}).%% Send a binary message to a peersend(Pid, Peer, Binary) ->gen_server:call(Pid, {send, Peer, Binary}).listener_port(Pid) ->gen_server:call(Pid, {listener_port}).get_address(Pid) ->gen_server:call(Pid, {get_address}).%% gen_server callbacksinit(Args) ->% Extract configuration{IP, Port} = proplists:get_value(listen_addr, Args),KeyPair = proplists:get_value(key_pair, Args),Handler = proplists:get_value(event_handler, Args),% Check that we have a handlerifHandler =:= undefined ->{stop, {error, no_event_handler_provided}};true ->% Open a listening socketListenOpts = [binary,{packet, 0},{active, false},{reuseaddr, true}],{ok, ListenSocket} = gen_tcp:listen(Port, ListenOpts),{ok, ListenPort} = inet:port(ListenSocket),% Capture the correct process ID for the transport serverTransportPid = self(),io:format("[Transport] Starting server on port ~p (pid: ~p)~n", [ListenPort, TransportPid]),% Start accepting connections in a separate processspawn_link(fun() -> accept_loop(ListenSocket, TransportPid) end),% Initialize state{ok, #state{listen_socket = ListenSocket,listen_port = ListenPort,key_pair = KeyPair,event_handler = Handler,socket_to_peer = #{},peer_to_socket = #{}}}end.handle_call({register_handler, HandlerPid}, _From, State) ->io:format("[Transport] Registering handler: ~p~n", [HandlerPid]),{reply, ok, State#state{event_handler = HandlerPid}};handle_call({dial, Host, Port}, _From, State) ->% For "unnamed" literal strings, use loopback address insteadActualHost =case Host of"unnamed" -> {127, 0, 0, 1};_ -> Hostend,io:format("[Transport] Attempting to connect to ~p:~p~n", [ActualHost, Port]),spawn_link(fun() -> do_dial(ActualHost, Port, self()) end),{reply, ok, State};handle_call({send, Peer, Binary}, _From, State = #state{peer_to_socket = PtoS}) ->% Find socket by peer address directlycase maps:get(Peer, PtoS, undefined) ofundefined ->io:format("[Transport] Cannot send to unknown peer: ~p~n", [Peer]),{reply, {error, unknown_peer}, State};Socket ->io:format("[Transport] Sending ~p bytes to ~p via socket ~p~n",[byte_size(Binary), Peer, Socket]),Result = gen_tcp:send(Socket, Binary),{reply, Result, State}end;handle_call(stop, _From, State) ->{stop, normal, ok, State};handle_call({listener_port}, _From, State = #state{listen_port = Port}) ->{reply, {ok, Port}, State};handle_call({get_address}, _From, State = #state{listen_port = Port}) ->% Return IP address as tuple for direct connection{reply, {ok, {{127, 0, 0, 1}, Port}}, State};handle_call(Request, _From, State) ->io:format("Unhandled call: ~p~n", [Request]),{reply, {error, unknown_call}, State}.handle_cast(Msg, State) ->io:format("Unhandled cast: ~p~n", [Msg]),{noreply, State}.%% Handle new connection from dial operationhandle_info({new_connection, Socket, PeerAddr},State = #state{event_handler = Handler, socket_to_peer = StoP, peer_to_socket = PtoS}) ->io:format("[Transport] New connection established with ~p~n", [PeerAddr]),% Store mapping of Socket -> PeerAddr and PeerAddr -> SocketNewStoP = maps:put(Socket, PeerAddr, StoP),NewPtoS = maps:put(PeerAddr, Socket, PtoS),% Start a socket loop to handle incoming dataspawn_link(fun() -> socket_loop(Socket, PeerAddr, self()) end),% Notify the peer handler of the new peerHandler ! {peerNew, PeerAddr},{noreply, State#state{socket_to_peer = NewStoP, peer_to_socket = NewPtoS}};%% Handle incoming message from a peerhandle_info({peer_data, Socket, PeerAddr, Data}, State = #state{event_handler = Handler}) ->io:format("[Transport] Received ~p bytes from ~p~n", [byte_size(Data), PeerAddr]),% Forward data to handlerHandler ! {peerData, PeerAddr, Data},{noreply, State};%% Handle peer disconnectionhandle_info({peer_closed, Socket, PeerAddr},State = #state{event_handler = Handler,socket_to_peer = StoP,peer_to_socket = PtoS}) ->io:format("[Transport] Connection closed by peer ~p~n", [PeerAddr]),% Clean up mappingsNewStoP = maps:remove(Socket, StoP),NewPtoS = maps:remove(PeerAddr, PtoS),% Notify handler of lost peerifPeerAddr =/= undefined -> Handler ! {peerLost, PeerAddr};true -> okend,{noreply, State#state{socket_to_peer = NewStoP, peer_to_socket = NewPtoS}};handle_info(Info, State) ->io:format("Unhandled info: ~p~n", [Info]),{noreply, State}.code_change(_OldVsn, State, _Extra) ->{ok, State}.terminate(_Reason, #state{listen_socket = ListenSocket}) ->gen_tcp:close(ListenSocket),ok.%% Private functions%% Accept loop for incoming connectionsaccept_loop(ListenSocket, TransportPid) ->io:format("[Transport] Waiting for connections on ~p (transport pid: ~p)~n",[ListenSocket, TransportPid]),case gen_tcp:accept(ListenSocket) of{ok, Socket} ->io:format("[Transport] Accepted new connection: ~p~n", [Socket]),{ok, {PeerIP, PeerPort}} = inet:peername(Socket),PeerAddr = {PeerIP, PeerPort},TransportPid ! {new_connection, Socket, PeerAddr},accept_loop(ListenSocket, TransportPid);{error, Reason} ->io:format("[Transport] Accept error: ~p~n", [Reason]),timer:sleep(1000),accept_loop(ListenSocket, TransportPid)end.%% Socket receive loopsocket_loop(Socket, PeerAddr, TransportPid) ->case gen_tcp:recv(Socket, 0) of{ok, Data} ->TransportPid ! {peer_data, Socket, PeerAddr, Data},socket_loop(Socket, PeerAddr, TransportPid);{error, closed} ->io:format("[Transport] Socket ~p closed by peer~n", [Socket]),TransportPid ! {peer_closed, Socket, PeerAddr};{error, Reason} ->io:format("[Transport] Socket error: ~p~n", [Reason]),TransportPid ! {peer_closed, Socket, PeerAddr}end.%% Establish outgoing connectiondo_dial(Host, Port, TransportPid) ->case gen_tcp:connect(Host, Port, [binary, {packet, 0}, {active, false}]) of{ok, Socket} ->io:format("[Transport] Connected to ~p:~p~n", [Host, Port]),{ok, {PeerIP, PeerPort}} = inet:peername(Socket),PeerAddr = {PeerIP, PeerPort},TransportPid ! {new_connection, Socket, PeerAddr},% Don't exit - keep running to maintain the socket connectionsocket_loop(Socket, PeerAddr, TransportPid),ok;{error, Reason} ->io:format("[Transport] Connection error to ~p:~p: ~p~n", [Host, Port, Reason])end.
-record(state, {db, keyPair, peers = #{}, channels = #{}, activeOut = #{}, activeIn = #{}}).
-record(state, {% SQLite DB handledb,% Node's keypair for encryptionkeyPair,% PID of the transport servertransportPid,% Map of active peerspeers = #{},% Map of channels and their statechannels = #{},% Map of active outgoing requestsactiveOut = #{},% Map of active incoming requestsactiveIn = #{}}).
%% open network socket[Host, WantPort] = proplists:get_value(listener, Args),{ok, LisAddr} = inet:getaddr(Host, inet),LisOpts = [binary,{packet, 0},{active, false},{reuseaddr, true},{ifaddr, LisAddr}],{ok, Lis} = gen_tcp:listen(WantPort, LisOpts),{ok, Port} = inet:port(Lis),io:format("Listening on ~p:~p~n", [LisAddr, Port]),
%% start event loop and incoming socket handler%% TODO: supervise these processesHandler = spawn(fun() -> event_loop(State) end),io:format("[DEBUG] Event Loop: ~p~n", [Handler]),ListenProc = spawn(fun() -> listen_loop(Handler, Lis) end),io:format("[DEBUG] Socket listener: ~p~n", [ListenProc]),
%% Create the event handler firstHandler = spawn(fun() -> event_loop(InitialState) end),io:format("[Peer] Event Loop: ~p~n", [Handler]),%% open network socket via transport and pass the handler[Host, WantPort] = proplists:get_value(listener, Args),{ok, TransportPid} = transport:start_link([{listen_addr, {Host, WantPort}},{event_handler, Handler}]),%% Update the event loop state with the transport PIDHandler ! {updateTransportPid, TransportPid},%% Get the listening address{ok, {LisAddr, Port}} = transport:get_address(TransportPid),io:format("[Peer] Listening on ~p:~p~n", [LisAddr, Port]),
case gen_tcp:connect(Host, Port, [binary, {packet, 0}, {active, false}]) of{error, Reason} ->io:format("[Peer] Unable to connect to ~p:~p - ~p~n", [Host, Port, Reason]);{ok, Sock} ->io:format("[Peer] Dialed ~p:~p as ~p~n", [Host, Port, Sock]),EL = proplists:get_value(eventLoop, State),EL ! {peerNew, Sock}end,
TPid = proplists:get_value(transport, State),transport:dial(TPid, Host, Port),
listen_loop(Handler, Lis) ->case gen_tcp:accept(Lis, infinity) of{error, closed} ->%% io:format("[Peer] Incoming listener closed"),ok;{ok, Sock} ->{ok, {RemoteIp, Port}} = inet:peername(Sock),io:format("[Peer] Incoming Connection ~p:~p (~p)~n", [RemoteIp, Port, Sock]),Handler ! {peerNew, Sock},listen_loop(Handler, Lis)end.decode_loop(Handler, Conn) ->%% read incoming messagescase gen_tcp:recv(Conn, 0) of{ok, Data} ->%% TODO: figure out how to do this without copying Data{ok, Msgs} = wire:split_messages(Data),EachMsg = fun(Chunk) ->{ok, Msg} = wire:decode(Chunk),%% TODO: this ommits the byte(s) for the length prefixMsgSize = byte_size(Chunk),Handler ! {incomingMsg, Conn, Msg, MsgSize}end,lists:foreach(EachMsg, Msgs),decode_loop(Handler, Conn);{error, closed} ->Handler ! {peerLost, Conn}end.
handle_peer_messages(peerData,{PeerAddr, Data},State) ->{ok, Msgs} = wire:split_messages(Data),EachMsg = fun(Chunk) ->{ok, Msg} = wire:decode(Chunk),%% TODO: this omits the byte(s) for the length prefixMsgSize = byte_size(Chunk),self() ! {incomingMsg, PeerAddr, Msg, MsgSize}end,lists:foreach(EachMsg, Msgs),State;
%% handle disconnects by removing active requests{#{lastSeenAt := LastSeen,bytesTransmitted := {Sent, Received}},NewPeers} = maps:take(Peer, Peers),io:format("[Peer:~p] lost | LastSeen: ~p | Sent: ~p | Received: ~p~n",[Peer, LastSeen, human_bytesize(Sent), human_bytesize(Received)]),%% fetch all request IDs for this peerFilterReqs = fun(_Chan, ChanReqs, MatchedReqs) ->MatchRequests = fun(Req, Matched) ->case Req of{_, ReqId, Peer} ->maps:put(ReqId, true, Matched);% other requests_Other ->Matchedendend,NewMatched = lists:foldl(MatchRequests, MatchedReqs, ChanReqs),NewMatchedend,PeersRequestIds = maps:fold(FilterReqs, #{}, Chans),%% create new versions of active in and out without thoseRmActives = fun(ReqId, _) -> not maps:is_key(ReqId, PeersRequestIds) end,NewIn = maps:filter(RmActives, ActiveIn),NewOut = maps:filter(RmActives, ActiveOut),%% now mutate all channelsRmRequests = fun(ChanName, AccChans) ->{ChanReqs, NewChanAcc} = maps:take(ChanName, AccChans),FilterReqsOfPeer = fun(Req) ->case Req of{Direction, ReqId, Peer} ->io:format("[DEBUG Peer:~p] dropping ~p request ~p~n",[Peer, Direction, hex:bin_to_hexstr(ReqId)]),false;% other requests_Other ->trueendend,NewReqs = lists:filter(FilterReqsOfPeer, ChanReqs),maps:put(ChanName, NewReqs, NewChanAcc)end,NewChans = lists:foldl(RmRequests, Chans, maps:keys(Chans)),State#state{peers = NewPeers,channels = NewChans,activeOut = NewOut,activeIn = NewIn};
case maps:is_key(Peer, Peers) offalse ->io:format("[Peer] Lost connection to unknown peer: ~p~n", [Peer]),State;true ->io:format("Peer:~p] lost connection~n", [Peer]),%% handle disconnects by removing active requests{#{lastSeenAt := LastSeen,bytesTransmitted := {Sent, Received}},NewPeers} = maps:take(Peer, Peers),io:format("[Peer:~p] LastSeen: ~p | Sent: ~p | Received: ~p~n",[Peer, LastSeen, human_bytesize(Sent), human_bytesize(Received)]),State#state{peers = NewPeers}end;
{ok, {StateReqId, StateMsg, StateSize}} = send_channel_state_request(Peer, Chan, true),
io:format("[Peer] Processing channel ~p for new peer~n", [Chan]),{ok, {StateReqId, StateMsg, StateSize}} = send_channel_state_request(State, Peer, Chan, true),
io:format("[Peer] Tracking new connection ~p ~n", [Peer]),%Handler = proplists:get_value(eventLoop, State),Handler = self(),spawn(fun() -> decode_loop(Handler, Peer) end),
io:format("[Peer] Tracking new connection ~p with metadata ~p~n", [Peer, PeerMetadata]),
send_cancel_request(Peer, CancelId) ->%%io:format("[DEBUG] sending cancel for request ~p to ~p~n"%% , [hex:bin_to_hexstr(CancelId), Peer]),
send_cancel_request(State, Peer, CancelId) ->io:format("[Peer] sending cancel for request ~p to ~p~n",[hex:bin_to_hexstr(CancelId), Peer]),
send_binary_to_peer(Peer, Binary) ->%% io:format("[DEBUG] sending ~p to peer ~p~n", [Binary, Peer]),
send_binary_to_peer(State, Peer, Binary) ->io:format("[Peer] Sending ~p bytes to to peer ~p~n", [byte_size(Binary), Peer]),