UDHP4ZVBQZT2VBURB2MDCU2IZDNMCAFSIUKWRBDQ5BWMFKSN2LYQC
state: Arc<Mutex<State>>,
}
/// What the client is currently waiting for on the channel. The `data`
/// handler matches on this to decide how to interpret each incoming
/// packet.
enum State {
/// Nothing is expected; the `data` handler only logs the packet.
None,
/// Waiting for a single state reply.
State {
/// One-shot reply slot, taken on the first data packet. It receives
/// `Some((n, merkle))` when the packet splits on a space into a
/// number and a base32 Merkle, and `None` otherwise.
sender: Option<tokio::sync::oneshot::Sender<Option<(u64, Merkle)>>>,
},
/// Receiving a sequence of changes, each prefixed by its length.
Changes {
/// Receives the hash of each change once its file is fully written
/// and renamed; set to `None` after the last expected change.
sender: Option<tokio::sync::mpsc::Sender<Hash>>,
/// Bytes still expected for the change being written. Zero means the
/// next 8 bytes of the stream are a big-endian length.
remaining_len: usize,
/// File the current change is being written to.
file: std::fs::File,
/// Path of `file`; a copy with the extension set to "change" is the
/// rename target once the change is complete.
path: PathBuf,
/// Hashes of the changes being received, indexed by `current`.
hashes: Vec<libpijul::pristine::Hash>,
/// Index in `hashes` of the change currently being received.
current: usize,
},
/// Receiving a changelist, one entry per line.
Changelist {
/// Receives `Some((n, hash, merkle))` for each parsed line, and
/// `None` for a lone "\n" packet or an empty line.
sender: tokio::sync::mpsc::Sender<Option<(u64, Hash, Merkle)>>,
},
/// Raw pass-through: every packet is forwarded unchanged.
Channel {
sender: tokio::sync::mpsc::Sender<Vec<u8>>,
},
/// Receiving an archive: a 16-byte header followed by the payload.
Archive {
/// One-shot slot that receives `conflicts` when `len` reaches zero.
sender: Option<tokio::sync::oneshot::Sender<u64>>,
/// Accumulated big-endian from header bytes 0..8, then decremented
/// by the number of payload bytes written to `w`.
len: u64,
/// Accumulated big-endian from header bytes 8..16.
conflicts: u64,
/// Number of header bytes consumed so far (stops growing at 16).
len_n: u64,
/// Sink for the payload bytes that follow the header.
w: Box<dyn Write + Send>,
},
type FutureUnit = futures::future::Ready<Result<(Self, Session), anyhow::Error>>;
type FutureUnit = Pin<
Box<dyn futures::future::Future<Output = Result<(Self, Session), anyhow::Error>> + Send>,
>;
// type FutureUnit = futures::future::Ready<Result<(Self, Session), anyhow::Error>>;
}
/// Handles one packet of channel data from the remote, interpreting it
/// according to the protocol state currently stored in `self.state`.
///
/// * `State::State`: parses `"<n> <merkle>"` and sends the result (or
///   `None` when the packet cannot be parsed) on the one-shot sender.
/// * `State::Changes`: appends the bytes to the change file being
///   downloaded. Each change is prefixed by its length as a big-endian
///   `u64`. When a change is complete, its file is renamed with the
///   "change" extension, its hash is sent on `sender`, and the next file
///   is created.
/// * `State::Changelist`: parses each line with `parse_line` and forwards
///   it; a lone `"\n"` packet or an empty line is forwarded as `None`.
/// * `State::Channel`: forwards the raw bytes.
/// * `State::Archive`: consumes a 16-byte header (two big-endian `u64`s:
///   payload length, then conflict count), writes the payload to `w`, and
///   sends the conflict count once the announced length has been received.
/// * `State::None`: logs and ignores the packet.
///
/// # Errors
///
/// The returned future resolves to an error on file or writer I/O
/// failures, on a length header that is truncated by the end of the
/// packet, on changelist lines rejected by `parse_line`, and when the
/// `State::Channel` receiver is gone.
fn data(
    self,
    channel: thrussh::ChannelId,
    data: &[u8],
    session: thrussh::client::Session,
) -> Self::FutureUnit {
    debug!("data {:?} {:?}", channel, data.len());
    // The future must own its input, so copy the packet out of the
    // borrowed buffer.
    let data = data.to_vec();
    Box::pin(async move {
        match *self.state.lock().await {
            State::State { ref mut sender } => {
                debug!("state: State");
                if let Some(sender) = sender.take() {
                    // Anything that is not "<n> <merkle>" (for example
                    // the standard "-\n" reply, bytes that are not
                    // UTF-8, or a malformed number or Merkle) is
                    // reported as `None` rather than panicking on
                    // remote-controlled input.
                    let parsed = std::str::from_utf8(&data).ok().and_then(|text| {
                        let mut s = text.split(' ');
                        debug!("s = {:?}", s);
                        let (n, m) = (s.next()?, s.next()?);
                        let n = n.parse::<u64>().ok()?;
                        let m = Merkle::from_base32(m.trim().as_bytes())?;
                        Some((n, m))
                    });
                    // The receiver may already be gone; that is fine.
                    sender.send(parsed).unwrap_or(());
                }
            }
            State::Changes {
                ref mut sender,
                ref mut remaining_len,
                ref mut file,
                ref mut path,
                ref hashes,
                ref mut current,
            } => {
                debug!("state changes");
                let mut p = 0;
                while p < data.len() {
                    if *remaining_len == 0 {
                        // Start of a new change: read its length. A
                        // header cut short by the end of the packet is
                        // surfaced as an I/O error instead of a panic.
                        *remaining_len = (&data[p..]).read_u64::<BigEndian>()? as usize;
                        p += 8;
                        debug!("remaining_len = {:?}", remaining_len);
                    }
                    if data.len() >= p + *remaining_len {
                        file.write_all(&data[p..p + *remaining_len])?;
                        // We have enough data to write the
                        // file, write it and move to the next
                        // file.
                        p += *remaining_len;
                        *remaining_len = 0;
                        file.flush()?;
                        let mut final_path = path.clone();
                        final_path.set_extension("change");
                        debug!("moving {:?} to {:?}", path, final_path);
                        std::fs::rename(&path, &final_path)?;
                        debug!("sending");
                        if let Some(ref mut sender) = sender {
                            sender.send(hashes[*current].clone()).await.unwrap();
                        }
                        debug!("sent");
                        *current += 1;
                        if *current < hashes.len() {
                            // If we're still waiting for
                            // another change.
                            libpijul::changestore::filesystem::pop_filename(path);
                            libpijul::changestore::filesystem::push_filename(
                                path,
                                &hashes[*current],
                            );
                            std::fs::create_dir_all(&path.parent().unwrap())?;
                            path.set_extension("");
                            debug!("creating file {:?}", path);
                            *file = std::fs::File::create(&path)?;
                        } else {
                            // Else, just finish. Dropping the sender
                            // closes the channel for the receiver.
                            debug!("dropping channel");
                            *sender = None;
                            break;
                        }
                    } else {
                        // not enough data, we need more.
                        file.write_all(&data[p..])?;
                        file.flush()?;
                        *remaining_len -= data.len() - p;
                        debug!("need more data");
                        break;
                    }
                }
                debug!("finished, {:?} {:?}", p, data.len());
            }
            State::Changelist { ref mut sender } => {
                debug!("state changelist");
                if &data[..] == b"\n" {
                    debug!("log done");
                    sender.send(None).await.unwrap_or(())
                } else if let Ok(data) = std::str::from_utf8(&data) {
                    for l in data.lines() {
                        if !l.is_empty() {
                            debug!("line = {:?}", l);
                            let (n, h, m) = parse_line(l)?;
                            sender.send(Some((n, h, m))).await.unwrap_or(());
                        } else {
                            sender.send(None).await.unwrap_or(());
                        }
                    }
                }
            }
            State::Channel { ref mut sender } => sender.send(data).await?,
            State::Archive {
                ref mut sender,
                ref mut w,
                ref mut len,
                ref mut len_n,
                ref mut conflicts,
            } => {
                // The first 16 bytes of the stream are a header, which
                // may be spread over several packets: 8 bytes of
                // length, then 8 bytes of conflict count.
                let mut off = 0;
                while *len_n < 16 && off < data.len() {
                    if *len_n < 8 {
                        *len = (*len << 8) | (data[off] as u64);
                    } else {
                        *conflicts = (*conflicts << 8) | (data[off] as u64);
                    }
                    *len_n += 1;
                    off += 1;
                }
                if *len_n >= 16 {
                    w.write_all(&data[off..])?;
                    // Saturate so that a remote sending more than it
                    // announced cannot underflow the counter and leave
                    // the receiver waiting forever.
                    *len = (*len).saturating_sub((data.len() - off) as u64);
                    if *len == 0 {
                        if let Some(sender) = sender.take() {
                            sender.send(*conflicts).unwrap_or(())
                        }
                    }
                }
            }
            State::None => {
                debug!("None state");
            }
        }
        Ok((self, session))
    })
while let Some(msg) = self.c.wait().await {
match msg {
thrussh::ChannelMsg::Data { data } => {
// If we can't parse `data` (for example if the
// remote returns the standard "-\n"), this
// returns None.
let mut s = std::str::from_utf8(&data)?.split(' ');
debug!("s = {:?}", s);
if let (Some(n), Some(m)) = (s.next(), s.next()) {
let n = n.parse().unwrap();
return Ok(Some((n, Merkle::from_base32(m.trim().as_bytes()).unwrap())));
} else {
break;
}
}
thrussh::ChannelMsg::ExtendedData { data, ext } => {
if ext == 1 {
debug!("{:?}", std::str::from_utf8(&data))
}
}
thrussh::ChannelMsg::Eof => {}
thrussh::ChannelMsg::ExitStatus { exit_status } => {
if exit_status != 0 {
return Err((Error::RemoteExit {
status: exit_status,
})
.into());
}
}
msg => panic!("wrong message {:?}", msg),
}
}
Ok(None)
Ok(receiver.await?)
}
let mut len = 0;
let mut conflicts = 0;
let mut len_n = 0;
while let Some(msg) = self.c.wait().await {
match msg {
thrussh::ChannelMsg::Data { data } => {
let mut off = 0;
while len_n < 16 && off < data.len() {
if len_n < 8 {
len = (len << 8) | (data[off] as u64);
} else {
conflicts = (conflicts << 8) | (data[off] as u64);
}
len_n += 1;
off += 1;
}
if len_n >= 16 {
w.write_all(&data[off..])?;
len -= (data.len() - off) as u64;
if len == 0 {
break;
}
}
}
thrussh::ChannelMsg::ExtendedData { data, ext } => {
if ext == 1 {
debug!("{:?}", std::str::from_utf8(&data))
}
}
thrussh::ChannelMsg::Eof => {}
thrussh::ChannelMsg::ExitStatus { exit_status } => {
if exit_status != 0 {
return Err((Error::RemoteExit {
status: exit_status,
})
.into());
}
}
msg => panic!("wrong message {:?}", msg),
}
'msg: while let Some(msg) = self.c.wait().await {
debug!("msg = {:?}", msg);
match msg {
thrussh::ChannelMsg::Data { data } => {
if &data[..] == b"\n" {
debug!("log done");
break;
} else if let Ok(data) = std::str::from_utf8(&data) {
for l in data.lines() {
if !l.is_empty() {
debug!("line = {:?}", l);
let (n, h, m) = parse_line(l)?;
txn.put_remote(remote, n, (h, m))?;
} else {
break 'msg;
}
}
}
}
thrussh::ChannelMsg::ExtendedData { data, ext } => {
debug!("{:?} {:?}", ext, std::str::from_utf8(&data[..]));
/*return Err((crate::Error::Remote {
msg: std::str::from_utf8(&data[..]).unwrap().to_string()
}).into())*/
}
thrussh::ChannelMsg::WindowAdjusted { .. } => {}
thrussh::ChannelMsg::Eof => {}
thrussh::ChannelMsg::ExitStatus { exit_status } => {
if exit_status != 0 {
return Err((Error::RemoteExit {
status: exit_status,
})
.into());
}
}
msg => panic!("wrong message {:?}", msg),
}
while let Some(Some((n, h, m))) = receiver.recv().await {
txn.put_remote(remote, n, (h, m))?;
self.run_protocol().await?;
debug!("download_change {:?}", full);
if full {
self.c
.data(format!("change {}\n", c.to_base32()).as_bytes())
.await?;
} else {
self.c
.data(format!("partial {}\n", c.to_base32()).as_bytes())
.await?;
}
Ok(())
}
pub async fn wait_downloads(
&mut self,
changes_dir: &Path,
hashes: &[libpijul::pristine::Hash],
send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
) -> Result<(), anyhow::Error> {
debug!("wait_downloads");
if !self.is_running {
return Ok(());
}
let mut remaining_len = 0;
let mut current: usize = 0;
let mut path = changes_dir.to_path_buf();
libpijul::changestore::filesystem::push_filename(&mut path, &hashes[current]);
let (sender_, mut recv) = tokio::sync::mpsc::channel(100);
let mut path = changes_dir.clone();
libpijul::changestore::filesystem::push_filename(&mut path, &c[0]);
let mut file = std::fs::File::create(&path)?;
'outer: while let Some(msg) = self.c.wait().await {
match msg {
thrussh::ChannelMsg::Data { data } => {
debug!("data = {:?}", &data[..]);
let mut p = 0;
while p < data.len() {
if remaining_len == 0 {
remaining_len = (&data[p..]).read_u64::<BigEndian>().unwrap() as usize;
p += 8;
debug!("remaining_len = {:?}", remaining_len);
}
if data.len() >= p + remaining_len {
file.write_all(&data[p..p + remaining_len])?;
// We have enough data to write the
// file, write it and move to the next
// file.
p += remaining_len;
remaining_len = 0;
file.flush()?;
let mut final_path = path.clone();
final_path.set_extension("change");
debug!("moving {:?} to {:?}", path, final_path);
std::fs::rename(&path, &final_path)?;
debug!("sending");
send.send(hashes[current].clone()).await.unwrap();
debug!("sent");
current += 1;
if current < hashes.len() {
// If we're still waiting for
// another change.
libpijul::changestore::filesystem::pop_filename(&mut path);
libpijul::changestore::filesystem::push_filename(
&mut path,
&hashes[current],
);
std::fs::create_dir_all(&path.parent().unwrap())?;
path.set_extension("");
file = std::fs::File::create(&path)?;
} else {
// Else, just finish.
break 'outer;
}
} else {
// not enough data, we need more.
file.write_all(&data[p..])?;
remaining_len -= data.len() - p;
break;
}
}
}
thrussh::ChannelMsg::ExitStatus { exit_status } => {
debug!("exit: {:?}", exit_status);
if exit_status != 0 {
error!("Remote command returned {:?}", exit_status)
}
self.is_running = false;
return Ok(());
}
msg => {
debug!("{:?}", msg);
}
let file = std::fs::File::create(&path)?;
*self.state.lock().await = State::Changes {
sender: Some(sender_),
remaining_len: 0,
path,
file,
hashes: c.to_vec(),
current: 0,
};
self.run_protocol().await?;
for c in c {
debug!("download_change {:?} {:?}", c, full);
if full {
self.c
.data(format!("change {}\n", c.to_base32()).as_bytes())
.await?;
} else {
self.c
.data(format!("partial {}\n", c.to_base32()).as_bytes())
.await?;
}
}
while let Some(_hash) = recv.recv().await {
debug!("received hash {:?}", _hash);
if sender.send(_hash).await.is_err() {
break
while let Some(msg) = self.c.wait().await {
match msg {
thrussh::ChannelMsg::Data { data } => {
debug!("data = {:?}", &data[..]);
if from_dump.read(&data)? {
break;
}
}
thrussh::ChannelMsg::ExtendedData { data, ext } => {
debug!("data = {:?}, ext = {:?}", &data[..], ext);
}
thrussh::ChannelMsg::ExitStatus { exit_status } => {
if exit_status != 0 {
error!("Remote command returned {:?}", exit_status)
}
self.is_running = false;
break;
}
msg => {
debug!("msg = {:?}", msg);
}
while let Some(msg) = recv.recv().await {
if from_dump.read(&msg)? {
break;
debug!("start_change_download");
libpijul::changestore::filesystem::push_filename(path, &c);
if std::fs::metadata(&path).is_ok() && !full {
debug!("metadata {:?} ok", path);
libpijul::changestore::filesystem::pop_filename(path);
return Ok(false);
}
std::fs::create_dir_all(&path.parent().unwrap())?;
debug!("download_changes");
RemoteRepo::Local(ref mut l) => l.start_change_download(c, path).await?,
RemoteRepo::Ssh(ref mut s) => s.start_change_download(c, full).await?,
RemoteRepo::Http(ref h) => {
let mut f = std::fs::File::create(&path)?;
let c32 = c.to_base32();
let url = format!("{}/{}", h.url, DOT_DIR);
let mut res = h.client.get(&url).query(&[("change", c32)]).send().await?;
if !res.status().is_success() {
return Err((crate::Error::Http {
status: res.status(),
})
.into());
}
while let Some(chunk) = res.chunk().await? {
f.write_all(&chunk)?;
}
}
RemoteRepo::Local(ref mut l) => l.download_changes(hashes, send, path).await?,
RemoteRepo::Ssh(ref mut s) => s.download_changes(hashes, send, path, full).await?,
RemoteRepo::Http(ref mut h) => h.download_changes(hashes, send, path, full).await?,
}
/// Waits until the changes in `hashes` are available, reporting each
/// one on `send`.
///
/// Returns immediately when `hashes` is empty. For an SSH remote the
/// work is delegated to the SSH client's own `wait_downloads`; for any
/// other remote, every hash is forwarded on `send` right away.
///
/// # Errors
///
/// Propagates errors from the SSH client and from sending on `send`.
pub async fn wait_downloads(
    &mut self,
    changes_dir: &Path,
    hashes: &[libpijul::pristine::Hash],
    send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
) -> Result<(), anyhow::Error> {
    if hashes.is_empty() {
        return Ok(());
    }
    match *self {
        RemoteRepo::Ssh(ref mut ssh) => ssh.wait_downloads(changes_dir, hashes, send).await?,
        _ => {
            for hash in hashes.iter().copied() {
                send.send(hash).await?;
            }
        }
    }
    Ok(())
let mut self_ = std::mem::replace(self, RemoteRepo::None);
let t = tokio::spawn(async move {
let mut hashes = Vec::new();
for h in to_download_.iter() {
if self_
.start_change_download(*h, &mut change_path_, false)
.await?
{
hashes.push(*h);
}
}
debug!("hashes = {:?}", hashes);
self_
.wait_downloads(&change_path_, &hashes, &mut send)
.await?;
Ok(self_)
});
self
.download_changes(&to_download_, &mut send, &mut change_path_, false)
.await?;
libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);
debug!("hard link {:?} {:?}", self.changes_dir, path);
if std::fs::hard_link(&self.changes_dir, path).is_err() {
std::fs::copy(&self.changes_dir, path)?;
for c in c {
libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, c);
libpijul::changestore::filesystem::push_filename(&mut path, c);
if std::fs::metadata(&path).is_ok() {
debug!("metadata {:?} ok", path);
libpijul::changestore::filesystem::pop_filename(&mut path);
continue
}
std::fs::create_dir_all(&path.parent().unwrap())?;
if std::fs::hard_link(&self.changes_dir, &path).is_err() {
std::fs::copy(&self.changes_dir, &path)?;
}
debug!("hard link done");
libpijul::changestore::filesystem::pop_filename(&mut self.changes_dir);
libpijul::changestore::filesystem::pop_filename(&mut path);
debug!("sent");
send.send(*c).await.unwrap();
version = "0.29.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "907849cfee4388f2d6bb1558f1d72ef80d70b5cb8d3583fb2c06391f9c9d71b4"
version = "0.29.8"