HSZ7IZYPWNACXC5PJO7CL6P7PWM3FNC3IZTRK2R2TWEWFCJFP6RQC XGOBDG23Q4PBQCE5IDY4ACQQKXJ77VM43Q433GBWI4VK3PT5NLEAC CHVZUNVDYINFBZWWY3TG4O7IGTL3ISTBRTWE4AU7QVE2ZZEOFHBAC DCSQUOY35OK5QE5YQVM654PYIYDGPMRV5JNCXEIYMJIDMGQ3Y3GAC U5FBSDZETGGNW7DDWIX54S47USLJA3S6UUMVBLGUX2E3CFTHTQCQC C7DQVKR64FXYO37D72SNDMCJOM5PCZ3QW23MBFYNLRXFRNV2R4IAC CTO6D5DFEM2N3FD7HOJART6MSTV3LNZ56UJKHW3X3SVQCDTCHGIQC 3RUDKBK2XSZZSMORSCHSLHZCVSQFCCZEHQFB2BFSPCGXC5XVLJMQC 2E4H4QPHKUDVTUDO335LRPAAU7374N5EX6TJW2NKDW7JK3N4HRTQC 6RQQDL46IO2ZFTJSEJREWJIMTNHOH4UBSO2VXAYNLEWNUR72OWHQC 34UNGHWEZK2J2VHCULNFD5AQPZQPD6MXU7ZAKY4DLJ5TQSROQ4BAC 55WLMLEEVBRSTAFRZ5RGF7TOGUF5OPVCPA2TMHAQK45OUO7PA3YQC IHBNW3GI2XB6KAWUYRLL6KDOBUNUOU3N7RRLS6BFNW6SE7ZDHLWQC 76BR2NBTN2IMHYXE4SXEZFPYUZDP55XVPXVHBJFG3T6EKDXXODFQC JR3F3TQ3A7I2K5AWRK7SKAOZI5XE3ZGAS3VK4O7FKODUONQSY35QC NBMKIBO6UJKXCOXXVPPENLEBYI4YOU2VCHH5KIOUGH7WJG47N4PQC G7UELMX4I2NFSZKZFFACQL5M3DRWQDQGM3NSHHVYAE75JSLXAVWQC QTLCENKPK4QOQJTHEAWAJYTJWVH7ZI5KQ2CQTMJRMA4TOMCHTVBAC MONVI5STEDKY5ALVMEXJJXDUX6XQRKTFLP7BBNOQML3VSJEM2JAAC LJYBM2AL5Z3TRSMCRTWSJ7XADJ6H5UWCMLFXMJIHEJZX2YDWJTMQC R4JDMB7LL3FLA4NJEAV2DQEXII5XS5KIMG3H4YS5P6W7ZZUE7FIQC BXNLFE3IMYFUYXQ6RYXKU4FVWV4T2H7WI5AL2DVYJMA4X2I5XB6QC Result = try% Set the controlling process to the transport gen_server% so it receives {cable_transport, ConnPid, Data} messagesok = enoise_cable:controlling_process(ConnPid, TransportPid),% Get the peer address from the connection{ok, {PeerIP, PeerPort}} = enoise_cable:peername(ConnPid),PeerAddr = {PeerIP, PeerPort},io:format("[Transport] Peer address: ~p~n", [PeerAddr]),TransportPid ! {new_connection, ConnPid, PeerAddr},okcatchError:ErrorReason:Stacktrace ->io:format("[Transport] Error setting up connection ~p: ~p:~p~n Stacktrace: ~p~n",[ConnPid, Error, ErrorReason, Stacktrace]),% Close the connection on errorcatch enoise_cable:close(ConnPid),errorend,
Result =try% Set the controlling process to the transport gen_server% so it receives {cable_transport, ConnPid, Data} messagesok = enoise_cable:controlling_process(ConnPid, TransportPid),% Get the peer address from the connection{ok, {PeerIP, PeerPort}} = enoise_cable:peername(ConnPid),PeerAddr = {PeerIP, PeerPort},io:format("[Transport] Peer address: ~p~n", [PeerAddr]),TransportPid ! {new_connection, ConnPid, PeerAddr},okcatchError:ErrorReason:Stacktrace ->io:format("[Transport] Error setting up connection ~p: ~p:~p~n Stacktrace: ~p~n",[ConnPid, Error, ErrorReason, Stacktrace]),% Close the connection on errorcatch enoise_cable:close(ConnPid),errorend,
{_OrigConnPid, OrigMsg} = maps:get(ReqId, ActiveOut, {ConnPid, undefined}),OrigMsgType = case OrigMsg ofundefined -> unknown;[OrigHeader, _OrigBody] ->proplists:get_value(msgType, OrigHeader, unknown);_ -> unknownend,
{_OrigConnPid, OrigMsg} = maps:get(ReqId, ActiveOut, {ConnPid, undefined}),OrigMsgType =case OrigMsg ofundefined ->unknown;[OrigHeader, _OrigBody] ->proplists:get_value(msgType, OrigHeader, unknown);_ ->unknownend,
{HashesToFetch, HashesToStore, Channel} = case OrigMsgType of5 -> % channel_state_request - fetch all eagerlyio:format("[Peer] Hash response for channel_state_request: fetching all ~p hashes eagerly~n", [length(NewHashes)]),{NewHashes, [], undefined};4 -> % channel_time_range_request - hybrid strategyChan = case OrigMsg of[_OH, OrigBody] -> proplists:get_value(channel, OrigBody, undefined);_ -> undefinedend,EagerCount = min(100, length(NewHashes)),Eager = lists:sublist(NewHashes, EagerCount),Lazy = lists:sublist(NewHashes, EagerCount + 1, length(NewHashes)),io:format("[Peer] Hash response for channel_time_range_request (~s): fetching ~p hashes eagerly, storing ~p as unfetched~n",[Chan, length(Eager), length(Lazy)]),{Eager, Lazy, Chan};_ -> % unknown - default to eagerio:format("[Peer] Hash response for unknown request type: fetching all ~p hashes eagerly (fallback)~n", [length(NewHashes)]),{NewHashes, [], undefined}end,
{HashesToFetch, HashesToStore, Channel} =case OrigMsgType of% channel_state_request - fetch all eagerly5 ->io:format("[Peer] Hash response for channel_state_request: fetching all ~p hashes eagerly~n",[length(NewHashes)]),{NewHashes, [], undefined};% channel_time_range_request - hybrid strategy4 ->Chan =case OrigMsg of[_OH, OrigBody] ->proplists:get_value(channel, OrigBody, undefined);_ ->undefinedend,EagerCount = min(100, length(NewHashes)),Eager = lists:sublist(NewHashes, EagerCount),Lazy = lists:sublist(NewHashes, EagerCount + 1, length(NewHashes)),io:format("[Peer] Hash response for channel_time_range_request (~s): fetching ~p hashes eagerly, storing ~p as unfetched~n",[Chan, length(Eager), length(Lazy)]),{Eager, Lazy, Chan};% unknown - default to eager_ ->io:format("[Peer] Hash response for unknown request type: fetching all ~p hashes eagerly (fallback)~n",[length(NewHashes)]),{NewHashes, [], undefined}end,
Channels = lists:filtermap(fun(P) ->[_PostHeader, PostBody] = posts:decode(P),case proplists:get_value(channel, PostBody, undefined) ofundefined -> false;Chan -> {true, Chan}endend, NewPosts),
Channels = lists:filtermap(fun(P) ->[_PostHeader, PostBody] = posts:decode(P),case proplists:get_value(channel, PostBody, undefined) ofundefined -> false;Chan -> {true, Chan}endend,NewPosts),
Target = case string:split(Address, ":") of[HostStr, PortStr] ->tryPort = list_to_integer(PortStr),case inet:getaddr(HostStr, inet) of{ok, IP} -> {IP, Port};_ -> undefinedendcatch_:_ -> undefinedend;_ -> undefinedend,
Target =case string:split(Address, ":") of[HostStr, PortStr] ->tryPort = list_to_integer(PortStr),case inet:getaddr(HostStr, inet) of{ok, IP} -> {IP, Port};_ -> undefinedendcatch_:_ -> undefinedend;_ ->undefinedend,
lists:foreach(fun(Chan) ->tryHandlerPid ! {channel_event, {new_messages, Chan}}catch_:_ ->io:format("[Peer] Failed to send event to handler ~p~n", [HandlerPid])endend, PendingList),io:format("[Peer] Notified handler ~p of ~p channels~n", [HandlerPid, length(PendingList)])
lists:foreach(fun(Chan) ->tryHandlerPid ! {channel_event, {new_messages, Chan}}catch_:_ ->io:format("[Peer] Failed to send event to handler ~p~n", [HandlerPid])endend,PendingList),io:format("[Peer] Notified handler ~p of ~p channels~n", [HandlerPid, length(PendingList)])
NewHandlers = maps:map(fun(_Pid, Handler) ->Pending = maps:get(pending, Handler),NewPending = lists:foldl(fun sets:add_element/2, Pending, Channels),Handler#{pending => NewPending}end, Handlers),
NewHandlers = maps:map(fun(_Pid, Handler) ->Pending = maps:get(pending, Handler),NewPending = lists:foldl(fun sets:add_element/2, Pending, Channels),Handler#{pending => NewPending}end,Handlers),
score => case Score of null -> 0; S -> S end,last_seen => case LastSeen of null -> 0; LS -> LS end,last_attempt => case LastAttempt of null -> 0; LA -> LA end,attempt_count => case AttemptCount of null -> 0; AC -> AC end,
score =>case Score ofnull -> 0;S -> Send,last_seen =>case LastSeen ofnull -> 0;LS -> LSend,last_attempt =>case LastAttempt ofnull -> 0;LA -> LAend,attempt_count =>case AttemptCount ofnull -> 0;AC -> ACend,
Result = case sqlite3:read(Db, peers, {address, Address}) of[{columns, _Cols}, {rows, [{Id, Addr, Score, LastSeen, LastAttempt, AttemptCount, CreatedAt, Notes}]}] ->{ok, #{id => Id,address => binary_to_list(Addr),score => case Score of null -> 0; S -> S end,last_seen => case LastSeen of null -> 0; LS -> LS end,last_attempt => case LastAttempt of null -> 0; LA -> LA end,attempt_count => case AttemptCount of null -> 0; AC -> AC end,created_at => CreatedAt,notes => case Notes ofnull -> undefined;N -> binary_to_list(N)end}};_ ->not_foundend,
Result =case sqlite3:read(Db, peers, {address, Address}) of[{columns, _Cols},{rows, [{Id, Addr, Score, LastSeen, LastAttempt, AttemptCount, CreatedAt, Notes}]}] ->{ok, #{id => Id,address => binary_to_list(Addr),score =>case Score ofnull -> 0;S -> Send,last_seen =>case LastSeen ofnull -> 0;LS -> LSend,last_attempt =>case LastAttempt ofnull -> 0;LA -> LAend,attempt_count =>case AttemptCount ofnull -> 0;AC -> ACend,created_at => CreatedAt,notes =>case Notes ofnull -> undefined;N -> binary_to_list(N)end}};_ ->not_foundend,
Schema = <<"create table IF NOT EXISTS posts (-- these exist for all posts--id integer primary key AUTOINCREMENT,hash blob unique not null check (length(hash) = 32),type integer not null check (type >= 0 AND type <= 5 ),-- milliseconds since UNIX Epochtimestamp integer not null check (timestamp > 0),user_id integer not null,raw_post blob not null,-- depends on postTypechannel text, -- all but delete have this one-- join & leave don't have other fields then channeltext text,topic text,deletedHashes blob, -- concatenatedinfos blob, -- encoded as (json?) map {k=>v, k=>v}-- `post/text` | a textual chat message, posted to a channel |CONSTRAINT is_text check (type != 0 OR (channel is not null AND text is not null)),-- `post/delete`| the deletion of a previously published post |CONSTRAINT is_delete check (type != 1 OR (deletedHashes is not null)),-- `post/info` | set or clear informative key/value pairs on a userCONSTRAINT is_info check (type != 2 OR (infos is not null)),-- `post/topic` | set or clear a channel's topic stringCONSTRAINT is_topic check (type != 3 OR (channel is not null AND topic is not null)),-- `post/join` | announce membership to a channelCONSTRAINT is_join check (type != 4 OR (channel is not null)),-- `post/leave` | announce cessation of membership to a channelCONSTRAINT is_leave check (type != 5 OR (channel is not null)),foreign key(user_id) references users(id));create table IF NOT EXISTS links (channel text,source blob not null,parent blob not null-- we will get posts for which we don't have the parent's (yet)-- foreign key(source) references posts(hash),-- foreign key(parent) references posts(hash),);CREATE INDEX IF NOT EXISTS parent_idx ON links(parent);create table IF NOT EXISTS channel_members (id integer primary key AUTOINCREMENT,channel text not null,user_id integer not null,last_hash blob not null,UNIQUE(channel, user_id));">>,
Schema =<<"\n""create table IF NOT EXISTS posts (\n"" -- these exist for all posts\n"" --\n"" id integer primary key AUTOINCREMENT,\n"" hash blob unique not null check (length(hash) = 32),\n"" type integer not null check (type >= 0 AND type <= 5 ),\n"" -- milliseconds since UNIX Epoch\n"" timestamp integer not null check (timestamp > 0),\n""\n"" user_id integer not null,\n""\n"" raw_post blob not null,\n""\n""\n"" -- depends on postType\n"" channel text, -- all but delete have this one\n"" -- join & leave don't have other fields then channel\n"" text text,\n"" topic text,\n"" deletedHashes blob, -- concatenated\n"" infos blob, -- encoded as (json?) map {k=>v, k=>v}\n""\n"" -- `post/text` | a textual chat message, posted to a channel |\n"" CONSTRAINT is_text check (\n"" type != 0 OR (channel is not null AND text is not null)\n"" ),\n"" -- `post/delete`| the deletion of a previously published post |\n"" CONSTRAINT is_delete check (\n"" type != 1 OR (deletedHashes is not null)\n"" ),\n"" -- `post/info` | set or clear informative key/value pairs on a user\n"" CONSTRAINT is_info check (\n"" type != 2 OR (infos is not null)\n"" ),\n"" -- `post/topic` | set or clear a channel's topic string\n"" CONSTRAINT is_topic check (\n"" type != 3 OR (channel is not null AND topic is not null)\n"" ),\n"" -- `post/join` | announce membership to a channel\n"" CONSTRAINT is_join check (\n"" type != 4 OR (channel is not null)\n"" ),\n"" -- `post/leave` | announce cessation of membership to a channel\n"" CONSTRAINT is_leave check (\n"" type != 5 OR (channel is not null)\n"" ),\n""\n"" foreign key(user_id) references users(id)\n"");\n""\n""create table IF NOT EXISTS links (\n"" channel text,\n"" source blob not null,\n"" parent blob not null\n""\n"" -- we will get posts for which we don't have the parent's (yet)\n"" -- foreign key(source) references posts(hash),\n"" -- foreign key(parent) references posts(hash),\n"");\n""CREATE INDEX IF NOT EXISTS parent_idx ON links(parent);\n""\n""create table IF NOT EXISTS channel_members (\n"" id integer primary key AUTOINCREMENT,\n"" channel text not null,\n"" user_id integer not null,\n"" last_hash blob not null,\n"" UNIQUE(channel, user_id)\n"");\n">>,