-module(cabal_lazy_loading_suite).
-include_lib("eunit/include/eunit.hrl").
-define(SYNC_TIMEOUT, 5000).
-define(FETCH_TIMEOUT, 3000).
%% Starts one supervised peer per entry in Names.
%%
%% For every name a supervisor is started with the arguments produced by
%% make_test_args/1, the peer pid is looked up from that supervisor and the
%% peer's nick is set to the name.
%%
%% Returns a list of {Supervisor, PeerPid} tuples, one per name, in the
%% order of Names. Crashes with badmatch if a supervisor fails to start.
setup_peers(Names) ->
    StartPeer = fun(Nick) ->
        {ok, Sup} = cabal_sup:start_link(make_test_args(Nick)),
        Peer = cabal_sup:get_peer_pid(Sup),
        cabal:set_nick(Peer, Nick),
        {Sup, Peer}
    end,
    lists:map(StartPeer, Names).
%% Stops the supervisor of every {Supervisor, PeerPid} pair, front to back.
%% Returns ok once the whole list has been processed.
teardown_peers([]) ->
    ok;
teardown_peers([{Sup, _Peer} | Rest]) ->
    cabal_sup:stop(Sup),
    teardown_peers(Rest).
%% Runs Fun against freshly started peers and always tears them down.
%%
%% Names - nicknames handed to setup_peers/1.
%% Fun   - arity-1 fun receiving the list of {Supervisor, PeerPid} pairs.
%%
%% Returns whatever Fun returns. Teardown runs in the `after` section, so
%% it also happens when Fun raises (e.g. on a failed assertion).
with_peers(Names, Fun) ->
    Started = setup_peers(Names),
    try Fun(Started)
    after teardown_peers(Started)
    end.
%% Joins Chan on Peer and writes N numbered messages to it.
%%
%% The messages are the flat strings "Message 1" .. "Message N", written in
%% ascending order. Every write must return ok. Afterwards the function
%% blocks on cabal:await_writes/3 for up to ?SYNC_TIMEOUT and requires ok.
populate_channel(Peer, Chan, N) ->
    cabal:join(Peer, Chan),
    WriteNumbered = fun(Index) ->
        Text = lists:flatten(io_lib:format("Message ~p", [Index])),
        ok = cabal:write(Peer, Chan, Text)
    end,
    ok = lists:foreach(WriteNumbered, lists:seq(1, N)),
    ok = cabal:await_writes(Peer, Chan, ?SYNC_TIMEOUT).
%% Joins Chan on From, then dials To's address from From.
%%
%% The dial is issued with await_sync => true and a ?SYNC_TIMEOUT timeout,
%% and must answer {ok, synced}; anything else crashes with badmatch.
%% Returns {ok, synced}.
connect_peers(From, To, Chan) ->
    cabal:join(From, Chan),
    {ok, Addr} = cabal:node_addr(To),
    DialOpts = #{await_sync => true, timeout => ?SYNC_TIMEOUT},
    {ok, synced} = cabal:dial(From, Addr, DialOpts).
%% Alice writes 150 messages, Bob connects and then pulls the rest.
%%
%% Checks that, right after the initial sync, Bob's locally readable
%% messages plus his unfetched count add up to Alice's total and that at
%% least 40 messages are still unfetched. A synchronous fetch_history for
%% exactly that unfetched amount must report the same number back, after
%% which nothing is left unfetched and Bob reads all 150 messages.
basic_lazy_fetch_test() ->
    Chan = "backlog-test",
    Total = 150,
    Scenario = fun([{_AliceSup, Alice}, {_BobSup, Bob}]) ->
        populate_channel(Alice, Chan, Total),
        ?assertEqual(Total, count_messages(Alice, Chan)),
        connect_peers(Bob, Alice, Chan),
        {ok, Pending} = cabal:unfetched_count(Bob, Chan),
        Local = count_messages(Bob, Chan),
        ?assertEqual(Total, Local + Pending),
        ?assert(Pending >= 40),
        FetchOpts = #{sync => true, timeout => ?FETCH_TIMEOUT},
        %% Pending is already bound: the fetch must report that same count.
        {ok, Pending} = cabal:fetch_history(Bob, Chan, Pending, FetchOpts),
        Remaining = element(2, cabal:unfetched_count(Bob, Chan)),
        ?assertEqual(0, Remaining),
        ?assertEqual(Total, count_messages(Bob, Chan))
    end,
    with_peers(["Alice", "Bob"], Scenario).
%% Live writes must not change Bob's unfetched backlog.
%%
%% After Bob has synced with a 150-message channel, Alice writes 20 more
%% messages ("Live 1" .. "Live 20"). The test then asserts that Bob's
%% unfetched count is unchanged and that his readable messages plus the
%% unfetched count total 170.
%%
%% NOTE(review): the only wait is await_writes/3 on Alice; whether that
%% also guarantees delivery to Bob is not visible here - confirm.
live_messages_during_sync_test() ->
    Chan = "live-test",
    Scenario = fun([{_AliceSup, Alice}, {_BobSup, Bob}]) ->
        populate_channel(Alice, Chan, 150),
        connect_peers(Bob, Alice, Chan),
        {ok, PendingBefore} = cabal:unfetched_count(Bob, Chan),
        WriteLive = fun(Seq) ->
            ok = cabal:write(Alice, Chan, io_lib:format("Live ~p", [Seq]))
        end,
        lists:foreach(WriteLive, lists:seq(1, 20)),
        ok = cabal:await_writes(Alice, Chan, ?SYNC_TIMEOUT),
        {ok, PendingAfter} = cabal:unfetched_count(Bob, Chan),
        ?assertEqual(PendingBefore, PendingAfter),
        ?assertEqual(170, count_messages(Bob, Chan) + PendingAfter)
    end,
    with_peers(["Alice", "Bob"], Scenario).
%% Test generator (120 s EUnit timeout): backlog accounting across channels.
%%
%% Alice fills three channels with 300 messages each. Bob joins all three
%% and then dials Alice once, waiting up to 60 s for the sync. For every
%% channel, Bob's readable messages plus his unfetched count must equal 300.
multiple_channels_test_() ->
    Channels = ["chan-A", "chan-B", "chan-C"],
    PerChannel = 300,
    Scenario = fun([{_AliceSup, Alice}, {_BobSup, Bob}]) ->
        Fill = fun(Chan) -> populate_channel(Alice, Chan, PerChannel) end,
        lists:foreach(Fill, Channels),
        Join = fun(Chan) -> cabal:join(Bob, Chan) end,
        lists:foreach(Join, Channels),
        {ok, Addr} = cabal:node_addr(Alice),
        DialOpts = #{await_sync => true, timeout => 60000},
        {ok, synced} = cabal:dial(Bob, Addr, DialOpts),
        CheckChannel = fun(Chan) ->
            {ok, Pending} = cabal:unfetched_count(Bob, Chan),
            Local = count_messages(Bob, Chan),
            ?assertEqual(PerChannel, Local + Pending)
        end,
        lists:foreach(CheckChannel, Channels)
    end,
    {timeout, 120, fun() -> with_peers(["Alice", "Bob"], Scenario) end}.
%% Stops Alice while Bob has an asynchronous history fetch in flight.
%%
%% The name ends in `_skip` rather than `_test`, so EUnit's naming
%% convention does not pick this function up as a test.
%%
%% Bob requests 50 messages asynchronously, Alice's supervisor is stopped,
%% and the test waits up to 5 s for either a fetch_complete or a
%% fetch_failed message carrying the request reference; a timeout is also
%% accepted. Whatever the outcome, Bob's readable messages plus his
%% unfetched count must still total 200.
%%
%% NOTE(review): only Bob's supervisor is stopped in the `after` section;
%% Alice's is stopped solely on the happy path inside the body.
disconnect_during_fetch_test_skip() ->
    Chan = "disconnect-test",
    [{AliceSup, Alice}, {BobSup, Bob}] = setup_peers(["Alice", "Bob"]),
    try
        populate_channel(Alice, Chan, 200),
        connect_peers(Bob, Alice, Chan),
        {ok, FetchRef} = cabal:fetch_history(Bob, Chan, 50, #{sync => false}),
        ok = cabal_sup:stop(AliceSup),
        receive
            {fetch_complete, FetchRef, _Result} -> ok;
            {fetch_failed, FetchRef, _Reason} -> ok
        after 5000 ->
            ok
        end,
        {ok, Pending} = cabal:unfetched_count(Bob, Chan),
        Local = count_messages(Bob, Chan),
        ?assertEqual(200, Local + Pending)
    after
        cabal_sup:stop(BobSup)
    end.
%% Test generator (120 s EUnit timeout): three back-to-back async fetches.
%%
%% With 300 messages on Alice and Bob synced, Bob issues three asynchronous
%% fetch_history requests of 30 messages each without waiting in between.
%% Each request must return {ok, Ref}; the test then waits up to
%% ?FETCH_TIMEOUT per request for its fetch_complete message (a timeout is
%% tolerated). Finally Bob's readable plus unfetched messages must total
%% 300 and at least 130 must be readable.
rapid_fetch_requests_test_() ->
    Chan = "rapid-test",
    Total = 300,
    %% Runs in the calling process; the head crashes on a non-{ok, Ref} reply.
    AwaitFetch = fun({ok, Ref}) ->
        receive
            {fetch_complete, Ref, _Result} -> ok
        after ?FETCH_TIMEOUT ->
            ok
        end
    end,
    Scenario = fun([{_AliceSup, Alice}, {_BobSup, Bob}]) ->
        populate_channel(Alice, Chan, Total),
        connect_peers(Bob, Alice, Chan),
        Requests = [
            cabal:fetch_history(Bob, Chan, 30, #{sync => false})
         || _ <- lists:seq(1, 3)
        ],
        lists:foreach(AwaitFetch, Requests),
        {ok, Pending} = cabal:unfetched_count(Bob, Chan),
        Local = count_messages(Bob, Chan),
        ?assertEqual(Total, Local + Pending),
        ?assert(Local >= 130)
    end,
    {timeout, 120, fun() -> with_peers(["Alice", "Bob"], Scenario) end}.
%% Test generator (120 s EUnit timeout): very deep channel history.
%%
%% Alice writes NumMessages messages ("M1" .. "M12000") to the channel and
%% waits up to 10 s for the writes to settle. Bob then connects, and the
%% test asserts that his readable messages plus his unfetched count do not
%% exceed 10000.
%%
%% NOTE(review): the 10000 bound presumably reflects a cap on how much
%% backlog a peer accounts for - confirm against the cabal sync code. The
%% assertion is an upper bound only, so it also holds if Bob sees nothing.
deep_pagination_test_() ->
    {timeout, 120, fun() ->
        with_peers(["Alice", "Bob"], fun([{_, Alice}, {_, Bob}]) ->
            C = "deep-test",
            NumMessages = 12000,
            cabal:join(Alice, C),
            %% The loop is driven by NumMessages so the declared count and
            %% the amount actually written cannot drift apart.
            lists:foreach(
                fun(N) ->
                    ok = cabal:write(Alice, C, io_lib:format("M~p", [N]))
                end,
                lists:seq(1, NumMessages)
            ),
            ok = cabal:await_writes(Alice, C, 10000),
            connect_peers(Bob, Alice, C),
            {ok, Unfetched} = cabal:unfetched_count(Bob, C),
            Total = count_messages(Bob, C) + Unfetched,
            ?assert(Total =< 10000)
        end)
    end}.
%% Returns how many messages Peer can currently read from Chan.
%%
%% cabal:read/2 must answer {ok, {Messages, _}}; the result is the length
%% of Messages. Any other reply crashes with badmatch.
count_messages(Peer, Chan) ->
    {ok, {Messages, _Other}} = cabal:read(Peer, Chan),
    erlang:length(Messages).
%% Builds the start arguments for one test peer.
%%
%% Name - nickname of the peer.
%%
%% Returns a proplist with the nickname, a listener on "localhost" port 0
%% and a freshly created, private storage directory.
%%
%% The directory is created with the standard library instead of shelling
%% out to `mktemp --tmpdir`: that option is GNU-specific, and the output of
%% a failed command would otherwise be used as the storage path unchecked.
make_test_args(Name) ->
    Storage = make_storage_dir(temp_root(), 10),
    [{nickname, Name}, {listener, ["localhost", 0]}, {storage, Storage}].

%% Directory that holds temporary files: $TMPDIR when set and non-empty,
%% otherwise "/tmp".
temp_root() ->
    case os:getenv("TMPDIR") of
        false -> "/tmp";
        "" -> "/tmp";
        Dir -> Dir
    end.

%% Creates a uniquely named "caberl-peer-*" directory below Root and
%% returns its path. A name collision is retried with a new random suffix
%% until Attempts is exhausted; any other file error crashes immediately.
make_storage_dir(_Root, 0) ->
    error(temp_dir_unavailable);
make_storage_dir(Root, Attempts) ->
    Suffix = integer_to_list(rand:uniform(36#ZZZZZZZZ), 36),
    Path = filename:join(Root, "caberl-peer-" ++ Suffix),
    case file:make_dir(Path) of
        ok ->
            %% Owner-only access, matching what `mktemp -d` creates.
            ok = file:change_mode(Path, 8#700),
            Path;
        {error, eexist} ->
            make_storage_dir(Root, Attempts - 1)
    end.