-module(cabal).
-behavior(gen_server).
-export([start_link/1, stop/1]).
-export([node_addr/1, node_public_key/1, dial/2, dial/3, peer_list/1]).
-export([channels_joined/1, channels_known/1, channel_members/2, join/2, leave/2]).
-export([set_topic/3, set_nick/2, read/2, write/3]).
-export([request_channel_list/2, peer_channel_list/2]).
-export([fetch_history/3, fetch_history/4, unfetched_count/2, await_writes/3]).
-export([persistent_peer_add/2, persistent_peer_remove/2, persistent_peer_list/1]).
-export([register_event_handler/3, unregister_event_handler/2]).
-export([get_db/1, update_peer_events_pid/2]).
-export([default_caberl_location/0, create_or_load_transport_keypair/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).
-export([handle_peer_messages/3, handle_channel_messages/3, handle_database_messages/3]).
-export([handle_network_messages/3, handle_system_messages/3]).
%% @doc Starts a cabal node server linked to the caller.
%% `Args' is passed unchanged to `init/1', which reads the `storage' and
%% `transport_sup' properties from it.
start_link(Args) ->
    ServerOpts = [],
    gen_server:start_link(?MODULE, Args, ServerOpts).
%% @doc Asynchronously asks the node server `Pid' to stop.
%% Returns `ok' immediately; the server closes its database and then
%% terminates with reason `normal'.
stop(Pid) ->
    StopRequest = stop,
    gen_server:cast(Pid, StopRequest).
%% @doc Returns `{ok, Db}' where `Db' is the database handle held by the node.
get_db(Pid) ->
    Request = {get_db},
    gen_server:call(Pid, Request).
%% @doc Tells the node which process is its peer-events process.
%% The server links to `PeerEventsPid', stores it in its state and replies `ok'.
update_peer_events_pid(Pid, PeerEventsPid) ->
    Request = {update_peer_events_pid, PeerEventsPid},
    gen_server:call(Pid, Request).
%% Server state of a cabal node.
-record(state, {
%% Database handle returned by cabal_db:open/1.
db,
%% Node key pair; a map that contains at least the key `public'.
keyPair,
%% Pid of the transport process obtained from the transport supervisor.
transportPid,
%% Transport supervisor, used to re-acquire the transport pid after a crash.
transportSup,
%% Pid of the peer-events process; `undefined' until update_peer_events_pid/2.
peerEventsPid,
%% {Address, Port} the transport reports it is listening on.
listenAddr,
%% ConnPid => #{address, lastSeenAt, bytesTransmitted => {Sent, Received}}.
peers = #{},
%% Joined channel => list of {sent | received, ReqId, ConnPid} live requests.
channels = #{},
%% ReqId => bookkeeping for requests this node has sent. Entries are
%% {ConnPid, Msg}, {ConnPid, Atom} or {ConnPid, post_request, PaginationMap}.
activeOut = #{},
%% ReqId => {ConnPid, Msg} for requests received from peers and kept open.
activeIn = #{},
%% ConnPid => channel list most recently received from that peer.
peerChannelLists = #{}
}).
%% @doc Asks the node to dial a peer and returns `ok' right away (cast).
%% `Addr' is either a `{Host, Port}' tuple, forwarded as is, or a
%% "host:port" string whose host part is resolved with inet:getaddr/2
%% (IPv4) and whose port part is converted with list_to_integer/1.
dial(Pid, {_Host, _Port} = HostPort) ->
    gen_server:cast(Pid, {peerDial, HostPort});
dial(Pid, Addr) when is_list(Addr) ->
    [HostPart, PortPart] = string:split(Addr, ":"),
    {ok, ResolvedHost} = inet:getaddr(HostPart, inet),
    DialRequest = {peerDial, {ResolvedHost, list_to_integer(PortPart)}},
    gen_server:cast(Pid, DialRequest).
%% @doc Dials a peer, optionally waiting for the connection.
%%
%% `Opts' is a map:
%%   `await_sync' - when `true', block until poll_for_connection/2 returns;
%%                  defaults to `false', which behaves exactly like dial/2.
%%   `timeout'    - value handed to poll_for_connection/2; defaults to 5000
%%                  when `await_sync' is `true'.
%%
%% The address handling is delegated to dial/2 so that both arities accept
%% exactly the same address forms.
dial(Pid, Addr, #{await_sync := true, timeout := Timeout}) ->
    ok = dial(Pid, Addr),
    poll_for_connection(Pid, Timeout);
dial(Pid, Addr, Opts) ->
    case maps:get(await_sync, Opts, false) of
        false -> dial(Pid, Addr);
        %% Fill in the default timeout and re-enter through the clause above.
        true -> dial(Pid, Addr, Opts#{timeout => maps:get(timeout, Opts, 5000)})
    end.
%% @doc Returns `{ok, {Address, Port}}', the address the node's transport listens on.
node_addr(Pid) ->
    Request = {nodeAddr},
    gen_server:call(Pid, Request).
%% @doc Returns `{ok, PublicKey}' taken from the node's key pair.
node_public_key(Pid) ->
    Request = {nodePubKey},
    gen_server:call(Pid, Request).
%% @doc Returns the connected peers as a bare list of `{ConnPid, Metadata}'
%% pairs (note: not wrapped in an `ok' tuple).
peer_list(Pid) ->
    Request = {peerList},
    gen_server:call(Pid, Request).
%% @doc Asynchronously asks the node to send a channel list request to the
%% connection `Peer'. Returns `ok' immediately.
request_channel_list(Pid, Peer) ->
    Request = {requestChannelList, Peer},
    gen_server:cast(Pid, Request).
%% @doc Returns `{ok, Channels}' with the channel list last received from
%% `Peer', or `{ok, []}' when nothing has been received from it.
peer_channel_list(Pid, Peer) ->
    Request = {peerChannelList, Peer},
    gen_server:call(Pid, Request).
%% @doc Requests up to `Limit' not-yet-fetched posts of `Channel' from a
%% randomly chosen connected peer.
%% Returns `{ok, RequestedCount}' or `{error, no_connected_peers}'.
fetch_history(Pid, Channel, Limit) ->
    Request = {fetchHistory, Channel, Limit},
    gen_server:call(Pid, Request).
%% @doc Like fetch_history/3, optionally waiting for the fetch to complete.
%%
%% With `#{sync := true, timeout := Timeout}' the request is issued and the
%% result of poll_for_fetch_completion/6 is returned. If the request cannot
%% be issued (for example `{error, no_connected_peers}'), that error tuple is
%% returned to the caller instead of crashing it with a badmatch.
%% With any other options this is identical to fetch_history/3.
fetch_history(Pid, Channel, Limit, #{sync := true, timeout := Timeout}) ->
    StartTime = erlang:monotonic_time(millisecond),
    {ok, InitialUnfetched} = unfetched_count(Pid, Channel),
    case fetch_history(Pid, Channel, Limit) of
        {ok, RequestedCount} ->
            poll_for_fetch_completion(
                Pid, Channel, InitialUnfetched, RequestedCount, StartTime, Timeout
            );
        {error, _Reason} = Error ->
            %% Nothing was requested, so there is nothing to wait for.
            Error
    end;
fetch_history(Pid, Channel, Limit, _Opts) ->
    fetch_history(Pid, Channel, Limit).
%% @doc Returns whatever cabal_db:unfetched_count/2 reports for `Channel'.
unfetched_count(Pid, Channel) ->
    Request = {unfetchedCount, Channel},
    gen_server:call(Pid, Request).
%% @doc Asynchronously joins channel `Chan' (must be a list/string).
%% Returns `ok' immediately.
join(Pid, Chan) when is_list(Chan) ->
    Request = {channelsJoin, Chan},
    gen_server:cast(Pid, Request).
%% @doc Asynchronously leaves channel `Chan' (must be a list/string).
%% Returns `ok' immediately.
leave(Pid, Chan) when is_list(Chan) ->
    Request = {channelsLeave, Chan},
    gen_server:cast(Pid, Request).
%% @doc Returns `{ok, Channels}', the channels this node has currently joined.
channels_joined(Pid) ->
    Request = {channelsList},
    gen_server:call(Pid, Request).
%% @doc Returns the result of cabal_db:channels_known/1 for the node's database.
channels_known(Pid) ->
    Request = {channelsKnown},
    gen_server:call(Pid, Request).
%% @doc Returns the members of `Chan' as reported by the database, or
%% `{error, notInChannel}' when the node has not joined `Chan'.
channel_members(Pid, Chan) ->
    Request = {channelsMembers, Chan},
    gen_server:call(Pid, Request).
%% @doc Sets the topic of `Chan'. Returns `ok', or `{error, notInChannel}'
%% when the node has not joined `Chan'.
set_topic(Pid, Chan, Topic) ->
    Request = {channelsSetTopic, Chan, Topic},
    gen_server:call(Pid, Request).
%% @doc Sets this node's own nickname. Returns `ok'.
set_nick(Pid, Nick) ->
    Request = {setOwnNick, Nick},
    gen_server:call(Pid, Request).
%% @doc Returns `{ok, Texts}' with the texts stored for channel `Chan'.
read(Pid, Chan) ->
    Request = {readTextsFromChannel, Chan},
    gen_server:call(Pid, Request).
%% @doc Writes the text post `Text' to channel `Chan' through the node server.
write(Pid, Chan, Text) ->
    Request = {writeTextToChannel, Chan, Text},
    gen_server:call(Pid, Request).
%% @doc Waits for writes to `Channel' to complete.
%% Takes the number of texts currently readable from the channel as the
%% baseline, then hands over to poll_for_writes_complete/5 together with a
%% monotonic start time and `Timeout'. The result of that poll is returned.
await_writes(Pid, Channel, Timeout) ->
    {ok, {TextsBefore, _}} = read(Pid, Channel),
    Baseline = length(TextsBefore),
    %% Round trip through the server; the reply itself is not used.
    {ok, _} = channels_joined(Pid),
    StartedAt = erlang:monotonic_time(millisecond),
    poll_for_writes_complete(Pid, Channel, Baseline, StartedAt, Timeout).
%% @doc Adds `Address' to the persistent peer table; returns the result of
%% cabal_db:peer_add/2.
persistent_peer_add(Pid, Address) ->
    Request = {persistentPeerAdd, Address},
    gen_server:call(Pid, Request).
%% @doc Removes `Address' from the persistent peer table; returns the result
%% of cabal_db:peer_delete/2.
persistent_peer_remove(Pid, Address) ->
    Request = {persistentPeerRemove, Address},
    gen_server:call(Pid, Request).
%% @doc Returns the persistent peers as reported by cabal_db:peer_list/1.
persistent_peer_list(Pid) ->
    Request = {persistentPeerList},
    gen_server:call(Pid, Request).
%% @doc Registers `HandlerPid' as an event handler with interval `IntervalMs'.
%% `IntervalMs' must be a number of at least 100; anything else fails with
%% `function_clause'. The `is_number/1' test is required because in Erlang's
%% term order every atom, tuple, list, etc. compares greater than any number,
%% so `IntervalMs >= 100' alone would accept non-numeric terms.
register_event_handler(Pid, HandlerPid, IntervalMs) when
    is_number(IntervalMs), IntervalMs >= 100
->
    gen_server:call(Pid, {registerEventHandler, HandlerPid, IntervalMs}).
%% @doc Unregisters the event handler `HandlerPid'; returns whatever
%% cabal_peer_events:unregister_event_handler/2 returns.
unregister_event_handler(Pid, HandlerPid) ->
    Request = {unregisterEventHandler, HandlerPid},
    gen_server:call(Pid, Request).
%% @doc gen_server callback: builds the initial node state.
%% Reads `storage' and `transport_sup' from `Args', loads (or creates) the
%% key pair, opens the database, looks up the transport process, monitors
%% it and registers this process as its handler. The channels already
%% recorded in the database start out with an empty request list.
init(Args) ->
    process_flag(trap_exit, true),
    StorePath = proplists:get_value(storage, Args),
    KeyPair = create_or_load_keypair(StorePath),
    {ok, Db} = cabal_db:open(StorePath),
    {ok, StoredChans} = cabal_db:channels_list(Db),
    TransportSup = proplists:get_value(transport_sup, Args),
    {ok, TransportPid} = cabal_transport_sup:get_transport_pid(TransportSup),
    erlang:monitor(process, TransportPid),
    ok = cabal_transport:register_handler(TransportPid, self()),
    {ok, {LisAddr, Port} = ListenAddr} = cabal_transport:get_address(TransportPid),
    io:format("[Peer] Listening on ~p:~p~n", [LisAddr, Port]),
    io:format("[Peer] Started, transport=~p (cabal_peer_events pending)~n", [TransportPid]),
    {ok, #state{
        db = Db,
        keyPair = KeyPair,
        transportPid = TransportPid,
        transportSup = TransportSup,
        %% Filled in later through update_peer_events_pid/2.
        peerEventsPid = undefined,
        listenAddr = ListenAddr,
        channels = maps:from_keys(StoredChans, [])
    }}.
%% @doc gen_server callback for synchronous requests issued by the API
%% functions of this module. Each clause replies immediately.
%%
%% Notable behaviour:
%%   * `writeTextToChannel' replies `{error, notInChannel}' for a channel the
%%     node has not joined (same contract as `channelsSetTopic' and
%%     `channelsMembers') instead of crashing the server on a missing key.
%%   * `peerList' replies with a bare list of `{ConnPid, Metadata}' pairs.
%%   * The event-handler requests reply `{error, peer_events_not_ready}' while
%%     no peer-events pid has been set via `update_peer_events_pid'.
handle_call({readTextsFromChannel, Chan}, _From, State = #state{db = Db}) ->
    {ok, Texts} = cabal_db:get_texts_for_channel(Db, Chan),
    {reply, {ok, Texts}, State};
handle_call({writeTextToChannel, Chan, Text}, _From, State = #state{channels = Chans}) ->
    case maps:is_key(Chan, Chans) of
        false ->
            {reply, {error, notInChannel}, State};
        true ->
            NewState = handle_database_messages(writeTextToChannel, {Chan, Text}, State),
            {reply, ok, NewState}
    end;
handle_call({nodeAddr}, _From, State = #state{listenAddr = LisAddr}) ->
    {reply, {ok, LisAddr}, State};
handle_call({nodePubKey}, _From, State = #state{keyPair = KeyPair}) ->
    PubKey = maps:get(public, KeyPair),
    {reply, {ok, PubKey}, State};
handle_call({peerList}, _From, State = #state{peers = Peers}) ->
    %% Prepend and reverse: same order as appending, without the O(n^2) cost.
    Collected = maps:fold(
        fun(PeerId, PeerMeta, Acc) -> [{PeerId, PeerMeta} | Acc] end,
        [],
        Peers
    ),
    {reply, lists:reverse(Collected), State};
handle_call({channelsList}, _From, State = #state{channels = Chans}) ->
    {reply, {ok, maps:keys(Chans)}, State};
handle_call({channelsKnown}, _From, State = #state{db = Db}) ->
    Result = cabal_db:channels_known(Db),
    {reply, Result, State};
handle_call({channelsMembers, Chan}, _From, State = #state{channels = Chans, db = Db}) ->
    case maps:is_key(Chan, Chans) of
        false ->
            {reply, {error, notInChannel}, State};
        true ->
            Result = cabal_db:get_channel_members(Db, Chan),
            {reply, Result, State}
    end;
handle_call({channelsSetTopic, Chan, Topic}, _From, State = #state{channels = Chans}) ->
    case maps:is_key(Chan, Chans) of
        false ->
            {reply, {error, notInChannel}, State};
        true ->
            NewState = handle_channel_messages(channelsSetTopic, {Chan, Topic}, State),
            {reply, ok, NewState}
    end;
handle_call({setOwnNick, Nick}, _From, State) ->
    NewState = handle_channel_messages(setOwnNick, {Nick}, State),
    {reply, ok, NewState};
handle_call({peerChannelList, Peer}, _From, State = #state{peerChannelLists = Lists}) ->
    {reply, {ok, maps:get(Peer, Lists, [])}, State};
handle_call({fetchHistory, Channel, Limit}, _From, State) ->
    {Reply, NewState} = handle_system_messages(fetchHistory, {Channel, Limit}, State),
    {reply, Reply, NewState};
handle_call({unfetchedCount, Channel}, _From, State = #state{db = Db}) ->
    Result = cabal_db:unfetched_count(Db, Channel),
    {reply, Result, State};
handle_call({persistentPeerAdd, Address}, _From, State = #state{db = Db}) ->
    Result = cabal_db:peer_add(Db, Address),
    {reply, Result, State};
handle_call({persistentPeerRemove, Address}, _From, State = #state{db = Db}) ->
    Result = cabal_db:peer_delete(Db, Address),
    {reply, Result, State};
handle_call({persistentPeerList}, _From, State = #state{db = Db}) ->
    Result = cabal_db:peer_list(Db),
    {reply, Result, State};
handle_call(
    {registerEventHandler, _HandlerPid, _IntervalMs},
    _From,
    State = #state{peerEventsPid = undefined}
) ->
    %% The peer-events process has not been attached yet (see init/1).
    {reply, {error, peer_events_not_ready}, State};
handle_call(
    {registerEventHandler, HandlerPid, IntervalMs},
    _From,
    State = #state{peerEventsPid = PeerEventsPid}
) ->
    Result = cabal_peer_events:register_event_handler(PeerEventsPid, HandlerPid, IntervalMs),
    {reply, Result, State};
handle_call(
    {unregisterEventHandler, _HandlerPid}, _From, State = #state{peerEventsPid = undefined}
) ->
    {reply, {error, peer_events_not_ready}, State};
handle_call(
    {unregisterEventHandler, HandlerPid}, _From, State = #state{peerEventsPid = PeerEventsPid}
) ->
    Result = cabal_peer_events:unregister_event_handler(PeerEventsPid, HandlerPid),
    {reply, Result, State};
handle_call({get_peers}, _From, State = #state{peers = Peers}) ->
    {reply, {ok, Peers}, State};
handle_call({get_db}, _From, State = #state{db = Db}) ->
    {reply, {ok, Db}, State};
handle_call({update_peer_events_pid, PeerEventsPid}, _From, State) ->
    io:format("[Peer] Updated cabal_peer_events PID to ~p~n", [PeerEventsPid]),
    %% Linked (with trap_exit set in init/1) so its exit arrives in handle_info/2.
    link(PeerEventsPid),
    {reply, ok, State#state{peerEventsPid = PeerEventsPid}}.
%% @doc gen_server callback for asynchronous requests: dialing a peer,
%% requesting a peer's channel list, joining/leaving a channel and stopping
%% the server. Unknown casts are logged and ignored.
handle_cast({peerDial, {Host, Port}}, State = #state{transportPid = TransportPid}) ->
    cabal_transport:dial(TransportPid, Host, Port),
    {noreply, State};
handle_cast({requestChannelList, Peer}, State) ->
    {noreply, handle_peer_messages(requestChannelList, {Peer}, State)};
handle_cast({channelsJoin, Chan}, State) ->
    io:format("[Chan] joining ~p~n", [Chan]),
    {noreply, handle_channel_messages(channelsJoin, {Chan}, State)};
handle_cast({channelsLeave, Chan}, State) ->
    io:format("[Chan] leaving ~p~n", [Chan]),
    {noreply, handle_channel_messages(channelsLeave, {Chan}, State)};
handle_cast(stop, State = #state{db = Database}) ->
    %% Close the database before asking gen_server to terminate normally.
    ok = cabal_db:close(Database),
    {stop, normal, State};
handle_cast(Unknown, State) ->
    io:format("Unexpected cast: ~p~n", [Unknown]),
    {noreply, State}.
%% @doc gen_server callback for plain messages.
%%
%% Handles transport notifications (`peerNew', `peerLost', `peerData'),
%% decoded wire messages this process sends to itself (`incomingMsg'), the
%% `stateChange' notification sent to self on join/leave, and the exit of
%% related processes:
%%   * the transport is monitored (init/1), so its death arrives as a
%%     `DOWN' message; a linked transport would arrive as `EXIT'. Both lead
%%     to the same recovery attempt.
%%   * database exit stops the server.
%%   * peer-events exit stops the server unless the reason is `shutdown' or
%%     `normal'.
%% Anything else is logged and ignored.
handle_info({peerNew, ConnPid, PeerAddr}, State) ->
    NewState = handle_peer_messages(peerNew, {ConnPid, PeerAddr}, State),
    {noreply, NewState};
handle_info({peerLost, ConnPid, PeerAddr}, State) ->
    NewState = handle_peer_messages(peerLost, {ConnPid, PeerAddr}, State),
    {noreply, NewState};
handle_info({peerData, ConnPid, Data}, State) ->
    NewState = handle_peer_messages(peerData, {ConnPid, Data}, State),
    {noreply, NewState};
handle_info({incomingMsg, Peer, Msg, MsgSize}, State) ->
    NewState = handle_network_messages(incomingMsg, {Peer, Msg, MsgSize}, State),
    {noreply, NewState};
handle_info({stateChange, Chan}, State) ->
    NewState = handle_system_messages(stateChange, {Chan}, State),
    {noreply, NewState};
handle_info({'DOWN', _MonitorRef, process, Pid, Reason}, State = #state{transportPid = Pid}) ->
    %% Only the current transport pid is matched here; a late `DOWN' for an
    %% already replaced transport falls through to the catch-all clause.
    reacquire_transport(Reason, State);
handle_info({'EXIT', Pid, Reason}, State = #state{db = Pid}) ->
    io:format("[Peer] Database process died: ~p~n", [Reason]),
    {stop, {database_died, Reason}, State};
handle_info({'EXIT', Pid, Reason}, State = #state{peerEventsPid = Pid}) when
    Pid =/= undefined
->
    case Reason of
        shutdown ->
            {noreply, State};
        normal ->
            {noreply, State};
        _ ->
            io:format("[Peer] Peer events process died: ~p~n", [Reason]),
            {stop, {cabal_peer_events_died, Reason}, State}
    end;
handle_info({'EXIT', Pid, Reason}, State = #state{transportPid = Pid}) ->
    reacquire_transport(Reason, State);
handle_info({'EXIT', Pid, Reason}, State) ->
    io:format("[Peer] Linked process ~p died: ~p~n", [Pid, Reason]),
    {noreply, State};
handle_info(Msg, State) ->
    io:format("Unexpected message: ~p~n", [Msg]),
    {noreply, State}.

%% Asks the transport supervisor for the (restarted) transport process,
%% monitors it and re-registers this process as its handler. Stops the
%% server when no transport can be obtained.
reacquire_transport(Reason, State = #state{transportSup = TransportSup}) ->
    io:format("[Peer] Transport process died: ~p. Attempting recovery...~n", [Reason]),
    %% Short pause before asking the supervisor for the new pid.
    timer:sleep(200),
    case cabal_transport_sup:get_transport_pid(TransportSup) of
        {ok, NewTPid} ->
            io:format("[Peer] Reacquired transport PID: ~p~n", [NewTPid]),
            erlang:monitor(process, NewTPid),
            ok = cabal_transport:register_handler(NewTPid, self()),
            {noreply, State#state{transportPid = NewTPid}};
        {error, _} ->
            io:format("[Peer] Failed to reacquire transport PID! Stopping.~n"),
            {stop, transport_died_permanently, State}
    end.
%% @doc gen_server callback invoked on shutdown. Performs no cleanup here;
%% the `stop' cast closes the database before the server terminates.
terminate(_Reason, _State) ->
ok.
%% @doc gen_server callback for hot code upgrades; the state is kept as is.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% @doc Handles node-level ("system") requests.
%%
%% Clauses that receive a `From' reply through gen_server:reply/2 and return
%% the state. The `fetchHistory' clause is different: it returns
%% `{Reply, NewState}' and leaves replying to the caller. Its `Reply' is
%% `{ok, Count}' with the number of hashes requested, `{error,
%% no_connected_peers}', or the `{error, Reason}' returned by
%% send_post_request/3 when the request could not be sent.
%% Unknown messages are logged and the state is returned unchanged.
handle_system_messages(nodePubKey, {From}, State = #state{keyPair = KeyPair}) ->
    PubKey = maps:get(public, KeyPair),
    gen_server:reply(From, {ok, PubKey}),
    State;
handle_system_messages(peerList, {From}, State = #state{peers = Peers}) ->
    %% Prepend and reverse: same order as appending, without the O(n^2) cost.
    Collected = maps:fold(
        fun(PeerId, PeerMeta, Acc) -> [{PeerId, PeerMeta} | Acc] end,
        [],
        Peers
    ),
    gen_server:reply(From, lists:reverse(Collected)),
    State;
handle_system_messages(channelsList, {From}, State = #state{channels = Chans}) ->
    gen_server:reply(From, {ok, maps:keys(Chans)}),
    State;
handle_system_messages(channelsKnown, {From}, State = #state{db = Db}) ->
    Result = cabal_db:channels_known(Db),
    gen_server:reply(From, Result),
    State;
handle_system_messages(peerChannelList, {Peer, From}, State = #state{peerChannelLists = Lists}) ->
    gen_server:reply(From, {ok, maps:get(Peer, Lists, [])}),
    State;
handle_system_messages(
    fetchHistory, {Channel, Limit}, State = #state{db = Db, peers = Peers}
) ->
    {ok, UnfetchedHashes} = cabal_db:unfetched_get(Db, Channel, Limit),
    case {UnfetchedHashes, maps:keys(Peers)} of
        {[], _} ->
            {{ok, 0}, State};
        {_, []} ->
            {{error, no_connected_peers}, State};
        {_, PeerPids} ->
            %% Any connected peer will do; pick one at random.
            ConnPid = lists:nth(rand:uniform(length(PeerPids)), PeerPids),
            case send_post_request(State, ConnPid, UnfetchedHashes) of
                {ok, {PostReqId, _PostReqMsg, SentSize}} ->
                    ActiveOut = State#state.activeOut,
                    UpdatedOut = maps:put(PostReqId, {ConnPid, post_request}, ActiveOut),
                    NewState = State#state{
                        activeOut = UpdatedOut,
                        peers = update_peer_sent(Peers, ConnPid, SentSize)
                    },
                    {{ok, length(UnfetchedHashes)}, NewState};
                {error, _Reason} = Error ->
                    %% Report the failure to the caller rather than crashing
                    %% the whole node on a single failed send.
                    {Error, State}
            end
    end;
handle_system_messages(unfetchedCount, {Channel, From}, State = #state{db = Db}) ->
    Result = cabal_db:unfetched_count(Db, Channel),
    gen_server:reply(From, Result),
    State;
handle_system_messages(stateChange, {Chan}, State) ->
    io:format("[DEBUG] state change of ~p~n", [Chan]),
    State;
handle_system_messages(Message, Args, State) ->
    io:format("Unhandled system message: ~p with args ~p~n", [Message, Args]),
    State.
%% @doc Handles connection-level events and returns the updated state.
%%
%%   `peerData'           - splits raw data into wire messages, decodes each
%%                          and re-sends it to self as `incomingMsg'.
%%   `peerLost'           - forgets the connection and every request tied to
%%                          it, and lowers the persistent peer score.
%%   `peerNew'            - sends a channel state request and a 12 hour
%%                          time range request for every joined channel,
%%                          records the peer and raises its persistent score.
%%   `requestChannelList' - sends a channel list request to one connection.
%% Unknown messages are logged and ignored.
handle_peer_messages(
    peerData,
    {ConnPid, Data},
    State
) ->
    %% NOTE(review): both matches below are assertive, so data that fails to
    %% split or decode crashes the server - confirm this is intended for
    %% input coming from the network.
    {ok, Msgs} = cabal_wire:split_messages(Data),
    EachMsg = fun(Chunk) ->
        {ok, Msg} = cabal_wire:decode(Chunk),
        MsgSize = byte_size(Chunk),
        self() ! {incomingMsg, ConnPid, Msg, MsgSize}
    end,
    lists:foreach(EachMsg, Msgs),
    State;
handle_peer_messages(
    peerLost,
    {ConnPid, PeerAddr},
    State = #state{peers = Peers, activeIn = ActiveIn, activeOut = ActiveOut, channels = Chans}
) ->
    NormalizedAddr = format_address(PeerAddr),
    case maps:is_key(ConnPid, Peers) of
        false ->
            io:format("[Peer] Lost connection ~p to unknown peer: ~p~n", [ConnPid, NormalizedAddr]),
            State;
        true ->
            io:format("[Peer:~p] lost connection to ~p~n", [ConnPid, NormalizedAddr]),
            {
                #{
                    lastSeenAt := LastSeen,
                    bytesTransmitted := {Sent, Received}
                },
                NewPeers
            } = maps:take(ConnPid, Peers),
            io:format(
                "[Peer:~p->~p] LastSeen: ~p | Sent: ~p | Received: ~p~n",
                [ConnPid, NormalizedAddr, LastSeen, human_bytesize(Sent), human_bytesize(Received)]
            ),
            %% Request entries always carry the connection pid first, but
            %% they are not all pairs: paginated post requests are stored as
            %% {ConnPid, post_request, Meta}. Look at element 1 only.
            NotFromLostConn = fun(_ReqId, Entry) -> element(1, Entry) =/= ConnPid end,
            NewActiveOut = maps:filter(NotFromLostConn, ActiveOut),
            RemovedOutCount = maps:size(ActiveOut) - maps:size(NewActiveOut),
            NewActiveIn = maps:filter(NotFromLostConn, ActiveIn),
            RemovedInCount = maps:size(ActiveIn) - maps:size(NewActiveIn),
            %% Every channel stays joined; only its requests for the lost
            %% connection are dropped.
            CleanupChannelReqs = fun(_ChanName, ChanReqs) ->
                [Req || {_Direction, _ReqId, ReqConnPid} = Req <- ChanReqs, ReqConnPid =/= ConnPid]
            end,
            NewChans = maps:map(CleanupChannelReqs, Chans),
            io:format(
                "[Peer:~p] Cleaned up ~p outgoing and ~p incoming requests~n",
                [ConnPid, RemovedOutCount, RemovedInCount]
            ),
            Db = State#state.db,
            case cabal_db:peer_get(Db, NormalizedAddr) of
                {ok, PeerInfo} ->
                    CurrentScore = maps:get(score, PeerInfo, 0),
                    Now = os:system_time(millisecond),
                    cabal_db:peer_update(Db, NormalizedAddr, [
                        {score, max(0, CurrentScore - 5)},
                        {last_seen, Now}
                    ]),
                    io:format("[Peer] Updated persistent peer ~s: score -5~n", [NormalizedAddr]);
                not_found ->
                    ok
            end,
            State#state{
                peers = NewPeers,
                activeIn = NewActiveIn,
                activeOut = NewActiveOut,
                channels = NewChans
            }
    end;
handle_peer_messages(
    peerNew,
    {ConnPid, PeerAddr},
    State = #state{
        channels = Chans, peers = Peers, activeOut = ActiveOut
    }
) ->
    NormalizedAddr = format_address(PeerAddr),
    EachChan = fun(Chan, {AccNewReqs, AccChannels, PeerSent}) ->
        io:format("[Peer] Processing channel ~p for new connection ~p~n", [Chan, ConnPid]),
        {ok, {StateReqId, StateMsg, StateSize}} = send_channel_state_request(
            State, ConnPid, Chan, true
        ),
        End = 0,
        %% Window start: 12 hours before now, in milliseconds.
        Start = os:system_time(1000) - (12 * 60 * 60 * 1000),
        Limit = 1000,
        {ok, {TimeReqId, TimeMsg, TimeSize}} = send_channel_time_range_request(
            State, ConnPid, Chan, Start, End, Limit
        ),
        %% The limit is stored next to the request so the hash response
        %% handler can tell whether a full page came back.
        AnnotatedTimeMsg = [TimeMsg, {pagination_limit, Limit}],
        ReqsForChan =
            maps:get(Chan, AccChannels, []) ++
                [{sent, StateReqId, ConnPid}] ++
                [{sent, TimeReqId, ConnPid}],
        {
            AccNewReqs ++ [{StateReqId, StateMsg}, {TimeReqId, AnnotatedTimeMsg}],
            maps:update(Chan, ReqsForChan, AccChannels),
            PeerSent + StateSize + TimeSize
        }
    end,
    {NewRequests, NewChans, PeerSent} = lists:foldl(EachChan, {[], Chans, 0}, maps:keys(Chans)),
    io:format("[Peer] Created ~p new requests for ~p channels~n", [
        length(NewRequests), maps:size(Chans)
    ]),
    NewOut = lists:foldl(
        fun({ReqId, Msg}, AccActiveOut) ->
            maps:put(ReqId, {ConnPid, Msg}, AccActiveOut)
        end,
        ActiveOut,
        NewRequests
    ),
    PeerMetadata = #{
        address => NormalizedAddr,
        lastSeenAt => os:system_time(1000),
        bytesTransmitted => {PeerSent, 0}
    },
    NewState = State#state{
        peers = maps:put(ConnPid, PeerMetadata, Peers),
        channels = NewChans,
        activeOut = NewOut
    },
    io:format("[Peer] Tracking new connection ~p to ~p with metadata ~p~n", [
        ConnPid, NormalizedAddr, PeerMetadata
    ]),
    Db = State#state.db,
    case cabal_db:peer_get(Db, NormalizedAddr) of
        {ok, PeerInfo} ->
            CurrentScore = maps:get(score, PeerInfo, 0),
            Now = os:system_time(millisecond),
            cabal_db:peer_update(Db, NormalizedAddr, [
                {score, CurrentScore + 10},
                {last_seen, Now},
                {attempt_count, 0}
            ]),
            io:format("[Peer] Updated persistent peer ~s: score +10, reset backoff~n", [
                NormalizedAddr
            ]);
        not_found ->
            ok
    end,
    NewState;
handle_peer_messages(
    requestChannelList,
    {ConnPid},
    State = #state{activeOut = ActiveOut, peers = Peers}
) ->
    case send_channel_list_request(State, ConnPid, 0, 100) of
        {ok, {ReqId, _Msg, Size}} ->
            io:format("[Peer] Sent channel list request (~p) to conn ~p~n", [
                hex:bin_to_hexstr(ReqId), ConnPid
            ]),
            NewActiveOut = maps:put(ReqId, {ConnPid, channel_list_request}, ActiveOut),
            NewPeers = update_peer_sent(Peers, ConnPid, Size),
            State#state{activeOut = NewActiveOut, peers = NewPeers};
        {error, Reason} ->
            io:format("[Peer] Failed to send channel list request to conn ~p: ~p~n", [
                ConnPid, Reason
            ]),
            State
    end;
handle_peer_messages(Message, Args, State) ->
    io:format("Unhandled peer message: ~p with args ~p~n", [Message, Args]),
    State.
%% @doc Handles channel-level operations and returns the updated state.
%%
%%   `setOwnNick'       - stores an info post and announces its hash to every
%%                        peer holding an open type 5 request.
%%   `channelsJoin'     - stores a join post, records the channel and sends a
%%                        channel state request to every connected peer.
%%   `channelsLeave'    - stores a leave post, cancels the requests this node
%%                        sent for the channel and forgets the ones it
%%                        received for it.
%%   `channelsSetTopic' - stores a topic post and announces its hash to peers
%%                        holding an open type 4 request on the channel.
%%   `channelsMembers'  - replies to `From' with the channel members.
%%
%% Stored requests are `[Header, Body]' wire messages whose header is a
%% property list keyed by `msgType' (see handle_incoming_message/4), so that
%% is the key used to look up a request's type.
%% Unknown messages are logged and ignored.
handle_channel_messages(
    setOwnNick,
    {Nick},
    State = #state{db = Db, keyPair = KeyPair, activeIn = ActiveIn, peers = Peers}
) ->
    {ok, Links} = cabal_db:get_channel_heads(Db, null),
    Bin = cabal_posts:encode(KeyPair, Links, {info, {name, Nick}}),
    {ok, _, PostHash} = cabal_db:save_post(Db, Bin),
    F = fun(ReqId, {ConnPid, [Header, _]}, AccPeers) ->
        case proplists:get_value(msgType, Header) of
            5 ->
                case send_hash_response(State, ConnPid, ReqId, [PostHash]) of
                    {ok, {_, _, Size}} ->
                        update_peer_sent(AccPeers, ConnPid, Size);
                    {error, _Reason} ->
                        AccPeers
                end;
            _ ->
                AccPeers
        end
    end,
    SentPeers = maps:fold(F, Peers, ActiveIn),
    State#state{peers = SentPeers};
handle_channel_messages(
    channelsJoin,
    {Chan},
    State = #state{
        channels = Chans, db = Db, keyPair = KeyPair, peers = Peers, activeOut = ActiveOut
    }
) ->
    case maps:is_key(Chan, Chans) of
        true ->
            State;
        false ->
            {ok, Links} = cabal_db:get_channel_heads(Db, Chan),
            Bin = cabal_posts:encode(KeyPair, Links, {join, Chan}),
            {ok, _, _PostHash} = cabal_db:save_post(Db, Bin),
            ok = cabal_db:channels_join(Db, Chan),
            self() ! {stateChange, Chan},
            EachConn = fun(ConnPid, {AccPeers, AccActiveOuts}) ->
                case send_channel_state_request(State, ConnPid, Chan, true) of
                    {ok, {ReqId, Msg, Size}} ->
                        {
                            {sent, ReqId, ConnPid},
                            {
                                update_peer_sent(AccPeers, ConnPid, Size),
                                maps:put(ReqId, {ConnPid, Msg}, AccActiveOuts)
                            }
                        };
                    {error, _Reason} ->
                        {
                            {error, send_failed},
                            {AccPeers, AccActiveOuts}
                        }
                end
            end,
            {Reqs, {NewPeers, NewActiveOut}} = lists:mapfoldl(
                EachConn, {Peers, ActiveOut}, maps:keys(Peers)
            ),
            %% Failed sends are marked with `error' and are not tracked.
            ValidReqs = [R || R <- Reqs, element(1, R) =/= error],
            State#state{
                channels = maps:put(Chan, ValidReqs, Chans),
                peers = NewPeers,
                activeOut = NewActiveOut
            }
    end;
handle_channel_messages(
    channelsLeave,
    {Chan},
    State = #state{
        channels = Chans,
        db = Db,
        keyPair = KeyPair,
        peers = Peers,
        activeOut = ActiveOut,
        activeIn = ActiveIn
    }
) ->
    case maps:is_key(Chan, Chans) of
        false ->
            State;
        true ->
            {ok, Links} = cabal_db:get_channel_heads(Db, Chan),
            Bin = cabal_posts:encode(KeyPair, Links, {leave, Chan}),
            {ok, _Id, _Hash} = cabal_db:save_post(Db, Bin),
            ok = cabal_db:channels_leave(Db, Chan),
            self() ! {stateChange, Chan},
            {ChanRequests, NewChans} = maps:take(Chan, Chans),
            %% A channel's request list mixes both directions. Requests this
            %% node sent live in activeOut and are cancelled at the peer;
            %% requests it received live in activeIn and are just forgotten.
            EachReq = fun
                ({sent, ReqId, ConnPid}, {AccOut, AccIn, AccPeers}) ->
                    NewOut = maps:remove(ReqId, AccOut),
                    case send_cancel_request(State, ConnPid, ReqId) of
                        {ok, {_, _, Size}} ->
                            {NewOut, AccIn, update_peer_sent(AccPeers, ConnPid, Size)};
                        {error, _} ->
                            {NewOut, AccIn, AccPeers}
                    end;
                ({received, ReqId, _ConnPid}, {AccOut, AccIn, AccPeers}) ->
                    {AccOut, maps:remove(ReqId, AccIn), AccPeers}
            end,
            {NewActiveOut, NewActiveIn, NewPeers} = lists:foldl(
                EachReq, {ActiveOut, ActiveIn, Peers}, ChanRequests
            ),
            State#state{
                channels = NewChans,
                activeOut = NewActiveOut,
                activeIn = NewActiveIn,
                peers = NewPeers
            }
    end;
handle_channel_messages(
    channelsSetTopic,
    {Chan, Topic},
    State = #state{channels = Chans, db = Db, keyPair = KeyPair, peers = Peers, activeIn = ActiveIn}
) ->
    {ok, Links} = cabal_db:get_channel_heads(Db, Chan),
    Bin = cabal_posts:encode(KeyPair, Links, {topic, Chan, Topic}),
    {ok, _, PostHash} = cabal_db:save_post(Db, Bin),
    SentPeers = lists:foldl(
        fun
            ({sent, _ReqId, _ConnPid}, AccPeers) ->
                %% Requests this node sent are not notified.
                AccPeers;
            ({received, ReqId, ConnPid}, AccPeers) ->
                case maps:get(ReqId, ActiveIn, undefined) of
                    undefined ->
                        AccPeers;
                    {ConnPid, [Header, _]} ->
                        %% NOTE(review): nick changes are announced to type 5
                        %% requests, while topic changes go to type 4 here -
                        %% confirm which request type should receive topics.
                        case proplists:get_value(msgType, Header) of
                            4 ->
                                case send_hash_response(State, ConnPid, ReqId, [PostHash]) of
                                    {ok, {_, _, Size}} ->
                                        update_peer_sent(AccPeers, ConnPid, Size);
                                    {error, _Reason} ->
                                        AccPeers
                                end;
                            _ ->
                                AccPeers
                        end
                end
        end,
        Peers,
        maps:get(Chan, Chans)
    ),
    State#state{peers = SentPeers};
handle_channel_messages(channelsMembers, {From, Chan}, State = #state{channels = Chans, db = Db}) ->
    case maps:is_key(Chan, Chans) of
        false ->
            gen_server:reply(From, {error, notInChannel});
        true ->
            Result = cabal_db:get_channel_members(Db, Chan),
            gen_server:reply(From, Result)
    end,
    State;
handle_channel_messages(Message, Args, State) ->
    io:format("Unhandled channel message: ~p with args ~p~n", [Message, Args]),
    State.
%% @doc Handles reads and writes of channel texts; returns the updated state.
%%
%%   `readTextsFromChannel' - replies to `From' with `{ok, Texts}'.
%%   `writeTextToChannel'   - stores a text post, marks the channel for
%%                            notification and announces the post hash to
%%                            peers holding an open type 4 request on the
%%                            channel. `Chan' must be a joined channel.
%% Unknown messages are logged and ignored.
handle_database_messages(readTextsFromChannel, {From, Chan}, State = #state{db = Db}) ->
    {ok, Texts} = cabal_db:get_texts_for_channel(Db, Chan),
    gen_server:reply(From, {ok, Texts}),
    State;
handle_database_messages(
    writeTextToChannel,
    {Chan, Text},
    State = #state{channels = Chans, db = Db, keyPair = KeyPair, peers = Peers, activeIn = ActiveIn}
) ->
    {ok, Heads} = cabal_db:get_channel_heads(Db, Chan),
    Encoded = cabal_posts:encode(KeyPair, Heads, {text, Chan, Text}),
    {ok, _, PostHash} = cabal_db:save_post(Db, Encoded),
    Notified = mark_channels_for_notification(State, [Chan]),
    %% Announce the new hash on one received request, if it is still open
    %% and is a type 4 request.
    Announce = fun(ReqId, ConnPid, PeersAcc) ->
        case maps:get(ReqId, ActiveIn, undefined) of
            undefined ->
                PeersAcc;
            {ConnPid, [Header, _]} ->
                case proplists:get_value(msgType, Header) of
                    4 ->
                        case send_hash_response(State, ConnPid, ReqId, [PostHash]) of
                            {ok, {_, _, Size}} ->
                                update_peer_sent(PeersAcc, ConnPid, Size);
                            {error, _Reason} ->
                                PeersAcc
                        end;
                    _ ->
                        PeersAcc
                end
        end
    end,
    PerRequest = fun({Direction, ReqId, ConnPid}, PeersAcc) ->
        case Direction of
            received -> Announce(ReqId, ConnPid, PeersAcc);
            sent -> PeersAcc
        end
    end,
    ChanRequests = maps:get(Chan, Chans),
    UpdatedPeers = lists:foldl(PerRequest, Peers, ChanRequests),
    Notified#state{peers = UpdatedPeers};
handle_database_messages(Message, Args, State) ->
    io:format("Unhandled database message: ~p with args ~p~n", [Message, Args]),
    State.
%% @doc Dispatches network-level messages and returns the updated state.
%% `incomingMsg' carries `{ConnPid, Msg, MsgSize}' and is forwarded to
%% handle_incoming_message/4; anything else is logged and ignored.
handle_network_messages(incomingMsg, Payload, State) ->
    {ConnPid, Msg, MsgSize} = Payload,
    handle_incoming_message(State, ConnPid, Msg, MsgSize);
handle_network_messages(Message, Args, State) ->
    io:format("Unhandled network message: ~p with args ~p~n", [Message, Args]),
    State.
%% @doc Processes one decoded wire message received on `ConnPid' and returns
%% the updated state. `Msg' is `[Header, Body]'; the header starts with
%% `{msgType, Type}' and ends with `{requestId, ReqId}'. `MsgSize' is added
%% to the peer's received byte count in every branch.
%%
%% When `ReqId' belongs to a request this node sent (it is a key of
%% `activeOut'), the message is treated as a response:
%%   0 - hash response: unknown hashes are fetched with a post request. For
%%       responses to a time range request (type 4) only the first 100 are
%%       fetched, the rest is stored as unfetched; a full page additionally
%%       flags the post request for pagination (up to depth 10).
%%   1 - post response: unknown posts are saved, removed from the unfetched
%%       table, and a follow-up time range request is sent when the post
%%       request was flagged for pagination.
%%   7 - channel list response: stored per connection.
%% Otherwise it is treated as a request from the peer:
%%   2 - post request, 3 - cancel request, 4 - channel time range request,
%%   5 - channel state request, 6 - channel list request.
%% Any other type is logged and only the byte accounting is applied.
%%
%% NOTE(review): the `{ok, ...}' matches on send_post_request/3,
%% send_post_response/4, send_channel_time_range_request/6 and
%% send_channel_list_response/4 are assertive; a failed send in those places
%% crashes the server.
handle_incoming_message(S, ConnPid, Msg, MsgSize) ->
    #state{activeOut = ActiveOut, activeIn = ActiveIn, peers = Peers, channels = Chans, db = Db} =
        S,
    NewPeers = update_peer_received(Peers, ConnPid, MsgSize),
    [Header, Body] = Msg,
    [{msgType, MsgType}, _, {requestId, ReqId}] = Header,
    ReqIdStr = hex:bin_to_hexstr(ReqId),
    IsActiveOut = maps:is_key(ReqId, ActiveOut),
    io:format(
        "[Peer/Req ~p] incoming: Type:~p Size:~p\tIsActiveOut:~p~n",
        [ReqIdStr, MsgType, MsgSize, IsActiveOut]
    ),
    case IsActiveOut of
        true ->
            case MsgType of
                0 ->
                    AllHashes = proplists:get_value(hashes, Body),
                    NewHashes = lists:filter(
                        fun(H) -> not cabal_db:has_post(Db, H) end, AllHashes
                    ),
                    case NewHashes of
                        [] ->
                            S#state{peers = NewPeers};
                        _ ->
                            %% activeOut entries are not all pairs (paginated
                            %% post requests are 3-tuples); anything that is
                            %% not {ConnPid, Msg} counts as an unknown request.
                            OrigMsg =
                                case maps:get(ReqId, ActiveOut) of
                                    {_OrigConnPid, Stored} -> Stored;
                                    _ -> undefined
                                end,
                            %% A stored request is either the plain
                            %% [Header, Body] message or that message wrapped
                            %% with pagination annotations. The annotated
                            %% shapes must be tried first because the
                            %% two-element wrapper also matches [_, _].
                            {OrigRequest, PaginationLimit, PaginationDepth} =
                                case OrigMsg of
                                    [[_, _] = Req, {pagination_limit, PLimit}, {depth, Depth}] ->
                                        {Req, PLimit, Depth};
                                    [[_, _] = Req, {pagination_limit, PLimit}] ->
                                        {Req, PLimit, 0};
                                    [_, _] = Req ->
                                        {Req, undefined, 0};
                                    _ ->
                                        {undefined, undefined, 0}
                                end,
                            {OrigMsgType, OrigRequestBody} =
                                case OrigRequest of
                                    [OrigHeader, OrigBody] ->
                                        {
                                            proplists:get_value(msgType, OrigHeader, unknown),
                                            OrigBody
                                        };
                                    undefined ->
                                        {unknown, undefined}
                                end,
                            {HashesToFetch, HashesToStore, Channel} =
                                case OrigMsgType of
                                    5 ->
                                        io:format(
                                            "[Peer] Hash response for channel_state_request: fetching all ~p hashes eagerly~n",
                                            [length(NewHashes)]
                                        ),
                                        {NewHashes, [], undefined};
                                    4 ->
                                        Chan = proplists:get_value(
                                            channel, OrigRequestBody, undefined
                                        ),
                                        EagerCount = min(100, length(NewHashes)),
                                        {Eager, Lazy} = lists:split(EagerCount, NewHashes),
                                        io:format(
                                            "[Peer] Hash response for channel_time_range_request (~s): fetching ~p hashes eagerly, storing ~p as unfetched~n",
                                            [Chan, length(Eager), length(Lazy)]
                                        ),
                                        {Eager, Lazy, Chan};
                                    _ ->
                                        io:format(
                                            "[Peer] Hash response for unknown request type: fetching all ~p hashes eagerly (fallback)~n",
                                            [length(NewHashes)]
                                        ),
                                        {NewHashes, [], undefined}
                                end,
                            case HashesToStore of
                                [] ->
                                    ok;
                                _ when Channel =/= undefined ->
                                    cabal_db:unfetched_add(
                                        Db, HashesToStore, Channel, "hash_response"
                                    );
                                _ ->
                                    io:format(
                                        "[Peer] Warning: Cannot store unfetched hashes without channel info~n"
                                    ),
                                    ok
                            end,
                            MaxPaginationDepth = 10,
                            %% A page that came back completely full suggests
                            %% there is more history to ask for.
                            ShouldPaginate =
                                PaginationLimit =/= undefined andalso
                                    OrigMsgType == 4 andalso
                                    Channel =/= undefined andalso
                                    length(AllHashes) == PaginationLimit andalso
                                    PaginationDepth < MaxPaginationDepth,
                            case ShouldPaginate of
                                true ->
                                    io:format(
                                        "[Peer] Pagination: fetched ~p hashes (limit ~p), depth ~p/~p for channel ~s~n",
                                        [
                                            length(AllHashes),
                                            PaginationLimit,
                                            PaginationDepth + 1,
                                            MaxPaginationDepth,
                                            Channel
                                        ]
                                    );
                                false when PaginationDepth >= MaxPaginationDepth ->
                                    io:format(
                                        "[Peer] Warning: Max pagination depth reached for channel ~s~n",
                                        [Channel]
                                    );
                                false ->
                                    ok
                            end,
                            case HashesToFetch of
                                [] ->
                                    S#state{peers = NewPeers};
                                _ ->
                                    {ok, {PostReqId, _PostReqMsg, SentSize}} = send_post_request(
                                        S, ConnPid, HashesToFetch
                                    ),
                                    PostMetadata =
                                        case ShouldPaginate of
                                            true ->
                                                {ConnPid, post_request, #{
                                                    pagination => true,
                                                    channel => Channel,
                                                    orig_start => proplists:get_value(
                                                        timestart, OrigRequestBody, undefined
                                                    ),
                                                    limit => PaginationLimit,
                                                    depth => PaginationDepth,
                                                    conn_pid => ConnPid
                                                }};
                                            false ->
                                                {ConnPid, post_request}
                                        end,
                                    S#state{
                                        peers = update_peer_sent(NewPeers, ConnPid, SentSize),
                                        activeOut = maps:put(PostReqId, PostMetadata, ActiveOut)
                                    }
                            end
                    end;
                1 ->
                    Posts = proplists:get_value(posts, Body),
                    FilterUnknownPosts = fun(Bin) ->
                        [NewHeader, _] = cabal_posts:decode(Bin),
                        H = proplists:get_value(hash, NewHeader),
                        not cabal_db:has_post(Db, H)
                    end,
                    NewPosts = lists:filter(FilterUnknownPosts, Posts),
                    SavedPosts = [cabal_db:save_post(Db, P) || P <- NewPosts],
                    FetchedHashes = [Hash || {ok, _, Hash} <- SavedPosts],
                    case FetchedHashes of
                        [] ->
                            ok;
                        _ ->
                            cabal_db:unfetched_delete(Db, FetchedHashes),
                            io:format("[Peer] Removed ~p fetched hashes from unfetched table~n", [
                                length(FetchedHashes)
                            ])
                    end,
                    %% Every outcome below starts from the state that already
                    %% accounts for the received bytes, so the bytes sent for
                    %% a pagination request are kept as well.
                    Received = S#state{peers = NewPeers},
                    OrigMeta = maps:get(ReqId, ActiveOut, {ConnPid, post_request}),
                    StateAfterPagination =
                        case OrigMeta of
                            {_OrigConn, post_request, #{pagination := true} = PagMeta} ->
                                PagChannel = maps:get(channel, PagMeta),
                                OrigStart = maps:get(orig_start, PagMeta),
                                PagLimit = maps:get(limit, PagMeta),
                                PagDepth = maps:get(depth, PagMeta),
                                PagConnPid = maps:get(conn_pid, PagMeta),
                                {ok, OldestTs} = cabal_db:get_oldest_timestamp(Db, PagChannel),
                                case OldestTs of
                                    undefined ->
                                        io:format(
                                            "[Peer] Warning: Cannot paginate - no timestamp found for channel ~s~n",
                                            [PagChannel]
                                        ),
                                        Received;
                                    _ when OldestTs =< OrigStart ->
                                        io:format(
                                            "[Peer] Pagination not needed - oldest message (~p) within original window (~p)~n",
                                            [OldestTs, OrigStart]
                                        ),
                                        Received;
                                    _ ->
                                        %% Next window: ends just before the
                                        %% oldest stored post and reaches back
                                        %% 12 hours, but never before the
                                        %% original start.
                                        TimeWindow = 12 * 60 * 60 * 1000,
                                        NewEnd = OldestTs - 1,
                                        NewStart = max(OrigStart, NewEnd - TimeWindow),
                                        io:format(
                                            "[Peer] Sending pagination request for ~s: Start=~p, End=~p (OldestTs=~p), Limit=~p, Depth=~p~n",
                                            [
                                                PagChannel,
                                                NewStart,
                                                NewEnd,
                                                OldestTs,
                                                PagLimit,
                                                PagDepth + 1
                                            ]
                                        ),
                                        {ok, {PagReqId, PagMsg, PagSize}} =
                                            send_channel_time_range_request(
                                                S,
                                                PagConnPid,
                                                PagChannel,
                                                NewStart,
                                                NewEnd,
                                                PagLimit
                                            ),
                                        NewDepth = PagDepth + 1,
                                        AnnotatedPagMsg = [
                                            PagMsg,
                                            {pagination_limit, PagLimit},
                                            {depth, NewDepth}
                                        ],
                                        NewActiveOut = maps:put(
                                            PagReqId, {PagConnPid, AnnotatedPagMsg}, ActiveOut
                                        ),
                                        NewChans = maps:update_with(
                                            PagChannel,
                                            fun(Reqs) -> Reqs ++ [{sent, PagReqId, PagConnPid}] end,
                                            [{sent, PagReqId, PagConnPid}],
                                            Chans
                                        ),
                                        Received#state{
                                            activeOut = NewActiveOut,
                                            channels = NewChans,
                                            peers = update_peer_sent(NewPeers, PagConnPid, PagSize)
                                        }
                                end;
                            _ ->
                                Received
                        end,
                    Channels = lists:filtermap(
                        fun(P) ->
                            [_PostHeader, PostBody] = cabal_posts:decode(P),
                            case proplists:get_value(channel, PostBody, undefined) of
                                undefined -> false;
                                Chan -> {true, Chan}
                            end
                        end,
                        NewPosts
                    ),
                    NewState = mark_channels_for_notification(StateAfterPagination, Channels),
                    NewState#state{peers = StateAfterPagination#state.peers};
                7 ->
                    Channels = proplists:get_value(channels, Body),
                    io:format(
                        "[Peer] Received channel list response from conn ~p: ~p channels~n",
                        [ConnPid, length(Channels)]
                    ),
                    io:format("  Channels: ~p~n", [Channels]),
                    PeerChannelLists = S#state.peerChannelLists,
                    NewPeerChannelLists = maps:put(ConnPid, Channels, PeerChannelLists),
                    S#state{peers = NewPeers, peerChannelLists = NewPeerChannelLists};
                Unexpected ->
                    %% A peer chose a message type that is not a response;
                    %% input from the network must not crash the server.
                    io:format(
                        "[Warning] unhandled incoming message (Type:~p|ReqId:~p) with activeOut:true~n",
                        [Unexpected, ReqIdStr]
                    ),
                    S#state{peers = NewPeers}
            end;
        false ->
            case MsgType of
                2 ->
                    Hashes = proplists:get_value(hashes, Body),
                    LoadKnownPosts = fun(H) ->
                        case cabal_db:load_post(Db, H) of
                            notFound -> false;
                            {ok, [_, _, Bin]} -> {true, Bin}
                        end
                    end,
                    KnownPosts = lists:filtermap(LoadKnownPosts, Hashes),
                    {ok, {_ReqId, _Msg, Size}} = send_post_response(S, ConnPid, ReqId, KnownPosts),
                    %% Build on NewPeers so the received bytes are kept too.
                    S#state{peers = update_peer_sent(NewPeers, ConnPid, Size)};
                3 ->
                    CancelId = proplists:get_value(cancelId, Body, <<>>),
                    case maps:take(CancelId, ActiveIn) of
                        error ->
                            S#state{peers = NewPeers};
                        {_, NewActiveIn} ->
                            %% Channel request lists also hold `sent' entries;
                            %% only a matching `received' entry is dropped.
                            KeepReq = fun
                                ({received, ChanReqId, _}) -> CancelId =/= ChanReqId;
                                (_Other) -> true
                            end,
                            UpdateChans = fun(_Chan, ChanReqs) ->
                                lists:filter(KeepReq, ChanReqs)
                            end,
                            S#state{
                                peers = NewPeers,
                                channels = maps:map(UpdateChans, Chans),
                                activeIn = NewActiveIn
                            }
                    end;
                4 ->
                    RequestedChan = proplists:get_value(channel, Body),
                    case maps:take(RequestedChan, Chans) of
                        error ->
                            %% Not a joined channel: nothing to answer.
                            S#state{peers = NewPeers};
                        {ChanReqs, TmpChans} ->
                            Params = [
                                proplists:get_value(F, Body)
                             || F <- [timestart, timeend, limit]
                            ],
                            [Start, End, Limit] = Params,
                            {ok, Hashes} = cabal_db:get_text_hashes_by_time_range(
                                Db, RequestedChan, Start, End, Limit
                            ),
                            case send_hash_response(S, ConnPid, ReqId, Hashes) of
                                {ok, {_, _, RespSize}} ->
                                    %% Keep the request open so later posts can
                                    %% be announced on it.
                                    NewChanReqs = ChanReqs ++ [{received, ReqId, ConnPid}],
                                    S#state{
                                        peers = update_peer_sent(NewPeers, ConnPid, RespSize),
                                        channels = maps:put(RequestedChan, NewChanReqs, TmpChans),
                                        activeIn = maps:put(ReqId, {ConnPid, Msg}, ActiveIn)
                                    };
                                {error, _Reason} ->
                                    S#state{peers = NewPeers}
                            end
                    end;
                5 ->
                    RequestedChan = proplists:get_value(channel, Body),
                    case maps:take(RequestedChan, Chans) of
                        error ->
                            S#state{peers = NewPeers};
                        {ChanReqs, TmpChans} ->
                            {ok, Hashes} = cabal_db:get_channel_state(Db, RequestedChan),
                            case send_hash_response(S, ConnPid, ReqId, Hashes) of
                                {ok, {_, _, RespSize}} ->
                                    NewChanReqs = ChanReqs ++ [{received, ReqId, ConnPid}],
                                    S#state{
                                        peers = update_peer_sent(NewPeers, ConnPid, RespSize),
                                        channels = maps:put(RequestedChan, NewChanReqs, TmpChans),
                                        activeIn = maps:put(ReqId, {ConnPid, Msg}, ActiveIn)
                                    };
                                {error, _Reason} ->
                                    S#state{peers = NewPeers}
                            end
                    end;
                6 ->
                    ChanList = maps:keys(Chans),
                    {ok, {_, _, ResponseSize}} = send_channel_list_response(
                        S, ConnPid, ReqId, ChanList
                    ),
                    RTxPeers = update_peer_sent(NewPeers, ConnPid, ResponseSize),
                    S#state{peers = RTxPeers};
                Unhandled ->
                    io:format(
                        "[Warning] unhandled incoming message (Type:~p|ReqId:~p) with activeOut:false~n",
                        [Unhandled, ReqIdStr]
                    ),
                    S#state{peers = NewPeers}
            end
    end.
%% Encodes a cancel request targeting CancelId and sends it over ConnPid.
%% The request carries a freshly generated 4-byte request id, a zeroed
%% circuit id and a ttl of 0.
%% Returns whatever send_binary_to_peer/3 returns.
send_cancel_request(State, ConnPid, CancelId) ->
    CancelHex = hex:bin_to_hexstr(CancelId),
    io:format(
        "[Peer] sending cancel for request ~p to conn ~p~n",
        [CancelHex, ConnPid]
    ),
    Hdr = [
        {requestId, crypto:strong_rand_bytes(4)},
        {circuitId, <<0, 0, 0, 0>>},
        {ttl, 0}
    ],
    Encoded = cabal_wire:encode_cancel_request(Hdr, CancelId),
    send_binary_to_peer(State, ConnPid, Encoded).
%% Encodes a channel state request for Channel and sends it over ConnPid.
%% A fresh 4-byte request id is generated and logged; the header uses a
%% zeroed circuit id and a ttl of 3. Future is handed unchanged to
%% cabal_wire:encode_channel_state_request/3.
%% Returns whatever send_binary_to_peer/3 returns.
send_channel_state_request(State, ConnPid, Channel, Future) ->
    RequestId = crypto:strong_rand_bytes(4),
    LogArgs = [hex:bin_to_hexstr(RequestId), Channel, ConnPid],
    io:format(
        "[Peer] sending channel state request(~p) for ~p to conn ~p~n",
        LogArgs
    ),
    Encoded = cabal_wire:encode_channel_state_request(
        [{requestId, RequestId}, {circuitId, <<0, 0, 0, 0>>}, {ttl, 3}],
        Channel,
        Future
    ),
    send_binary_to_peer(State, ConnPid, Encoded).
%% Encodes a channel time range request for Channel and sends it over
%% ConnPid. Start, End and Limit are passed straight through to
%% cabal_wire:encode_channel_time_range_request/5. The header carries a
%% fresh 4-byte request id, a zeroed circuit id and a ttl of 3.
%% Returns whatever send_binary_to_peer/3 returns.
send_channel_time_range_request(State, ConnPid, Channel, Start, End, Limit) ->
    RequestId = crypto:strong_rand_bytes(4),
    LogArgs = [hex:bin_to_hexstr(RequestId), Channel, ConnPid],
    io:format(
        "[Peer] sending channel time range request(~p) for ~p to conn ~p~n",
        LogArgs
    ),
    Hdr = [{requestId, RequestId}, {circuitId, <<0, 0, 0, 0>>}, {ttl, 3}],
    Encoded = cabal_wire:encode_channel_time_range_request(Hdr, Channel, Start, End, Limit),
    send_binary_to_peer(State, ConnPid, Encoded).
%% Encodes a post request for the given list of Hashes and sends it over
%% ConnPid. The header carries a fresh 4-byte request id, a zeroed circuit
%% id and a ttl of 3.
%% Returns whatever send_binary_to_peer/3 returns.
send_post_request(State, ConnPid, Hashes) ->
    RequestId = crypto:strong_rand_bytes(4),
    LogArgs = [hex:bin_to_hexstr(RequestId), length(Hashes), ConnPid],
    io:format(
        "[Peer] sending post request(~p) for ~p hashes to conn ~p~n",
        LogArgs
    ),
    Encoded = cabal_wire:encode_post_request(
        [{requestId, RequestId}, {circuitId, <<0, 0, 0, 0>>}, {ttl, 3}],
        Hashes
    ),
    send_binary_to_peer(State, ConnPid, Encoded).
%% Encodes a channel list request with the given Offset and Limit and sends
%% it over ConnPid. The header carries a fresh 4-byte request id, a zeroed
%% circuit id and a ttl of 3.
%% Returns whatever send_binary_to_peer/3 returns.
send_channel_list_request(State, ConnPid, Offset, Limit) ->
    RequestId = crypto:strong_rand_bytes(4),
    LogArgs = [hex:bin_to_hexstr(RequestId), ConnPid, Offset, Limit],
    io:format(
        "[Peer] sending channel list request(~p) to conn ~p (offset:~p, limit:~p)~n",
        LogArgs
    ),
    Hdr = [{requestId, RequestId}, {circuitId, <<0, 0, 0, 0>>}, {ttl, 3}],
    Encoded = cabal_wire:encode_channel_list_request(Hdr, Offset, Limit),
    send_binary_to_peer(State, ConnPid, Encoded).
%% Encodes a channel list response containing Channels and sends it over
%% ConnPid. The response reuses the caller-supplied ReqId and a zeroed
%% circuit id; no ttl is set in the header.
%% Returns whatever send_binary_to_peer/3 returns.
send_channel_list_response(State, ConnPid, ReqId, Channels) ->
    ChannelCount = length(Channels),
    io:format(
        "[Peer] sending ch. list response of ~p channels to conn ~p~n",
        [ChannelCount, ConnPid]
    ),
    Encoded = cabal_wire:encode_channel_list_response(
        [{requestId, ReqId}, {circuitId, <<0, 0, 0, 0>>}],
        Channels
    ),
    send_binary_to_peer(State, ConnPid, Encoded).
%% Encodes a post response containing Posts and sends it over ConnPid.
%% The response reuses the caller-supplied ReqId and a zeroed circuit id;
%% no ttl is set in the header.
%% Returns whatever send_binary_to_peer/3 returns.
send_post_response(State, ConnPid, ReqId, Posts) ->
    LogArgs = [hex:bin_to_hexstr(ReqId), length(Posts), ConnPid],
    io:format(
        "[Peer] sending post response(~p) for ~p posts to conn ~p~n",
        LogArgs
    ),
    Hdr = [{requestId, ReqId}, {circuitId, <<0, 0, 0, 0>>}],
    Encoded = cabal_wire:encode_post_response(Hdr, Posts),
    send_binary_to_peer(State, ConnPid, Encoded).
%% Encodes a hash response containing Hashes and sends it over ConnPid.
%% The response reuses the caller-supplied ReqId and a zeroed circuit id;
%% no ttl is set in the header.
%% Returns whatever send_binary_to_peer/3 returns.
send_hash_response(State, ConnPid, ReqId, Hashes) ->
    LogArgs = [hex:bin_to_hexstr(ReqId), length(Hashes), ConnPid],
    io:format(
        "[Peer/Resp ~p] sending hash ~p hashes to conn ~p~n",
        LogArgs
    ),
    Encoded = cabal_wire:encode_hash_response(
        [{requestId, ReqId}, {circuitId, <<0, 0, 0, 0>>}],
        Hashes
    ),
    send_binary_to_peer(State, ConnPid, Encoded).
%% Renders an address as a flat string.
%% A {{A, B, C, D}, Port} tuple becomes "A.B.C.D:Port"; a list is assumed to
%% be an already formatted address and is returned unchanged.
format_address({Ip, Port}) when tuple_size(Ip) =:= 4 ->
    Rendered = io_lib:format("~w.~w.~w.~w:~w", tuple_to_list(Ip) ++ [Port]),
    lists:flatten(Rendered);
format_address(Preformatted) when is_list(Preformatted) ->
    Preformatted.
%% Sends an already encoded, varint-length-prefixed message to ConnPid via
%% the transport process stored in State.
%%
%% Before sending, the length prefix is checked against the actual payload
%% size and the payload is decoded again so the decoded message can be
%% handed back to the caller.
%%
%% Returns {ok, {ReqId, Msg, Size}} where ReqId is the requestId from the
%% decoded header, Msg is the decoded message and Size is the byte size of
%% the whole Binary (prefix included), or {error, Reason} when the transport
%% refuses the send. Throws "unexpected message size" when the prefix and
%% payload size disagree.
send_binary_to_peer(State, ConnPid, Binary) ->
    TotalSize = byte_size(Binary),
    {DeclaredLen, Frame} = cabal_wire:decode_varint(Binary),
    case byte_size(Frame) of
        DeclaredLen ->
            %% Decode first: a frame we cannot decode ourselves is never sent.
            {ok, Decoded} = cabal_wire:decode(Frame),
            case cabal_transport:send(State#state.transportPid, ConnPid, Binary) of
                {error, Why} ->
                    io:format("[Peer] Failed to send message to conn ~p: ~p~n", [ConnPid, Why]),
                    {error, Why};
                ok ->
                    [Hdr, _] = Decoded,
                    {ok, {proplists:get_value(requestId, Hdr), Decoded, TotalSize}}
            end;
        _Mismatch ->
            throw("unexpected message size")
    end.
%% Adds Size to the transmitted half of the {Tx, Rx} byte counters kept
%% under bytesTransmitted in the metadata of peer ConnPid and returns the
%% updated peers map. Fails with a badkey error when ConnPid is not in Peers.
update_peer_sent(Peers, ConnPid, Size) when is_map(Peers), is_integer(Size) ->
    Meta = maps:get(ConnPid, Peers),
    {Sent, Received} = maps:get(bytesTransmitted, Meta),
    maps:put(ConnPid, Meta#{bytesTransmitted := {Sent + Size, Received}}, Peers).
%% Adds Size to the received half of the {Tx, Rx} byte counters kept under
%% bytesTransmitted in the metadata of peer ConnPid, stamps lastSeenAt with
%% the current OS system time, and returns the updated peers map.
%% Throws {peerNotActive, ConnPid} when ConnPid is not in Peers.
update_peer_received(Peers, ConnPid, Size) when is_map(Peers), is_integer(Size) ->
    case maps:find(ConnPid, Peers) of
        {ok, Meta} ->
            {Sent, Received} = maps:get(bytesTransmitted, Meta),
            Refreshed = Meta#{
                bytesTransmitted => {Sent, Received + Size},
                lastSeenAt => os:system_time(1000)
            },
            Peers#{ConnPid => Refreshed};
        error ->
            throw({peerNotActive, ConnPid})
    end.
%% Returns the default data directory, "$HOME/.caberl", creating it when it
%% does not exist yet.
%%
%% Raises error({env_not_set, "HOME"}) when HOME is unset. os:getenv/1
%% returns the atom false in that case and filename:join/1 accepts atoms, so
%% without this check the path would silently become the relative
%% "false/.caberl". Any make_dir failure other than eexist crashes with a
%% case_clause error.
default_caberl_location() ->
    Home =
        case os:getenv("HOME") of
            false -> error({env_not_set, "HOME"});
            Value -> Value
        end,
    Loc = filename:join([Home, ".caberl"]),
    case file:make_dir(Loc) of
        ok -> ok;
        %% Already present from an earlier run.
        {error, eexist} -> ok
    end,
    Loc.
%% Loads the 32-byte signing seed from "<Location>/secret", or generates and
%% stores a new one when the file does not exist, then derives the signing
%% keypair from it via enacl:sign_seed_keypair/1.
%%
%% A newly written seed file is set to mode 0400. The public key is printed
%% in hex. Any other read result (including a file that is not exactly 32
%% bytes) crashes with a case_clause error.
create_or_load_keypair(Location) ->
    SecretPath = filename:join(Location, "secret"),
    Seed =
        case file:read_file(SecretPath) of
            {error, enoent} ->
                Fresh = crypto:strong_rand_bytes(32),
                ok = file:write_file(SecretPath, Fresh),
                ok = file:change_mode(SecretPath, 8#0400),
                io:format("[Cabal] Created fresh keypair~n"),
                Fresh;
            {ok, <<Stored:32/binary>>} ->
                Stored
        end,
    Keypair = enacl:sign_seed_keypair(Seed),
    io:format("[Cabal] Public Key: ~s~n", [hex:bin_to_hexstr(maps:get(public, Keypair))]),
    Keypair.
%% Loads the transport keypair from "<Location>/transport_secret", or
%% creates and stores a new one when the file does not exist.
%%
%% Three on-disk situations are handled:
%%   * 64 bytes: secret followed by public key, loaded as is;
%%   * 32 bytes: old format holding only the secret; the public key is
%%     obtained from the keypair and the file is rewritten in the 64-byte
%%     format;
%%   * missing file: a fresh keypair is generated and written.
%% Every write leaves the file at mode 0400. Any other read result crashes
%% with a case_clause error.
%%
%% Returns the keypair term produced by enoise_keypair:new/1,3.
create_or_load_transport_keypair(Location) ->
    Fname = filename:join(Location, "transport_secret"),
    case file:read_file(Fname) of
        {ok, <<Secret:32/binary, Public:32/binary>>} ->
            io:format("[Transport] Loaded existing Curve25519 keypair~n"),
            enoise_keypair:new(dh25519, Secret, Public);
        {ok, Secret} when byte_size(Secret) =:= 32 ->
            io:format("[Transport] Loaded old format keypair, deriving public key~n"),
            Kp = enoise_keypair:new(dh25519, Secret, undefined),
            Public = enoise_keypair:pubkey(Kp),
            %% This module leaves key files at mode 0400, so the existing file
            %% is expected to be read-only; restore owner write permission
            %% first, otherwise the rewrite below fails with eacces.
            ok = file:change_mode(Fname, 8#0600),
            ok = file:write_file(Fname, <<Secret/binary, Public/binary>>),
            ok = file:change_mode(Fname, 8#0400),
            io:format("[Transport] Upgraded keypair file to new format~n"),
            Kp;
        {error, enoent} ->
            Kp = enoise_keypair:new(dh25519),
            Secret = enoise_keypair:seckey(Kp),
            Public = enoise_keypair:pubkey(Kp),
            ok = file:write_file(Fname, <<Secret/binary, Public/binary>>),
            ok = file:change_mode(Fname, 8#0400),
            io:format("[Transport] Created fresh Curve25519 keypair~n"),
            Kp
    end.
%% Formats a byte count as a string with one decimal and a unit suffix
%% ("b", "kb", "mb" or "gb"), e.g. 1024 -> "1.0kb".
human_bytesize(Size) ->
    human_bytesize(Size, ["b", "kb", "mb", "gb"]).

%% Walks the unit list, dividing by 1024 per step, and stops at the first
%% unit where the amount is below 1024 or when the largest unit is reached.
human_bytesize(Amount, [LargestUnit]) ->
    lists:flatten(io_lib:format("~.1f~s", [float(Amount), LargestUnit]));
human_bytesize(Amount, [Unit | _]) when Amount < 1024 ->
    lists:flatten(io_lib:format("~.1f~s", [float(Amount), Unit]));
human_bytesize(Amount, [_ | LargerUnits]) ->
    human_bytesize(Amount / 1024, LargerUnits).
%% Forwards Channels to the peer events process via
%% cabal_peer_events:notify_channels/2, when one is configured in State.
%% Always returns State unchanged.
mark_channels_for_notification(#state{peerEventsPid = undefined} = State, _Channels) ->
    State;
mark_channels_for_notification(#state{peerEventsPid = EventsPid} = State, Channels) ->
    cabal_peer_events:notify_channels(EventsPid, Channels),
    State.
%% Waits until at least one peer is connected and then until the initial
%% sync check passes. Both phases share one Timeout (milliseconds) measured
%% from the moment this function is entered.
%% Returns the result of poll_for_initial_sync/3, or {error, Reason} when
%% waiting for a peer fails.
poll_for_connection(Pid, Timeout) ->
    Started = erlang:monotonic_time(millisecond),
    case poll_for_peer_connection(Pid, Timeout, Started) of
        {error, _Reason} = Failure ->
            Failure;
        ok ->
            poll_for_initial_sync(Pid, Timeout, Started)
    end.
%% Polls peer_list/1 every 50 ms until it is non-empty.
%% Returns ok once a peer is listed, or {error, timeout} when Timeout
%% milliseconds have elapsed since StartTime (monotonic milliseconds).
poll_for_peer_connection(Pid, Timeout, StartTime) ->
    case erlang:monotonic_time(millisecond) - StartTime >= Timeout of
        true ->
            {error, timeout};
        false ->
            case peer_list(Pid) of
                [_ | _] ->
                    ok;
                [] ->
                    timer:sleep(50),
                    poll_for_peer_connection(Pid, Timeout, StartTime)
            end
    end.
%% Polls every 50 ms until the first joined channel has at least one text,
%% counting both already read texts and the unfetched count.
%% Only the first channel returned by channels_joined/1 is inspected; with
%% no joined channels the sync is considered done immediately.
%% Returns {ok, synced}, or {error, timeout} when Timeout milliseconds have
%% elapsed since StartTime (monotonic milliseconds).
poll_for_initial_sync(Pid, Timeout, StartTime) ->
    case erlang:monotonic_time(millisecond) - StartTime >= Timeout of
        true ->
            {error, timeout};
        false ->
            {ok, Joined} = channels_joined(Pid),
            case Joined of
                [] ->
                    {ok, synced};
                [First | _] ->
                    {ok, {Texts, _}} = read(Pid, First),
                    {ok, Unfetched} = unfetched_count(Pid, First),
                    case length(Texts) + Unfetched of
                        Known when Known > 0 ->
                            {ok, synced};
                        _ ->
                            timer:sleep(50),
                            poll_for_initial_sync(Pid, Timeout, StartTime)
                    end
            end
    end.
%% Polls unfetched_count/2 every 50 ms until the unfetched count for Channel
%% has dropped to max(0, InitialUnfetched - RequestedCount) or below.
%% Returns {ok, RequestedCount} on success, or {error, timeout} when Timeout
%% milliseconds have elapsed since StartTime (monotonic milliseconds).
poll_for_fetch_completion(Pid, Channel, InitialUnfetched, RequestedCount, StartTime, Timeout) ->
    case erlang:monotonic_time(millisecond) - StartTime >= Timeout of
        true ->
            {error, timeout};
        false ->
            {ok, Remaining} = unfetched_count(Pid, Channel),
            Target = max(0, InitialUnfetched - RequestedCount),
            case Remaining =< Target of
                true ->
                    {ok, RequestedCount};
                false ->
                    timer:sleep(50),
                    poll_for_fetch_completion(
                        Pid, Channel, InitialUnfetched, RequestedCount, StartTime, Timeout
                    )
            end
    end.
%% Waits until the number of texts read from Channel stops changing.
%% When the current count equals LastCount, the count is read again after
%% 200 ms; if it is still the same the writes are considered complete.
%% Otherwise polling continues every 50 ms with the newest count.
%% Returns ok, or {error, timeout} when Timeout milliseconds have elapsed
%% since StartTime (monotonic milliseconds).
poll_for_writes_complete(Pid, Channel, LastCount, StartTime, Timeout) ->
    case erlang:monotonic_time(millisecond) - StartTime >= Timeout of
        true ->
            {error, timeout};
        false ->
            {ok, {Texts, _}} = read(Pid, Channel),
            case length(Texts) of
                LastCount ->
                    %% Unchanged since the previous poll: confirm with one
                    %% more read after a longer pause.
                    timer:sleep(200),
                    {ok, {Recheck, _}} = read(Pid, Channel),
                    case length(Recheck) of
                        LastCount ->
                            ok;
                        Changed ->
                            timer:sleep(50),
                            poll_for_writes_complete(Pid, Channel, Changed, StartTime, Timeout)
                    end;
                Current ->
                    timer:sleep(50),
                    poll_for_writes_complete(Pid, Channel, Current, StartTime, Timeout)
            end
    end.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
%% Checks human_bytesize/1 against a table of {Bytes, ExpectedString} pairs.
human_bytesize_test() ->
    Cases = [
        {84912712, "81.0mb"},
        {512, "512.0b"},
        {1024, "1.0kb"}
    ],
    lists:foreach(
        fun({Bytes, Expected}) ->
            ?assertEqual(Expected, human_bytesize(Bytes))
        end,
        Cases
    ).
%% Creates a keypair in a fresh temporary directory and checks that loading
%% it again from the same directory yields the same keypair.
%%
%% The directory is created with stdlib calls instead of shelling out to
%% `mktemp --tmpdir`, which is a GNU-only option, and it is removed again
%% once the test finishes.
serialize_and_load_keypair_test() ->
    TestDir = filename:join(
        os:getenv("TMPDIR", "/tmp"),
        "cabal-test-" ++ os:getpid() ++ "-" ++
            integer_to_list(erlang:unique_integer([positive]))
    ),
    ok = file:make_dir(TestDir),
    try
        Kp = create_or_load_keypair(TestDir),
        Kp2 = create_or_load_keypair(TestDir),
        ?assertEqual(Kp, Kp2)
    after
        %% Best-effort cleanup; results are deliberately ignored so a cleanup
        %% problem cannot mask the actual test outcome.
        _ = file:delete(filename:join(TestDir, "secret")),
        _ = file:del_dir(TestDir)
    end.
%% Checks format_address/1 for both tuple addresses and preformatted strings
%% using a table of {Input, ExpectedString} pairs.
format_address_test() ->
    Cases = [
        {{{127, 0, 0, 1}, 3113}, "127.0.0.1:3113"},
        {{{192, 168, 1, 100}, 8080}, "192.168.1.100:8080"},
        {"127.0.0.1:3113", "127.0.0.1:3113"},
        {"localhost:3000", "localhost:3000"}
    ],
    lists:foreach(
        fun({Input, Expected}) ->
            ?assertEqual(Expected, format_address(Input))
        end,
        Cases
    ).
-endif.