With a really nice speed bonus
BDEVQIAUBDZLH5NNXJ3HD3WK2ART4V6K2FLHMAQVAQYNXGL4TOIQC
PLQ2WBZ3PZJIKX66DPFRHRHTL2HA2ZGLJRYHUKO44JS4VT4T6YEAC
APRNC3CWRUYCHBLNL4WBO7VIWOTFAX6SYTHMXTSWJUSSQNWEIA3AC
6MGFBMON6ASGBRQJXY6VUGGZQHXPQSMQJZOOD5LKS2FZODB6ET7AC
UWQB743KR36X6A6JVNK3VH6KMEFUHOUT6ZS2Y2SWIJR55JSF5WQAC
ODUDDQRY373JMDR6W2BHUWSKEU6TEICQMNZIAIFKJPNUA5UN3C4QC
SI454P2VFO6Y6RO2NIOH6ZROURP3OTB2BBXH2F36KJT6YML6YBIAC
KOWYPLMX4TCQANL33QW5VQTKWBQVCMOVLBQUQCTOK6YVQUHZJR5QC
#[derive(Debug)]
struct Vertex<'a> {
/// Index stanza describing this package.
pkg: deb::Stanza<'a>,
/// Result of downloading and extracting the .deb.
downloaded: Downloaded,
/// DFS index, lowlink and on-stack flag used by Tarjan's algorithm.
index: Option<usize>,
lowlink: usize,
on_stack: bool,
/// Strongly connected component this vertex was assigned to.
scc: usize,
/// Indices of the direct dependencies in the vertex array.
deps: Vec<usize>,
/// Store path derived from the context hash (this package's SHA-256 plus its dependencies).
context_path: Option<Arc<PathBuf>>,
/// Store path named after the output hash, symlinked to the context path.
final_path: Option<Arc<PathBuf>>,
/// Store paths of the direct dependencies.
deps_paths: Vec<Arc<PathBuf>>,
/// Transitive dependency paths, deduplicated through the companion set.
transitive_deps: Vec<Arc<PathBuf>>,
transitive_deps_h: BTreeSet<Arc<PathBuf>>,
/// Library search paths contributed by this package and its dependencies.
ld_path: Vec<Arc<PathBuf>>,
ld_path_h: BTreeSet<Arc<PathBuf>>,
/// File paths recorded for this package.
files: BTreeSet<Arc<String>>,
}
let mut stack = vec![StackElt {
package: pkg,
rx: None,
deps: Vec::new(),
transitive_deps: Vec::new(),
transitive_deps_h: BTreeSet::new(),
ld_path: Vec::new(),
ld_path_h: BTreeSet::new(),
files: BTreeSet::new(),
}];
let mut seen: HashMap<_, (Vec<&str>, usize)> = HashMap::new();
let mut stack = vec![(pkg, None)];
let mut vertices = Vec::new();
while let Some((pkg, rx)) = stack.pop() {
info!("{:?}", pkg);
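// `rx` is set on the second visit, after this package's dependencies were pushed
// above it on the stack, so their vertex indices can now be looked up in `seen`.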
if let Some(rx) = rx {
let downloaded: Result<Result<Downloaded, Error>, _> = rx.await;
let downloaded = downloaded.unwrap()?;
seen.get_mut(pkg.package).unwrap().1 = vertices.len();
vertices.push(Vertex {
downloaded,
deps: seen
.get(pkg.package)
.unwrap()
.0
.iter()
.map(|x| seen.get(x).unwrap().1)
.collect(),
pkg,
context_path: None,
deps_paths: Vec::new(),
files: BTreeSet::new(),
final_path: None,
ld_path: Vec::new(),
ld_path_h: BTreeSet::new(),
index: None,
lowlink: 0,
on_stack: false,
scc: 0,
transitive_deps: Vec::new(),
transitive_deps_h: BTreeSet::new(),
});
} else if !seen.contains_key(pkg.package) {
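// First visit: spawn the download/extract task, re-push this package below its
// dependencies, and record the names of its direct deps in `seen`.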
let rx = spawn_extract(client.clone(), &pkg).await;
let depends = pkg.depends.clone();
let pkg_package = pkg.package;
stack.push((pkg, Some(rx)));
let deps = push_deps(&depends, index, &mut stack).await;
seen.insert(pkg_package, (deps, 0));
}
}
let sccs = tarjan(&mut vertices);
debug!("sccs: {:?}", sccs);
if let Some(rx) = elt.rx.take() {
let downloaded = rx.await.unwrap()?;
info!("downloaded {:?}", elt.package);
let final_path = elt
.finalize(client, &files, downloaded, &mut stack, &mut result)
.await?;
seen.insert(
elt.package.package,
(elt.ld_path, elt.transitive_deps, final_path),
);
paths = elt.files;
} else {
if let Some((ld_path, transitive_deps, final_path)) = seen.get(&elt.package.package) {
debug!("already seen {:?}", elt.package.package);
if let Some(last) = stack.iter_mut().rev().find(|x| x.rx.is_some()) {
for ld in ld_path {
if last.ld_path_h.insert(ld.clone()) {
last.ld_path.push(ld.clone());
}
}
for dep in transitive_deps {
if last.transitive_deps_h.insert(dep.clone()) {
debug!(
"adding transitive dep: {:?} -> {:?}",
last.package.package, dep
);
last.transitive_deps.push(dep.clone());
}
}
last.deps.push(final_path.clone());
for scc in sccs.iter() {
for v in scc.iter() {
let mut context_hasher = blake3::Hasher::new();
context_hasher.update(vertices[*v].pkg.sha256.unwrap().as_bytes());
for d in vertices[*v].deps.iter() {
let w = &vertices[*d];
if w.scc == vertices[*v].scc {
// If in the same SCC, we can't know anything other than the SHA256.
context_hasher.update(w.pkg.sha256.unwrap().as_bytes());
} else {
context_hasher
.update(w.final_path.as_ref().unwrap().to_str().unwrap().as_bytes());
elt.spawn_extract(client.clone()).await;
elt.push_deps(index, &mut stack).await;
for v in scc.iter() {
let f = finalize(&mut vertices, client, &files, *v).await?;
vertices[*v].final_path = Some(f.clone());
result.push(f)
}
/// Iterative Tarjan strongly-connected-components over the dependency graph.
/// Using an explicit call stack avoids recursion, so deep dependency chains
/// cannot overflow the thread's stack.
fn tarjan(vertices: &mut [Vertex]) -> Vec<Vec<usize>> {
let mut sccs = Vec::new();
let mut stack = Vec::new();
let mut index = 0;
// One frame of the simulated recursion: `vi` is the vertex being visited,
// `wj` the position of the next dependency to explore.
struct C {
vi: usize,
wj: usize,
}
// Start the traversal from the root package's vertex, which was created last.
let mut call_stack = vec![C {
vi: vertices.len() - 1,
wj: 0,
}];
'outer: while let Some(mut c) = call_stack.pop() {
if vertices[c.vi].index.is_none() {
// First visit: assign the next DFS index and push onto the Tarjan stack.
let v = &mut vertices[c.vi];
v.index = Some(index);
v.lowlink = index;
v.on_stack = true;
index += 1;
stack.push(c.vi);
} else if c.wj > 0 {
// We are returning from a recursive visit of `deps[c.wj - 1]`:
// propagate the child's lowlink to this vertex.
let w = vertices[c.vi].deps[c.wj - 1];
let w_lowlink = vertices[w].lowlink;
vertices[c.vi].lowlink = vertices[c.vi].lowlink.min(w_lowlink);
}
while c.wj < vertices[c.vi].deps.len() {
let wi = vertices[c.vi].deps[c.wj];
if let Some(index) = vertices[wi].index {
// Already visited: only a back edge to a vertex still on the
// Tarjan stack can lower our lowlink.
if vertices[wi].on_stack {
vertices[c.vi].lowlink = vertices[c.vi].lowlink.min(index)
}
c.wj += 1;
} else {
// Not visited yet: "recurse" by pushing this frame back, then the
// dependency's frame.
c.wj += 1;
call_stack.push(c);
call_stack.push(C { vi: wi, wj: 0 });
continue 'outer;
}
}
}
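// If this vertex is the root of its SCC, pop the whole component off the Tarjan stack.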
if Some(vertices[c.vi].lowlink) == vertices[c.vi].index {
let mut scc = Vec::new();
while let Some(p) = stack.pop() {
vertices[p].scc = sccs.len();
vertices[p].on_stack = false;
scc.push(p);
if p == c.vi {
break;
}
}
sccs.push(scc)
}
}
sccs
}
let mut f = std::io::BufReader::new(std::fs::File::open(&download.path)?);
let d = match deb::Deb::read(&mut f) {
Ok(d) => d,
Err(e) => {
std::fs::remove_file(&download.path).unwrap_or(());
return Err(e.into());
}
};
std::fs::create_dir_all(&path).unwrap_or(());
match d.decompress(&mut f, &path) {
let p = path.clone();
let dp = download.path.clone();
match tokio::task::spawn_blocking(move || {
let mut f = std::io::BufReader::new(std::fs::File::open(&dp)?);
let d = match deb::Deb::read(&mut f) {
Ok(d) => d,
Err(e) => {
std::fs::remove_file(&dp).unwrap_or(());
return Err(e.into());
}
};
std::fs::create_dir_all(&p).unwrap_or(());
d.decompress(&mut f, &p)
})
.await
.unwrap()
{
}
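// Spawn a background task that downloads and extracts a .deb, sending the result
// back on a oneshot channel.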
async fn spawn_extract<'a>(
client: Client,
stanza: &deb::Stanza<'a>,
) -> tokio::sync::oneshot::Receiver<Result<Downloaded, Error>> {
let url = client.url(stanza.file_name.as_deref());
let sha256 = stanza.sha256.unwrap().to_string();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
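// The semaphore permit bounds how many downloads (and extractions) run at once.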
let permit = client.download_sem.clone().acquire_owned().await.unwrap();
info!("downloading {:?}", url);
let (mut task, _) = match client.download_url(&url, &sha256).await {
Ok(x) => x,
Err(e) => {
tx.send(Err(e)).unwrap_or(());
return Ok(());
}
};
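// The archive is extracted alongside the downloaded file, at the same path with
// the extension stripped.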
let is_extracted = std::fs::metadata(&task.path.with_extension("")).is_ok();
info!("finished downloading {:?}", url);
if !is_extracted {
// Sets extension
if let Err(e) = extract_task(&client, &mut task).await {
info!("finished extracting {:?} {:?}", url, e);
tx.send(Err(e)).unwrap_or(());
} else {
info!("finished extracting {:?}, Ok", url);
tx.send(Ok(task)).unwrap_or(());
}
info!("sent {:?}", url);
} else {
task.path.set_extension("");
tx.send(Ok(task)).unwrap_or(());
}
drop(permit);
Ok::<_, Error>(())
});
rx
}
async fn finalize(
&mut self,
client: &Client,
files: &Files,
downloaded: Downloaded,
stack: &mut Vec<StackElt<'a>>,
result: &mut Vec<Arc<PathBuf>>,
) -> Result<Arc<PathBuf>, Error> {
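// The context hash commits to this package's SHA-256 and to the store paths of
// its direct dependencies.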
let mut context_hasher = blake3::Hasher::new();
context_hasher.update(self.package.sha256.unwrap().as_bytes());
self.deps.sort();
for d in self.deps.iter() {
context_hasher.update(d.to_str().unwrap().as_bytes());
async fn finalize<'a>(
vertices: &mut [Vertex<'a>],
client: &Client,
files: &Files,
v: usize,
) -> Result<Arc<PathBuf>, Error> {
debug!(
"finalize {:?} {:#?}",
vertices[v].pkg.package, vertices[v].transitive_deps
);
let mut ld_path = std::mem::take(&mut vertices[v].ld_path);
let mut ld_path_h = std::mem::take(&mut vertices[v].ld_path_h);
let mut transitive_deps = std::mem::take(&mut vertices[v].transitive_deps);
let mut transitive_deps_h = std::mem::take(&mut vertices[v].transitive_deps_h);
let mut deps_paths = Vec::new();
for dep in vertices[v].deps.clone().iter() {
for ld in vertices[*dep].ld_path.iter() {
if ld_path_h.insert(ld.clone()) {
ld_path.push(ld.clone());
}
debug!(
"finalize {:?} {:#?}",
self.package.package, self.transitive_deps
);
let dest = client
.store_path
.join(data_encoding::HEXLOWER.encode(context_hasher.finalize().as_bytes()));
for d in vertices[*dep].transitive_deps.iter() {
if transitive_deps_h.insert(d.clone()) {
debug!(
"adding transitive dep: {:?} -> {:?}",
vertices[v].pkg.package, dep
);
transitive_deps.push(d.clone());
}
}
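// Dependencies in the same SCC do not have a final path yet, so use their
// context path instead.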
deps_paths.push(if vertices[*dep].scc == vertices[v].scc {
vertices[*dep].context_path.clone().unwrap()
} else {
vertices[*dep].final_path.clone().unwrap()
})
}
vertices[v].ld_path = ld_path;
vertices[v].ld_path_h = ld_path_h;
vertices[v].transitive_deps = transitive_deps;
vertices[v].transitive_deps_h = transitive_deps_h;
vertices[v].deps_paths = deps_paths;
let initial_deps_len = self.transitive_deps.len();
// Find the libs in this package.
self.find_libs(&downloaded.path, &dest);
// Find the extra ld paths to look for.
vertices[v].add_ld_paths().unwrap();
let base_package_name = self.package.file_name.unwrap().split('/').last().unwrap();
let base_package_name = Path::new(&base_package_name).with_extension("");
let base_package_name = base_package_name.to_str().unwrap();
let initial_deps_len = vertices[v].transitive_deps.len();
// Find the libs in this package.
vertices[v].find_libs();
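// If `dest` does not exist yet, create and patch it; otherwise reuse the existing
// tree and only recompute its output hash from its blake3sums file.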
let final_path = if std::fs::metadata(&dest).is_err() {
info!("create final path for {dest:?}");
match self
.create_final_path(client, &files, &downloaded.path, &dest, &base_package_name)
.await
{
Ok(x) => x,
Err(e) => {
tokio::fs::remove_dir_all(&dest).await.unwrap_or(());
return Err(e);
}
}
} else {
info!("found, no patching: {:?}", dest);
let mut output_hasher = blake3::Hasher::new();
let blakesums = dest.join("blake3sums");
let file = match tokio::fs::File::open(&blakesums).await {
Ok(file) => file,
Err(e) => {
error!("Error {:?} {:?}: {:?}", blakesums, downloaded.path, e);
return Err(e.into());
}
};
hash_reader(file, &mut output_hasher).await?;
let base_package_name = vertices[v]
.pkg
.file_name
.unwrap()
.split('/')
.last()
.unwrap();
let base_package_name = Path::new(&base_package_name).with_extension("");
let base_package_name = base_package_name.to_str().unwrap();
let r = tokio::io::BufReader::new(
tokio::fs::File::open(&dest.join("paths")).await.unwrap(),
);
let mut l = r.lines();
while let Some(l) = l.next_line().await? {
self.files.insert(Arc::new(l));
let final_path = if std::fs::metadata(&*dest).is_err() {
info!("create final path for {dest:?}");
match vertices[v]
.create_final_path(client, &files, &dest, &base_package_name)
.await
{
Ok(x) => x,
Err(e) => {
tokio::fs::remove_dir_all(&*dest).await.unwrap_or(());
return Err(e);
client.store_path.join(&format!(
"{}-{}",
data_encoding::HEXLOWER.encode(output_hasher.finalize().as_bytes()),
base_package_name,
))
};
let final_path = Arc::new(final_path);
add_subst(downloaded.path.clone(), &dest, files.clone()).await?;
// Replace prefix for all library deps we've just added,
// in order to get a Merkle tree.
for dep in &mut self.transitive_deps[initial_deps_len..] {
let end = dep.strip_prefix(&dest).unwrap();
*dep = Arc::new(final_path.join(&end));
info!("symlink {:?} {:?}", downloaded.path, final_path);
match std::os::unix::fs::symlink(&dest, &*final_path) {
Ok(()) => (),
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
let got = std::fs::read_link(&*final_path)?;
debug!("Path already exists, previous value {:?}", got);
// This situation means that we've come to the same
// result via different build recipes.
/*
if dest != got {
return Err(Error::WrongResultSymlink {
expected: dest,
got,
});
}
*/
} else {
info!("found, no patching: {:?}", dest);
let mut output_hasher = blake3::Hasher::new();
let blakesums = dest.join("blake3sums");
let file = match tokio::fs::File::open(&blakesums).await {
Ok(file) => file,
Err(e) => {
error!(
"Error {:?} {:?}: {:?}",
blakesums, vertices[v].downloaded.path, e
);
return Err(e.into());
// Now propagate the deps up to the parent element on the stack.
if let Some(last) = stack.iter_mut().rev().find(|x| x.rx.is_some()) {
for ld in self.ld_path.iter() {
if last.ld_path_h.insert(ld.clone()) {
last.ld_path.push(ld.clone());
}
}
for dep in self.transitive_deps.iter() {
if last.transitive_deps_h.insert(dep.clone()) {
debug!(
"adding transitive dep: {:?} -> {:?}",
last.package.package, dep
);
last.transitive_deps.push(dep.clone());
}
}
for f in self.files.iter() {
last.files.insert(f.clone());
}
last.deps.push(final_path.clone());
let r =
tokio::io::BufReader::new(tokio::fs::File::open(&dest.join("paths")).await.unwrap());
let mut l = r.lines();
while let Some(l) = l.next_line().await? {
vertices[v].files.insert(Arc::new(l));
async fn spawn_extract(&mut self, client: Client) {
let client = client.clone();
let url = client.url(self.package.file_name.as_deref());
let sha256 = self.package.sha256.unwrap().to_string();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
let permit = client.download_sem.clone().acquire_owned().await.unwrap();
info!("downloading {:?}", url);
let (mut task, _) = match client.download_url(&url, &sha256).await {
Ok(x) => x,
Err(e) => {
tx.send(Err(e)).unwrap_or(());
return Ok(());
}
};
let is_extracted = std::fs::metadata(&task.path.with_extension("")).is_ok();
info!("finished downloading {:?}", url);
if !is_extracted {
// Sets extension
if let Err(e) = extract_task(&client, &mut task).await {
info!("finished extracting {:?} {:?}", url, e);
tx.send(Err(e)).unwrap_or(());
} else {
info!("finished extracting {:?}, Ok", url);
tx.send(Ok(task)).unwrap_or(());
}
info!("sent {:?}", url);
} else {
task.path.set_extension("");
tx.send(Ok(task)).unwrap_or(());
}
drop(permit);
Ok::<_, Error>(())
});
self.rx = Some(rx);
}
client.store_path.join(&format!(
"{}-{}",
data_encoding::HEXLOWER.encode(output_hasher.finalize().as_bytes()),
base_package_name,
))
};
let final_path = Arc::new(final_path);
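// Push this element back onto the stack, then one new element per dependency,
// picking the first alternative that can be resolved.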
async fn push_deps(self, index: &'a [deb::Index], stack: &mut Vec<StackElt<'a>>) {
let depends = self.package.depends.clone();
stack.push(self);
for dep in depends.iter() {
match dep {
deb::Dep::Simple(s) => {
debug!("dep {:?}", s);
let Some(dep) = multi_lookup(index, &s.name).await else {
panic!("could not find {:?}", s.name)
};
stack.push(StackElt {
package: dep,
rx: None,
deps: Vec::new(),
transitive_deps: Vec::new(),
transitive_deps_h: BTreeSet::new(),
ld_path: Vec::new(),
ld_path_h: BTreeSet::new(),
files: BTreeSet::new(),
})
}
deb::Dep::Alternatives { alt } => {
debug!("alt {:?}", alt);
let stack_len = stack.len();
for dep in alt {
if let Some(dep_) = multi_lookup(index, &dep.name).await {
stack.push(StackElt {
package: dep_,
rx: None,
deps: Vec::new(),
transitive_deps: Vec::new(),
transitive_deps_h: BTreeSet::new(),
ld_path: Vec::new(),
ld_path_h: BTreeSet::new(),
files: BTreeSet::new(),
});
break;
}
}
if stack.len() == stack_len {
panic!("Not found: {:?}", alt);
}
}
add_subst(vertices[v].downloaded.path.clone(), &dest, files.clone()).await?;
// Replace prefix for all library deps we've just added,
// in order to get a Merkle tree.
for dep in &mut vertices[v].transitive_deps[initial_deps_len..] {
let end = dep.strip_prefix(&*dest).unwrap();
*dep = Arc::new(final_path.join(&end));
}
info!("symlink {:?} {:?}", vertices[v].downloaded.path, final_path);
match std::os::unix::fs::symlink(&*dest, &*final_path) {
Ok(()) => (),
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
let got = std::fs::read_link(&*final_path)?;
debug!("Path already exists, previous value {:?}", got);
// This situation means that we've come to the same
// result via different build recipes.
/*
if dest != got {
return Err(Error::WrongResultSymlink {
expected: dest,
got,
});
}
*/
#[derive(Debug)]
struct StackElt<'a> {
/// The package being processed.
package: deb::Stanza<'a>,
/// Receiver for the background download/extract task, if one was spawned.
rx: Option<tokio::sync::oneshot::Receiver<Result<Downloaded, Error>>>,
/// Store paths of the direct dependencies resolved so far.
deps: Vec<Arc<PathBuf>>,
transitive_deps: Vec<Arc<PathBuf>>,
transitive_deps_h: BTreeSet<Arc<PathBuf>>,
ld_path: Vec<Arc<PathBuf>>,
ld_path_h: BTreeSet<Arc<PathBuf>>,
files: BTreeSet<Arc<String>>,
}
/// Push one entry onto the traversal stack for each dependency (taking the
/// first alternative that resolves) and return the names of the selected
/// packages.
async fn push_deps<'a>(
depends: &[deb::Dep<'a>],
index: &'a [deb::Index],
stack: &mut Vec<(
deb::Stanza<'a>,
Option<tokio::sync::oneshot::Receiver<Result<Downloaded, Error>>>,
)>,
) -> Vec<&'a str> {
let mut d = Vec::new();
for dep in depends.iter() {
match dep {
deb::Dep::Simple(s) => {
debug!("dep {:?}", s);
let Some(dep) = multi_lookup(index, &s.name).await else {
panic!("could not find {:?}", s.name)
};
d.push(dep.package);
stack.push((dep, None))
}
deb::Dep::Alternatives { alt } => {
debug!("alt {:?}", alt);
let stack_len = stack.len();
for dep in alt {
if let Some(dep_) = multi_lookup(index, &dep.name).await {
d.push(dep_.package);
stack.push((dep_, None));
break;
}
}
if stack.len() == stack_len {
panic!("Not found: {:?}", alt);
}
}
}
}
d
}
Ok(Ok(())) => std::process::exit(0),
Ok(Err(Error::BuildReturn { status })) => std::process::exit(status),
Ok(Ok(())) => {
debug!("inner process ok");
std::process::exit(0)
}
Ok(Err(Error::BuildReturn { status })) => {
debug!("inner process {:?}", status);
std::process::exit(status)
}