ZBMYYG7IVKZNNBZFGATRKXRVV2SDHIZ5FJG343POPMMK3S2PN22AC api::client::r0::{alias::get_alias, membership::join_room_by_id, message::send_message_event},events::{room::message::MessageEventContent, AnyMessageEventContent},
api::client::r0::{alias::get_alias,filter::FilterDefinition,membership::join_room_by_id,sync::sync_events::{self, JoinedRoom},},assign,events::{room::member::MembershipState, EventType},presence::PresenceState,RoomId, UserId,
client.request(send_message_event::Request::new(&room_id,"1",&AnyMessageEventContent::RoomMessage(MessageEventContent::text_plain("Hello World!",)),)).await?;
}debug!("joined all rooms");let mut filter = FilterDefinition::ignore_all();let fields = ["type".into(),"sender".into(),"content.membership".into(),"state_key".into(),];filter.event_fields = Some(&fields);filter.room.rooms = None;filter.room.state.not_senders = std::slice::from_ref(&id);filter.room.timeline.not_senders = std::slice::from_ref(&id);filter.room.ephemeral.types = Some(&[]);filter.room.account_data.types = Some(&[]);let filter = filter.into();let initial_sync = client.request(assign!(sync_events::Request::new(), {filter: Some(&filter),})).await?;let mut state = State::new();for (id, room) in initial_sync.rooms.join {state.update_room(id, room);}info!("synchronized");let mut sync_stream = Box::pin(client.sync(Some(&filter),initial_sync.next_batch,&PresenceState::Online,Some(Duration::from_secs(30)),));while let Some(res) = sync_stream.try_next().await? {for (id, room) in res.rooms.join {state.update_room(id, room);}
}

/// Synchronization state for every joined room, keyed by room ID.
pub struct State {
    rooms: HashMap<RoomId, RoomState>,
}

impl State {
    /// Builds a state that tracks no rooms yet.
    fn new() -> Self {
        let rooms = HashMap::new();
        Self { rooms }
    }

    /// Feeds every state event, then every timeline event, of `room` into
    /// the per-room state for `id`, creating that state on first sight.
    fn update_room(&mut self, id: RoomId, room: JoinedRoom) {
        // Keep the span entered for the whole update so that anything logged
        // while handling events is attributed to this room.
        let room_span = error_span!("synchronizing", room = %id);
        let _entered = room_span.enter();

        let room_state = self.rooms.entry(id).or_insert_with(|| RoomState::new());
        room.state
            .events
            .iter()
            .for_each(|raw| room_state.handle_event(raw.json()));
        room.timeline
            .events
            .iter()
            .for_each(|raw| room_state.handle_event(raw.json()));
    }
}

struct RoomState {
    users: HashMap<UserId, SystemTime>,
    activity: BTreeMap<SystemTime, Vec<UserId>>,
impl RoomState {fn new() -> Self {Self {users: HashMap::new(),activity: BTreeMap::new(),}}fn handle_event(&mut self, event: &serde_json::value::RawValue) {let event: AnyEvent = match serde_json::from_str(event.get()) {Err(e) => {error!(raw = event.get(), error = %e, "error parsing event");return;}Ok(e) => e,};self.update(&event.sender);if event.ty == EventType::RoomMember&& event.content== Some(Content {membership: Some(MembershipState::Leave),}){if let Some(key) = event.state_key {match serde_json::from_str(key) {Ok(user) => self.remove(&user),Err(e) => {error!(key, error = %e, "malformed state key for membership event");}}}}}fn update(&mut self, user: &UserId) {let time = self.users.entry(user.clone()).or_insert_with(SystemTime::now);if let btree_map::Entry::Occupied(mut activity) = self.activity.entry(*time) {activity.get_mut().retain(|x| x != user);if activity.get().is_empty() {activity.remove();}}let now = SystemTime::now();let idle = now.duration_since(*time).unwrap_or(Duration::new(0, 0));*time = now;trace!(id = %user, ?idle, "updated user");self.activity.entry(*time).or_default().push(user.clone());}fn remove(&mut self, user: &UserId) {self.users.remove(user);}}#[derive(Deserialize)]struct AnyEvent<'a> {#[serde(rename = "type")]ty: EventType,sender: UserId,content: Option<Content>,state_key: Option<&'a str>,}#[derive(Deserialize, PartialEq)]struct Content {membership: Option<MembershipState>,}
tracing = "0.1"tracing-subscriber = "0.2"tokio-stream = { version = "0.1.1", default-features = false }serde_json = "1.0"