use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::path::PathBuf;
use super::{make_changelist, parse_changelist};
use anyhow::bail;
use clap::Clap;
use lazy_static::lazy_static;
use libpijul::changestore::ChangeStore;
use libpijul::pristine::sanakirja::Txn;
use libpijul::*;
use log::debug;
use regex::Regex;
use crate::repository::Repository;
#[derive(Clap, Debug)]
pub struct Remote {
#[clap(subcommand)]
subcmd: Option<SubRemote>,
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
}
#[derive(Clap, Debug)]
pub enum SubRemote {
#[clap(name = "delete")]
Delete { remote: String },
}
impl Remote {
pub fn run(self) -> Result<(), anyhow::Error> {
let repo = Repository::find_root(self.repo_path)?;
debug!("{:?}", repo.config);
let mut stdout = std::io::stdout();
match self.subcmd {
None => {
let txn = repo.pristine.txn_begin()?;
for r in txn.iter_remotes("")? {
let r = r?;
writeln!(stdout, " {}", r.name())?;
}
}
Some(SubRemote::Delete { remote }) => {
let mut txn = repo.pristine.mut_txn_begin();
if !txn.drop_named_remote(&remote)? {
writeln!(std::io::stderr(), "Remote not found: {:?}", remote)?
} else {
txn.commit()?;
}
}
}
Ok(())
}
}
#[derive(Clap, Debug)]
pub struct Push {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
#[clap(long = "from-channel")]
from_channel: Option<String>,
#[clap(long = "all", short = 'a', conflicts_with = "changes")]
all: bool,
#[clap(short = 'k')]
no_cert_check: bool,
#[clap(long = "path")]
path: Vec<String>,
to: Option<String>,
#[clap(long = "to-channel")]
to_channel: Option<String>,
#[clap(last = true)]
changes: Vec<String>,
}
#[derive(Clap, Debug)]
pub struct Pull {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
#[clap(long = "to-channel")]
to_channel: Option<String>,
#[clap(long = "all", short = 'a', conflicts_with = "changes")]
all: bool,
#[clap(short = 'k')]
no_cert_check: bool,
#[clap(long = "full")]
full: bool, #[clap(long = "path")]
path: Vec<String>,
from: Option<String>,
#[clap(long = "from-channel")]
from_channel: Option<String>,
#[clap(last = true)]
changes: Vec<String>, }
lazy_static! {
static ref CHANNEL: Regex = Regex::new(r#"([^:]*)(:(.*))?"#).unwrap();
}
impl Push {
pub async fn run(self) -> Result<(), anyhow::Error> {
let mut stderr = std::io::stderr();
let repo = Repository::find_root(self.repo_path)?;
debug!("{:?}", repo.config);
let channel_name = repo.config.get_current_channel(self.from_channel.as_ref());
let remote_name = if let Some(ref rem) = self.to {
rem
} else if let Some(ref def) = repo.config.default_remote {
def
} else {
bail!("Missing remote");
};
let mut push_channel = None;
let remote_channel = if let Some(ref c) = self.to_channel {
let c = CHANNEL.captures(c).unwrap();
push_channel = c.get(3).map(|x| x.as_str());
let c = c.get(1).unwrap().as_str();
if c.is_empty() {
channel_name
} else {
c
}
} else {
channel_name
};
debug!("remote_channel = {:?} {:?}", remote_channel, push_channel);
let mut remote = repo
.remote(
Some(&repo.path),
&remote_name,
remote_channel,
self.no_cert_check,
)
.await?;
let mut txn = repo.pristine.mut_txn_begin();
let remote_changes = remote.update_changelist(&mut txn, &self.path).await?;
let channel = txn.open_or_create_channel(channel_name)?;
let mut paths = HashSet::new();
for path in self.path.iter() {
let (p, ambiguous) = txn.follow_oldest_path(&repo.changes, &channel, path)?;
if ambiguous {
bail!("Ambiguous path: {:?}", path)
}
paths.insert(p);
paths.extend(
libpijul::fs::iter_graph_descendants(&txn, &channel.borrow().graph, p)?
.map(|x| x.unwrap()),
);
}
let mut to_upload = Vec::new();
for x in txn.reverse_log(&channel.borrow(), None)? {
let (_, (h, m)) = x?;
if let Some((_, ref remote_changes)) = remote_changes {
if txn.remote_has_state(remote_changes, m)? {
break;
}
let h_int = txn.get_internal(h)?.unwrap();
if !txn.remote_has_change(&remote_changes, h)? {
if paths.is_empty() {
to_upload.push(h)
} else {
for p in paths.iter() {
if txn.get_touched_files(*p, Some(h_int))?.is_some() {
to_upload.push(h);
break;
}
}
}
}
} else if let crate::remote::RemoteRepo::LocalChannel(ref remote_channel) = remote {
if let Some(channel) = txn.load_channel(remote_channel)? {
let channel = channel.borrow();
let h_int = txn.get_internal(h)?.unwrap();
if txn.get_changeset(Txn::changes(&channel), h_int)?.is_none() {
if paths.is_empty() {
to_upload.push(h)
} else {
for p in paths.iter() {
if txn.get_touched_files(*p, Some(h_int))?.is_some() {
to_upload.push(h);
break;
}
}
}
}
}
}
}
debug!("to_upload = {:?}", to_upload);
if to_upload.is_empty() {
writeln!(stderr, "Nothing to push")?;
return Ok(());
}
to_upload.reverse();
let to_upload = if !self.changes.is_empty() {
let mut u = Vec::new();
let mut not_found = Vec::new();
for change in self.changes.iter() {
match txn.hash_from_prefix(change) {
Ok((hash, _)) => {
if to_upload.contains(&hash) {
u.push(hash);
}
}
Err(_) => {
if !not_found.contains(change) {
not_found.push(change.to_string());
}
}
}
}
if !not_found.is_empty() {
bail!("Changes not found: {:?}", not_found)
}
check_deps(&repo.changes, &to_upload, &u)?;
u
} else if self.all {
to_upload
} else {
let mut o = make_changelist(&repo.changes, &to_upload, "push")?;
let remote_changes = remote_changes.map(|x| x.1);
loop {
let d = parse_changelist(&edit::edit_bytes(&o[..])?);
let comp = complete_deps(&txn, &remote_changes, &repo.changes, &to_upload, &d)?;
if comp.len() == d.len() {
break comp;
}
o = make_changelist(&repo.changes, &comp, "push")?
}
};
debug!("to_upload = {:?}", to_upload);
if to_upload.is_empty() {
writeln!(stderr, "Nothing to push")?;
return Ok(());
}
remote
.upload_changes(&mut txn, repo.changes_dir.clone(), push_channel, &to_upload)
.await?;
txn.commit()?;
remote.finish().await?;
Ok(())
}
}
impl Pull {
pub async fn run(self) -> Result<(), anyhow::Error> {
let mut repo = Repository::find_root(self.repo_path)?;
let mut txn = repo.pristine.mut_txn_begin();
let channel_name = repo.config.get_current_channel(self.to_channel.as_ref());
let mut channel = txn.open_or_create_channel(channel_name)?;
debug!("{:?}", repo.config);
let remote_name = if let Some(ref rem) = self.from {
rem
} else if let Some(ref def) = repo.config.default_remote {
def
} else {
bail!("Missing remote")
};
let from_channel = if let Some(ref c) = self.from_channel {
c
} else {
crate::DEFAULT_CHANNEL
};
let mut remote = repo
.remote(
Some(&repo.path),
&remote_name,
from_channel,
self.no_cert_check,
)
.await?;
debug!("downloading");
let mut inodes: HashSet<libpijul::pristine::Position<libpijul::Hash>> = HashSet::new();
let mut to_download = if self.changes.is_empty() {
let remote_changes = remote.update_changelist(&mut txn, &self.path).await?;
debug!("changelist done");
let mut to_download = Vec::new();
if let Some((inodes_, remote_changes)) = remote_changes {
inodes.extend(inodes_.into_iter());
for x in txn.iter_remote(&remote_changes.borrow().remote, 0)? {
let (h, m) = x?.1;
if txn.channel_has_state(&channel.borrow(), m)?.is_some() {
break;
} else if txn.get_revchanges(&channel, h)?.is_none() {
to_download.push(h)
}
}
} else if let crate::remote::RemoteRepo::LocalChannel(ref remote_channel) = remote {
let mut inodes_ = HashSet::new();
for path in self.path.iter() {
let (p, ambiguous) = txn.follow_oldest_path(&repo.changes, &channel, path)?;
if ambiguous {
bail!("Ambiguous path: {:?}", path)
}
inodes_.insert(p);
inodes_.extend(
libpijul::fs::iter_graph_descendants(&txn, &channel.borrow().graph, p)?
.map(|x| x.unwrap()),
);
}
inodes.extend(inodes_.iter().map(|x| libpijul::pristine::Position {
change: txn.get_external(x.change).unwrap().unwrap(),
pos: x.pos,
}));
if let Some(remote_channel) = txn.load_channel(remote_channel)? {
let remote_channel = remote_channel.borrow();
for x in txn.reverse_log(&remote_channel, None)? {
let (h, m) = x?.1;
if txn.channel_has_state(&channel.borrow(), m)?.is_some() {
break;
}
let h_int = txn.get_internal(h)?.unwrap();
if txn
.get_changeset(Txn::changes(&channel.borrow()), h_int)?
.is_none()
{
if inodes_.is_empty()
|| inodes_.iter().any(|&inode| {
txn.get_rev_touched_files(h_int, Some(inode))
.unwrap()
.is_some()
})
{
to_download.push(h)
}
}
}
}
}
to_download
} else {
let r: Result<Vec<libpijul::Hash>, anyhow::Error> = self
.changes
.iter()
.map(|h| Ok(txn.hash_from_prefix(h)?.0))
.collect();
r?
};
debug!("recording");
let hash = super::pending(&mut txn, &mut channel, &mut repo)?;
let to_download = remote
.pull(
&mut repo,
&mut txn,
&mut channel,
&mut to_download,
&inodes,
self.all,
)
.await?;
if to_download.is_empty() {
let mut stderr = std::io::stderr();
writeln!(stderr, "Nothing to pull")?;
return Ok(());
}
if !self.all {
let mut o = make_changelist(&repo.changes, &to_download, "pull")?;
let d = loop {
let d = parse_changelist(&edit::edit_bytes(&o[..])?);
let comp = complete_deps(&txn, &None, &repo.changes, &to_download, &d)?;
if comp.len() == d.len() {
break comp;
}
o = make_changelist(&repo.changes, &comp, "pull")?
};
let mut ws = libpijul::ApplyWorkspace::new();
debug!("to_download = {:?}", to_download);
let progress = indicatif::ProgressBar::new(d.len() as u64);
progress.set_style(
indicatif::ProgressStyle::default_spinner()
.template(" Applying changes {wide_bar} {pos}/{len}"),
);
for h in d.iter() {
txn.apply_change_rec_ws(&repo.changes, &mut channel, *h, &mut ws)?;
progress.inc(1);
}
progress.set_style(
indicatif::ProgressStyle::default_bar()
.template("✓ Applying changes {wide_bar} {pos}/{len}"),
);
progress.finish();
}
debug!("completing changes");
remote
.complete_changes(&repo, &txn, &mut channel, &to_download, self.full)
.await?;
remote.finish().await?;
let progress = indicatif::ProgressBar::new_spinner();
progress.set_style(
indicatif::ProgressStyle::default_spinner().template("{spinner} Outputting repository"),
);
progress.enable_steady_tick(100);
debug!("inodes = {:?}", inodes);
let mut touched = HashSet::new();
for d in to_download.iter() {
if let Some(int) = txn.get_internal(*d)? {
for inode in txn.iter_rev_touched(int)? {
let (int_, inode) = inode?;
if int_ < int {
continue;
} else if int_ > int {
break;
}
let ext = libpijul::pristine::Position {
change: txn.get_external(inode.change)?.unwrap(),
pos: inode.pos,
};
if inodes.is_empty() || inodes.contains(&ext) {
touched.insert(inode);
}
}
}
}
let mut done = HashMap::new();
for i in touched {
let (path, _) =
libpijul::fs::find_path(&repo.changes, &txn, &channel.borrow(), false, i)?;
debug!("path = {:?}", path);
txn.output_repository_no_pending(
&mut repo.working_copy,
&repo.changes,
&mut channel,
&mut done,
&path,
true,
None,
)?;
}
progress.set_style(
indicatif::ProgressStyle::default_spinner().template("✓ Outputting repository"),
);
progress.finish();
if let Some(h) = hash {
txn.unrecord(&repo.changes, &mut channel, &h)?;
repo.changes.del_change(&h)?;
}
txn.commit()?;
Ok(())
}
}
fn complete_deps<T: TxnT, C: ChangeStore>(
txn: &T,
remote_changes: &Option<libpijul::RemoteRef<T>>,
c: &C,
original: &[libpijul::Hash],
now: &[libpijul::Hash],
) -> Result<Vec<libpijul::Hash>, anyhow::Error> {
let original_: HashSet<_> = original.iter().collect();
let mut now_ = HashSet::with_capacity(original.len());
let mut result = Vec::with_capacity(original.len());
for &h in now {
now_.insert(h);
result.push(h);
}
let mut stack = now.to_vec();
stack.reverse();
while let Some(n) = stack.pop() {
for d in c.get_dependencies(&n)? {
if let Some(ref rem) = remote_changes {
if txn.remote_has_change(rem, d)? {
continue;
}
}
if original_.get(&d).is_some() && now_.get(&d).is_none() {
result.push(d);
now_.insert(d);
stack.push(d);
}
}
if now_.insert(n) {
result.push(n)
}
}
Ok(result)
}
fn check_deps<C: ChangeStore>(
c: &C,
original: &[libpijul::Hash],
now: &[libpijul::Hash],
) -> Result<(), anyhow::Error> {
let original_: HashSet<_> = original.iter().collect();
let now_: HashSet<_> = now.iter().collect();
for n in now {
for d in c.get_dependencies(n)? {
if original_.get(&d).is_some() && now_.get(&d).is_none() {
bail!("Missing dependency: {:?}", n)
}
}
}
Ok(())
}