CBHKQGLDCAH2E4ZNACITBSMADOKPERFCWQPUGMH7UN5TLJXLYI4QC M4TNRFRPHEH6T673JAMJ3CHABASCWMAJVU57HH2XEMDJCB3QPT5QC QHK3BXHEC6IAAI7HK76Y7W3IQQIEFH7TD7AYMBCLOE7U2L7BT2LQC R4JDMB7LL3FLA4NJEAV2DQEXII5XS5KIMG3H4YS5P6W7ZZUE7FIQC RZB6HZ2NI5PIUIWQL63CSKXZGGUG4L6XCSCGR3TI723OO57NDT3AC YWCRGWVDIMCPXBQFM23MKCYZCXJJSD733NJEPB6WUU4G4BSREILAC DZZ4B3UGIYTN3OHAKS2HNCLK7KM2ZSHPZ4JC6YVQL6I2H4KCD5PAC 755UGKECZ3PFYEA2TFFOUZTF27CRQTBZWO7UYFU6WQDJMZDBVPRAC 7CB7WD2VQGU2ZZV3CWXAOFLPM5SY6RADIXRSFLUVXKQXC5472UOAC YDZ3R5NY7II7WX4Q5O2UQ72TMCABDPPRLEMTTIBH5DGKOLAOPOPAC OJ6KWAG7XUCYNQ6T3FRJK2QOP7DBO26PZEWDVH3Q2MJKIBMNWRPQC 55WLMLEEVBRSTAFRZ5RGF7TOGUF5OPVCPA2TMHAQK45OUO7PA3YQC IHBNW3GI2XB6KAWUYRLL6KDOBUNUOU3N7RRLS6BFNW6SE7ZDHLWQC ULS4X3VORQZFBSNPXN76UFH2PWS2MVBA64CJQS3IR4YUHIKEF6GQC MONVI5STEDKY5ALVMEXJJXDUX6XQRKTFLP7BBNOQML3VSJEM2JAAC
    file:make_dir("test-dbs"), %% TODO: better cleanup
    {ok, Db} = db:start_link("test-dbs/" ++ Name ++ ".db"),
    %% TODO: cleanup on success
    TestDb = string:chomp(os:cmd("mktemp --tmpdir -d " ++ Name ++ "-XXXXX")),
    ?debugFmt("Using DB Path: ~p~n", [TestDb]),
    {ok, Db} = db:start_link(TestDb),
-export([encode_post_request/2, encode_cancel_request/2, encode_channel_state_request/3, encode_channel_list_response/2]).
-export([encode_post_request/2, encode_cancel_request/2,
         encode_channel_time_trange_request/5,
         encode_channel_state_request/3,
         encode_channel_list_request/3, encode_hash_response/2,
         encode_post_response/2, encode_channel_list_response/2]).
encode_hash_response(Header, Hashes) when is_list(Hashes) ->
    [ {requestId, RequestId}, {circuitId, CircuitId}] = Header,
    length_encode_fields([ <<0>>, CircuitId, RequestId, encode_varint(length(Hashes)), Hashes]).

encode_post_response(Header, Posts) when is_list(Posts) ->
    [ {requestId, RequestId}, {circuitId, CircuitId}] = Header,
    PostWithLen = fun(Post) ->
        PostLen = encode_varint(byte_size(Post)),
        [PostLen, Post]
    end,
    PostsBin = lists:map(PostWithLen, Posts),
    length_encode_fields([ <<1>>, CircuitId, RequestId, PostsBin, 0]).
    [{requestId, RequestId}, {circuitId, CircuitId}, {ttl, TTL}] = Header,
    TTLBin = encode_varint(TTL),
    HashCount = encode_varint(length(Hashes)),
    Msg = [<<2>>, CircuitId, RequestId, TTLBin, HashCount] ++ Hashes,
    length_encode_fields(Msg).
    [ {requestId, RequestId}, {circuitId, CircuitId}, {ttl, TTL}] = Header,
    length_encode_fields([ <<2>>, CircuitId, RequestId, encode_varint(TTL), encode_varint(length(Hashes)), Hashes]).
    [{requestId, RequestId}, {circuitId, CircuitId}, {ttl, TTL}] = Header,
    TTLBin = encode_varint(TTL),
    length_encode_fields([<<3>>, CircuitId, RequestId, TTLBin,
                          CancelId]).
    [ {requestId, RequestId}, {circuitId, CircuitId}, {ttl, TTL}] = Header,
    length_encode_fields([ <<3>>, CircuitId, RequestId, encode_varint(TTL), CancelId]).
encode_channel_state_request(Header, Channel, Future) when is_list(Channel), is_boolean(Future) ->
    [{requestId, RequestId}, {circuitId, CircuitId}, {ttl, TTL}] = Header,
    ChannelLen = encode_varint(length(Channel)),
encode_channel_time_trange_request(Header, Channel, Start, End, Limit) ->
    [ {requestId, RequestId}, {circuitId, CircuitId}, {ttl, TTL}] = Header,
    length_encode_fields([ <<4>>, CircuitId, RequestId, encode_varint(TTL), encode_varint(length(Channel)), Channel, encode_varint(Start), encode_varint(End), encode_varint(Limit)]).

encode_channel_state_request(Header, Channel, Future) ->
    [ {requestId, RequestId}, {circuitId, CircuitId}, {ttl, TTL}] = Header,
    FutureBin = encode_bool(Future),
    TTLbin = encode_varint(TTL),
    length_encode_fields([<<5>>, CircuitId, RequestId, TTLbin,
                          ChannelLen, ChannelBin, FutureBin]).
length_encode_fields([ <<5>>, CircuitId, RequestId, encode_varint(TTL), encode_varint(byte_size(ChannelBin)), ChannelBin, encode_bool(Future)]).
    [{requestId, RequestId}, {circuitId, CircuitId}] = Header,
    ChannelsBin = lists:map(fun(Chan) ->
        BinChan = unicode:characters_to_binary(Chan),
        ChanLen = encode_varint(byte_size(BinChan)),
        [ChanLen, BinChan]
    end, Channels),
    length_encode_fields([<<7>>, CircuitId, RequestId, ChannelsBin, 0]).
    [ {requestId, RequestId}, {circuitId, CircuitId}] = Header,
    WithLen = fun(Chan) ->
        BinChan = unicode:characters_to_binary(Chan),
        ChanLen = encode_varint(byte_size(BinChan)),
        [ChanLen, BinChan]
    end,
    ChannelsBin = lists:map(WithLen, Channels),
    length_encode_fields([ <<7>>, CircuitId, RequestId, ChannelsBin, 0]).
    Db = db:start_link(cabal),
    State = #state{db=Db},
    % TODO: supervise these processes
    %% open persistence layer
    StorePath = proplists:get_value(storage, Args),
    Kp = create_or_load_keypair(StorePath),
    {ok, Db} = db:start_link(StorePath),
    %% restore active channels from db
    {ok, Chans} = db:channelsList(Db),
    %% construct state for event loop
    State = #state{ db=Db, keyPair=Kp, channels = maps:from_keys(Chans, [])},
    %% start event loop and incoming socket handler
    %% TODO: supervise these processes
io:format("[Peer:~p] lost | LastSeen: ~p | Sent: ~p | Received: ~p~n", [Peer, LastSeen, human_bytesize(Sent), human_bytesize(Received)]),
io:format("[Peer:~p] lost | LastSeen: ~p | Sent: ~p | Received: ~p~n",[Peer, LastSeen, human_bytesize(Sent), human_bytesize(Received)]),
            [Header, Body] = Msg,
            [{msgType, MsgType}, _, {requestId, ReqId}] = Header,
            ReqIdStr = hex:bin_to_hexstr(ReqId),
            IsActiveOut = maps:is_key(ReqId, ActiveOut),
            io:format("[DEBUG] new message Type:~p - IsActiveOut: ~p - Body:~p~n", [MsgType, IsActiveOut, Body]),
            case IsActiveOut of
                true ->
                    io:format("[Peer:~p] Received reply to ~p!~n", [Peer, ReqIdStr]),
                    case MsgType of
                        0 ->
                            %% TODO: currently we request all the hashes all the time.
                            %% this needs to be deduped and spread out across conns.
                            Hashes = proplists:get_value(hashes, Body),
                            {ok, {PostReqId, _PostReqMsg, Size}} = send_post_request(Peer, Hashes),
                            UpdatedOut = maps:put(PostReqId, Peer, ActiveOut),
                            %% update loop state
                            event_loop(State#state{
                                peers = update_peer_sent(Peers, Peer, Size),
                                activeOut = UpdatedOut});
                        1 ->
                            Posts = proplists:get_value(posts, Body),
                            [io:format("[TEMP] got post reply:~n~p~n", [posts:decode(P)]) || P <- Posts],
                            event_loop(State)
                    end;
                false -> %% incoming request
                    case MsgType of
                        %% 0, 1 and 7 are responses [hash, post, channel list]
            NewState = handle_incoming_message(State, Peer, Msg, MsgSize),
            event_loop(NewState)
    end,
    %% TODO: sup setup
    throw({eventLoopExited}).
                        3 -> % cancel req
                            CancelId = proplists:get_value(cancelId, Body, <<>>),
                            case maps:take(CancelId, ActiveIn) of
                                error -> event_loop(State); % ignore
                                {_, NewActiveIn} ->
                                    UpdateChans = fun(_Chan, ChanReqs) ->
                                        %% we might need to put the received atom in a variable. not sure if this pattern matching works like i think it does.
                                        UpdatedReqs = lists:filter(fun({received, ChanReqId, _}) -> CancelId =/= ChanReqId end, ChanReqs),
                                        {true, UpdatedReqs}
                                    end,
                                    event_loop(State#state{
                                        peers = update_peer_received(Peers, Peer, MsgSize),
                                        channels = maps:filtermap(UpdateChans, Chans),
                                        activeIn = NewActiveIn})
                            end;
handle_incoming_message(S, Peer, Msg, MsgSize) ->
    #state{activeOut = ActiveOut, activeIn = ActiveIn, peers = Peers, channels = Chans, db = Db} = S,
    [Header, Body] = Msg,
    [{msgType, MsgType}, _, {requestId, ReqId}] = Header,
    ReqIdStr = hex:bin_to_hexstr(ReqId),
    IsActiveOut = maps:is_key(ReqId, ActiveOut),
    io:format("[DEBUG] new message Type:~p - IsActiveOut: ~p - Body:~p~n", [MsgType, IsActiveOut, Body]),
    case IsActiveOut of
                        4 -> % channel time req
                            RequestedChan = proplists:get_value(channel, Body),
                            case maps:take(RequestedChan, Chans) of
                                error -> event_loop(State); % ignore
                                {ChanReqs, TmpChans} ->
                                    %% TODO: send
                                    io:format("[TEMP] Received incoming chan time req for ~p from ~p~n", [RequestedChan, Peer]),
                                    NewChanReqs = ChanReqs ++ [{received, ReqId, Peer}],
                                    event_loop(State#state{
                                        peers = update_peer_received(Peers, Peer, MsgSize),
                                        channels = maps:put(RequestedChan, NewChanReqs, TmpChans),
                                        activeIn = maps:put(ReqId, {Peer, Msg}, ActiveIn)})
                            end;
        true ->
            io:format("[Peer:~p] Received reply to ~p!~n", [Peer, ReqIdStr]),
            case MsgType of
                0 -> %% hash response
                    %% this needs to be spread out across conns
                    %% asking the responder directly should be okay enough for now
                    Hashes = proplists:get_value(hashes, Body),
                    FilterUnknownPosts = fun(H) ->
                        IsUnknown = not db:hasPost(Db, H),
                        io:format("[DEBUG] skipping post: ~p? ~p~n",
                                  [hex:bin_to_hexstr(H), IsUnknown]),
                        IsUnknown
                    end,
                    NewHashes = lists:filter(FilterUnknownPosts, Hashes),
                    %% TODO: keep track of which we asked for
                    {ok, {PostReqId, _PostReqMsg, Size}} = send_post_request(Peer, NewHashes),
                    UpdatedOut = maps:put(PostReqId, Peer, ActiveOut),
                    %% update loop state
                    S#state{
                        peers = update_peer_sent(Peers, Peer, Size),
                        activeOut = UpdatedOut};
                1 -> %% post response
                    %% TODO: check we actually asked for this
                    Posts = proplists:get_value(posts, Body),
                    [{ok, _, _} = db:savePost(Db, P) || P <- Posts],
                    [io:format("[TEMP] got post reply:~n~p~n", [posts:decode(P)]) || P <- Posts],
                    S
            end;
                        5 -> % channel state req
                            RequestedChan = proplists:get_value(channel, Body),
                            case maps:take(RequestedChan, Chans) of
                                error -> event_loop(State); % ignore
                                {ChanReqs, TmpChans} ->
                                    %% TODO: send recent channel state hashes
                                    io:format("[TEMP] Received incoming state req for ~p from ~p~n", [RequestedChan, Peer]),
                                    NewChanReqs = ChanReqs ++ [{received, ReqId, Peer}],
                                    event_loop(State#state{
                                        peers = update_peer_received(Peers, Peer, MsgSize),
                                        channels = maps:put(RequestedChan, NewChanReqs, TmpChans),
                                        activeIn = maps:put(ReqId, {Peer, Msg}, ActiveIn)})
                            end;
        false -> %% incoming request
            case MsgType of
                %% 0, 1 and 7 are responses [hash, post, channel list]
                2 -> % post req
                    Hashes = proplists:get_value(hashes, Body),
                    LoadKnownPosts = fun(H) ->
                        case db:loadPost(Db, H) of
                            notFound ->
                                false;
                            {ok, [_Head, _Body, Bin]} ->
                                {true, Bin}
                        end
                    end,
                    KnownPosts = lists:filtermap(LoadKnownPosts, Hashes),
                    {ok, {_ReqId, _Msg, Size}} = send_post_response(Peer, ReqId, KnownPosts),
                    S#state{peers = update_peer_sent(Peers, Peer, Size)};
                        6 -> % channel list request
                            ChanList = maps:keys(Chans),
                            RxPeers = update_peer_received(Peers, Peer, MsgSize),
                            {ok, {_, _, ResponseSize}} = send_channel_list_response(Peer, ReqId, ChanList),
                            RTxPeers = update_peer_sent(RxPeers, Peer, ResponseSize),
                            event_loop(State#state{peers = RTxPeers});
                3 -> % cancel req
                    CancelId = proplists:get_value(cancelId, Body, <<>>),
                    case maps:take(CancelId, ActiveIn) of
                        error -> S; % ignore
                        {_, NewActiveIn} ->
                            UpdateChans = fun(_Chan, ChanReqs) ->
                                F = fun({received, ChanReqId, _}) -> CancelId =/= ChanReqId end,
                                UpdatedReqs = lists:filter(F, ChanReqs),
                                {true, UpdatedReqs}
                            end,
                            S#state{
                                peers = update_peer_received(Peers, Peer, MsgSize),
                                channels = maps:filtermap(UpdateChans, Chans),
                                activeIn = NewActiveIn}
                    end;
                4 -> % channel time req
                    RequestedChan = proplists:get_value(channel, Body),
                    case maps:take(RequestedChan, Chans) of
                        error -> S; % ignore
                        {ChanReqs, TmpChans} ->
                            %% TODO: send current state
                            io:format("[TEMP] Received incoming chan time req for ~p from ~p~n", [RequestedChan, Peer]),
                            NewChanReqs = ChanReqs ++ [{received, ReqId, Peer}],
                            S#state{
                                peers = update_peer_received(Peers, Peer, MsgSize),
                                channels = maps:put(RequestedChan, NewChanReqs, TmpChans),
                                activeIn = maps:put(ReqId, {Peer, Msg}, ActiveIn)}
                    end;
                        Unhandled ->
                            io:format("[Warning] unhandled incoming message of type ~p that isn't state:activeOut - RequestId:~p~n", [Unhandled, ReqIdStr]),
                            event_loop(State)
                    end
                5 -> % channel state req
                    RequestedChan = proplists:get_value(channel, Body),
                    case maps:take(RequestedChan, Chans) of
                        error -> S; % ignore
                        {ChanReqs, TmpChans} ->
                            %% TODO: send recent channel state hashes
                            io:format("[TEMP] Received incoming state req for ~p from ~p~n", [RequestedChan, Peer]),
                            NewChanReqs = ChanReqs ++ [{received, ReqId, Peer}],
                            S#state{
                                peers = update_peer_received(Peers, Peer, MsgSize),
                                channels = maps:put(RequestedChan, NewChanReqs, TmpChans),
                                activeIn = maps:put(ReqId, {Peer, Msg}, ActiveIn)}
                    end;
                6 -> % channel list request
                    ChanList = maps:keys(Chans),
                    RxPeers = update_peer_received(Peers, Peer, MsgSize),
                    {ok, {_, _, ResponseSize}} = send_channel_list_response(Peer, ReqId, ChanList),
                    RTxPeers = update_peer_sent(RxPeers, Peer, ResponseSize),
                    S#state{peers = RTxPeers};
                Unhandled ->
                    io:format("[Warning] unhandled incoming message of type ~p that isn't state:activeOut - RequestId:~p~n", [Unhandled, ReqIdStr]),
                    S
human_bytesize(Size) -> human_bytesize(Size, ["b","kb","mb","gb","tb","pb", "eb"]).
human_bytesize(S, [_|[_|_] = L]) when S >= 1024 -> human_bytesize(S/1024, L);
%% these could be their own module
default_caberl_location() ->
    H = os:getenv("HOME"),
    Loc = filename:join([H, ".caberl"]),
    ok = case file:make_dir(Loc) of
        ok -> ok;
        {error, eexist} -> ok
    end,
    Loc.

create_or_load_keypair(Location) ->
    Fname = filename:join(Location, "secret"),
    Seed = case file:read_file(Fname) of
        %% load existing
        {ok, Content} when byte_size(Content) =:= 32 ->
            Content;
        %% generate new keypair
        {error, enoent} ->
            S = crypto:strong_rand_bytes(32),
            ok = file:write_file(Fname, S),
            ok = file:change_mode(Fname, 8#0400),
            io:format("[Cable] Created fresh keypair~n"),
            S
    end,
    Kp = enacl:sign_seed_keypair(Seed),
    PubKeyStr = hex:bin_to_hexstr(maps:get(public, Kp)),
    io:format("[Cable] Public Key: ~s~n", [PubKeyStr]),
    Kp.

%% one off utility
human_bytesize(Size) ->
    human_bytesize(Size, ["b","kb","mb","gb"]).
human_bytesize(S, [_|[_|_] = L]) when S >= 1024 ->
    human_bytesize(S/1024, L);
hasPost(Db, Hash) ->
    gen_server:call(Db, {hasPostByHash, Hash}).

%% manage our active channels
channelsJoin(Db, Name) ->
    gen_server:call(Db, {channelsJoin, Name}).

channelsLeave(Db, Name) ->
    gen_server:call(Db, {channelsLeave, Name}).

channelsList(Db) ->
    gen_server:call(Db, {channelsList}).
    {rowid, _RowId} = sqlite3:sql_exec(Db, "INSERT INTO users (public_key) VALUES (?) ON CONFLICT DO NOTHING;", [{blob, PubKey}]),
    [{columns, ["id","public_key","name"]}, {rows, [{UserId, {blob, PubKey}, _Name}]}] = sqlite3:read(Db, users, {public_key, {blob, PubKey}}),
    Qry = "INSERT INTO users (public_key) VALUES (?) ON CONFLICT DO NOTHING;",
    {rowid, _RowId} = sqlite3:sql_exec(Db, Qry, [{blob, PubKey}]),
    [{columns, ["id","public_key","name"]},
     {rows, [{UserId, {blob, PubKey}, _Name}]}] = sqlite3:read(Db, users, {public_key, {blob, PubKey}}),
    PostInsert = [{user_id, UserId}, {raw_post, Binary}] ++ HeaderTrimmed ++ case proplists:get_value(type, Header) of
        0 -> Body;
        1 ->
            Concat = iolist_to_binary(proplists:get_value(hashes, Body)),
            [{deletedHashes, Concat}];
    PostInsert = [{user_id, UserId}, {raw_post, Binary}] ++
        HeaderTrimmed ++ case proplists:get_value(type, Header) of
            0 -> Body;
            1 ->
                Concat = iolist_to_binary(proplists:get_value(hashes, Body)),
                [{deletedHashes, Concat}];
        2 -> %% infos
            Chan = proplists:get_value(channel, Body),
            Infos = proplists:get_value(infos, Body),
            [{channel, Chan}, {infos, {blob, jsone:encode(Infos)}}];
        3 -> %% topic
            Body;
        %% join / leave
        4 -> Body;
        5 -> Body
    end,
            2 -> %% infos
                Chan = proplists:get_value(channel, Body),
                Infos = proplists:get_value(infos, Body),
                [{channel, Chan}, {infos, {blob, jsone:encode(Infos)}}];
            3 -> %% topic
                Body;
            %% join / leave
            4 -> Body;
            5 -> Body
        end,
{reply, {ok, unpackPost(Db, Res)}, State};
    {reply, unpackPost(Db, Res), State};

handle_call({hasPostByHash, Hash}, _From, [{sql, Db}, _] = State) when is_binary(Hash) ->
    Qry = "SELECT count(*) as count from posts where hash = ?",
    [{columns, ["count"]},
     {rows, [{Count}]}] = sqlite3:sql_exec(Db, Qry, [{blob, Hash}]),
    Result = case Count of
        0 -> false;
        1 -> true;
        Other -> throw({unexpectedCount, Other})
    end,
    {reply, Result, State};
handle_call({addchannel, Name}, _From, [{sql, Db}, _] = State) ->
    io:format("INSERTing channel: ~p~n", [Name]),
    ok = sqlite3:write(Db, channels, [{name, Name}]),
handle_call({channelsLeave, Name}, _From, [{sql, Db}, _] = State) ->
    ok = sqlite3:delete(Db, channels, [{name, Name}]),
handle_call(listchannels, _From, [{sql, Db}, _] = State) ->
    [_, {rows, Rows}] = sqlite3:read_all(Db, channels),
    Chans = lists:map(fun({_Id, Name}) ->
handle_call({channelsJoin, Name}, _From, [{sql, Db}, _] = State) ->
    Res = case sqlite3:write(Db, channels, [{name, Name}, {topic, ""}]) of
        {rowid, _RowId} -> ok;
        {error, 19, _Msg} -> ok %% constraint violation => already joined
    end,
    {reply, Res, State};

handle_call({channelsList}, _From, [{sql, Db}, _] = State) ->
    [{columns, ["name", "topic"]}, {rows, Rows}] = sqlite3:read_all(Db, channels),
    Chans = lists:map(fun({Name, _Topic}) ->
    ExpectedCols = ["id", "hash", "type", "timestamp", "user_id", "raw_post", "channel", "text", "topic", "deletedHashes", "infos"],
    [{columns, ExpectedCols}, {rows, [Row]}] = ReadResult,
    {_RowId, {blob, Hash}, Type, Timestamp, UserId, {blob, RawPost},
    ExpectedCols = ["id", "hash", "type", "timestamp", "user_id", "raw_post",
                    "channel", "text", "topic", "deletedHashes", "infos"],
    case ReadResult of
        [{columns, ExpectedCols},
         {rows, [Row]}] -> {ok, unpackPostRow(Db, Row)};
        _Other -> notFound
    end.

unpackPostRow(Db, Row) ->
    {_RowId, {blob, Hash}, Type, Timestamp, UserId, {blob, RawPost},
[{columns, ["public_key"]}, {rows, [{{blob, PubKey}}]}] = sqlite3:sql_exec(Db, "SELECT public_key from users where id = ?", [UserId]),
    Qry = "SELECT public_key from users where id = ?",
    [{columns, ["public_key"]},
     {rows, [{{blob, PubKey}}]}] = sqlite3:sql_exec(Db, Qry, [UserId]),