ZSXJNP7BGC3UR35ORVPFBXXAMDOGK2HPZB3A5JVJXM5HC55GHH3QC
3YOGOAKUFTRSFYT2BY5ZA5TRTIO3YIWT7DASZLT4QH62QZEG4IOAC
NPSWSVZNDMXYBVCBFSHZ2PVLUSX45LZVUMD6ROS3KLHNMBTYASZQC
DSKXOP5Q6NRJDD5NNEX7G3L3NHZFJZJE3TZFX25HC7ZHW2T3V6NAC
ENEREHTW2UEXJGIOSSKZA2FQKFDLWQEIOAD6NIT2EH6BJW6E3AZQC
6Z6PYXTSU3DYKZKXWSZ5JZXAO2MO4KJADTYNDLUYMD4LROXRREZAC
FIIWK33FEEUURH7QGHQPHGQHSVN7XXLDDPLI5CKKWRK2DGIAHLGAC
UZXIRPR3NPWWFX7R2PB7TI2GPHH6XYQWCY5IOFZ2ALHGSWGYI37AC
UEGA35KU63HPNZLXVR5CS2BNMCVOV7HOBTDR4K6YZSPDFWXIKKZQC
424R2VFMTHIPTPPNCGFBKHQUD3UO2CBNA3KNL3PQ5IKEJUGIJJTQC
let mut election_observer = tokio::spawn(observe_elections(
host.clone(),
port,
password.clone(),
db_name.clone(),
path.clone(),
lead.clone(),
db.clone(),
process_lock.clone(),
));
let mut race = tokio::spawn(race(host.clone(), is_leader.clone(), db.clone()));
let mut election_observer = tokio::spawn(observe_elections(host.clone(), lead.clone()));
let mut race = tokio::spawn(race(host.clone(), lease, is_leader.clone(), db.clone()));
async fn observe_elections(
/// Observes `postrep` through etcd and publishes every reported leader.
///
/// Connects to etcd, calls `observe("postrep")`, and for each message received
/// sends the message's key-value payload (as a `String`) through `lead`.
/// `host` is only used in the debug log line.
///
/// Returns `Ok(())` once the observation stream yields no further messages.
///
/// # Errors
///
/// Returns an error if connecting to etcd, starting the observation, or
/// receiving a message fails; if a message carries no key-value pair; if
/// `value_str` fails on the payload; or if `lead` refuses the update
/// (presumably because nobody is listening any more — confirm against the
/// channel type in use).
async fn observe_elections(host: String, lead: Arc<Sender<String>>) -> Result<(), anyhow::Error> {
    // NOTE(review): the etcd endpoint is hard-coded here; it cannot be made
    // configurable without changing this function's signature.
    let mut client = Client::connect(["localhost:2379"], None).await?;
    let mut obs = client.observe("postrep").await?;
    while let Some(m) = obs.message().await? {
        // Surface a malformed message as an error instead of panicking the task.
        let kv = m.kv().ok_or_else(|| {
            anyhow::anyhow!("observe_elections: election message without a key-value pair")
        })?;
        let leader = kv.value_str()?;
        debug!(
            "observe_elections: leader = {:?}, host = {:?}",
            leader, host
        );
        // A failed send is reported to the caller rather than aborting via panic.
        lead.send(leader.to_string()).map_err(|_| {
            anyhow::anyhow!("observe_elections: failed to publish the new leader")
        })?;
    }
    Ok(())
}
async fn check_postgres(
let mut obs = client.observe("postrep").await?;
while let Some(m) = obs.message().await? {
let leader = m.kv().unwrap().value_str()?;
debug!("leader = {:?}, host = {:?}", leader, host);
loop {
is_leader_.changed().await.unwrap();
let leader = is_leader_.borrow().to_string();
/// Periodically checks whether the local PostgreSQL needs a rewind.
///
/// Every 10 seconds the current leader is read from `leader`. When it differs
/// from `host`, the client stored in `pool` (if any) is temporarily taken out,
/// used to run `SELECT 1 FROM pg_stat_wal_receiver`, and then put back. If that
/// query returns no row, `rewind` is invoked with `process_lock`, `path`,
/// `port`, `db_name`, the leader and `password`.
///
/// When `pool` holds no client, no query is made and no rewind is triggered.
///
/// # Errors
///
/// Returns the error of the `pg_stat_wal_receiver` query if it fails. In that
/// case the client that was taken out of `pool` is not put back.
/// Otherwise the function loops forever and never returns.
async fn check_postgres(
    host: String,
    port: u16,
    password: String,
    db_name: String,
    path: String,
    leader: Arc<Sender<String>>,
    pool: Arc<Mutex<Option<tokio_postgres::Client>>>,
    process_lock: Arc<Mutex<()>>,
) -> Result<(), anyhow::Error> {
    let poll_interval = std::time::Duration::from_secs(10);
    loop {
        // Copy the value out so the channel borrow ends with this statement.
        let current_leader = leader.borrow().to_string();
        if current_leader != host {
            // Take the client out of the shared slot; the lock guard is
            // released before the query is awaited.
            let taken = pool.lock().await.take();
            let needs_rewind = match taken {
                Some(client) => {
                    let row = client
                        .query_opt("SELECT 1 FROM pg_stat_wal_receiver", &[])
                        .await?;
                    *pool.lock().await = Some(client);
                    // NOTE(review): presumably "no row" means this node is not
                    // receiving WAL from the leader — confirm.
                    row.is_none()
                }
                None => false,
            };
            if needs_rewind {
                debug!("leader: {:?}", current_leader);
                rewind(
                    &process_lock,
                    &path,
                    port,
                    &db_name,
                    &current_leader,
                    &password,
                )
                .await
            }
        }
        tokio::time::sleep(poll_interval).await
    }
}
[[package]]
name = "axum"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2cc6e8e8c993cb61a005fab8c1e5093a29199b7253b05a6883999312935c1ff"
dependencies = [
"async-trait",
"axum-core",
"bitflags",
"bytes",
"futures-util",
"http",
"http-body",
"hyper",
"itoa",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"serde",
"sync_wrapper",
"tokio",
"tower",
"tower-http",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf4d047478b986f14a13edad31a009e2e05cb241f9805d0d75e4cba4e129ad4d"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"mime",
]
name = "tokio-util"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
"tracing",
]
[[package]]
name = "tower-http"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba"
dependencies = [
"bitflags",
"bytes",
"futures-core",
"futures-util",
"http",
"http-body",
"http-range-header",
"pin-project-lite",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
[[package]]
name = "windows-sys"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2"
dependencies = [
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
[[package]]
name = "windows_i686_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
[[package]]
name = "windows_i686_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
[[package]]
name = "windows_x86_64_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
[[package]]
name = "windows_x86_64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"