% SPDX-FileCopyrightText: 2023 Henry Bubert
%
% SPDX-License-Identifier: LGPL-2.1-or-later
-module(cable_lazy_loading_suite).
-include_lib("eunit/include/eunit.hrl").
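%% These tests exercise the hybrid history-fetching behaviour described in the
%% individual cases below: on sync, roughly the newest 100 messages of a
%% channel are fetched eagerly, the rest are tracked as "unfetched", and
%% peer:fetch_history/3 pulls older messages on demand. Large backlogs are
%% discovered through automatic pagination (the tests assume about 1000
%% messages per request, bounded by a depth limit of 10).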
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Basic Lazy Loading Tests
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
basic_lazy_fetch_test() ->
%% Test the hybrid fetching behavior:
%% - First 100 messages fetched eagerly
%% - Remaining messages stored as unfetched
%% - fetch_history retrieves them on demand
C = "backlog-test",
ArgsAlice = make_test_args("Alice"),
ArgsBob = make_test_args("Bob"),
{ok, Alice} = peer:start_link(ArgsAlice),
{ok, Bob} = peer:start_link(ArgsBob),
%% Alice creates a channel with 150 text messages
peer:join(Alice, C),
peer:set_nick(Alice, "Alice"),
%% Create 150 text messages
NumMessages = 150,
lists:foreach(
fun(N) ->
Msg = lists:flatten(io_lib:format("Message number ~p", [N])),
ok = peer:write(Alice, C, Msg)
end,
lists:seq(1, NumMessages)
),
%% Give Alice a moment to process all writes
timer:sleep(100),
%% Verify Alice has all messages
{ok, {AliceTexts, _}} = peer:read(Alice, C),
?debugFmt("~nAlice has ~p messages in channel~n", [length(AliceTexts)]),
?assertEqual(NumMessages, length(AliceTexts)),
%% Bob joins and connects to Alice
peer:join(Bob, C),
peer:set_nick(Bob, "Bob"),
%% Connect the peers
{ok, AliceAddr} = peer:node_addr(Alice),
ok = peer:dial(Bob, AliceAddr),
%% Give them time to sync (channel state + first 100 messages)
timer:sleep(2000),
%% Bob should have ~50 unfetched messages (we sent 150 and up to 100 are fetched eagerly)
{ok, UnfetchedBefore} = peer:unfetched_count(Bob, C),
?debugFmt("~nBob has ~p unfetched messages~n", [UnfetchedBefore]),
%% Allow some wiggle room for processing
?assert(UnfetchedBefore >= 40),
%% At least 90 should have been fetched
?assert(UnfetchedBefore =< NumMessages - 90),
%% Fetch next 30 messages from history
{ok, RequestedCount} = peer:fetch_history(Bob, C, 30),
?debugFmt("~nRequested ~p messages from history~n", [RequestedCount]),
?assertEqual(30, RequestedCount),
%% Give time for messages to arrive
timer:sleep(1000),
%% Verify unfetched count decreased
{ok, UnfetchedAfter} = peer:unfetched_count(Bob, C),
?debugFmt("~nBob now has ~p unfetched messages~n", [UnfetchedAfter]),
?assert(UnfetchedAfter < UnfetchedBefore),
?assertEqual(UnfetchedBefore - 30, UnfetchedAfter),
%% Verify Bob can read more messages now
{ok, {BobTexts, _}} = peer:read(Bob, C),
?debugFmt("~nBob can now read ~p messages~n", [length(BobTexts)]),
%% Should have at least 130 messages (100 eager + 30 from fetch_history)
?assert(length(BobTexts) >= 130),
%% Fetch the rest
{ok, RequestedRest} = peer:fetch_history(Bob, C, 100),
?debugFmt("~nRequested remaining ~p messages~n", [RequestedRest]),
timer:sleep(1000),
%% Verify all messages fetched
{ok, UnfetchedFinal} = peer:unfetched_count(Bob, C),
?debugFmt("~nBob finally has ~p unfetched messages~n", [UnfetchedFinal]),
?assertEqual(0, UnfetchedFinal),
%% Verify Bob has all messages
{ok, {BobFinalTexts, _}} = peer:read(Bob, C),
?debugFmt("~nBob can read all ~p messages~n", [length(BobFinalTexts)]),
?assertEqual(NumMessages, length(BobFinalTexts)),
ok = peer:stop(Alice),
ok = peer:stop(Bob).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Real-world Scenario Tests
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
sync_then_new_posts_test() ->
%% Test scenario:
%% 1. Alice creates 150 messages
%% 2. Bob connects and syncs (gets 100 eagerly, 50 unfetched)
%% 3. Alice creates 20 MORE messages while connected
%% 4. Verify Bob's unfetched count updates correctly
C = "updates-test",
ArgsAlice = make_test_args("Alice"),
ArgsBob = make_test_args("Bob"),
{ok, Alice} = peer:start_link(ArgsAlice),
{ok, Bob} = peer:start_link(ArgsBob),
peer:join(Alice, C),
peer:set_nick(Alice, "Alice"),
%% Phase 1: Create initial 150 messages
InitialMessages = 150,
lists:foreach(
fun(N) ->
Msg = lists:flatten(io_lib:format("Initial message ~p", [N])),
ok = peer:write(Alice, C, Msg)
end,
lists:seq(1, InitialMessages)
),
timer:sleep(100),
%% Phase 2: Bob connects
peer:join(Bob, C),
peer:set_nick(Bob, "Bob"),
{ok, AliceAddr} = peer:node_addr(Alice),
ok = peer:dial(Bob, AliceAddr),
timer:sleep(2000),
%% Check initial unfetched count
{ok, UnfetchedInitial} = peer:unfetched_count(Bob, C),
?debugFmt("~nBob initially has ~p unfetched messages~n", [UnfetchedInitial]),
?assert(UnfetchedInitial >= 40),
%% Phase 3: Alice creates MORE messages while connected
NewMessages = 20,
lists:foreach(
fun(N) ->
Msg = lists:flatten(io_lib:format("New message ~p", [N])),
ok = peer:write(Alice, C, Msg)
end,
lists:seq(1, NewMessages)
),
timer:sleep(1000),
%% Phase 4: Check if unfetched count is correct
%% This is the KEY test - does the unfetched count reflect reality?
{ok, UnfetchedAfterNew} = peer:unfetched_count(Bob, C),
?debugFmt("~nAfter new messages, Bob has ~p unfetched~n", [UnfetchedAfterNew]),
%% Count how many messages Bob actually has
{ok, {BobTexts, _}} = peer:read(Bob, C),
ActualReceived = length(BobTexts),
ExpectedUnfetched = (InitialMessages + NewMessages) - ActualReceived,
?debugFmt(
"~nBob has ~p messages, should have ~p unfetched~n",
[ActualReceived, ExpectedUnfetched]
),
%% The unfetched count should reflect the actual gap
%% This test will likely FAIL with current implementation
?assertEqual(ExpectedUnfetched, UnfetchedAfterNew),
ok = peer:stop(Alice),
ok = peer:stop(Bob).
disconnect_reconnect_test() ->
%% Test scenario:
%% 1. Alice creates 100 messages, Bob syncs
%% 2. Bob disconnects
%% 3. Alice creates 50 more messages
%% 4. Bob reconnects
%% 5. Verify unfetched count shows the 50 new messages
C = "reconnect-test",
ArgsAlice = make_test_args("Alice"),
ArgsBob = make_test_args("Bob"),
{ok, Alice} = peer:start_link(ArgsAlice),
{ok, Bob} = peer:start_link(ArgsBob),
peer:join(Alice, C),
peer:set_nick(Alice, "Alice"),
peer:join(Bob, C),
peer:set_nick(Bob, "Bob"),
%% Phase 1: Initial messages and sync
InitialMessages = 100,
lists:foreach(
fun(N) ->
Msg = lists:flatten(io_lib:format("Message ~p", [N])),
ok = peer:write(Alice, C, Msg)
end,
lists:seq(1, InitialMessages)
),
timer:sleep(100),
{ok, AliceAddr} = peer:node_addr(Alice),
ok = peer:dial(Bob, AliceAddr),
timer:sleep(2000),
%% Verify Bob got all initial messages (since < 100, all eager)
{ok, UnfetchedInitial} = peer:unfetched_count(Bob, C),
?debugFmt("~nInitial unfetched count: ~p~n", [UnfetchedInitial]),
?assertEqual(0, UnfetchedInitial),
%% Phase 2: Disconnect Bob
ok = peer:stop(Bob),
%% Wait for database to fully close and unregister
timer:sleep(1000),
%% Phase 3: Alice creates more messages while Bob is offline
NewMessages = 50,
lists:foreach(
fun(N) ->
Msg = lists:flatten(io_lib:format("Offline message ~p", [N])),
ok = peer:write(Alice, C, Msg)
end,
lists:seq(1, NewMessages)
),
timer:sleep(100),
%% Phase 4: Bob reconnects (reusing same storage to test persistence)
{ok, BobReconnect} = peer:start_link(ArgsBob),
peer:join(BobReconnect, C),
ok = peer:dial(BobReconnect, AliceAddr),
timer:sleep(1000),
%% Phase 5: Check unfetched count
{ok, UnfetchedAfterReconnect} = peer:unfetched_count(BobReconnect, C),
?debugFmt("~nAfter reconnect, unfetched count: ~p~n", [UnfetchedAfterReconnect]),
%% Bob already has the first 100 messages; the 50 written while he was offline
%% may arrive eagerly on resync or be tracked as unfetched, so check that the
%% unfetched count matches the gap between the channel total and what Bob holds
{ok, {BobTextsAfterReconnect, _}} = peer:read(BobReconnect, C),
TotalReceived = length(BobTextsAfterReconnect),
ExpectedUnfetched = (InitialMessages + NewMessages) - TotalReceived,
?debugFmt(
"~nBob has ~p messages, should have ~p unfetched~n",
[TotalReceived, ExpectedUnfetched]
),
?assertEqual(ExpectedUnfetched, UnfetchedAfterReconnect),
ok = peer:stop(Alice),
ok = peer:stop(BobReconnect).
multiple_peers_test() ->
%% Test scenario:
%% 1. Alice creates 200 messages
%% 2. Bob and Carol both connect
%% 3. Verify both have correct unfetched counts
%% 4. Bob fetches 50 messages
%% 5. Verify Bob's count updates but Carol's doesn't change
C = "multi-peer-test",
ArgsAlice = make_test_args("Alice"),
ArgsBob = make_test_args("Bob"),
ArgsCarol = make_test_args("Carol"),
{ok, Alice} = peer:start_link(ArgsAlice),
{ok, Bob} = peer:start_link(ArgsBob),
{ok, Carol} = peer:start_link(ArgsCarol),
peer:join(Alice, C),
peer:set_nick(Alice, "Alice"),
%% Create 200 messages
NumMessages = 200,
lists:foreach(
fun(N) ->
Msg = lists:flatten(io_lib:format("Message ~p", [N])),
ok = peer:write(Alice, C, Msg)
end,
lists:seq(1, NumMessages)
),
%% Both Bob and Carol connect
ok = peer:join(Bob, C),
ok = peer:set_nick(Bob, "Bob"),
ok = peer:join(Carol, C),
ok = peer:set_nick(Carol, "Carol"),
{ok, AliceAddr} = peer:node_addr(Alice),
ok = peer:dial(Bob, AliceAddr),
ok = peer:dial(Carol, AliceAddr),
timer:sleep(2500),
%% Both should have similar unfetched counts
{ok, BobUnfetched} = peer:unfetched_count(Bob, C),
{ok, CarolUnfetched} = peer:unfetched_count(Carol, C),
?debugFmt(
"~nBob unfetched: ~p, Carol unfetched: ~p~n",
[BobUnfetched, CarolUnfetched]
),
%% Both should have ~100 unfetched (200 total - 100 eager)
?assert(BobUnfetched >= 90),
?assert(CarolUnfetched >= 90),
%% Bob fetches some history
{ok, 50} = peer:fetch_history(Bob, C, 50),
timer:sleep(1000),
%% Bob's unfetched should decrease
{ok, BobUnfetchedAfter} = peer:unfetched_count(Bob, C),
?debugFmt("~nBob unfetched after fetch: ~p~n", [BobUnfetchedAfter]),
?assertEqual(BobUnfetched - 50, BobUnfetchedAfter),
%% Carol's should remain the same
{ok, CarolUnfetchedAfter} = peer:unfetched_count(Carol, C),
?debugFmt("~nCarol unfetched after Bob's fetch: ~p~n", [CarolUnfetchedAfter]),
?assertEqual(CarolUnfetched, CarolUnfetchedAfter),
ok = peer:stop(Alice),
ok = peer:stop(Bob),
ok = peer:stop(Carol).
incremental_fetch_test_() ->
%% Test fetching in multiple small batches
{timeout, 30, fun incremental_fetch_test_impl/0}.
incremental_fetch_test_impl() ->
C = "incremental-test",
ArgsAlice = make_test_args("Alice"),
ArgsBob = make_test_args("Bob"),
{ok, Alice} = peer:start_link(ArgsAlice),
{ok, Bob} = peer:start_link(ArgsBob),
peer:join(Alice, C),
peer:set_nick(Alice, "Alice"),
%% Create 250 messages
NumMessages = 250,
lists:foreach(
fun(N) ->
Msg = lists:flatten(io_lib:format("Message ~p", [N])),
ok = peer:write(Alice, C, Msg)
end,
lists:seq(1, NumMessages)
),
timer:sleep(100),
peer:join(Bob, C),
peer:set_nick(Bob, "Bob"),
{ok, AliceAddr} = peer:node_addr(Alice),
ok = peer:dial(Bob, AliceAddr),
%% Wait for initial sync to complete (100 eager + remaining as unfetched)
timer:sleep(2000),
%% Fetch in increments of 20
FetchCount = 20,
%% 250 messages, 100 eager, 150/20 = 7.5 iterations + buffer
MaxIterations = 15,
FetchUntilEmpty = fun FetchLoop(Iteration) ->
case Iteration > MaxIterations of
true ->
?debugFmt("~nMax iterations reached~n", []),
error;
false ->
{ok, Unfetched} = peer:unfetched_count(Bob, C),
{ok, {CurrentTexts, _}} = peer:read(Bob, C),
CurrentCount = length(CurrentTexts),
?debugFmt(
"~nIteration ~p: ~p messages received, ~p unfetched~n",
[Iteration, CurrentCount, Unfetched]
),
case CurrentCount >= NumMessages of
true ->
?debugFmt(
"~nAll ~p messages received after ~p iterations~n",
[NumMessages, Iteration]
),
ok;
false when Unfetched > 0 ->
{ok, Requested} = peer:fetch_history(Bob, C, FetchCount),
?debugFmt("~nRequested ~p messages~n", [Requested]),
timer:sleep(800),
FetchLoop(Iteration + 1);
false ->
%% No unfetched but not all received - messages in flight
?debugFmt("~nWaiting for in-flight messages~n", []),
timer:sleep(800),
FetchLoop(Iteration + 1)
end
end
end,
ok = FetchUntilEmpty(1),
%% Verify all messages received
{ok, {BobTexts, _}} = peer:read(Bob, C),
?assertEqual(NumMessages, length(BobTexts)),
ok = peer:stop(Alice),
ok = peer:stop(Bob).
empty_channel_test() ->
%% Test unfetched count on empty channel
C = "empty-test",
ArgsAlice = make_test_args("Alice"),
ArgsBob = make_test_args("Bob"),
{ok, Alice} = peer:start_link(ArgsAlice),
{ok, Bob} = peer:start_link(ArgsBob),
peer:join(Alice, C),
peer:join(Bob, C),
{ok, AliceAddr} = peer:node_addr(Alice),
ok = peer:dial(Bob, AliceAddr),
timer:sleep(1000),
%% No messages, unfetched should be 0
{ok, Unfetched} = peer:unfetched_count(Bob, C),
?assertEqual(0, Unfetched),
%% Try to fetch anyway
{ok, Requested} = peer:fetch_history(Bob, C, 10),
?assertEqual(0, Requested),
ok = peer:stop(Alice),
ok = peer:stop(Bob).
fetch_more_than_available_test() ->
%% Test requesting more messages than available
C = "overfetch-test",
ArgsAlice = make_test_args("Alice"),
ArgsBob = make_test_args("Bob"),
{ok, Alice} = peer:start_link(ArgsAlice),
{ok, Bob} = peer:start_link(ArgsBob),
peer:join(Alice, C),
peer:set_nick(Alice, "Alice"),
%% Create 120 messages (100 eager + 20 unfetched)
NumMessages = 120,
lists:foreach(
fun(N) ->
Msg = lists:flatten(io_lib:format("Message ~p", [N])),
ok = peer:write(Alice, C, Msg)
end,
lists:seq(1, NumMessages)
),
timer:sleep(100),
peer:join(Bob, C),
{ok, AliceAddr} = peer:node_addr(Alice),
ok = peer:dial(Bob, AliceAddr),
timer:sleep(2000),
{ok, Unfetched} = peer:unfetched_count(Bob, C),
?debugFmt("~nUnfetched: ~p~n", [Unfetched]),
%% Request 100 but only ~20 available
{ok, Requested} = peer:fetch_history(Bob, C, 100),
?debugFmt("~nRequested ~p (asked for 100)~n", [Requested]),
?assertEqual(Unfetched, Requested),
timer:sleep(1000),
{ok, UnfetchedAfter} = peer:unfetched_count(Bob, C),
?assertEqual(0, UnfetchedAfter),
ok = peer:stop(Alice),
ok = peer:stop(Bob).
pagination_large_backlog_test_() ->
%% Test automatic pagination with >1000 messages
%% This ensures we can handle arbitrarily large channels
{timeout, 120, fun pagination_large_backlog_test_impl/0}.
pagination_large_backlog_test_impl() ->
C = "pagination-test",
ArgsAlice = make_test_args("Alice"),
ArgsBob = make_test_args("Bob"),
{ok, Alice} = peer:start_link(ArgsAlice),
{ok, Bob} = peer:start_link(ArgsBob),
peer:join(Alice, C),
peer:set_nick(Alice, "Alice"),
%% Create 1500 messages to test pagination
%% With limit of 1000, this should trigger 1 pagination request
NumMessages = 1500,
?debugFmt("~nCreating ~p messages for pagination test~n", [NumMessages]),
lists:foreach(
fun(N) ->
Msg = lists:flatten(io_lib:format("Message ~p", [N])),
ok = peer:write(Alice, C, Msg)
end,
lists:seq(1, NumMessages)
),
timer:sleep(200),
{ok, {AliceTexts, _}} = peer:read(Alice, C),
?assertEqual(NumMessages, length(AliceTexts)),
?debugFmt("~nAlice has all ~p messages~n", [NumMessages]),
peer:join(Bob, C),
peer:set_nick(Bob, "Bob"),
{ok, AliceAddr} = peer:node_addr(Alice),
ok = peer:dial(Bob, AliceAddr),
%% Wait for initial sync + pagination to complete
%% First request: 1000 messages (100 eager + 900 unfetched)
%% Pagination should trigger automatically
%% Second request: 500 messages (100 eager + 400 unfetched)
?debugFmt("~nWaiting for initial sync and pagination...~n", []),
timer:sleep(5000),
%% Bob should have discovered all messages (100 eager from first batch + 100 from second)
%% Plus all unfetched (900 + 400)
{ok, {BobTexts, _}} = peer:read(Bob, C),
{ok, BobUnfetched} = peer:unfetched_count(Bob, C),
BobTotal = length(BobTexts) + BobUnfetched,
?debugFmt(
"~nBob has ~p messages fetched, ~p unfetched, ~p total~n",
[length(BobTexts), BobUnfetched, BobTotal]
),
%% Verify Bob discovered all messages through pagination
?assertEqual(NumMessages, BobTotal),
%% Now fetch the rest incrementally to verify unfetched tracking works
MaxIterations = 20,
FetchUntilEmpty = fun FetchLoop(Iteration) ->
case Iteration > MaxIterations of
true ->
?debugFmt("~nMax iterations reached~n", []),
error;
false ->
{ok, Unfetched} = peer:unfetched_count(Bob, C),
{ok, {CurrentTexts, _}} = peer:read(Bob, C),
CurrentCount = length(CurrentTexts),
?debugFmt(
"~nIteration ~p: ~p messages received, ~p unfetched~n",
[Iteration, CurrentCount, Unfetched]
),
case CurrentCount >= NumMessages of
true ->
?debugFmt("~nAll ~p messages received after ~p iterations~n", [
NumMessages, Iteration
]),
ok;
false when Unfetched > 0 ->
{ok, Requested} = peer:fetch_history(Bob, C, 100),
?debugFmt("~nRequested ~p messages~n", [Requested]),
timer:sleep(800),
FetchLoop(Iteration + 1);
false ->
%% No unfetched but not all received - messages in flight
?debugFmt("~nWaiting for in-flight messages~n", []),
timer:sleep(800),
FetchLoop(Iteration + 1)
end
end
end,
ok = FetchUntilEmpty(1),
%% Verify final state
{ok, {FinalTexts, _}} = peer:read(Bob, C),
{ok, FinalUnfetched} = peer:unfetched_count(Bob, C),
?debugFmt("~nFinal: ~p messages, ~p unfetched~n", [length(FinalTexts), FinalUnfetched]),
?assertEqual(NumMessages, length(FinalTexts)),
?assertEqual(0, FinalUnfetched),
ok = peer:stop(Alice),
ok = peer:stop(Bob).
messages_during_pagination_test_() ->
%% Test scenario: New messages arrive while fetching history
%% This is very common - user scrolls up to load history while chat is active
{timeout, 60, fun messages_during_pagination_test_impl/0}.
messages_during_pagination_test_impl() ->
C = "concurrent-test",
ArgsAlice = make_test_args("Alice"),
ArgsBob = make_test_args("Bob"),
{ok, Alice} = peer:start_link(ArgsAlice),
{ok, Bob} = peer:start_link(ArgsBob),
peer:join(Alice, C),
peer:set_nick(Alice, "Alice"),
%% Phase 1: Create 200 messages
InitialMessages = 200,
lists:foreach(
fun(N) ->
Msg = lists:flatten(io_lib:format("Initial ~p", [N])),
ok = peer:write(Alice, C, Msg)
end,
lists:seq(1, InitialMessages)
),
timer:sleep(100),
%% Phase 2: Bob connects
peer:join(Bob, C),
peer:set_nick(Bob, "Bob"),
{ok, AliceAddr} = peer:node_addr(Alice),
ok = peer:dial(Bob, AliceAddr),
timer:sleep(2000),
%% Bob should have ~100 unfetched
{ok, UnfetchedInitial} = peer:unfetched_count(Bob, C),
?debugFmt("~nBob initially has ~p unfetched~n", [UnfetchedInitial]),
?assert(UnfetchedInitial >= 90),
%% Phase 3: Bob starts fetching history
{ok, Requested1} = peer:fetch_history(Bob, C, 50),
?debugFmt("~nBob requested ~p messages~n", [Requested1]),
%% Phase 4: WHILE pagination is in-flight, Alice sends 10 NEW messages
NewMessages = 10,
lists:foreach(
fun(N) ->
Msg = lists:flatten(io_lib:format("New message ~p", [N])),
ok = peer:write(Alice, C, Msg)
end,
lists:seq(1, NewMessages)
),
%% Wait for both old and new messages to arrive
timer:sleep(2000),
%% Phase 5: Verify counts are correct
{ok, {BobTexts, _}} = peer:read(Bob, C),
ActualReceived = length(BobTexts),
TotalExpected = InitialMessages + NewMessages,
?debugFmt(
"~nBob has ~p messages (expected ~p total)~n",
[ActualReceived, TotalExpected]
),
{ok, UnfetchedFinal} = peer:unfetched_count(Bob, C),
ExpectedUnfetched = TotalExpected - ActualReceived,
?debugFmt(
"~nBob has ~p unfetched (expected ~p)~n",
[UnfetchedFinal, ExpectedUnfetched]
),
%% Key assertion: unfetched count should be accurate despite concurrent activity
?assertEqual(ExpectedUnfetched, UnfetchedFinal),
ok = peer:stop(Alice),
ok = peer:stop(Bob).
rapid_fetch_history_test_() ->
{timeout, 120, fun rapid_fetch_history_test_impl/0}.
rapid_fetch_history_test_impl() ->
%% Test scenario: User spam-clicks "load more" button
%% Multiple fetch_history calls before previous ones complete
C = "rapid-fetch-test",
ArgsAlice = make_test_args("Alice"),
ArgsBob = make_test_args("Bob"),
{ok, Alice} = peer:start_link(ArgsAlice),
{ok, Bob} = peer:start_link(ArgsBob),
peer:join(Alice, C),
peer:set_nick(Alice, "Alice"),
%% Create 300 messages
NumMessages = 300,
lists:foreach(
fun(N) ->
Msg = lists:flatten(io_lib:format("Message ~p", [N])),
ok = peer:write(Alice, C, Msg)
end,
lists:seq(1, NumMessages)
),
timer:sleep(100),
peer:join(Bob, C),
{ok, AliceAddr} = peer:node_addr(Alice),
ok = peer:dial(Bob, AliceAddr),
timer:sleep(2000),
{ok, UnfetchedBefore} = peer:unfetched_count(Bob, C),
?debugFmt("~nBob has ~p unfetched before rapid fetches~n", [UnfetchedBefore]),
%% Make 3 rapid fetch_history calls WITHOUT waiting
{ok, Req1} = peer:fetch_history(Bob, C, 30),
{ok, Req2} = peer:fetch_history(Bob, C, 30),
{ok, Req3} = peer:fetch_history(Bob, C, 30),
?debugFmt("~nRapid requests: ~p, ~p, ~p~n", [Req1, Req2, Req3]),
%% All should succeed (return a count)
?assert(is_integer(Req1)),
?assert(is_integer(Req2)),
?assert(is_integer(Req3)),
%% Wait for all to complete
timer:sleep(3000),
%% Verify final state is consistent
{ok, UnfetchedAfter} = peer:unfetched_count(Bob, C),
{ok, {BobTexts, _}} = peer:read(Bob, C),
ActualReceived = length(BobTexts),
?debugFmt(
"~nAfter rapid fetches: ~p received, ~p unfetched~n",
[ActualReceived, UnfetchedAfter]
),
%% Should have received approximately 90 more messages (3 x 30)
%% But might be less if requests overlapped
%% 100 initial + at least some from fetches
?assert(ActualReceived >= 130),
?assertEqual(NumMessages - ActualReceived, UnfetchedAfter),
ok = peer:stop(Alice),
ok = peer:stop(Bob).
multiple_channels_large_backlog_test_() ->
%% Test scenario: User joins multiple channels with large backlogs
%% Common when joining an established community
{timeout, 90, fun multiple_channels_large_backlog_test_impl/0}.
multiple_channels_large_backlog_test_impl() ->
ArgsAlice = make_test_args("Alice"),
ArgsBob = make_test_args("Bob"),
{ok, Alice} = peer:start_link(ArgsAlice),
{ok, Bob} = peer:start_link(ArgsBob),
peer:set_nick(Alice, "Alice"),
%% Create 3 channels with 300 messages each
Channels = ["chan-A", "chan-B", "chan-C"],
MessagesPerChannel = 300,
lists:foreach(
fun(Chan) ->
peer:join(Alice, Chan),
lists:foreach(
fun(N) ->
Msg = lists:flatten(io_lib:format("~s msg ~p", [Chan, N])),
ok = peer:write(Alice, Chan, Msg)
end,
lists:seq(1, MessagesPerChannel)
)
end,
Channels
),
timer:sleep(200),
%% Bob joins all channels and connects
peer:set_nick(Bob, "Bob"),
lists:foreach(fun(Chan) -> peer:join(Bob, Chan) end, Channels),
{ok, AliceAddr} = peer:node_addr(Alice),
ok = peer:dial(Bob, AliceAddr),
%% Wait for initial sync of all channels
timer:sleep(5000),
%% Verify each channel has correct unfetched count
VerifyChannel = fun(Chan) ->
{ok, {BobTexts, _}} = peer:read(Bob, Chan),
{ok, Unfetched} = peer:unfetched_count(Bob, Chan),
Total = length(BobTexts) + Unfetched,
?debugFmt(
"~nChannel ~s: ~p received, ~p unfetched, ~p total~n",
[Chan, length(BobTexts), Unfetched, Total]
),
%% Each channel should have discovered all messages
?assertEqual(MessagesPerChannel, Total)
end,
lists:foreach(VerifyChannel, Channels),
ok = peer:stop(Alice),
ok = peer:stop(Bob).
peer_disconnects_during_pagination_test() ->
%% Test scenario: Peer goes offline while Bob is fetching history
%% Network failures happen in practice, so Bob must stay consistent mid-fetch
C = "disconnect-test",
ArgsAlice = make_test_args("Alice"),
ArgsBob = make_test_args("Bob"),
{ok, Alice} = peer:start_link(ArgsAlice),
{ok, Bob} = peer:start_link(ArgsBob),
peer:join(Alice, C),
peer:set_nick(Alice, "Alice"),
%% Create 200 messages
NumMessages = 200,
lists:foreach(
fun(N) ->
Msg = lists:flatten(io_lib:format("Message ~p", [N])),
ok = peer:write(Alice, C, Msg)
end,
lists:seq(1, NumMessages)
),
timer:sleep(100),
peer:join(Bob, C),
{ok, AliceAddr} = peer:node_addr(Alice),
ok = peer:dial(Bob, AliceAddr),
timer:sleep(2000),
{ok, UnfetchedBefore} = peer:unfetched_count(Bob, C),
?debugFmt("~nBob has ~p unfetched before fetch~n", [UnfetchedBefore]),
%% Start fetching history
{ok, _Requested} = peer:fetch_history(Bob, C, 50),
%% IMMEDIATELY disconnect Alice (simulating network failure)
ok = peer:stop(Alice),
%% Wait to see what happens
timer:sleep(2000),
%% Bob should still be alive and have consistent state
{ok, {BobTexts, _}} = peer:read(Bob, C),
{ok, UnfetchedAfter} = peer:unfetched_count(Bob, C),
?debugFmt(
"~nAfter disconnect: ~p received, ~p unfetched~n",
[length(BobTexts), UnfetchedAfter]
),
%% Unfetched count should be consistent (might not have decreased much)
%% The key is Bob shouldn't crash or have corrupted state
?assert(is_integer(UnfetchedAfter)),
?assertEqual(NumMessages - length(BobTexts), UnfetchedAfter),
ok = peer:stop(Bob).
deep_pagination_test_() ->
%% Test scenario: Channel needs more than 10 pagination rounds
%% Verify the depth limit works correctly
{timeout, 180, fun deep_pagination_test_impl/0}.
deep_pagination_test_impl() ->
C = "deep-pagination-test",
ArgsAlice = make_test_args("Alice"),
ArgsBob = make_test_args("Bob"),
{ok, Alice} = peer:start_link(ArgsAlice),
{ok, Bob} = peer:start_link(ArgsBob),
peer:join(Alice, C),
peer:set_nick(Alice, "Alice"),
%% Create 12000 messages - would need 12 pagination rounds at 1000/request
%% This exceeds the max depth of 10
NumMessages = 12000,
?debugFmt("~nCreating ~p messages for deep pagination test~n", [NumMessages]),
%% Create in batches to avoid timeout
BatchSize = 500,
NumBatches = NumMessages div BatchSize,
lists:foreach(
fun(Batch) ->
Start = (Batch - 1) * BatchSize + 1,
End = Batch * BatchSize,
lists:foreach(
fun(N) ->
Msg = lists:flatten(io_lib:format("Msg ~p", [N])),
ok = peer:write(Alice, C, Msg)
end,
lists:seq(Start, End)
),
?debugFmt("~nCreated batch ~p/~p~n", [Batch, NumBatches])
end,
lists:seq(1, NumBatches)
),
timer:sleep(500),
%% Verify Alice has all messages
{ok, {AliceTexts, _}} = peer:read(Alice, C),
?assertEqual(NumMessages, length(AliceTexts)),
?debugFmt("~nAlice has all ~p messages~n", [NumMessages]),
%% Bob connects
peer:join(Bob, C),
peer:set_nick(Bob, "Bob"),
{ok, AliceAddr} = peer:node_addr(Alice),
ok = peer:dial(Bob, AliceAddr),
%% Wait for automatic pagination to complete (or stop at depth limit)
?debugFmt("~nWaiting for automatic pagination...~n", []),
timer:sleep(15000),
%% Check what Bob discovered
{ok, {BobTexts, _}} = peer:read(Bob, C),
{ok, BobUnfetched} = peer:unfetched_count(Bob, C),
BobTotal = length(BobTexts) + BobUnfetched,
?debugFmt(
"~nBob discovered ~p total messages (~p fetched, ~p unfetched)~n",
[BobTotal, length(BobTexts), BobUnfetched]
),
%% With depth limit of 10, Bob should discover at most 10 * 1000 = 10000 messages
%% (Actually 10 requests: 1st + 9 pagination rounds due to depth starting at 1)
%% But implementation might differ, so let's check it stopped gracefully
?assert(BobTotal =< 10000),
?debugFmt(
"~nDeep pagination stopped at ~p messages (depth limit working)~n",
[BobTotal]
),
ok = peer:stop(Alice),
ok = peer:stop(Bob).
%%%%%
%% helpers
%%%%%
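%% Build start options for a test peer: a nickname, an ephemeral localhost
%% listener (port 0), and a fresh temporary storage directory created with
%% mktemp so every peer starts from an empty database.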
make_test_args(Name) ->
S = string:chomp(os:cmd("mktemp --tmpdir -d caberl-peer-XXXXX")),
?debugFmt("~nUsing Storage for ~s: ~p~n", [Name, S]),
[
{nickname, Name},
{listener, ["localhost", 0]},
{storage, S}
].
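%% The tests above synchronise with fixed timer:sleep/1 delays. As an
%% illustrative sketch only (not used by this suite, and not part of the peer
%% API), a small polling helper could wait on a condition instead, e.g.
%% wait_until(fun() -> {ok, 0} =:= peer:unfetched_count(Bob, C) end, 5000, 100).
wait_until(Fun, TimeoutMs, _IntervalMs) when TimeoutMs =< 0 ->
    %% Out of time: return the final evaluation so the caller can assert on it
    Fun();
wait_until(Fun, TimeoutMs, IntervalMs) ->
    case Fun() of
        true ->
            true;
        _ ->
            timer:sleep(IntervalMs),
            wait_until(Fun, TimeoutMs - IntervalMs, IntervalMs)
    end.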