use std::collections::HashSet;
use std::io::Write;
use std::path::PathBuf;
use anyhow::bail;
use indicatif::ProgressBar;
use libpijul::pristine::{Base32, MutTxnT, Position};
use libpijul::{Hash, RemoteRef};
use log::{debug, error};
pub struct Http {
pub url: String,
pub channel: String,
pub client: reqwest::Client,
pub name: String,
}
async fn download_change(
client: reqwest::Client,
url: String,
mut path: PathBuf,
c: libpijul::pristine::Hash,
) -> Result<libpijul::pristine::Hash, anyhow::Error> {
libpijul::changestore::filesystem::push_filename(&mut path, &c);
std::fs::create_dir_all(&path.parent().unwrap())?;
let path_ = path.with_extension("tmp");
let mut f = std::fs::File::create(&path_)?;
libpijul::changestore::filesystem::pop_filename(&mut path);
let c32 = c.to_base32();
let url = format!("{}/{}", url, super::DOT_DIR);
let mut delay = 1f64;
loop {
let mut res = if let Ok(res) = client.get(&url).query(&[("change", &c32)]).send().await {
delay = 1f64;
res
} else {
error!("HTTP error, retrying in {} seconds", delay.round());
tokio::time::sleep(std::time::Duration::from_secs_f64(delay)).await;
delay *= 2.;
continue;
};
debug!("response {:?}", res);
if !res.status().is_success() {
bail!("HTTP error {:?}", res.status())
}
let done = loop {
match res.chunk().await {
Ok(Some(chunk)) => {
debug!("writing {:?}", chunk.len());
f.write_all(&chunk)?;
}
Ok(None) => break true,
Err(_) => {
error!("Error while downloading {:?}, retrying", url);
tokio::time::sleep(std::time::Duration::from_secs_f64(delay)).await;
delay *= 2.;
break false;
}
}
};
if done {
std::fs::rename(&path_, &path_.with_extension("change"))?;
break;
}
}
Ok(c)
}
const POOL_SIZE: usize = 20;
impl Http {
pub async fn download_changes(
&mut self,
hashes: &[libpijul::pristine::Hash],
send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
path: &PathBuf,
_full: bool,
) -> Result<(), anyhow::Error> {
let progress = ProgressBar::new(hashes.len() as u64);
progress.set_style(
indicatif::ProgressStyle::default_bar()
.template(" Downloading changes {wide_bar} {pos:>5}/{len}"),
);
let progress = super::PROGRESS.add(progress);
let mut pool = <[_; POOL_SIZE]>::default();
let mut cur = 0;
for c in hashes {
debug!("downloading {:?}", c);
progress.inc(1);
let t = std::mem::replace(
&mut pool[cur],
Some(tokio::spawn(download_change(
self.client.clone(),
self.url.clone(),
path.clone(),
*c,
))),
);
if let Some(t) = t {
let c = t.await??;
if send.send(c).await.is_err() {
debug!("err for {:?}", c);
progress.abandon();
break;
}
}
cur = (cur + 1) % POOL_SIZE;
}
for f in 0..POOL_SIZE {
if let Some(t) = pool[(cur + f) % POOL_SIZE].take() {
let c = t.await??;
if send.send(c).await.is_err() {
debug!("err for {:?}", c);
progress.abandon();
break;
}
}
}
if !progress.is_finished() {
progress.set_style(
indicatif::ProgressStyle::default_bar()
.template("✓ Downloading changes {pos:>5}/{len}"),
);
progress.finish();
}
Ok(())
}
pub async fn upload_changes(
&self,
mut local: PathBuf,
to_channel: Option<&str>,
changes: &[libpijul::Hash],
) -> Result<(), anyhow::Error> {
let progress = ProgressBar::new(changes.len() as u64);
progress.set_style(
indicatif::ProgressStyle::default_bar()
.template(" Uploading changes {wide_bar} {pos:>5}/{len}"),
);
for c in changes {
libpijul::changestore::filesystem::push_filename(&mut local, &c);
let url = self.url.clone() + "/" + super::DOT_DIR;
let change = std::fs::read(&local)?;
let mut to_channel = if let Some(ch) = to_channel {
vec![("to_channel", ch)]
} else {
Vec::new()
};
let c = c.to_base32();
to_channel.push(("apply", &c));
debug!("url {:?} {:?}", url, to_channel);
self.client
.post(&url)
.query(&to_channel)
.body(change)
.send()
.await?;
progress.inc(1);
libpijul::changestore::filesystem::pop_filename(&mut local);
}
progress.set_style(
indicatif::ProgressStyle::default_bar().template("✓ Uploading changes {pos:>5}/{len}"),
);
progress.finish();
Ok(())
}
pub async fn download_changelist<T: MutTxnT>(
&self,
txn: &mut T,
remote: &mut RemoteRef<T>,
from: u64,
paths: &[String],
) -> Result<HashSet<Position<Hash>>, anyhow::Error> {
let url = self.url.clone() + "/" + super::DOT_DIR;
let from_ = from.to_string();
let mut query = vec![("changelist", &from_), ("channel", &self.channel)];
for p in paths.iter() {
query.push(("path", p));
}
let res = self.client.get(&url).query(&query).send().await?;
if !res.status().is_success() {
bail!("HTTP error {:?}", res.status())
}
let resp = res.bytes().await?;
let mut result = HashSet::new();
if let Ok(data) = std::str::from_utf8(&resp) {
for l in data.lines() {
if !l.is_empty() {
match super::parse_line(l)? {
super::ListLine::Change { n, m, h } => {
txn.put_remote(remote, n, (h, m))?;
}
super::ListLine::Position(pos) => {
result.insert(pos);
}
super::ListLine::Error(e) => {
let mut stderr = std::io::stderr();
writeln!(stderr, "{}", e)?;
}
}
} else {
break;
}
}
}
Ok(result)
}
pub async fn get_state(
&mut self,
mid: Option<u64>,
) -> Result<Option<(u64, libpijul::Merkle)>, anyhow::Error> {
debug!("get_state {:?}", self.url);
let url = format!("{}/{}", self.url, super::DOT_DIR);
let q = if let Some(mid) = mid {
[
("state", format!("{}", mid)),
("channel", self.channel.clone()),
]
} else {
[("state", String::new()), ("channel", self.channel.clone())]
};
let res = self.client.get(&url).query(&q).send().await?;
if !res.status().is_success() {
bail!("HTTP error {:?}", res.status())
}
let resp = res.bytes().await?;
let resp = std::str::from_utf8(&resp)?;
debug!("resp = {:?}", resp);
let mut s = resp.split(' ');
if let (Some(n), Some(m)) = (
s.next().and_then(|s| s.parse().ok()),
s.next()
.and_then(|m| libpijul::Merkle::from_base32(m.as_bytes())),
) {
Ok(Some((n, m)))
} else {
Ok(None)
}
}
pub async fn archive<W: std::io::Write + Send + 'static>(
&mut self,
prefix: Option<String>,
state: Option<(libpijul::Merkle, &[Hash])>,
mut w: W,
) -> Result<u64, anyhow::Error> {
let url = self.url.clone() + "/" + super::DOT_DIR;
let res = self.client.get(&url).query(&[("channel", &self.channel)]);
let res = if let Some((ref state, ref extra)) = state {
let mut q = vec![("archive".to_string(), state.to_base32())];
if let Some(pre) = prefix {
q.push(("outputPrefix".to_string(), pre));
}
for e in extra.iter() {
q.push(("change".to_string(), e.to_base32()))
}
res.query(&q)
} else {
res
};
let res = res.send().await?;
if !res.status().is_success() {
bail!("HTTP error {:?}", res.status())
}
use futures_util::StreamExt;
let mut stream = res.bytes_stream();
let mut conflicts = 0;
let mut n = 0;
while let Some(item) = stream.next().await {
let item = item?;
let mut off = 0;
while n < 8 && off < item.len() {
conflicts = (conflicts << 8) | (item[off] as u64);
off += 1;
n += 1
}
w.write_all(&item[off..])?;
}
Ok(conflicts as u64)
}
}