For some reason it didn't record this as part of the other change
M32SWK6ZR3ZXXQCCGFCE4DZQIDKUGHESHECI6R2Q5WQLRWV52NTAC JGN2BVS5RYX6EFOG7WJGIPTSWG3KE757J5BUGCC64QWCUSXZKZPQC MU5GSJAW65PEG3BRYUKZ7O37BPHW3MOX3S5E2RFOXKGUOJEEDQ5AC SXEYMYF7P4RZMZ46WPL4IZUTSQ2ATBWYZX7QNVMS3SGOYXYOHAGQC C3L2TLQWREYOM3YHL37L7PS74YGLHBEDQRSCVMYIU6HKBEPNN2SAC DO2Y5TY5JQISUHCVNPI2FXO7WWZVJQ3LGPWF4DNADMGZRIO6PT2QC PYTC7DPVCWKYDXXBY44BBNB4DHZ3N4OQW3EOEQ7H6Z5P5XBG2EIAC Q45QHPO4HDTEZF2W4UDZSYYQ46BPEIWSW4GJILZR5HTJNLKXJABQC A3RM526Y7LUXNYW4TL56YKQ5GVOK2R5D7JJVTSQ6TT5MEXIR6YAAC IBPVOKM5MXTGB2P7LCD75MISAYUNDPEKQAUEVCXJJWLWCX2TJZBAC UDGL7ER2R6SY2CBSPA4O4ULQ5PCUDWQBGYOWTN3MQGSS5L2EI2LQC 2GQCLJZGIXMTKDVMYIIQJDOR5EXGBZS5FKH2S4DTN25WKKBUMQQQC YN63NUZO4LVJ7XPMURDULTXBVJKW5MVCTZ24R7Z52QMHO3HPDUVQC RM225IDQR36MNBMN7OT2R562M4FUD6L34RY7A3WDBBETZIIDKZ5AC X6YFD4WVMUYJCR5IYPJH6UKYVWSA7DKBRVJ6XQFXHOE2TRYUTAHAC 2D7P2VKJASU7QDQZHGCLBIT6G2V5WUFYLWTCEVVEI2EZHGM6XYRAC L2VH4BYK3IULLGBHXMZJWKRKDQY43QEMQRTXFJCNRDE7PODLXWTAC EUZFFJSOWV4PXDFFPDAFBHFUUMOFEU6ST7JH57YYRRR2SEOXLN6QC 76PCXGML77EZWTRI5E6KHLVRAFTJ2AB5YRN5EKOYNAPKTWY2KCGAC ZDK3GNDBWXJ2OXFDYB72ZCEBGLBF4MKE5K3PVHDZATHJ7HJIDPRQC JUYSZJSHULJFR4HUJF72TEKKFMBPG4ZOGAGOJ2BX6P3D4DRZAU5QC G7HJHNFDZCGOPGVETNYK7BDDPJXHEIPGZJEJXBGBXSWPWEX3BIQQC LKIKT4FRKVZAYITMKO23RRFZG25UFX7J6GBOOUQOD24YBRNLHLOAC TKEVOH7HXON7SOBGXTUDHAHO2U2GPTQRNESP6ERKUQAS526OZIRAC ABQDWHNGSBF2REQDCGXSBFAU4RUMXYAF2KHJ5O3D32M7Z3A3FEDAC I52XSRUH5RVHQBFWVMAQPTUSPAJ4KNVID2RMI3UGCVKFLYUO6WZAC CCLLB7OIFNFYJZTG3UCI7536TOCWSCSXR67VELSB466R24WLJSDAC I24UEJQLCH2SOXA4UHIYWTRDCHSOPU7AFTRUOTX7HZIAV4AZKYEQC C5XGFNKIX3RM6KOKRYTECBDDRDAE33JWIVJAJJTEFKFXQNXHEKHQC 27RZYCM3XP72CW7FYGE373YAFD6EEZOZ4YAHEKV6JE5L6Z5N3JNAC 5SLOJYHGPMZVCOE3IS7ICNMJJYX3RBT6CDG5MAV6T4CJIOW7YZ6QC 4XLHUME7YLJV6XUZBOW7PX62TCJXIWW2CPITXO5GZOULWGXRVDZAC XQHABMC2FOMH7SZIYVYAR5MNH2DK2AOUCX2RJKZM3PDG2H5JIXYQC QWIYNMI5SOTLRPYE4O3AG7R75JXM2TB3ZADU646PG6ACPBGSYUYAC 3E2KY6Y4SQ2UO6PLA4K3MQKHIRHSQCBAJIPJT7NQ6BIETMAQX5QAC BNPSVXIC72C3WT33YKCH766OBLLNCS7POX6U6JXZSQQPJF2M22MQC HXEIH4UQ6EX3MAY33JK4WQUE5GUSZ673OX57JKNFXC2N2QLTXKXAC 3WO4H2MMMQTYKCPBWYE67IRAEX7DFA2XAOMIKICA6BYDN23K6DQQC Ok(())}).await?;}}debug!("not applying {:?}", h)} else {}}if !touches_inodes {touches_inodes |= inodes.iter().any(|i| CS::Change(i.change) == h);}if !touches_inodes {continue;}to_apply_inodes.insert(h);if let Some(apply_bar) = &apply_bar {info!("Applying {:?}", h);apply_bar.inc(1);debug!("apply");if let CS::Change(ref h) = h {let mut channel = channel.write();txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;}debug!("applied");})})if !touches_inodes {if let CS::Change(ref h) = h {let changes = repo.changes.get_changes(h)?;touches_inodes |= changes.iter().any(|c| {c.iter().any(|c| {let inode = c.inode();debug!("inode = {:?}", inode);inodes.contains(&Position {change: inode.change.unwrap_or(*h),pos: inode.pos,})debug!("inodes = {:?}", inodes);}if let CS::Change(hash) = item {rev_deps.insert(hash, Vec::new());}}tag: caps.name("tag").is_some(),}fn try_stream<C, F, T, E>(op: C) -> impl Stream<Item = Result<T, E>>whereC: FnOnce(mpsc::Sender<Result<T, E>>) -> F,F: Future<Output = Result<(), E>>,{stream(|sender| {let fut = op(sender.clone());async move {if let Err(e) = fut.await {let _ = sender.send(Err(e));}}})}/// Compare the remote set (theirs_ge_dichotomy) with our current/// version of that (ours_ge_dichotomy) and return the changes in our/// current version that are not in the remote anymore.fn remote_unrecs<T: TxnTExt + ChannelTxnT>(txn: &T,current_channel: &ChannelRef<T>,ours_ge_dichotomy: &[(u64, CS)],theirs_ge_dichotomy_set: &HashSet<CS>,) -> Result<Vec<(u64, CS)>, anyhow::Error> {let mut remote_unrecs = Vec::new();for (n, hash) in ours_ge_dichotomy {debug!("ours_ge_dichotomy: {:?} {:?}", n, hash);if theirs_ge_dichotomy_set.contains(hash) {// If this change is still present in the remote, skipdebug!("still present");continue;} else {let has_it = match hash {CS::Change(hash) => txn.get_revchanges(¤t_channel, &hash)?.is_some(),CS::State(state) => {let ch = current_channel.read();if let Some(n) = txn.channel_has_state(txn.states(&*ch), &state.into())? {txn.is_tagged(txn.tags(&*ch), n.into())?} else {false}}};if has_it {remote_unrecs.push((*n, *hash))} else {// If this unrecord wasn't in our current channel, skipcontinue;}}}Ok(remote_unrecs)fn stream<C, F, T>(op: C) -> impl Stream<Item = T>whereC: FnOnce(mpsc::Sender<T>) -> F,F: Future<Output = ()>,{struct Impl<F, T> {fut: Once<F>,rx: mpsc::Receiver<T>,}impl<F, T> Stream for Impl<F, T>whereF: Future<Output = ()>,{type Item = T;fn poll_next(self: Pin<&mut Self>,cx: &mut std::task::Context<'_>,) -> Poll<Option<Self::Item>> {let (fut, mut rx) = unsafe {let this = self.get_unchecked_mut();(Pin::new_unchecked(&mut this.fut), Pin::new(&mut this.rx))};match (fut.poll_next(cx), rx.poll_recv(cx)) {(_, Poll::Ready(Some(v))) => Poll::Ready(Some(v)),(Poll::Ready(_), Poll::Ready(None)) => Poll::Ready(None),_ => Poll::Pending,}}}let (tx, rx) = mpsc::channel(1);Impl {fut: stream::once(op(tx)),rx,}}}debug!("offending line: {:?}", data);bail!("Protocol error")}}));pos: ChangePosition(caps.name("num").unwrap().as_str().parse::<u64>().unwrap().into(),),if let Some(caps) = PATHS_LINE.captures(data) {return Ok(ListLine::Position(Position {change: Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()).unwrap(),if data.starts_with("error:") {return Ok(ListLine::Error(data.split_at(6).1.to_string()));}});}}h,m,n: caps.name("num").unwrap().as_str().parse().unwrap(),if let Some(caps) = CHANGELIST_LINE.captures(data) {if let (Some(h), Some(m)) = (Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()),Merkle::from_base32(caps.name("merkle").unwrap().as_str().as_bytes()),) {return Ok(ListLine::Change {debug!("data = {:?}", data);}fn parse_line(data: &str) -> Result<ListLine, anyhow::Error> {Error(String),Position(Position<Hash>),},tag: bool,Change {n: u64,h: Hash,m: Merkle,static ref PATHS_LINE: Regex =Regex::new(r#"(?P<hash>[A-Za-z0-9]+)\.(?P<num>[0-9]+)"#).unwrap();}enum ListLine {static ref CHANGELIST_LINE: Regex = Regex::new(r#"(?P<num>[0-9]+)\.(?P<hash>[A-Za-z0-9]+)\.(?P<merkle>[A-Za-z0-9]+)(?P<tag>\.)?"#).unwrap();lazy_static! {}}Ok(())self.complete_changes(repo, txn, local_channel, &pullable, false).await?;self.pull(repo, txn, local_channel, &pullable, &inodes, true).await?;self.update_identities(repo, &remote_changes).await?;}}pullable.push(CS::Change(p.a.into()))for x in txn.iter_remote(&rem.remote, 0)? {let (_, p) = x?;let rem = remote_changes.lock();{let mut pullable = Vec::new();let (inodes, remote_changes) = if let Some(x) = self.update_changelist(txn, path).await? {x} else {bail!("Channel not found")};path: &[String],) -> Result<(), anyhow::Error> {&mut self,repo: &mut Repository,txn: &mut T,local_channel: &mut ChannelRef<T>,pub async fn clone_channel<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(Ok(())}Ok(())}).await?;}for w in waiting {w.await?;}self.download_changes_with(download_bar, &repo.changes_dir, true, |cx| async move {let mut waiting = Vec::new();for c in changes {let CS::Change(c) = c else { continue };let sc = c.into();if repo.changes.has_contents(*c, txn.get_internal(&sc)?.cloned()){debug!("has contents {:?}", c);continue;}if full {waiting.push(cx.download(CS::Change(*c))?);continue;}let Some(&change) = txn.get_internal(&sc)? else {debug!("could not find internal for {:?}", sc);continue;};// Check if at least one non-empty vertex from c is still alive.let v = libpijul::pristine::Vertex {change,start: ChangePosition(0u64.into()),end: ChangePosition(0u64.into()),};let channel = local_channel.read();let graph = txn.graph(&channel);for x in txn.iter_graph(graph, Some(&v))? {let (v, e) = x?;if v.change > change {break;} else if e.flag().is_alive_parent() {waiting.push(cx.download(CS::Change(*c))?);break;}}let _completion_spinner = Spinner::new(COMPLETE_MESSAGE)?;let download_bar = ProgressBar::new(changes.len() as u64, DOWNLOAD_MESSAGE)?;debug!("complete changes {:?}", changes);full: bool,) -> Result<(), anyhow::Error> {changes: &[CS],local_channel: &ChannelRef<T>,txn: &T,repo: &pijul_repository::Repository,&mut self,pub async fn complete_changes<T: MutTxnT + TxnTExt + GraphIter>(Ok(())}.await?;self.complete_changes(repo, txn, channel, &to_pull, false)self.update_identities(repo, &remote).await?;.await?;self.pull(repo, txn, channel, &to_pull, &HashSet::new(), true)}bail!("State not found: {:?}", state)found = true;break;}}if !found {if p.b == state {to_pull.push(CS::Change(p.a.into()));let (n, p) = x?;debug!("{:?} {:?}", n, p);for x in txn.iter_remote(&remote.lock().remote, 0)? {let mut to_pull = Vec::new();let mut found = false;let remote = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();self.update_changelist(txn, &[]).await?;let id = if let Some(id) = self.get_id(txn).await? {id} else {return Ok(());};) -> Result<(), anyhow::Error> {&mut self,repo: &mut Repository,txn: &mut T,channel: &mut ChannelRef<T>,state: Merkle,pub async fn clone_state<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(Ok(())}self.complete_changes(repo, txn, channel, &hashes, false).await?;}Ok(())}).await?;}while let Some(cs) = stream.try_next().await? {if let CS::Change(hash) = cs {let mut channel = channel.write();txn.apply_change_rec_ws(&repo.changes, &mut channel, &hash, &mut ws)?;}hashes.push(cs);let mut ws = ApplyWorkspace::new();self.download_changes_with(download_bar, &repo.changes_dir, false, |cx| async move {let stream = download_changes_rec(&cx, repo, tag.iter().cloned().map(CS::Change));pin_mut!(stream);{let txn = &mut *txn;let hashes = &mut hashes;let mut hashes = Vec::new();let download_bar = ProgressBar::new(tag.len() as u64, DOWNLOAD_MESSAGE)?;tag: &[Hash],) -> Result<(), anyhow::Error> {channel: &ChannelRef<T>,txn: &mut T,repo: &Repository,&mut self,pub async fn clone_tag<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(for item in items {let item = *item.borrow();tasks.push(make_download_job(item));};}// there is probably a way to model this using futures, but I// couldn't find a nice way to do it// so here, have three funny collectionslet mut pending_deps = HashMap::<Hash, HashSet<Hash>>::new();let mut rev_deps = HashMap::<Hash, Vec<Hash>>::new();let mut fetched = HashSet::new();let make_download_job = |cs| async move {match cx.download(cs) {Ok(v) => (cs, v.await),Err(e) => (cs, Err(e)),fn download_changes_rec<'a, I>(cx: &'a DownloadContext,repo: &'a Repository,items: I,) -> impl Stream<Item = anyhow::Result<CS>> + 'awhereI: IntoIterator + 'a,I::Item: Borrow<CS>,{try_stream(move |sender| async move {let mut tasks = FuturesUnordered::new();Ok(result)debug!("finished");}}let mut result = Vec::with_capacity(to_apply_inodes.len());for h in to_apply {if to_apply_inodes.contains(&h) {result.push(*h)}let mut ws = ApplyWorkspace::new();while let Some(h) = stream.try_next().await? {debug!("to_apply: {:?}", h);let mut touches_inodes = inodes.is_empty();
let mut ws = ApplyWorkspace::new();while let Some(h) = stream.try_next().await? {debug!("to_apply: {:?}", h);let mut touches_inodes = inodes.is_empty();debug!("inodes = {:?}", inodes);if !touches_inodes {if let CS::Change(ref h) = h {let changes = repo.changes.get_changes(h)?;touches_inodes |= changes.iter().any(|c| {c.iter().any(|c| {let inode = c.inode();debug!("inode = {:?}", inode);inodes.contains(&Position {change: inode.change.unwrap_or(*h),pos: inode.pos,})})})}}if !touches_inodes {touches_inodes |= inodes.iter().any(|i| CS::Change(i.change) == h);}if !touches_inodes {continue;}to_apply_inodes.insert(h);if let Some(apply_bar) = &apply_bar {info!("Applying {:?}", h);apply_bar.inc(1);debug!("apply");if let CS::Change(ref h) = h {let mut channel = channel.write();txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;}debug!("applied");} else {debug!("not applying {:?}", h)}}Ok(())}).await?;}let mut result = Vec::with_capacity(to_apply_inodes.len());for h in to_apply {if to_apply_inodes.contains(&h) {result.push(*h)}}debug!("finished");Ok(result)}pub async fn clone_tag<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(&mut self,repo: &Repository,txn: &mut T,channel: &ChannelRef<T>,tag: &[Hash],) -> Result<(), anyhow::Error> {let download_bar = ProgressBar::new(tag.len() as u64, DOWNLOAD_MESSAGE)?;let mut hashes = Vec::new();{let txn = &mut *txn;let hashes = &mut hashes;self.download_changes_with(download_bar, &repo.changes_dir, false, |cx| async move {let stream = download_changes_rec(&cx, repo, tag.iter().cloned().map(CS::Change));pin_mut!(stream);let mut ws = ApplyWorkspace::new();while let Some(cs) = stream.try_next().await? {if let CS::Change(hash) = cs {let mut channel = channel.write();txn.apply_change_rec_ws(&repo.changes, &mut channel, &hash, &mut ws)?;}hashes.push(cs);}Ok(())}).await?;}self.complete_changes(repo, txn, channel, &hashes, false).await?;Ok(())}pub async fn clone_state<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(&mut self,repo: &mut Repository,txn: &mut T,channel: &mut ChannelRef<T>,state: Merkle,) -> Result<(), anyhow::Error> {let id = if let Some(id) = self.get_id(txn).await? {id} else {return Ok(());};self.update_changelist(txn, &[]).await?;let remote = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();let mut to_pull = Vec::new();let mut found = false;for x in txn.iter_remote(&remote.lock().remote, 0)? {let (n, p) = x?;debug!("{:?} {:?}", n, p);to_pull.push(CS::Change(p.a.into()));if p.b == state {found = true;break;}}if !found {bail!("State not found: {:?}", state)}self.pull(repo, txn, channel, &to_pull, &HashSet::new(), true).await?;self.update_identities(repo, &remote).await?;self.complete_changes(repo, txn, channel, &to_pull, false).await?;Ok(())}pub async fn complete_changes<T: MutTxnT + TxnTExt + GraphIter>(&mut self,repo: &pijul_repository::Repository,txn: &T,local_channel: &ChannelRef<T>,changes: &[CS],full: bool,) -> Result<(), anyhow::Error> {debug!("complete changes {:?}", changes);let download_bar = ProgressBar::new(changes.len() as u64, DOWNLOAD_MESSAGE)?;let _completion_spinner = Spinner::new(COMPLETE_MESSAGE)?;self.download_changes_with(download_bar, &repo.changes_dir, true, |cx| async move {let mut waiting = Vec::new();for c in changes {let CS::Change(c) = c else { continue };let sc = c.into();if repo.changes.has_contents(*c, txn.get_internal(&sc)?.cloned()){debug!("has contents {:?}", c);continue;}if full {waiting.push(cx.download(CS::Change(*c))?);continue;}let Some(&change) = txn.get_internal(&sc)? else {debug!("could not find internal for {:?}", sc);continue;};// Check if at least one non-empty vertex from c is still alive.let v = libpijul::pristine::Vertex {change,start: ChangePosition(0u64.into()),end: ChangePosition(0u64.into()),};let channel = local_channel.read();let graph = txn.graph(&channel);for x in txn.iter_graph(graph, Some(&v))? {let (v, e) = x?;if v.change > change {break;} else if e.flag().is_alive_parent() {waiting.push(cx.download(CS::Change(*c))?);break;}}}for w in waiting {w.await?;}Ok(())}).await?;Ok(())}pub async fn clone_channel<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(&mut self,repo: &mut Repository,txn: &mut T,local_channel: &mut ChannelRef<T>,path: &[String],) -> Result<(), anyhow::Error> {let (inodes, remote_changes) = if let Some(x) = self.update_changelist(txn, path).await? {x} else {bail!("Channel not found")};let mut pullable = Vec::new();{let rem = remote_changes.lock();for x in txn.iter_remote(&rem.remote, 0)? {let (_, p) = x?;pullable.push(CS::Change(p.a.into()))}}self.pull(repo, txn, local_channel, &pullable, &inodes, true).await?;self.update_identities(repo, &remote_changes).await?;self.complete_changes(repo, txn, local_channel, &pullable, false).await?;Ok(())}}lazy_static! {static ref CHANGELIST_LINE: Regex = Regex::new(r#"(?P<num>[0-9]+)\.(?P<hash>[A-Za-z0-9]+)\.(?P<merkle>[A-Za-z0-9]+)(?P<tag>\.)?"#).unwrap();static ref PATHS_LINE: Regex =Regex::new(r#"(?P<hash>[A-Za-z0-9]+)\.(?P<num>[0-9]+)"#).unwrap();}enum ListLine {Change {n: u64,h: Hash,m: Merkle,tag: bool,},Position(Position<Hash>),Error(String),}fn parse_line(data: &str) -> Result<ListLine, anyhow::Error> {debug!("data = {:?}", data);if let Some(caps) = CHANGELIST_LINE.captures(data) {if let (Some(h), Some(m)) = (Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()),Merkle::from_base32(caps.name("merkle").unwrap().as_str().as_bytes()),) {return Ok(ListLine::Change {n: caps.name("num").unwrap().as_str().parse().unwrap(),h,m,tag: caps.name("tag").is_some(),});}}if data.starts_with("error:") {return Ok(ListLine::Error(data.split_at(6).1.to_string()));}if let Some(caps) = PATHS_LINE.captures(data) {return Ok(ListLine::Position(Position {change: Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()).unwrap(),pos: ChangePosition(caps.name("num").unwrap().as_str().parse::<u64>().unwrap().into(),),}));}debug!("offending line: {:?}", data);bail!("Protocol error")}/// Compare the remote set (theirs_ge_dichotomy) with our current/// version of that (ours_ge_dichotomy) and return the changes in our/// current version that are not in the remote anymore.fn remote_unrecs<T: TxnTExt + ChannelTxnT>(txn: &T,current_channel: &ChannelRef<T>,ours_ge_dichotomy: &[(u64, CS)],theirs_ge_dichotomy_set: &HashSet<CS>,) -> Result<Vec<(u64, CS)>, anyhow::Error> {let mut remote_unrecs = Vec::new();for (n, hash) in ours_ge_dichotomy {debug!("ours_ge_dichotomy: {:?} {:?}", n, hash);if theirs_ge_dichotomy_set.contains(hash) {// If this change is still present in the remote, skipdebug!("still present");continue;} else {let has_it = match hash {CS::Change(hash) => txn.get_revchanges(¤t_channel, &hash)?.is_some(),CS::State(state) => {let ch = current_channel.read();if let Some(n) = txn.channel_has_state(txn.states(&*ch), &state.into())? {txn.is_tagged(txn.tags(&*ch), n.into())?} else {false}}};if has_it {remote_unrecs.push((*n, *hash))} else {// If this unrecord wasn't in our current channel, skipcontinue;}}}Ok(remote_unrecs)}fn try_stream<C, F, T, E>(op: C) -> impl Stream<Item = Result<T, E>>whereC: FnOnce(mpsc::Sender<Result<T, E>>) -> F,F: Future<Output = Result<(), E>>,{stream(|sender| {let fut = op(sender.clone());async move {if let Err(e) = fut.await {let _ = sender.send(Err(e));}}})}fn stream<C, F, T>(op: C) -> impl Stream<Item = T>whereC: FnOnce(mpsc::Sender<T>) -> F,F: Future<Output = ()>,{struct Impl<F, T> {fut: Once<F>,rx: mpsc::Receiver<T>,}impl<F, T> Stream for Impl<F, T>whereF: Future<Output = ()>,{type Item = T;fn poll_next(self: Pin<&mut Self>,cx: &mut std::task::Context<'_>,) -> Poll<Option<Self::Item>> {let (fut, mut rx) = unsafe {let this = self.get_unchecked_mut();(Pin::new_unchecked(&mut this.fut), Pin::new(&mut this.rx))};match (fut.poll_next(cx), rx.poll_recv(cx)) {(_, Poll::Ready(Some(v))) => Poll::Ready(Some(v)),(Poll::Ready(_), Poll::Ready(None)) => Poll::Ready(None),_ => Poll::Pending,}}}let (tx, rx) = mpsc::channel(1);Impl {fut: stream::once(op(tx)),rx,}}fn download_changes_rec<'a, I>(cx: &'a DownloadContext,repo: &'a Repository,items: I,) -> impl Stream<Item = anyhow::Result<CS>> + 'awhereI: IntoIterator + 'a,I::Item: Borrow<CS>,{try_stream(move |sender| async move {let mut tasks = FuturesUnordered::new();// there is probably a way to model this using futures, but I// couldn't find a nice way to do it// so here, have three funny collectionslet mut pending_deps = HashMap::<Hash, HashSet<Hash>>::new();let mut rev_deps = HashMap::<Hash, Vec<Hash>>::new();let mut fetched = HashSet::new();let make_download_job = |cs| async move {match cx.download(cs) {Ok(v) => (cs, v.await),Err(e) => (cs, Err(e)),}};for item in items {let item = *item.borrow();tasks.push(make_download_job(item));if let CS::Change(hash) = item {rev_deps.insert(hash, Vec::new());}}