%% Send a binary message to a peer.
%% Makes a synchronous `{send, Peer, Binary}` call to the server process
%% `Pid` and returns whatever that call replies.
send(Pid, Peer, Binary) ->
    Request = {send, Peer, Binary},
    gen_server:call(Pid, Request).
%% Send a binary message to a peer over one specific connection.
%% `ConnPid` is the connection's process ID, which is unique per connection.
%% Makes a synchronous `{send, ConnPid, Binary}` call to the server process
%% `Pid` and returns whatever that call replies.
send(Pid, ConnPid, Binary) ->
    Request = {send, ConnPid, Binary},
    gen_server:call(Pid, Request).
handle_call({send, Peer, Binary}, _From, State = #state{peer_to_conn = PtoC}) ->% Find connection pid by peer address directlycase maps:get(Peer, PtoC, undefined) of
handle_call({send, ConnPid, Binary}, _From, State = #state{connections = Conns}) ->% Verify the connection existscase maps:get(ConnPid, Conns, undefined) of
io:format("[Transport] Cannot send to unknown peer: ~p~n", [Peer]),{reply, {error, unknown_peer}, State};ConnPid ->
io:format("[Transport] Cannot send to unknown connection: ~p~n", [ConnPid]),{reply, {error, unknown_connection}, State};ConnInfo ->PeerAddr = maps:get(addr, ConnInfo),
% Store mapping of ConnPid -> PeerAddr and PeerAddr -> ConnPidNewCtoP = maps:put(ConnPid, PeerAddr, CtoP),NewPtoC = maps:put(PeerAddr, ConnPid, PtoC),
% Store connection info using ConnPid as the unique keyConnInfo = #{addr => PeerAddr},NewConns = maps:put(ConnPid, ConnInfo, Conns),
handle_info({cable_transport, ConnPid, Data}, State = #state{event_handler = Handler, conn_to_peer = CtoP}) ->case maps:get(ConnPid, CtoP, undefined) of
handle_info({cable_transport, ConnPid, Data}, State = #state{event_handler = Handler, connections = Conns}) ->case maps:get(ConnPid, Conns, undefined) of
PeerAddr ->io:format("[Transport] Received ~p bytes from ~p~n", [byte_size(Data), PeerAddr]),% Forward data to handlerHandler ! {peerData, PeerAddr, Data},
ConnInfo ->PeerAddr = maps:get(addr, ConnInfo),io:format("[Transport] Received ~p bytes from ~p (conn: ~p)~n", [byte_size(Data), PeerAddr, ConnPid]),% Forward data to handler with ConnPid as the unique identifierHandler ! {peerData, ConnPid, Data},
PeerAddr ->io:format("[Transport] Connection closed with peer ~p~n", [PeerAddr]),% Clean up mappingsNewCtoP = maps:remove(ConnPid, CtoP),NewPtoC = maps:remove(PeerAddr, PtoC),
ConnInfo ->PeerAddr = maps:get(addr, ConnInfo),io:format("[Transport] Connection ~p closed with peer ~p~n", [ConnPid, PeerAddr]),% Clean up connectionNewConns = maps:remove(ConnPid, Conns),
{peerLost, Peer} ->NewState = handle_peer_messages(peerLost, {Peer}, State),
{peerLost, ConnPid, PeerAddr} ->NewState = handle_peer_messages(peerLost, {ConnPid, PeerAddr}, State),
"[Peer:~p] LastSeen: ~p | Sent: ~p | Received: ~p~n",[Peer, LastSeen, human_bytesize(Sent), human_bytesize(Received)]
"[Peer:~p->~p] LastSeen: ~p | Sent: ~p | Received: ~p~n",[ConnPid, PeerAddr, LastSeen, human_bytesize(Sent), human_bytesize(Received)]
%% Send a channel list request to the specified peercase send_channel_list_request(State, Peer, 0, 100) of
%% Send a channel list request to the specified connectioncase send_channel_list_request(State, ConnPid, 0, 100) of
io:format("[Peer] Sent channel list request (~p) to ~p~n", [hex:bin_to_hexstr(ReqId), Peer]),NewActiveOut = maps:put(ReqId, {Peer, channel_list_request}, ActiveOut),NewPeers = update_peer_sent(Peers, Peer, Size),
io:format("[Peer] Sent channel list request (~p) to conn ~p~n", [hex:bin_to_hexstr(ReqId), ConnPid]),NewActiveOut = maps:put(ReqId, {ConnPid, channel_list_request}, ActiveOut),NewPeers = update_peer_sent(Peers, ConnPid, Size),
{ok, {_, _, Size}} = send_hash_response(State, Peer, ReqId, [PostHash]),update_peer_sent(AccPeers, Peer, Size);
{ok, {_, _, Size}} = send_hash_response(State, ConnPid, ReqId, [PostHash]),update_peer_sent(AccPeers, ConnPid, Size);
EachReq = fun({_Direction, ReqId, Peer}, {AccActiveOuts, AccPeers}) ->{{Peer, _}, NewActiveOuts} = maps:take(ReqId, AccActiveOuts),
EachReq = fun({_Direction, ReqId, ConnPid}, {AccActiveOuts, AccPeers}) ->{{ConnPid, _}, NewActiveOuts} = maps:take(ReqId, AccActiveOuts),
{ok, {_, _, Size}} = send_hash_response(State, Peer, ReqId, [PostHash]),update_peer_sent(AccPeers, Peer, Size);
{ok, {_, _, Size}} = send_hash_response(State, ConnPid, ReqId, [PostHash]),update_peer_sent(AccPeers, ConnPid, Size);
send_hash_response(State, Peer, ReqId, [PostHash]),update_peer_sent(AccPeers, Peer, Size);
send_hash_response(State, ConnPid, ReqId, [PostHash]),update_peer_sent(AccPeers, ConnPid, Size);
{ok, {_ReqId, _Msg, Size}} = send_post_response(S, Peer, ReqId, KnownPosts),S#state{peers = update_peer_sent(Peers, Peer, Size)};
{ok, {_ReqId, _Msg, Size}} = send_post_response(S, ConnPid, ReqId, KnownPosts),S#state{peers = update_peer_sent(Peers, ConnPid, Size)};
"[Peer] sending post request(~p) for ~p hashes to ~p~n",[hex:bin_to_hexstr(ReqId), length(Hashes), Peer]
"[Peer] sending post request(~p) for ~p hashes to conn ~p~n",[hex:bin_to_hexstr(ReqId), length(Hashes), ConnPid]
"[Peer] sending post response(~p) for ~p posts to ~p~n",[hex:bin_to_hexstr(ReqId), length(Posts), Peer]
"[Peer] sending post response(~p) for ~p posts to conn ~p~n",[hex:bin_to_hexstr(ReqId), length(Posts), ConnPid]
"[Peer] sending hash response(~p) for ~p hashes to ~p~n",[hex:bin_to_hexstr(ReqId), length(Hashes), Peer]
"[Peer] sending hash response(~p) for ~p hashes to conn ~p~n",[hex:bin_to_hexstr(ReqId), length(Hashes), ConnPid]
send_binary_to_peer(State, Peer, Binary) ->io:format("[Peer] Sending ~p bytes to to peer ~p~n", [byte_size(Binary), Peer]),
send_binary_to_peer(State, ConnPid, Binary) ->io:format("[Peer] Sending ~p bytes to connection ~p~n", [byte_size(Binary), ConnPid]),
update_peer_sent(Peers, Peer, Size) when is_map(Peers), is_integer(Size) ->PeerMeta = maps:get(Peer, Peers),
update_peer_sent(Peers, ConnPid, Size) when is_map(Peers), is_integer(Size) ->PeerMeta = maps:get(ConnPid, Peers),
update_peer_received(Peers, Peer, Size) when is_map(Peers), is_integer(Size) ->case maps:take(Peer, Peers) of
update_peer_received(Peers, ConnPid, Size) when is_map(Peers), is_integer(Size) ->case maps:take(ConnPid, Peers) of
%% TCP teardown notifications, handled only while the state is in
%% `connection` mode.
%% Note: the socket carried by these messages is a TCP port handle
%% (e.g., #Port<0.15>), not a {Host, Port} tuple. It is checked against the
%% socket held in the state by binding the same variable in both patterns.

%% tcp_closed for the socket held in the state: log and stop normally.
handle_info({tcp_closed, Socket}, #state{mode = connection, socket = Socket} = State) ->
    io:format(user, "[enoise_cable connection ~p] TCP connection closed by peer~n", [self()]),
    {stop, normal, State};
%% tcp_closed for any other socket: log and keep running.
handle_info({tcp_closed, _OtherSocket}, #state{mode = connection} = State) ->
    io:format(user, "[enoise_cable connection ~p] Received tcp_closed for unknown socket~n", [self()]),
    {noreply, State};
%% tcp_error for the socket held in the state: log the reason and stop with
%% {tcp_error, Reason}.
handle_info({tcp_error, Socket, Reason}, #state{mode = connection, socket = Socket} = State) ->
    io:format(user, "[enoise_cable connection ~p] TCP error: ~p~n", [self(), Reason]),
    {stop, {tcp_error, Reason}, State};
%% tcp_error for any other socket: log and keep running.
handle_info({tcp_error, _OtherSocket, _Reason}, #state{mode = connection} = State) ->
    io:format(user, "[enoise_cable connection ~p] Received tcp_error for unknown socket~n", [self()]),
    {noreply, State};