a cabal implementation in erlang
% SPDX-FileCopyrightText: 2023 Henry Bubert
%
% SPDX-License-Identifier: LGPL-2.1-or-later

-module(cabal_lazy_loading_suite).

-include_lib("eunit/include/eunit.hrl").

%% ============================================================================
%% Proposed peer API additions (to eliminate timer:sleep):
%%
%%   cabal:dial(Pid, Addr, #{await_sync => true, timeout => Ms})
%%     - Blocks until initial sync completes
%%
%%   cabal:fetch_history(Pid, Chan, N, #{sync => true, timeout => Ms})
%%     - Blocks until fetch completes, returns actual count fetched
%%
%%   cabal:await_writes(Pid, Chan, Timeout)
%%     - Blocks until write queue drains
%%
%%   cabal:subscribe(Pid, Chan, Events) / cabal:unsubscribe(Pid, Ref)
%%     - For async scenarios: {peer_event, Pid, Chan, Event}
%% ============================================================================

-define(SYNC_TIMEOUT, 5000).
-define(FETCH_TIMEOUT, 3000).

%% ============================================================================
%% Setup / Teardown
%% ============================================================================

setup_peers(Names) ->
    lists:map(
        fun(Name) ->
            Args = make_test_args(Name),
            {ok, Sup} = cabal_sup:start_link(Args),
            Pid = cabal_sup:get_peer_pid(Sup),
            cabal:set_nick(Pid, Name),
            {Sup, Pid}
        end,
        Names
    ).

teardown_peers(Peers) ->
    lists:foreach(fun({Sup, _}) -> cabal_sup:stop(Sup) end, Peers).

with_peers(Names, Fun) ->
    Peers = setup_peers(Names),
    try
        Fun(Peers)
    after
        teardown_peers(Peers)
    end.

populate_channel(Peer, Chan, N) ->
    cabal:join(Peer, Chan),
    lists:foreach(
        fun(I) ->
            Msg = lists:flatten(io_lib:format("Message ~p", [I])),
            ok = cabal:write(Peer, Chan, Msg)
        end,
        lists:seq(1, N)
    ),
    ok = cabal:await_writes(Peer, Chan, ?SYNC_TIMEOUT).

connect_peers(From, To, Chan) ->
    cabal:join(From, Chan),
    {ok, Addr} = cabal:node_addr(To),
    {ok, synced} = cabal:dial(From, Addr, #{await_sync => true, timeout => ?SYNC_TIMEOUT}).

%% ============================================================================
%% Tests
%% ============================================================================

basic_lazy_fetch_test() ->
    %% Verifies: eager fetch (100), unfetched tracking, fetch_history works
    with_peers(["Alice", "Bob"], fun([{_, Alice}, {_, Bob}]) ->
        C = "backlog-test",
        NumMessages = 150,

        populate_channel(Alice, C, NumMessages),
        ?assertEqual(NumMessages, count_messages(Alice, C)),

        connect_peers(Bob, Alice, C),

        %% Verify lazy loading state
        {ok, Unfetched} = cabal:unfetched_count(Bob, C),
        Fetched = count_messages(Bob, C),
        ?assertEqual(NumMessages, Fetched + Unfetched),
        %% At least 50 lazy, allow wiggle room
        ?assert(Unfetched >= 40),

        %% Fetch remaining via history
        {ok, Unfetched} = cabal:fetch_history(Bob, C, Unfetched, #{
            sync => true, timeout => ?FETCH_TIMEOUT
        }),

        ?assertEqual(0, element(2, cabal:unfetched_count(Bob, C))),
        ?assertEqual(NumMessages, count_messages(Bob, C))
    end).

live_messages_during_sync_test() ->
    %% Verifies: new messages arrive immediately (not lazy) after sync
    with_peers(["Alice", "Bob"], fun([{_, Alice}, {_, Bob}]) ->
        C = "live-test",

        populate_channel(Alice, C, 150),
        connect_peers(Bob, Alice, C),

        {ok, UnfetchedBefore} = cabal:unfetched_count(Bob, C),

        %% Alice posts 20 more while connected - these should arrive eagerly
        lists:foreach(
            fun(I) ->
                ok = cabal:write(Alice, C, io_lib:format("Live ~p", [I]))
            end,
            lists:seq(1, 20)
        ),
        ok = cabal:await_writes(Alice, C, ?SYNC_TIMEOUT),

        %% Unfetched should NOT increase (live msgs are eager)
        {ok, UnfetchedAfter} = cabal:unfetched_count(Bob, C),
        ?assertEqual(UnfetchedBefore, UnfetchedAfter),
        ?assertEqual(170, count_messages(Bob, C) + UnfetchedAfter)
    end).

multiple_channels_test_() ->
    {timeout, 120, fun() ->
        with_peers(["Alice", "Bob"], fun([{_, Alice}, {_, Bob}]) ->
            Channels = ["chan-A", "chan-B", "chan-C"],
            MsgsPerChan = 300,

            %% Populate all channels
            lists:foreach(fun(C) -> populate_channel(Alice, C, MsgsPerChan) end, Channels),

            %% Bob joins all, connects once
            lists:foreach(fun(C) -> cabal:join(Bob, C) end, Channels),
            {ok, Addr} = cabal:node_addr(Alice),
            {ok, synced} = cabal:dial(Bob, Addr, #{await_sync => true, timeout => 60000}),

            %% Verify each channel
            lists:foreach(
                fun(C) ->
                    {ok, Unfetched} = cabal:unfetched_count(Bob, C),
                    Fetched = count_messages(Bob, C),
                    ?assertEqual(MsgsPerChan, Fetched + Unfetched)
                end,
                Channels
            )
        end)
    end}.

disconnect_during_fetch_test_skip() ->
    %% Verifies: graceful handling when peer dies mid-fetch
    C = "disconnect-test",
    [{AliceSup, Alice}, {BobSup, Bob}] = setup_peers(["Alice", "Bob"]),

    try
        populate_channel(Alice, C, 200),
        connect_peers(Bob, Alice, C),

        %% Start async fetch, then kill Alice
        {ok, Ref} = cabal:fetch_history(Bob, C, 50, #{sync => false}),
        ok = cabal_sup:stop(AliceSup),

        %% Bob should handle this gracefully (timeout or partial)
        receive
            {fetch_complete, Ref, _} -> ok;
            {fetch_failed, Ref, _Reason} -> ok
        after 5000 -> ok
        end,

        %% Bob's state should be consistent
        {ok, Unfetched} = cabal:unfetched_count(Bob, C),
        Fetched = count_messages(Bob, C),
        ?assertEqual(200, Fetched + Unfetched)
    after
        cabal_sup:stop(BobSup)
    end.

rapid_fetch_requests_test_() ->
    %% Verifies: concurrent fetches don't corrupt state
    {timeout, 120, fun() ->
        with_peers(["Alice", "Bob"], fun([{_, Alice}, {_, Bob}]) ->
            C = "rapid-test",
            NumMessages = 300,

            populate_channel(Alice, C, NumMessages),
            connect_peers(Bob, Alice, C),

            %% Fire 3 concurrent fetches
            Refs = [cabal:fetch_history(Bob, C, 30, #{sync => false}) || _ <- lists:seq(1, 3)],

            %% Await all
            lists:foreach(
                fun({ok, Ref}) ->
                    receive
                        {fetch_complete, Ref, _} -> ok
                    after ?FETCH_TIMEOUT -> ok
                    end
                end,
                Refs
            ),

            %% State must be consistent
            {ok, Unfetched} = cabal:unfetched_count(Bob, C),
            Fetched = count_messages(Bob, C),
            ?assertEqual(NumMessages, Fetched + Unfetched),
            %% 100 eager + at least some from fetches
            ?assert(Fetched >= 130)
        end)
    end}.

deep_pagination_test_() ->
    %% Verifies: depth limit stops runaway pagination
    {timeout, 120, fun() ->
        with_peers(["Alice", "Bob"], fun([{_, Alice}, {_, Bob}]) ->
            C = "deep-test",
            NumMessages = 12000,

            %% Create in batches
            cabal:join(Alice, C),
            lists:foreach(
                fun(Batch) ->
                    Start = (Batch - 1) * 500 + 1,
                    lists:foreach(
                        fun(N) ->
                            ok = cabal:write(Alice, C, io_lib:format("M~p", [N]))
                        end,
                        lists:seq(Start, Start + 499)
                    )
                end,
                lists:seq(1, 24)
            ),
            ok = cabal:await_writes(Alice, C, 10000),

            connect_peers(Bob, Alice, C),

            %% Depth limit should cap discovery at ~10k
            {ok, Unfetched} = cabal:unfetched_count(Bob, C),
            Total = count_messages(Bob, C) + Unfetched,
            ?assert(Total =< 10000)
        end)
    end}.

%% ============================================================================
%% Helpers
%% ============================================================================

count_messages(Peer, Chan) ->
    {ok, {Texts, _}} = cabal:read(Peer, Chan),
    length(Texts).

make_test_args(Name) ->
    S = string:chomp(os:cmd("mktemp --tmpdir -d caberl-peer-XXXXX")),
    [{nickname, Name}, {listener, ["localhost", 0]}, {storage, S}].