}
pub async fn get_state(
&mut self,
mid: Option<u64>,
) -> Result<Option<(u64, libpijul::Merkle)>, anyhow::Error> {
debug!("get_state {:?}", self.url);
let url = format!("{}/{}", self.url, super::DOT_DIR);
let q = if let Some(mid) = mid {
[
("state", format!("{}", mid)),
("channel", self.channel.clone()),
]
} else {
[("state", String::new()), ("channel", self.channel.clone())]
};
let res = self.client.get(&url).query(&q).send().compat().await?;
if !res.status().is_success() {
bail!("HTTP error {:?}", res.status())
}
let resp = res.bytes().compat().await?;
let resp = std::str::from_utf8(&resp)?;
debug!("resp = {:?}", resp);
let mut s = resp.split(' ');
if let (Some(n), Some(m)) = (
s.next().and_then(|s| s.parse().ok()),
s.next()
.and_then(|m| libpijul::Merkle::from_base32(m.as_bytes())),
) {
Ok(Some((n, m)))
} else {
Ok(None)
}
}
pub async fn archive<W: std::io::Write + Send + 'static>(
&mut self,
prefix: Option<String>,
state: Option<(libpijul::Merkle, &[Hash])>,
mut w: W,
) -> Result<u64, anyhow::Error> {
let url = self.url.clone() + "/" + super::DOT_DIR;
let res = self.client.get(&url).query(&[("channel", &self.channel)]);
let res = if let Some((ref state, ref extra)) = state {
let mut q = vec![("archive".to_string(), state.to_base32())];
if let Some(pre) = prefix {
q.push(("outputPrefix".to_string(), pre));
}
for e in extra.iter() {
q.push(("change".to_string(), e.to_base32()))
}
res.query(&q)
} else {
res
};
let res = res.send().compat().await?;
if !res.status().is_success() {
bail!("HTTP error {:?}", res.status())
}
use futures_util::StreamExt;
let mut stream = res.bytes_stream();
let mut conflicts = 0;
let mut n = 0;
while let Some(item) = stream.next().compat().await {
let item = item?;
let mut off = 0;
while n < 8 && off < item.len() {
conflicts = (conflicts << 8) | (item[off] as u64);
off += 1;
n += 1
}
w.write_all(&item[off..])?;
}
Ok(conflicts as u64)