T4NU6NAZKVJ2ZSJZW6OWELN2ACVN5QACFDR2LECD6RNMM4JVMMNAC
// Install the global tracing subscriber exactly once: a formatting layer that
// omits event targets, filtered by the directives read from the environment
// plus an INFO directive.
//
// A separate `tracing_subscriber::fmt::init()` call must not run before this:
// both calls try to set the global default subscriber, and the second one
// panics.
//
// NOTE(review): the bare INFO directive is added after the environment has
// been parsed; confirm how it is meant to interact with a global level set
// through the environment (e.g. `RUST_LOG=debug`).
tracing_subscriber::registry()
    .with(tracing_subscriber::fmt::layer().with_target(false))
    .with(
        tracing_subscriber::filter::EnvFilter::from_default_env()
            .add_directive(tracing_subscriber::filter::LevelFilter::INFO.into()),
    )
    .init();
while let Some(res) = sync_stream.try_next().await? {
for (id, room) in res.rooms.join {
state.update_room(id, room);
let update_complete = Notify::new();
let update = async {
while let Some(res) = sync_stream.try_next().await? {
for (id, room) in res.rooms.join {
state.borrow_mut().update_room(&config, id, room);
}
update_complete.notify_one();
Ok(())
// Idle-kick loop. Each iteration asks the shared state for its least active
// (time, room, user) entry, waits until `config.idle_days` days have passed
// since `time`, then drops the user from the tracked room and sends a kick
// request. The wait is abandoned, and the candidate recomputed, whenever
// `update_complete` is notified. The loop itself never terminates.
let sleep = async {
    loop {
        // The borrow taken here ends with this statement, so it is not held
        // across any of the awaits below.
        let candidate = state.borrow().least_active();
        match candidate {
            // Nothing is tracked yet: wait for the next notification.
            None => update_complete.notified().await,
            Some((time, room, user)) => {
                // `idle_days` is fractional days; convert to seconds.
                let allowance = Duration::from_secs_f32(config.idle_days * 60.0 * 60.0 * 24.0);
                // A deadline that already lies in the past makes
                // `duration_since` fail, which maps to a zero-length wait.
                let dt = (time + allowance)
                    .duration_since(SystemTime::now())
                    .unwrap_or(Duration::new(0, 0));
                debug!(%room, %user, time = ?dt, "waiting");
                tokio::select! {
                    _ = tokio::time::sleep(dt) => {
                        // Forget the user locally before contacting the
                        // server, so a failed kick is not retried forever.
                        state.borrow_mut().rooms.get_mut(&room).unwrap().remove(&user);
                        match client.request(kick_user::Request::new(&room, &user)).await {
                            Err(error) => error!(%room, %user, %error, "kick failed"),
                            Ok(_) => info!(%room, %user, "kicked"),
                        }
                    }
                    // State changed while waiting: loop around and re-evaluate.
                    _ = update_complete.notified() => {}
                }
            }
        }
    }
};
// Drive the `update` and `sleep` futures concurrently on this task. `update`
// is the only one expected to finish, and its result is returned to the
// caller. `sleep` loops forever, so its branch completing would be a bug.
loop {
    tokio::select! {
        outcome = update => return outcome,
        _ = sleep => unreachable!(),
    }
}
self.users.remove(user);
if let Some(time) = self.users.remove(user) {
if let btree_map::Entry::Occupied(mut activity) = self.activity.entry(time) {
activity.get_mut().retain(|x| x != user);
if activity.get().is_empty() {
activity.remove();
}
}
}
}
fn least_active(&self) -> Option<(SystemTime, &UserId)> {
let (&time, users) = self.activity.iter().next()?;
Some((time, &users[0]))