DO2Y5TY5JQISUHCVNPI2FXO7WWZVJQ3LGPWF4DNADMGZRIO6PT2QC 2GQCLJZGIXMTKDVMYIIQJDOR5EXGBZS5FKH2S4DTN25WKKBUMQQQC FGVTXN52IVYIDIBPFOOUE7QQAZMPJQRADGKXW3DENIC6EVS4AYFAC 3OW3YNZXF6DR2EI5XS7PDHUUZMARDU2JUJFJKIEWXQNSMCIBVAOQC YD7QFAD7XC35U5N4DOS3B4MBJL4KWSHVEZ6OLO5RYXOUPFQNPQPAC ILZ44DEYAPNWZRHHEML2GPNSMGP4QXXI4GCL4T24R7A4LKGRV23AC JZADJIA3P3EOKPBGEKEXJVGWHNF2SIHYNNMB3XFNPBU4BTVGM3YQC TA5VXGFGDBLENEH4SRHD7FOUXPXRKIHN7DIBUREW3DZHMDVJNNCAC 5MRZLKBHOFFUAJELWL34XILG2XVWPUEHOFPDXBK5ASCO6H26MBFAC CQ3FIUY4HDRAI3EQWJ7D5VHLC4EBO2LHWVRHX56XA6LRGEKP63EAC L2VH4BYK3IULLGBHXMZJWKRKDQY43QEMQRTXFJCNRDE7PODLXWTAC L4JXJHWXYNCL4QGJXNKKTOKKTAXKKXBJUUY7HFZGEUZ5A2V5H34QC SXEYMYF7P4RZMZ46WPL4IZUTSQ2ATBWYZX7QNVMS3SGOYXYOHAGQC UDHP4ZVBQZT2VBURB2MDCU2IZDNMCAFSIUKWRBDQ5BWMFKSN2LYQC 3WO4H2MMMQTYKCPBWYE67IRAEX7DFA2XAOMIKICA6BYDN23K6DQQC BNPSVXIC72C3WT33YKCH766OBLLNCS7POX6U6JXZSQQPJF2M22MQC MU5GSJAW65PEG3BRYUKZ7O37BPHW3MOX3S5E2RFOXKGUOJEEDQ5AC RM225IDQR36MNBMN7OT2R562M4FUD6L34RY7A3WDBBETZIIDKZ5AC XWETQ4DE4KL2GQWSEBE5NENLTSLIDF7RIWOCCVWJFQOVQJE5P33AC Q7CAYX5N2GFOGMZL3VXVWORMAPWEOECXE22BLXK7Q4WEPS4CE2SAC KTTKF3RWYAK2YSH2DYYW5QVG4KSNGWUBJBFHKE24OJ7LFCBF5FEAC 367UBQ6KNAKUEWG32R4QRJ6H7IE7NAZFOPTC3ZOE4Z6E44RV3ISQC ISCWVXO6UN37V2QMR75ZWS7H5E7WYZCGR6TX3EDFOFRVCAPMIVUAC 2D7P2VKJASU7QDQZHGCLBIT6G2V5WUFYLWTCEVVEI2EZHGM6XYRAC TKEVOH7HXON7SOBGXTUDHAHO2U2GPTQRNESP6ERKUQAS526OZIRAC WLUID7NANDWTN5GOECNEKFTLZF3MUVS7K26YWLYLSGJ56G63NV4QC IVLLXQ5ZWZDKHO4TNQG3TPXN34H6Y2WXPAGSO4PWCYNSKUZWOEJQC K4CVMIUKNWBZ676IKSR5MYKTCDPPCRGWVAGYU772CE2B3AGAP4KQC 7HOBLRD43W2R5OVOYZVDO5LYPBZS7OSLDS7FH4NVNMBL3AZGMLSQC 2V33SO6IVPKBRPIJ3U3WYD6QKGN36BJVPP3M2YUDN7GTX3YH5UCAC 3ZAS64J675LPNHM2X32RH45B4A2LGK7NAIFDGQBQLDDSZVWEBTIQC A3RM526Y7LUXNYW4TL56YKQ5GVOK2R5D7JJVTSQ6TT5MEXIR6YAAC EUZFFJSOWV4PXDFFPDAFBHFUUMOFEU6ST7JH57YYRRR2SEOXLN6QC I24UEJQLCH2SOXA4UHIYWTRDCHSOPU7AFTRUOTX7HZIAV4AZKYEQC YN63NUZO4LVJ7XPMURDULTXBVJKW5MVCTZ24R7Z52QMHO3HPDUVQC 27RZYCM3XP72CW7FYGE373YAFD6EEZOZ4YAHEKV6JE5L6Z5N3JNAC 76PCXGML77EZWTRI5E6KHLVRAFTJ2AB5YRN5EKOYNAPKTWY2KCGAC X6YFD4WVMUYJCR5IYPJH6UKYVWSA7DKBRVJ6XQFXHOE2TRYUTAHAC 5SLOJYHGPMZVCOE3IS7ICNMJJYX3RBT6CDG5MAV6T4CJIOW7YZ6QC CCLLB7OIFNFYJZTG3UCI7536TOCWSCSXR67VELSB466R24WLJSDAC GNMZNKB46GTPTWBR452FITHPBCMYPSDLV5VZQSY7BX6OJHWTWTZAC I52XSRUH5RVHQBFWVMAQPTUSPAJ4KNVID2RMI3UGCVKFLYUO6WZAC OIOMXESDNMLOTMNYCZZBYSBAQTYPAXXMUHTLA2AYCMNHZMPSLX2AC AI73GKAO5QBPR6YGW7H5UNZYAEGYGIHAFO6DM2DWCPMVYLHE547QC FBXYP7QM7SG6P2JDJVQPPCRKJE3GVYXNQ5GVV4GRDUNG6Q4ZRDJQC IQ4FCHPZYGTZHCQHUIRCMUI5LCHIDSJCM2AZXGRJARWLCPPLXZOQC VBMXB443FGZL6DLT6KAP2ICFCCQNXCUMDEUL67HB4CNKFMBBNSSAC GYXIF25T2BCTCWCQI5DQOF3F4HBWDJUDJUMZ2WLHVBIOYATJTFAQC KI2AFWOSN3PTGBGYQ7UKHFOZERZWEUWQ4AQNADG5S4QDJ53ESXFAC SLJ3OHD4F6GJGZ3SV2D7DMR3PXYHPSI64X77KZ3RJ24EGEX6ZNQAC AAXP2534BWX2ZUDZZHUMLYDBMGFGUH32CNRA3KOLER3JKOIJUZLAC QL6K2ZM35B3NIXEMMCJWUSFXOBQHAGXRDMO7ID5DCKTJH4QJVY7QC L3RCAPPKPURGFWF4TKDVIJRRMPJDMQAZ6T5CITGMS7KP5ROZ7IWAC 5OGOE4VWS5AIG4U2UYLLIGA3HY6UB7SNQOSESHNXBLET3VQXFBZAC C267PHOH3QJBSBEWQB3J7PPOOXIUKM3DIIZIPLHPU4D5OXRCGLZAC AEPEFS7O3YT7CRRFYQVJWUXUUSRGJ6K6XZQVK62B6N74UXOIFWYAC 5HF7C67M4DZMYTCIG32XEQQ662AHQMIHTHUK7TAVSO52XLMFBZPAC 4H2XTVJ2BNXDNHQ3RQTMOG3I4NRGZT7JDLC2GRINS56TIYTYTO4QC BAUL3WR2ACY2HCJIM7K6HJOJ3UXDJISGLMDCSPH3WMPGJPL5AR4QC 44BN7FWSIXKG75IJUTCXLJE7VANNQFPRHQXTPLQHFU7AKGLSPQRAC XSRTXUAS3DXJA42TZESMETFVTKU2OBUDGDE4N5F2CVWI4CLOUJ4AC 3KRGVQFUWFHPOGZOXVTJYNCM4XBRVYITAEOVPKBSAZ5GZIUO5KVQC Y6EVFMTA6FOH3OQH6QCSWMI3F6SYZT2FSHO6GF4M3ICENDCWFM4QC M5FK3ABTKBDG6HHW32G7UKRJEJQKD2U7BPXNZ3HVHBKULWVV6CTQC L5IUD2DSLEK4SYPF6PLNO7C3TZEFYFHNM42HGEHY5VWW5MHD7CXAC WZVCLZKY34KQBQU6YBGJLQCDADBQ67LQVDNRVCMQVY3O3C3EIWSQC 7ZROQSSN2M3LW6ASYMM6DPR5AERWV4K4TKWKEBKTCEJPMIJAHHXQC ZRUPLBBTT4S6S7A3LOAHG4ONYEGPA5CFO4L2XBCNFKK45MWX3BDAC FDEVV5NGUMTEULP25EFYFZEVICWYLGV7XMED25PNKD36DL4NA46AC N3X5YP7PV2XVQKRRWSRCGJG34HZPLV4BGBLZGJG55KGIB7ORJ77QC FXEDPLRI7PXLDXV634ZA6D5Q3ZWG3ESTKJTMRPJ4MAHI7PKU3M6AC IIV3EL2XYI2X7HZWKXEXQFAE3R3KC2Q7SGOT3Q332HSENMYVF32QC 5BRU2RRWOQBMS2V3RQM7PRFR5UILYZ73GISHAKJA6KIZGC5M2MFAC ZDK3GNDBWXJ2OXFDYB72ZCEBGLBF4MKE5K3PVHDZATHJ7HJIDPRQC RIAA2QKFQM2BIDU3KIUXI5VYIUKTIHD2BURROGY6UXNJ7KRPY4IQC VO5OQW4W2656DIYYRNZ3PO7TQ4JOKQ3GVWE5ALUTYVMX3WMXJOYQC libpijul::changestore::filesystem::push_filename(final_path,&hashes[*current],);final_path.set_extension("change");debug!("moving {:?} to {:?}", path, final_path);std::fs::create_dir_all(&final_path.parent().unwrap())?;let r = std::fs::rename(&path, &final_path);libpijul::changestore::filesystem::pop_filename(final_path);r?;
match hashes[*current] {CS::Change(ref h) => {libpijul::changestore::filesystem::push_filename(final_path, h);debug!("moving {:?} to {:?}", path, final_path);std::fs::create_dir_all(&final_path.parent().unwrap())?;let r = std::fs::rename(&path, &final_path);libpijul::changestore::filesystem::pop_filename(final_path);r?;}CS::State(h) => {libpijul::changestore::filesystem::push_tag_filename(final_path, &h,);debug!("moving {:?} to {:?}", path, final_path);std::fs::create_dir_all(&final_path.parent().unwrap())?;let r = std::fs::rename(&path, &final_path);libpijul::changestore::filesystem::pop_filename(final_path);r?;}}
libpijul::changestore::filesystem::push_filename(&mut local, &c);let mut change_file = std::fs::File::open(&local)?;let change_len = change_file.metadata()?.len();let mut change = thrussh::CryptoVec::new_zeroed(change_len as usize);use std::io::Read;change_file.read_exact(&mut change[..])?;
self.c.data(format!("apply {} {} {}\n", to_channel, c.to_base32(), change_len).as_bytes()).await?;self.c.data(&change[..]).await?;libpijul::changestore::filesystem::pop_filename(&mut local);
match c {CS::Change(c) => {libpijul::changestore::filesystem::push_filename(&mut local, &c);let mut change_file = std::fs::File::open(&local)?;let change_len = change_file.metadata()?.len();let mut change = thrussh::CryptoVec::new_zeroed(change_len as usize);use std::io::Read;change_file.read_exact(&mut change[..])?;self.c.data(format!("apply {} {} {}\n", to_channel, c.to_base32(), change_len).as_bytes(),).await?;self.c.data(&change[..]).await?;libpijul::changestore::filesystem::pop_filename(&mut local);}CS::State(c) => {libpijul::changestore::filesystem::push_tag_filename(&mut local, &c);let mut tag_file = libpijul::tag::OpenTagFile::open(&local, &c)?;let mut v = Vec::new();tag_file.short(&mut v)?;self.c.data(format!("tagup {} {} {}\n", c.to_base32(), to_channel, v.len()).as_bytes(),).await?;self.c.data(&v[..]).await?;libpijul::changestore::filesystem::pop_filename(&mut local);}}
c: &mut tokio::sync::mpsc::UnboundedReceiver<libpijul::pristine::Hash>,sender: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
c: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,sender: &mut tokio::sync::mpsc::Sender<CS>,
c: &mut tokio::sync::mpsc::UnboundedReceiver<libpijul::pristine::Hash>,sender: Option<&mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>>,
c: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,sender: Option<&mut tokio::sync::mpsc::Sender<CS>>,
if full {self.c.data(format!("change {}\n", h.to_base32()).as_bytes()).await?;} else {self.c.data(format!("partial {}\n", h.to_base32()).as_bytes()).await?;
match h {CS::Change(h) if full => {self.c.data(format!("change {}\n", h.to_base32()).as_bytes()).await?;}CS::Change(h) => {self.c.data(format!("partial {}\n", h.to_base32()).as_bytes()).await?;}CS::State(h) => {self.c.data(format!("tag {}\n", h.to_base32()).as_bytes()).await?;}
let mut tags: HashSet<Merkle> = HashSet::new();for x in txn.rev_iter_tags(&channel.read().tags, None)? {let (n, m) = x?;debug!("rev_iter_tags {:?} {:?}", n, m);// First, if the remote has exactly the same first n tags, break.if let Some((_, p)) = txn.get_remote_tag(&remote_ref.lock().tags, (*n).into())? {if p.b == m.b {debug!("the remote has tag {:?}", p.a);break;}if p.a != m.a {// What to do here? It is possible that state// `n` is a different state than `m.a` in the// remote, and is also tagged.}} else {tags.insert(m.a.into());}}debug!("tags = {:?}", tags);
if txn.remote_has_state(remote_ref, &m)? {break;
let h_unrecorded = self.remote_unrecs.iter().any(|(_, hh)| hh == &CS::Change(h.into()));if !h_unrecorded {if txn.remote_has_state(remote_ref, &m)?.is_some() {debug!("remote_has_state: {:?}", m);break;}
let unknown_changes = self.theirs_ge_dichotomy.iter().filter_map(|(_, h, _, _)| {if self.ours_ge_dichotomy_set.contains(h)|| txn.get_revchanges(&channel, h).unwrap().is_some()
let mut unknown_changes = Vec::new();for (_, h, m, is_tag) in self.theirs_ge_dichotomy.iter() {let h_is_known = txn.get_revchanges(&channel, h).unwrap().is_some();let change = CS::Change(*h);if !(self.ours_ge_dichotomy_set.contains(&change) || h_is_known) {unknown_changes.push(change)}if *is_tag {let m_is_known = if let Some(n) = txn.channel_has_state(txn.states(&*channel.read()), &m.into()).unwrap()
let n = self.dichotomy_changelist(txn, &remote.lock().remote).await?;
let n = self.dichotomy_changelist(txn, &remote.lock()).await?;
}async fn update_changelist_pushpull_from_scratch(&mut self,txn: &mut MutTxn<()>,path: &[String],current_channel: &ChannelRef<MutTxn<()>>,) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {debug!("no id, starting from scratch");let (inodes, theirs_ge_dichotomy) = self.download_changelist_nocache(0, path).await?;let mut theirs_ge_dichotomy_set = HashSet::new();let mut to_download = Vec::new();for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {theirs_ge_dichotomy_set.insert(CS::Change(*h));if txn.get_revchanges(current_channel, h)?.is_none() {to_download.push(CS::Change(*h));}if *is_tag {let ch = current_channel.read();if let Some(n) = txn.channel_has_state(txn.states(&*ch), &m.into())? {if !txn.is_tagged(txn.tags(&*ch), n.into())? {to_download.push(CS::State(*m));}} else {to_download.push(CS::State(*m));}}}Ok(RemoteDelta {inodes,remote_ref: None,to_download,ours_ge_dichotomy_set: HashSet::new(),theirs_ge_dichotomy,theirs_ge_dichotomy_set,remote_unrecs: Vec::new(),})
debug!("no id, starting from scratch");let (inodes, theirs_ge_dichotomy) = self.download_changelist_nocache(0, path).await?;let mut theirs_ge_dichotomy_set = HashSet::new();let mut to_download = Vec::new();let mut tags = HashSet::new();for (_, h, m, is_tagged) in theirs_ge_dichotomy.iter() {theirs_ge_dichotomy_set.insert(*h);if txn.get_revchanges(current_channel, h)?.is_none() {to_download.push(*h);if *is_tagged {tags.insert(*m);}}}return Ok(RemoteDelta {inodes,remote_ref: None,to_download,tags,ours_ge_dichotomy_set: HashSet::new(),theirs_ge_dichotomy,theirs_ge_dichotomy_set,remote_unrecs: Vec::new(),});};let mut remote_ref = if let Some(name) = self.name() {txn.open_or_create_remote(id, name).unwrap()} else {unreachable!()
return self.update_changelist_pushpull_from_scratch(txn, path, current_channel).await;
let dichotomy_n = self.dichotomy_changelist(txn, &remote_ref.lock().remote).await?;let ours_ge_dichotomy: Vec<(u64, Hash)> = txn
let mut remote_ref = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();let dichotomy_n = self.dichotomy_changelist(txn, &remote_ref.lock()).await?;let ours_ge_dichotomy: Vec<(u64, CS)> = txn
.collect::<HashSet<Hash>>();// remote_unrecs = {x: (u64, Hash) | x \in ours_ge_dichot /\ ~(x \in theirs_ge_dichot) /\ x \in current_channel }let mut remote_unrecs = Vec::new();for (n, hash) in &ours_ge_dichotomy {if theirs_ge_dichotomy_set.contains(hash) {// If this change is still present in the remote, skipcontinue;} else if txn.get_revchanges(¤t_channel, &hash)?.is_none() {// If this unrecord wasn't in our current channel, skipcontinue;} else {remote_unrecs.push((*n, *hash))
.collect::<HashSet<CS>>();let mut theirs_ge_dichotomy_set = HashSet::new();for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {theirs_ge_dichotomy_set.insert(CS::Change(*h));if *is_tag {theirs_ge_dichotomy_set.insert(CS::State(*m));
let should_cache = force_cache.unwrap_or_else(|| remote_unrecs.is_empty());
// remote_unrecs = {x: (u64, Hash) | x \in ours_ge_dichot /\ ~(x \in theirs_ge_dichot) /\ x \in current_channel }let remote_unrecs = remote_unrecs(txn,current_channel,&ours_ge_dichotomy,&theirs_ge_dichotomy_set,)?;let should_cache = if let Some(true) = force_cache {true} else {remote_unrecs.is_empty()};debug!("should_cache = {:?} {:?} {:?}",force_cache, remote_unrecs, should_cache);
for (k, _) in ours_ge_dichotomy.iter().copied() {txn.del_remote(&mut remote_ref, k)?;
use libpijul::ChannelMutTxnT;for (k, t) in ours_ge_dichotomy.iter().copied() {match t {CS::State(_) => txn.del_tags(&mut remote_ref.lock().tags, k)?,CS::Change(_) => {txn.del_remote(&mut remote_ref, k)?;}}
for (n, h, m, t) in theirs_ge_dichotomy.iter().copied() {txn.put_remote(&mut remote_ref, n, (h, m, t))?;
for (n, h, m, is_tag) in theirs_ge_dichotomy.iter().copied() {debug!("theirs: {:?} {:?} {:?}", n, h, m);txn.put_remote(&mut remote_ref, n, (h, m))?;if is_tag {txn.put_tags(&mut remote_ref.lock().tags, n, &m)?;}
let state_cond = |txn: &MutTxn<()>, merkle: &libpijul::pristine::SerializedMerkle| {txn.channel_has_state(txn.states(&*current_channel.read()), merkle).map(|x| x.is_some())};let change_cond = |txn: &MutTxn<()>, hash: &Hash| {txn.get_revchanges(¤t_channel, hash).unwrap().is_none()};// IF:// The user only wanted to push/pull specific changes// ELIF:// The user specified no changes and there were no remote unrecords// effecting the current channel means we can auto-cache// the local remote cache// ELSE:// The user specified no changes but there were remote unrecords// effecting the current channel meaning we can't auto-cache// the local remote cache.
} else if should_cache {let mut to_download: Vec<Hash> = Vec::new();let mut tags = HashSet::new();{let rem = remote_ref.lock();for thing in txn.iter_remote(&rem.remote, 0)? {let (n, libpijul::pristine::Pair { a: hash, b: merkle }) = thing?;if state_cond(txn, &merkle)? {break;} else if change_cond(txn, &hash.into()) {to_download.push(Hash::from(hash));if txn.is_tagged(&rem.tags, (*n).into())? {tags.insert(merkle.into());}
} else {let mut to_download: Vec<CS> = Vec::new();for (n, h, m, is_tag) in theirs_ge_dichotomy.iter() {// In all cases, add this new change/state/tag to `to_download`.let ch = CS::Change(*h);if txn.get_revchanges(¤t_channel, h).unwrap().is_none() {to_download.push(ch.clone());if *is_tag {to_download.push(CS::State(*m));
}}Ok(RemoteDelta {inodes,remote_ref: Some(remote_ref),to_download,tags,ours_ge_dichotomy_set,theirs_ge_dichotomy,theirs_ge_dichotomy_set,remote_unrecs,})} else {let mut to_download: Vec<Hash> = Vec::new();for thing in txn.iter_remote(&remote_ref.lock().remote, 0)? {let (n, libpijul::pristine::Pair { a: hash, b: merkle }) = thing?;if u64::from(*n) < dichotomy_n {if state_cond(txn, &merkle)? {continue;} else if change_cond(txn, &hash.into()) {to_download.push(Hash::from(hash));
} else if *is_tag {let has_tag = if let Some(n) =txn.channel_has_state(txn.states(¤t_channel.read()), &m.into())?{txn.is_tagged(txn.tags(¤t_channel.read()), n.into())?} else {false};if !has_tag {to_download.push(CS::State(*m));
}let mut tags = HashSet::new();for (_, hash, merkle, t) in &theirs_ge_dichotomy {if state_cond(txn, &merkle.into())? {continue;} else if change_cond(txn, &hash) {to_download.push(Hash::from(*hash));if *t {tags.insert(*merkle);
// Additionally, if there are no remote unrecords// (i.e. if `should_cache`), cache.if should_cache && ours_ge_dichotomy_set.get(&ch).is_none() {use libpijul::ChannelMutTxnT;txn.put_remote(&mut remote_ref, *n, (*h, *m))?;if *is_tag {let mut rem = remote_ref.lock();txn.put_tags(&mut rem.tags, *n, m)?;
if let Some((_, s)) = self.get_state(txn, Some(b)).await? {if s == state {
let last_statet = if let Some((_, _, v)) = txn.last_remote_tag(&remote.tags)? {v.into()} else {Merkle::zero()};debug!("last_state: {:?} {:?}", state, last_statet);if let Some((_, s, st)) = self.get_state(txn, Some(b)).await? {debug!("remote last_state: {:?} {:?}", s, st);if s == state && st == last_statet {
let (mid, state) = txn.get_remote_state(remote, mid)?.unwrap();
let (mid, state) = {let (a, b) = txn.get_remote_state(&remote.remote, mid)?.unwrap();(a, b.b)};let statet = if let Some((_, b)) = txn.get_remote_tag(&remote.tags, mid)? {// There's still a tag at position >= mid in the// sequence.b.b.into()} else {// No tag at or after mid, the last state, `statet`,// is the right answer in that case.last_statet};
hashes: &mut tokio::sync::mpsc::UnboundedReceiver<libpijul::pristine::Hash>,send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,send: &mut tokio::sync::mpsc::Sender<CS>,
libpijul::changestore::filesystem::push_filename(&mut change_path_, h);if std::fs::metadata(&change_path_).is_err() {hash_send.send(*h)?;to_download.insert(*h);
if let CS::Change(h) = h {libpijul::changestore::filesystem::push_filename(&mut change_path_, h);if std::fs::metadata(&change_path_).is_err() {hash_send.send(CS::Change(*h))?;to_download.insert(CS::Change(*h));}libpijul::changestore::filesystem::pop_filename(&mut change_path_);
let changes = repo.changes.get_changes(h)?;changes.iter().any(|c| {c.iter().any(|c| {let inode = c.inode();debug!("inode = {:?}", inode);if let Some(h) = inode.change {inodes.contains(&Position {change: h,pos: inode.pos,})} else {false}
if let CS::Change(h) = h {let changes = repo.changes.get_changes(h)?;changes.iter().any(|c| {c.iter().any(|c| {let inode = c.inode();debug!("inode = {:?}", inode);if let Some(h) = inode.change {inodes.contains(&Position {change: h,pos: inode.pos,})} else {false}})
let mut channel = channel.write();txn.apply_change_ws(&repo.changes, &mut channel, h, &mut ws)?;
if let CS::Change(h) = h {let mut channel = channel.write();txn.apply_change_ws(&repo.changes, &mut channel, h, &mut ws)?;}
libpijul::changestore::filesystem::push_filename(&mut change_path, &hash);std::fs::create_dir_all(change_path.parent().unwrap())?;use libpijul::changestore::ChangeStore;hashes.push(hash);for dep in repo.changes.get_dependencies(&hash)? {let dep: libpijul::pristine::Hash = dep;send_hash.send(dep)?;
if let CS::Change(hash) = hash {libpijul::changestore::filesystem::push_filename(&mut change_path, &hash);std::fs::create_dir_all(change_path.parent().unwrap())?;use libpijul::changestore::ChangeStore;hashes.push(CS::Change(hash));for dep in repo.changes.get_dependencies(&hash)? {let dep: libpijul::pristine::Hash = dep;send_hash.send(CS::Change(dep))?;}libpijul::changestore::filesystem::pop_filename(&mut change_path);
}/// Compare the remote set (theirs_ge_dichotomy) with our current/// version of that (ours_ge_dichotomy) and return the changes in our/// current version that are not in the remote anymore.fn remote_unrecs<T: TxnTExt + ChannelTxnT>(txn: &T,current_channel: &ChannelRef<T>,ours_ge_dichotomy: &[(u64, CS)],theirs_ge_dichotomy_set: &HashSet<CS>,) -> Result<Vec<(u64, CS)>, anyhow::Error> {let mut remote_unrecs = Vec::new();for (n, hash) in ours_ge_dichotomy {debug!("ours_ge_dichotomy: {:?} {:?}", n, hash);if theirs_ge_dichotomy_set.contains(hash) {// If this change is still present in the remote, skipdebug!("still present");continue;} else {let has_it = match hash {CS::Change(hash) => txn.get_revchanges(¤t_channel, &hash)?.is_some(),CS::State(state) => {let ch = current_channel.read();if let Some(n) = txn.channel_has_state(txn.states(&*ch), &state.into())? {txn.is_tagged(txn.tags(&*ch), n.into())?} else {false}}};if has_it {remote_unrecs.push((*n, *hash))} else {// If this unrecord wasn't in our current channel, skipcontinue;}}}Ok(remote_unrecs)
) -> Result<Option<(u64, Merkle)>, anyhow::Error> {if let Some(mid) = mid {Ok(txn.get_changes(&channel, mid)?.map(|(_, m)| (mid, m)))
) -> Result<Option<(u64, Merkle, Merkle)>, anyhow::Error> {if let Some(x) = txn.reverse_log(&*channel.read(), mid)?.next() {let (n, (_, m)) = x?;if let Some(m2) = txn.rev_iter_tags(txn.tags(&*channel.read()), Some(n.into()))?.next(){let (_, m2) = m2?;Ok(Some((n, m.into(), m2.b.into())))} else {Ok(Some((n, m.into(), Merkle::zero())))}
Ok(txn.reverse_log(&*channel.read(), None)?.next().map(|n| {let (n, (_, m)) = n.unwrap();(n, m.into())}))
Ok(None)
for x in remote_txn.log(&*remote_channel.read(), from)? {
let rem = remote_channel.read();let tags: Vec<u64> = remote_txn.iter_tags(remote_txn.tags(&*rem), from)?.map(|k| (*k.unwrap().0).into()).collect();let mut tagsi = 0;for x in remote_txn.log(&*rem, from)? {
libpijul::changestore::filesystem::push_filename(&mut local, &c);libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);
match c {CS::Change(c) => {libpijul::changestore::filesystem::push_filename(&mut local, &c);libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);}CS::State(c) => {libpijul::changestore::filesystem::push_tag_filename(&mut local, &c);libpijul::changestore::filesystem::push_tag_filename(&mut self.changes_dir, &c);}}
hashes: &mut tokio::sync::mpsc::UnboundedReceiver<libpijul::pristine::Hash>,send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,send: &mut tokio::sync::mpsc::Sender<CS>,
libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);libpijul::changestore::filesystem::push_filename(&mut path, &c);super::PROGRESS.borrow_mut().unwrap()[pro_n].incr();if std::fs::metadata(&path).is_ok() {debug!("metadata {:?} ok", path);
if let CS::Change(c) = c {libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);libpijul::changestore::filesystem::push_filename(&mut path, &c);super::PROGRESS.borrow_mut().unwrap()[pro_n].incr();if std::fs::metadata(&path).is_ok() {debug!("metadata {:?} ok", path);libpijul::changestore::filesystem::pop_filename(&mut path);continue;}std::fs::create_dir_all(&path.parent().unwrap())?;if std::fs::hard_link(&self.changes_dir, &path).is_err() {std::fs::copy(&self.changes_dir, &path)?;}debug!("hard link done");libpijul::changestore::filesystem::pop_filename(&mut self.changes_dir);
txn.apply_change_ws(store, &mut *channel, c, &mut ws)?;
match c {CS::Change(c) => {txn.apply_change_ws(store, &mut *channel, c, &mut ws)?;}CS::State(c) => {if let Some(n) = txn.channel_has_state(txn.states(&*channel), &c.into())? {let tags = txn.tags_mut(&mut *channel);txn.put_tags(tags, n.into(), c)?;} else {bail!("Cannot add tag {}: channel {:?} does not have that state",c.to_base32(),txn.name(&*channel))}}}
c: libpijul::pristine::Hash,) -> Result<libpijul::pristine::Hash, anyhow::Error> {libpijul::changestore::filesystem::push_filename(&mut path, &c);
c: CS,) -> Result<CS, anyhow::Error> {let (req, c32) = match c {CS::Change(c) => {libpijul::changestore::filesystem::push_filename(&mut path, &c);("change", c.to_base32())}CS::State(c) => {libpijul::changestore::filesystem::push_tag_filename(&mut path, &c);if std::fs::metadata(&path).is_ok() {bail!("Tag already downloaded: {}", c.to_base32())}("tag", c.to_base32())}};
hashes: &mut tokio::sync::mpsc::UnboundedReceiver<libpijul::pristine::Hash>,send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,send: &mut tokio::sync::mpsc::Sender<CS>,
let c = c.to_base32();to_channel.push(("apply", &c));
let base32;let body = match c {CS::Change(c) => {libpijul::changestore::filesystem::push_filename(&mut local, &c);let change = std::fs::read(&local)?;base32 = c.to_base32();to_channel.push(("apply", &base32));change}CS::State(c) => {libpijul::changestore::filesystem::push_tag_filename(&mut local, &c);let mut tag_file = libpijul::tag::OpenTagFile::open(&local, &c)?;let mut v = Vec::new();tag_file.short(&mut v)?;base32 = c.to_base32();to_channel.push(("tagup", &base32));v}};libpijul::changestore::filesystem::pop_filename(&mut local);
u.sort_by(|a, b| {let na = txn.get_revchanges(&channel, a).unwrap().unwrap();let nb = txn.get_revchanges(&channel, b).unwrap().unwrap();na.cmp(&nb)
u.sort_by(|a, b| match (a, b) {(CS::Change(a), CS::Change(b)) => {let na = txn.get_revchanges(&channel, a).unwrap().unwrap();let nb = txn.get_revchanges(&channel, b).unwrap().unwrap();na.cmp(&nb)}(CS::State(a), CS::State(b)) => {let na = txn.channel_has_state(txn.states(&*channel.read()), &a.into()).unwrap().unwrap();let nb = txn.channel_has_state(txn.states(&*channel.read()), &b.into()).unwrap().unwrap();na.cmp(&nb)}_ => unreachable!(),
txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;
match h {CS::Change(h) => {txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;}CS::State(s) => {if let Some(n) = txn.channel_has_state(&channel.states, &s.into())? {txn.put_tags(&mut channel.tags, n.into(), s)?;} else {bail!("Cannot add tag {}: channel {:?} does not have that state",s.to_base32(),channel.name)}}}
if let Some(int) = txn_.get_internal(&d.into())? {for inode in txn_.iter_rev_touched(int)? {let (int_, inode) = inode?;if int_ < int {continue;} else if int_ > int {break;}let ext = libpijul::pristine::Position {change: txn_.get_external(&inode.change)?.unwrap().into(),pos: inode.pos,};if inodes.is_empty() || inodes.contains(&ext) {touched.insert(*inode);
match d {CS::Change(d) => {if let Some(int) = txn_.get_internal(&d.into())? {for inode in txn_.iter_rev_touched(int)? {let (int_, inode) = inode?;if int_ < int {continue;} else if int_ > int {break;}let ext = libpijul::pristine::Position {change: txn_.get_external(&inode.change)?.unwrap().into(),pos: inode.pos,};if inodes.is_empty() || inodes.contains(&ext) {touched.insert(*inode);}}
for d in c.get_dependencies(&h)? {if original.get(&d).is_some() && now_.get(&d).is_none() {
let hh = if let CS::Change(h) = h {h} else {stack.pop();result.push(h);continue;};for d in c.get_dependencies(&hh)? {if original.get(&CS::Change(d)).is_some() && now_.get(&CS::Change(d)).is_none() {
fn check_deps<C: ChangeStore>(c: &C,original: &[libpijul::Hash],now: &[libpijul::Hash],) -> Result<(), anyhow::Error> {
fn check_deps<C: ChangeStore>(c: &C, original: &[CS], now: &[CS]) -> Result<(), anyhow::Error> {
writeln!(&mut s, "# {}", header.message).expect("Infallible write to String");writeln!(&mut s, "# {}", header.timestamp).expect("Infallible write to String");writeln!(&mut s, "# {}", hash.to_base32()).expect("Infallible write to String");
writeln!(&mut s, "# {}", header.message).unwrap();writeln!(&mut s, "# {}", header.timestamp).unwrap();match hash {CS::Change(hash) => {writeln!(&mut s, "# {}", hash.to_base32()).unwrap();}CS::State(hash) => {writeln!(&mut s, "# {}", hash.to_base32()).unwrap();}}
writeln!(&mut s, "# {}", hash.to_base32()).expect("Infallible write to String");
let hash = match hash {CS::Change(hash) => hash.to_base32(),CS::State(hash) => hash.to_base32(),};writeln!(&mut s, "# {}", hash).expect("Infallible write to String");
writeln!(o, "{} {}", n, m.to_base32())?;
let m2 = if let Some(x) = txn.rev_iter_tags(txn.tags(&*channel.read()), Some(n))?.next(){x?.1.b.into()} else {Merkle::zero()};writeln!(o, "{} {} {}", n, m.to_base32(), m2.to_base32())?;
} else if let Some(x) = txn.read().reverse_log(&*channel.read(), None)?.next() {let (n, (_, m)) = x?;let m: Merkle = m.into();writeln!(o, "{} {}", n, m.to_base32())?
writeln!(o, "-")?;
let txn = txn.read();if let Some(x) = txn.reverse_log(&*channel.read(), None)?.next() {let (n, (_, m)) = x?;let m: Merkle = m.into();let m2 = if let Some(x) = txn.rev_iter_tags(txn.tags(&*channel.read()), Some(n))?.next(){x?.1.b.into()} else {Merkle::zero()};writeln!(o, "{} {} {}", n, m.to_base32(), m2.to_base32())?} else {writeln!(o, "-")?;}
let mut tag_path = repo.changes_dir.clone();libpijul::changestore::filesystem::push_tag_filename(&mut tag_path, &state);let mut tag = libpijul::tag::OpenTagFile::open(&tag_path, &state)?;let mut buf = Vec::new();tag.short(&mut buf)?;o.write_u64::<BigEndian>(buf.len() as u64)?;o.write_all(&buf)?;o.flush()?;}} else if let Some(cap) = TAGUP.captures(&buf) {if let Some(state) = Merkle::from_base32(cap[1].as_bytes()) {
let header = bincode::deserialize(&buf)?;
let header = libpijul::tag::read_short(std::io::Cursor::new(&buf[..]), &m)?;let temp_path = tag_path.with_extension("tmp");std::fs::create_dir_all(temp_path.parent().unwrap())?;let mut w = std::fs::File::create(&temp_path)?;
writeln!(v, "{}\n", p.to_base32()).unwrap();let deps = changes.get_dependencies(&p)?;if !deps.is_empty() {write!(v, " Dependencies:").unwrap();for d in deps {write!(v, " {}", d.to_base32()).unwrap();
let header = match p {CS::Change(p) => {writeln!(v, "{}\n", p.to_base32()).unwrap();let deps = changes.get_dependencies(&p)?;if !deps.is_empty() {write!(v, " Dependencies:").unwrap();for d in deps {write!(v, " {}", d.to_base32()).unwrap();}writeln!(v).unwrap();}changes.get_header(&p)?}CS::State(p) => {writeln!(v, "t{}\n", p.to_base32()).unwrap();changes.get_tag_header(&p)?
.filter_map(|l| libpijul::Hash::from_base32(l.as_bytes()))
.filter_map(|l| {::log::debug!("l = {:?} {:?}",l,libpijul::Merkle::from_base32(l.as_bytes()));if l.starts_with("t") {libpijul::Merkle::from_base32(&l.as_bytes()[1..]).map(crate::remote::CS::State)} else {libpijul::Hash::from_base32(l.as_bytes()).map(crate::remote::CS::Change)}})
pub fn short<W: std::io::Write>(&mut self, mut w: W) -> Result<(), TagError> {let mut header_buf = vec![0u8; (self.header.channel - self.header.header) as usize];self.file.seek(SeekFrom::Start(self.header.header))?;self.file.read_exact(&mut header_buf)?;debug!("header_buf = {:?}", header_buf);let mut off = FileHeader {version: VERSION,header: 0,channel: 0,unhashed: 0,total: 0,offsets: DbOffsets::default(),state: self.header.state.clone(),};off.header = bincode::serialized_size(&off)?;off.channel = off.header + header_buf.len() as u64;off.total = off.channel;let mut off_buf = Vec::with_capacity(off.header as usize);bincode::serialize_into(&mut off_buf, &off)?;w.write_all(&off_buf)?;w.write_all(&header_buf)?;Ok(())}}pub fn read_short<R: std::io::Read + std::io::Seek>(mut file: R, expected: &Merkle) -> Result<crate::change::ChangeHeader, TagError> {let mut off = [0u8; std::mem::size_of::<FileHeader>() as usize];file.seek(SeekFrom::Start(0))?;file.read_exact(&mut off)?;let header: FileHeader = bincode::deserialize(&off).map_err(TagError::BincodeDe)?;debug!("header = {:?}", header);if &header.state == expected {file.seek(SeekFrom::Start(header.header))?;Ok(bincode::deserialize_from(file).map_err(TagError::BincodeDe)?)} else {Err(TagError::WrongHash {expected: *expected,got: header.state,})}
let tags = copy::<L64, Pair<SerializedMerkle, SerializedMerkle>, UP<L64, Pair<SerializedMerkle, SerializedMerkle>>, _>(
debug!("copying tags");let tags = copy::<L64, Pair<SerializedMerkle, SerializedMerkle>, P<L64, Pair<SerializedMerkle, SerializedMerkle>>, _>(
fn state_from_prefix(&self,channel: &Self::Channel,s: &str,) -> Result<(Merkle, L64), super::HashPrefixError<Self::GraphError>> {let h: SerializedMerkle = if let Some(ref h) = Merkle::from_prefix(s) {h.into()} else {return Err(super::HashPrefixError::Parse(s.to_string()));};let mut result = None;debug!("h = {:?}", h);for x in btree::iter(&self.txn, &channel.states, Some((&h, None))).map_err(|e| super::HashPrefixError::Txn(e.into()))?{let (e, i) = x.map_err(|e| super::HashPrefixError::Txn(e.into()))?;debug!("{:?} {:?}", e, i);if e < &h {continue;} else {let e: Merkle = e.into();let b32 = e.to_base32();debug!("{:?}", b32);let (b32, _) = b32.split_at(s.len().min(b32.len()));if b32 != s {break;} else if result.is_none() {result = Some((e, *i))} else {return Err(super::HashPrefixError::Ambiguous(s.to_string()));}}}if let Some(result) = result {Ok(result)} else {Err(super::HashPrefixError::NotFound(s.to_string()))}}
sanakirja_cursor!(remotetags, L64, Pair<SerializedMerkle, SerializedMerkle>);sanakirja_rev_cursor!(remotetags, L64, Pair<SerializedMerkle, SerializedMerkle>);type RemotetagsCursor = ::sanakirja::btree::cursor::Cursor<L64,Pair<SerializedMerkle, SerializedMerkle>,UP<L64, Pair<SerializedMerkle, SerializedMerkle>>,>;
}fn get_remote_tag(&self,remote: &Self::Tags,n: u64,) -> Result<Option<(u64, &Pair<SerializedMerkle, SerializedMerkle>)>, TxnErr<Self::GraphError>>{let n = n.into();if let Some(x) = btree::rev_iter(&self.txn, remote, Some((&n, None)))?.next() {let (&k, m) = x?;Ok(Some((k.into(), m)))} else {Ok(None)}
impl<'a, T: ChannelTxnT> Iterator for crate::pristine::RevCursor<T, &'a T, T::TagsCursor, L64, ()>{type Item = Result<&'a L64, TxnErr<T::GraphError>>;fn next(&mut self) -> Option<Self::Item> {match self.txn.cursor_tags_prev(&mut self.cursor) {Ok(Some(x)) => Some(Ok(x)),Ok(None) => None,Err(e) => Some(Err(e)),}}}
impl<'a, T: ChannelTxnT>crate::pristine::Cursor<T, &'a T, T::TagsCursor, L64, ()>{pub fn prev(&mut self) -> Option<Result<u64, TxnErr<T::GraphError>>> {match self.txn.cursor_tags_prev(&mut self.cursor) {Ok(Some(x)) => Some(Ok((*x).into())),Ok(None) => None,Err(e) => Some(Err(e)),}}}impl<'a, T: ChannelTxnT> Iterator for crate::pristine::Cursor<T, &'a T, T::TagsCursor, L64, ()>{type Item = Result<u64, TxnErr<T::GraphError>>;fn next(&mut self) -> Option<Self::Item> {match self.txn.cursor_tags_next(&mut self.cursor) {Ok(Some(x)) => Some(Ok((*x).into())),Ok(None) => None,Err(e) => Some(Err(e)),}}}