a cabal implementation in erlang
-module(cabal).
-behavior(gen_server).
-export([start_link/1, stop/1]).
-export([node_addr/1, node_public_key/1, dial/2, dial/3, peer_list/1]).
-export([channels_joined/1, channels_known/1, channel_members/2, join/2, leave/2]).
-export([set_topic/3, set_nick/2, read/2, write/3]).
-export([request_channel_list/2, peer_channel_list/2]).
-export([fetch_history/3, fetch_history/4, unfetched_count/2, await_writes/3]).
-export([persistent_peer_add/2, persistent_peer_remove/2, persistent_peer_list/1]).
-export([register_event_handler/3, unregister_event_handler/2]).
%% Supervisor helper functions
-export([get_db/1, update_peer_events_pid/2]).
%% Caberl_sup helper functions
-export([default_caberl_location/0, create_or_load_transport_keypair/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).

%% Message handler modules
-export([handle_peer_messages/3, handle_channel_messages/3, handle_database_messages/3]).
-export([handle_network_messages/3, handle_system_messages/3]).

%% start/stop server
start_link(Args) ->
    gen_server:start_link(cabal, Args, []).

stop(Pid) ->
    gen_server:cast(Pid, stop).

%% Supervisor helper functions
get_db(Pid) ->
    gen_server:call(Pid, {get_db}).

update_peer_events_pid(Pid, PeerEventsPid) ->
    gen_server:call(Pid, {update_peer_events_pid, PeerEventsPid}).

%% TODO: calls:
%% - query remote channels

%% cabal_db: SqliteHandle
%%
%% #channels{
%%   #foo => [
%%     {sent, ReqId, PeerX},
%%     {received, ReqId, PeerX},
%%   ]
%% }
%%
%% #peers{
%%  PeerX => #{
%%    lastSeenAt => UnixTs,
%%    bytesTransmitted => {Sent, Received},
%%    %% TODO:
%%    address => "so.me:port",
%%    pubKey => ?
%%  }
%% }
%%
%%
%% helpers to not skim all channels to handle messages
%% #activeIn{ ReqId => {PeerX, Msg} }
%% #activeOut{ ReqId => {PeerX, Msg} }
-record(state, {
    % SQLite DB handle
    db,
    % Node's keypair for encryption
    keyPair,
    % PID of the transport server
    transportPid,
    % PID of the transport supervisor (for recovery)
    transportSup,
    % PID of the cabal_peer_events server (owns timers and event handlers)
    peerEventsPid,
    % Listen address
    listenAddr,
    % Map of active peers
    peers = #{},
    % Map of channels and their state
    channels = #{},
    % Map of active outgoing requests
    activeOut = #{},
    % Map of active incoming requests
    activeIn = #{},
    % Map of received channel lists from peers: #{PeerAddr => [Channels]}
    peerChannelLists = #{}
}).

dial(Pid, {Host, Port}) ->
    gen_server:cast(Pid, {peerDial, {Host, Port}});
dial(Pid, Addr) when is_list(Addr) ->
    [HostStr, PortStr] = string:split(Addr, ":"),
    {ok, Host} = inet:getaddr(HostStr, inet),
    Port = list_to_integer(PortStr),
    gen_server:cast(Pid, {peerDial, {Host, Port}}).

%% Synchronous dial with await_sync option
dial(Pid, Addr, #{await_sync := true, timeout := Timeout}) ->
    %% Start the dial
    case Addr of
        {Host, Port} ->
            gen_server:cast(Pid, {peerDial, {Host, Port}});
        AddrStr when is_list(AddrStr) ->
            [HostStr, PortStr] = string:split(AddrStr, ":"),
            {ok, Host} = inet:getaddr(HostStr, inet),
            Port = list_to_integer(PortStr),
            gen_server:cast(Pid, {peerDial, {Host, Port}})
    end,
    %% Poll until we have an active connection
    poll_for_connection(Pid, Timeout);
dial(Pid, Addr, Opts) ->
    %% If await_sync not specified, fall back to async
    case maps:get(await_sync, Opts, false) of
        false -> dial(Pid, Addr);
        true -> dial(Pid, Addr, Opts#{timeout => maps:get(timeout, Opts, 5000)})
    end.

node_addr(Pid) ->
    gen_server:call(Pid, {nodeAddr}).

node_public_key(Pid) ->
    gen_server:call(Pid, {nodePubKey}).

peer_list(Pid) ->
    gen_server:call(Pid, {peerList}).

request_channel_list(Pid, Peer) ->
    gen_server:cast(Pid, {requestChannelList, Peer}).

peer_channel_list(Pid, Peer) ->
    gen_server:call(Pid, {peerChannelList, Peer}).

%% Lazy loading: fetch unfetched history for a channel
fetch_history(Pid, Channel, Limit) ->
    gen_server:call(Pid, {fetchHistory, Channel, Limit}).

%% Synchronous fetch_history that blocks until complete
fetch_history(Pid, Channel, Limit, #{sync := true, timeout := Timeout}) ->
    StartTime = erlang:monotonic_time(millisecond),
    {ok, InitialUnfetched} = unfetched_count(Pid, Channel),
    {ok, RequestedCount} = fetch_history(Pid, Channel, Limit),

    %% Poll until the unfetched count decreases by RequestedCount
    poll_for_fetch_completion(Pid, Channel, InitialUnfetched, RequestedCount, StartTime, Timeout);
fetch_history(Pid, Channel, Limit, _Opts) ->
    fetch_history(Pid, Channel, Limit).

%% Query how many unfetched posts exist for a channel
unfetched_count(Pid, Channel) ->
    gen_server:call(Pid, {unfetchedCount, Channel}).

join(Pid, Chan) when is_list(Chan) ->
    gen_server:cast(Pid, {channelsJoin, Chan}).

leave(Pid, Chan) when is_list(Chan) ->
    gen_server:cast(Pid, {channelsLeave, Chan}).

channels_joined(Pid) ->
    gen_server:call(Pid, {channelsList}).

channels_known(Pid) ->
    gen_server:call(Pid, {channelsKnown}).

channel_members(Pid, Chan) ->
    gen_server:call(Pid, {channelsMembers, Chan}).

set_topic(Pid, Chan, Topic) ->
    gen_server:call(Pid, {channelsSetTopic, Chan, Topic}).

set_nick(Pid, Nick) ->
    gen_server:call(Pid, {setOwnNick, Nick}).

read(Pid, Chan) ->
    gen_server:call(Pid, {readTextsFromChannel, Chan}).

write(Pid, Chan, Text) ->
    gen_server:call(Pid, {writeTextToChannel, Chan, Text}).

%% Block until all writes to a channel have been processed and propagated
await_writes(Pid, Channel, Timeout) ->
    %% Get initial message count
    {ok, {InitialTexts, _}} = read(Pid, Channel),
    InitialCount = length(InitialTexts),

    %% Do a sync call to ensure all pending writes are processed
    {ok, _} = channels_joined(Pid),

    %% Now wait for message count to stabilize (no new messages for a period)
    StartTime = erlang:monotonic_time(millisecond),
    poll_for_writes_complete(Pid, Channel, InitialCount, StartTime, Timeout).

%% manage persistent peer connections
persistent_peer_add(Pid, Address) ->
    gen_server:call(Pid, {persistentPeerAdd, Address}).

persistent_peer_remove(Pid, Address) ->
    gen_server:call(Pid, {persistentPeerRemove, Address}).

persistent_peer_list(Pid) ->
    gen_server:call(Pid, {persistentPeerList}).

%% register for channel message event notifications
%% IntervalMs must be >= 100
register_event_handler(Pid, HandlerPid, IntervalMs) when IntervalMs >= 100 ->
    gen_server:call(Pid, {registerEventHandler, HandlerPid, IntervalMs}).

unregister_event_handler(Pid, HandlerPid) ->
    gen_server:call(Pid, {unregisterEventHandler, HandlerPid}).

%%%%%%%%%%%%%%%%
%% gen_server %%
%%%%%%%%%%%%%%%%

init(Args) ->
    process_flag(trap_exit, true),

    %% open persistence layer
    StorePath = proplists:get_value(storage, Args),
    Kp = create_or_load_keypair(StorePath),
    {ok, Db} = cabal_db:open(StorePath),

    %% restore active channels from db
    {ok, Chans} = cabal_db:channels_list(Db),

    %% provided by peer_sup
    TransportSup = proplists:get_value(transport_sup, Args),
    {ok, TPid} = cabal_transport_sup:get_transport_pid(TransportSup),
    erlang:monitor(process, TPid),
    ok = cabal_transport:register_handler(TPid, self()),

    %% Get the listening address from transport
    {ok, {LisAddr, Port}} = cabal_transport:get_address(TPid),
    io:format("[Peer] Listening on ~p:~p~n", [LisAddr, Port]),

    %% cabal_peer_events will be started by peer_sup and connected via update_peer_events_pid
    io:format("[Peer] Started, transport=~p (cabal_peer_events pending)~n", [TPid]),

    State = #state{
        db = Db,
        keyPair = Kp,
        transportPid = TPid,
        transportSup = TransportSup,
        % set by peer_sup
        peerEventsPid = undefined,
        listenAddr = {LisAddr, Port},
        channels = maps:from_keys(Chans, [])
    },
    {ok, State}.

%% Database calls
handle_call({readTextsFromChannel, Chan}, _From, State = #state{db = Db}) ->
    {ok, Texts} = cabal_db:get_texts_for_channel(Db, Chan),
    {reply, {ok, Texts}, State};
handle_call({writeTextToChannel, Chan, Text}, _From, State) ->
    NewState = handle_database_messages(writeTextToChannel, {Chan, Text}, State),
    {reply, ok, NewState};
%% System calls
handle_call({nodeAddr}, _From, State = #state{listenAddr = LisAddr}) ->
    {reply, {ok, LisAddr}, State};
handle_call({nodePubKey}, _From, State = #state{keyPair = KeyPair}) ->
    PubKey = maps:get(public, KeyPair),
    {reply, {ok, PubKey}, State};
handle_call({peerList}, _From, State = #state{peers = Peers}) ->
    EachPeer = fun(PeerId, PeerMeta, Acc) ->
        Acc ++ [{PeerId, PeerMeta}]
    end,
    PeerList = maps:fold(EachPeer, [], Peers),
    {reply, PeerList, State};
%% Channel calls
handle_call({channelsList}, _From, State = #state{channels = Chans}) ->
    {reply, {ok, maps:keys(Chans)}, State};
handle_call({channelsKnown}, _From, State = #state{db = Db}) ->
    Result = cabal_db:channels_known(Db),
    {reply, Result, State};
handle_call({channelsMembers, Chan}, _From, State = #state{channels = Chans, db = Db}) ->
    case maps:is_key(Chan, Chans) of
        false ->
            {reply, {error, notInChannel}, State};
        true ->
            Result = cabal_db:get_channel_members(Db, Chan),
            {reply, Result, State}
    end;
handle_call({channelsSetTopic, Chan, Topic}, _From, State = #state{channels = Chans}) ->
    case maps:is_key(Chan, Chans) of
        false ->
            {reply, {error, notInChannel}, State};
        true ->
            NewState = handle_channel_messages(channelsSetTopic, {Chan, Topic}, State),
            {reply, ok, NewState}
    end;
handle_call({setOwnNick, Nick}, _From, State) ->
    NewState = handle_channel_messages(setOwnNick, {Nick}, State),
    {reply, ok, NewState};
%% Peer channel list
handle_call({peerChannelList, Peer}, _From, State = #state{peerChannelLists = Lists}) ->
    Result =
        case maps:get(Peer, Lists, undefined) of
            undefined -> {ok, []};
            Channels -> {ok, Channels}
        end,
    {reply, Result, State};
%% History fetching
handle_call({fetchHistory, Channel, Limit}, _From, State) ->
    {Reply, NewState} = handle_system_messages(fetchHistory, {Channel, Limit}, State),
    {reply, Reply, NewState};
handle_call({unfetchedCount, Channel}, _From, State = #state{db = Db}) ->
    Result = cabal_db:unfetched_count(Db, Channel),
    {reply, Result, State};
%% Persistent peer management
handle_call({persistentPeerAdd, Address}, _From, State = #state{db = Db}) ->
    Result = cabal_db:peer_add(Db, Address),
    {reply, Result, State};
handle_call({persistentPeerRemove, Address}, _From, State = #state{db = Db}) ->
    Result = cabal_db:peer_delete(Db, Address),
    {reply, Result, State};
handle_call({persistentPeerList}, _From, State = #state{db = Db}) ->
    Result = cabal_db:peer_list(Db),
    {reply, Result, State};
%% Event handler management - forward to cabal_peer_events
handle_call(
    {registerEventHandler, HandlerPid, IntervalMs},
    _From,
    State = #state{peerEventsPid = PeerEventsPid}
) ->
    Result = cabal_peer_events:register_event_handler(PeerEventsPid, HandlerPid, IntervalMs),
    {reply, Result, State};
handle_call(
    {unregisterEventHandler, HandlerPid}, _From, State = #state{peerEventsPid = PeerEventsPid}
) ->
    Result = cabal_peer_events:unregister_event_handler(PeerEventsPid, HandlerPid),
    {reply, Result, State};
%% Internal call for cabal_peer_events to get peer list
handle_call({get_peers}, _From, State = #state{peers = Peers}) ->
    {reply, {ok, Peers}, State};
%% Supervisor helper calls
handle_call({get_db}, _From, State = #state{db = Db}) ->
    {reply, {ok, Db}, State};
handle_call({update_peer_events_pid, PeerEventsPid}, _From, State) ->
    io:format("[Peer] Updated cabal_peer_events PID to ~p~n", [PeerEventsPid]),
    link(PeerEventsPid),
    {reply, ok, State#state{peerEventsPid = PeerEventsPid}}.

handle_cast({peerDial, {Host, Port}}, State = #state{transportPid = TPid}) ->
    cabal_transport:dial(TPid, Host, Port),
    {noreply, State};
handle_cast({requestChannelList, Peer}, State) ->
    NewState = handle_peer_messages(requestChannelList, {Peer}, State),
    {noreply, NewState};
handle_cast({channelsJoin, Chan}, State) ->
    io:format("[Chan] joining ~p~n", [Chan]),
    NewState = handle_channel_messages(channelsJoin, {Chan}, State),
    {noreply, NewState};
handle_cast({channelsLeave, Chan}, State) ->
    io:format("[Chan] leaving ~p~n", [Chan]),
    NewState = handle_channel_messages(channelsLeave, {Chan}, State),
    {noreply, NewState};
handle_cast(stop, State = #state{db = Db}) ->
    ok = cabal_db:close(Db),
    {stop, normal, State};
handle_cast(Msg, State) ->
    io:format("Unexpected cast: ~p~n", [Msg]),
    {noreply, State}.

%% Transport event messages
handle_info({peerNew, ConnPid, PeerAddr}, State) ->
    NewState = handle_peer_messages(peerNew, {ConnPid, PeerAddr}, State),
    {noreply, NewState};
handle_info({peerLost, ConnPid, PeerAddr}, State) ->
    NewState = handle_peer_messages(peerLost, {ConnPid, PeerAddr}, State),
    {noreply, NewState};
handle_info({peerData, ConnPid, Data}, State) ->
    NewState = handle_peer_messages(peerData, {ConnPid, Data}, State),
    {noreply, NewState};
%% Network messages (decoded from peerData)
handle_info({incomingMsg, Peer, Msg, MsgSize}, State) ->
    NewState = handle_network_messages(incomingMsg, {Peer, Msg, MsgSize}, State),
    {noreply, NewState};
%% EXIT signals
handle_info(
    {'EXIT', Pid, Reason},
    State = #state{db = Db, peerEventsPid = PeerEventsPid}
) ->
    case Pid of
        Db ->
            io:format("[Peer] Database process died: ~p~n", [Reason]),
            {stop, {database_died, Reason}, State};
        _ when Pid =:= PeerEventsPid andalso PeerEventsPid =/= undefined ->
            case Reason of
                shutdown ->
                    {noreply, State};
                normal ->
                    {noreply, State};
                _ ->
                    io:format("[Peer] Peer events process died: ~p~n", [Reason]),
                    {stop, {cabal_peer_events_died, Reason}, State}
            end;
        _ when Pid =:= State#state.transportPid ->
            io:format("[Peer] Transport process died: ~p. Attempting recovery...~n", [Reason]),
            %% Transport died, query supervisor for new PID
            TransportSup = State#state.transportSup,
            %% Wait a moment for supervisor to restart it
            timer:sleep(200),
            case cabal_transport_sup:get_transport_pid(TransportSup) of
                {ok, NewTPid} ->
                    io:format("[Peer] Reacquired transport PID: ~p~n", [NewTPid]),
                    erlang:monitor(process, NewTPid),
                    ok = cabal_transport:register_handler(NewTPid, self()),
                    {noreply, State#state{transportPid = NewTPid}};
                {error, _} ->
                    io:format("[Peer] Failed to reacquire transport PID! Stopping.~n"),
                    {stop, transport_died_permanently, State}
            end;
        _ ->
            io:format("[Peer] Linked process ~p died: ~p~n", [Pid, Reason]),
            {noreply, State}
    end;
handle_info(Msg, State) ->
    io:format("Unexpected message: ~p~n", [Msg]),
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    %% no change planned. The functionm is there for the behaviour,
    %% but will not be used.
    {ok, State}.

%%%%%%%%%%%%%
%% private %%
%%%%%%%%%%%%%

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Message Handler Functions - Modular Design %%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% Handle system-level messages
handle_system_messages(nodePubKey, {From}, State = #state{keyPair = KeyPair}) ->
    PubKey = maps:get(public, KeyPair),
    gen_server:reply(From, {ok, PubKey}),
    State;
handle_system_messages(peerList, {From}, State = #state{peers = Peers}) ->
    EachPeer = fun(PeerId, PeerMeta, Acc) ->
        Acc ++ [{PeerId, PeerMeta}]
    end,
    PeerList = maps:fold(EachPeer, [], Peers),
    gen_server:reply(From, PeerList),
    State;
handle_system_messages(channelsList, {From}, State = #state{channels = Chans}) ->
    gen_server:reply(From, {ok, maps:keys(Chans)}),
    State;
handle_system_messages(channelsKnown, {From}, State = #state{db = Db}) ->
    Result = cabal_db:channels_known(Db),
    gen_server:reply(From, Result),
    State;
handle_system_messages(peerChannelList, {Peer, From}, State = #state{peerChannelLists = Lists}) ->
    Result =
        case maps:get(Peer, Lists, undefined) of
            undefined -> {ok, []};
            Channels -> {ok, Channels}
        end,
    gen_server:reply(From, Result),
    State;
handle_system_messages(
    fetchHistory, {Channel, Limit}, State = #state{db = Db, peers = Peers}
) ->
    %% Get unfetched hashes from database
    {ok, UnfetchedHashes} = cabal_db:unfetched_get(Db, Channel, Limit),
    case UnfetchedHashes of
        [] ->
            {{ok, 0}, State};
        _ ->
            %% Pick a random connected peer to request from
            PeerPids = maps:keys(Peers),
            case PeerPids of
                [] ->
                    {{error, no_connected_peers}, State};
                _ ->
                    ConnPid = lists:nth(rand:uniform(length(PeerPids)), PeerPids),
                    {ok, {PostReqId, _PostReqMsg, SentSize}} = send_post_request(
                        State, ConnPid, UnfetchedHashes
                    ),
                    ActiveOut = State#state.activeOut,
                    UpdatedOut = maps:put(PostReqId, {ConnPid, post_request}, ActiveOut),
                    NewState = State#state{
                        activeOut = UpdatedOut,
                        peers = update_peer_sent(Peers, ConnPid, SentSize)
                    },
                    {{ok, length(UnfetchedHashes)}, NewState}
            end
    end;
handle_system_messages(unfetchedCount, {Channel, From}, State = #state{db = Db}) ->
    Result = cabal_db:unfetched_count(Db, Channel),
    gen_server:reply(From, Result),
    State;
handle_system_messages(stateChange, {Chan}, State) ->
    io:format("[DEBUG] state change of ~p~n", [Chan]),
    State;
handle_system_messages(Message, Args, State) ->
    io:format("Unhandled system message: ~p with args ~p~n", [Message, Args]),
    State.

%% Handle peer-related messages
handle_peer_messages(
    peerData,
    {ConnPid, Data},
    State
) ->
    {ok, Msgs} = cabal_wire:split_messages(Data),
    EachMsg = fun(Chunk) ->
        {ok, Msg} = cabal_wire:decode(Chunk),
        %% TODO: this omits the byte(s) for the length prefix
        MsgSize = byte_size(Chunk),
        self() ! {incomingMsg, ConnPid, Msg, MsgSize}
    end,
    lists:foreach(EachMsg, Msgs),
    State;
handle_peer_messages(
    peerLost,
    {ConnPid, PeerAddr},
    State = #state{peers = Peers, activeIn = ActiveIn, activeOut = ActiveOut, channels = Chans}
) ->
    %% Normalize address to string format for consistency with database
    NormalizedAddr = format_address(PeerAddr),

    case maps:is_key(ConnPid, Peers) of
        false ->
            io:format("[Peer] Lost connection ~p to unknown peer: ~p~n", [ConnPid, NormalizedAddr]),
            State;
        true ->
            io:format("[Peer:~p] lost connection to ~p~n", [ConnPid, NormalizedAddr]),
            %% handle disconnects by removing active requests
            {
                #{
                    lastSeenAt := LastSeen,
                    bytesTransmitted := {Sent, Received}
                },
                NewPeers
            } = maps:take(ConnPid, Peers),
            io:format(
                "[Peer:~p->~p] LastSeen: ~p | Sent: ~p | Received: ~p~n",
                [ConnPid, NormalizedAddr, LastSeen, human_bytesize(Sent), human_bytesize(Received)]
            ),

            %% Clean up activeOut: remove all requests associated with this peer
            NewActiveOut = maps:filter(
                fun(_ReqId, {ReqConnPid, _Msg}) ->
                    ReqConnPid =/= ConnPid
                end,
                ActiveOut
            ),
            RemovedOutCount = maps:size(ActiveOut) - maps:size(NewActiveOut),

            %% Clean up activeIn: remove all requests associated with this peer
            NewActiveIn = maps:filter(
                fun(_ReqId, {ReqConnPid, _Msg}) ->
                    ReqConnPid =/= ConnPid
                end,
                ActiveIn
            ),
            RemovedInCount = maps:size(ActiveIn) - maps:size(NewActiveIn),

            %% Clean up channels: remove all request entries for this peer
            CleanupChannelReqs = fun(_ChanName, ChanReqs) ->
                FilteredReqs = lists:filter(
                    fun({_Direction, _ReqId, ReqConnPid}) ->
                        ReqConnPid =/= ConnPid
                    end,
                    ChanReqs
                ),
                {true, FilteredReqs}
            end,
            NewChans = maps:filtermap(CleanupChannelReqs, Chans),

            io:format(
                "[Peer:~p] Cleaned up ~p outgoing and ~p incoming requests~n",
                [ConnPid, RemovedOutCount, RemovedInCount]
            ),

            %% Update persistent peer database if this peer is in the list
            Db = State#state.db,
            case cabal_db:peer_get(Db, NormalizedAddr) of
                {ok, PeerInfo} ->
                    %% Peer is in persistent list - decrease score
                    CurrentScore = maps:get(score, PeerInfo, 0),
                    Now = os:system_time(millisecond),
                    cabal_db:peer_update(Db, NormalizedAddr, [
                        {score, max(0, CurrentScore - 5)},
                        {last_seen, Now}
                    ]),
                    io:format("[Peer] Updated persistent peer ~s: score -5~n", [NormalizedAddr]);
                not_found ->
                    % Not a persistent peer, no action needed
                    ok
            end,

            State#state{
                peers = NewPeers,
                activeIn = NewActiveIn,
                activeOut = NewActiveOut,
                channels = NewChans
            }
    end;
handle_peer_messages(
    peerNew,
    {ConnPid, PeerAddr},
    State = #state{
        channels = Chans, peers = Peers, activeOut = ActiveOut
    }
) ->
    %% Normalize address to string format for consistency with database
    NormalizedAddr = format_address(PeerAddr),

    %% For each channel we are in, send our usual requests and update the state maps
    EachChan = fun(Chan, {AccNewReqs, AccChannels, PeerSent}) ->
        io:format("[Peer] Processing channel ~p for new connection ~p~n", [Chan, ConnPid]),
        {ok, {StateReqId, StateMsg, StateSize}} = send_channel_state_request(
            State, ConnPid, Chan, true
        ),
        End = 0,
        Start = os:system_time(1000) - (12 * 60 * 60 * 1000),
        %% Request up to 1000 messages (100 eager + 900 lazy)
        %% This limit will be used for automatic pagination
        Limit = 1000,
        {ok, {TimeReqId, TimeMsg, TimeSize}} = send_channel_time_range_request(
            State, ConnPid, Chan, Start, End, Limit
        ),
        %% Annotate the request with pagination metadata
        AnnotatedTimeMsg = [TimeMsg, {pagination_limit, Limit}],
        ReqsForChan =
            maps:get(Chan, AccChannels, []) ++
                [{sent, StateReqId, ConnPid}] ++
                [{sent, TimeReqId, ConnPid}],
        {
            AccNewReqs ++ [{StateReqId, StateMsg}, {TimeReqId, AnnotatedTimeMsg}],
            maps:update(Chan, ReqsForChan, AccChannels),
            PeerSent + StateSize + TimeSize
        }
    end,
    {NewRequests, NewChans, PeerSent} = lists:foldl(EachChan, {[], Chans, 0}, maps:keys(Chans)),

    io:format("[Peer] Created ~p new requests for ~p channels~n", [
        length(NewRequests), maps:size(Chans)
    ]),

    %% Update the helper map for active outgoing reqs
    NewOut = lists:foldl(
        fun({ReqId, Msg}, AccActiveOut) ->
            maps:put(ReqId, {ConnPid, Msg}, AccActiveOut)
        end,
        ActiveOut,
        NewRequests
    ),
    PeerMetadata = #{
        address => NormalizedAddr,
        lastSeenAt => os:system_time(1000),
        bytesTransmitted => {PeerSent, 0}
    },
    NewState = State#state{
        peers = maps:put(ConnPid, PeerMetadata, Peers),
        channels = NewChans,
        activeOut = NewOut
    },
    io:format("[Peer] Tracking new connection ~p to ~p with metadata ~p~n", [
        ConnPid, NormalizedAddr, PeerMetadata
    ]),

    %% Update persistent peer database if this peer is in the list
    Db = State#state.db,
    case cabal_db:peer_get(Db, NormalizedAddr) of
        {ok, PeerInfo} ->
            %% Peer is in persistent list - reset backoff and increase score
            CurrentScore = maps:get(score, PeerInfo, 0),
            Now = os:system_time(millisecond),
            cabal_db:peer_update(Db, NormalizedAddr, [
                {score, CurrentScore + 10},
                {last_seen, Now},
                {attempt_count, 0}
            ]),
            io:format("[Peer] Updated persistent peer ~s: score +10, reset backoff~n", [
                NormalizedAddr
            ]);
        not_found ->
            % Not a persistent peer, no action needed
            ok
    end,

    NewState;
handle_peer_messages(
    requestChannelList,
    {ConnPid},
    State = #state{activeOut = ActiveOut, peers = Peers}
) ->
    %% Send a channel list request to the specified connection
    case send_channel_list_request(State, ConnPid, 0, 100) of
        {ok, {ReqId, _Msg, Size}} ->
            io:format("[Peer] Sent channel list request (~p) to conn ~p~n", [
                hex:bin_to_hexstr(ReqId), ConnPid
            ]),
            NewActiveOut = maps:put(ReqId, {ConnPid, channel_list_request}, ActiveOut),
            NewPeers = update_peer_sent(Peers, ConnPid, Size),
            State#state{activeOut = NewActiveOut, peers = NewPeers};
        {error, Reason} ->
            io:format("[Peer] Failed to send channel list request to conn ~p: ~p~n", [
                ConnPid, Reason
            ]),
            State
    end;
handle_peer_messages(Message, Args, State) ->
    io:format("Unhandled peer message: ~p with args ~p~n", [Message, Args]),
    State.

%% Handle channel-related messages
handle_channel_messages(
    setOwnNick,
    {Nick},
    State = #state{db = Db, keyPair = KeyPair, activeIn = ActiveIn, peers = Peers}
) ->
    {ok, Links} = cabal_db:get_channel_heads(Db, null),
    Bin = cabal_posts:encode(KeyPair, Links, {info, {name, Nick}}),
    {ok, _, PostHash} = cabal_db:save_post(Db, Bin),
    %% sent out to (all?) open channel state requests
    F = fun(ReqId, {ConnPid, [Header, _]}, AccPeers) ->
        case proplists:get_value(type, Header) of
            5 ->
                case send_hash_response(State, ConnPid, ReqId, [PostHash]) of
                    {ok, {_, _, Size}} ->
                        update_peer_sent(AccPeers, ConnPid, Size);
                    {error, _Reason} ->
                        %% Send failed (peer disconnected), skip update
                        AccPeers
                end;
            _ ->
                AccPeers
        end
    end,
    SentPeers = maps:fold(F, Peers, ActiveIn),
    State#state{peers = SentPeers};
handle_channel_messages(
    channelsJoin,
    {Chan},
    State = #state{
        channels = Chans, db = Db, keyPair = KeyPair, peers = Peers, activeOut = ActiveOut
    }
) ->
    case maps:is_key(Chan, Chans) of
        % already joined
        true ->
            State;
        false ->
            {ok, Links} = cabal_db:get_channel_heads(Db, Chan),
            Bin = cabal_posts:encode(KeyPair, Links, {join, Chan}),
            {ok, _, _PostHash} = cabal_db:save_post(Db, Bin),
            ok = cabal_db:channels_join(Db, Chan),
            self() ! {stateChange, Chan},
            %% send requests to each connection
            EachConn = fun(ConnPid, {AccPeers, AccActiveOuts}) ->
                case send_channel_state_request(State, ConnPid, Chan, true) of
                    {ok, {ReqId, Msg, Size}} ->
                        {
                            {sent, ReqId, ConnPid},
                            {
                                update_peer_sent(AccPeers, ConnPid, Size),
                                maps:put(ReqId, {ConnPid, Msg}, AccActiveOuts)
                            }
                        };
                    {error, _Reason} ->
                        %% If send fails, we don't track the request
                        {
                            {error, send_failed},
                            {AccPeers, AccActiveOuts}
                        }
                end
            end,
            {Reqs, {NewPeers, NewActiveOut}} = lists:mapfoldl(
                EachConn, {Peers, ActiveOut}, maps:keys(Peers)
            ),
            %% Filter out failed requests from the list
            ValidReqs = [R || R <- Reqs, element(1, R) =/= error],
            State#state{
                channels = maps:put(Chan, ValidReqs, Chans),
                peers = NewPeers,
                activeOut = NewActiveOut
            }
    end;
handle_channel_messages(
    channelsLeave,
    {Chan},
    State = #state{
        channels = Chans, db = Db, keyPair = KeyPair, peers = Peers, activeOut = ActiveOut
    }
) ->
    case maps:is_key(Chan, Chans) of
        % not even joined
        false ->
            State;
        true ->
            {ok, Links} = cabal_db:get_channel_heads(Db, Chan),
            Bin = cabal_posts:encode(KeyPair, Links, {leave, Chan}),
            {ok, _Id, _Hash} = cabal_db:save_post(Db, Bin),
            ok = cabal_db:channels_leave(Db, Chan),
            self() ! {stateChange, Chan},
            %% drop the channel and all requests we had for it
            {ChanRequests, NewChans} = maps:take(Chan, Chans),
            EachReq = fun({_Direction, ReqId, ConnPid}, {AccActiveOuts, AccPeers}) ->
                {{ConnPid, _}, NewActiveOuts} = maps:take(ReqId, AccActiveOuts),
                case send_cancel_request(State, ConnPid, ReqId) of
                    {ok, {_, _, Size}} ->
                        {
                            NewActiveOuts,
                            update_peer_sent(AccPeers, ConnPid, Size)
                        };
                    {error, _} ->
                        {NewActiveOuts, AccPeers}
                end
            end,
            {NewActiveOut, NewPeers} = lists:foldl(EachReq, {ActiveOut, Peers}, ChanRequests),
            State#state{
                channels = NewChans,
                activeOut = NewActiveOut,
                peers = NewPeers
            }
    end;
handle_channel_messages(
    channelsSetTopic,
    {Chan, Topic},
    State = #state{channels = Chans, db = Db, keyPair = KeyPair, peers = Peers, activeIn = ActiveIn}
) ->
    {ok, Links} = cabal_db:get_channel_heads(Db, Chan),
    Bin = cabal_posts:encode(KeyPair, Links, {topic, Chan, Topic}),
    {ok, _, PostHash} = cabal_db:save_post(Db, Bin),
    SentPeers = lists:foldl(
        fun({received, ReqId, ConnPid}, AccPeers) ->
            case maps:get(ReqId, ActiveIn, undefined) of
                undefined ->
                    %% Request no longer active (peer disconnected)
                    AccPeers;
                {ConnPid, [Header, _]} ->
                    case proplists:get_value(type, Header) of
                        4 ->
                            case send_hash_response(State, ConnPid, ReqId, [PostHash]) of
                                {ok, {_, _, Size}} ->
                                    update_peer_sent(AccPeers, ConnPid, Size);
                                {error, _Reason} ->
                                    %% Send failed (peer disconnected), skip update
                                    AccPeers
                            end;
                        % ignored
                        _ ->
                            AccPeers
                    end
            end
        end,
        Peers,
        maps:get(Chan, Chans)
    ),
    State#state{peers = SentPeers};
handle_channel_messages(channelsMembers, {From, Chan}, State = #state{channels = Chans, db = Db}) ->
    case maps:is_key(Chan, Chans) of
        % already joined
        false ->
            gen_server:reply(From, {error, notInChannel});
        true ->
            Result = cabal_db:get_channel_members(Db, Chan),
            gen_server:reply(From, Result)
    end,
    State;
handle_channel_messages(Message, Args, State) ->
    io:format("Unhandled channel message: ~p with args ~p~n", [Message, Args]),
    State.

%% Handle database-related messages
handle_database_messages(readTextsFromChannel, {From, Chan}, State = #state{db = Db}) ->
    {ok, Texts} = cabal_db:get_texts_for_channel(Db, Chan),
    gen_server:reply(From, {ok, Texts}),
    State;
handle_database_messages(
    writeTextToChannel,
    {Chan, Text},
    State = #state{channels = Chans, db = Db, keyPair = KeyPair, peers = Peers, activeIn = ActiveIn}
) ->
    {ok, Links} = cabal_db:get_channel_heads(Db, Chan),
    Bin = cabal_posts:encode(KeyPair, Links, {text, Chan, Text}),
    {ok, _, PostHash} = cabal_db:save_post(Db, Bin),
    %% Mark channel for notification (we wrote a message to it)
    StateWithNotif = mark_channels_for_notification(State, [Chan]),
    %% find incoming channel time range (type:4) requests which want this post
    F = fun({Direction, ReqId, ConnPid}, AccPeers) ->
        case Direction of
            received ->
                case maps:get(ReqId, ActiveIn, undefined) of
                    undefined ->
                        %% Req no longer active (peer disconnected)
                        AccPeers;
                    {ConnPid, [Header, _]} ->
                        case proplists:get_value(msgType, Header) of
                            4 ->
                                case send_hash_response(State, ConnPid, ReqId, [PostHash]) of
                                    {ok, {_, _, Size}} ->
                                        update_peer_sent(AccPeers, ConnPid, Size);
                                    {error, _Reason} ->
                                        %% Send failed (peer disconnected), skip update
                                        AccPeers
                                end;
                            _ ->
                                AccPeers
                        end
                end;
            sent ->
                AccPeers
        end
    end,
    SentPeers = lists:foldl(F, Peers, maps:get(Chan, Chans)),
    StateWithNotif#state{peers = SentPeers};
handle_database_messages(Message, Args, State) ->
    io:format("Unhandled database message: ~p with args ~p~n", [Message, Args]),
    State.

%% Handle network-related messages (incoming messages processing)
handle_network_messages(incomingMsg, {ConnPid, Msg, MsgSize}, State) ->
    handle_incoming_message(State, ConnPid, Msg, MsgSize);
handle_network_messages(Message, Args, State) ->
    io:format("Unhandled network message: ~p with args ~p~n", [Message, Args]),
    State.

%% Extract the existing handle_incoming_message function as-is
handle_incoming_message(S, ConnPid, Msg, MsgSize) ->
    #state{activeOut = ActiveOut, activeIn = ActiveIn, peers = Peers, channels = Chans, db = Db} =
        S,
    NewPeers = update_peer_received(Peers, ConnPid, MsgSize),
    [Header, Body] = Msg,
    [{msgType, MsgType}, _, {requestId, ReqId}] = Header,
    ReqIdStr = hex:bin_to_hexstr(ReqId),
    IsActiveOut = maps:is_key(ReqId, ActiveOut),
    io:format(
        "[Peer/Req  ~p] incoming: Type:~p  Size:~p\tIsActiveOut:~p~n",
        [ReqIdStr, MsgType, MsgSize, IsActiveOut]
    ),
    %io:format("[DEBUG] Body: ~p~n", [Body]),
    case IsActiveOut of
        true ->
            %% io:format("[Peer:~p] Received reply to ~p!~n", [Peer, ReqIdStr]),
            case MsgType of
                %% hash response
                0 ->
                    FilterUnknownPosts = fun(H) -> not cabal_db:has_post(Db, H) end,
                    AllHashes = proplists:get_value(hashes, Body),
                    NewHashes = lists:filter(FilterUnknownPosts, AllHashes),
                    case length(NewHashes) of
                        % nothing to request
                        0 ->
                            S#state{peers = NewPeers};
                        _ ->
                            %% Determine request type to apply hybrid fetch strategy
                            {_OrigConnPid, OrigMsg} = maps:get(
                                ReqId, ActiveOut, {ConnPid, undefined}
                            ),
                            OrigMsgType =
                                case OrigMsg of
                                    undefined ->
                                        unknown;
                                    [[OrigHeader, _OrigBody], {pagination_limit, _}, {depth, _}] ->
                                        proplists:get_value(msgType, OrigHeader, unknown);
                                    [[OrigHeader, _OrigBody], {pagination_limit, _}] ->
                                        proplists:get_value(msgType, OrigHeader, unknown);
                                    [OrigHeader, _OrigBody] ->
                                        proplists:get_value(msgType, OrigHeader, unknown);
                                    _ ->
                                        unknown
                                end,

                            %% Hybrid fetch logic:
                            %% - channel_state_request (type 5): fetch ALL (metadata like join/leave/topic)
                            %% - channel_time_range_request (type 4): fetch first 100, store rest as unfetched
                            %% - unknown: default to eager fetch (backward compat)
                            {HashesToFetch, HashesToStore, Channel} =
                                case OrigMsgType of
                                    % channel_state_request - fetch all eagerly
                                    5 ->
                                        io:format(
                                            "[Peer] Hash response for channel_state_request: fetching all ~p hashes eagerly~n",
                                            [length(NewHashes)]
                                        ),
                                        {NewHashes, [], undefined};
                                    % channel_time_range_request - hybrid strategy
                                    4 ->
                                        Chan =
                                            case OrigMsg of
                                                [
                                                    [_OH, OrigBody],
                                                    {pagination_limit, _},
                                                    {depth, _}
                                                ] ->
                                                    proplists:get_value(
                                                        channel, OrigBody, undefined
                                                    );
                                                [[_OH, OrigBody], {pagination_limit, _}] ->
                                                    proplists:get_value(
                                                        channel, OrigBody, undefined
                                                    );
                                                [_OH, OrigBody] ->
                                                    proplists:get_value(
                                                        channel, OrigBody, undefined
                                                    );
                                                _ ->
                                                    undefined
                                            end,
                                        EagerCount = min(100, length(NewHashes)),
                                        Eager = lists:sublist(NewHashes, EagerCount),
                                        Lazy = lists:sublist(
                                            NewHashes, EagerCount + 1, length(NewHashes)
                                        ),
                                        io:format(
                                            "[Peer] Hash response for channel_time_range_request (~s): fetching ~p hashes eagerly, storing ~p as unfetched~n",
                                            [Chan, length(Eager), length(Lazy)]
                                        ),
                                        {Eager, Lazy, Chan};
                                    % unknown - default to eager
                                    _ ->
                                        io:format(
                                            "[Peer] Hash response for unknown request type: fetching all ~p hashes eagerly (fallback)~n",
                                            [length(NewHashes)]
                                        ),
                                        {NewHashes, [], undefined}
                                end,

                            %% Store unfetched hashes in database (internal operation)
                            case HashesToStore of
                                [] ->
                                    ok;
                                _ when Channel =/= undefined ->
                                    cabal_db:unfetched_add(Db, HashesToStore, Channel, "hash_response");
                                _ ->
                                    io:format(
                                        "[Peer] Warning: Cannot store unfetched hashes without channel info~n"
                                    ),
                                    ok
                            end,

                            %% Check if pagination is needed (we hit the limit)
                            %% Extract pagination metadata from original request
                            {PaginationLimit, OrigRequestBody, PaginationDepth} =
                                case OrigMsg of
                                    [
                                        [_OrigHeader, OBody],
                                        {pagination_limit, PLimit},
                                        {depth, Depth}
                                    ] ->
                                        {PLimit, OBody, Depth};
                                    [[_OrigHeader, OBody], {pagination_limit, PLimit}] ->
                                        {PLimit, OBody, 0};
                                    [_OrigHeader, OBody] when is_list(OBody) ->
                                        %% Check if this is a mismatched pattern - OBody should be a proplist
                                        case proplists:is_defined(channel, OBody) of
                                            true -> {undefined, OBody, 0};
                                            false -> {undefined, undefined, 0}
                                        end;
                                    _ ->
                                        {undefined, undefined, 0}
                                end,

                            %% Determine if we need to paginate (with depth limit to prevent infinite loops)
                            MaxPaginationDepth = 10,
                            ShouldPaginate =
                                PaginationLimit =/= undefined andalso
                                    OrigMsgType == 4 andalso
                                    Channel =/= undefined andalso
                                    length(AllHashes) == PaginationLimit andalso
                                    PaginationDepth < MaxPaginationDepth,

                            %% Log if we're paginating
                            case ShouldPaginate of
                                true ->
                                    io:format(
                                        "[Peer] Pagination: fetched ~p hashes (limit ~p), depth ~p/~p for channel ~s~n",
                                        [
                                            length(AllHashes),
                                            PaginationLimit,
                                            PaginationDepth + 1,
                                            MaxPaginationDepth,
                                            Channel
                                        ]
                                    );
                                false when PaginationDepth >= MaxPaginationDepth ->
                                    io:format(
                                        "[Peer] Warning: Max pagination depth reached for channel ~s~n",
                                        [Channel]
                                    );
                                false ->
                                    ok
                            end,

                            %% Request posts for eager hashes
                            StateAfterPosts =
                                case HashesToFetch of
                                    [] ->
                                        S#state{peers = NewPeers};
                                    _ ->
                                        {ok, {PostReqId, _PostReqMsg, SentSize}} = send_post_request(
                                            S, ConnPid, HashesToFetch
                                        ),
                                        %% Store pagination metadata if needed
                                        PostMetadata =
                                            case ShouldPaginate of
                                                true ->
                                                    {ConnPid, post_request, #{
                                                        pagination => true,
                                                        channel => Channel,
                                                        orig_start => proplists:get_value(
                                                            timestart, OrigRequestBody, undefined
                                                        ),
                                                        limit => PaginationLimit,
                                                        depth => PaginationDepth,
                                                        conn_pid => ConnPid
                                                    }};
                                                false ->
                                                    {ConnPid, post_request}
                                            end,
                                        UpdatedOut = maps:put(
                                            PostReqId, PostMetadata, ActiveOut
                                        ),
                                        S#state{
                                            peers = update_peer_sent(NewPeers, ConnPid, SentSize),
                                            activeOut = UpdatedOut
                                        }
                                end,

                            %% Pagination will be triggered after posts are saved
                            StateAfterPosts
                    end;
                %% post response
                1 ->
                    %% TODO: check we actually asked for this
                    Posts = proplists:get_value(posts, Body),
                    FilterUnknownPosts = fun(Bin) ->
                        %% TODO: avoid double decode by pushing this into the db module
                        [NewHeader, _] = cabal_posts:decode(Bin),
                        H = proplists:get_value(hash, NewHeader),
                        HasNot = not cabal_db:has_post(Db, H),
                        %%io:format("[TEMP] got new(~p) post reply:~n~p~n", [HasNot, NewHeader]),
                        HasNot
                    end,
                    NewPosts = lists:filter(FilterUnknownPosts, Posts),
                    SavedPosts = [cabal_db:save_post(Db, P) || P <- NewPosts],

                    %% Remove successfully fetched hashes from unfetched_hashes table (internal operation)
                    FetchedHashes = [Hash || {ok, _, Hash} <- SavedPosts],
                    case FetchedHashes of
                        [] ->
                            ok;
                        _ ->
                            cabal_db:unfetched_delete(Db, FetchedHashes),
                            io:format("[Peer] Removed ~p fetched hashes from unfetched table~n", [
                                length(FetchedHashes)
                            ])
                    end,

                    %% Check if pagination is needed (check metadata from post_request)
                    OrigMeta = maps:get(ReqId, ActiveOut, {ConnPid, post_request}),

                    StateAfterPagination =
                        case OrigMeta of
                            {_OrigConn, post_request, #{pagination := true} = PagMeta} ->
                                %% Pagination needed! Get oldest timestamp and send request
                                PagChannel = maps:get(channel, PagMeta),
                                OrigStart = maps:get(orig_start, PagMeta),
                                PagLimit = maps:get(limit, PagMeta),
                                PagDepth = maps:get(depth, PagMeta),
                                PagConnPid = maps:get(conn_pid, PagMeta),

                                {ok, OldestTs} = cabal_db:get_oldest_timestamp(Db, PagChannel),

                                case OldestTs of
                                    undefined ->
                                        io:format(
                                            "[Peer] Warning: Cannot paginate - no timestamp found for channel ~s~n",
                                            [PagChannel]
                                        ),
                                        S;
                                    _ when OldestTs =< OrigStart ->
                                        %% All messages are within original window, no more to fetch
                                        io:format(
                                            "[Peer] Pagination not needed - oldest message (~p) within original window (~p)~n",
                                            [OldestTs, OrigStart]
                                        ),
                                        S;
                                    _ ->
                                        %% Send pagination request for older messages
                                        TimeWindow = 12 * 60 * 60 * 1000,
                                        NewEnd = OldestTs - 1,
                                        NewStart = max(OrigStart, NewEnd - TimeWindow),

                                        io:format(
                                            "[Peer] Sending pagination request for ~s: Start=~p, End=~p (OldestTs=~p), Limit=~p, Depth=~p~n",
                                            [
                                                PagChannel,
                                                NewStart,
                                                NewEnd,
                                                OldestTs,
                                                PagLimit,
                                                PagDepth + 1
                                            ]
                                        ),

                                        {ok, {PagReqId, PagMsg, PagSize}} =
                                            send_channel_time_range_request(
                                                S,
                                                PagConnPid,
                                                PagChannel,
                                                NewStart,
                                                NewEnd,
                                                PagLimit
                                            ),

                                        %% Annotate with pagination metadata
                                        NewDepth = PagDepth + 1,
                                        AnnotatedPagMsg = [
                                            PagMsg,
                                            {pagination_limit, PagLimit},
                                            {depth, NewDepth}
                                        ],

                                        %% Update state
                                        NewActiveOut = maps:put(
                                            PagReqId, {PagConnPid, AnnotatedPagMsg}, ActiveOut
                                        ),
                                        NewChans = maps:update_with(
                                            PagChannel,
                                            fun(Reqs) -> Reqs ++ [{sent, PagReqId, PagConnPid}] end,
                                            [{sent, PagReqId, PagConnPid}],
                                            S#state.channels
                                        ),
                                        S#state{
                                            activeOut = NewActiveOut,
                                            channels = NewChans,
                                            peers = update_peer_sent(
                                                S#state.peers, PagConnPid, PagSize
                                            )
                                        }
                                end;
                            _ ->
                                %% No pagination needed
                                S
                        end,

                    %% Extract channels from saved posts and mark for notification
                    Channels = lists:filtermap(
                        fun(P) ->
                            [_PostHeader, PostBody] = cabal_posts:decode(P),
                            case proplists:get_value(channel, PostBody, undefined) of
                                undefined -> false;
                                Chan -> {true, Chan}
                            end
                        end,
                        NewPosts
                    ),
                    NewState = mark_channels_for_notification(StateAfterPagination, Channels),
                    NewState#state{peers = NewPeers};
                %% channel list response
                7 ->
                    Channels = proplists:get_value(channels, Body),
                    io:format(
                        "[Peer] Received channel list response from conn ~p: ~p channels~n",
                        [ConnPid, length(Channels)]
                    ),
                    io:format("  Channels: ~p~n", [Channels]),
                    %% Store the received channel list (keyed by connection ID)
                    PeerChannelLists = S#state.peerChannelLists,
                    NewPeerChannelLists = maps:put(ConnPid, Channels, PeerChannelLists),
                    S#state{peers = NewPeers, peerChannelLists = NewPeerChannelLists}
            end;
        %% incoming request
        false ->
            case MsgType of
                %% 0, 1 and 7 are responses [hash, post, channel list]

                % post req
                2 ->
                    Hashes = proplists:get_value(hashes, Body),
                    LoadKnownPosts = fun(H) ->
                        case cabal_db:load_post(Db, H) of
                            notFound -> false;
                            {ok, [_, _, Bin]} -> {true, Bin}
                        end
                    end,
                    KnownPosts = lists:filtermap(LoadKnownPosts, Hashes),
                    {ok, {_ReqId, _Msg, Size}} = send_post_response(S, ConnPid, ReqId, KnownPosts),
                    S#state{peers = update_peer_sent(Peers, ConnPid, Size)};
                % cancel req
                3 ->
                    CancelId = proplists:get_value(cancelId, Body, <<>>),
                    case maps:take(CancelId, ActiveIn) of
                        % ignore
                        error ->
                            S#state{peers = NewPeers};
                        {_, NewActiveIn} ->
                            UpdateChans = fun(_Chan, ChanReqs) ->
                                F = fun({received, ChanReqId, _}) ->
                                    CancelId =/= ChanReqId
                                end,
                                UpdatedReqs = lists:filter(F, ChanReqs),
                                {true, UpdatedReqs}
                            end,
                            S#state{
                                peers = NewPeers,
                                channels = maps:filtermap(UpdateChans, Chans),
                                activeIn = NewActiveIn
                            }
                    end;
                % channel time req
                4 ->
                    RequestedChan = proplists:get_value(channel, Body),
                    case maps:take(RequestedChan, Chans) of
                        % ignore
                        error ->
                            S#state{peers = NewPeers};
                        {ChanReqs, TmpChans} ->
                            %%io:format("[DEBUG] Received incoming chan time req(~p) for ~p from ~p~n",
                            %%          [ReqIdStr, RequestedChan, Peer]),
                            Params = [
                                proplists:get_value(F, Body)
                             || F <- [timestart, timeend, limit]
                            ],
                            %%io:format("[DEBUG] with params: S:~p - E:~p - L:~p ~n",Params),
                            [Start, End, Limit] = Params,
                            {ok, Hashes} = cabal_db:get_text_hashes_by_time_range(
                                Db, RequestedChan, Start, End, Limit
                            ),
                            case send_hash_response(S, ConnPid, ReqId, Hashes) of
                                {ok, {_, _, RespSize}} ->
                                    %% TODO: only track if 'end' is in the future
                                    NewChanReqs = ChanReqs ++ [{received, ReqId, ConnPid}],
                                    S#state{
                                        peers = update_peer_sent(NewPeers, ConnPid, RespSize),
                                        channels = maps:put(RequestedChan, NewChanReqs, TmpChans),
                                        activeIn = maps:put(ReqId, {ConnPid, Msg}, ActiveIn)
                                    };
                                {error, _Reason} ->
                                    %% Send failed (peer disconnected), don't track request
                                    S#state{peers = NewPeers}
                            end
                    end;
                % channel state req
                5 ->
                    RequestedChan = proplists:get_value(channel, Body),
                    case maps:take(RequestedChan, Chans) of
                        % ignore
                        error ->
                            S#state{peers = NewPeers};
                        {ChanReqs, TmpChans} ->
                            %%  io:format("[DEBUG] Received incoming state req for ~p from ~p~n",
                            %%            [RequestedChan, Peer]),
                            {ok, Hashes} = cabal_db:get_channel_state(Db, RequestedChan),
                            case send_hash_response(S, ConnPid, ReqId, Hashes) of
                                {ok, {_, _, RespSize}} ->
                                    %% TODO: only track if future is set to 1
                                    NewChanReqs = ChanReqs ++ [{received, ReqId, ConnPid}],
                                    S#state{
                                        peers = update_peer_sent(NewPeers, ConnPid, RespSize),
                                        channels = maps:put(RequestedChan, NewChanReqs, TmpChans),
                                        activeIn = maps:put(ReqId, {ConnPid, Msg}, ActiveIn)
                                    };
                                {error, _Reason} ->
                                    %% Send failed (peer disconnected), don't track request
                                    S#state{peers = NewPeers}
                            end
                    end;
                % channel list request
                6 ->
                    ChanList = maps:keys(Chans),
                    {ok, {_, _, ResponseSize}} = send_channel_list_response(
                        S, ConnPid, ReqId, ChanList
                    ),
                    RTxPeers = update_peer_sent(NewPeers, ConnPid, ResponseSize),
                    S#state{peers = RTxPeers};
                Unhandled ->
                    io:format(
                        "[Warning] unhandled incoming message (Type:~p|ReqId:~p) with activeOut:false~n",
                        [Unhandled, ReqIdStr]
                    ),
                    S#state{peers = NewPeers}
            end
    end.

%%%%
%% sender
%%%

send_cancel_request(State, ConnPid, CancelId) ->
    io:format(
        "[Peer] sending cancel for request ~p to conn ~p~n",
        [hex:bin_to_hexstr(CancelId), ConnPid]
    ),
    ReqId = crypto:strong_rand_bytes(4),
    Header = [
        {requestId, ReqId},
        {circuitId, <<0, 0, 0, 0>>},
        {ttl, 0}
    ],
    Binary = cabal_wire:encode_cancel_request(Header, CancelId),
    send_binary_to_peer(State, ConnPid, Binary).

send_channel_state_request(State, ConnPid, Channel, Future) ->
    ReqId = crypto:strong_rand_bytes(4),
    io:format(
        "[Peer] sending channel state request(~p) for ~p to conn ~p~n",
        [hex:bin_to_hexstr(ReqId), Channel, ConnPid]
    ),
    Header = [
        {requestId, ReqId},
        {circuitId, <<0, 0, 0, 0>>},
        {ttl, 3}
    ],
    Binary = cabal_wire:encode_channel_state_request(Header, Channel, Future),
    send_binary_to_peer(State, ConnPid, Binary).

send_channel_time_range_request(State, ConnPid, Channel, Start, End, Limit) ->
    ReqId = crypto:strong_rand_bytes(4),
    io:format(
        "[Peer] sending channel time range request(~p) for ~p to conn ~p~n",
        [hex:bin_to_hexstr(ReqId), Channel, ConnPid]
    ),
    Header = [
        {requestId, ReqId},
        {circuitId, <<0, 0, 0, 0>>},
        {ttl, 3}
    ],
    Binary = cabal_wire:encode_channel_time_range_request(Header, Channel, Start, End, Limit),
    send_binary_to_peer(State, ConnPid, Binary).

send_post_request(State, ConnPid, Hashes) ->
    ReqId = crypto:strong_rand_bytes(4),
    io:format(
        "[Peer] sending post request(~p) for ~p hashes to conn ~p~n",
        [hex:bin_to_hexstr(ReqId), length(Hashes), ConnPid]
    ),
    Header = [
        {requestId, ReqId},
        {circuitId, <<0, 0, 0, 0>>},
        {ttl, 3}
    ],
    Binary = cabal_wire:encode_post_request(Header, Hashes),
    send_binary_to_peer(State, ConnPid, Binary).

send_channel_list_request(State, ConnPid, Offset, Limit) ->
    ReqId = crypto:strong_rand_bytes(4),
    io:format(
        "[Peer] sending channel list request(~p) to conn ~p (offset:~p, limit:~p)~n",
        [hex:bin_to_hexstr(ReqId), ConnPid, Offset, Limit]
    ),
    Header = [
        {requestId, ReqId},
        {circuitId, <<0, 0, 0, 0>>},
        {ttl, 3}
    ],
    Binary = cabal_wire:encode_channel_list_request(Header, Offset, Limit),
    send_binary_to_peer(State, ConnPid, Binary).

send_channel_list_response(State, ConnPid, ReqId, Channels) ->
    io:format(
        "[Peer] sending ch. list response of ~p channels to conn ~p~n",
        [length(Channels), ConnPid]
    ),
    Header = [
        {requestId, ReqId},
        {circuitId, <<0, 0, 0, 0>>}
    ],
    Binary = cabal_wire:encode_channel_list_response(Header, Channels),
    send_binary_to_peer(State, ConnPid, Binary).

send_post_response(State, ConnPid, ReqId, Posts) ->
    io:format(
        "[Peer] sending post response(~p) for ~p posts to conn ~p~n",
        [hex:bin_to_hexstr(ReqId), length(Posts), ConnPid]
    ),
    Header = [
        {requestId, ReqId},
        {circuitId, <<0, 0, 0, 0>>}
    ],
    Binary = cabal_wire:encode_post_response(Header, Posts),
    send_binary_to_peer(State, ConnPid, Binary).

send_hash_response(State, ConnPid, ReqId, Hashes) ->
    io:format(
        "[Peer/Resp ~p] sending hash ~p hashes to conn ~p~n",
        [hex:bin_to_hexstr(ReqId), length(Hashes), ConnPid]
    ),
    Header = [
        {requestId, ReqId},
        {circuitId, <<0, 0, 0, 0>>}
    ],
    Binary = cabal_wire:encode_hash_response(Header, Hashes),
    send_binary_to_peer(State, ConnPid, Binary).

%%%%%%%%%%%
% helpers %
%%%%%%%%%%%

%% Convert address tuple to string format for consistency
%% Handles both tuple format from transport ({{127,0,0,1}, 3113})
%% and string format from database ("127.0.0.1:3113")
format_address({{A, B, C, D}, Port}) ->
    lists:flatten(io_lib:format("~w.~w.~w.~w:~w", [A, B, C, D, Port]));
format_address(Addr) when is_list(Addr) ->
    Addr.

send_binary_to_peer(State, ConnPid, Binary) ->
    %% io:format("[Peer] Sending ~p bytes to connection ~p~n", [byte_size(Binary), ConnPid]),
    BinarySize = byte_size(Binary),
    {MsgLen, Payload} = cabal_wire:decode_varint(Binary),
    case byte_size(Payload) =:= MsgLen of
        false ->
            throw("unexpected message size");
        true ->
            {ok, Msg} = cabal_wire:decode(Payload),
            TPid = State#state.transportPid,
            case cabal_transport:send(TPid, ConnPid, Binary) of
                ok ->
                    [Header, _] = Msg,
                    ReqId = proplists:get_value(requestId, Header),
                    {ok, {ReqId, Msg, BinarySize}};
                {error, Reason} ->
                    io:format("[Peer] Failed to send message to conn ~p: ~p~n", [ConnPid, Reason]),
                    {error, Reason}
            end
    end.

update_peer_sent(Peers, ConnPid, Size) when is_map(Peers), is_integer(Size) ->
    PeerMeta = maps:get(ConnPid, Peers),
    {Tx, Rx} = maps:get(bytesTransmitted, PeerMeta),
    NewPeers = Peers#{ConnPid => maps:update(bytesTransmitted, {Tx + Size, Rx}, PeerMeta)},
    NewPeers.

update_peer_received(Peers, ConnPid, Size) when is_map(Peers), is_integer(Size) ->
    case maps:take(ConnPid, Peers) of
        error ->
            throw({peerNotActive, ConnPid});
        {PeerMeta, PeersSansPeer} ->
            {Tx, Rx} = maps:get(bytesTransmitted, PeerMeta),
            UpdatedMeta = PeerMeta#{
                bytesTransmitted => {Tx, Rx + Size},
                lastSeenAt => os:system_time(1000)
            },
            maps:put(ConnPid, UpdatedMeta, PeersSansPeer)
    end.

%% these could be their own module

default_caberl_location() ->
    H = os:getenv("HOME"),
    Loc = filename:join([H, ".caberl"]),
    ok =
        case file:make_dir(Loc) of
            ok -> ok;
            {error, eexist} -> ok
        end,
    Loc.

create_or_load_keypair(Location) ->
    Fname = filename:join(Location, "secret"),
    Seed =
        case file:read_file(Fname) of
            %% load existing
            {ok, Content} when byte_size(Content) =:= 32 ->
                Content;
            %% generate new keypair
            {error, enoent} ->
                S = crypto:strong_rand_bytes(32),
                ok = file:write_file(Fname, S),
                ok = file:change_mode(Fname, 8#0400),
                io:format("[Cabal] Created fresh keypair~n"),
                S
        end,
    Kp = enacl:sign_seed_keypair(Seed),
    PubKeyStr = hex:bin_to_hexstr(maps:get(public, Kp)),
    io:format("[Cabal] Public Key: ~s~n", [PubKeyStr]),
    Kp.

create_or_load_transport_keypair(Location) ->
    Fname = filename:join(Location, "transport_secret"),
    case file:read_file(Fname) of
        %% load existing - new format stores both secret (32) and public (32) = 64 bytes
        {ok, <<Secret:32/binary, Public:32/binary>>} ->
            io:format("[Transport] Loaded existing Curve25519 keypair~n"),
            enoise_keypair:new(dh25519, Secret, Public);
        %% load old format - just secret key (32 bytes), derive public key
        {ok, Secret} when byte_size(Secret) =:= 32 ->
            io:format("[Transport] Loaded old format keypair, deriving public key~n"),
            Kp = enoise_keypair:new(dh25519, Secret, undefined),
            %% Upgrade to new format
            Public = enoise_keypair:pubkey(Kp),
            ok = file:write_file(Fname, <<Secret/binary, Public/binary>>),
            ok = file:change_mode(Fname, 8#0400),
            io:format("[Transport] Upgraded keypair file to new format~n"),
            Kp;
        %% generate new keypair
        {error, enoent} ->
            Kp = enoise_keypair:new(dh25519),
            Secret = enoise_keypair:seckey(Kp),
            Public = enoise_keypair:pubkey(Kp),
            ok = file:write_file(Fname, <<Secret/binary, Public/binary>>),
            ok = file:change_mode(Fname, 8#0400),
            io:format("[Transport] Created fresh Curve25519 keypair~n"),
            Kp
    end.

%% one off utility

human_bytesize(Size) ->
    human_bytesize(Size, ["b", "kb", "mb", "gb"]).
human_bytesize(S, [_ | [_ | _] = L]) when S >= 1024 ->
    human_bytesize(S / 1024, L);
human_bytesize(S, [M | _]) ->
    lists:flatten(io_lib:format("~.1f~s", [float(S), M])).

%% Event handler management functions

mark_channels_for_notification(State = #state{peerEventsPid = PeerEventsPid}, Channels) ->
    %% Notify cabal_peer_events about new channel activity
    case PeerEventsPid of
        undefined ->
            %% cabal_peer_events not started yet (during initialization), skip notification
            State;
        _ ->
            cabal_peer_events:notify_channels(PeerEventsPid, Channels),
            State
    end.

%% Polling helper for synchronous dial
poll_for_connection(Pid, Timeout) ->
    %% First wait for peer connection
    StartTime = erlang:monotonic_time(millisecond),
    case poll_for_peer_connection(Pid, Timeout, StartTime) of
        ok ->
            %% Then wait for initial sync to complete by checking that channels have content
            poll_for_initial_sync(Pid, Timeout, StartTime);
        {error, Reason} ->
            {error, Reason}
    end.

poll_for_peer_connection(Pid, Timeout, StartTime) ->
    Elapsed = erlang:monotonic_time(millisecond) - StartTime,
    if
        Elapsed >= Timeout ->
            {error, timeout};
        true ->
            case peer_list(Pid) of
                [] ->
                    timer:sleep(50),
                    poll_for_peer_connection(Pid, Timeout, StartTime);
                [_ | _] ->
                    ok
            end
    end.

poll_for_initial_sync(Pid, Timeout, StartTime) ->
    Elapsed = erlang:monotonic_time(millisecond) - StartTime,
    if
        Elapsed >= Timeout ->
            {error, timeout};
        true ->
            %% Check if we have any messages or unfetched hashes in joined channels
            {ok, Channels} = channels_joined(Pid),
            case Channels of
                [] ->
                    %% No channels joined, sync complete
                    {ok, synced};
                [Chan | _] ->
                    %% Check if channel has content (messages or unfetched)
                    {ok, {Texts, _}} = read(Pid, Chan),
                    {ok, Unfetched} = unfetched_count(Pid, Chan),
                    Total = length(Texts) + Unfetched,
                    if
                        Total > 0 ->
                            %% Have content, sync complete
                            {ok, synced};
                        true ->
                            %% Still syncing
                            timer:sleep(50),
                            poll_for_initial_sync(Pid, Timeout, StartTime)
                    end
            end
    end.

%% Polling helper for synchronous fetch_history
poll_for_fetch_completion(Pid, Channel, InitialUnfetched, RequestedCount, StartTime, Timeout) ->
    Elapsed = erlang:monotonic_time(millisecond) - StartTime,
    if
        Elapsed >= Timeout ->
            {error, timeout};
        true ->
            {ok, CurrentUnfetched} = unfetched_count(Pid, Channel),
            Expected = max(0, InitialUnfetched - RequestedCount),
            if
                CurrentUnfetched =< Expected ->
                    {ok, RequestedCount};
                true ->
                    timer:sleep(50),
                    poll_for_fetch_completion(
                        Pid, Channel, InitialUnfetched, RequestedCount, StartTime, Timeout
                    )
            end
    end.

%% Polling helper for await_writes
%% Wait for message count to stabilize (no changes for 200ms)
poll_for_writes_complete(Pid, Channel, LastCount, StartTime, Timeout) ->
    Elapsed = erlang:monotonic_time(millisecond) - StartTime,
    if
        Elapsed >= Timeout ->
            {error, timeout};
        true ->
            {ok, {Texts, _}} = read(Pid, Channel),
            CurrentCount = length(Texts),
            if
                CurrentCount =:= LastCount ->
                    %% Count stable, wait a bit to confirm
                    timer:sleep(200),
                    {ok, {Texts2, _}} = read(Pid, Channel),
                    FinalCount = length(Texts2),
                    if
                        FinalCount =:= CurrentCount ->
                            %% Still stable, we're done
                            ok;
                        true ->
                            %% Changed during wait, continue polling
                            timer:sleep(50),
                            poll_for_writes_complete(Pid, Channel, FinalCount, StartTime, Timeout)
                    end;
                true ->
                    %% Count changed, continue polling
                    timer:sleep(50),
                    poll_for_writes_complete(Pid, Channel, CurrentCount, StartTime, Timeout)
            end
    end.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
human_bytesize_test() ->
    ?assertEqual("81.0mb", human_bytesize(84912712)),
    ?assertEqual("512.0b", human_bytesize(512)),
    ?assertEqual("1.0kb", human_bytesize(1024)).

serialize_and_load_keypair_test() ->
    TestDir = string:chomp(
        os:cmd("mktemp --tmpdir -d cabal-test-XXXXX")
    ),
    Kp = create_or_load_keypair(TestDir),
    Kp2 = create_or_load_keypair(TestDir),
    ?assertEqual(Kp, Kp2).

format_address_test() ->
    %% Test converting tuple format to string
    ?assertEqual("127.0.0.1:3113", format_address({{127, 0, 0, 1}, 3113})),
    ?assertEqual("192.168.1.100:8080", format_address({{192, 168, 1, 100}, 8080})),

    %% Test passthrough of string format
    ?assertEqual("127.0.0.1:3113", format_address("127.0.0.1:3113")),
    ?assertEqual("localhost:3000", format_address("localhost:3000")).

-endif.