As pointed out to me on fediverse, gen_server has to serialize calls, which can drastically reduce the performance of the database.
JYBBIFEKDXX2YPVVBR7ZCDDECV3RMFMM4QWOB2DYRPA5CVN3XVCAC HSZ7IZYPWNACXC5PJO7CL6P7PWM3FNC3IZTRK2R2TWEWFCJFP6RQC JMYRP5MUPSBHABHBPIO45F6RC7QSQSNO2EJDUDUIQXX3MEEFDOAQC R4JDMB7LL3FLA4NJEAV2DQEXII5XS5KIMG3H4YS5P6W7ZZUE7FIQC CBHKQGLDCAH2E4ZNACITBSMADOKPERFCWQPUGMH7UN5TLJXLYI4QC 2E4H4QPHKUDVTUDO335LRPAAU7374N5EX6TJW2NKDW7JK3N4HRTQC 6RQQDL46IO2ZFTJSEJREWJIMTNHOH4UBSO2VXAYNLEWNUR72OWHQC 3RUDKBK2XSZZSMORSCHSLHZCVSQFCCZEHQFB2BFSPCGXC5XVLJMQC 55WLMLEEVBRSTAFRZ5RGF7TOGUF5OPVCPA2TMHAQK45OUO7PA3YQC DJ7EM5ZXZRSOBHEAA5EVZNVULJCQ7EX4DQBSK2PKPXITWJDPIUXQC NBMKIBO6UJKXCOXXVPPENLEBYI4YOU2VCHH5KIOUGH7WJG47N4PQC DZZ4B3UGIYTN3OHAKS2HNCLK7KM2ZSHPZ4JC6YVQL6I2H4KCD5PAC MONVI5STEDKY5ALVMEXJJXDUX6XQRKTFLP7BBNOQML3VSJEM2JAAC BSISJB2O2HKYGSCX6HIIMLBIXCZ66BRCZH3622G2NOQJRJ5S3HLAC EDLKGFB5NWTZTHEO6IAR5M4W4533KJI4O6673MKJNTW3JPHQZP5AC QTLCENKPK4QOQJTHEAWAJYTJWVH7ZI5KQ2CQTMJRMA4TOMCHTVBAC QHK3BXHEC6IAAI7HK76Y7W3IQQIEFH7TD7AYMBCLOE7U2L7BT2LQC 34UNGHWEZK2J2VHCULNFD5AQPZQPD6MXU7ZAKY4DLJ5TQSROQ4BAC 2R3WFEOT3WWS6NFBBABSVRUNUPTXHFFMGPZZQOCPLTD2WB3U55HQC CTO6D5DFEM2N3FD7HOJART6MSTV3LNZ56UJKHW3X3SVQCDTCHGIQC YWCRGWVDIMCPXBQFM23MKCYZCXJJSD733NJEPB6WUU4G4BSREILAC RZB6HZ2NI5PIUIWQL63CSKXZGGUG4L6XCSCGR3TI723OO57NDT3AC EVG6AOW4UUH7C6COH5XHYLPBXC3QEZR2372SE3LJCUPPEZQ3BUQQC CHVZUNVDYINFBZWWY3TG4O7IGTL3ISTBRTWE4AU7QVE2ZZEOFHBAC ConnInfo ->PeerAddr = maps:get(addr, ConnInfo),io:format("[Transport] Sending ~p bytes to ~p via encrypted connection ~p~n",[byte_size(Binary), PeerAddr, ConnPid]),
_ConnInfo ->%% PeerAddr = maps:get(addr, ConnInfo),%% io:format(%% "[Transport] Sending ~p bytes to ~p via encrypted connection ~p~n",%% [byte_size(Binary), PeerAddr, ConnPid]%% ),
ConnInfo ->PeerAddr = maps:get(addr, ConnInfo),io:format("[Transport] Received ~p bytes from ~p (conn: ~p)~n", [byte_size(Data), PeerAddr, ConnPid]),
_ConnInfo ->%% PeerAddr = maps:get(addr, ConnInfo),%% io:format("[Transport] Received ~p bytes from ~p (conn: ~p)~n", [%% byte_size(Data), PeerAddr, ConnPid%% ]),
%% NOTE(review): two conflicting export sets follow — one for a gen_server
%% API (start_link/1) and one for a direct API (open/1). close/1 appears in
%% both lists and the post/channel exports overlap; this looks like an
%% unresolved merge of the pre- and post-gen_server versions. Keep one set.
-behavior(gen_server).-export([start_link/1, close/1, save_post/3, save_post/2, load_post/2]).-export([has_post/2, get_text_hashes_by_time_range/5, get_texts_for_channel/2]).
%% NOTE(review): alternative (non-gen_server) export list — presumably the
%% pre-refactor version of the attributes above; confirm and delete one.
-export([open/1, close/1]).-export([save_post/3, save_post/2, load_post/2, has_post/2]).-export([get_text_hashes_by_time_range/5, get_texts_for_channel/2]).
%% NOTE(review): the next line starts with '%', so the gen_server callback
%% -export after the comment text is itself commented out as written — the
%% callbacks (init/1, handle_call/3, ...) are currently NOT exported.
% gen_server things-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).
%% NOTE(review): the original newlines were stripped in this region, so each
%% physical line below holds many statements, and a leading '%%' comments out
%% everything after it on the same line. The next line therefore contains the
%% public client API (save_post/2,3, load_post/2, has_post/2,
%% get_texts_for_channel/2, get_text_hashes_by_time_range/5 — all thin
%% gen_server:call wrappers) but is entirely commented out as written, and its
%% last function is cut mid-'case' (the matching 'end' appears further down).
%% Magic numbers: 6626966400000 ms ≈ 00:00:00 01.01.2180 (open-ended range).
%% read and write postssave_post(Db, Post, Data) when is_binary(Data) ->gen_server:call(Db, {savePost, Post, Data}).save_post(Db, Data) when is_binary(Data) ->Post = posts:decode(Data),gen_server:call(Db, {savePost, Post, Data}).load_post(Db, IntId) when is_integer(IntId), IntId >= 0 ->gen_server:call(Db, {loadPostById, IntId});load_post(Db, Hash) when is_binary(Hash) ->gen_server:call(Db, {loadPostByHash, Hash}).has_post(Db, Hash) when is_binary(Hash) ->gen_server:call(Db, {hasPostByHash, Hash}).%% TODO: time windows and limitget_texts_for_channel(Db, Chan) ->gen_server:call(Db, {textsForChannel, Chan}).get_text_hashes_by_time_range(Db, Chan, Start, End, Limit) ->ActualEnd =case End of%% 00:00:00 01.01.21800 -> 6626966400000;_ -> End
%% NOTE(review): fragment of open/1 (the non-gen_server variant): if Path is
%% a regular file use it, otherwise treat Path as a directory and append
%% "sqlite.db". Cut mid-'case' — the #file_info{} record needs
%% -include_lib("kernel/include/file.hrl") somewhere above (not in view).
open(Path) ->DbPath =case file:read_file_info(Path) of{ok, #file_info{type = regular}} -> Path;_ -> filename:join(Path, "sqlite.db")
%% NOTE(review): the next line begins with the tail of
%% get_text_hashes_by_time_range/5 (default Limit 0 -> 9001), then the rest of
%% the client API wrappers, then init/1 and the start of open_db/1.
%% WARNING: list_to_atom(Path) mints a new atom per database path; atoms are
%% never GC'd, so this can exhaust the atom table if paths vary — consider
%% list_to_existing_atom or a fixed registered name (TODO confirm callers).
ActualLimit =case Limit of0 -> 9001;_ -> Limitend,gen_server:call(Db, {hashesByTimeRange, Chan, Start, ActualEnd, ActualLimit}).%% manage our active channelschannels_join(Db, Name) ->gen_server:call(Db, {channelsJoin, Name}).channels_leave(Db, Name) ->gen_server:call(Db, {channelsLeave, Name}).channels_list(Db) ->gen_server:call(Db, {channelsList}).channels_known(Db) ->gen_server:call(Db, {channelsKnown}).%% returns a map of the membersget_channel_members(Db, Chan) ->gen_server:call(Db, {channelsMembers, Chan}).%% helps with channel state requestsget_channel_state(Db, Chan) ->gen_server:call(Db, {channelsState, Chan}).%% helps with creating links for new postsget_channel_heads(Db, Chan) ->gen_server:call(Db, {channelsHeads, Chan}).%% manage persistent peerspeer_add(Db, Address) ->gen_server:call(Db, {peerAdd, Address}).peer_update(Db, Address, Updates) ->gen_server:call(Db, {peerUpdate, Address, Updates}).peer_list(Db) ->gen_server:call(Db, {peerList}).peer_delete(Db, Address) ->gen_server:call(Db, {peerDelete, Address}).peer_get(Db, Address) ->gen_server:call(Db, {peerGet, Address}).%% unfetched hash queries (internal - called by peer module only)unfetched_get(Db, Channel, Limit) ->gen_server:call(Db, {unfetchedGet, Channel, Limit}).unfetched_count(Db, Channel) ->gen_server:call(Db, {unfetchedCount, Channel}).%% Note: unfetched_add and unfetched_delete are internal operations%% called directly via gen_server:call from peer module, no wrapper functions%% server thingsinit(Args) ->process_flag(trap_exit, true),Path = proplists:get_value(dbPath, Args),open_db(Path).open_db(Path) ->{ok, Db} = sqlite3:open(list_to_atom(Path), [{file, Path}]),
%% NOTE(review): duplicate sqlite3:open line from the other (open/1) variant;
%% same list_to_atom atom-table concern applies to DbPath here.
{ok, Db} = sqlite3:open(list_to_atom(DbPath), [{file, DbPath}]),
%% NOTE(review): this region interleaves gen_server handle_call clauses for
%% post lookups with the pre-refactor direct-call versions of the same
%% queries. Every fragment below is cut at one or both edges; the pairs
%% should be reconciled (keep the handle_call versions if the gen_server
%% refactor wins). State shape throughout: [{sql, Db}, _].
{reply, {ok, Id, PostHash}, State};handle_call({loadPostByHash, Hash}, _From, [{sql, Db}, _] = State) when is_binary(Hash) ->
%% NOTE(review): direct-call variant of load_post/2 (by integer id, then by
%% hash); second clause head is cut off.
{ok, Id, PostHash}.load_post(Db, IntId) when is_integer(IntId), IntId >= 0 ->Res = sqlite3:read(Db, posts, {id, IntId}),unpackPost(Db, Res);load_post(Db, Hash) when is_binary(Hash) ->
%% NOTE(review): handle_call clauses for loadPostById / hasPostByHash; note
%% the camelCase helper unpackPost/2 diverges from this module's snake_case
%% convention — consider renaming once the merge is resolved.
{reply, unpackPost(Db, Res), State};handle_call({loadPostById, IntId}, _From, [{sql, Db}, _] = State) whenis_integer(IntId), IntId >= 0->Res = sqlite3:read(Db, posts, {id, IntId}),{reply, unpackPost(Db, Res), State};handle_call({hasPostByHash, Hash}, _From, [{sql, Db}, _] = State) when is_binary(Hash) ->
unpackPost(Db, Res).has_post(Db, Hash) when is_binary(Hash) ->
{reply, {ok, Result}, State};handle_call({hashesByTimeRange, Chan, Start, End, Limit}, _From, [{sql, Db}, _] = State) ->
%% NOTE(review): direct-call variant of get_text_hashes_by_time_range/5 with
%% the same End/Limit defaulting (6626966400000 ms ≈ year 2180; Limit 0 ->
%% 9001) that the wrapper near the top of the file also applies.
{ok, Result}.get_text_hashes_by_time_range(Db, Chan, Start, End, Limit) ->ActualEnd =case End of%% 00:00:00 01.01.21800 -> 6626966400000;_ -> Endend,ActualLimit =case Limit of0 -> 9001;_ -> Limitend,
%% NOTE(review): the next line starts with '%%', so the channelsHeads
%% handle_call clause after the TODO text is commented out as written. The
%% query selects link sources that are nobody's parent (i.e. DAG heads).
%% TODO: add testshandle_call({channelsHeads, Chan}, _, [{sql, Db}, _] = State) ->Qry ="SELECT DISTINCT source FROM links " ++"WHERE source NOT IN (" ++" SELECT DISTINCT parent FROM links" ++" WHERE channel = ? or channel is null)",QryChan =case Chan ofnull -> null;_ when is_list(Chan) -> {blob, list_to_binary(Chan)}
%% NOTE(review): channel and peer management — again two interleaved
%% versions (direct-call vs handle_call). Fragments are cut mid-'fun' /
%% mid-'case'; e.g. channels_list below is split across non-adjacent lines.
%% sqlite3 error code 19 = SQLITE_CONSTRAINT (used for "already exists").
channels_leave(Db, Name) ->sqlite3:delete(Db, channels, {name, Name}).channels_list(Db) ->[{columns, ["name", "topic"]}, {rows, Rows}] = sqlite3:read_all(Db, channels),Chans = lists:map(fun({Name, _Topic}) ->binary_to_list(Name)
%% NOTE(review): tail of the channelsHeads clause, then channelsState: posts
%% with type > 2 (topic/join/leave) ordered by timestamp form the channel
%% membership state.
[{columns, ["source"]}, {rows, Rows}] = sqlite3:sql_exec(Db, Qry, [QryChan]),Result = [Hash || {{blob, Hash}} <- Rows],{reply, {ok, Result}, State};handle_call({channelsState, Chan}, _, [{sql, Db}, _] = State) ->Qry = "SELECT hash, user_id from posts where channel = ? and type > 2 order by timestamp",QryChan = {blob, list_to_binary(Chan)},[{columns, ["hash", "user_id"]}, {rows, Rows}] = sqlite3:sql_exec(Db, Qry, [QryChan]),JoinLeaveHashes = [Hash || {{blob, Hash}, _} <- Rows],
%% NOTE(review): orphaned tail — closes a lists:map(...) started elsewhere.
Rows),{ok, Chans}.
%% NOTE(review): the next line starts with '%%', so the InfoPostQry code
%% after the comment text is commented out as written: latest type-2 (info)
%% post per user, accumulated into a set.
%% add info posts for all users involvedInfoPostQry ="SELECT hash from posts where user_id = ? and type = 2 order by timestamp desc limit 1",InfosHashes = lists:foldl(fun({_, UserId}, Acc) ->[{columns, ["hash"]},{rows, InfoRows}] = sqlite3:sql_exec(Db, InfoPostQry, [UserId]),case InfoRows of[{{blob, H}}] -> sets:add_element(H, Acc);[] -> Accend
%% NOTE(review): direct-call channels_known/1 — distinct non-null channels
%% from posts; tolerates both {blob, Bin} and bare binary rows. Cut mid-'fun'.
channels_known(Db) ->%% Get all unique channels from posts table where channel is not nullQuery = "SELECT DISTINCT channel FROM posts WHERE channel IS NOT NULL ORDER BY channel",[{columns, ["channel"]}, {rows, Rows}] = sqlite3:sql_exec(Db, Query),Chans = lists:map(fun({ChannelData}) ->%% Handle both {blob, Bin} and plain BinaryChannelBin =case ChannelData of{blob, B} -> B;B when is_binary(B) -> Bend,binary_to_list(ChannelBin)
%% NOTE(review): handle_call clauses for channelsLeave / channelsJoin /
%% channelsList; join treats constraint violation (19) as success.
{reply, {ok, Result}, State};handle_call({channelsLeave, Name}, _From, [{sql, Db}, _] = State) ->ok = sqlite3:delete(Db, channels, {name, Name}),{reply, ok, State};handle_call({channelsJoin, Name}, _From, [{sql, Db}, _] = State) ->Res =case sqlite3:write(Db, channels, [{name, Name}, {topic, ""}]) of{rowid, _RowId} -> ok;%% constraint violation => already joined{error, 19, _Msg} -> okend,{reply, Res, State};handle_call({channelsList}, _From, [{sql, Db}, _] = State) ->[{columns, ["name", "topic"]}, {rows, Rows}] = sqlite3:read_all(Db, channels),Chans = lists:map(fun({Name, _Topic}) ->binary_to_list(Name)
{ok, Result}.%% helps with channel state requestsget_channel_state(Db, Chan) ->Qry = "SELECT hash, user_id from posts where channel = ? and type > 2 order by timestamp",QryChan = {blob, list_to_binary(Chan)},[{columns, ["hash", "user_id"]}, {rows, Rows}] = sqlite3:sql_exec(Db, Qry, [QryChan]),JoinLeaveHashes = [Hash || {{blob, Hash}, _} <- Rows],%% add info posts for all users involvedInfoPostQry ="SELECT hash from posts where user_id = ? and type = 2 order by timestamp desc limit 1",InfosHashes = lists:foldl(fun({_, UserId}, Acc) ->[{columns, ["hash"]},{rows, InfoRows}] = sqlite3:sql_exec(Db, InfoPostQry, [UserId]),case InfoRows of[{{blob, H}}] -> sets:add_element(H, Acc);[] -> Accend
%% NOTE(review): the next line starts with '%%', so the channelsKnown
%% handle_call after the debug comment is commented out as written.
%% io:format("[DEBUG] ChannelsList: ~p~n", [Chans]),{reply, {ok, Chans}, State};handle_call({channelsKnown}, _From, [{sql, Db}, _] = State) ->%% Get all unique channels from posts table where channel is not nullQuery = "SELECT DISTINCT channel FROM posts WHERE channel IS NOT NULL ORDER BY channel",[{columns, ["channel"]}, {rows, Rows}] = sqlite3:sql_exec(Db, Query),Chans = lists:map(fun({ChannelData}) ->%% Handle both {blob, Bin} and plain BinaryChannelBin =case ChannelData of{blob, B} -> B;B when is_binary(B) -> Bend,binary_to_list(ChannelBin)
Result = JoinLeaveHashes ++ sets:to_list(InfosHashes),{ok, Result}.%% helps with creating links for new posts%% TODO: add testsget_channel_heads(Db, Chan) ->Qry ="SELECT DISTINCT source FROM links " ++"WHERE source NOT IN (" ++" SELECT DISTINCT parent FROM links" ++" WHERE channel = ? or channel is null)",QryChan =case Chan ofnull -> null;_ when is_list(Chan) -> {blob, list_to_binary(Chan)}
%% NOTE(review): peerAdd / peerUpdate / peerList handle_call clauses; 19 =
%% SQLITE_CONSTRAINT again, mapped to {error, already_exists} here.
Res =case sqlite3:write(Db, peers, Row) of{rowid, RowId} -> {ok, RowId};{error, 19, _Msg} -> {error, already_exists}end,{reply, Res, State};handle_call({peerUpdate, Address, Updates}, _From, [{sql, Db}, _] = State) ->%% Updates is a proplist like [{score, 10}, {last_seen, Timestamp}]Res =case sqlite3:update(Db, peers, {address, Address}, Updates) ofok -> ok;Error -> {error, Error}end,{reply, Res, State};handle_call({peerList}, _From, [{sql, Db}, _] = State) ->
case sqlite3:write(Db, peers, Row) of{rowid, RowId} -> {ok, RowId};{error, 19, _Msg} -> {error, already_exists}end.%% Updates is a proplist like [{score, 10}, {last_seen, Timestamp}]peer_update(Db, Address, Updates) ->case sqlite3:update(Db, peers, {address, Address}, Updates) ofok -> ok;Error -> {error, Error}end.peer_list(Db) ->
%% NOTE(review): anonymous row-unpacking fun — an inline duplicate of
%% unpack_peer/1 defined later in this file; the 'unpack_peer,' fragment two
%% lines below suggests a half-finished refactor to pass the named function
%% instead. Prefer the named function and delete this copy.
fun({Id, Address, Score, LastSeen, LastAttempt, AttemptCount, CreatedAt, Notes}) ->#{id => Id,address => binary_to_list(Address),score =>case Score ofnull -> 0;S -> Send,last_seen =>case LastSeen ofnull -> 0;LS -> LSend,last_attempt =>case LastAttempt ofnull -> 0;LA -> LAend,attempt_count =>case AttemptCount ofnull -> 0;AC -> ACend,created_at => CreatedAt,notes =>case Notes ofnull -> undefined;N -> binary_to_list(N)end}end,
unpack_peer,
%% NOTE(review): peerDelete / peerGet handle_call clauses; peerGet inlines
%% yet another copy of the unpack_peer logic — should call unpack_peer/1.
{reply, {ok, Peers}, State};handle_call({peerDelete, Address}, _From, [{sql, Db}, _] = State) ->ok = sqlite3:delete(Db, peers, {address, Address}),{reply, ok, State};handle_call({peerGet, Address}, _From, [{sql, Db}, _] = State) ->Result =case sqlite3:read(Db, peers, {address, Address}) of[{columns, _Cols},{rows, [{Id, Addr, Score, LastSeen, LastAttempt, AttemptCount, CreatedAt, Notes}]}] ->{ok, #{id => Id,address => binary_to_list(Addr),score =>case Score ofnull -> 0;S -> Send,last_seen =>case LastSeen ofnull -> 0;LS -> LSend,last_attempt =>case LastAttempt ofnull -> 0;LA -> LAend,attempt_count =>case AttemptCount ofnull -> 0;AC -> ACend,created_at => CreatedAt,notes =>case Notes ofnull -> undefined;N -> binary_to_list(N)end}};_ ->not_foundend,{reply, Result, State};
%% NOTE(review): direct-call peer_delete/2 and peer_get/2; peer_get properly
%% delegates to unpack_peer/1 — the handle_call versions above should match.
{ok, Peers}.peer_delete(Db, Address) ->sqlite3:delete(Db, peers, {address, Address}).peer_get(Db, Address) ->case sqlite3:read(Db, peers, {address, Address}) of[{columns, _Cols},{rows, [Row]}] ->{ok, unpack_peer(Row)};_ ->not_foundend.
%% NOTE(review): the next line starts with '%%', so unfetched_get/3 after the
%% comment text is commented out as written (oldest-first by discovered_at).
%% unfetched hash queries (internal - called by peer module only)unfetched_get(Db, Channel, Limit) ->ChannelBin = list_to_binary(Channel),Query ="SELECT hash FROM unfetched_hashes WHERE channel = ? ORDER BY discovered_at ASC LIMIT ?",[{columns, ["hash"]}, {rows, Rows}] = sqlite3:sql_exec(Db, Query, [ChannelBin, Limit]),Hashes = [Hash || {{blob, Hash}} <- Rows],{ok, Hashes}.
handle_call({unfetchedAdd, Hashes, Channel, Source}, _From, [{sql, Db}, _] = State) ->
%% NOTE(review): direct-call unfetched_count/2, then a dangling
%% unfetched_add/4 head whose body is not in view.
unfetched_count(Db, Channel) ->ChannelBin = list_to_binary(Channel),Query = "SELECT count(*) as count FROM unfetched_hashes WHERE channel = ?",[{columns, ["count"]}, {rows, [{Count}]}] = sqlite3:sql_exec(Db, Query, [ChannelBin]),{ok, Count}.unfetched_add(Db, Hashes, Channel, Source) ->
{reply, {ok, Results}, State};handle_call({unfetchedGet, Channel, Limit}, _From, [{sql, Db}, _] = State) ->ChannelBin = list_to_binary(Channel),Query ="SELECT hash FROM unfetched_hashes WHERE channel = ? ORDER BY discovered_at ASC LIMIT ?",[{columns, ["hash"]}, {rows, Rows}] = sqlite3:sql_exec(Db, Query, [ChannelBin, Limit]),Hashes = [Hash || {{blob, Hash}} <- Rows],{reply, {ok, Hashes}, State};handle_call({unfetchedCount, Channel}, _From, [{sql, Db}, _] = State) ->ChannelBin = list_to_binary(Channel),Query = "SELECT count(*) as count FROM unfetched_hashes WHERE channel = ?",[{columns, ["count"]}, {rows, [{Count}]}] = sqlite3:sql_exec(Db, Query, [ChannelBin]),{reply, {ok, Count}, State};handle_call({unfetchedDelete, Hashes}, _From, [{sql, Db}, _] = State) ->
{ok, Results}.unfetched_delete(Db, Hashes) ->
%% NOTE(review): gen_server boilerplate tail — handle_cast/handle_info log
%% and drop unexpected messages (consider logger over io:format); terminate
%% closes the sqlite handle but its trailing expression is cut off.
{reply, ok, State}.%%%%%%%%%%%%%%%%%%%%%%%%% gen_server things %%%%%%%%%%%%%%%%%%%%%%%%%handle_cast(Msg, State) ->io:format("Unexpected async call: ~p~n", [Msg]),{noreply, State}.handle_info(Msg, State) ->io:format("Unexpected message: ~p~n", [Msg]),{noreply, State}.terminate(_Reason, [{sql, Db}, _] = _State) ->sqlite3:close(Db),
%% Convert a raw `peers` table row — the 8-tuple
%% {Id, Addr, Score, LastSeen, LastAttempt, AttemptCount, CreatedAt, Notes}
%% as returned inside sqlite3:read/3 results — into a peer map.
%% SQLite NULLs are normalised: the numeric columns (score, last_seen,
%% last_attempt, attempt_count) default to 0, notes to 'undefined'.
%% Addr and Notes arrive as binaries and are converted to strings.
unpack_peer(Row) ->
    {Id, Addr, Score, LastSeen, LastAttempt, AttemptCount, CreatedAt, Notes} = Row,
    #{
        id => Id,
        address => binary_to_list(Addr),
        score => zero_if_null(Score),
        last_seen => zero_if_null(LastSeen),
        last_attempt => zero_if_null(LastAttempt),
        attempt_count => zero_if_null(AttemptCount),
        created_at => CreatedAt,
        notes =>
            %% notes is optional free text; NULL means "no notes" rather
            %% than an empty string, so map it to 'undefined'
            case Notes of
                null -> undefined;
                N -> binary_to_list(N)
            end
    }.

%% SQLite NULL -> 0 for the numeric peer columns (shared by the four
%% previously copy-pasted case expressions above).
zero_if_null(null) -> 0;
zero_if_null(V) -> V.
<<"\n""create table IF NOT EXISTS posts (\n"" -- these exist for all posts\n"" --\n"" id integer primary key AUTOINCREMENT,\n"" hash blob unique not null check (length(hash) = 32),\n"" type integer not null check (type >= 0 AND type <= 5 ),\n"" -- milliseconds since UNIX Epoch\n"" timestamp integer not null check (timestamp > 0),\n""\n"" user_id integer not null,\n""\n"" raw_post blob not null,\n""\n""\n"" -- depends on postType\n"" channel text, -- all but delete have this one\n"" -- join & leave don't have other fields then channel\n"" text text,\n"" topic text,\n"" deletedHashes blob, -- concatenated\n"" infos blob, -- encoded as (json?) map {k=>v, k=>v}\n""\n"" -- `post/text` | a textual chat message, posted to a channel |\n"" CONSTRAINT is_text check (\n"" type != 0 OR (channel is not null AND text is not null)\n"" ),\n"" -- `post/delete`| the deletion of a previously published post |\n"" CONSTRAINT is_delete check (\n"" type != 1 OR (deletedHashes is not null)\n"" ),\n"" -- `post/info` | set or clear informative key/value pairs on a user\n"" CONSTRAINT is_info check (\n"" type != 2 OR (infos is not null)\n"" ),\n"" -- `post/topic` | set or clear a channel's topic string\n"" CONSTRAINT is_topic check (\n"" type != 3 OR (channel is not null AND topic is not null)\n"" ),\n"" -- `post/join` | announce membership to a channel\n"" CONSTRAINT is_join check (\n"" type != 4 OR (channel is not null)\n"" ),\n"" -- `post/leave` | announce cessation of membership to a channel\n"" CONSTRAINT is_leave check (\n"" type != 5 OR (channel is not null)\n"" ),\n""\n"" foreign key(user_id) references users(id)\n"");\n""\n""create table IF NOT EXISTS links (\n"" channel text,\n"" source blob not null,\n"" parent blob not null\n""\n"" -- we will get posts for which we don't have the parent's (yet)\n"" -- foreign key(source) references posts(hash),\n"" -- foreign key(parent) references posts(hash),\n"");\n""CREATE INDEX IF NOT EXISTS parent_idx ON links(parent);\n""\n""create table 
IF NOT EXISTS channel_members (\n"" id integer primary key AUTOINCREMENT,\n"" channel text not null,\n"" user_id integer not null,\n"" last_hash blob not null,\n"" UNIQUE(channel, user_id)\n"");\n">>,
<<"\n""create table IF NOT EXISTS posts (\n"" -- these exist for all posts\n"" --\n"" id integer primary key AUTOINCREMENT,\n"" hash blob unique not null check (length(hash) = 32),\n"" type integer not null check (type >= 0 AND type <= 5 ),\n"" -- milliseconds since UNIX Epoch\n"" timestamp integer not null check (timestamp > 0),\n""\n"" user_id integer not null,\n""\n"" raw_post blob not null,\n""\n""\n"" -- depends on postType\n"" channel text, -- all but delete have this one\n"" -- join & leave don't have other fields then channel\n"" text text,\n"" topic text,\n"" deletedHashes blob, -- concatenated\n"" infos blob, -- encoded as (json?) map {k=>v, k=>v}\n""\n"" -- `post/text` | a textual chat message, posted to a channel |\n"" CONSTRAINT is_text check (\n"" type != 0 OR (channel is not null AND text is not null)\n"" ),\n"" -- `post/delete`| the deletion of a previously published post |\n"" CONSTRAINT is_delete check (\n"" type != 1 OR (deletedHashes is not null)\n"" ),\n"" -- `post/info` | set or clear informative key/value pairs on a user\n"" CONSTRAINT is_info check (\n"" type != 2 OR (infos is not null)\n"" ),\n"" -- `post/topic` | set or clear a channel's topic string\n"" CONSTRAINT is_topic check (\n"" type != 3 OR (channel is not null AND topic is not null)\n"" ),\n"" -- `post/join` | announce membership to a channel\n"" CONSTRAINT is_join check (\n"" type != 4 OR (channel is not null)\n"" ),\n"" -- `post/leave` | announce cessation of membership to a channel\n"" CONSTRAINT is_leave check (\n"" type != 5 OR (channel is not null)\n"" ),\n""\n"" foreign key(user_id) references users(id)\n"");\n""\n""create table IF NOT EXISTS links (\n"" channel text,\n"" source blob not null,\n"" parent blob not null\n""\n"" -- we will get posts for which we don't have the parent's (yet)\n"" -- foreign key(source) references posts(hash),\n"" -- foreign key(parent) references posts(hash),\n"");\n""CREATE INDEX IF NOT EXISTS parent_idx ON links(parent);\n""\n""create table 
IF NOT EXISTS channel_members (\n"" id integer primary key AUTOINCREMENT,\n"" channel text not null,\n"" user_id integer not null,\n"" last_hash blob not null,\n"" UNIQUE(channel, user_id)\n"");\n">>,