while let Some(next_url) = pending_rx.recv().await {
    let cl = cl.clone();
    let confirmed = confirmed.clone();
    let pending_tx = pending_tx.clone();
    let _: JoinHandle<anyhow::Result<()>> = tokio::task::spawn(async move {
        required_sleep().await;
        let pg = cl.get(next_url.clone()).send().await?;
        let final_url = pg.url().clone();
        let data = pg.bytes().await?;
        let mut pending = HashSet::new();
        if let Ok(utf8) = std::str::from_utf8(&data) {
            if let Err(e) = find_new_links(utf8, &confirmed.read(), &mut pending) {
                warn!("error finding links on {}: {}", final_url, e);
            }
        }
        // Count before draining, or the progress line below always adds zero.
        let num_pending = pending.len();
        pending
            .drain()
            .for_each(|pnd| pending_tx.send(pnd).expect("couldn't send ):"));
        confirmed
            .write()
            .insert(next_url.clone(), PageCandidate { final_url, data });
        println!(
            "finished {}/{}: {}",
            confirmed.read().len(),
            confirmed.read().len() + num_pending,
            next_url
        );
        Ok(())
    });
    if stop.try_recv().is_ok() {
        break;
    }
}

A revised version of the loop uses select! instead, so it reacts to the stop signal promptly and can detect on its own when the whole site has been crawled:

let done = Arc::new(tokio::sync::Notify::new());
loop {
    select! {
        _ = done.notified() => {
            break;
        }
        Some(next_url) = pending_rx.recv() => {
            all_tasks.fetch_sub(1, Ordering::SeqCst);
            // Already saw it: a dupe in the crawling pipeline.
            if confirmed.read().contains_key(&next_url) {
                continue;
            }
            let cl = cl.clone();
            let confirmed = confirmed.clone();
            let pending_tx = pending_tx.clone();
            let all_tasks = all_tasks.clone();
            let done = done.clone();
            let _: JoinHandle<anyhow::Result<()>> = tokio::task::spawn(async move {
                required_sleep().await;
                let pg = cl.get(next_url.clone()).send().await?;
                let final_url = pg.url().clone();
                let data = pg.bytes().await?;
                let mut pending = HashSet::new();
                if let Ok(utf8) = std::str::from_utf8(&data) {
                    if let Err(e) = find_new_links(utf8, &confirmed.read(), &mut pending) {
                        warn!("error finding links on {}: {}", final_url, e);
                    }
                }
                confirmed
                    .write()
                    .insert(next_url.clone(), PageCandidate { final_url, data });
                let num_seen_here = pending.len() as isize;
                let prev_remaining = all_tasks.fetch_add(num_seen_here, Ordering::SeqCst);
                pending.drain().for_each(|pnd| {
                    drop(pending_tx.send(pnd).map_err(|e| println!("couldn't send ): {}", e)))
                });
                let num_confirmed = confirmed.read().len();
                println!(
                    "finished {}/{}: {}",
                    num_confirmed,
                    num_confirmed as isize + all_tasks.load(Ordering::SeqCst),
                    next_url
                );
                // Nothing new found here and no other work outstanding: done.
                if num_seen_here == 0 && prev_remaining == 0 {
                    done.notify_one();
                    println!("finished crawling site!");
                }
                Ok(())
            });
        },
        _ = stop.recv() => {
            break;
        }
    }
}
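Both versions of the loop lean on scaffolding that is not shown here. A minimal sketch of what they appear to assume, with tokio's unbounded mpsc for the URL queue, a small mpsc channel for the stop signal, parking_lot's RwLock around the confirmed map, and an AtomicIsize tracking outstanding work; the names match the snippets above, while the concrete types, seed URL, and sleep duration are assumptions:

use std::collections::HashMap;
use std::sync::atomic::AtomicIsize;
use std::sync::Arc;

use parking_lot::RwLock;
use tokio::sync::mpsc;
use url::Url;

// Per-page record stored in `confirmed` (fields as used above; the derives
// matter later, when the map is persisted with bincode).
#[derive(serde::Serialize, serde::Deserialize)]
struct PageCandidate {
    final_url: Url,     // URL after redirects
    data: bytes::Bytes, // raw response body ("serde" feature of bytes)
}

// Politeness delay between requests; the duration is an assumed value.
async fn required_sleep() {
    tokio::time::sleep(std::time::Duration::from_millis(250)).await;
}

// Assumed shape of the link finder used above:
// fn find_new_links(
//     body: &str,
//     confirmed: &HashMap<Url, PageCandidate>,
//     pending: &mut HashSet<Url>,
// ) -> anyhow::Result<()>

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // URL queue: unbounded, so fetch tasks can enqueue links without awaiting.
    let (pending_tx, mut pending_rx) = mpsc::unbounded_channel::<Url>();
    // Stop signal: recv().await in the select! arm, try_recv() in the while-let loop.
    let (_stop_tx, mut stop) = mpsc::channel::<()>(1);
    let confirmed = Arc::new(RwLock::new(HashMap::<Url, PageCandidate>::new()));
    // Net count of queued-but-unconfirmed URLs; starts at 1 for the seed.
    let all_tasks = Arc::new(AtomicIsize::new(1));
    pending_tx
        .send(Url::parse("https://example.com/")?)
        .expect("queue closed");
    // ... crawl loop from above goes here ...
    Ok(())
}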
async fn phase<T, U>(fname: &str, func: impl FnOnce(T) -> anyhow::Result<U>) -> anyhow::Result<T>
where
    T: for<'de> Deserialize<'de> + Serialize + Default,
    U: Future<Output = anyhow::Result<T>>,
{
    // Best-effort backup; the file won't exist yet on the very first run.
    drop(std::fs::copy(fname, format!("{}.bak", fname)));
    let f = std::fs::File::open(fname).map_err(|e| anyhow!(e));
    let rehydrated = f
        .and_then(|f| {
            // Decompressor, not CompressorReader: the blob on disk is already
            // compressed and has to be decompressed while reading it back.
            bincode::deserialize_from(brotli::Decompressor::new(
                std::io::BufReader::new(f),
                4096 * 64,
            ))
            .map_err(|e| anyhow!(e))
        })
        .unwrap_or_default();
    let res = func(rehydrated)?.await?;
    bincode::serialize_into(
        brotli::CompressorWriter::new(
            std::io::BufWriter::new(
                std::fs::File::create(fname).expect("couldn't create phase output file ):"),
            ),
            4096 * 64, // buffer size
            8,         // quality (0-11)
            22,        // log2 window size => 4 MiB
        ),
        &res,
    )?;
    Ok(res)
}
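A quick way to convince yourself the compress-on-write, decompress-on-read pairing is right is a round-trip test. This one is a hypothetical sketch, not from the original code:

#[test]
fn blob_roundtrip() -> anyhow::Result<()> {
    let path = std::env::temp_dir().join("roundtrip.blob.brotli");
    let data: Vec<u32> = (0..1000).collect();
    bincode::serialize_into(
        brotli::CompressorWriter::new(
            std::io::BufWriter::new(std::fs::File::create(&path)?),
            4096 * 64,
            8,
            22,
        ),
        &data,
    )?;
    // The CompressorWriter finishes the brotli stream when dropped above.
    let back: Vec<u32> = bincode::deserialize_from(brotli::Decompressor::new(
        std::io::BufReader::new(std::fs::File::open(&path)?),
        4096 * 64,
    ))?;
    assert_eq!(data, back);
    Ok(())
}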
let cl = reqwest::Client::new();
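If the crawl ever needs to identify itself or bound slow requests, the plain Client::new() can be swapped for the builder. A small sketch; the user agent string and timeout are made-up values:

let cl = reqwest::Client::builder()
    .user_agent("site-crawler/0.1") // assumed UA string
    .timeout(std::time::Duration::from_secs(30)) // assumed per-request timeout
    .build()?;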
// load prev state from blob
let confirmed = {
    let f = std::fs::File::open("phase1.blob").map_err(|e| anyhow!(e));
    f.and_then(|f| {
        bincode::deserialize_from(std::io::BufReader::new(f)).map_err(|e| anyhow!(e))
    })
}
.unwrap_or(HashMap::new());
let confirmed = Arc::new(RwLock::new(confirmed));
let r = all_urls_on_site_everywhere(stop, Arc::new(cl), confirmed.clone()).await;
if let Err(e) = r {
    error!("big sad: {}", e);
}
bincode::serialize_into(
    brotli::CompressorWriter::new(
        std::io::BufWriter::new(std::fs::File::create("phase1.blob.brotli").expect("sad")),
        4096 * 64, // buffer size
        8,         // quality (0-11)
        22,        // log2 window size => 4 MiB
    ),
    &*confirmed.read(),
)?;
phase("phase1.blob.brotli", |confirmed| {
let confirmed = Arc::new(RwLock::new(confirmed));
let r = all_urls_on_site_everywhere(stop, Arc::new(cl), confirmed.clone());
Ok(r.map(move |r| match r {
Err(e) => Err(anyhow!("big sad: {}", e)),
Ok(v) => Ok(std::mem::replace(&mut *confirmed.write(), HashMap::new())),
}))
})
.await?;
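Returning a future from the closure, rather than a plain value, is what lets phase await the crawl and only persist the map once the work has actually finished.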
let sel = Selector::parse("header.entry-header > h2 > img + img")
.expect("fix the phase2 selector");
let mut outlets: HashMap<Url, MediaOutlet> = {
    let f = std::fs::File::open("phase2.blob").map_err(|e| anyhow!(e));
    f.and_then(|f| {
        bincode::deserialize_from(std::io::BufReader::new(f)).map_err(|e| anyhow!(e))
    })
}
// No saved state yet on the first run: start from an empty map.
.unwrap_or_default();
phase(
    "phase2.blob.brotli",
    |mut outlets: HashMap<Url, MediaOutlet>| {
        let sel = Selector::parse("header.entry-header > h2 > img + img")
            .expect("fix the phase2 selector");
        for (orig_url, candidate) in all_pages.into_iter() {
            // Skip pages already classified; a .filter() on the iterator
            // would borrow outlets for the whole iteration.
            if outlets.contains_key(&orig_url) {
                continue;
            }
            if let Some(outlet) = consider_page(&sel, &candidate)? {
                println!("found outlet!");
                outlets.insert(orig_url, outlet);
            }
        }
        Ok(std::future::ready(Ok(outlets)))
    },
)
.await?;
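For phase to round-trip these maps through bincode, both record types need serde derives, and PageCandidate's body field additionally needs the bytes crate's serde feature. A sketch of MediaOutlet and of the signature consider_page appears to have; both are assumptions inferred from the call sites:

use serde::{Deserialize, Serialize};

// Assumed shape; whatever consider_page extracts from a page goes here.
#[derive(Serialize, Deserialize)]
struct MediaOutlet {
    name: String,
}

// Assumed signature, inferred from the loop above:
// fn consider_page(
//     sel: &scraper::Selector,
//     candidate: &PageCandidate,
// ) -> anyhow::Result<Option<MediaOutlet>>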
futures = "0.3"
]
[[package]]
name = "futures"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c70be434c505aee38639abccb918163b63158a4b4bb791b45b7023044bdc3c9c"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
name = "futures-io"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e37c1a51b037b80922864b8eed90692c5cd8abd4c71ce49b77146caa47f3253b"
[[package]]
name = "futures-macro"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f8719ca0e1f3c5e34f3efe4570ef2c0610ca6da85ae7990d472e9cbfba13664"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
]
[[package]]