let t = tokio::spawn(async move {self_.download_changes(cloned_download_bar,&mut hash_recv,&mut send,&changes_dir,false,).await?;Ok::<_, anyhow::Error>(self_)});let mut waiting = 0;let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);let mut asked = HashSet::new();for h in to_apply {debug!("to_apply {:?}", h);asked.insert(*h);hash_send.send(*h)?;waiting += 1;}let u = self.download_changes_rec(repo,hash_send,recv,send_ready,download_bar,waiting,asked,).await?;let mut ws = libpijul::ApplyWorkspace::new();let mut to_apply_inodes = HashSet::new();while let Some(h) = recv_ready.recv().await {debug!("to_apply: {:?}", h);let touches_inodes = inodes.is_empty()|| {debug!("inodes = {:?}", inodes);use libpijul::changestore::ChangeStore;if let CS::Change(ref h) = h {let changes = repo.changes.get_changes(h)?;changes.iter().any(|c| {c.iter().any(|c| {let inode = c.inode();debug!("inode = {:?}", inode);inodes.contains(&Position {change: inode.change.unwrap_or(*h),pos: inode.pos,})})})} else {false}}|| { inodes.iter().any(|i| CS::Change(i.change) == h) };if touches_inodes {to_apply_inodes.insert(h);} else {continue;}if let Some(apply_bar) = apply_bar.clone() {info!("Applying {:?}", h);apply_bar.inc(1);debug!("apply");if let CS::Change(h) = h {let mut channel = channel.write();txn.apply_change_rec_ws(&repo.changes, &mut channel, &h, &mut ws)?;}debug!("applied");} else {debug!("not applying {:?}", h)
self.download_changes_with(download_bar, &changes_dir, false, |cx| async move {for h in to_apply {
debug!("finished");debug!("waiting for spawned process");*self = t.await??;u.await??;Ok(result)
// let mut waiting = 0;// let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);//// let mut asked = HashSet::new();// for h in to_apply {// debug!("to_apply {:?}", h);//// asked.insert(*h);// hash_send.send(*h)?;// waiting += 1;// }//// let u = self// .download_changes_rec(// repo,// hash_send,// recv,// send_ready,// download_bar,// waiting,// asked,// )// .await?;//// let mut ws = libpijul::ApplyWorkspace::new();// let mut to_apply_inodes = HashSet::new();// while let Some(h) = recv_ready.recv().await {// debug!("to_apply: {:?}", h);// let touches_inodes = inodes.is_empty()// || {// debug!("inodes = {:?}", inodes);// use libpijul::changestore::ChangeStore;// if let CS::Change(ref h) = h {// let changes = repo.changes.get_changes(h)?;// changes.iter().any(|c| {// c.iter().any(|c| {// let inode = c.inode();// debug!("inode = {:?}", inode);// inodes.contains(&Position {// change: inode.change.unwrap_or(*h),// pos: inode.pos,// })// })// })// } else {// false// }// }// || { inodes.iter().any(|i| CS::Change(i.change) == h) };//// if touches_inodes {// to_apply_inodes.insert(h);// } else {// continue;// }//// if let Some(apply_bar) = apply_bar.clone() {// info!("Applying {:?}", h);// apply_bar.inc(1);// debug!("apply");// if let CS::Change(h) = h {// let mut channel = channel.write();// txn.apply_change_rec_ws(&repo.changes, &mut channel, &h, &mut ws)?;// }// debug!("applied");// } else {// debug!("not applying {:?}", h)// }// }//// let mut result = Vec::with_capacity(to_apply_inodes.len());// for h in to_apply {// if to_apply_inodes.contains(&h) {// result.push(*h)// }// }//// debug!("finished");// debug!("waiting for spawned process");// *self = t.await??;// u.await??;// Ok(result)
mut waiting: usize,mut asked: HashSet<CS>,) -> Result<tokio::task::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {let changes_dir = repo.changes_dir.clone();let changes = repo.changes.clone();let t = tokio::spawn(async move {let mut buf = PathBuf::new();
items: impl IntoIterator<Item = CS> + 'a,) -> impl Stream<Item=anyhow::Result<CS>> + 'a {try_stream(|sender| async move {struct State<'a> {cx: DownloadContext<'a>,repo: &'a Repository,sender: mpsc::Sender<anyhow::Result<CS>>,futs: FuturesUnordered<LocalBoxFuture<'a, anyhow::Result<()>>>,barriers: RefCell<HashMap<CS, watch::Receiver<()>>>,}let state = State {cx,repo,sender,futs: Default::default(),barriers: RefCell::new(Default::default()),};fn go<'a>(state: &'a State<'a>, cs: CS) -> watch::Receiver<()> {state.barriers.borrow_mut().entry(cs).or_insert_with(|| {let (tx, mut rx) = watch::channel(());rx.mark_unchanged();
let mut ready = Vec::new();while let Some((hash, follow)) = recv_signal.recv().await {debug!("received {:?} {:?}", hash, follow);if let CS::Change(hash) = hash {waiting -= 1;if follow {use libpijul::changestore::ChangeStore;let mut needs_dep = false;for dep in changes.get_dependencies(&hash)? {let dep: libpijul::pristine::Hash = dep;
async fn go2<'a>(state: &'a State<'a>, cs: CS) -> anyhow::Result<()> {let follow = state.cx.download(cs).await?;
if !has_dep {needs_dep = true;if asked.insert(CS::Change(dep)) {progress_bar.inc(1);send_hash.send(CS::Change(dep))?;waiting += 1}}}
if !follow {return Ok(());}wait_for(state, state.repo.changes.get_dependencies(&hash)?.into_iter().map(CS::Change)).await;Ok(())}async fn wait_for<'a>(state: &'a State<'a>, items: impl IntoIterator<Item=CS>) {let mut v = Vec::new();
info!("waiting loop done");for r in ready {send_ready.send(r).await?;
state.futs.push(async move {wait_for(&state, items).await;Ok(())}.boxed_local());while let Some(v) = state.futs.next().await {v?;
});Ok(t)
})// let t = tokio::spawn(async move {// let mut buf = PathBuf::new();//// if waiting == 0 {// return Ok(());// }// let mut ready = Vec::new();// while let Some((hash, follow)) = recv_signal.recv().await {// if let CS::Change(hash) = hash {// waiting -= 1;// if follow {// use libpijul::changestore::ChangeStore;// let mut needs_dep = false;// for dep in changes.get_dependencies(&hash)? {// let dep: libpijul::pristine::Hash = dep;//// let dep_path = fmt_filename(&mut buf, &changes_dir, &dep);// let has_dep = std::fs::metadata(&dep_path).is_ok();//// if !has_dep {// needs_dep = true;// if asked.insert(CS::Change(dep)) {// progress_bar.inc(1);// send_hash.send(CS::Change(dep))?;// waiting += 1// }// }// }//// if !needs_dep {// send_ready.send(CS::Change(hash)).await?;// } else {// ready.push(CS::Change(hash))// }// } else {// send_ready.send(CS::Change(hash)).await?;// }// }// if waiting == 0 {// break;// }// }// info!("waiting loop done");// for r in ready {// send_ready.send(r).await?;// }// std::mem::drop(recv_signal);// Ok(())// });// Ok(t)
fn try_stream<C, F, T, E>(op: C) -> impl Stream<Item = Result<T, E>>whereC: FnOnce(mpsc::Sender<Result<T, E>>) -> F,F: Future<Output = Result<(), E>>,{stream(|sender| {let fut = op(sender.clone());async move {if let Err(e) = fut.await {let _ = sender.send(Err(e));}}})}fn stream<C, F, T>(op: C) -> impl Stream<Item = T>whereC: FnOnce(mpsc::Sender<T>) -> F,F: Future<Output = ()>{struct Impl<F, T> {fut: Once<F>,rx: mpsc::Receiver<T>,}impl<F, T> Stream for Impl<F, T>whereF: Future<Output=()>,{type Item = T;fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {let (fut, mut rx) = unsafe {let this = self.get_unchecked_mut();(Pin::new_unchecked(&mut this.fut), Pin::new(&mut this.rx))};match (fut.poll_next(cx), rx.poll_recv(cx)) {(_, Poll::Ready(Some(v))) => Poll::Ready(Some(v)),(Poll::Ready(_), Poll::Ready(None)) => Poll::Ready(None),_ => Poll::Pending,}}}let (tx, rx) = mpsc::channel(1);Impl {fut: stream::once(op(tx)),rx,}}
pub struct FetchChangesState<'a> {shared: Mutex<Shared<'a>>,}struct Shared<'a> {task: BoxFuture<'a, Result<bool, anyhow::Error>>,changes: BTreeMap<CS, ChangeState>,}struct ChangeState {wakers: Vec<Waker>,}impl<'a> FetchChangesState<'a> {pub fn new<T>(remote: &'a mut RemoteRepo,
impl RemoteRepo {pub async fn download_changes_with<C, F, R>(&mut self,
let notifiers: RefCell<Option<HashMap<CS, watch::Sender<Option<bool>>>>> =RefCell::new(Some(HashMap::new()));let dc = DownloadContext {data: Default::default(),notifiers: ¬ifiers,in_tx,};let download_task = self.download_changes(progress_bar, &mut in_rx, &mut out_tx, dest_dir, full).fuse();let control_task = op(dc).fuse();
let task = async move {remote.download_changes(progress_bar,&mut in_rx,&mut out_tx,dest_dir.as_ref(),full,).await
pin_mut!(download_task);pin_mut!(control_task);let mut out_rx = Some(out_rx);let mut dl_err = None;let mut control_res = None;loop {futures::select_biased! {next = out_rx.as_mut().map(|v| v.recv().fuse()).unwrap_or(Fuse::terminated()) => {let Some((cs, value)) = next else {out_rx.take();continue;};if let Some(ref notifiers) = *notifiers.borrow() {if let Some(tx) = notifiers.get(&cs) {let _ = tx.send(Some(value));}};}res = download_task => {notifiers.borrow_mut().take();dl_err = res.err();}res = control_task => {control_res = Some(res);}complete => break,}
shared.task.poll_unpin(cx);todo!()
match *self.notifiers.borrow_mut() {None => {bail!("map is gone");}Some(ref mut v) => {assert!(v.insert(cs, tx).is_none());}}self.in_tx.send(cs)?;v.insert(ChangeData { rx }).rx.clone()}}};rx.changed().await?;let res = rx.borrow_and_update().unwrap();Ok(res)