UDHP4ZVBQZT2VBURB2MDCU2IZDNMCAFSIUKWRBDQ5BWMFKSN2LYQC state: Arc<Mutex<State>>,}enum State {None,State {sender: Option<tokio::sync::oneshot::Sender<Option<(u64, Merkle)>>>,},Changes {sender: Option<tokio::sync::mpsc::Sender<Hash>>,remaining_len: usize,file: std::fs::File,path: PathBuf,hashes: Vec<libpijul::pristine::Hash>,current: usize,},Changelist {sender: tokio::sync::mpsc::Sender<Option<(u64, Hash, Merkle)>>,},Channel {sender: tokio::sync::mpsc::Sender<Vec<u8>>,},Archive {sender: Option<tokio::sync::oneshot::Sender<u64>>,len: u64,conflicts: u64,len_n: u64,w: Box<dyn Write + Send>,},
type FutureUnit = futures::future::Ready<Result<(Self, Session), anyhow::Error>>;
type FutureUnit = Pin<Box<dyn futures::future::Future<Output = Result<(Self, Session), anyhow::Error>> + Send>,>;// type FutureUnit = futures::future::Ready<Result<(Self, Session), anyhow::Error>>;
}fn data(self,channel: thrussh::ChannelId,data: &[u8],session: thrussh::client::Session,) -> Self::FutureUnit {debug!("data {:?} {:?}", channel, data.len());let data = data.to_vec();Box::pin(async move {match *self.state.lock().await {State::State { ref mut sender } => {debug!("state: State");if let Some(sender) = sender.take() {// If we can't parse `data` (for example if the// remote returns the standard "-\n"), this// returns None.let mut s = std::str::from_utf8(&data).unwrap().split(' ');debug!("s = {:?}", s);if let (Some(n), Some(m)) = (s.next(), s.next()) {let n = n.parse().unwrap();sender.send(Some((n, Merkle::from_base32(m.trim().as_bytes()).unwrap()))).unwrap_or(());} else {sender.send(None).unwrap_or(());}}}State::Changes {ref mut sender,ref mut remaining_len,ref mut file,ref mut path,ref hashes,ref mut current,} => {debug!("state changes");let mut p = 0;while p < data.len() {if *remaining_len == 0 {*remaining_len = (&data[p..]).read_u64::<BigEndian>().unwrap() as usize;p += 8;debug!("remaining_len = {:?}", remaining_len);}if data.len() >= p + *remaining_len {file.write_all(&data[p..p + *remaining_len])?;// We have enough data to write the// file, write it and move to the next// file.p += *remaining_len;*remaining_len = 0;file.flush()?;let mut final_path = path.clone();final_path.set_extension("change");debug!("moving {:?} to {:?}", path, final_path);std::fs::rename(&path, &final_path)?;debug!("sending");if let Some(ref mut sender) = sender {sender.send(hashes[*current].clone()).await.unwrap();}debug!("sent");*current += 1;if *current < hashes.len() {// If we're still waiting for// another change.libpijul::changestore::filesystem::pop_filename(path);libpijul::changestore::filesystem::push_filename(path,&hashes[*current],);std::fs::create_dir_all(&path.parent().unwrap())?;path.set_extension("");debug!("creating file {:?}", path);*file = std::fs::File::create(&path)?;} else {// Else, just finish.debug!("dropping channel");*sender = None;break;}} else {// not enough data, we need more.file.write_all(&data[p..])?;file.flush()?;*remaining_len -= data.len() - p;debug!("need more data");break;}}debug!("finished, {:?} {:?}", p, data.len());}State::Changelist { ref mut sender } => {debug!("state changelist");if &data[..] == b"\n" {debug!("log done");sender.send(None).await.unwrap_or(())} else if let Ok(data) = std::str::from_utf8(&data) {for l in data.lines() {if !l.is_empty() {debug!("line = {:?}", l);let (n, h, m) = parse_line(l)?;sender.send(Some((n, h, m))).await.unwrap_or(());} else {sender.send(None).await.unwrap_or(());}}}}State::Channel { ref mut sender } => sender.send(data).await?,State::Archive { ref mut sender, ref mut w, ref mut len, ref mut len_n, ref mut conflicts } => {let mut off = 0;while *len_n < 16 && off < data.len() {if *len_n < 8 {*len = (*len << 8) | (data[off] as u64);} else {*conflicts = (*conflicts << 8) | (data[off] as u64);}*len_n += 1;off += 1;}if *len_n >= 16 {w.write_all(&data[off..])?;*len -= (data.len() - off) as u64;if *len == 0 {if let Some(sender) = sender.take() {sender.send(*conflicts).unwrap_or(())}}}}State::None => {debug!("None state");}}Ok((self, session))})
while let Some(msg) = self.c.wait().await {match msg {thrussh::ChannelMsg::Data { data } => {// If we can't parse `data` (for example if the// remote returns the standard "-\n"), this// returns None.let mut s = std::str::from_utf8(&data)?.split(' ');debug!("s = {:?}", s);if let (Some(n), Some(m)) = (s.next(), s.next()) {let n = n.parse().unwrap();return Ok(Some((n, Merkle::from_base32(m.trim().as_bytes()).unwrap())));} else {break;}}thrussh::ChannelMsg::ExtendedData { data, ext } => {if ext == 1 {debug!("{:?}", std::str::from_utf8(&data))}}thrussh::ChannelMsg::Eof => {}thrussh::ChannelMsg::ExitStatus { exit_status } => {if exit_status != 0 {return Err((Error::RemoteExit {status: exit_status,}).into());}}msg => panic!("wrong message {:?}", msg),}}Ok(None)
Ok(receiver.await?)
}let mut len = 0;let mut conflicts = 0;let mut len_n = 0;while let Some(msg) = self.c.wait().await {match msg {thrussh::ChannelMsg::Data { data } => {let mut off = 0;while len_n < 16 && off < data.len() {if len_n < 8 {len = (len << 8) | (data[off] as u64);} else {conflicts = (conflicts << 8) | (data[off] as u64);}len_n += 1;off += 1;}if len_n >= 16 {w.write_all(&data[off..])?;len -= (data.len() - off) as u64;if len == 0 {break;}}}thrussh::ChannelMsg::ExtendedData { data, ext } => {if ext == 1 {debug!("{:?}", std::str::from_utf8(&data))}}thrussh::ChannelMsg::Eof => {}thrussh::ChannelMsg::ExitStatus { exit_status } => {if exit_status != 0 {return Err((Error::RemoteExit {status: exit_status,}).into());}}msg => panic!("wrong message {:?}", msg),}
'msg: while let Some(msg) = self.c.wait().await {debug!("msg = {:?}", msg);match msg {thrussh::ChannelMsg::Data { data } => {if &data[..] == b"\n" {debug!("log done");break;} else if let Ok(data) = std::str::from_utf8(&data) {for l in data.lines() {if !l.is_empty() {debug!("line = {:?}", l);let (n, h, m) = parse_line(l)?;txn.put_remote(remote, n, (h, m))?;} else {break 'msg;}}}}thrussh::ChannelMsg::ExtendedData { data, ext } => {debug!("{:?} {:?}", ext, std::str::from_utf8(&data[..]));/*return Err((crate::Error::Remote {msg: std::str::from_utf8(&data[..]).unwrap().to_string()}).into())*/}thrussh::ChannelMsg::WindowAdjusted { .. } => {}thrussh::ChannelMsg::Eof => {}thrussh::ChannelMsg::ExitStatus { exit_status } => {if exit_status != 0 {return Err((Error::RemoteExit {status: exit_status,}).into());}}msg => panic!("wrong message {:?}", msg),}
while let Some(Some((n, h, m))) = receiver.recv().await {txn.put_remote(remote, n, (h, m))?;
self.run_protocol().await?;debug!("download_change {:?}", full);if full {self.c.data(format!("change {}\n", c.to_base32()).as_bytes()).await?;} else {self.c.data(format!("partial {}\n", c.to_base32()).as_bytes()).await?;}Ok(())}pub async fn wait_downloads(&mut self,changes_dir: &Path,hashes: &[libpijul::pristine::Hash],send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,) -> Result<(), anyhow::Error> {debug!("wait_downloads");if !self.is_running {return Ok(());}let mut remaining_len = 0;let mut current: usize = 0;let mut path = changes_dir.to_path_buf();libpijul::changestore::filesystem::push_filename(&mut path, &hashes[current]);
let (sender_, mut recv) = tokio::sync::mpsc::channel(100);let mut path = changes_dir.clone();libpijul::changestore::filesystem::push_filename(&mut path, &c[0]);
let mut file = std::fs::File::create(&path)?;'outer: while let Some(msg) = self.c.wait().await {match msg {thrussh::ChannelMsg::Data { data } => {debug!("data = {:?}", &data[..]);let mut p = 0;while p < data.len() {if remaining_len == 0 {remaining_len = (&data[p..]).read_u64::<BigEndian>().unwrap() as usize;p += 8;debug!("remaining_len = {:?}", remaining_len);}if data.len() >= p + remaining_len {file.write_all(&data[p..p + remaining_len])?;// We have enough data to write the// file, write it and move to the next// file.p += remaining_len;remaining_len = 0;file.flush()?;let mut final_path = path.clone();final_path.set_extension("change");debug!("moving {:?} to {:?}", path, final_path);std::fs::rename(&path, &final_path)?;debug!("sending");send.send(hashes[current].clone()).await.unwrap();debug!("sent");current += 1;if current < hashes.len() {// If we're still waiting for// another change.libpijul::changestore::filesystem::pop_filename(&mut path);libpijul::changestore::filesystem::push_filename(&mut path,&hashes[current],);std::fs::create_dir_all(&path.parent().unwrap())?;path.set_extension("");file = std::fs::File::create(&path)?;} else {// Else, just finish.break 'outer;}} else {// not enough data, we need more.file.write_all(&data[p..])?;remaining_len -= data.len() - p;break;}}}thrussh::ChannelMsg::ExitStatus { exit_status } => {debug!("exit: {:?}", exit_status);if exit_status != 0 {error!("Remote command returned {:?}", exit_status)}self.is_running = false;return Ok(());}msg => {debug!("{:?}", msg);}
let file = std::fs::File::create(&path)?;*self.state.lock().await = State::Changes {sender: Some(sender_),remaining_len: 0,path,file,hashes: c.to_vec(),current: 0,};self.run_protocol().await?;for c in c {debug!("download_change {:?} {:?}", c, full);if full {self.c.data(format!("change {}\n", c.to_base32()).as_bytes()).await?;} else {self.c.data(format!("partial {}\n", c.to_base32()).as_bytes()).await?;}}while let Some(_hash) = recv.recv().await {debug!("received hash {:?}", _hash);if sender.send(_hash).await.is_err() {break
while let Some(msg) = self.c.wait().await {match msg {thrussh::ChannelMsg::Data { data } => {debug!("data = {:?}", &data[..]);if from_dump.read(&data)? {break;}}thrussh::ChannelMsg::ExtendedData { data, ext } => {debug!("data = {:?}, ext = {:?}", &data[..], ext);}thrussh::ChannelMsg::ExitStatus { exit_status } => {if exit_status != 0 {error!("Remote command returned {:?}", exit_status)}self.is_running = false;break;}msg => {debug!("msg = {:?}", msg);}
while let Some(msg) = recv.recv().await {if from_dump.read(&msg)? {break;
debug!("start_change_download");libpijul::changestore::filesystem::push_filename(path, &c);if std::fs::metadata(&path).is_ok() && !full {debug!("metadata {:?} ok", path);libpijul::changestore::filesystem::pop_filename(path);return Ok(false);}std::fs::create_dir_all(&path.parent().unwrap())?;
debug!("download_changes");
RemoteRepo::Local(ref mut l) => l.start_change_download(c, path).await?,RemoteRepo::Ssh(ref mut s) => s.start_change_download(c, full).await?,RemoteRepo::Http(ref h) => {let mut f = std::fs::File::create(&path)?;let c32 = c.to_base32();let url = format!("{}/{}", h.url, DOT_DIR);let mut res = h.client.get(&url).query(&[("change", c32)]).send().await?;if !res.status().is_success() {return Err((crate::Error::Http {status: res.status(),}).into());}while let Some(chunk) = res.chunk().await? {f.write_all(&chunk)?;}}
RemoteRepo::Local(ref mut l) => l.download_changes(hashes, send, path).await?,RemoteRepo::Ssh(ref mut s) => s.download_changes(hashes, send, path, full).await?,RemoteRepo::Http(ref mut h) => h.download_changes(hashes, send, path, full).await?,
}pub async fn wait_downloads(&mut self,changes_dir: &Path,hashes: &[libpijul::pristine::Hash],send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,) -> Result<(), anyhow::Error> {if hashes.is_empty() {return Ok(());}if let RemoteRepo::Ssh(ref mut s) = *self {s.wait_downloads(changes_dir, hashes, send).await?} else {for h in hashes {send.send(*h).await?}}Ok(())
let mut self_ = std::mem::replace(self, RemoteRepo::None);let t = tokio::spawn(async move {let mut hashes = Vec::new();for h in to_download_.iter() {if self_.start_change_download(*h, &mut change_path_, false).await?{hashes.push(*h);}}debug!("hashes = {:?}", hashes);self_.wait_downloads(&change_path_, &hashes, &mut send).await?;Ok(self_)});
self.download_changes(&to_download_, &mut send, &mut change_path_, false).await?;
libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);debug!("hard link {:?} {:?}", self.changes_dir, path);if std::fs::hard_link(&self.changes_dir, path).is_err() {std::fs::copy(&self.changes_dir, path)?;
for c in c {libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, c);libpijul::changestore::filesystem::push_filename(&mut path, c);if std::fs::metadata(&path).is_ok() {debug!("metadata {:?} ok", path);libpijul::changestore::filesystem::pop_filename(&mut path);continue}std::fs::create_dir_all(&path.parent().unwrap())?;if std::fs::hard_link(&self.changes_dir, &path).is_err() {std::fs::copy(&self.changes_dir, &path)?;}debug!("hard link done");libpijul::changestore::filesystem::pop_filename(&mut self.changes_dir);libpijul::changestore::filesystem::pop_filename(&mut path);debug!("sent");send.send(*c).await.unwrap();
version = "0.29.6"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "907849cfee4388f2d6bb1558f1d72ef80d70b5cb8d3583fb2c06391f9c9d71b4"
version = "0.29.8"