{ok, Alfi} = peer:start_link(A),
{ok, Bert} = peer:start_link(B),
{ok, AlfiSup} = peer_sup:start_link(A),
Alfi = peer_sup:get_peer_pid(AlfiSup),
{ok, BertSup} = peer_sup:start_link(B),
Bert = peer_sup:get_peer_pid(BertSup),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Basic Lazy Loading Tests
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

basic_lazy_fetch_test() ->
    %% Test the hybrid fetching behavior:
    %% - First 100 messages fetched eagerly
    %% - Remaining messages stored as unfetched
    %% - fetch_history retrieves them on demand
    C = "backlog-test",
    ArgsAlice = make_test_args("Alice"),
    ArgsBob = make_test_args("Bob"),
    {ok, Alice} = peer:start_link(ArgsAlice),
    {ok, Bob} = peer:start_link(ArgsBob),

    %% Alice creates a channel with 150 text messages
    peer:join(Alice, C),
    peer:set_nick(Alice, "Alice"),
    NumMessages = 150,
    lists:foreach(
        fun(N) ->
            Msg = lists:flatten(io_lib:format("Message number ~p", [N])),
            ok = peer:write(Alice, C, Msg)
        end,
        lists:seq(1, NumMessages)),
    %% Give Alice a moment to process all writes
    timer:sleep(100),

    %% Verify Alice has all messages
    {ok, {AliceTexts, _}} = peer:read(Alice, C),
    ?debugFmt("~nAlice has ~p messages in channel~n", [length(AliceTexts)]),
    ?assertEqual(NumMessages, length(AliceTexts)),

    %% Bob joins and connects to Alice
    peer:join(Bob, C),
    peer:set_nick(Bob, "Bob"),
    {ok, AliceAddr} = peer:node_addr(Alice),
    ok = peer:dial(Bob, AliceAddr),
    %% Give them time to sync (channel state + first 100 messages)
    timer:sleep(2000),

    %% Bob should have some unfetched messages (~50: we sent 150, 100 fetched eagerly)
    {ok, UnfetchedBefore} = peer:unfetched_count(Bob, C),
    ?debugFmt("~nBob has ~p unfetched messages~n", [UnfetchedBefore]),
    %% Allow some wiggle room for processing
    ?assert(UnfetchedBefore >= 40),
    %% At least 90 should have been fetched
    ?assert(UnfetchedBefore =< NumMessages - 90),

    %% Fetch next 30 messages from history
    {ok, RequestedCount} = peer:fetch_history(Bob, C, 30),
    ?debugFmt("~nRequested ~p messages from history~n", [RequestedCount]),
    ?assertEqual(30, RequestedCount),
    %% Give time for messages to arrive
    timer:sleep(1000),

    %% Verify unfetched count decreased
    {ok, UnfetchedAfter} = peer:unfetched_count(Bob, C),
    ?debugFmt("~nBob now has ~p unfetched messages~n", [UnfetchedAfter]),
    ?assert(UnfetchedAfter < UnfetchedBefore),
    ?assertEqual(UnfetchedBefore - 30, UnfetchedAfter),

    %% Verify Bob can read more messages now
    {ok, {BobTexts, _}} = peer:read(Bob, C),
    ?debugFmt("~nBob can now read ~p messages~n", [length(BobTexts)]),
    %% Should have at least 130 messages (100 eager + 30 from fetch_history)
    ?assert(length(BobTexts) >= 130),

    %% Fetch the rest
    {ok, RequestedRest} = peer:fetch_history(Bob, C, 100),
    ?debugFmt("~nRequested remaining ~p messages~n", [RequestedRest]),
    timer:sleep(1000),

    %% Verify all messages fetched
    {ok, UnfetchedFinal} = peer:unfetched_count(Bob, C),
    ?debugFmt("~nBob finally has ~p unfetched messages~n", [UnfetchedFinal]),
    ?assertEqual(0, UnfetchedFinal),
    {ok, {BobFinalTexts, _}} = peer:read(Bob, C),
    ?debugFmt("~nBob can read all ~p messages~n", [length(BobFinalTexts)]),
    ?assertEqual(NumMessages, length(BobFinalTexts)),

    ok = peer:stop(Alice),
    ok = peer:stop(Bob).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Real-world Scenario Tests
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

sync_then_new_posts_test() ->
    %% Test scenario:
    %% 1. Alice creates 150 messages
    %% 2. Bob connects and syncs (gets 100 eagerly, 50 unfetched)
    %% 3. Alice creates 20 MORE messages while connected
    %% 4. Verify Bob's unfetched count updates correctly
    C = "updates-test",
    ArgsAlice = make_test_args("Alice"),
    ArgsBob = make_test_args("Bob"),
    {ok, Alice} = peer:start_link(ArgsAlice),
    {ok, Bob} = peer:start_link(ArgsBob),
    peer:join(Alice, C),
    peer:set_nick(Alice, "Alice"),

    %% Phase 1: Create initial 150 messages
    InitialMessages = 150,
    lists:foreach(
        fun(N) ->
            Msg = lists:flatten(io_lib:format("Initial message ~p", [N])),
            ok = peer:write(Alice, C, Msg)
        end,
        lists:seq(1, InitialMessages)),
    timer:sleep(100),

    %% Phase 2: Bob connects
    peer:join(Bob, C),
    peer:set_nick(Bob, "Bob"),
    {ok, AliceAddr} = peer:node_addr(Alice),
    ok = peer:dial(Bob, AliceAddr),
    timer:sleep(2000),

    %% Check initial unfetched count
    {ok, UnfetchedInitial} = peer:unfetched_count(Bob, C),
    ?debugFmt("~nBob initially has ~p unfetched messages~n", [UnfetchedInitial]),
    ?assert(UnfetchedInitial >= 40),

    %% Phase 3: Alice creates MORE messages while connected
    NewMessages = 20,
    lists:foreach(
        fun(N) ->
            Msg = lists:flatten(io_lib:format("New message ~p", [N])),
            ok = peer:write(Alice, C, Msg)
        end,
        lists:seq(1, NewMessages)),
    timer:sleep(1000),

    %% Phase 4: Check if the unfetched count is correct.
    %% This is the KEY test - does the unfetched count reflect reality?
    {ok, UnfetchedAfterNew} = peer:unfetched_count(Bob, C),
    ?debugFmt("~nAfter new messages, Bob has ~p unfetched~n", [UnfetchedAfterNew]),
    %% Count how many messages Bob actually has
    {ok, {BobTexts, _}} = peer:read(Bob, C),
    ActualReceived = length(BobTexts),
    ExpectedUnfetched = (InitialMessages + NewMessages) - ActualReceived,
    ?debugFmt("~nBob has ~p messages, should have ~p unfetched~n",
              [ActualReceived, ExpectedUnfetched]),
    %% The unfetched count should reflect the actual gap.
    %% This test will likely FAIL with the current implementation.
    ?assertEqual(ExpectedUnfetched, UnfetchedAfterNew),

    ok = peer:stop(Alice),
    ok = peer:stop(Bob).

disconnect_reconnect_test() ->
    %% Test scenario:
    %% 1. Alice creates 100 messages, Bob syncs
    %% 2. Bob disconnects
    %% 3. Alice creates 50 more messages
    %% 4. Bob reconnects
    %% 5. Verify unfetched count shows the 50 new messages
    C = "reconnect-test",
    ArgsAlice = make_test_args("Alice"),
    ArgsBob = make_test_args("Bob"),
    {ok, Alice} = peer:start_link(ArgsAlice),
    {ok, Bob} = peer:start_link(ArgsBob),
    peer:join(Alice, C),
    peer:set_nick(Alice, "Alice"),
    peer:join(Bob, C),
    peer:set_nick(Bob, "Bob"),

    %% Phase 1: Initial messages and sync
    InitialMessages = 100,
    lists:foreach(
        fun(N) ->
            Msg = lists:flatten(io_lib:format("Message ~p", [N])),
            ok = peer:write(Alice, C, Msg)
        end,
        lists:seq(1, InitialMessages)),
    timer:sleep(100),
    {ok, AliceAddr} = peer:node_addr(Alice),
    ok = peer:dial(Bob, AliceAddr),
    timer:sleep(2000),
%% ============================================================================
%% Proposed peer API additions (to eliminate timer:sleep):
%%
%% peer:dial(Pid, Addr, #{await_sync => true, timeout => Ms})
%%   - Blocks until initial sync completes
%%
%% peer:fetch_history(Pid, Chan, N, #{sync => true, timeout => Ms})
%%   - Blocks until fetch completes, returns actual count fetched
%%
%% peer:await_writes(Pid, Chan, Timeout)
%%   - Blocks until write queue drains
%%
%% peer:subscribe(Pid, Chan, Events) / peer:unsubscribe(Pid, Ref)
%%   - For async scenarios: {peer_event, Pid, Chan, Event}
%% ============================================================================
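%% A hypothetical usage sketch for the proposed additions above; none of these
%% options exist yet, and the return shapes shown are assumptions from this note:
%%
%%   {ok, synced} = peer:dial(Bob, AliceAddr, #{await_sync => true, timeout => 5000}),
%%   {ok, Fetched} = peer:fetch_history(Bob, C, 30, #{sync => true, timeout => 3000}),
%%   ok = peer:await_writes(Alice, C, 5000),
%%   {ok, Ref} = peer:subscribe(Bob, C, [new_messages]),
%%   receive {peer_event, Bob, C, Event} -> Event end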
    %% Verify Bob got all initial messages (since < 100, all eager)
    {ok, UnfetchedInitial} = peer:unfetched_count(Bob, C),
    ?debugFmt("~nInitial unfetched count: ~p~n", [UnfetchedInitial]),
    ?assertEqual(0, UnfetchedInitial),
-define(SYNC_TIMEOUT, 5000).
-define(FETCH_TIMEOUT, 3000).
    %% Phase 2: Disconnect Bob
    ok = peer:stop(Bob),
    %% Wait for the database to fully close and unregister
    timer:sleep(1000),
%% ============================================================================%% Setup / Teardown%% ============================================================================
    %% Phase 3: Alice creates more messages while Bob is offline
    NewMessages = 50,
    lists:foreach(
        fun(N) ->
            Msg = lists:flatten(io_lib:format("Offline message ~p", [N])),
            ok = peer:write(Alice, C, Msg)
        end,
setup_peers(Names) ->
    lists:map(
        fun(Name) ->
            Args = make_test_args(Name),
            {ok, Sup} = peer_sup:start_link(Args),
            Pid = peer_sup:get_peer_pid(Sup),
            peer:set_nick(Pid, Name),
            {Sup, Pid}
        end,
        lists:seq(1, NewMessages)),
    timer:sleep(100),

    %% Phase 4: Bob reconnects (reusing the same storage to test persistence)
    {ok, BobReconnect} = peer:start_link(ArgsBob),
    peer:join(BobReconnect, C),
    ok = peer:dial(BobReconnect, AliceAddr),
    timer:sleep(1000),

    %% Phase 5: Check unfetched count
    {ok, UnfetchedAfterReconnect} = peer:unfetched_count(BobReconnect, C),
    ?debugFmt("~nAfter reconnect, unfetched count: ~p~n", [UnfetchedAfterReconnect]),
    %% Bob should have 0 unfetched since the total is 150 (100 eager, 50 from the
    %% second batch), but the count might show them as unfetched if not yet processed
    {ok, {BobTextsAfterReconnect, _}} = peer:read(BobReconnect, C),
    TotalReceived = length(BobTextsAfterReconnect),
    ExpectedUnfetched = (InitialMessages + NewMessages) - TotalReceived,
    ?debugFmt("~nBob has ~p messages, should have ~p unfetched~n",
              [TotalReceived, ExpectedUnfetched]),
    ?assertEqual(ExpectedUnfetched, UnfetchedAfterReconnect),

    ok = peer:stop(Alice),
    ok = peer:stop(BobReconnect).
Names).
multiple_peers_test() ->
    %% Test scenario:
    %% 1. Alice creates 200 messages
    %% 2. Bob and Carol both connect
    %% 3. Verify both have correct unfetched counts
    %% 4. Bob fetches 50 messages
    %% 5. Verify Bob's count updates but Carol's doesn't change
teardown_peers(Peers) ->
    lists:foreach(fun({Sup, _}) -> peer_sup:stop(Sup) end, Peers).
C = "multi-peer-test",ArgsAlice = make_test_args("Alice"),ArgsBob = make_test_args("Bob"),ArgsCarol = make_test_args("Carol"),{ok, Alice} = peer:start_link(ArgsAlice),{ok, Bob} = peer:start_link(ArgsBob),{ok, Carol} = peer:start_link(ArgsCarol),peer:join(Alice, C),peer:set_nick(Alice, "Alice"),
with_peers(Names, Fun) ->
    Peers = setup_peers(Names),
    try
        Fun(Peers)
    after
        teardown_peers(Peers)
    end.
        lists:seq(1, NumMessages)),

    %% Both Bob and Carol connect
    ok = peer:join(Bob, C),
    ok = peer:set_nick(Bob, "Bob"),
    ok = peer:join(Carol, C),
    ok = peer:set_nick(Carol, "Carol"),
    {ok, AliceAddr} = peer:node_addr(Alice),
    ok = peer:dial(Bob, AliceAddr),
    ok = peer:dial(Carol, AliceAddr),
    timer:sleep(2500),

    %% Both should have similar unfetched counts
    {ok, BobUnfetched} = peer:unfetched_count(Bob, C),
    {ok, CarolUnfetched} = peer:unfetched_count(Carol, C),
    ?debugFmt("~nBob unfetched: ~p, Carol unfetched: ~p~n",
              [BobUnfetched, CarolUnfetched]),
lists:seq(1, N)
    %% Both should have ~100 unfetched (200 total - 100 eager)
    ?assert(BobUnfetched >= 90),
    ?assert(CarolUnfetched >= 90),

    %% Bob fetches some history
    {ok, 50} = peer:fetch_history(Bob, C, 50),
    timer:sleep(1000),

    %% Bob's unfetched should decrease
    {ok, BobUnfetchedAfter} = peer:unfetched_count(Bob, C),
    ?debugFmt("~nBob unfetched after fetch: ~p~n", [BobUnfetchedAfter]),
    ?assertEqual(BobUnfetched - 50, BobUnfetchedAfter),
ok = peer:await_writes(Peer, Chan, ?SYNC_TIMEOUT).
    %% Carol's should remain the same
    {ok, CarolUnfetchedAfter} = peer:unfetched_count(Carol, C),
    ?debugFmt("~nCarol unfetched after Bob's fetch: ~p~n", [CarolUnfetchedAfter]),
    ?assertEqual(CarolUnfetched, CarolUnfetchedAfter),
connect_peers(From, To, Chan) ->
    peer:join(From, Chan),
    {ok, Addr} = peer:node_addr(To),
    {ok, synced} = peer:dial(From, Addr, #{await_sync => true, timeout => ?SYNC_TIMEOUT}).
incremental_fetch_test_() ->
    %% Test fetching in multiple small batches
    {timeout, 30, fun incremental_fetch_test_impl/0}.
basic_lazy_fetch_test() ->
    %% Verifies: eager fetch (100), unfetched tracking, fetch_history works
    with_peers(["Alice", "Bob"], fun([{_, Alice}, {_, Bob}]) ->
        C = "backlog-test",
        NumMessages = 150,
incremental_fetch_test_impl() ->
    C = "incremental-test",
    ArgsAlice = make_test_args("Alice"),
    ArgsBob = make_test_args("Bob"),
    {ok, Alice} = peer:start_link(ArgsAlice),
    {ok, Bob} = peer:start_link(ArgsBob),
        populate_channel(Alice, C, NumMessages),
        ?assertEqual(NumMessages, count_messages(Alice, C)),
    %% Create 250 messages
    NumMessages = 250,
    lists:foreach(
        fun(N) ->
            Msg = lists:flatten(io_lib:format("Message ~p", [N])),
            ok = peer:write(Alice, C, Msg)
        end,
        lists:seq(1, NumMessages)),
    timer:sleep(100),
        %% Verify lazy loading state
        {ok, Unfetched} = peer:unfetched_count(Bob, C),
        Fetched = count_messages(Bob, C),
        ?assertEqual(NumMessages, Fetched + Unfetched),
        %% At least 50 lazy, allow wiggle room
        ?assert(Unfetched >= 40),
    peer:join(Bob, C),
    peer:set_nick(Bob, "Bob"),
    {ok, AliceAddr} = peer:node_addr(Alice),
    ok = peer:dial(Bob, AliceAddr),
    %% Wait for initial sync to complete (100 eager + remaining as unfetched)
    timer:sleep(2000),
        %% Fetch remaining via history
        {ok, Unfetched} = peer:fetch_history(Bob, C, Unfetched,
                                             #{sync => true, timeout => ?FETCH_TIMEOUT}),
    FetchUntilEmpty =
        fun FetchLoop(Iteration) ->
            case Iteration > MaxIterations of
                true ->
                    ?debugFmt("~nMax iterations reached~n", []),
                    error;
                false ->
                    {ok, Unfetched} = peer:unfetched_count(Bob, C),
                    {ok, {CurrentTexts, _}} = peer:read(Bob, C),
                    CurrentCount = length(CurrentTexts),
                    ?debugFmt("~nIteration ~p: ~p messages received, ~p unfetched~n",
                              [Iteration, CurrentCount, Unfetched]),
live_messages_during_sync_test() ->
    %% Verifies: new messages arrive immediately (not lazy) after sync
    with_peers(["Alice", "Bob"], fun([{_, Alice}, {_, Bob}]) ->
        C = "live-test",
                    case CurrentCount >= NumMessages of
                        true ->
                            ?debugFmt("~nAll ~p messages received after ~p iterations~n",
                                      [NumMessages, Iteration]),
                            ok;
                        false when Unfetched > 0 ->
                            {ok, Requested} = peer:fetch_history(Bob, C, FetchCount),
                            ?debugFmt("~nRequested ~p messages~n", [Requested]),
                            timer:sleep(800),
                            FetchLoop(Iteration + 1);
                        false ->
                            %% No unfetched but not all received - messages in flight
                            ?debugFmt("~nWaiting for in-flight messages~n", []),
                            timer:sleep(800),
                            FetchLoop(Iteration + 1)
                    end
            end
        end,
        populate_channel(Alice, C, 150),
        connect_peers(Bob, Alice, C),
        %% Snapshot the unfetched count before the live messages
        {ok, UnfetchedBefore} = peer:unfetched_count(Bob, C),
    %% Verify all messages received
    {ok, {BobTexts, _}} = peer:read(Bob, C),
    ?assertEqual(NumMessages, length(BobTexts)),
        %% Alice posts 20 more while connected - these should arrive eagerly
        lists:foreach(
            fun(I) ->
                ok = peer:write(Alice, C, io_lib:format("Live ~p", [I]))
            end,
            lists:seq(1, 20)),
        ok = peer:await_writes(Alice, C, ?SYNC_TIMEOUT),
    ok = peer:stop(Alice),
    ok = peer:stop(Bob).
        %% Unfetched should NOT increase (live msgs are eager)
        {ok, UnfetchedAfter} = peer:unfetched_count(Bob, C),
        ?assertEqual(UnfetchedBefore, UnfetchedAfter),
        ?assertEqual(170, count_messages(Bob, C) + UnfetchedAfter)
    end).
C = "empty-test",ArgsAlice = make_test_args("Alice"),ArgsBob = make_test_args("Bob"),{ok, Alice} = peer:start_link(ArgsAlice),{ok, Bob} = peer:start_link(ArgsBob),
        %% Populate all channels
        lists:foreach(fun(C) -> populate_channel(Alice, C, MsgsPerChan) end, Channels),
    {ok, AliceAddr} = peer:node_addr(Alice),
    ok = peer:dial(Bob, AliceAddr),
    timer:sleep(1000),

    %% No messages, unfetched should be 0
    {ok, Unfetched} = peer:unfetched_count(Bob, C),
    ?assertEqual(0, Unfetched),
    %% Try to fetch anyway
    {ok, Requested} = peer:fetch_history(Bob, C, 10),
    ?assertEqual(0, Requested),

    ok = peer:stop(Alice),
    ok = peer:stop(Bob).

fetch_more_than_available_test() ->
    %% Test requesting more messages than are available
    C = "overfetch-test",
    ArgsAlice = make_test_args("Alice"),
    ArgsBob = make_test_args("Bob"),
    {ok, Alice} = peer:start_link(ArgsAlice),
    {ok, Bob} = peer:start_link(ArgsBob),
    peer:join(Alice, C),
    peer:set_nick(Alice, "Alice"),

    %% Create 120 messages (100 eager + 20 unfetched)
    NumMessages = 120,
    lists:foreach(
        fun(N) ->
            Msg = lists:flatten(io_lib:format("Message ~p", [N])),
            ok = peer:write(Alice, C, Msg)
        end,
        lists:seq(1, NumMessages)),
    timer:sleep(100),

    peer:join(Bob, C),
    {ok, AliceAddr} = peer:node_addr(Alice),
    ok = peer:dial(Bob, AliceAddr),
    timer:sleep(2000),

    {ok, Unfetched} = peer:unfetched_count(Bob, C),
    ?debugFmt("~nUnfetched: ~p~n", [Unfetched]),
    %% Request 100 but only ~20 are available
    {ok, Requested} = peer:fetch_history(Bob, C, 100),
    ?debugFmt("~nRequested ~p (asked for 100)~n", [Requested]),
    ?assertEqual(Unfetched, Requested),
    timer:sleep(1000),
    {ok, UnfetchedAfter} = peer:unfetched_count(Bob, C),
    ?assertEqual(0, UnfetchedAfter),

    ok = peer:stop(Alice),
    ok = peer:stop(Bob).

pagination_large_backlog_test_() ->
    %% Test automatic pagination with >1000 messages.
    %% This ensures we can handle arbitrarily large channels.
    {timeout, 120, fun pagination_large_backlog_test_impl/0}.

pagination_large_backlog_test_impl() ->
    C = "pagination-test",
    ArgsAlice = make_test_args("Alice"),
    ArgsBob = make_test_args("Bob"),
    {ok, Alice} = peer:start_link(ArgsAlice),
    {ok, Bob} = peer:start_link(ArgsBob),
    peer:join(Alice, C),
    peer:set_nick(Alice, "Alice"),

    %% Create 1500 messages to test pagination.
    %% With a limit of 1000, this should trigger 1 pagination request.
    NumMessages = 1500,
    ?debugFmt("~nCreating ~p messages for pagination test~n", [NumMessages]),
    lists:foreach(
        fun(N) ->
            Msg = lists:flatten(io_lib:format("Message ~p", [N])),
            ok = peer:write(Alice, C, Msg)
        end,
        lists:seq(1, NumMessages)),
    timer:sleep(200),

    {ok, {AliceTexts, _}} = peer:read(Alice, C),
    ?assertEqual(NumMessages, length(AliceTexts)),
    ?debugFmt("~nAlice has all ~p messages~n", [NumMessages]),

    peer:join(Bob, C),
    peer:set_nick(Bob, "Bob"),
    {ok, AliceAddr} = peer:node_addr(Alice),
    ok = peer:dial(Bob, AliceAddr),

    %% Wait for initial sync + pagination to complete.
    %% First request: 1000 messages (100 eager + 900 unfetched).
    %% Pagination should trigger automatically.
    %% Second request: 500 messages (100 eager + 400 unfetched).
    ?debugFmt("~nWaiting for initial sync and pagination...~n", []),
    timer:sleep(5000),

    %% Bob should have discovered all messages (100 eager from the first batch
    %% + 100 from the second) plus all unfetched (900 + 400)
    {ok, {BobTexts, _}} = peer:read(Bob, C),
    {ok, BobUnfetched} = peer:unfetched_count(Bob, C),
    BobTotal = length(BobTexts) + BobUnfetched,
    ?debugFmt("~nBob has ~p messages fetched, ~p unfetched, ~p total~n",
              [length(BobTexts), BobUnfetched, BobTotal]),
    %% Verify Bob discovered all messages through pagination
    ?assertEqual(NumMessages, BobTotal),

    %% Now fetch the rest incrementally to verify unfetched tracking works
    MaxIterations = 20,
    FetchUntilEmpty =
        fun FetchLoop(Iteration) ->
            case Iteration > MaxIterations of
                true ->
                    ?debugFmt("~nMax iterations reached~n", []),
                    error;
                false ->
                    {ok, Unfetched2} = peer:unfetched_count(Bob, C),
                    {ok, {CurrentTexts, _}} = peer:read(Bob, C),
                    CurrentCount = length(CurrentTexts),
                    ?debugFmt("~nIteration ~p: ~p messages received, ~p unfetched~n",
                              [Iteration, CurrentCount, Unfetched2]),
                    case CurrentCount >= NumMessages of
                        true ->
                            ?debugFmt("~nAll ~p messages received after ~p iterations~n",
                                      [NumMessages, Iteration]),
                            ok;
                        false when Unfetched2 > 0 ->
                            {ok, Requested2} = peer:fetch_history(Bob, C, 100),
                            ?debugFmt("~nRequested ~p messages~n", [Requested2]),
                            timer:sleep(800),
                            FetchLoop(Iteration + 1);
                        false ->
                            %% No unfetched but not all received - messages in flight
                            ?debugFmt("~nWaiting for in-flight messages~n", []),
                            timer:sleep(800),
                            FetchLoop(Iteration + 1)
                    end
            end
        end,
    ok = FetchUntilEmpty(1),

    %% Verify final state
    {ok, {FinalTexts, _}} = peer:read(Bob, C),
    {ok, FinalUnfetched} = peer:unfetched_count(Bob, C),
    ?debugFmt("~nFinal: ~p messages, ~p unfetched~n", [length(FinalTexts), FinalUnfetched]),
    ?assertEqual(NumMessages, length(FinalTexts)),
    ?assertEqual(0, FinalUnfetched),

    ok = peer:stop(Alice),
    ok = peer:stop(Bob).

messages_during_pagination_test_() ->
    %% Test scenario: New messages arrive while fetching history.
    %% This is very common - the user scrolls up to load history while chat is active.
    {timeout, 60, fun messages_during_pagination_test_impl/0}.

messages_during_pagination_test_impl() ->
    C = "concurrent-test",
    ArgsAlice = make_test_args("Alice"),
    ArgsBob = make_test_args("Bob"),
    {ok, Alice} = peer:start_link(ArgsAlice),
    {ok, Bob} = peer:start_link(ArgsBob),
    peer:join(Alice, C),
    peer:set_nick(Alice, "Alice"),

    %% Phase 1: Create 200 messages
    InitialMessages = 200,
    lists:foreach(
        fun(N) ->
            Msg = lists:flatten(io_lib:format("Initial ~p", [N])),
            ok = peer:write(Alice, C, Msg)
        end,
        lists:seq(1, InitialMessages)),
    timer:sleep(100),

    %% Phase 2: Bob connects
    peer:join(Bob, C),
    peer:set_nick(Bob, "Bob"),
    {ok, AliceAddr} = peer:node_addr(Alice),
    ok = peer:dial(Bob, AliceAddr),
    timer:sleep(2000),

    %% Bob should have ~100 unfetched
    {ok, UnfetchedInitial} = peer:unfetched_count(Bob, C),
    ?debugFmt("~nBob initially has ~p unfetched~n", [UnfetchedInitial]),
    ?assert(UnfetchedInitial >= 90),

    %% Phase 3: Bob starts fetching history
    {ok, Requested1} = peer:fetch_history(Bob, C, 50),
    ?debugFmt("~nBob requested ~p messages~n", [Requested1]),

    %% Phase 4: WHILE pagination is in-flight, Alice sends 10 NEW messages
    NewMessages = 10,
    lists:foreach(
        fun(N) ->
            Msg = lists:flatten(io_lib:format("New message ~p", [N])),
            ok = peer:write(Alice, C, Msg)
        end,
        lists:seq(1, NewMessages)),
    %% Wait for both old and new messages to arrive
    timer:sleep(2000),

    %% Phase 5: Verify counts are correct
    {ok, {BobTexts, _}} = peer:read(Bob, C),
    ActualReceived = length(BobTexts),
    TotalExpected = InitialMessages + NewMessages,
    ?debugFmt("~nBob has ~p messages (expected ~p total)~n",
              [ActualReceived, TotalExpected]),
    {ok, UnfetchedFinal} = peer:unfetched_count(Bob, C),
    ExpectedUnfetched = TotalExpected - ActualReceived,
    ?debugFmt("~nBob has ~p unfetched (expected ~p)~n",
              [UnfetchedFinal, ExpectedUnfetched]),
    %% Key assertion: the unfetched count should be accurate despite concurrent activity
    ?assertEqual(ExpectedUnfetched, UnfetchedFinal),

    ok = peer:stop(Alice),
    ok = peer:stop(Bob).

rapid_fetch_history_test_() ->
    {timeout, 120, fun rapid_fetch_history_test_impl/0}.

rapid_fetch_history_test_impl() ->
    %% Test scenario: User spam-clicks the "load more" button.
    %% Multiple fetch_history calls before previous ones complete.
    C = "rapid-fetch-test",
    ArgsAlice = make_test_args("Alice"),
    ArgsBob = make_test_args("Bob"),
    {ok, Alice} = peer:start_link(ArgsAlice),
    {ok, Bob} = peer:start_link(ArgsBob),
    peer:join(Alice, C),
    peer:set_nick(Alice, "Alice"),

    %% Create 300 messages
    NumMessages = 300,
    lists:foreach(
        fun(N) ->
            Msg = lists:flatten(io_lib:format("Message ~p", [N])),
            ok = peer:write(Alice, C, Msg)
        end,
        lists:seq(1, NumMessages)),
    timer:sleep(100),

    peer:join(Bob, C),
    {ok, AliceAddr} = peer:node_addr(Alice),
    ok = peer:dial(Bob, AliceAddr),
    timer:sleep(2000),

    {ok, UnfetchedBefore} = peer:unfetched_count(Bob, C),
    ?debugFmt("~nBob has ~p unfetched before rapid fetches~n", [UnfetchedBefore]),

    %% Make 3 rapid fetch_history calls WITHOUT waiting
    {ok, Req1} = peer:fetch_history(Bob, C, 30),
    {ok, Req2} = peer:fetch_history(Bob, C, 30),
    {ok, Req3} = peer:fetch_history(Bob, C, 30),
    ?debugFmt("~nRapid requests: ~p, ~p, ~p~n", [Req1, Req2, Req3]),
    %% All should succeed (return a count)
    ?assert(is_integer(Req1)),
    ?assert(is_integer(Req2)),
    ?assert(is_integer(Req3)),

    %% Wait for all to complete
    timer:sleep(3000),

    %% Verify the final state is consistent
    {ok, UnfetchedAfter} = peer:unfetched_count(Bob, C),
    {ok, {BobTexts, _}} = peer:read(Bob, C),
    ActualReceived = length(BobTexts),
    ?debugFmt("~nAfter rapid fetches: ~p received, ~p unfetched~n",
              [ActualReceived, UnfetchedAfter]),
    %% Should have received approximately 90 more messages (3 x 30),
    %% but might be less if requests overlapped:
    %% 100 initial + at least some from fetches
    ?assert(ActualReceived >= 130),
    ?assertEqual(NumMessages - ActualReceived, UnfetchedAfter),

    ok = peer:stop(Alice),
    ok = peer:stop(Bob).

multiple_channels_large_backlog_test_() ->
    %% Test scenario: User joins multiple channels with large backlogs.
    %% Common when joining an established community.
    {timeout, 90, fun multiple_channels_large_backlog_test_impl/0}.

multiple_channels_large_backlog_test_impl() ->
    ArgsAlice = make_test_args("Alice"),
    ArgsBob = make_test_args("Bob"),
    {ok, Alice} = peer:start_link(ArgsAlice),
    {ok, Bob} = peer:start_link(ArgsBob),
    peer:set_nick(Alice, "Alice"),

    %% Create 3 channels with 300 messages each
    Channels = ["chan-A", "chan-B", "chan-C"],
    MessagesPerChannel = 300,
    lists:foreach(
        fun(Chan) ->
            peer:join(Alice, Chan),
%% Verify each channel
            lists:foreach(
                fun(N) ->
                    Msg = lists:flatten(io_lib:format("Message ~p", [N])),
                    ok = peer:write(Alice, Chan, Msg)
                end,
                lists:seq(1, MessagesPerChannel))
        end,
        Channels),
    timer:sleep(200),

    %% Bob joins all channels and connects
    peer:set_nick(Bob, "Bob"),
    lists:foreach(fun(Chan) -> peer:join(Bob, Chan) end, Channels),
    {ok, AliceAddr} = peer:node_addr(Alice),
    ok = peer:dial(Bob, AliceAddr),
    %% Wait for initial sync of all channels
    timer:sleep(5000),

    %% Verify each channel has the correct unfetched count
    VerifyChannel = fun(Chan) ->
        {ok, {BobTexts, _}} = peer:read(Bob, Chan),
        {ok, Unfetched} = peer:unfetched_count(Bob, Chan),
        Total = length(BobTexts) + Unfetched,
        ?debugFmt("~nChannel ~s: ~p received, ~p unfetched, ~p total~n",
                  [Chan, length(BobTexts), Unfetched, Total]),
        end)
    end}.
        %% Each channel should have discovered all messages
        ?assertEqual(MessagesPerChannel, Total)
    end,
    lists:foreach(VerifyChannel, Channels),
disconnect_during_fetch_test_skip() ->
    %% Verifies: graceful handling when a peer dies mid-fetch
    C = "disconnect-test",
    [{AliceSup, Alice}, {BobSup, Bob}] = setup_peers(["Alice", "Bob"]),
    ok = peer:stop(Alice),
    ok = peer:stop(Bob).

peer_disconnects_during_pagination_test() ->
    %% Test scenario: Peer goes offline while Bob is fetching history.
    %% Network issues are a reality.
    C = "disconnect-test",
    ArgsAlice = make_test_args("Alice"),
    ArgsBob = make_test_args("Bob"),
    {ok, Alice} = peer:start_link(ArgsAlice),
    {ok, Bob} = peer:start_link(ArgsBob),
    %% Start an async fetch, then kill Alice
    {ok, Ref} = peer:fetch_history(Bob, C, 50, #{sync => false}),
    ok = peer_sup:stop(AliceSup),
    peer:join(Alice, C),
    peer:set_nick(Alice, "Alice"),
    %% Create 200 messages
    NumMessages = 200,
    lists:foreach(
        fun(N) ->
            Msg = lists:flatten(io_lib:format("Message ~p", [N])),
            ok = peer:write(Alice, C, Msg)
        end,
        lists:seq(1, NumMessages)),
    timer:sleep(100),
    %% Bob should handle this gracefully (timeout or partial)
    receive
        {fetch_complete, Ref, _} -> ok;
        {fetch_failed, Ref, _Reason} -> ok
    after 5000 -> ok
    end,
    peer:join(Bob, C),
    {ok, AliceAddr} = peer:node_addr(Alice),
    ok = peer:dial(Bob, AliceAddr),
    timer:sleep(2000),
        %% Bob's state should be consistent
        {ok, Unfetched} = peer:unfetched_count(Bob, C),
        Fetched = count_messages(Bob, C),
        ?assertEqual(200, Fetched + Unfetched)
    after
        peer_sup:stop(BobSup)
    end.
    {ok, UnfetchedBefore} = peer:unfetched_count(Bob, C),
    ?debugFmt("~nBob has ~p unfetched before fetch~n", [UnfetchedBefore]),
rapid_fetch_requests_test_() ->
    %% Verifies: concurrent fetches don't corrupt state
    {timeout, 120, fun() ->
        with_peers(["Alice", "Bob"], fun([{_, Alice}, {_, Bob}]) ->
            C = "rapid-test",
            NumMessages = 300,
    %% Bob should still be alive and have consistent state
    {ok, {BobTexts, _}} = peer:read(Bob, C),
    {ok, UnfetchedAfter} = peer:unfetched_count(Bob, C),
    ?debugFmt("~nAfter disconnect: ~p received, ~p unfetched~n",
              [length(BobTexts), UnfetchedAfter]),
    %% The unfetched count should be consistent (it might not have decreased much).
    %% The key point is that Bob shouldn't crash or end up with corrupted state.
    ?assert(is_integer(UnfetchedAfter)),
    ?assertEqual(NumMessages - length(BobTexts), UnfetchedAfter),
            %% State must be consistent
            {ok, Unfetched} = peer:unfetched_count(Bob, C),
            Fetched = count_messages(Bob, C),
            ?assertEqual(NumMessages, Fetched + Unfetched),
            %% 100 eager + at least some from fetches
            ?assert(Fetched >= 130)
        end)
    end}.
deep_pagination_test_() ->
    %% Test scenario: Channel needs more than 10 pagination rounds.
    %% Verify the depth limit works correctly.
    {timeout, 180, fun deep_pagination_test_impl/0}.

deep_pagination_test_impl() ->
    C = "deep-pagination-test",
    ArgsAlice = make_test_args("Alice"),
    ArgsBob = make_test_args("Bob"),
    {ok, Alice} = peer:start_link(ArgsAlice),
    {ok, Bob} = peer:start_link(ArgsBob),
    peer:join(Alice, C),
    peer:set_nick(Alice, "Alice"),

    %% Create 12000 messages - this would need 12 pagination rounds at 1000/request,
    %% which exceeds the max depth of 10.
    NumMessages = 12000,
    ?debugFmt("~nCreating ~p messages for deep pagination test~n", [NumMessages]),
    %% Verifies: depth limit stops runaway pagination
    {timeout, 120, fun() ->
        with_peers(["Alice", "Bob"], fun([{_, Alice}, {_, Bob}]) ->
            C = "deep-test",
            NumMessages = 12000,
        fun(N) ->
            Msg = lists:flatten(io_lib:format("Msg ~p", [N])),
            ok = peer:write(Alice, C, Msg)
            fun(Batch) ->
                Start = (Batch - 1) * 500 + 1,
                lists:foreach(
                    fun(N) ->
                        ok = peer:write(Alice, C, io_lib:format("M~p", [N]))
                    end,
                    lists:seq(Start, Start + 499))
?debugFmt("~nCreated batch ~p/~p~n", [Batch, NumBatches])end,lists:seq(1, NumBatches)),timer:sleep(500),%% Verify Alice has all messages{ok, {AliceTexts, _}} = peer:read(Alice, C),?assertEqual(NumMessages, length(AliceTexts)),?debugFmt("~nAlice has all ~p messages~n", [NumMessages]),%% Bob connectspeer:join(Bob, C),peer:set_nick(Bob, "Bob"),{ok, AliceAddr} = peer:node_addr(Alice),ok = peer:dial(Bob, AliceAddr),%% Wait for automatic pagination to complete (or stop at depth limit)?debugFmt("~nWaiting for automatic pagination...~n", []),timer:sleep(15000),
ok = peer:await_writes(Alice, C, 10000),
    %% Check what Bob discovered
    {ok, {BobTexts, _}} = peer:read(Bob, C),
    {ok, BobUnfetched} = peer:unfetched_count(Bob, C),
    BobTotal = length(BobTexts) + BobUnfetched,
    ?debugFmt("~nBob discovered ~p total messages (~p fetched, ~p unfetched)~n",
              [BobTotal, length(BobTexts), BobUnfetched]),
connect_peers(Bob, Alice, C),
    %% With a depth limit of 10, Bob should discover at most 10 * 1000 = 10000 messages
    %% (actually 10 requests: the 1st + 9 pagination rounds, since depth starts at 1).
    %% The implementation might differ, so just check it stopped gracefully.
    ?assert(BobTotal =< 10000),
    ?debugFmt("~nDeep pagination stopped at ~p messages (depth limit working)~n",
              [BobTotal]),
            %% Depth limit should cap discovery at ~10k
            {ok, Unfetched} = peer:unfetched_count(Bob, C),
            Total = count_messages(Bob, C) + Unfetched,
            ?assert(Total =< 10000)
        end)
    end}.
# Database module

The SQLite module functions as a `gen_server` which holds the SQLite handle.
Its main jobs are:

1. have an API to allow building of clients
2. be able to handle sync requests

## general info

| `post_type` numeric id | common name   | description |
|------------------------|---------------|-------------|
| 0                      | `post/text`   | a textual chat message, posted to a channel |
| 1                      | `post/delete` | the deletion of a previously published post |
| 2                      | `post/info`   | set or clear informative key/value pairs on a user |
| 3                      | `post/topic`  | set or clear a channel's topic string |
| 4                      | `post/join`   | announce membership to a channel |
| 5                      | `post/leave`  | announce cessation of membership to a channel |

## channel state req

- most recent state messages for a channel
- all posts except `post/text`

### Ideas/concerns

- re-encode posts when answering post requests
  - let's assume that's fine for now
  - otherwise have a table where we store posts as-is
- repeat public key all the time in `posts`?
  - could have a `users` table that just has `id:public_key:name`

## common queries

### Find leaf elements in a tree (unreferenced posts)

```sql
select source, count(*) from links
where source in (select id from posts where channel = ?)
group by source;
-- TODO look at this SO answer if zero ranges are not listed:
-- https://stackoverflow.com/a/22888342
```

### simple polymorphic check example

```sql
create table poly(
    -- all types share these
    id integer primary key,
    type integer not null check (type > 0 AND type < 4),
    -- depends on type
    foo text,
    foo_count integer,
    bar integer,
    baz blob,
    -- either it's a different type OR it has to fulfill these things
    constraint is_foo check (type != 1 OR (foo is not null AND foo_count is not null)),
    constraint is_bar check (type != 2 OR bar is not null),
    constraint is_baz check (type != 3 OR baz is not null)
);
```
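The channel state request above maps to a query along these lines; a rough sketch,
assuming a `posts` table with `public_key`, `channel`, `post_type`, and a timestamp
column `ts` (the column names here are illustrative, not the actual schema):

```sql
-- Sketch: latest non-text state post per (user, post_type) in a channel.
select p.*
from posts p
where p.channel = ?
  and p.post_type != 0        -- everything except post/text
  and p.ts = (
    select max(p2.ts) from posts p2
    where p2.channel = p.channel
      and p2.public_key = p.public_key
      and p2.post_type = p.post_type
  );
```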
%% SPDX-FileCopyrightText: 2023 Henry Bubert
%% SPDX-License-Identifier: LGPL-2.1-or-later
-module(caberl_sup).
-behaviour(supervisor).

-export([start_link/1]).
-export([init/1]).

%% Start the supervisor with peer configuration
start_link(PeerArgs) ->
    supervisor:start_link(?MODULE, PeerArgs).

%% Supervisor callback
%%
%% Note: Currently uses a one_for_one strategy since peer manages db and transport directly.
%% For better supervision, a future refactoring should:
%% 1. Use a rest_for_one strategy
%% 2. Start the database as the first child
%% 3. Start the peer as the second child (with the db pid passed in)
%% 4. Have the transport supervised by peer's internal supervisor
%%
%% This would ensure:
%% - Database I/O errors bring down the whole peer (correct behavior)
%% - Transport errors only restart the transport layer (more resilient)
%% - Accept loop errors don't crash the transport gen_server (fixed in transport.erl)
init(PeerArgs) ->
    SupFlags = #{
        strategy => one_for_one,
        % Max 3 restarts
        intensity => 3,
        % Within 10 seconds
        period => 10
    },
    ChildSpecs = [
        #{
            id => peer,
            start => {peer, start_link, [PeerArgs]},
            restart => permanent,
            shutdown => 5000,
            type => worker,
            modules => [peer]
        }
    ],
    {ok, {SupFlags, ChildSpecs}}.
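%% A minimal sketch (not wired in) of what the rest_for_one layout described in
%% the note above could look like; the child order and the db child spec are
%% assumptions, not the current implementation:
%%
%%   init(PeerArgs) ->
%%       SupFlags = #{strategy => rest_for_one, intensity => 3, period => 10},
%%       %% Database first: if it dies, everything after it restarts too
%%       DbSpec = #{id => db,
%%                  start => {db, start_link, [PeerArgs]},
%%                  restart => permanent, shutdown => 5000,
%%                  type => worker, modules => [db]},
%%       %% Peer second: restarted whenever the database restarts
%%       PeerSpec = #{id => peer,
%%                    start => {peer, start_link, [PeerArgs]},
%%                    restart => permanent, shutdown => 5000,
%%                    type => worker, modules => [peer]},
%%       {ok, {SupFlags, [DbSpec, PeerSpec]}}.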
%% SPDX-FileCopyrightText: 2023 Henry Bubert
%% SPDX-License-Identifier: LGPL-2.1-or-later
-module(caberl).

-export([main/1, start/0, start/1]).

%% Cable Peer CLI
%%
%% A headless peer that can join channels and connect to other peers.
%% Uses the default Cable PSK (0808...08)

%% Start with default options (for rebar3 shell)
start() ->
    start([]).

%% Start with options as a map
start(Opts) when is_map(Opts) ->
    run_peer(Opts);
start(Opts) when is_list(Opts) ->
    % Convert proplist to map
    Config = maps:from_list(Opts),
    run_peer(Config).

main(Args) ->
    % Parse command line arguments
    case parse_args(Args) of
        {ok, Config} ->
            case maps:get(help, Config, false) of
                true ->
                    print_help(),
                    halt(0);
                false ->
                    run_peer(Config)
            end;
        {error, Reason} ->
            io:format("Error: ~s~n~n", [Reason]),
            print_help(),
            halt(1)
    end.

run_peer(Config) ->
    io:format("~n=== Starting Cable Peer ===~n~n"),
    % Extract configuration
    Storage = maps:get(storage, Config, default_storage()),
    Port = maps:get(port, Config, 3113),
    Channels = maps:get(channels, Config, []),
    Peers = maps:get(peers, Config, []),
    Nick = maps:get(nick, Config, undefined),

    io:format("Configuration:~n"),
    io:format("  Storage:  ~s~n", [Storage]),
    io:format("  Port:     ~p~n", [Port]),
    io:format("  Channels: ~p~n", [Channels]),
    io:format("  Peers:    ~p~n", [Peers]),
    case Nick of
        undefined -> ok;
        _ -> io:format("  Nickname: ~s~n", [Nick])
    end,
    io:format("~n"),

    % Start the peer
    io:format("Starting peer...~n"),
    ListenAddr = ["0.0.0.0", Port],
    {ok, PeerPid} = peer:start_link([{listener, ListenAddr}, {storage, Storage}]),

    % Get peer info
    {ok, Addr} = peer:node_addr(PeerPid),
    {ok, PubKey} = peer:node_public_key(PeerPid),
    io:format("Peer started!~n"),
    io:format("  Address:    ~p~n", [Addr]),
    io:format("  Public Key: ~s~n", [hex:bin_to_hexstr(PubKey)]),
    io:format("~n"),

    % Set nickname if provided
    case Nick of
        undefined ->
            ok;
        _ ->
            io:format("Setting nickname to '~s'...~n", [Nick]),
            ok = peer:set_nick(PeerPid, Nick)
    end,

    % Join channels
    lists:foreach(
        fun(Chan) ->
            io:format("Joining channel: ~s~n", [Chan]),
            peer:join(PeerPid, Chan),
            % Small delay between joins
            timer:sleep(100)
        end,
        Channels),
    % Wait a bit for channels to be joined
    case length(Channels) of
        0 ->
            ok;
        _ ->
            timer:sleep(500),
            io:format("~n")
    end,

    % Connect to peers
    lists:foreach(
        fun(PeerAddr) ->
            io:format("Connecting to peer: ~s~n", [PeerAddr]),
            peer:dial(PeerPid, PeerAddr),
            % Small delay between connections
            timer:sleep(100)
        end,
        Peers),
    case length(Peers) of
        0 ->
            ok;
        _ ->
            timer:sleep(500),
            io:format("~n")
    end,

    io:format("~n=== Peer is running ===~n"),
    io:format("Press Ctrl+C to stop~n~n"),
    io:format("Note: Using default Cable PSK (cabal key)~n"),
    io:format("      0808080808080808080808080808080808080808080808080808080808080808~n~n"),

    % Keep running until interrupted
    receive
        stop -> ok
    end.

parse_args(Args) ->
    parse_args(Args, #{}).

parse_args([], Acc) ->
    {ok, Acc};
parse_args(["--help" | _], _Acc) ->
    {ok, #{help => true}};
parse_args(["-h" | _], _Acc) ->
    {ok, #{help => true}};
parse_args(["--storage", Path | Rest], Acc) ->
    parse_args(Rest, Acc#{storage => Path});
parse_args(["--port", PortStr | Rest], Acc) ->
    case string:to_integer(PortStr) of
        {Port, ""} when Port > 0, Port =< 65535 ->
            parse_args(Rest, Acc#{port => Port});
        _ ->
            {error, io_lib:format("Invalid port: ~s", [PortStr])}
    end;
parse_args(["--channels", ChansStr | Rest], Acc) ->
    Channels = string:split(ChansStr, ",", all),
    parse_args(Rest, Acc#{channels => Channels});
parse_args(["--peers", PeersStr | Rest], Acc) ->
    Peers = string:split(PeersStr, ",", all),
    parse_args(Rest, Acc#{peers => Peers});
parse_args(["--nick", Nick | Rest], Acc) ->
    parse_args(Rest, Acc#{nick => Nick});
parse_args([Unknown | _], _Acc) ->
    {error, io_lib:format("Unknown option: ~s", [Unknown])}.

default_storage() ->
    Home = os:getenv("HOME"),
    filename:join(Home, ".caberl").

print_help() ->
    io:format("~nCable Peer~n"),
    io:format("==========~n~n"),
    io:format("Usage: caberl [OPTIONS]~n~n"),
    io:format("Options:~n"),
    io:format("  --storage PATH          Storage directory for database and keys~n"),
    io:format("                          (default: ~~/.caberl)~n"),
    io:format("  --port PORT             Port to listen on (default: 3113)~n"),
    io:format("  --channels CHAN1,CHAN2  Comma-separated list of channels to join~n"),
    io:format("  --peers ADDR1,ADDR2     Comma-separated list of peers (host:port)~n"),
    io:format("  --nick NICKNAME         Set nickname on startup~n"),
    io:format("  -h, --help              Show this help message~n"),
    io:format("~n"),
    io:format("Examples:~n"),
    io:format("  # Start peer on default port, join 'general' channel~n"),
    io:format("  caberl --channels general --nick 'Bot'~n~n"),
    io:format("  # Join multiple channels and connect to a peer~n"),
    io:format("  caberl --channels general,dev --peers localhost:3113~n~n"),
    io:format("  # Use custom storage and port~n"),
    io:format("  caberl --storage /tmp/caberl --port 9999 --channels test~n~n"),
    io:format("Notes:~n"),
    io:format("  - Uses the default Cable PSK (cabal key)~n"),
    io:format("  - Keys are generated automatically if they don't exist~n"),
    io:format("  - Press Ctrl+C to stop the peer~n~n").
-module(transport_sup).
-behaviour(supervisor).

-export([start_link/1]).
-export([init/1]).

%% Start the transport supervisor with configuration.
%% This uses a two-phase approach:
%% 1. Start the supervisor with the transport child
%% 2. Dynamically add accept_worker once we have the transport PID
start_link(Args) ->
    case supervisor:start_link(?MODULE, {phase1, Args}) of
        {ok, SupPid} ->
            %% Get the transport PID from the supervisor's children
            Children = supervisor:which_children(SupPid),
            Transport = lists:keyfind(transport, 1, Children),
            {transport, TransportPid, worker, [transport]} = Transport,
            %% Now start accept_worker with the transport PID
            AcceptWorkerSpec = #{
                id => accept_worker,
                start => {accept_worker, start_link, [[{transport_pid, TransportPid}]]},
                restart => permanent,
                shutdown => 5000,
                type => worker,
                modules => [accept_worker]
            },
            {ok, _AcceptWorkerPid} = supervisor:start_child(SupPid, AcceptWorkerSpec),
            {ok, SupPid};
        Error ->
            Error
    end.

%% Supervisor callback
init({phase1, Args}) ->
    %% Phase 1: Start only the transport.
    %% The accept worker will be added dynamically once we have the transport PID.

    %% Extract configuration
    ListenAddr = proplists:get_value(listen_addr, Args),
    KeyPair = proplists:get_value(key_pair, Args),
    EventHandler = proplists:get_value(event_handler, Args),

    %% rest_for_one strategy:
    %% - Transport starts first
    %% - Accept worker starts second (depends on transport for listener info)
    %% - If transport crashes, accept_worker restarts too (correct: the listener is gone)
    %% - If accept_worker crashes, transport keeps running (correct: just restart the accept loop)
    SupFlags = #{
        strategy => rest_for_one,
        intensity => 5,
        period => 10
    },
    %% Start transport first
    TransportSpec = #{
        id => transport,
        start =>
            {transport, start_link, [
                [{listen_addr, ListenAddr},
                 {key_pair, KeyPair},
                 {event_handler, EventHandler}]
            ]},
        restart => permanent,
        shutdown => 5000,
        type => worker,
        modules => [transport]
    },
    {ok, {SupFlags, [TransportSpec]}}.
    % Start accepting connections in a separate process (not linked, to avoid crashing transport)
    AcceptPid = spawn(fun() -> accept_loop(ListenerPid, KeyPair, TransportPid) end),
    io:format("[Transport] Accept loop started (pid: ~p)~n", [AcceptPid]),
% Note: accept_worker will be started by transport_sup as a supervised child
%% Accept loop for incoming connections
accept_loop(ListenerPid, KeyPair, TransportPid) ->
    % Monitor the transport process - exit if it dies
    erlang:monitor(process, TransportPid),
    accept_loop_internal(ListenerPid, KeyPair, TransportPid).

accept_loop_internal(ListenerPid, KeyPair, TransportPid) ->
    % Check if the transport is still alive
    receive
        {'DOWN', _Ref, process, TransportPid, DownReason} ->
            io:format("[Transport] Accept loop exiting - transport process died: ~p~n",
                      [DownReason]),
            exit(normal)
    after 0 ->
        ok
    end,
io:format("[Transport] Waiting for encrypted connections (listener: ~p, transport: ~p)~n",[ListenerPid, TransportPid]),Opts = [{keypair, KeyPair}],case enoise_cable:accept(ListenerPid, Opts) of{ok, ConnPid} ->io:format("[Transport] Accepted new encrypted connection: ~p~n", [ConnPid]),% Wrap the connection setup in a try/catch to prevent crashes from killing the accept loopResult =try% Set the controlling process to the transport gen_server% so it receives {cable_transport, ConnPid, Data} messagesok = enoise_cable:controlling_process(ConnPid, TransportPid),% Get the peer address from the connection{ok, {PeerIP, PeerPort}} = enoise_cable:peername(ConnPid),PeerAddr = {PeerIP, PeerPort},io:format("[Transport] Peer address: ~p~n", [PeerAddr]),TransportPid ! {new_connection, ConnPid, PeerAddr},okcatchError:ErrorReason:Stacktrace ->io:format("[Transport] Error setting up connection ~p: ~p:~p~n Stacktrace: ~p~n",[ConnPid, Error, ErrorReason, Stacktrace]),% Close the connection on errorcatch enoise_cable:close(ConnPid),errorend,case Result ofok -> ok;error -> io:format("[Transport] Connection setup failed, continuing to accept~n")end,accept_loop_internal(ListenerPid, KeyPair, TransportPid);{error, AcceptReason} ->io:format("[Transport] Accept error: ~p~n", [AcceptReason]),timer:sleep(1000),accept_loop_internal(ListenerPid, KeyPair, TransportPid)end.
-module(peer_sup).
-behaviour(supervisor).

-export([start_link/1, stop/1, get_peer_pid/1]).
-export([init/1]).

%% Start the peer supervisor
%% Args:
%%   - {listener, [Host, Port]} - Address to listen on
%%   - {storage, Path}          - Storage directory for database and keys
start_link(Args) ->
    case supervisor:start_link(?MODULE, {phase1, Args}) of
        {ok, SupPid} ->
            %% Phase 2: Get TransportPid and start peer_server
            TransportSupPid = get_child_pid(SupPid, transport_sup),
            TransportPid = get_child_pid(TransportSupPid, transport),
            StoragePath = proplists:get_value(storage, Args),
            %% Start peer_server with transport_pid
            PeerServerSpec = #{
                id => peer_server,
                start =>
                    {peer, start_link, [
                        [{transport_pid, TransportPid},
                         {storage, StoragePath}]
                    ]},
                restart => transient,
                shutdown => 5000,
                type => worker,
                modules => [peer]
            },
            {ok, PeerServerPid} = supervisor:start_child(SupPid, PeerServerSpec),
            %% Phase 3: Start peer_events with peer_server info
            {ok, Db} = peer:get_db(PeerServerPid),
            PeerEventsSpec = #{
                id => peer_events,
                start =>
                    {peer_events, start_link, [
                        [{db, Db},
                         {transport_pid, TransportPid},
                         {peer_server_pid, PeerServerPid}]
                    ]},
                restart => transient,
                shutdown => 5000,
                type => worker,
                modules => [peer_events]
            },
            {ok, PeerEventsPid} = supervisor:start_child(SupPid, PeerEventsSpec),
            %% Connect peer_server to peer_events
            ok = peer:update_peer_events_pid(PeerServerPid, PeerEventsPid),
            {ok, SupPid};
        Error ->
            Error
    end.

%% Stop the peer supervisor
stop(SupPid) ->
    %% Just send the shutdown signal - let the supervisor handle its children
    MRef = erlang:monitor(process, SupPid),
    unlink(SupPid),
    exit(SupPid, shutdown),
    %% Wait for the supervisor to finish shutting down
    receive
        {'DOWN', MRef, process, SupPid, _Reason} ->
            ok
    end.

%% Get the peer server PID from the supervisor
get_peer_pid(SupPid) ->
    get_child_pid(SupPid, peer_server).

%% Supervisor callback
init({phase1, Args}) ->
    SupFlags = #{
        strategy => rest_for_one,
        intensity => 5,
        period => 10
    },
    %% Extract configuration
    [Host, WantPort] = proplists:get_value(listener, Args, ["0.0.0.0", 0]),
    StoragePath = proplists:get_value(storage, Args),
    %% Load transport keypair
    TransportKp = peer:create_or_load_transport_keypair(StoragePath),
    %% Start transport_sup first (peer_server added dynamically)
    TransportSupSpec = #{
        id => transport_sup,
        start =>
            {transport_sup, start_link, [
                [{listen_addr, {Host, WantPort}},
                 {key_pair, TransportKp},
                 % Set later via transport:register_handler
                 {event_handler, undefined}]
            ]},
        restart => permanent,
        shutdown => infinity,
        type => supervisor,
        modules => [transport_sup]
    },
    {ok, {SupFlags, [TransportSupSpec]}}.

%% Internal helpers
get_child_pid(SupPid, ChildId) ->
    Children = supervisor:which_children(SupPid),
    case lists:keyfind(ChildId, 1, Children) of
        {ChildId, Pid, _, _} -> Pid;
        false -> undefined
    end.
-module(peer_events).
-behaviour(gen_server).

-export([start_link/1]).
-export([register_event_handler/3,
         unregister_event_handler/2,
         notify_channels/2]).
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_continue/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

-record(state, {
    % Database handle (needed for reconnection logic)
    db,
    % Transport PID (needed for dialing)
    transport_pid,
    % Peer server PID (to query the peer list)
    peer_server_pid,
    % Timer reference for periodic reconnection attempts
    reconnect_timer = undefined,
    % Event handlers: #{HandlerPid => #{interval => Ms, timer_ref => Ref, pending => sets:set()}}
    event_handlers = #{}
}).

%% API
start_link(Args) ->
    gen_server:start_link(?MODULE, Args, []).

register_event_handler(Pid, HandlerPid, IntervalMs) ->
    gen_server:call(Pid, {register_event_handler, HandlerPid, IntervalMs}).

unregister_event_handler(Pid, HandlerPid) ->
    gen_server:call(Pid, {unregister_event_handler, HandlerPid}).

notify_channels(Pid, Channels) ->
    gen_server:cast(Pid, {notify_channels, Channels}).

%% gen_server callbacks
init(Args) ->
    process_flag(trap_exit, true),
    Db = proplists:get_value(db, Args),
    TransportPid = proplists:get_value(transport_pid, Args),
    PeerServerPid = proplists:get_value(peer_server_pid, Args),
    io:format("[PeerEvents] Starting with db=~p, transport=~p, peer_server=~p~n",
              [Db, TransportPid, PeerServerPid]),
    State = #state{
        db = Db,
        transport_pid = TransportPid,
        peer_server_pid = PeerServerPid
    },
    %% Start the reconnect timer using handle_continue
    {ok, State, {continue, start_reconnect_timer}}.

handle_continue(start_reconnect_timer, State) ->
    TimerRef = erlang:send_after(10000, self(), reconnect_tick),
    io:format("[PeerEvents] Started reconnect timer~n"),
    {noreply, State#state{reconnect_timer = TimerRef}}.

handle_call({register_event_handler, HandlerPid, IntervalMs}, _From, State) ->
    NewState = do_register_event_handler(State, HandlerPid, IntervalMs),
    {reply, ok, NewState};
handle_call({unregister_event_handler, HandlerPid}, _From, State) ->
    NewState = do_unregister_event_handler(State, HandlerPid),
    {reply, ok, NewState};
handle_call(_Request, _From, State) ->
    {reply, {error, unknown_call}, State}.

handle_cast({notify_channels, Channels}, State) ->
    NewState = mark_channels_for_notification(State, Channels),
    {noreply, NewState};
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(reconnect_tick, State) ->
    %% Attempt reconnections to persistent peers
    NewState = attempt_peer_reconnections(State),
    %% Reschedule the timer
    TimerRef = erlang:send_after(10000, self(), reconnect_tick),
    {noreply, NewState#state{reconnect_timer = TimerRef}};
handle_info({notification_timer, HandlerPid}, State) ->
    NewState = handle_notification_timer(State, HandlerPid),
    {noreply, NewState};
handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, State) ->
    %% Cancel the reconnect timer
    case State#state.reconnect_timer of
        undefined -> ok;
        TimerRef -> erlang:cancel_timer(TimerRef)
    end,
    %% Cancel all event handler timers
    maps:foreach(
        fun(_Pid, Handler) ->
            case maps:get(timer_ref, Handler, undefined) of
                undefined -> ok;
                Ref -> erlang:cancel_timer(Ref)
            end
        end,
        State#state.event_handlers),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%%%%%%%%%%%%%
%%% Private %%%
%%%%%%%%%%%%%%%

do_register_event_handler(State = #state{event_handlers = Handlers}, HandlerPid, IntervalMs) ->
    case maps:is_key(HandlerPid, Handlers) of
        true ->
            %% Already registered, update the interval
            io:format("[PeerEvents] Updating event handler ~p interval to ~pms~n",
                      [HandlerPid, IntervalMs]),
            OldHandler = maps:get(HandlerPid, Handlers),
            %% Cancel the old timer
            case maps:get(timer_ref, OldHandler, undefined) of
                undefined -> ok;
                OldTimerRef -> erlang:cancel_timer(OldTimerRef)
            end,
            %% Start a new timer
            TimerRef = erlang:send_after(IntervalMs, self(), {notification_timer, HandlerPid}),
            NewHandler = OldHandler#{interval => IntervalMs, timer_ref => TimerRef},
            State#state{event_handlers = maps:put(HandlerPid, NewHandler, Handlers)};
        false ->
            %% New registration
            io:format("[PeerEvents] Registered event handler ~p with interval ~pms~n",
                      [HandlerPid, IntervalMs]),
            TimerRef = erlang:send_after(IntervalMs, self(), {notification_timer, HandlerPid}),
            Handler = #{
                interval => IntervalMs,
                timer_ref => TimerRef,
                pending => sets:new()
            },
            State#state{event_handlers = maps:put(HandlerPid, Handler, Handlers)}
    end.

do_unregister_event_handler(State = #state{event_handlers = Handlers}, HandlerPid) ->
    case maps:take(HandlerPid, Handlers) of
        error ->
            io:format("[PeerEvents] Event handler ~p not found~n", [HandlerPid]),
            State;
        {Handler, NewHandlers} ->
            io:format("[PeerEvents] Unregistered event handler ~p~n", [HandlerPid]),
            %% Cancel the timer
            case maps:get(timer_ref, Handler, undefined) of
                undefined -> ok;
                TimerRef -> erlang:cancel_timer(TimerRef)
            end,
            State#state{event_handlers = NewHandlers}
    end.

handle_notification_timer(State = #state{event_handlers = Handlers}, HandlerPid) ->
    case maps:get(HandlerPid, Handlers, undefined) of
        undefined ->
            %% Handler was unregistered, ignore
            State;
        Handler ->
            Pending = maps:get(pending, Handler),
            Interval = maps:get(interval, Handler),
            %% Send notifications for all pending channels
            case sets:size(Pending) of
                0 ->
                    % Nothing to notify
                    ok;
                _ ->
                    PendingList = sets:to_list(Pending),
                    lists:foreach(
                        fun(Chan) ->
                            try
                                HandlerPid ! {channel_event, {new_messages, Chan}}
                            catch
                                _:_ ->
                                    io:format("[PeerEvents] Failed to send event to handler ~p~n",
                                              [HandlerPid])
                            end
                        end,
                        PendingList),
                    io:format("[PeerEvents] Notified handler ~p of ~p channels~n",
                              [HandlerPid, length(PendingList)])
            end,
            %% Clear pending and restart the timer
            TimerRef = erlang:send_after(Interval, self(), {notification_timer, HandlerPid}),
            NewHandler = Handler#{pending => sets:new(), timer_ref => TimerRef},
            State#state{event_handlers = maps:put(HandlerPid, NewHandler, Handlers)}
    end.

mark_channels_for_notification(State = #state{event_handlers = Handlers}, Channels) ->
    case maps:size(Handlers) of
        0 ->
            % No handlers, nothing to do
            State;
        _ ->
            %% Add the channels to the pending set of every handler
            NewHandlers = maps:map(
                fun(_Pid, Handler) ->
                    Pending = maps:get(pending, Handler),
                    NewPending = lists:foldl(fun sets:add_element/2, Pending, Channels),
                    Handler#{pending => NewPending}
                end,
                Handlers),
            State#state{event_handlers = NewHandlers}
    end.

%% Persistent peer reconnection
attempt_peer_reconnections(State = #state{db = Db,
                                          transport_pid = TransportPid,
                                          peer_server_pid = PeerServerPid}) ->
    %% Get all persistent peers
    {ok, PersistentPeers} = db:peer_list(Db),
    %% Get the currently connected peers from peer_server
    Peers =
        case gen_server:call(PeerServerPid, {get_peers}, 5000) of
            {ok, P} -> P;
            _ -> #{}
        end,
    %% Filter and attempt reconnections
    lists:foreach(
        fun(PeerInfo) ->
            Address = maps:get(address, PeerInfo),
            %% Only attempt if not already connected and the backoff period has passed
            case is_peer_connected(Address, Peers) of
                true ->
                    % Already connected, skip
                    ok;
                false ->
                    case should_reconnect_peer(PeerInfo) of
                        true ->
                            %% Attempt reconnection
                            io:format("[PeerEvents] Attempting reconnection to ~s~n", [Address]),
                            try
                                %% Parse the address and dial
                                [HostStr, PortStr] = string:split(Address, ":"),
                                {ok, Host} = inet:getaddr(HostStr, inet),
                                Port = list_to_integer(PortStr),
                                transport:dial(TransportPid, Host, Port),
                                %% Update last_attempt and increment attempt_count
                                Now = os:system_time(millisecond),
                                AttemptCount = maps:get(attempt_count, PeerInfo, 0),
                                db:peer_update(Db, Address, [
                                    {last_attempt, Now},
                                    {attempt_count, AttemptCount + 1}
                                ])
                            catch
                                _:Error ->
                                    io:format("[PeerEvents] Failed to dial ~s: ~p~n",
                                              [Address, Error])
                            end;
                        false ->
                            % Backoff period not elapsed, skip
                            ok
                    end
            end
        end,
        PersistentPeers),
    State.

%% Calculate the exponential backoff delay in milliseconds.
%% Base delay is 5 seconds, max is 5 minutes.
calculate_backoff_delay(AttemptCount) ->
    % 5 seconds
    BaseDelayMs = 5000,
    % 5 minutes
    MaxDelayMs = 300000,
    %% Exponential backoff: base * 2^attempts, capped at max
    DelayMs = BaseDelayMs * math:pow(2, AttemptCount),
    min(trunc(DelayMs), MaxDelayMs).

%% Check if a peer should be reconnected based on the last attempt and backoff
should_reconnect_peer(PeerInfo) ->
    Now = os:system_time(millisecond),
    LastAttempt = maps:get(last_attempt, PeerInfo, 0),
    AttemptCount = maps:get(attempt_count, PeerInfo, 0),
    BackoffDelay = calculate_backoff_delay(AttemptCount),
    %% Reconnect if enough time has passed since the last attempt
    (Now - LastAttempt) >= BackoffDelay.

%% Check if a peer address is already connected
is_peer_connected(Address, Peers) ->
    %% Address is a "Host:Port" string from the DB.
    %% Peers values carry the address as {IPTuple, Port} or a string.
    %% 1. Parse and resolve the persistent address
    Target =
        case string:split(Address, ":") of
            [HostStr, PortStr] ->
                try
                    Port = list_to_integer(PortStr),
                    case inet:getaddr(HostStr, inet) of
                        {ok, IP} -> {IP, Port};
                        _ -> undefined
                    end
                catch
                    _:_ -> undefined
                end;
            _ ->
                undefined
        end,
    case Target of
        undefined ->
            false;
        TargetAddr ->
            %% 2. Check if any connected peer matches this resolved address
            lists:any(
                fun({_ConnPid, PeerMeta}) ->
                    case maps:get(address, PeerMeta, undefined) of
                        {PeerIP, PeerPort} -> {PeerIP, PeerPort} =:= TargetAddr;
                        AddrStr when is_list(AddrStr) -> AddrStr =:= Address;
                        _ -> false
                    end
                end,
                maps:to_list(Peers))
    end.
LisAddr = proplists:get_value(listener, Args, ["0.0.0.0", 3113]),
StoragePath = proplists:get_value(storage, Args, default_caberl_location()),
gen_server:start_link(peer, [{listener, LisAddr}, {storage, StoragePath}], []).
gen_server:start_link(peer, Args, []).
peerChannelLists = #{},
% Timer reference for periodic reconnection attempts
reconnectTimer = undefined,
% Event handlers: #{HandlerPid => #{interval => Ms, timer_ref => Ref, pending => sets:set()}}
eventHandlers = #{}
peerChannelLists = #{}
%% Synchronous dial with await_sync option
dial(Pid, Addr, #{await_sync := true, timeout := Timeout}) ->
    %% Start the dial
    case Addr of
        {Host, Port} ->
            gen_server:cast(Pid, {peerDial, {Host, Port}});
        AddrStr when is_list(AddrStr) ->
            [HostStr, PortStr] = string:split(AddrStr, ":"),
            {ok, Host} = inet:getaddr(HostStr, inet),
            Port = list_to_integer(PortStr),
            gen_server:cast(Pid, {peerDial, {Host, Port}})
    end,
    %% Poll until we have an active connection
    poll_for_connection(Pid, Timeout);
dial(Pid, Addr, Opts) ->
    %% If await_sync not specified, fall back to async
    case maps:get(await_sync, Opts, false) of
        false -> dial(Pid, Addr);
        true -> dial(Pid, Addr, Opts#{timeout => maps:get(timeout, Opts, 5000)})
    end.
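%% Usage sketch (illustrative, assuming a running peer Pid; the return value
%% follows from poll_for_connection/2 below, which ends in poll_for_initial_sync/3):
%%   {ok, synced} = peer:dial(Pid, "127.0.0.1:3113",
%%                            #{await_sync => true, timeout => 5000}),
%% while a plain peer:dial(Pid, Addr) stays asynchronous.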
%% Synchronous fetch_history that blocks until complete
fetch_history(Pid, Channel, Limit, #{sync := true, timeout := Timeout}) ->
    StartTime = erlang:monotonic_time(millisecond),
    {ok, InitialUnfetched} = unfetched_count(Pid, Channel),
    {ok, RequestedCount} = fetch_history(Pid, Channel, Limit),
    poll_for_fetch_completion(Pid, Channel, InitialUnfetched, RequestedCount, StartTime, Timeout).
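%% Usage sketch (illustrative): request 30 posts and block until the
%% unfetched count has dropped accordingly, or fail after 5 seconds:
%%   {ok, 30} = peer:fetch_history(Pid, Channel, 30, #{sync => true, timeout => 5000})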
%% Do a sync call to ensure all pending writes are processed
{ok, _} = channels_joined(Pid),
%% Now wait for message count to stabilize (no new messages for a period)
StartTime = erlang:monotonic_time(millisecond),
poll_for_writes_complete(Pid, Channel, InitialCount, StartTime, Timeout).
%% construct state for event loop
InitialState = #state{

%% Create the event handler first
Handler = spawn(fun() -> event_loop(InitialState) end),
io:format("[Peer] Event Loop: ~p~n", [Handler]),
%% open network socket via transport and pass the handler
[Host, WantPort] = proplists:get_value(listener, Args),
{ok, TransportPid} = transport:start_link([{listen_addr, {Host, WantPort}},
                                           {key_pair, TransportKp},
                                           {event_handler, Handler}]),
%% Update the event loop state with the transport PID
Handler ! {updateTransportPid, TransportPid},
%% Start periodic reconnection timer (check every 10 seconds)
Handler ! {startReconnectTimer},
%% Get the listening address
{ok, {LisAddr, Port}} = transport:get_address(TransportPid),
io:format("[Peer] Listening on ~p:~p~n", [LisAddr, Port]),
GenSrvState = [{transport, TransportPid},
               {listenAddr, {LisAddr, Port}},
               {eventLoop, Handler},
               {database, Db}],
{ok, GenSrvState}.

%% provided by peer_sup
TPid = proplists:get_value(transport_pid, Args),
ok = transport:register_handler(TPid, self()),
%% Get the listening address from transport
{ok, {LisAddr, Port}} = transport:get_address(TPid),
io:format("[Peer] Listening on ~p:~p~n", [LisAddr, Port]),
%% peer_events will be started by peer_sup and connected via update_peer_events_pid
io:format("[Peer] Started, transport=~p (peer_events pending)~n", [TPid]),
State = #state{

{ok, State}.
handle_call({readTextsFromChannel, Chan}, From, State) ->
    EL = proplists:get_value(eventLoop, State),
    % reply via event loop
    EL ! {readTextsFromChannel, From, Chan},
    {noreply, State};
handle_call({writeTextToChannel, Chan, Text}, From, State) ->
    EL = proplists:get_value(eventLoop, State),
    % reply via event loop
    EL ! {writeTextToChannel, From, Chan, Text},
    {noreply, State};
handle_call({nodeAddr}, _From, State) ->
    LisAddr = proplists:get_value(listenAddr, State),
%% Database calls
handle_call({readTextsFromChannel, Chan}, _From, State = #state{db = Db}) ->
    {ok, Texts} = db:get_texts_for_channel(Db, Chan),
    {reply, {ok, Texts}, State};
handle_call({writeTextToChannel, Chan, Text}, _From, State) ->
    NewState = handle_database_messages(writeTextToChannel, {Chan, Text}, State),
    {reply, ok, NewState};
%% System calls
handle_call({nodeAddr}, _From, State = #state{listenAddr = LisAddr}) ->
handle_call({nodePubKey}, From, State) ->
    EL = proplists:get_value(eventLoop, State),
    % reply via event loop
    EL ! {nodePubKey, From},
    {noreply, State};
handle_call({setOwnNick, Nick}, From, State) ->
    EL = proplists:get_value(eventLoop, State),
    % reply via event loop
    EL ! {setOwnNick, From, Nick},
    {noreply, State};
handle_call({channelsMembers, Chan}, From, State) ->
    EL = proplists:get_value(eventLoop, State),
    % reply via event loop
    EL ! {channelsMembers, From, Chan},
    {noreply, State};
handle_call({channelsSetTopic, Chan, Topic}, From, State) ->
    EL = proplists:get_value(eventLoop, State),
    % reply via event loop
    EL ! {channelsSetTopic, From, Chan, Topic},
    {noreply, State};
handle_call({channelsList}, From, State) ->
    EL = proplists:get_value(eventLoop, State),
    % reply via event loop
    EL ! {channelsList, From},
    {noreply, State};
handle_call({channelsKnown}, From, State) ->
    EL = proplists:get_value(eventLoop, State),
    % reply via event loop
    EL ! {channelsKnown, From},
    {noreply, State};
handle_call({peerChannelList, Peer}, From, State) ->
    EL = proplists:get_value(eventLoop, State),
    EL ! {peerChannelList, Peer, From},
    {noreply, State};
handle_call({fetchHistory, Channel, Limit}, From, State) ->
    EL = proplists:get_value(eventLoop, State),
    EL ! {fetchHistory, Channel, Limit, From},
    {noreply, State};
handle_call({unfetchedCount, Channel}, From, State) ->
    EL = proplists:get_value(eventLoop, State),
    EL ! {unfetchedCount, Channel, From},
    {noreply, State};
handle_call({peerList}, From, State) ->
    EL = proplists:get_value(eventLoop, State),
    % reply via event loop
    EL ! {peerList, From},
    {noreply, State};
handle_call({persistentPeerAdd, Address}, _From, State) ->
    Db = proplists:get_value(database, State),
handle_call({nodePubKey}, _From, State = #state{keyPair = KeyPair}) ->
    PubKey = maps:get(public, KeyPair),
    {reply, {ok, PubKey}, State};
handle_call({peerList}, _From, State = #state{peers = Peers}) ->
    EachPeer = fun(PeerId, PeerMeta, Acc) ->
        Acc ++ [{PeerId, PeerMeta}]
    end,
    PeerList = maps:fold(EachPeer, [], Peers),
    {reply, PeerList, State};
%% Channel calls
handle_call({channelsList}, _From, State = #state{channels = Chans}) ->
    {reply, {ok, maps:keys(Chans)}, State};
handle_call({channelsKnown}, _From, State = #state{db = Db}) ->
    Result = db:channels_known(Db),
    {reply, Result, State};
handle_call({channelsMembers, Chan}, _From, State = #state{channels = Chans, db = Db}) ->
    case maps:is_key(Chan, Chans) of
        false ->
            {reply, {error, notInChannel}, State};
        true ->
            Result = db:get_channel_members(Db, Chan),
            {reply, Result, State}
    end;
handle_call({channelsSetTopic, Chan, Topic}, _From, State = #state{channels = Chans}) ->
    case maps:is_key(Chan, Chans) of
        false ->
            {reply, {error, notInChannel}, State};
        true ->
            NewState = handle_channel_messages(channelsSetTopic, {Chan, Topic}, State),
            {reply, ok, NewState}
    end;
handle_call({setOwnNick, Nick}, _From, State) ->
    NewState = handle_channel_messages(setOwnNick, {Nick}, State),
    {reply, ok, NewState};
%% Peer channel list
handle_call({peerChannelList, Peer}, _From, State = #state{peerChannelLists = Lists}) ->
    Result =
        case maps:get(Peer, Lists, undefined) of
            undefined -> {ok, []};
            Channels -> {ok, Channels}
        end,
    {reply, Result, State};
%% History fetching
handle_call({fetchHistory, Channel, Limit}, _From, State) ->
    {Reply, NewState} = handle_system_messages(fetchHistory, {Channel, Limit}, State),
    {reply, Reply, NewState};
handle_call({unfetchedCount, Channel}, _From, State = #state{db = Db}) ->
    Result = db:unfetched_count(Db, Channel),
    {reply, Result, State};
%% Persistent peer management
handle_call({persistentPeerAdd, Address}, _From, State = #state{db = Db}) ->
handle_call({registerEventHandler, HandlerPid, IntervalMs}, _From, State) ->
    EL = proplists:get_value(eventLoop, State),
    EL ! {registerEventHandler, HandlerPid, IntervalMs, self()},
    receive
        {event_handler_registered, Result} ->
            {reply, Result, State}
    after 5000 ->
        {reply, {error, timeout}, State}
    end;
handle_call({unregisterEventHandler, HandlerPid}, _From, State) ->
    EL = proplists:get_value(eventLoop, State),
    EL ! {unregisterEventHandler, HandlerPid, self()},
    receive
        {event_handler_unregistered, Result} ->
            {reply, Result, State}
    after 5000 ->
        {reply, {error, timeout}, State}
    end.
%% Event handler management - forward to peer_events
handle_call({registerEventHandler, HandlerPid, IntervalMs},
            _From,
            State = #state{peerEventsPid = PeerEventsPid}) ->
    Result = peer_events:register_event_handler(PeerEventsPid, HandlerPid, IntervalMs),
    {reply, Result, State};
handle_call({unregisterEventHandler, HandlerPid}, _From, State = #state{peerEventsPid = PeerEventsPid}) ->
    Result = peer_events:unregister_event_handler(PeerEventsPid, HandlerPid),
    {reply, Result, State};
%% Internal call for peer_events to get peer list
handle_call({get_peers}, _From, State = #state{peers = Peers}) ->
    {reply, {ok, Peers}, State};
%% Supervisor helper calls
handle_call({get_db}, _From, State = #state{db = Db}) ->
    {reply, {ok, Db}, State};
handle_call({update_peer_events_pid, PeerEventsPid}, _From, State) ->
    io:format("[Peer] Updated peer_events PID to ~p~n", [PeerEventsPid]),
    link(PeerEventsPid),
    {reply, ok, State#state{peerEventsPid = PeerEventsPid}}.
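%% Handler-side usage sketch (illustrative; assumes a peer:register_event_handler/3
%% wrapper around the {registerEventHandler, ...} call above). The handler is
%% poked at most once per interval with the channels that saw new messages:
%%   ok = peer:register_event_handler(Peer, self(), 1000),
%%   receive
%%       {channel_event, {new_messages, Chan}} ->
%%           {ok, {Texts, _}} = peer:read(Peer, Chan)
%%   end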
EL = proplists:get_value(eventLoop, State),
EL ! {requestChannelList, Peer},
{noreply, State};
NewState = handle_peer_messages(requestChannelList, {Peer}, State),
{noreply, NewState};
EL = proplists:get_value(eventLoop, State),
EL ! {channelsJoin, Chan},
{noreply, State};
NewState = handle_channel_messages(channelsJoin, {Chan}, State),
{noreply, NewState};
EL = proplists:get_value(eventLoop, State),
EL ! {channelsLeave, Chan},
{noreply, State};
handle_cast(stop, State) ->
    %% Note: Avoid io:format during shutdown - group leader may be dead in tests
    %% Stop transport with timeout to avoid hanging
    TPid = proplists:get_value(transport, State),
    case catch gen_server:call(TPid, stop, 1000) of
        ok ->
            ok;
        {'EXIT', {timeout, _}} ->
            exit(TPid, kill);
        _Other ->
            ok
    end,
    %% Stop event loop
    EventLoopProc = proplists:get_value(eventLoop, State),
    EventLoopProc ! {stop},
    unlink(EventLoopProc),
    %% Close database
    Db = proplists:get_value(database, State),
NewState = handle_channel_messages(channelsLeave, {Chan}, State),
{noreply, NewState};
handle_cast(stop, State = #state{db = Db}) ->
handle_info({'EXIT', Pid, Reason}, State) ->
    Db = proplists:get_value(database, State),
    TPid = proplists:get_value(transport, State),
    EL = proplists:get_value(eventLoop, State),
    case Pid of
        TPid ->
            io:format("[Peer] Transport process died: ~p - shutting down peer~n", [Reason]),
            {stop, {transport_died, Reason}, State};
        EL ->
            io:format("[Peer] Event loop process died: ~p - shutting down peer~n", [Reason]),
            {stop, {event_loop_died, Reason}, State};

%% Transport event messages
handle_info({peerNew, ConnPid, PeerAddr}, State) ->
    NewState = handle_peer_messages(peerNew, {ConnPid, PeerAddr}, State),
    {noreply, NewState};
handle_info({peerLost, ConnPid, PeerAddr}, State) ->
    NewState = handle_peer_messages(peerLost, {ConnPid, PeerAddr}, State),
    {noreply, NewState};
handle_info({peerData, ConnPid, Data}, State) ->
    NewState = handle_peer_messages(peerData, {ConnPid, Data}, State),
    {noreply, NewState};
%% Network messages (decoded from peerData)
handle_info({incomingMsg, Peer, Msg, MsgSize}, State) ->
    NewState = handle_network_messages(incomingMsg, {Peer, Msg, MsgSize}, State),
    {noreply, NewState};
%% EXIT signals
handle_info({'EXIT', Pid, Reason},
            State = #state{db = Db, peerEventsPid = PeerEventsPid}) ->
    case Pid of
        _ when Pid =:= PeerEventsPid andalso PeerEventsPid =/= undefined ->
            case Reason of
                shutdown ->
                    {noreply, State};
                normal ->
                    {noreply, State};
                _ ->
                    io:format("[Peer] Peer events process died: ~p~n", [Reason]),
                    {stop, {peer_events_died, Reason}, State}
            end;
%% Main event loop - refactored to delegate to specific handlers
event_loop(State) ->
    receive
        {stop} ->
            ok;
        {updateTransportPid, TransportPid} ->
            event_loop(State#state{transportPid = TransportPid});
        {startReconnectTimer} ->
            %% Start periodic timer (10 seconds)
            TimerRef = erlang:send_after(10000, self(), {reconnectTimer}),
            event_loop(State#state{reconnectTimer = TimerRef});
        {reconnectTimer} ->
            %% Handle periodic reconnection attempts
            NewState = attempt_peer_reconnections(State),
            %% Reschedule timer
            TimerRef = erlang:send_after(10000, self(), {reconnectTimer}),
            event_loop(NewState#state{reconnectTimer = TimerRef});
        {registerEventHandler, HandlerPid, IntervalMs, From} ->
            NewState = handle_register_event_handler(State, HandlerPid, IntervalMs),
            From ! {event_handler_registered, ok},
            event_loop(NewState);
        {unregisterEventHandler, HandlerPid, From} ->
            NewState = handle_unregister_event_handler(State, HandlerPid),
            From ! {event_handler_unregistered, ok},
            event_loop(NewState);
        {notificationTimer, HandlerPid} ->
            NewState = handle_notification_timer(State, HandlerPid),
            event_loop(NewState);
        %% System messages
        {nodePubKey, From} ->
            NewState = handle_system_messages(nodePubKey, {From}, State),
            event_loop(NewState);
        {peerList, From} ->
            NewState = handle_system_messages(peerList, {From}, State),
            event_loop(NewState);
        {channelsList, From} ->
            NewState = handle_system_messages(channelsList, {From}, State),
            event_loop(NewState);
        {channelsKnown, From} ->
            NewState = handle_system_messages(channelsKnown, {From}, State),
            event_loop(NewState);
        {peerChannelList, Peer, From} ->
            NewState = handle_system_messages(peerChannelList, {Peer, From}, State),
            event_loop(NewState);
        {fetchHistory, Channel, Limit, From} ->
            NewState = handle_system_messages(fetchHistory, {Channel, Limit, From}, State),
            event_loop(NewState);
        {unfetchedCount, Channel, From} ->
            NewState = handle_system_messages(unfetchedCount, {Channel, From}, State),
            event_loop(NewState);
        {stateChange, Chan} ->
            NewState = handle_system_messages(stateChange, {Chan}, State),
            event_loop(NewState);
        %% Peer management messages
        {peerData, ConnPid, Data} ->
            NewState = handle_peer_messages(peerData, {ConnPid, Data}, State),
            event_loop(NewState);
        {peerLost, ConnPid, PeerAddr} ->
            NewState = handle_peer_messages(peerLost, {ConnPid, PeerAddr}, State),
            event_loop(NewState);
        {peerNew, ConnPid, PeerAddr} ->
            NewState = handle_peer_messages(peerNew, {ConnPid, PeerAddr}, State),
            event_loop(NewState);
        {requestChannelList, ConnPid} ->
            NewState = handle_peer_messages(requestChannelList, {ConnPid}, State),
            event_loop(NewState);
        %% Channel messages
        {setOwnNick, From, Nick} ->
            NewState = handle_channel_messages(setOwnNick, {From, Nick}, State),
            event_loop(NewState);
        {channelsJoin, Chan} ->
            NewState = handle_channel_messages(channelsJoin, {Chan}, State),
            event_loop(NewState);
        {channelsLeave, Chan} ->
            NewState = handle_channel_messages(channelsLeave, {Chan}, State),
            event_loop(NewState);
        {channelsSetTopic, From, Chan, Topic} ->
            NewState = handle_channel_messages(channelsSetTopic, {From, Chan, Topic}, State),
            event_loop(NewState);
        {channelsMembers, From, Chan} ->
            NewState = handle_channel_messages(channelsMembers, {From, Chan}, State),
            event_loop(NewState);
        %% Database messages
        {readTextsFromChannel, From, Chan} ->
            NewState = handle_database_messages(readTextsFromChannel, {From, Chan}, State),
            event_loop(NewState);
        {writeTextToChannel, From, Chan, Text} ->
            NewState = handle_database_messages(writeTextToChannel, {From, Chan, Text}, State),
            event_loop(NewState);
        %% Network messages
        {incomingMsg, Peer, Msg, MsgSize} ->
            NewState = handle_network_messages(incomingMsg, {Peer, Msg, MsgSize}, State),
            event_loop(NewState);
        %% Unhandled messages
        Other ->
            io:format("Unhandled message in event_loop: ~p~n", [Other]),
            event_loop(State)
    end.
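%% Note on the request/reply shape above (descriptive, matching the
%% handle_call clauses earlier): `From` is the gen_server from-tuple that
%% handle_call forwarded, so the handle_*_messages/3 helpers answer the
%% original caller with gen_server:reply(From, Reply) rather than sending
%% a bare message back.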
case maps:is_key(Chan, Chans) of
    % not even joined
    false ->
        gen_server:reply(From, {error, notInChannel}),
        State;
    true ->
        {ok, Links} = db:get_channel_heads(Db, Chan),
        Bin = posts:encode(KeyPair, Links, {topic, Chan, Topic}),
        {ok, _, PostHash} = db:save_post(Db, Bin),
        SentPeers = lists:foldl(
            fun({received, ReqId, ConnPid}, AccPeers) ->
                case maps:get(ReqId, ActiveIn, undefined) of
                    undefined ->
                        %% Request no longer active (peer disconnected)
                        AccPeers;
                    {ConnPid, [Header, _]} ->
                        case proplists:get_value(type, Header) of
                            4 ->
                                case send_hash_response(State, ConnPid, ReqId, [PostHash]) of
                                    {ok, {_, _, Size}} ->
                                        update_peer_sent(AccPeers, ConnPid, Size);
                                    {error, _Reason} ->
                                        %% Send failed (peer disconnected), skip update
                                        AccPeers
                                end;
                            % ignored
                            _ ->
{ok, Links} = db:get_channel_heads(Db, Chan),
Bin = posts:encode(KeyPair, Links, {topic, Chan, Topic}),
{ok, _, PostHash} = db:save_post(Db, Bin),
SentPeers = lists:foldl(
    fun({received, ReqId, ConnPid}, AccPeers) ->
        case maps:get(ReqId, ActiveIn, undefined) of
            undefined ->
                %% Request no longer active (peer disconnected)
                AccPeers;
            {ConnPid, [Header, _]} ->
                case proplists:get_value(type, Header) of
                    4 ->
                        case send_hash_response(State, ConnPid, ReqId, [PostHash]) of
                            {ok, {_, _, Size}} ->
                                update_peer_sent(AccPeers, ConnPid, Size);
                            {error, _Reason} ->
                                %% Send failed (peer disconnected), skip update
case maps:is_key(Chan, Chans) of
    false ->
        gen_server:reply(From, {error, notInChannel}),
        State;
    true ->
        {ok, Links} = db:get_channel_heads(Db, Chan),
        Bin = posts:encode(KeyPair, Links, {text, Chan, Text}),
        {ok, _, PostHash} = db:save_post(Db, Bin),
        %% Mark channel for notification (we wrote a message to it)
        StateWithNotif = mark_channels_for_notification(State, [Chan]),
        %% find incoming channel time range (type:4) requests which want this post
        F = fun({Direction, ReqId, ConnPid}, AccPeers) ->
                case Direction of
                    received ->
                        case maps:get(ReqId, ActiveIn, undefined) of
                            undefined ->
                                %% Req no longer active (peer disconnected)
                                AccPeers;
                            {ConnPid, [Header, _]} ->
                                case proplists:get_value(msgType, Header) of
                                    4 ->
                                        case send_hash_response(State, ConnPid, ReqId, [PostHash]) of
                                            {ok, {_, _, Size}} ->
                                                update_peer_sent(AccPeers, ConnPid, Size);
                                            {error, _Reason} ->
                                                %% Send failed (peer disconnected), skip update
                                                AccPeers
                                        end;
                                    _ ->
                                        AccPeers
                                end
                        end;
                    sent ->
                        AccPeers
                end
            end,
        SentPeers = lists:foldl(F, Peers, maps:get(Chan, Chans)),
        gen_server:reply(From, ok),
        StateWithNotif#state{peers = SentPeers}
end;

{ok, Links} = db:get_channel_heads(Db, Chan),
Bin = posts:encode(KeyPair, Links, {text, Chan, Text}),
{ok, _, PostHash} = db:save_post(Db, Bin),
%% Mark channel for notification (we wrote a message to it)
StateWithNotif = mark_channels_for_notification(State, [Chan]),
%% find incoming channel time range (type:4) requests which want this post
F = fun({Direction, ReqId, ConnPid}, AccPeers) ->
        case Direction of
            received ->
                case maps:get(ReqId, ActiveIn, undefined) of
                    undefined ->
                        %% Req no longer active (peer disconnected)
                        AccPeers;
                    {ConnPid, [Header, _]} ->
                        case proplists:get_value(msgType, Header) of
                            4 ->
                                case send_hash_response(State, ConnPid, ReqId, [PostHash]) of
                                    {ok, {_, _, Size}} ->
                                        update_peer_sent(AccPeers, ConnPid, Size);
                                    {error, _Reason} ->
                                        %% Send failed (peer disconnected), skip update
                                        AccPeers
                                end;
                            _ ->
                                AccPeers
                        end
                end;
            sent ->
                AccPeers
        end
    end,
SentPeers = lists:foldl(F, Peers, maps:get(Chan, Chans)),
StateWithNotif#state{peers = SentPeers};
%% Calculate exponential backoff delay in milliseconds
%% Base delay is 5 seconds, max is 5 minutes
calculate_backoff_delay(AttemptCount) ->
    BaseDelayMs = 5000,   % 5 seconds
    MaxDelayMs = 300000,  % 5 minutes
    %% Exponential backoff: base * 2^attempts, capped at max
    DelayMs = BaseDelayMs * math:pow(2, AttemptCount),
    min(trunc(DelayMs), MaxDelayMs).

%% Check if a peer should be reconnected based on last attempt and backoff
should_reconnect_peer(PeerInfo) ->
    Now = os:system_time(millisecond),
    LastAttempt = maps:get(last_attempt, PeerInfo, 0),
    AttemptCount = maps:get(attempt_count, PeerInfo, 0),
    BackoffDelay = calculate_backoff_delay(AttemptCount),
    %% Reconnect if enough time has passed since last attempt
    (Now - LastAttempt) >= BackoffDelay.

%% Check if peer address is already connected
is_peer_connected(Address, Peers) ->
    %% Address is "Host:Port" string from DB
    %% Peers values have address as {IPTuple, Port} usually
    %% 1. Parse and resolve the persistent address
    Target =
        case string:split(Address, ":") of
            [HostStr, PortStr] ->
                try
                    Port = list_to_integer(PortStr),
                    case inet:getaddr(HostStr, inet) of
                        {ok, IP} -> {IP, Port};
                        _ -> undefined
                    end
                catch
                    _:_ -> undefined
                end;
            _ ->
                undefined
        end,
    case Target of
        undefined ->
            false;
        TargetAddr ->
            %% 2. Check if any connected peer matches this resolved address
            lists:any(
                fun({_ConnPid, PeerMeta}) ->
                    case maps:get(address, PeerMeta, undefined) of
                        {PeerIP, PeerPort} -> {PeerIP, PeerPort} =:= TargetAddr;
                        _ -> false
                    end
                end,
                maps:to_list(Peers))
    end.

mark_channels_for_notification(State = #state{peerEventsPid = PeerEventsPid}, Channels) ->
    %% Notify peer_events about new channel activity
    case PeerEventsPid of
        undefined ->
            %% peer_events not started yet (during initialization), skip notification
            State;
        _ ->
            peer_events:notify_channels(PeerEventsPid, Channels),
            State
    end.
%% Event handler management functions
handle_register_event_handler(State = #state{eventHandlers = Handlers}, HandlerPid, IntervalMs) ->
    case maps:is_key(HandlerPid, Handlers) of
        true ->
            %% Already registered, update interval
            io:format("[Peer] Updating event handler ~p interval to ~pms~n", [HandlerPid, IntervalMs]),
            OldHandler = maps:get(HandlerPid, Handlers),
            %% Cancel old timer
            case maps:get(timer_ref, OldHandler, undefined) of
                undefined -> ok;
                OldTimerRef -> erlang:cancel_timer(OldTimerRef)
            end,
            %% Start new timer
            TimerRef = erlang:send_after(IntervalMs, self(), {notificationTimer, HandlerPid}),
            NewHandler = OldHandler#{interval => IntervalMs, timer_ref => TimerRef},
            State#state{eventHandlers = maps:put(HandlerPid, NewHandler, Handlers)};
        false ->
            %% New registration
            io:format("[Peer] Registered event handler ~p with interval ~pms~n", [HandlerPid, IntervalMs]),
            TimerRef = erlang:send_after(IntervalMs, self(), {notificationTimer, HandlerPid}),
            Handler = #{interval => IntervalMs,
                        timer_ref => TimerRef,
                        pending => sets:new()},
            State#state{eventHandlers = maps:put(HandlerPid, Handler, Handlers)}
    end.

handle_unregister_event_handler(State = #state{eventHandlers = Handlers}, HandlerPid) ->
    case maps:take(HandlerPid, Handlers) of
        error ->
            io:format("[Peer] Event handler ~p not found~n", [HandlerPid]),
            State;
        {Handler, NewHandlers} ->
            io:format("[Peer] Unregistered event handler ~p~n", [HandlerPid]),
            %% Cancel timer
            case maps:get(timer_ref, Handler, undefined) of
                undefined -> ok;
                TimerRef -> erlang:cancel_timer(TimerRef)
            end,
            State#state{eventHandlers = NewHandlers}
    end.

handle_notification_timer(State = #state{eventHandlers = Handlers}, HandlerPid) ->
    case maps:get(HandlerPid, Handlers, undefined) of
        undefined ->
            %% Handler was unregistered, ignore
            State;
        Handler ->
            Pending = maps:get(pending, Handler),
            Interval = maps:get(interval, Handler),
            %% Send notifications for all pending channels
            case sets:size(Pending) of
                0 ->
                    % Nothing to notify
                    ok;
                _ ->
                    PendingList = sets:to_list(Pending),
                    lists:foreach(
                        fun(Chan) ->
                            try
                                HandlerPid ! {channel_event, {new_messages, Chan}}
                            catch
                                _:_ ->
                                    io:format("[Peer] Failed to send event to handler ~p~n", [HandlerPid])
                            end
                        end,
                        PendingList),
                    io:format("[Peer] Notified handler ~p of ~p channels~n", [HandlerPid, length(PendingList)])
            end,
            %% Clear pending and restart timer
            TimerRef = erlang:send_after(Interval, self(), {notificationTimer, HandlerPid}),
            NewHandler = Handler#{pending => sets:new(), timer_ref => TimerRef},
            State#state{eventHandlers = maps:put(HandlerPid, NewHandler, Handlers)}
    end.

mark_channels_for_notification(State = #state{eventHandlers = Handlers}, Channels) ->
    case maps:size(Handlers) of
        0 ->
            % No handlers, nothing to do
            State;
        _ ->
            %% Add channels to pending set for all handlers
            NewHandlers = maps:map(
                fun(_Pid, Handler) ->
                    Pending = maps:get(pending, Handler),
                    NewPending = lists:foldl(fun sets:add_element/2, Pending, Channels),
                    Handler#{pending => NewPending}
                end,
                Handlers),
            State#state{eventHandlers = NewHandlers}
    end.

%% Attempt to reconnect to persistent peers based on exponential backoff
attempt_peer_reconnections(State = #state{db = Db, transportPid = TransportPid, peers = Peers}) ->
    %% Get all persistent peers
    {ok, PersistentPeers} = db:peer_list(Db),
    %% Filter and attempt reconnections
    lists:foreach(
        fun(PeerInfo) ->
            Address = maps:get(address, PeerInfo),
            %% Only attempt if not already connected and backoff period has passed
            case is_peer_connected(Address, Peers) of
                true ->
                    % Already connected, skip
                    ok;
                false ->
                    case should_reconnect_peer(PeerInfo) of
                        true ->
                            %% Attempt reconnection
                            io:format("[Peer] Attempting reconnection to ~s~n", [Address]),
                            try
                                %% Parse address and dial
                                [HostStr, PortStr] = string:split(Address, ":"),
                                {ok, Host} = inet:getaddr(HostStr, inet),
                                Port = list_to_integer(PortStr),
                                transport:dial(TransportPid, Host, Port),
                                %% Update last_attempt and increment attempt_count
                                Now = os:system_time(millisecond),
                                AttemptCount = maps:get(attempt_count, PeerInfo, 0),
                                db:peer_update(Db, Address, [{last_attempt, Now},
                                                             {attempt_count, AttemptCount + 1}])
                            catch
                                _:Error ->
                                    io:format("[Peer] Failed to dial ~s: ~p~n", [Address, Error])
                            end;
                        false ->
                            % Backoff period not elapsed, skip
                            ok
                    end
            end
        end,
        PersistentPeers),
    State.

%% Polling helper for synchronous dial
poll_for_connection(Pid, Timeout) ->
    %% First wait for peer connection
    StartTime = erlang:monotonic_time(millisecond),
    case poll_for_peer_connection(Pid, Timeout, StartTime) of
        ok ->
            %% Then wait for initial sync to complete by checking that channels have content
            poll_for_initial_sync(Pid, Timeout, StartTime);
        {error, Reason} ->
            {error, Reason}
    end.

poll_for_peer_connection(Pid, Timeout, StartTime) ->
    Elapsed = erlang:monotonic_time(millisecond) - StartTime,
    if
        Elapsed >= Timeout ->
            {error, timeout};
        true ->
            case peer_list(Pid) of
                [] ->
                    timer:sleep(50),
                    poll_for_peer_connection(Pid, Timeout, StartTime);
                [_ | _] ->
                    ok
            end
    end.

poll_for_initial_sync(Pid, Timeout, StartTime) ->
    Elapsed = erlang:monotonic_time(millisecond) - StartTime,
    if
        Elapsed >= Timeout ->
            {error, timeout};
        true ->
            %% Check if we have any messages or unfetched hashes in joined channels
            {ok, Channels} = channels_joined(Pid),
            case Channels of
                [] ->
                    %% No channels joined, sync complete
                    {ok, synced};
                [Chan | _] ->
                    %% Check if channel has content (messages or unfetched)
                    {ok, {Texts, _}} = read(Pid, Chan),
                    {ok, Unfetched} = unfetched_count(Pid, Chan),
                    Total = length(Texts) + Unfetched,
                    if
                        Total > 0 ->
                            %% Have content, sync complete
                            {ok, synced};
                        true ->
                            %% Still syncing
                            timer:sleep(50),
                            poll_for_initial_sync(Pid, Timeout, StartTime)
                    end
            end
    end.

%% Polling helper for synchronous fetch_history
poll_for_fetch_completion(Pid, Channel, InitialUnfetched, RequestedCount, StartTime, Timeout) ->
    Elapsed = erlang:monotonic_time(millisecond) - StartTime,
    if
        Elapsed >= Timeout ->
            {error, timeout};
        true ->
            {ok, CurrentUnfetched} = unfetched_count(Pid, Channel),
            Expected = max(0, InitialUnfetched - RequestedCount),
            if
                CurrentUnfetched =< Expected ->
                    {ok, RequestedCount};
                true ->
                    timer:sleep(50),
                    poll_for_fetch_completion(Pid, Channel, InitialUnfetched, RequestedCount,
                                              StartTime, Timeout)
            end
    end.

%% Polling helper for await_writes
%% Wait for message count to stabilize (no changes for 200ms)
poll_for_writes_complete(Pid, Channel, LastCount, StartTime, Timeout) ->
    Elapsed = erlang:monotonic_time(millisecond) - StartTime,
    if
        Elapsed >= Timeout ->
            {error, timeout};
        true ->
            {ok, {Texts, _}} = read(Pid, Channel),
            CurrentCount = length(Texts),
            if
                CurrentCount =:= LastCount ->
                    %% Count stable, wait a bit to confirm
                    timer:sleep(200),
                    {ok, {Texts2, _}} = read(Pid, Channel),
                    FinalCount = length(Texts2),
                    if
                        FinalCount =:= CurrentCount ->
                            %% Still stable, we're done
                            ok;
                        true ->
                            %% Changed during wait, continue polling
                            timer:sleep(50),
                            poll_for_writes_complete(Pid, Channel, FinalCount, StartTime, Timeout)
                    end;
                true ->
                    %% Count changed, continue polling
                    timer:sleep(50),
                    poll_for_writes_complete(Pid, Channel, CurrentCount, StartTime, Timeout)
            end
    end.
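%% Timing note (descriptive): with the 50ms poll and the 200ms confirmation
%% read above, a channel whose count has stopped changing is reported stable
%% after roughly 200-250ms, while a channel that keeps growing is re-polled
%% until Timeout elapses and then returns {error, timeout}.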
{application, cable,
 [{description, "cable"},
  {vsn, {cmd, "echo 0.1-`git describe --always --tags`"}},
  {registered, []},
  {applications, [kernel,
                  % sasl,
                  stdlib,
                  crypto,
                  observer,
                  % wx,
                  % ranch,
                  enacl,
                  % jiffy,
                  % bitcask,
                  compiler,
                  debugger,
                  syntax_tools,
                  tools]},
  {env, []}
  % {mod, {cable_app, [{cable_log_level, notice}]}}
 ]}.
-module(accept_worker).
-behaviour(gen_server).

-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-record(state, {listener_pid,
                keypair,
                transport_pid}).

%% Start the accept worker
start_link(Args) ->
    gen_server:start_link(?MODULE, Args, []).

%% gen_server callbacks
init(Args) ->
    TransportPid = proplists:get_value(transport_pid, Args),
    %% Get listener info from transport
    {ok, {ListenerPid, KeyPair}} = transport:get_listener_info(TransportPid),
    io:format("[AcceptWorker] Starting with listener ~p, transport ~p~n", [ListenerPid, TransportPid]),
    %% Trigger first accept immediately
    self() ! accept,
    {ok, #state{listener_pid = ListenerPid,
                keypair = KeyPair,
                transport_pid = TransportPid}}.

handle_call(Request, _From, State) ->
    io:format("[AcceptWorker] Unhandled call: ~p~n", [Request]),
    {reply, {error, unknown_call}, State}.

handle_cast(Msg, State) ->
    io:format("[AcceptWorker] Unhandled cast: ~p~n", [Msg]),
    {noreply, State}.

handle_info(accept,
            State = #state{listener_pid = ListenerPid, keypair = KeyPair, transport_pid = TransportPid}) ->
    io:format("[AcceptWorker] Waiting for encrypted connections (listener: ~p, transport: ~p)~n",
              [ListenerPid, TransportPid]),
    Opts = [{keypair, KeyPair}],
    case enoise_cable:accept(ListenerPid, Opts) of
        {ok, ConnPid} ->
            io:format("[AcceptWorker] Accepted new encrypted connection: ~p~n", [ConnPid]),
            %% Wrap the connection setup in a try/catch to prevent crashes
            Result =
                try
                    %% Set the controlling process to the transport gen_server
                    ok = enoise_cable:controlling_process(ConnPid, TransportPid),
                    %% Get the peer address from the connection
                    {ok, {PeerIP, PeerPort}} = enoise_cable:peername(ConnPid),
                    PeerAddr = {PeerIP, PeerPort},
                    io:format("[AcceptWorker] Peer address: ~p~n", [PeerAddr]),
                    TransportPid ! {new_connection, ConnPid, PeerAddr},
                    ok
                catch
                    Error:ErrorReason:Stacktrace ->
                        io:format("[AcceptWorker] Error setting up connection ~p: ~p:~p~n  Stacktrace: ~p~n",
                                  [ConnPid, Error, ErrorReason, Stacktrace]),
                        %% Close the connection on error
                        catch enoise_cable:close(ConnPid),
                        error
                end,
            case Result of
                ok -> ok;
                error -> io:format("[AcceptWorker] Connection setup failed, continuing to accept~n")
            end,
            %% Continue accepting
            self() ! accept,
            {noreply, State};
        {error, AcceptReason} ->
            io:format("[AcceptWorker] Accept error: ~p~n", [AcceptReason]),
            %% Retry after 1 second
            erlang:send_after(1000, self(), accept),
            {noreply, State}
    end;
handle_info(Info, State) ->
    io:format("[AcceptWorker] Unhandled info: ~p~n", [Info]),
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
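%% Supervision sketch (illustrative; SupPid and TransportPid bindings are
%% assumptions, not part of this module): accept_worker expects the transport
%% process to exist first, since init/1 calls transport:get_listener_info/1
%% on it before entering the accept loop.
%%
%%   AcceptSpec = #{id => accept_worker,
%%                  start => {accept_worker, start_link,
%%                            [[{transport_pid, TransportPid}]]},
%%                  restart => permanent,
%%                  shutdown => 5000,
%%                  type => worker},
%%   {ok, _AcceptPid} = supervisor:start_child(SupPid, AcceptSpec).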