-module(cabal_db).
-export([open/1, close/1]).
-export([save_post/3, save_post/2, load_post/2, has_post/2]).
-export([get_text_hashes_by_time_range/5, get_texts_for_channel/2]).
-export([channels_join/2, channels_leave/2, channels_list/1, channels_known/1]).
-export([get_channel_state/2, get_channel_members/2, get_channel_heads/2]).
-export([peer_add/2, peer_update/3, peer_list/1, peer_delete/2, peer_get/2]).
-export([unfetched_get/3, unfetched_count/2, unfetched_add/4, unfetched_delete/2]).
-export([get_oldest_timestamp/2]).
-include_lib("kernel/include/file.hrl").
%% Opens the SQLite database for Path and makes sure the schema exists.
%%
%% If Path is a regular file it is used as the database file. In every other
%% case (directory, missing path, read error) the file "sqlite.db" inside Path
%% is used instead. The database file name, converted to an atom, is what is
%% handed to sqlite3:open/2 as its first argument.
%%
%% Returns {ok, Db}. Any failing step crashes the caller with a badmatch.
open(Path) ->
    Info = file:read_file_info(Path),
    DbFile =
        case Info of
            {ok, #file_info{type = regular}} -> Path;
            _ -> filename:join(Path, "sqlite.db")
        end,
    Options = [{file, DbFile}],
    {ok, Handle} = sqlite3:open(list_to_atom(DbFile), Options),
    %% Foreign key enforcement is switched on explicitly for this connection.
    ok = sqlite3:sql_exec(Handle, "PRAGMA foreign_keys = ON;"),
    io:format("[DB] Path: ~p~n", [DbFile]),
    ok = init_tables(Handle),
    {ok, Handle}.
%% Closes a database handle previously returned by open/1.
%% The result of sqlite3:close/1 is returned unchanged.
close(Handle) ->
    sqlite3:close(Handle).
%% Decodes the raw binary post with cabal_posts:decode/1 and stores it via
%% save_post/3, keeping the raw bytes alongside the decoded form.
save_post(Db, Data) when is_binary(Data) ->
    save_post(Db, cabal_posts:decode(Data), Data).
%% Stores an already decoded post together with its raw encoding.
%%
%% Post is [Header, Body], both proplists; Data is the raw binary of the post.
%% Steps, in order:
%%   1. one row per link is written to the links table (source = this post's
%%      hash, parent = linked hash, plus the channel when Body has one);
%%   2. the author's user id is looked up or created from the public key;
%%   3. the post row is written to the posts table;
%%   4. derived tables are updated through materialize_views/2.
%%
%% Returns {ok, RowId, PostHash}. Unexpected results crash with a badmatch.
save_post(Db, Post, Data) when is_binary(Data) ->
    [Header, Body] = Post,
    %% links and public_key are not columns of the posts table, so they are
    %% taken out of the header before it is reused as column data.
    {value, {links, Links}, NoLinks} = lists:keytake(links, 1, Header),
    {value, {public_key, PubKey}, HeaderCols} = lists:keytake(public_key, 1, NoLinks),
    PostHash = proplists:get_value(hash, Header),
    ChannelCol =
        case proplists:get_value(channel, Body, null) of
            null -> [];
            LinkChan -> [{channel, LinkChan}]
        end,
    StoreEdge = fun(Link) ->
        Edge = [{source, {blob, PostHash}}, {parent, {blob, Link}} | ChannelCol],
        {rowid, EdgeId} = sqlite3:write(Db, links, Edge),
        io:format("[DB/Debug] Inserted Edge: ~p~n", [EdgeId])
    end,
    lists:foreach(StoreEdge, Links),
    UserId = get_or_create_user_id(Db, PubKey),
    %% Type-specific columns: type 1 stores the concatenated hashes, type 2
    %% stores the infos JSON-encoded, every other type uses Body as-is.
    TypeCols =
        case proplists:get_value(type, Header) of
            1 ->
                [{deletedHashes, iolist_to_binary(proplists:get_value(hashes, Body))}];
            2 ->
                Chan = proplists:get_value(channel, Body),
                Infos = proplists:get_value(infos, Body),
                [{channel, Chan}, {infos, {blob, jsone:encode(Infos)}}];
            _ ->
                Body
        end,
    Columns = [{user_id, UserId}, {raw_post, Data}] ++ HeaderCols ++ TypeCols,
    %% Every binary value is written as a blob.
    ToColumn = fun
        ({Col, Val}) when is_binary(Val) -> {Col, {blob, Val}};
        ({Col, Val}) -> {Col, Val}
    end,
    {rowid, Id} = sqlite3:write(Db, posts, lists:map(ToColumn, Columns)),
    materialize_views(Db, Post),
    {ok, Id, PostHash}.
%% Loads a single post, either by its integer row id (non-negative) or by
%% its hash (binary). Returns {ok, [Header, Body, RawPost]} or notFound,
%% as produced by unpackPost/2.
load_post(Db, IntId) when is_integer(IntId), IntId >= 0 ->
    unpackPost(Db, sqlite3:read(Db, posts, {id, IntId}));
load_post(Db, Hash) when is_binary(Hash) ->
    unpackPost(Db, sqlite3:read(Db, posts, {hash, {blob, Hash}})).
%% Tells whether a post with the given hash is stored.
%% Returns true or false; throws {unexpectedCount, N} if the count is
%% anything other than 0 or 1.
has_post(Db, Hash) when is_binary(Hash) ->
    Sql = "SELECT count(*) as count from posts where hash = ?",
    [{columns, ["count"]}, {rows, [{N}]}] = sqlite3:sql_exec(Db, Sql, [{blob, Hash}]),
    if
        N =:= 0 -> false;
        N =:= 1 -> true;
        true -> throw({unexpectedCount, N})
    end.
%% Returns all text posts (type 0) of a channel in ascending timestamp order,
%% together with the users that wrote them.
%%
%% Chan is the channel name as a string (it is converted with
%% list_to_binary/1 and bound as a blob).
%%
%% Returns {ok, {Texts, Users}} where
%%   Texts is a list of {UserId, Timestamp, Text} (Text as a character list),
%%   Users is a map UserId => {Name, PublicKey}.
%% Each user is looked up in the users table only once.
get_texts_for_channel(Db, Chan) ->
    Qry =
        "SELECT user_id, timestamp, text from posts where channel = ? and type = 0 order by timestamp asc",
    QryChan = {blob, list_to_binary(Chan)},
    [{columns, ["user_id", "timestamp", "text"]}, {rows, Rows}] =
        sqlite3:sql_exec(Db, Qry, [QryChan]),
    QryUser = "SELECT name, public_key FROM users where id = ?",
    Collect = fun({UserId, Ts, {blob, TextBin}}, {TextsRev, Users}) ->
        NewUsers =
            case maps:is_key(UserId, Users) of
                true ->
                    Users;
                false ->
                    [
                        {columns, ["name", "public_key"]},
                        {rows, [{NameBin, {blob, PubKey}}]}
                    ] = sqlite3:sql_exec(Db, QryUser, [UserId]),
                    Users#{UserId => {unicode:characters_to_list(NameBin), PubKey}}
            end,
        Text = unicode:characters_to_list(TextBin),
        %% Prepend here and reverse once at the end; appending with ++ on
        %% every row made this fold quadratic in the number of posts.
        {[{UserId, Ts, Text} | TextsRev], NewUsers}
    end,
    {TextsRev, Users} = lists:foldl(Collect, {[], #{}}, Rows),
    {ok, {lists:reverse(TextsRev), Users}}.
%% Returns the hashes of text posts (type 0) in a channel whose timestamp
%% lies in [Start, End], newest first, at most Limit of them.
%%
%% An End of 0 is replaced by 6626966400000 and a Limit of 0 by 9001, so 0
%% acts as "not specified" for both.
%% Chan is the channel name as a string. Returns {ok, [Hash]}.
get_text_hashes_by_time_range(Db, Chan, Start, End, Limit) ->
    OrDefault = fun
        (0, Default) -> Default;
        (Given, _Default) -> Given
    end,
    UpperBound = OrDefault(End, 6626966400000),
    MaxRows = OrDefault(Limit, 9001),
    Sql =
        "SELECT hash from posts where channel = ? and timestamp >= ? and timestamp <= ? and type = 0 order by timestamp desc limit ?",
    Params = [{blob, list_to_binary(Chan)}, Start, UpperBound, MaxRows],
    [{columns, ["hash"]}, {rows, Rows}] = sqlite3:sql_exec(Db, Sql, Params),
    {ok, [Hash || {{blob, Hash}} <- Rows]}.
%% Records Name in the channels table with an empty topic.
%% Returns ok both when the row was inserted and when the write is rejected
%% with error code 19 (SQLite's SQLITE_CONSTRAINT result code, i.e. the
%% channel is presumably already present). Other results crash.
channels_join(Db, Name) ->
    NewChannel = [{name, Name}, {topic, ""}],
    case sqlite3:write(Db, channels, NewChannel) of
        {error, 19, _Msg} -> ok;
        {rowid, _RowId} -> ok
    end.
%% Removes the row for Name from the channels table.
%% The result of sqlite3:delete/3 is returned unchanged.
channels_leave(Db, Name) ->
    Key = {name, Name},
    sqlite3:delete(Db, channels, Key).
%% Lists the names of all rows in the channels table.
%% Returns {ok, [Name]} with each name converted from binary to a list.
channels_list(Db) ->
    [{columns, ["name", "topic"]}, {rows, Rows}] = sqlite3:read_all(Db, channels),
    NameOf = fun({Name, _Topic}) -> binary_to_list(Name) end,
    {ok, lists:map(NameOf, Rows)}.
%% Lists every distinct, non-null channel that appears in the posts table,
%% ordered by channel. Returns {ok, [Name]} with names as lists.
%% A channel value may come back either as {blob, Binary} or as a plain
%% binary; both forms are accepted.
channels_known(Db) ->
    Sql = "SELECT DISTINCT channel FROM posts WHERE channel IS NOT NULL ORDER BY channel",
    [{columns, ["channel"]}, {rows, Rows}] = sqlite3:sql_exec(Db, Sql),
    ToName = fun
        ({{blob, Bin}}) -> binary_to_list(Bin);
        ({Bin}) when is_binary(Bin) -> binary_to_list(Bin)
    end,
    {ok, lists:map(ToName, Rows)}.
%% Returns the topic and the known members of a channel.
%%
%% The topic comes from the channels table; exactly one row for Chan is
%% expected, otherwise the match crashes. Members come from channel_members
%% joined with the post referenced by last_hash and with the user.
%%
%% Returns {ok, [{topic, Topic}, {members, Members}]} where Members maps
%% PublicKey => {Name, Timestamp, HasLeft}. HasLeft is false when the
%% referenced post has type 4 and true when it has type 5; any other type
%% crashes.
get_channel_members(Db, Chan) ->
    TopicSql = "SELECT topic from channels where name = ?",
    [{columns, ["topic"]}, {rows, [{TopicBin}]}] = sqlite3:sql_exec(Db, TopicSql, [Chan]),
    Topic = unicode:characters_to_list(TopicBin),
    MembersSql =
        "SELECT u.name as name, u.public_key as pub_key, p.timestamp as timestamp, p.type as type "
        "FROM channel_members m "
        "JOIN posts p ON m.last_hash = p.hash "
        "JOIN users u ON m.user_id = u.id "
        "WHERE m.channel = ?",
    [
        {columns, ["name", "pub_key", "timestamp", "type"]},
        {rows, Rows}
    ] = sqlite3:sql_exec(Db, MembersSql, [Chan]),
    ToEntry = fun(Row) ->
        {NameBin, {blob, PubKey}, When, PostType} = Row,
        Name = unicode:characters_to_list(NameBin),
        HasLeft =
            case PostType of
                4 -> false;
                5 -> true
            end,
        {PubKey, {Name, When, HasLeft}}
    end,
    Members = maps:from_list(lists:map(ToEntry, Rows)),
    {ok, [{topic, Topic}, {members, Members}]}.
%% Collects the hashes that describe the state of a channel.
%%
%% The result is the hashes of all posts in the channel with type > 2
%% (ordered by timestamp), followed by the hash of the most recent type 2
%% post of every user who authored one of those posts. The second part is
%% de-duplicated through a set.
%% Chan is the channel name as a string. Returns {ok, [Hash]}.
get_channel_state(Db, Chan) ->
    StateSql = "SELECT hash, user_id from posts where channel = ? and type > 2 order by timestamp",
    [{columns, ["hash", "user_id"]}, {rows, Rows}] =
        sqlite3:sql_exec(Db, StateSql, [{blob, list_to_binary(Chan)}]),
    StateHashes = [Hash || {{blob, Hash}, _} <- Rows],
    InfoSql =
        "SELECT hash from posts where user_id = ? and type = 2 order by timestamp desc limit 1",
    AddInfoHash = fun({_Hash, UserId}, Seen) ->
        case sqlite3:sql_exec(Db, InfoSql, [UserId]) of
            [{columns, ["hash"]}, {rows, [{{blob, H}}]}] -> sets:add_element(H, Seen);
            [{columns, ["hash"]}, {rows, []}] -> Seen
        end
    end,
    InfoHashes = lists:foldl(AddInfoHash, sets:new(), Rows),
    {ok, StateHashes ++ sets:to_list(InfoHashes)}.
%% Returns the hashes that occur as a link source but never as a link parent
%% among the links of the given channel or of links without a channel.
%% Chan is either the atom null or the channel name as a string.
%% Returns {ok, [Hash]}.
get_channel_heads(Db, Chan) ->
    Sql =
        "SELECT DISTINCT source FROM links "
        "WHERE source NOT IN ("
        " SELECT DISTINCT parent FROM links"
        " WHERE channel = ? or channel is null)",
    Param =
        case Chan of
            null -> null;
            _ when is_list(Chan) -> {blob, list_to_binary(Chan)}
        end,
    [{columns, ["source"]}, {rows, Rows}] = sqlite3:sql_exec(Db, Sql, [Param]),
    {ok, [Hash || {{blob, Hash}} <- Rows]}.
%% Adds a peer with the given address, a score and attempt count of 0 and
%% the current system time in milliseconds as creation time.
%% Returns {ok, RowId}, or {error, already_exists} when the write is
%% rejected with error code 19 (SQLite's SQLITE_CONSTRAINT result code).
peer_add(Db, Address) ->
    CreatedAt = os:system_time(millisecond),
    NewPeer = [
        {address, Address},
        {score, 0},
        {attempt_count, 0},
        {created_at, CreatedAt}
    ],
    case sqlite3:write(Db, peers, NewPeer) of
        {error, 19, _Msg} -> {error, already_exists};
        {rowid, RowId} -> {ok, RowId}
    end.
%% Applies the column updates in Updates to the peer with the given address.
%% Returns ok on success; any other result of sqlite3:update/4 is wrapped
%% as {error, Result}.
peer_update(Db, Address, Updates) ->
    Outcome = sqlite3:update(Db, peers, {address, Address}, Updates),
    case Outcome of
        ok -> ok;
        _ -> {error, Outcome}
    end.
%% Returns all rows of the peers table as maps (see unpack_peer/1).
%% Returns {ok, [PeerMap]}; the list is empty when no peers are stored.
peer_list(Db) ->
    [{columns, _Cols}, {rows, Rows}] = sqlite3:read_all(Db, peers),
    %% The previous version passed the bare atom unpack_peer to lists:map/2,
    %% which is not a fun and failed as soon as at least one peer existed.
    {ok, [unpack_peer(Row) || Row <- Rows]}.
%% Deletes the peer with the given address.
%% The result of sqlite3:delete/3 is returned unchanged.
peer_delete(Db, Address) ->
    Key = {address, Address},
    sqlite3:delete(Db, peers, Key).
%% Looks up a single peer by address.
%% Returns {ok, PeerMap} when exactly one row is found, otherwise not_found
%% (this includes any unexpected result of the read).
peer_get(Db, Address) ->
    Found = sqlite3:read(Db, peers, {address, Address}),
    case Found of
        [{columns, _Cols}, {rows, [Row]}] -> {ok, unpack_peer(Row)};
        _ -> not_found
    end.
%% Returns the smallest timestamp of the text posts (type 0) in a channel.
%% Channel is the channel name as a string.
%% Returns {ok, Timestamp}, or {ok, undefined} when the query yields
%% undefined or null, or when the result has any other unexpected shape.
get_oldest_timestamp(Db, Channel) ->
    Sql = "SELECT MIN(timestamp) as oldest FROM posts WHERE channel = ? AND type = 0",
    Param = {blob, list_to_binary(Channel)},
    case sqlite3:sql_exec(Db, Sql, [Param]) of
        [{columns, ["oldest"]}, {rows, [{undefined}]}] -> {ok, undefined};
        [{columns, ["oldest"]}, {rows, [{null}]}] -> {ok, undefined};
        [{columns, ["oldest"]}, {rows, [{Oldest}]}] -> {ok, Oldest};
        _ -> {ok, undefined}
    end.
%% Returns up to Limit hashes recorded as unfetched for a channel, oldest
%% discovery first. Channel is the channel name as a string.
%% Returns {ok, [Hash]}.
unfetched_get(Db, Channel, Limit) ->
    Params = [list_to_binary(Channel), Limit],
    Sql =
        "SELECT hash FROM unfetched_hashes WHERE channel = ? ORDER BY discovered_at ASC LIMIT ?",
    [{columns, ["hash"]}, {rows, Rows}] = sqlite3:sql_exec(Db, Sql, Params),
    {ok, [Hash || {{blob, Hash}} <- Rows]}.
%% Counts the hashes recorded as unfetched for a channel.
%% Channel is the channel name as a string. Returns {ok, Count}.
unfetched_count(Db, Channel) ->
    Sql = "SELECT count(*) as count FROM unfetched_hashes WHERE channel = ?",
    Params = [list_to_binary(Channel)],
    [{columns, ["count"]}, {rows, [{Total}]}] = sqlite3:sql_exec(Db, Sql, Params),
    {ok, Total}.
%% Records each hash in Hashes as unfetched for Channel, noting Source and
%% the current system time in milliseconds. Channel and Source are strings.
%%
%% Returns {ok, Results} with one entry per input hash, in order: ok when the
%% row was inserted, already_exists when the write is rejected with error
%% code 19 (SQLite's SQLITE_CONSTRAINT result code).
unfetched_add(Db, Hashes, Channel, Source) ->
    DiscoveredAt = os:system_time(millisecond),
    ChannelBin = list_to_binary(Channel),
    SourceBin = list_to_binary(Source),
    Insert = fun(Hash) ->
        Entry = [
            {hash, {blob, Hash}},
            {channel, ChannelBin},
            {discovered_at, DiscoveredAt},
            {source, SourceBin}
        ],
        case sqlite3:write(Db, unfetched_hashes, Entry) of
            {error, 19, _Msg} -> already_exists;
            {rowid, _RowId} -> ok
        end
    end,
    {ok, [Insert(Hash) || Hash <- Hashes]}.
%% Removes every hash in Hashes from the unfetched_hashes table.
%% Returns ok; a delete that does not return ok crashes with a badmatch.
unfetched_delete(Db, Hashes) ->
    Remove = fun(Hash) ->
        ok = sqlite3:delete(Db, unfetched_hashes, {hash, {blob, Hash}})
    end,
    lists:foreach(Remove, Hashes).
%% Updates the derived tables after a post has been stored.
%%
%% Post is [Header, Body]; the header must be the proplist
%% [public_key, links, type, timestamp, hash] in exactly that order.
%% What happens depends on the post type:
%%   0 - nothing
%%   1 - every post whose hash is listed in the body is deleted
%%   2 - the author's name in the users table is set from the "name" info
%%   3 - the channel's topic in the channels table is updated
%%   4 - the author's channel_members row is inserted or its last_hash updated
%%   5 - the author's existing channel_members row gets a new last_hash
%% Returns ok. Any other type, body shape or database result crashes.
materialize_views(Db, Post) ->
    [
        [{public_key, PubKey}, {links, _}, {type, Type}, {timestamp, _}, {hash, Hash}],
        Body
    ] = Post,
    UserId = get_or_create_user_id(Db, PubKey),
    case Type of
        0 ->
            ok;
        1 ->
            [{hashes, Deleted}] = Body,
            Drop = fun(H) -> ok = sqlite3:delete(Db, posts, {hash, {blob, H}}) end,
            lists:foreach(Drop, Deleted);
        2 ->
            [{infos, Infos}] = Body,
            %% maps:take/2 returns error when "name" is absent, which fails
            %% this match. The #{} pattern matches any map, so the remaining
            %% keys are not inspected.
            {NameBin, #{}} = maps:take(<<"name">>, Infos),
            NewName = unicode:characters_to_list(NameBin),
            ok = sqlite3:update(Db, users, {id, UserId}, [{name, NewName}]);
        3 ->
            [{channel, Chan}, {topic, TopicBin}] = Body,
            NewTopic = unicode:characters_to_list(TopicBin),
            ok = sqlite3:update(Db, channels, {name, Chan}, [{topic, NewTopic}]);
        4 ->
            [{channel, Chan}] = Body,
            Upsert =
                "INSERT INTO channel_members(channel, user_id, last_hash) VALUES (?, ?, ?) "
                "ON CONFLICT(channel, user_id) DO UPDATE SET last_hash=excluded.last_hash",
            {rowid, _} = sqlite3:sql_exec(Db, Upsert, [Chan, UserId, {blob, Hash}]),
            ok;
        5 ->
            [{channel, Chan}] = Body,
            Update = "UPDATE channel_members set last_hash = ? where channel = ? and user_id = ?",
            ok = sqlite3:sql_exec(Db, Update, [{blob, Hash}, Chan, UserId])
    end.
%% Returns the id of the user with the given public key, inserting a row
%% with an empty name first if none exists yet (the insert does nothing on
%% conflict). The row read back must carry the same public key, otherwise
%% the match crashes.
get_or_create_user_id(Db, PubKey) ->
    Key = {blob, PubKey},
    Insert = "INSERT INTO users (public_key, name) VALUES (?, '') ON CONFLICT DO NOTHING;",
    {rowid, _} = sqlite3:sql_exec(Db, Insert, [Key]),
    [
        {columns, ["id", "public_key", "name"]},
        {rows, [{UserId, Key, _Name}]}
    ] = sqlite3:read(Db, users, {public_key, Key}),
    UserId.
%% Turns the result of reading the posts table into a decoded post.
%% Returns {ok, [Header, Body, RawPost]} when the result has exactly the
%% expected column list and exactly one row; anything else yields notFound.
unpackPost(
    Db,
    [
        {columns, [
            "id",
            "hash",
            "type",
            "timestamp",
            "user_id",
            "raw_post",
            "channel",
            "text",
            "topic",
            "deletedHashes",
            "infos"
        ]},
        {rows, [Row]}
    ]
) ->
    {ok, unpackPostRow(Db, Row)};
unpackPost(_Db, _Other) ->
    notFound.
%% Rebuilds [Header, Body, RawPost] from one row of the posts table.
%%
%% The links of the post are read from the links table (rows where source is
%% this post's hash) and the author's public key from the users table.
%% Header is [public_key, links, type, timestamp, hash]; Body depends on the
%% post type (0..5) and only uses the columns relevant for that type.
%% An unknown type or a missing expected column value crashes.
unpackPostRow(Db, Row) ->
    {_RowId, {blob, Hash}, Type, Timestamp, UserId, {blob, RawPost}, ChannelBlob, TextBlob,
        TopicBlob, Deleteds, InfosBlob} = Row,
    [
        {columns, ["channel", "source", "parent"]},
        {rows, LinkRows}
    ] = sqlite3:read(Db, links, {source, {blob, Hash}}),
    Parents = [Parent || {_, {blob, _Source}, {blob, Parent}} <- LinkRows],
    KeySql = "SELECT public_key from users where id = ?",
    [
        {columns, ["public_key"]},
        {rows, [{{blob, PubKey}}]}
    ] = sqlite3:sql_exec(Db, KeySql, [UserId]),
    Body =
        case Type of
            0 ->
                {blob, Text} = TextBlob,
                {blob, Channel} = ChannelBlob,
                [{channel, Channel}, {text, Text}];
            1 ->
                {blob, DeletedConcat} = Deleteds,
                [{hashes, split_concated_hashes(DeletedConcat)}];
            2 ->
                {blob, InfosJson} = InfosBlob,
                [{infos, jsone:decode(InfosJson)}];
            3 ->
                {blob, Topic} = TopicBlob,
                {blob, Channel} = ChannelBlob,
                [{channel, Channel}, {topic, Topic}];
            %% Types 4 and 5 both carry nothing but the channel.
            _ when Type =:= 4; Type =:= 5 ->
                {blob, Channel} = ChannelBlob,
                [{channel, Channel}]
        end,
    Header = [
        {public_key, PubKey},
        {links, Parents},
        {type, Type},
        {timestamp, Timestamp},
        {hash, Hash}
    ],
    [Header, Body, RawPost].
%% Converts one row of the peers table into a map.
%%
%% Row is the 8-tuple {Id, Address, Score, LastSeen, LastAttempt,
%% AttemptCount, CreatedAt, Notes}. The address is converted from binary to
%% a list. A null score, last_seen, last_attempt or attempt_count becomes 0;
%% null notes become undefined, other notes are converted to a list.
unpack_peer(Row) ->
    {Id, Addr, Score, LastSeen, LastAttempt, AttemptCount, CreatedAt, Notes} = Row,
    ZeroIfNull = fun
        (null) -> 0;
        (Value) -> Value
    end,
    Address = binary_to_list(Addr),
    NotesText =
        case Notes of
            null -> undefined;
            _ -> binary_to_list(Notes)
        end,
    #{
        id => Id,
        address => Address,
        score => ZeroIfNull(Score),
        last_seen => ZeroIfNull(LastSeen),
        last_attempt => ZeroIfNull(LastAttempt),
        attempt_count => ZeroIfNull(AttemptCount),
        created_at => CreatedAt,
        notes => NotesText
    }.
%% Creates the database schema if it is not present yet. Returns ok.
%%
%% The posts, links and channel_members tables (and the index on
%% links.parent) are created from an SQL script that uses IF NOT EXISTS.
%% The channels, users, peers and unfetched_hashes tables are created
%% through sqlite3:create_table/3, each one only when it is missing.
%%
%% Earlier, finding any one of those four tables missing triggered the
%% creation of all four, including the ones that already existed. Checking
%% each table on its own lets a database that has only some of them be
%% completed.
init_tables(Db) ->
    Schema =
        <<
            "\n"
            "create table IF NOT EXISTS posts (\n"
            " -- these exist for all posts\n"
            " --\n"
            " id integer primary key AUTOINCREMENT,\n"
            " hash blob unique not null check (length(hash) = 32),\n"
            " type integer not null check (type >= 0 AND type <= 5 ),\n"
            " -- milliseconds since UNIX Epoch\n"
            " timestamp integer not null check (timestamp > 0),\n"
            "\n"
            " user_id integer not null,\n"
            "\n"
            " raw_post blob not null,\n"
            "\n"
            "\n"
            " -- depends on postType\n"
            " channel text, -- all but delete have this one\n"
            " -- join & leave don't have other fields then channel\n"
            " text text,\n"
            " topic text,\n"
            " deletedHashes blob, -- concatenated\n"
            " infos blob, -- encoded as (json?) map {k=>v, k=>v}\n"
            "\n"
            " -- `post/text` | a textual chat message, posted to a channel |\n"
            " CONSTRAINT is_text check (\n"
            " type != 0 OR (channel is not null AND text is not null)\n"
            " ),\n"
            " -- `post/delete`| the deletion of a previously published post |\n"
            " CONSTRAINT is_delete check (\n"
            " type != 1 OR (deletedHashes is not null)\n"
            " ),\n"
            " -- `post/info` | set or clear informative key/value pairs on a user\n"
            " CONSTRAINT is_info check (\n"
            " type != 2 OR (infos is not null)\n"
            " ),\n"
            " -- `post/topic` | set or clear a channel's topic string\n"
            " CONSTRAINT is_topic check (\n"
            " type != 3 OR (channel is not null AND topic is not null)\n"
            " ),\n"
            " -- `post/join` | announce membership to a channel\n"
            " CONSTRAINT is_join check (\n"
            " type != 4 OR (channel is not null)\n"
            " ),\n"
            " -- `post/leave` | announce cessation of membership to a channel\n"
            " CONSTRAINT is_leave check (\n"
            " type != 5 OR (channel is not null)\n"
            " ),\n"
            "\n"
            " foreign key(user_id) references users(id)\n"
            ");\n"
            "\n"
            "create table IF NOT EXISTS links (\n"
            " channel text,\n"
            " source blob not null,\n"
            " parent blob not null\n"
            "\n"
            " -- we will get posts for which we don't have the parent's (yet)\n"
            " -- foreign key(source) references posts(hash),\n"
            " -- foreign key(parent) references posts(hash),\n"
            ");\n"
            "CREATE INDEX IF NOT EXISTS parent_idx ON links(parent);\n"
            "\n"
            "create table IF NOT EXISTS channel_members (\n"
            " id integer primary key AUTOINCREMENT,\n"
            " channel text not null,\n"
            " user_id integer not null,\n"
            " last_hash blob not null,\n"
            " UNIQUE(channel, user_id)\n"
            ");\n"
        >>,
    Oks = sqlite3:sql_exec_script(Db, Schema),
    %% Every statement of the script must have returned ok.
    [] = [V || V <- Oks, V =/= ok],
    io:format("[DB] schema.sql applied~n"),
    ColId = {id, integer, [{primary_key, [asc, autoincrement]}]},
    Tables = #{
        channels => [{name, text, [primary_key]}, {topic, text, not_null}],
        users => [ColId, {public_key, blob, [unique, not_null]}, {name, text, not_null}],
        peers => [
            ColId,
            {address, text, [unique, not_null]},
            {score, integer, {default, 0}},
            {last_seen, integer},
            {last_attempt, integer},
            {attempt_count, integer, {default, 0}},
            {created_at, integer, not_null},
            {notes, text}
        ],
        unfetched_hashes => [
            {hash, blob, [primary_key]},
            {channel, text, not_null},
            {discovered_at, integer, not_null},
            {source, text, not_null}
        ]
    },
    %% Check and create table by table so that existing tables are left
    %% alone and only the missing ones are added.
    EnsureTable = fun(T, Cols) ->
        case tables_exist(Db, [T]) of
            ok ->
                ok;
            missing ->
                ok = sqlite3:create_table(Db, T, Cols),
                io:format("[DB] Created table: ~p~n", [T])
        end
    end,
    maps:foreach(EnsureTable, Tables).
%% Checks that every table name in the list is reported by
%% sqlite3:list_tables/1. Returns ok when all are present (also for an empty
%% list) and missing as soon as one is not found.
tables_exist(Db, Tables) ->
    IsPresent = fun(T) -> lists:member(T, sqlite3:list_tables(Db)) end,
    case lists:all(IsPresent, Tables) of
        true -> ok;
        false -> missing
    end.
%% Splits a binary of concatenated 32-byte hashes into a list of hashes,
%% preserving their order. An empty binary gives an empty list; a binary
%% whose size is not a multiple of 32 crashes.
split_concated_hashes(Hashes) when is_binary(Hashes) ->
    split_concated_hashes(Hashes, []).

%% Worker for split_concated_hashes/1. Hashes are collected in reverse and
%% reversed once at the end, so the split is linear instead of quadratic
%% (the previous version appended to the accumulator with ++ on every step).
split_concated_hashes(<<H:32/binary, Rest/binary>>, Acc) ->
    split_concated_hashes(Rest, [H | Acc]);
split_concated_hashes(<<>>, Acc) ->
    lists:reverse(Acc).