ZBMYYG7IVKZNNBZFGATRKXRVV2SDHIZ5FJG343POPMMK3S2PN22AC
api::client::r0::{alias::get_alias, membership::join_room_by_id, message::send_message_event},
events::{room::message::MessageEventContent, AnyMessageEventContent},
api::client::r0::{
alias::get_alias,
filter::FilterDefinition,
membership::join_room_by_id,
sync::sync_events::{self, JoinedRoom},
},
assign,
events::{room::member::MembershipState, EventType},
presence::PresenceState,
RoomId, UserId,
client
.request(send_message_event::Request::new(
&room_id,
"1",
&AnyMessageEventContent::RoomMessage(MessageEventContent::text_plain(
"Hello World!",
)),
))
.await?;
}
debug!("joined all rooms");
// Build the sync filter starting from `FilterDefinition::ignore_all()` and
// then adjust the individual fields below.
let mut filter = FilterDefinition::ignore_all();
// Event fields requested from the server; these are exactly the fields that
// `AnyEvent` and `Content` deserialize.
let fields = [
"type".into(),
"sender".into(),
"content.membership".into(),
"state_key".into(),
];
filter.event_fields = Some(&fields);
// NOTE(review): presumably `None` means "do not restrict to specific rooms" — confirm against the filter docs.
filter.room.rooms = None;
// Exclude state and timeline events sent by `id` (defined outside this view;
// presumably our own user ID — confirm).
filter.room.state.not_senders = std::slice::from_ref(&id);
filter.room.timeline.not_senders = std::slice::from_ref(&id);
// Empty type lists for ephemeral and account-data events.
// NOTE(review): presumably an empty list selects no events at all — confirm.
filter.room.ephemeral.types = Some(&[]);
filter.room.account_data.types = Some(&[]);
let filter = filter.into();
// Initial sync: a single request using the filter built above.
let initial_sync = client
.request(assign!(sync_events::Request::new(), {
filter: Some(&filter),
}))
.await?;
// Seed the per-room state from every joined room in the initial response.
let mut state = State::new();
for (id, room) in initial_sync.rooms.join {
state.update_room(id, room);
}
info!("synchronized");
// Continue from the initial sync's `next_batch` token with the same filter,
// passing a 30 second duration as the last argument.
let mut sync_stream = Box::pin(client.sync(
Some(&filter),
initial_sync.next_batch,
&PresenceState::Online,
Some(Duration::from_secs(30)),
));
// Apply each streamed response to the state; an error from the stream is
// propagated by `?` and ends the loop.
while let Some(res) = sync_stream.try_next().await? {
for (id, room) in res.rooms.join {
state.update_room(id, room);
}
}
/// Activity state for all rooms seen so far, keyed by room ID.
pub struct State {
/// One `RoomState` per room; entries are created on demand by `update_room`.
rooms: HashMap<RoomId, RoomState>,
}
impl State {
    /// Builds a state that tracks no rooms yet.
    fn new() -> Self {
        let rooms = HashMap::new();
        Self { rooms }
    }

    /// Applies one joined-room section of a sync response to the room `id`.
    ///
    /// The room's entry is created if it does not exist yet. State events are
    /// handled first, then timeline events, each passed on as raw JSON. All
    /// work happens inside a `synchronizing` span tagged with the room ID.
    fn update_room(&mut self, id: RoomId, room: JoinedRoom) {
        let span = error_span!("synchronizing", room = %id);
        let _entered = span.enter();

        let room_state = self.rooms.entry(id).or_insert_with(RoomState::new);
        room.state
            .events
            .iter()
            .for_each(|raw| room_state.handle_event(raw.json()));
        room.timeline
            .events
            .iter()
            .for_each(|raw| room_state.handle_event(raw.json()));
    }
}
/// Activity bookkeeping for a single room.
struct RoomState {
/// Local time at which the most recent event from each user was processed
/// (set from `SystemTime::now()` in `update`).
users: HashMap<UserId, SystemTime>,
/// Index from a recorded time to the users recorded at that time;
/// maintained by `update`.
activity: BTreeMap<SystemTime, Vec<UserId>>,
impl RoomState {
fn new() -> Self {
Self {
users: HashMap::new(),
activity: BTreeMap::new(),
}
}
fn handle_event(&mut self, event: &serde_json::value::RawValue) {
let event: AnyEvent = match serde_json::from_str(event.get()) {
Err(e) => {
error!(raw = event.get(), error = %e, "error parsing event");
return;
}
Ok(e) => e,
};
self.update(&event.sender);
if event.ty == EventType::RoomMember
&& event.content
== Some(Content {
membership: Some(MembershipState::Leave),
})
{
if let Some(key) = event.state_key {
match serde_json::from_str(key) {
Ok(user) => self.remove(&user),
Err(e) => {
error!(key, error = %e, "malformed state key for membership event");
}
}
}
}
}
fn update(&mut self, user: &UserId) {
let time = self
.users
.entry(user.clone())
.or_insert_with(SystemTime::now);
if let btree_map::Entry::Occupied(mut activity) = self.activity.entry(*time) {
activity.get_mut().retain(|x| x != user);
if activity.get().is_empty() {
activity.remove();
}
}
let now = SystemTime::now();
let idle = now.duration_since(*time).unwrap_or(Duration::new(0, 0));
*time = now;
trace!(id = %user, ?idle, "updated user");
self.activity.entry(*time).or_default().push(user.clone());
}
fn remove(&mut self, user: &UserId) {
self.users.remove(user);
}
}
/// Minimal view of an event, holding only the fields `RoomState::handle_event` inspects.
#[derive(Deserialize)]
struct AnyEvent<'a> {
/// Event type, read from the JSON key `type`.
#[serde(rename = "type")]
ty: EventType,
/// User that sent the event.
sender: UserId,
/// Event content, if present; only its `membership` field is captured.
content: Option<Content>,
/// State key, if present, borrowed from the JSON input.
state_key: Option<&'a str>,
}
/// The part of an event's content that is inspected: the membership value.
#[derive(Deserialize, PartialEq)]
struct Content {
/// Membership state carried by the event, if any; compared against
/// `MembershipState::Leave` in `RoomState::handle_event`.
membership: Option<MembershipState>,
}
tracing = "0.1"
tracing-subscriber = "0.2"
tokio-stream = { version = "0.1.1", default-features = false }
serde_json = "1.0"