6Z6PYXTSU3DYKZKXWSZ5JZXAO2MO4KJADTYNDLUYMD4LROXRREZAC
let (db, connection) = tokio_postgres::connect("host=localhost user=postgres", NoTls)
.await
.unwrap();
let db = Arc::new(Mutex::new(db));
let dbt = tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
tokio::pin!(dbt);
let db = Arc::new(Mutex::new(None));
let db_ = db.clone();
let lead_ = lead.clone();
tokio::spawn(async move {
let mut is_normal = false;
loop {
debug!("reconnect");
if let Ok((db, connection)) =
tokio_postgres::connect("host=localhost user=postgres", NoTls).await
{
if is_normal {
is_normal = false;
}
*db_.lock().await = Some(db);
let e = connection.await;
eprintln!("connection ended: {:?}", e);
// Ignore "normal" restarts (i.e. promotes).
let db = db_.lock().await.take();
if db.is_some() {
lead_.send(false).unwrap();
*db_.lock().await = None;
} else {
debug!("normal restart, reconnecting");
is_normal = true;
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
});
let mut race = tokio::spawn(race(
host.clone(),
signal.clone(),
lead.clone(),
is_leader.clone(),
));
let addr = SocketAddr::from((
[0, 0, 0, 0],
let mut race = tokio::spawn(race(host.clone(), signal.clone(), is_leader.clone()));
// let addr = SocketAddr::from((
// [0, 0, 0, 0],
// matches
// .value_of("port")
// .and_then(|x| x.parse().ok())
// .unwrap_or(8008),
// ));
let addr6 = SocketAddr::from((
[0, 0, 0, 0, 0, 0, 0, 0],
_ = &mut dbt => {
// If this database breaks, release the lock.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
info!("Restarting database connection");
let (db_, connection) = if let Ok(x) = tokio_postgres::connect("host=localhost user=postgres", NoTls).await {
x
} else {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
continue
};
lead.send(false).unwrap();
dbt.set(tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
}));
*db.lock().await = db_
}
_ = &mut race => {
error!("race stopped");
e = &mut race => {
error!("race stopped: {:?}", e);
let mut client = Client::connect(["localhost:2379"], None).await.unwrap();
let lease = client.lease_grant(LEASE_DURATION, None).await?;
let id = lease.id();
debug!("id = {:?}", id);
let (mut keeper, _stream) = client.lease_keep_alive(id).await?;
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(LEASE_DURATION as u64 / 2)).await;
loop {
trace!("sending keep alive");
if let Err(e) = keeper.keep_alive().await {
error!("{:?}", e);
break;
}
tokio::time::sleep(std::time::Duration::from_secs(LEASE_DURATION as u64 / 2)).await;
}
debug!("done renewing");
});
let (mut keeper, _stream) = client.lease_keep_alive(id).await?;
let is_leader_ = is_leader.clone();
let t = tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(LEASE_DURATION as u64 / 2)).await;
while *is_leader_.borrow() {
debug!("sending keep alive");
if let Err(e) = keeper.keep_alive().await {
error!("{:?}", e);
break;
}
tokio::time::sleep(std::time::Duration::from_secs(LEASE_DURATION as u64 / 2)).await;
}
debug!("done renewing");
});
let pool = pool.lock().await;
pool.execute(format!("ALTER SYSTEM SET primary_conninfo='host={} port=5432 user=replication password={}'", leader, password).as_str(), &[]).await?;
let pool_ = pool.lock().await.take();
if let Some(pool_) = pool_ {
pool_.execute(format!("ALTER SYSTEM SET primary_conninfo='host={} port=5432 user=replication password={}'", leader, password).as_str(), &[]).await?;
if leader == host {
info!("promoting");
pool_.execute("SELECT pg_promote()", &[]).await.unwrap_or(0);
// Don't replace pool_ into *pool, since the server might restart.
*pool.lock().await = Some(pool_)
} else {
*pool.lock().await = Some(pool_)
}
} else {
lead.send(false)?;
}