lazy_fetch_test() ->
    %% Test the hybrid fetching behavior:
    %% - First 100 messages fetched eagerly
    %% - Remaining messages stored as unfetched
    %% - fetch_history retrieves them on demand
    C = "backlog-test",
    ArgsAlice = make_test_args("Alice"),
    ArgsBob = make_test_args("Bob"),
    {ok, Alice} = peer:start_link(ArgsAlice),
    {ok, Bob} = peer:start_link(ArgsBob),
    %% Alice creates a channel with 150 text messages
    peer:join(Alice, C),
    peer:set_nick(Alice, "Alice"),

    %% Create 150 text messages
    NumMessages = 150,
    lists:foreach(
        fun(N) ->
            Msg = lists:flatten(io_lib:format("Message number ~p", [N])),
            ok = peer:write(Alice, C, Msg)
        end,
        lists:seq(1, NumMessages)),

    %% Give Alice a moment to process all writes
    timer:sleep(100),

    %% Verify Alice has all messages
    {ok, {AliceTexts, _}} = peer:read(Alice, C),
    ?debugFmt("~nAlice has ~p messages in channel~n", [length(AliceTexts)]),
    ?assertEqual(NumMessages, length(AliceTexts)),

    %% Bob joins and connects to Alice
    peer:join(Bob, C),
    peer:set_nick(Bob, "Bob"),

    %% Connect the peers
    {ok, AliceAddr} = peer:node_addr(Alice),
    ok = peer:dial(Bob, AliceAddr),

    %% Give them time to sync (channel state + first 100 messages)
    timer:sleep(2000),

    %% Bob should have some unfetched messages (we sent 150, only 100 are fetched eagerly)
    {ok, UnfetchedBefore} = peer:unfetched_count(Bob, C),
    ?debugFmt("~nBob has ~p unfetched messages~n", [UnfetchedBefore]),
    ?assert(UnfetchedBefore >= 40),                %% allow some wiggle room for processing
    ?assert(UnfetchedBefore =< NumMessages - 90),  %% at least 90 should have been fetched

    %% Fetch the next 30 messages from history
    {ok, RequestedCount} = peer:fetch_history(Bob, C, 30),
    ?debugFmt("~nRequested ~p messages from history~n", [RequestedCount]),
    ?assertEqual(30, RequestedCount),

    %% Give time for the messages to arrive
    timer:sleep(1000),

    %% Verify the unfetched count decreased
    {ok, UnfetchedAfter} = peer:unfetched_count(Bob, C),
    ?debugFmt("~nBob now has ~p unfetched messages~n", [UnfetchedAfter]),
    ?assert(UnfetchedAfter < UnfetchedBefore),
    ?assertEqual(UnfetchedBefore - 30, UnfetchedAfter),

    %% Verify Bob can read more messages now
    {ok, {BobTexts, _}} = peer:read(Bob, C),
    ?debugFmt("~nBob can now read ~p messages~n", [length(BobTexts)]),
    %% Should have at least 130 messages (100 eager + 30 from fetch_history)
    ?assert(length(BobTexts) >= 130),

    %% Fetch the rest
    {ok, RequestedRest} = peer:fetch_history(Bob, C, 100),
    ?debugFmt("~nRequested remaining ~p messages~n", [RequestedRest]),
    timer:sleep(1000),

    %% Verify all messages have been fetched
    {ok, UnfetchedFinal} = peer:unfetched_count(Bob, C),
    ?debugFmt("~nBob finally has ~p unfetched messages~n", [UnfetchedFinal]),
    ?assertEqual(0, UnfetchedFinal),

    %% Verify Bob has all messages
    {ok, {BobFinalTexts, _}} = peer:read(Bob, C),
    ?debugFmt("~nBob can read all ~p messages~n", [length(BobFinalTexts)]),
    ?assertEqual(NumMessages, length(BobFinalTexts)),

    ok = peer:stop(Alice),
    ok = peer:stop(Bob).
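%% Note: the _test() suffix means eunit discovers and runs this function
%% automatically; the ?assert*/?debugFmt macros above assume the usual
%% -include_lib("eunit/include/eunit.hrl") at the top of the test module.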
handle_call({fetchHistory, Channel, Limit}, From, State) ->
    EL = proplists:get_value(eventLoop, State),
    EL ! {fetchHistory, Channel, Limit, From},
    {noreply, State};
handle_call({unfetchedCount, Channel}, From, State) ->
    EL = proplists:get_value(eventLoop, State),
    EL ! {unfetchedCount, Channel, From},
    {noreply, State};
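%% Sketch (assumption, not shown in the patch): the public wrappers the test
%% calls as peer:fetch_history/3 and peer:unfetched_count/2, assuming they
%% simply delegate to gen_server:call with the message shapes matched by the
%% handle_call clauses above; the event loop replies via gen_server:reply.
fetch_history(Pid, Channel, Limit) ->
    gen_server:call(Pid, {fetchHistory, Channel, Limit}).

unfetched_count(Pid, Channel) ->
    gen_server:call(Pid, {unfetchedCount, Channel}).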
handle_system_messages(fetchHistory, {Channel, Limit, From}, State = #state{db = Db, peers = Peers}) ->
    %% Get unfetched hashes from database
    {ok, UnfetchedHashes} = db:unfetched_get(Db, Channel, Limit),
    case UnfetchedHashes of
        [] ->
            gen_server:reply(From, {ok, 0}),
            State;
        _ ->
            %% Pick a random connected peer to request from
            PeerPids = maps:keys(Peers),
            case PeerPids of
                [] ->
                    gen_server:reply(From, {error, no_connected_peers}),
                    State;
                _ ->
                    ConnPid = lists:nth(rand:uniform(length(PeerPids)), PeerPids),
                    {ok, {PostReqId, _PostReqMsg, SentSize}} = send_post_request(State, ConnPid, UnfetchedHashes),
                    ActiveOut = State#state.activeOut,
                    UpdatedOut = maps:put(PostReqId, {ConnPid, post_request}, ActiveOut),
                    gen_server:reply(From, {ok, length(UnfetchedHashes)}),
                    State#state{activeOut = UpdatedOut,
                                peers = update_peer_sent(Peers, ConnPid, SentSize)}
            end
    end;
handle_system_messages(unfetchedCount, {Channel, From}, State = #state{db = Db}) ->
    Result = db:unfetched_count(Db, Channel),
    gen_server:reply(From, Result),
    State;
    %% this needs to be spread out across conns
    %% asking the responder directly should be okay enough for now
    %% TODO: keep track of which hashes we asked for
    {ok, {PostReqId, _PostReqMsg, SentSize}} = send_post_request(S, ConnPid, NewHashes),
    UpdatedOut = maps:put(PostReqId, ConnPid, ActiveOut),
    %% update loop state
    S#state{peers = update_peer_sent(NewPeers, ConnPid, SentSize),
            activeOut = UpdatedOut}
    %% Determine the original request type to apply the hybrid fetch strategy
    {_OrigConnPid, OrigMsg} = maps:get(ReqId, ActiveOut, {ConnPid, undefined}),
    OrigMsgType = case OrigMsg of
        undefined -> unknown;
        [OrigHeader, _OrigBody] ->
            proplists:get_value(msgType, OrigHeader, unknown);
        _ -> unknown
    end,

    %% Hybrid fetch logic:
    %% - channel_state_request (type 5): fetch ALL (metadata like join/leave/topic)
    %% - channel_time_range_request (type 4): fetch first 100, store rest as unfetched
    %% - unknown: default to eager fetch (backward compat)
    {HashesToFetch, HashesToStore, Channel} = case OrigMsgType of
        5 -> % channel_state_request - fetch all eagerly
            io:format("[Peer] Hash response for channel_state_request: fetching all ~p hashes eagerly~n",
                      [length(NewHashes)]),
            {NewHashes, [], undefined};
        4 -> % channel_time_range_request - hybrid strategy
            Chan = case OrigMsg of
                [_OH, OrigBody] -> proplists:get_value(channel, OrigBody, undefined);
                _ -> undefined
            end,
            EagerCount = min(100, length(NewHashes)),
            Eager = lists:sublist(NewHashes, EagerCount),
            Lazy = lists:sublist(NewHashes, EagerCount + 1, length(NewHashes)),
            io:format("[Peer] Hash response for channel_time_range_request (~s): fetching ~p hashes eagerly, storing ~p as unfetched~n",
                      [Chan, length(Eager), length(Lazy)]),
            {Eager, Lazy, Chan};
        _ -> % unknown - default to eager
            io:format("[Peer] Hash response for unknown request type: fetching all ~p hashes eagerly (fallback)~n",
                      [length(NewHashes)]),
            {NewHashes, [], undefined}
    end,

    %% Store unfetched hashes in the database (internal operation)
    case HashesToStore of
        [] -> ok;
        _ when Channel =/= undefined ->
            gen_server:call(Db, {unfetchedAdd, HashesToStore, Channel, "hash_response"});
        _ ->
            io:format("[Peer] Warning: Cannot store unfetched hashes without channel info~n"),
            ok
    end,

    %% Request posts for the eager hashes
    case HashesToFetch of
        [] ->
            S#state{peers = NewPeers};
        _ ->
            {ok, {PostReqId, _PostReqMsg, SentSize}} = send_post_request(S, ConnPid, HashesToFetch),
            UpdatedOut = maps:put(PostReqId, {ConnPid, post_request}, ActiveOut),
            S#state{peers = update_peer_sent(NewPeers, ConnPid, SentSize),
                    activeOut = UpdatedOut}
    end
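%% Illustration (not part of the patch): how the split behaves for the
%% 150-message test above. With 150 hashes in NewHashes:
%%   EagerCount = min(100, 150)                  %% = 100
%%   Eager = lists:sublist(NewHashes, 100)       %% hashes 1..100, requested now
%%   Lazy  = lists:sublist(NewHashes, 101, 150)  %% hashes 101..150, stored as unfetched
%% lists:sublist/3 takes a length, not an end index; asking for more elements
%% than remain is fine, it simply stops at the end of the list.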
[{ok, _, _} = db:save_post(Db, P) || P <- NewPosts],
    SavedPosts = [db:save_post(Db, P) || P <- NewPosts],
    %% Remove successfully fetched hashes from the unfetched_hashes table (internal operation)
    FetchedHashes = [Hash || {ok, _, Hash} <- SavedPosts],
    case FetchedHashes of
        [] -> ok;
        _ ->
            gen_server:call(Db, {unfetchedDelete, FetchedHashes}),
            io:format("[Peer] Removed ~p fetched hashes from unfetched table~n", [length(FetchedHashes)])
    end,
%% unfetched hash queries (internal - called by the peer module only)
unfetched_get(Db, Channel, Limit) ->
    gen_server:call(Db, {unfetchedGet, Channel, Limit}).

unfetched_count(Db, Channel) ->
    gen_server:call(Db, {unfetchedCount, Channel}).

%% Note: unfetched_add and unfetched_delete are internal operations called
%% directly via gen_server:call from the peer module; no wrapper functions.
{reply, Result, State}.
    {reply, Result, State};

%%%%%%%%%%%%%%%%%%%%%%
%% Unfetched Hashes %%
%%%%%%%%%%%%%%%%%%%%%%

handle_call({unfetchedAdd, Hashes, Channel, Source}, _From, [{sql, Db}, _] = State) ->
    Now = os:system_time(millisecond),
    ChannelBin = list_to_binary(Channel),
    SourceBin = list_to_binary(Source),
    Results = lists:map(
        fun(Hash) ->
            Row = [{hash, {blob, Hash}},
                   {channel, ChannelBin},
                   {discovered_at, Now},
                   {source, SourceBin}],
            case sqlite3:write(Db, unfetched_hashes, Row) of
                {rowid, _RowId} -> ok;
                {error, 19, _Msg} -> already_exists
            end
        end,
        Hashes),
    {reply, {ok, Results}, State};
handle_call({unfetchedGet, Channel, Limit}, _From, [{sql, Db}, _] = State) ->
    ChannelBin = list_to_binary(Channel),
    Query = "SELECT hash FROM unfetched_hashes WHERE channel = ? ORDER BY discovered_at ASC LIMIT ?",
    [{columns, ["hash"]}, {rows, Rows}] = sqlite3:sql_exec(Db, Query, [ChannelBin, Limit]),
    Hashes = [Hash || {{blob, Hash}} <- Rows],
    {reply, {ok, Hashes}, State};
handle_call({unfetchedCount, Channel}, _From, [{sql, Db}, _] = State) ->
    ChannelBin = list_to_binary(Channel),
    Query = "SELECT count(*) as count FROM unfetched_hashes WHERE channel = ?",
    [{columns, ["count"]}, {rows, [{Count}]}] = sqlite3:sql_exec(Db, Query, [ChannelBin]),
    {reply, {ok, Count}, State};
handle_call({unfetchedDelete, Hashes}, _From, [{sql, Db}, _] = State) ->
    lists:foreach(
        fun(Hash) ->
            ok = sqlite3:delete(Db, unfetched_hashes, {hash, {blob, Hash}})
        end,
        Hashes),
    {reply, ok, State}.
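%% Sketch (assumption, not shown in the patch): the table the clauses above
%% write to, assuming it is created during db initialisation alongside the
%% other tables. Column names are taken from the writes above; the PRIMARY KEY
%% on hash is inferred from the {error, 19, _} (SQLITE_CONSTRAINT) branch in
%% the unfetchedAdd clause.
create_unfetched_hashes_table(Db) ->
    sqlite3:sql_exec(Db,
        "CREATE TABLE IF NOT EXISTS unfetched_hashes ("
        "  hash BLOB PRIMARY KEY,"
        "  channel TEXT NOT NULL,"
        "  discovered_at INTEGER NOT NULL,"
        "  source TEXT"
        ")").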