PCR2LDLTK6XVSBXJL6IMGQP7GSE5UNFCT6WRM2H5EPQ2AZWFRCFAC D4SBRY6KHEAMGUEVWSUYHNCRYXCOC5JJ3N3DQ47ZXOXQGT73VERAC X7GJK2QD5LRK3KDE2YMCNDTASB3TAK6VAHSCY3SOZBHAYDQ5KJ7QC LE34RHBBMJT2BWLU5BHZJSHNMHQWX66QQNM2MAEDWEARCWOCSYLAC IFYNLE5HO7JIZ5LJYNQ7ZQGK7C7RWPT54DFMCDHPT4CEGCZN7ACQC YJLEG4WRYKKQJMMDRKIRG4T4L7ZZ7N777EKIRSHHWHNZYWZJ7G5AC Y4NVVY56QZE3I2NEJUUFKANBL5SCCUDALSTDVZ4YOPH6AXJYQT5QC HSHYU5QM3HVRQUXY2WN4UK3CHN3RKJXU5AJVZU3SWFXYEJOANIQAC JSRKEVVPN4R7IJ53AU4OYH5HJCXPLBHCYDD7CSN74W7HACAKTJMAC O527HBLRQNATAI563XV2HWAUYIPLN3BM6CZL5B3PWFED363XAJJAC YYJ76Q7V6G7FHSNZ25MU3ZZY5KGXIODBHLWOZUGB4FIKL7PUCNAAC NRBML6UEXH5QCDVL7M4DNEU6XZ72SUKYUIPUTYHEFDOLM3Z6MKCAC W4LOGDLDBZIUUNACCXWWUZGVEGWAXYAZI3C66HYHURFQ3CWZ7XKAC FYTPHU7RYWFXSM7QQXC232ISXQOCYCTXVXO3MPDTWYSM25WQKXPQC HRAWXWFEN632GIC22J2D3WLN4ZTYOOOCHXTK4CGNAREZSIZQ4XJAC AYSCI4QTFKA32PHK3Q7XU7EM7M6WHJFWOBRLWPNK5RZ5VV2QSRVAC 43PI2ASDQFPSI4TKEELSNCYQVKSVHBZCQFP7ZQ6RIFVBQAKHLKUQC EAHHH3UTBFBRDZCYXAOCNA2B3UAXE6YNOV6Q5TEFIBA7V2IZARIQC AAJTMEJA5L5OUH6CCRNHXSE54YJS5L3T6JJ2TCC5PNPWN6AQ373QC NV5CPVRHLFYYLMY6UWQZFFH73HCBQLOVOK5UZ7WH72DVCM6JJ6JQC 4DSEIE3VK6W6666GVSGOXQCSIMQDQQEFFFZW3C7FXZUYI2U6LTIQC 4LI4UD4D7U23ZAIOKGDK637QQDM3A23OTXE6VGRRPD5TPHQZJOBQC V3VBGSPHIIPBUDFT64CT4SLBEMN7WDBYWNUZA4NAGLT3SXK6ZVYQC QEB77DYAKDVYLSN7MKZFCO4UY4EP5TUSXVH6L7LHQ2NDCBD4HHFQC 5VX32NQS4TFOXTKH3UOMTMTCUZ2IGCKO2BFBXRZAEQ3GKTTNX5SQC if ('Chunk' in m) {console.log(m);out[m.Chunk.channel].push(ansi_up.ansi_to_html(m.Chunk.content));} else if ('Status' in m) {console.log(m);ended = m.Status.ended;status = m.Status.status;
if (typeof m == 'object') {if (m?.Chunk) {console.log(m);out[m.Chunk.channel].push(ansi_up.ansi_to_html(m.Chunk.content));if (m.Chunk.channel == 0 && out[1].reduce((a, b) => a + b.length, 0)) {document.getElementById('stdout-bottom')?.scrollIntoView();} else {document.getElementById('stderr-bottom')?.scrollIntoView();}} else if (m?.Status) {console.log(m);ended = m.Status.ended;status = m.Status.status;}
// Jobs shown on the page: seeded from the page data, then replaced by every
// job list the server pushes over the websocket.
let jobs = $derived(data.jobs);
let socket: WebSocket | undefined = undefined;

/**
 * Opens the job websocket for the current repository and stores it in
 * `socket`.
 *
 * Each message is expected to be a JSON object; when it carries a truthy
 * `jobs` property, that value replaces the displayed job list. Frames that
 * are not valid JSON are ignored instead of throwing out of the handler.
 * When the connection closes, `socket` is reset to `undefined` so that the
 * reconnect loop set up in `onMount` opens a new one.
 */
function websocket() {
	const ws = new WebSocket(`${server}/api/job/${params.user}/${params.repo}/ws`);
	ws.onmessage = (event) => {
		let j;
		try {
			j = JSON.parse(event.data);
		} catch {
			// Malformed frame: drop it, keep the connection and the current list.
			return;
		}
		if (typeof j === 'object' && j?.jobs) {
			jobs = j.jobs;
		}
	};
	ws.onclose = () => {
		// Only forget the socket if it is still the current one.
		if (socket === ws) socket = undefined;
	};
	socket = ws;
}

onMount(() => {
	// Reconnect loop: once per second, open a socket if there is none.
	const timer = setInterval(() => {
		if (!socket) websocket();
	}, 1000);
	// Cleanup on unmount: stop reconnecting and release the connection, so a
	// destroyed component does not keep a timer and a websocket alive.
	return () => {
		clearInterval(timer);
		socket?.close();
	};
});
</td><td><div class="flex flex-wrap gap-3"><form action="/api/job/{params.user}/{params.repo}/{job.id}/retry" method="post"><button class="btn btn-primary btn-sm" disabled={!job.ended}>Retry</button></form><form action="/api/job/{params.user}/{params.repo}/{job.id}/kill" method="post"><button class="btn btn-error btn-sm" disabled={job.ended}>Kill</button></form></div>
futures = "*"tokio = { version = "1.48", features = [ "net", "time", "rt-multi-thread", "macros", "io-util", "fs", "sync", "signal" ] }memmap = "*"log = "*"env_logger = "*"serde_derive = "*"serde = "*"rand = "0.9"uuid = { version = "*", features = [ "v4", "serde" ] }toml = "*"postgres-types = { version = "*", features = [ "derive" ] }tokio-postgres = { version = "*", features = [ "with-uuid-1", "with-chrono-0_4" ] }postgres-openssl = "*"libpijul = { version = "1.0.0-beta.10", features = [ "tarball" ] }clap = "~4.5.53"regex = "*"lazy_static = "*"libc = "*"bytes = "1.11"deadpool-postgres = "*"bincode = "*"thiserror = "*"webpki = "*"openssl = "*"tokio-openssl = "*"byteorder = "*"indexmap = "*"
futures.workspace = truetokio.workspace = trueserde_derive.workspace = trueuuid.workspace = truepijul-core.workspace = truethiserror.workspace = trueserde.workspace = true
-- Extend `jobs` with three columns in a single ALTER TABLE:
--   repo_state: nullable text,
--   channel:    non-null text; rows without a value get 'main' from the default,
--   script:     nullable text.
-- NOTE(review): presumably repo_state is the state the job was built from and
-- script is the job's build script; confirm against the code that inserts jobs.
ALTER TABLE jobs ADD COLUMN repo_state TEXT, ADD COLUMN channel TEXT NOT NULL DEFAULT 'main', ADD COLUMN script TEXT;
-- Remove the repo_state, channel and script columns from `jobs` in a single
-- ALTER TABLE. Any data stored in these columns is lost.
-- NOTE(review): presumably the down migration for the statement that adds
-- these three columns; confirm against the migration directory layout.
ALTER TABLE jobs DROP COLUMN repo_state, DROP COLUMN channel, DROP COLUMN script;
-- Make the creation IP optional and store it as a native inet value; existing
-- text values are converted with a cast.
ALTER TABLE repositories ALTER COLUMN creation_ip DROP NOT NULL;
ALTER TABLE repositories ALTER COLUMN creation_ip TYPE inet USING creation_ip::inet;

-- IF EXISTS / IF NOT EXISTS: this migration isn't really reversible because of
-- the unique constraints inherited from the old Nest, so the statements below
-- tolerate the constraints already being gone and the indexes already existing.

-- Replace the table-wide uniqueness of (owner, name) with a partial unique
-- index that only covers active repositories.
ALTER TABLE repositories DROP CONSTRAINT IF EXISTS repositories_owner_name_key;
CREATE UNIQUE INDEX IF NOT EXISTS repositories_owner_name ON repositories (owner, name) WHERE is_active;

-- Drop the unique constraints on user email and login; login uniqueness is
-- kept for active users only, email uniqueness is not re-created here.
ALTER TABLE users DROP CONSTRAINT IF EXISTS users_email_key;
ALTER TABLE users DROP CONSTRAINT IF EXISTS users_login_key;
CREATE UNIQUE INDEX IF NOT EXISTS users_login ON users (login) WHERE is_active;

-- The old_logins table is removed entirely.
DROP TABLE old_logins;

-- Public keys may now have no binary form.
ALTER TABLE publickeys ALTER COLUMN bin DROP NOT NULL;
-- Convert creation_ip back to text. Rows with no recorded address are given
-- the placeholder '0.0.0.0' so that NOT NULL can be restored.
ALTER TABLE repositories ALTER COLUMN creation_ip TYPE text USING creation_ip::text;
UPDATE repositories SET creation_ip = '0.0.0.0' WHERE creation_ip IS NULL;
ALTER TABLE repositories ALTER COLUMN creation_ip SET NOT NULL;

-- The uniqueness changes are intentionally left in place (statements kept
-- below, disabled): the partial indexes stay and the old table-wide unique
-- constraints are not re-created.
-- DROP INDEX repositories_owner_name;
-- ALTER TABLE repositories ADD UNIQUE (owner, name);
-- ALTER TABLE users ADD UNIQUE (email);
-- DROP INDEX users_login;
-- ALTER TABLE users ADD UNIQUE (login);

-- Re-create old_logins as an empty table; its previous contents are not restored.
CREATE TABLE old_logins (
    login citext NOT NULL primary key,
    user_id uuid,
    retired timestamp with time zone DEFAULT now()
);

-- Require a binary form for public keys again.
ALTER TABLE publickeys ALTER COLUMN bin SET NOT NULL;
use std::io::Read;use std::net::{Ipv6Addr, SocketAddr, ToSocketAddrs};use std::path::{Path, PathBuf};use std::sync::{Arc, Mutex};use std::time::{Duration, SystemTime};
// use std::io::Read;use axum_extra::TypedHeader;use axum_extra::headers::Range;use ci::*;use pijul_core::MutTxnT;use std::collections::HashMap;use std::net::{Ipv6Addr, ToSocketAddrs};use std::path::PathBuf;use std::process::Stdio;use std::sync::Arc;use tempfile;use thiserror::*;use tokio::fs::OpenOptions;
timeout_secs: usize,server_public_keys: Vec<String>,log_path: String,tarball_path: String,}#[derive(Serialize, Deserialize)]struct BuildResult {finished: chrono::DateTime<chrono::Utc>,status: Option<i32>,link: Option<PathBuf>,job: ci::Job,
nest_url: String,repo_path: PathBuf,ci_path: PathBuf,
let key = Arc::new(thrussh_keys::load_secret_key(&conf.key_path, None).unwrap());let (sender, mut receiver) = tokio::sync::mpsc::channel(1);let client = CiClient {process: Arc::new(Mutex::new(Process::default())),log_path: Path::new(&conf.log_path).to_path_buf(),tarball_path: Path::new(&conf.tarball_path).to_path_buf(),last_window_adjustment: SystemTime::now(),server_public_keys: Arc::new(conf.server_public_keys.iter().map(|p| thrussh_keys::parse_public_key_base64(p).unwrap()).collect(),),sender,
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();let config = Config {config: Arc::new(conf),jobs: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
loop {if let Err(e) = client.protocol(&addr, config.clone(), key.clone(), &mut receiver).await{error!("restarting because of error: {:?}", e)}tokio::time::sleep(std::time::Duration::from_secs(1)).await;}
let app = Router::new().route("/trigger", post(trigger)).route("/stdout/{job}", get(stdout)).route("/stderr/{job}", get(stderr)).route("/status/{job}", get(status)).with_state(config.clone());axum::serve(listener, app).await.unwrap();
#[derive(Clone, Debug)]pub struct CiClient {process: Arc<Mutex<Process>>,tarball_path: PathBuf,log_path: PathBuf,last_window_adjustment: SystemTime,server_public_keys: Arc<Vec<thrussh_keys::key::PublicKey>>,sender: tokio::sync::mpsc::Sender<(ci::Job, Option<i32>, Option<PathBuf>)>,
async fn status(State(config): State<Config>,Path(job): Path<uuid::Uuid>,) -> Result<Json<Status>, Error> {let s =tokio::fs::read_to_string(&config.config.ci_path.join(&format!("{}.status", job))).await?;Ok(Json(serde_json::from_str(&s)?))
impl Process {fn is_ready(&self) -> bool {self.child.is_none() && self.job.is_none()
async fn stdout(State(config): State<Config>,Path(job): Path<uuid::Uuid>,range: Option<TypedHeader<Range>>,) -> Response {debug!("stdout {:?} {:?}", job, range);let Ok(mut file) =tokio::fs::File::open(&config.config.ci_path.join(&format!("{}.stdout", job))).awaitelse {return StatusCode::NOT_FOUND.into_response();};if let Some(TypedHeader(range)) = range {let body = KnownSize::file(file).await.unwrap();Ranged::new(Some(range), body).into_response()} else {use tokio::io::AsyncReadExt;let mut v = Vec::new();file.read_to_end(&mut v).await.unwrap();v.into_response()
impl CiClient {pub async fn protocol(&self,addr: &SocketAddr,config: Arc<thrussh::client::Config>,key: Arc<thrussh_keys::key::KeyPair>,receiver: &mut tokio::sync::mpsc::Receiver<(ci::Job, Option<i32>, Option<PathBuf>)>,) -> Result<(), anyhow::Error> {let mut h = thrussh::client::connect(config, &addr, self.clone()).await?;debug!("Opening session");if !h.authenticate_publickey("ci", key).await? {return Ok(());}let mut channel = h.channel_open_session().await?;channel.data(&bincode::serialize(&Message::Handshake {version: ci::VERSION,id: 0,}).unwrap()[..],).await?;debug!("handshake done");'outer: loop {if self.process.lock().unwrap().is_ready() {channel.data(&bincode::serialize(&Message::Ready).unwrap()[..]).await?;debug!("ready");}loop {tokio::select! {msg = channel.wait() => {debug!("msg = {:?}", msg);if !self.handle_msg(&mut channel, &self.sender, msg).await? {break 'outer}}msg = receiver.recv() => {debug!("message {:#?}", msg);if let Some(p) = self.process.lock().unwrap().child.take() {p.await??}if let Some((job, exit_status, path)) = msg {self.send_log(&mut channel, job, exit_status, path).await?}channel.data(&bincode::serialize(&Message::Ready).unwrap()[..]).await?;}}}}Ok(())
async fn stderr(State(config): State<Config>,Path(job): Path<uuid::Uuid>,range: Option<TypedHeader<Range>>,) -> Response {debug!("stderr {:?} {:?}", job, range);let Ok(mut file) =tokio::fs::File::open(&config.config.ci_path.join(&format!("{}.stderr", job))).awaitelse {return StatusCode::NOT_FOUND.into_response();};if let Some(TypedHeader(range)) = range {let body = KnownSize::file(file).await.unwrap();Ranged::new(Some(range), body).into_response()} else {use tokio::io::AsyncReadExt;let mut v = Vec::new();file.read_to_end(&mut v).await.unwrap();v.into_response()
async fn handle_msg(&self,channel: &mut thrussh::client::Channel,sender: &tokio::sync::mpsc::Sender<(ci::Job, Option<i32>, Option<PathBuf>)>,msg: Option<thrussh::ChannelMsg>,) -> Result<bool, anyhow::Error> {match msg {Some(thrussh::ChannelMsg::Data { data }) => {let mut proc = self.process.lock().unwrap();if let Some((mut f, mut len)) = proc.tarball.take() {debug!("len = {:?}", len);use std::io::Write;f.write_all(&data)?;len -= data.len();if len > 0 {proc.tarball = Some((f, len));}return Ok(true);}let msg = bincode::deserialize::<Message>(&data);debug!("msg = {:?}", msg);match msg {Ok(Message::Job(job)) => {self.handle_job(channel, sender.clone(), &mut proc, job).await?}Ok(Message::Chunk { id, len, .. }) => {let p = self.tarball_path.join(&format!("{}.tar.gz.tmp", id));if len == 0 {let p2 = self.tarball_path.join(&format!("{}.tar.gz", id));std::fs::rename(&p, &p2)?;proc.tarball = None;let job = proc.job.take().unwrap();self.handle_job(channel, sender.clone(), &mut proc, job).await?;return Ok(true);}let file = std::fs::OpenOptions::new().write(true).create(true).append(true).open(&p).unwrap();proc.tarball = Some((file, len as usize));}Ok(msg) => {debug!("msg = {:?}", msg);}_ => return Ok(false),}}None => return Ok(false),msg => debug!("{:?}", msg),
#[derive(Error, Debug)]pub enum Error {#[error("Lock")]Lock,#[error("Forbidden")]Forbidden,#[error("Not found")]NotFound,#[error(transparent)]Output(#[from]pijul_core::output::OutputError<<C as pijul_core::changestore::ChangeStore>::Error,pijul_core::pristine::sanakirja::MutTxn0,std::io::Error,>,),#[error(transparent)]Reqwest(#[from] reqwest::Error),#[error(transparent)]Sanakirja(#[from] pijul_core::pristine::sanakirja::SanakirjaError),#[error(transparent)]IO(#[from] std::io::Error),#[error(transparent)]Json(#[from] serde_json::Error),}type C = pijul_core::changestore::filesystem::FileSystem;impl IntoResponse for Error {fn into_response(self) -> Response {debug!("response {:?}", self);match self {Error::Forbidden => (StatusCode::FORBIDDEN, "{}").into_response(),Error::NotFound => (StatusCode::NOT_FOUND, "{}").into_response(),_ => (StatusCode::INTERNAL_SERVER_ERROR, "{}").into_response(),
async fn handle_job(&self,channel: &mut thrussh::client::Channel,sender: tokio::sync::mpsc::Sender<(ci::Job, Option<i32>, Option<PathBuf>)>,process: &mut Process,job: ci::Job,) -> Result<(), anyhow::Error> {let p = self.tarball_path.join(&format!("{}.tar.gz", job.id));debug!("p = {:?}", p);if std::fs::metadata(&p).is_err() {debug!("getting tarball");channel.data(&bincode::serialize(&Message::GetTarball { id: job.id }).unwrap()[..]).await?;process.job = Some(job);return Ok(());}debug!("tar = {:?}", p);let status = std::process::Command::new("tar").args(&["-xf", p.to_str().unwrap()]).current_dir(&self.tarball_path).status().unwrap();debug!("nix: {:?}", status);
// #[derive(Debug, Clone)]// struct RepoPath {// path: PathBuf,// remove_dir: bool,// remove_dot: bool,// }
let tarballp = self.tarball_path.join(job.id.to_string());let logp = self.log_path.clone();
#[axum::debug_handler]pub async fn trigger(State(config): State<Config>,Json(t): Json<Trigger>,) -> Result<StatusCode, Error> {tokio::spawn(async move {info!("trigger: {:?}", t);let t0 = std::time::Instant::now();let mut remote = pijul_remote::RemoteRepo::Http(pijul_remote::http::Http {url: format!("{}/{}/{}", config.config.nest_url, t.owner, t.repo).parse().unwrap(),channel: "main".to_string(),client: reqwest::ClientBuilder::new().build()?,headers: vec![],name: t.state.clone(),});
let result_path = logp.join(&format!("{}.result", job.id));if let Ok(mut f) = std::fs::File::open(&result_path) {if let Ok(build_result) = serde_json::from_reader::<_, BuildResult>(&mut f) {sender.send((build_result.job, build_result.status, build_result.link)).await?;return Ok(());}}
let path = tempfile::tempdir().unwrap();let pijul_config = pijul_config::Config::load(None, Vec::new()).unwrap();let repo = tokio::sync::Mutex::new(pijul_repository::Repository::init(&pijul_config, Some(&path.path()), None, None).unwrap(),);{let mut repo = repo.lock().await;let txn = repo.pristine.arc_txn_begin()?;let mut channel = txn.write().open_or_create_channel("main")?;
debug!("p = {:?}", tarballp);process.child = Some(tokio::task::spawn(async move {let mut process = tokio::process::Command::new("nix-build").arg("default.nix").current_dir(&tarballp).stdin(std::process::Stdio::null()).stdout(std::process::Stdio::piped()).stderr(std::process::Stdio::piped()).spawn()
let h = t.state.parse().unwrap();remote.clone_state(&mut repo, &txn, &mut channel, h).await
let stdout = process.stdout.as_mut().unwrap();let stderr = process.stderr.as_mut().unwrap();let mut fstdout =tokio::fs::File::create(logp.join(&format!("{}.stdout", job.id))).await?;let mut fstderr =tokio::fs::File::create(logp.join(&format!("{}.stderr", job.id))).await?;let (a, b) = futures::future::join(tokio::io::copy(stdout, &mut fstdout),tokio::io::copy(stderr, &mut fstderr),).await;a?;b?;let status = process.wait().await?;debug!("status = {:?}", status);
let mut result_file = std::fs::File::create(&result_path)?;let link = std::fs::read_link(&tarballp.join("result")).ok();serde_json::to_writer(&mut result_file,&BuildResult {finished: chrono::Utc::now(),status: status.code(),job: job.clone(),link: link.clone(),},
pijul_core::output::output_repository_no_pending(&repo.working_copy,&repo.changes,&txn,&channel,"",true,None,1, // std::thread::available_parallelism()?.get(),0,
async fn send_log(&self,channel: &mut thrussh::client::Channel,job: ci::Job,exit_status: Option<i32>,path: Option<PathBuf>,) -> Result<(), anyhow::Error> {let id = job.id;let msg = Message::Log {job,exit_status,
let (status_tx, status_rx) = tokio::sync::watch::channel(None);let (kill_tx, kill_rx) = tokio::sync::oneshot::channel();if let Some((k, _)) = config.jobs.lock().unwrap().insert(t.id, (kill_tx, status_rx)){k.send(()).unwrap_or(());}let result = build(Some(&config.config.ci_path),
let mut buf = Vec::with_capacity(4096);debug!("stdout: {:?}",self.log_path.join(&format!("{}.stdout", id)));if let Ok(ref mut stdout) =std::fs::File::open(&self.log_path.join(&format!("{}.stdout", id))){let len = channel.writable_packet_size().min(MAX_BUF_SIZE);buf.resize(len, 0);while let Ok(n) = stdout.read(&mut buf) {if n == 0 {channel.data(&bincode::serialize(&Message::Chunk {id,stderr: false,len: 0,}).unwrap()[..],).await?;break;}channel.data(&bincode::serialize(&Message::Chunk {id,stderr: false,len: n as u32,}).unwrap()[..],).await?;channel.data(&buf[..n]).await?}}if let Ok(ref mut stdout) =std::fs::File::open(&self.log_path.join(&format!("{}.stderr", id))){let len = channel.writable_packet_size().min(MAX_BUF_SIZE);buf.resize(len, 0);while let Ok(n) = stdout.read(&mut buf) {if n == 0 {channel.data(&bincode::serialize(&Message::Chunk {id,stderr: true,len: 0,}).unwrap()[..],).await?;break;}channel.data(&bincode::serialize(&Message::Chunk {id,stderr: true,len: n as u32,}).unwrap()[..],).await?;channel.data(&buf[..n]).await?}}Ok(())}
let (code, results) = result?;debug!("code = {:?} {:?}", code, results);tokio::fs::write(config.config.ci_path.join(&format!("{}.status", t.id)),serde_json::to_string(&Status {code,results,finished: chrono::Utc::now(),}).unwrap(),).await?;Ok::<(), Error>(())});Ok(StatusCode::OK)
/// Upper bound, in bytes (64 KiB), on the buffer used to read log files before
/// sending them over the channel: the buffer length is the channel's writable
/// packet size capped at this value.
const MAX_BUF_SIZE: usize = 64 * 1024;
async fn build(ci: Option<&std::path::Path>,path: tempfile::TempDir,id: uuid::Uuid,targets: &HashMap<String, String>,mut kill_rx: tokio::sync::oneshot::Receiver<()>,mut status_tx: tokio::sync::watch::Sender<Option<i32>>,) -> Result<(Option<i32>, HashMap<String, PathBuf>), Error> {let mut files = if let Some(ref ci_path) = ci {tokio::fs::create_dir_all(ci_path).await?;Some((OpenOptions::new().append(true).create(true).open(&ci_path.join(&format!("{}.stdout", id))).await.unwrap(),OpenOptions::new().append(true).create(true).open(&ci_path.join(&format!("{}.stderr", id))).await.unwrap(),))} else {None};
fn check_server_key(self,server_public_key: &thrussh_keys::key::PublicKey,) -> impl futures::Future<Output = Result<(Self, bool), Self::Error>> {let valid = self.server_public_keys.iter().any(|p| p == server_public_key);futures::future::ready(Ok((self, valid)))
cmd.arg("build").arg("-L").arg("--log-format").arg("raw");for (_, target) in targets {cmd.arg(target);
fn adjust_window(&mut self, _channel: thrussh::ChannelId, target: u32) -> u32 {let elapsed = self.last_window_adjustment.elapsed().unwrap();self.last_window_adjustment = SystemTime::now();if target >= 10_000_000 {return target;}if elapsed < Duration::from_secs(2) {target * 2} else if elapsed > Duration::from_secs(8) {target / 2} else {target
if code == Some(0) {let mut cmd = tokio::process::Command::new("attic");cmd.arg("push").arg("coturnix");let mut result = HashMap::new();for (n, (t, _)) in targets.iter().enumerate() {if n == 0 {result.insert(t.clone(),tokio::fs::read_link(path.path().join("result")).await.unwrap(),);cmd.arg("result");} else {result.insert(t.clone(),tokio::fs::read_link(path.path().join(format!("result-{n}"))).await.unwrap(),);cmd.arg(format!("result-{n}"));}
#[derive(Debug, Serialize, Deserialize)]pub enum Message {Handshake {version: u16,id: i64,},Ready,Job(Job),GetTarball {id: uuid::Uuid,},InvalidInput {input: String,},Log {job: Job,exit_status: Option<i32>,path: Option<std::path::PathBuf>,},Chunk {id: uuid::Uuid,stderr: bool,len: u32,},
pub async fn dump_cmd(mut cmd: tokio::process::Child,files: &mut Option<(tokio::fs::File, tokio::fs::File)>,mut kill_rx: &mut tokio::sync::oneshot::Receiver<()>,status_tx: &mut tokio::sync::watch::Sender<Option<i32>>,) -> Result<Option<i32>, std::io::Error> {let mut stdout_ok = true;let mut stderr_ok = true;let mut buf_stdout = String::new();let mut last_stdout = std::time::UNIX_EPOCH;let mut buf_stderr = String::new();let mut last_stderr = std::time::UNIX_EPOCH;let bound = std::time::Duration::from_secs(1);use tokio::io::AsyncBufReadExt;let stdout = tokio::io::BufReader::new(cmd.stdout.take().unwrap());let mut stdout = stdout.lines();let stderr = tokio::io::BufReader::new(cmd.stderr.take().unwrap());let mut stderr = stderr.lines();while stdout_ok || stderr_ok {debug!("stdout || stderr {:?} {:?}",buf_stdout.len(),buf_stderr.len());tokio::select! {line = stdout.next_line(), if stdout_ok => {debug!("out {:?}", line);let n = if let Some(line) = line? {buf_stdout.push_str(&line);buf_stdout.push('\n');line.len() + 1} else {0};if last_stdout.elapsed().unwrap() >= bound || n == 0 {debug!("sending stdout to db {:?} {:?}", buf_stdout.len(), buf_stderr.len());if let Some((ref mut stdout, _)) = files {stdout.write_all(buf_stdout.as_bytes()).await?;stdout.flush().await?;}buf_stdout.clear();debug!("stdout/stderr {:?} {:?}", buf_stdout.len(), buf_stderr.len());last_stdout = std::time::SystemTime::now();}if n == 0 {stdout_ok = false}}line = stderr.next_line(), if stderr_ok => {debug!("err {:?}", line);let n = if let Some(line) = line? 
{buf_stderr.push_str(&line);buf_stderr.push('\n');line.len() + 1} else {0};if last_stderr.elapsed().unwrap() >= bound || n == 0 {debug!("sending stderr to db {:?}", buf_stderr.len());if let Some((_, ref mut stderr)) = files {stderr.write_all(buf_stderr.as_bytes()).await?;stderr.flush().await?;}buf_stderr.clear();last_stderr = std::time::SystemTime::now();}debug!("{:?}", buf_stderr.len());if n == 0 {stderr_ok = false}}_ = &mut kill_rx => {cmd.kill().await?;break}}}tokio::select! {s = cmd.wait() => {let code = s?.code();status_tx.send(code).unwrap();Ok(code)}_ = &mut kill_rx => {cmd.kill().await?;Ok(None)}}
anyhow = "1.0.100"bincode = "1.0"chrono = "0.4.42"clap = { version = "4.5.53", features = ["derive"] }env_logger = "0.11.8"futures = "0.3.31"log = "0.4.28"serde = "1.0.228"serde_derive = "1.0.228"serde_json = "1.0.145"thrussh = "0.40"thrussh-keys = "0.23"
anyhow.workspace = truebincode.workspace = truechrono.workspace = trueclap.workspace = trueenv_logger.workspace = truefutures.workspace = truelog.workspace = trueserde.workspace = trueserde_derive.workspace = trueserde_json.workspace = truethrussh.workspace = truethrussh-keys.workspace = true
tokio = { version = "1.48", features = [ "process", "fs" ] }toml = { version = "0.9.8" }uuid = { version = "1.21.0", features = ["serde" ] }tracing = "0.1.41"tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
tokio.workspace = truetoml.workspace = trueuuid.workspace = truetracing.workspace = truetracing-subscriber.workspace = trueaxum.workspace = truethiserror.workspace = truepijul-remote.workspace = truereqwest.workspace = truepijul-repository.workspace = truepijul-config.workspace = truesanakirja.workspace = truepijul-interaction.workspace = truetempfile.workspace = truediesel.workspace = truediesel-async.workspace = trueaxum-range.workspace = trueaxum-extra = { workspace = true, features = ["typed-header"] }
use crate::repository::{ChannelSpec, channel_spec};use crate::{Config, Error};use axum::{body::Body,extract::{Json, Path, Request, State},http::StatusCode,response::{IntoResponse, Response},};use futures::TryStreamExt;use pijul_core::{Base32, ChannelTxnT, DepsTxnT, GraphTxnT, TxnT, TxnTExt};use serde::*;use std::collections::HashSet;use tokio_util::io::ReaderStream;use tracing::*;#[derive(Debug, Deserialize)]pub struct TreePath {pub owner: String,pub repo: String,}// 262kb at once.const CHANGE_BUF_SIZE: usize = 1 << 18;#[derive(Debug)]enum Command {Changelist {from: u64,req_paths: Vec<String>,},State {at: Option<u64>,},Change {hash: pijul_core::pristine::Hash,full: bool,tag: bool,},Identities {from: Option<u64>,},Id,}#[axum::debug_handler]pub async fn dot_pijul(State(config): State<Config>,Path(t): Path<TreePath>,req: Request,) -> Result<Response, Error> {let mut db = config.db.get().await?;let (rid, uid, _) =super::repository_id(&mut db, &t.owner, &t.repo, None, super::Perm::READ).await?;let repo_id = crate::repository::RepositoryId {owner_id: uid,repo_id: rid,fork_origin: None,};debug!("repo = {:?}", repo_id);let mut command = None;let mut channel = ChannelSpec::Channel("main".to_string());if let Some(q) = req.uri().query() {let mut req_paths_ = Vec::new();for (k, v) in url::form_urlencoded::parse(q.as_bytes()) {debug!("{:?} {:?}", k, v);if k == "changelist" {if let Ok(from) = v.parse() {command = Some(Command::Changelist {from,req_paths: Vec::new(),})}} else if k == "path" {req_paths_.push(v.to_string());} else if k == "change" || k == "partial" || k == "tag" {if let Some(hash) = pijul_core::pristine::Hash::from_base32(v.as_bytes()) {command = Some(Command::Change {hash,full: k == "change",tag: k == "tag",})}} else if k == "channel" {if let Some(cap) = crate::ssh::DISC.captures(&v) {channel = ChannelSpec::Discussion(cap[1].parse().unwrap())} else {channel = ChannelSpec::Channel(v.to_string())}} else if k == "state" {if let Ok(from) = v.parse() {command = 
Some(Command::State { at: Some(from) })} else if v.is_empty() {command = Some(Command::State { at: None })}} else if k == "identities" {if let Ok(from) = v.parse() {command = Some(Command::Identities { from: Some(from) })} else if v.is_empty() {command = Some(Command::Identities { from: None })}} else if k == "id" {command = Some(Command::Id)}}if let Some(Command::Changelist {ref mut req_paths, ..}) = command{*req_paths = req_paths_}}let is_default_channel = channel == ChannelSpec::Channel("main".to_string());debug!("command = {:?}", command);match command {Some(Command::Changelist { from, req_paths }) => match channel {ChannelSpec::Channel(channel) => {let repo_ = config.repo_locks.get(&rid).await?;let pristine = repo_.pristine.read().await;let txn = pristine.txn_begin()?;let c = channel_spec(&repo_id, &channel);let channel = if let Some(channel) = txn.load_channel(&c)? {channel} else if is_default_channel {return Ok(Json(()).into_response());} else {debug!("channel not found {:?}", c);return Ok((StatusCode::NOT_FOUND,Json(pijul_core::RemoteError::ChannelNotFound {channel,url: format!("{}/{}", t.owner, t.repo),}),).into_response());};use std::io::Write;let mut v = Vec::new();let mut paths = HashSet::new();for r in req_paths {if let Ok((p, ambiguous)) = txn.follow_oldest_path(&repo_.changes, &channel, &r){let h: pijul_core::Hash = txn.get_external(&p.change)?.unwrap().into();writeln!(v, "{}.{}", h.to_base32(), p.pos.0)?;if ambiguous {let body =serde_json::to_vec(&pijul_core::RemoteError::AmbiguousPath {path: r.to_string(),}).unwrap();return Ok(hyper::Response::builder().status(hyper::StatusCode::NOT_FOUND).body(body.into())?);}paths.insert(p);paths.extend(pijul_core::fs::iter_graph_descendants(&txn,txn.graph(&*channel.read()),p,)?.filter_map(|x| x.ok()),);} else {let body = serde_json::to_vec(&pijul_core::RemoteError::PathNotFound {path: r.to_string(),}).unwrap();return 
Ok(hyper::Response::builder().status(hyper::StatusCode::NOT_FOUND).body(body.into())?);}}debug!("paths = {:?}", paths);let tags: Vec<u64> = txn.iter_tags(txn.tags(&*channel.read()), from)?.map(|k| (*k.unwrap().0).into()).collect();let mut tagsi = 0;for n in txn.log(&*channel.read(), from)? {let (n, (h, m)) = n?;let h_int = txn.get_internal(h)?.unwrap();if paths.is_empty()|| paths.iter().any(|x| {x.change == *h_int|| txn.get_touched_files(x, Some(h_int)).unwrap().is_some()}){let h: pijul_core::Hash = h.into();let m: pijul_core::Merkle = m.into();if tags.get(tagsi) == Some(&n) {writeln!(v, "{}.{}.{}.", n, h.to_base32(), m.to_base32())?;tagsi += 1} else {writeln!(v, "{}.{}.{}", n, h.to_base32(), m.to_base32())?;}}}Ok(v.into_response())}ChannelSpec::Discussion(disc) => {let repo_ = config.repo_locks.get(&rid).await?;let body =crate::discussions::discussion_changelist(&mut db, &repo_.changes, rid, disc).await?;Ok(body.into_response())}},Some(Command::Change { hash, full, tag }) => {let mut p = super::nest_changes_path(&config, rid);pijul_core::changestore::filesystem::push_filename(&mut p, &hash);debug!("{:?}", p);let file = match tokio::fs::File::open(&p).await {Ok(file) => file,Err(err) => {return Ok((StatusCode::NOT_FOUND, format!("File not found: {}", err)).into_response());}};Ok(Body::from_stream(ReaderStream::new(file).into_stream()).into_response())}Some(Command::State { at }) => match channel {ChannelSpec::Channel(channel) => {let repo_ = config.repo_locks.get(&rid).await?;let pristine = repo_.pristine.read().await;let txn = pristine.txn_begin()?;let c = channel_spec(&repo_id, &channel);let channel = if let Some(channel) = txn.load_channel(&c)? 
{channel} else {debug!("channel not found {:?}", c);let body = serde_json::to_vec(&pijul_core::RemoteError::ChannelNotFound {channel,url: format!("{}/{}", t.owner, t.repo),}).unwrap();return Ok(hyper::Response::builder().status(hyper::StatusCode::NOT_FOUND).body(body.into())?);};let mut o = Vec::new();use std::io::Write;if let Some(n) = txn.reverse_log(&*channel.read(), at)?.next() {let (n, (_, m)) = n?;let m: pijul_core::Merkle = m.into();writeln!(o, "{} {}", n, m.to_base32())?} else {writeln!(o, "-")?;}debug!("body = {:?}", o);Ok(o.into_response())}ChannelSpec::Discussion(disc) => {let repo_ = config.repo_locks.get(&rid).await?;let (n, m) = crate::discussions::discussion_state(&mut db, rid, disc, at).await?;Ok(format!("{} {}\n", n, m.to_base32()).into_response())}},Some(Command::Identities { from }) => {/*let rows = if let Some(rev) = from {self.db.query("SELECT login, email, name, revision, signingkeys.algorithm, signingkeys.public_key, signingkeys.signature, signingkeys.expires FROM contributors JOIN signingkeys ON signingkeys.public_key = contributors.key JOIN users ON user_id = users.id WHERE repo = $1 AND revision > $2",&[&repo.id.repo_id,&(rev as i64)]).await?} else {self.db.query("SELECT login, email, name, revision, signingkeys.algorithm, signingkeys.public_key, signingkeys.signature, signingkeys.expires FROM contributors JOIN signingkeys ON signingkeys.public_key = contributors.key JOIN users ON user_id = users.id WHERE repo = $1",&[&repo.id.repo_id,]).await?};*/#[derive(Debug, Serialize)]struct Identities {id: Vec<super::Identity>,rev: u64,}let id = Identities {id: Vec::new(),rev: 0,};/*for r in rows {let login: String = r.get(0);let email: String = r.get(1);let revision: chrono::DateTime<chrono::Utc> = r.get(3);id.rev = id.rev.max(revision.timestamp() as u64);let algorithm: crate::ssh::Keyalgorithm = r.get(4);let key: Vec<u8> = r.get(5);let signature: Vec<u8> = r.get(6);let expires: Option<chrono::DateTime<chrono::Utc>> = r.get(7);debug!("{:?} {:?}", 
key, login);id.id.push(Identity {public_key: pijul_core::key::PublicKey {algorithm: algorithm.into(),key: bs58::encode(&key).into_string(),signature: bs58::encode(&signature).into_string(),expires,version: pijul_core::key::VERSION,},login,email: Some(email),origin: "nest.pijul.com".to_string(),name: None,last_modified: revision.timestamp() as u64,})}*/let body = serde_json::to_string_pretty(&id).unwrap();Ok((hyper::StatusCode::OK, Json(id)).into_response())}Some(Command::Id) => match channel {ChannelSpec::Channel(channel) => {let repo_ = config.repo_locks.get(&rid).await?;let pristine = repo_.pristine.read().await;let txn = pristine.txn_begin()?;let c = channel_spec(&repo_id, &channel);if let Some(channel) = txn.load_channel(&c)? {let channel = channel.read();Ok(format!("{}", channel.id).into_response())} else {Ok((StatusCode::NOT_FOUND, b"Channel not found").into_response())}}_ => Ok(StatusCode::OK.into_response()),},None => Ok((StatusCode::NOT_FOUND, b"No command specified").into_response()),}}
use libpijul::change::{Change, ChangeFile, ChangeHeader};use libpijul::changestore::filesystem::*;use libpijul::changestore::*;use libpijul::pristine::{Base32, ChangeId, Hash, Merkle, Position, Vertex};
use pijul_core::change::{Change, ChangeFile, ChangeHeader};use pijul_core::changestore::filesystem::*;use pijul_core::changestore::*;use pijul_core::pristine::{Base32, ChangeId, Hash, Merkle, Position, Vertex};
) -> Result<Hashed<Hunk<Option<Hash>, Local>, Author>, libpijul::change::ChangeError> {use libpijul::change::*;use libpijul::pristine::Hasher;
) -> Result<Hashed<Hunk<Option<Hash>, Local>, Author>, pijul_core::change::ChangeError> {use pijul_core::change::*;use pijul_core::pristine::Hasher;
use libpijul::changestore::ChangeStore;use libpijul::fs::FsErrorC;use libpijul::output::{FileError, OutputError};use libpijul::pristine::sanakirja::MutTxn0;use libpijul::pristine::sanakirja::SanakirjaError;use libpijul::pristine::{ForkError, TreeErr, TxnErr};use libpijul::{ApplyError, ArcTxn, ChannelMutTxnT, ChannelRef, MutTxnT, MutTxnTExt, TxnT, TxnTExt,UnrecordError,
use pijul_core::changestore::ChangeStore;use pijul_core::fs::FsErrorC;use pijul_core::output::{FileError, OutputError};use pijul_core::pristine::sanakirja::MutTxn0;use pijul_core::pristine::sanakirja::SanakirjaError;use pijul_core::pristine::{ForkError, TreeErr, TxnErr};use pijul_core::{ApplyError, ArcTxn, Base32, ChannelMutTxnT, MutTxnT, MutTxnTExt, TxnT, TxnTExt, UnrecordError,
let repo_ = s.locks.get(&repo).await.unwrap();tokio::task::spawn_blocking(move || {if let Some((temp, depl)) = {let pri = repo_.pristine.blocking_write();let txn = pri.arc_txn_begin()?;let channel = format!("{}_{}", repo, channel);let channel_ = {let mut txn_ = txn.write();txn_.open_or_create_channel(&channel)?};let changes = repo_.changes.clone();
if let Some((state, deployment)) =get_config_state(&s.locks, repo, channel).await?{debug!("{:?} {:?}", state, deployment);let mut db = s.db.get().await.unwrap();use crate::db::jobs::dsl as jobs;let id = diesel::insert_into(jobs::jobs).values((jobs::repo.eq(repo),jobs::repo_state.eq(state.to_base32()),)).returning(jobs::id).get_result::<uuid::Uuid>(&mut db).await.unwrap();use crate::db::repositories::dsl as repositories;use crate::db::users::dsl as users;
s.output_for_deployment(&txn, &channel_, &changes)?} {tokio::spawn(async move {use crate::db::jobs::dsl as jobs;let id = diesel::insert_into(jobs::jobs).values((jobs::repo.eq(repo),)).returning(jobs::id).get_result::<uuid::Uuid>(&mut s.db.get().await.unwrap()).await.unwrap();
let (owner, repo) = repositories::repositories.find(repo).inner_join(users::users).select((users::login, repositories::name)).get_result::<(String, String)>(&mut db).await.unwrap();let mut targets = std::collections::HashMap::new();targets.insert(deployment.clone(), format!(".#{}", deployment));let body = serde_json::to_string(&ci::Trigger {id,owner,repo,state: state.to_base32(),targets,}).unwrap();debug!("{:?} {}", s.ci.url, body);for (n, ci) in s.ci.url.iter().enumerate() {let res = s.client.post(ci.clone() + "/trigger").header("Content-Type", "application/json").body(body.clone()).send().await.unwrap();debug!("{:?}", res);
let permit = s.builders.acquire().await.unwrap();{s.deploy(id, temp, depl).await?;}std::mem::drop(permit);Ok::<_, Error>(())});
if let reqwest::StatusCode::OK = res.status() {tokio::spawn(async move {sync_job(&mut db, s.client, &s.ci, n, id).await;});break;}
txn: &ArcTxn<libpijul::pristine::sanakirja::MutTxn0>,channel: &ChannelRef<libpijul::pristine::sanakirja::MutTxn0>,
txn: &ArcTxn<pijul_core::pristine::sanakirja::MutTxn0>,channel: &pijul_core::ChannelRef<pijul_core::pristine::sanakirja::MutTxn0>,
use libpijul::ChannelTxnT;let mut graph = libpijul::alive::retrieve(&*txn_, txn_.graph(&*channel_), pos, false)?;
use pijul_core::ChannelTxnT;let mut graph = {let txn = txn.read();pijul_core::alive::retrieve(&*txn, txn.graph(&*channel.read()), pos, false)?};
impl H {fn output_for_deployment<C: ChangeStore<Error = crate::repository::changestore::Error> + Clone + Send + Sync + 'static,>(&self,txn: &ArcTxn<libpijul::pristine::sanakirja::MutTxn0>,channel: &ChannelRef<libpijul::pristine::sanakirja::MutTxn0>,changes: &C,) -> Result<Option<(tempfile::TempDir, String)>, Error> {if let Some(config) = get_file(txn, channel, changes, "pijul.toml")? {debug!("config = {:?}", config);
pub async fn get_config_state(locks: &RepositoryLocks,repo: uuid::Uuid,channel: String,) -> Result<Option<(pijul_core::Merkle, String)>, Error> {debug!("get_config_state");let repo_ = locks.get(&repo).await.unwrap();tokio::task::spawn_blocking(move || {let pri = repo_.pristine.blocking_write();let txn_ = pri.arc_txn_begin()?;let channel = format!("{}_{}", repo, channel);if let Some(channel) = txn_.read().load_channel(&channel)? {if let Some(config) = get_file(&txn_, &channel, &repo_.changes, "pijul.toml")? {debug!("config = {:?}", config);
if let Ok(parsed) = toml::from_str::<Config>(&config) {debug!("{:?}", parsed);if let Some(depl) = parsed.deployment {let txn = txn.clone();let channel = channel.clone();let changes = changes.clone();let tmp_dir = tempfile::tempdir().unwrap();let wc =libpijul::working_copy::filesystem::FileSystem::from_root(tmp_dir.path());libpijul::output::output_repository_no_pending(&wc, &changes, &txn, &channel, "", true, None, 1, 0,).unwrap();return Ok(Some((tmp_dir, depl)));
if let Ok(parsed) = toml::from_str::<Config>(&config) {if let Some(dep) = parsed.deployment {if let Some(Ok((_, (_, state)))) =txn_.read().reverse_log(&channel.read(), None)?.next(){return Ok(Some((state.into(), dep)));}}
}async fn deploy(&self,id: uuid::Uuid,tmp_dir: tempfile::TempDir,depl: String,) -> Result<(), Error> {use std::process::Stdio;let (status_tx, status_rx) = tokio::sync::watch::channel(None);let (kill_tx, kill_rx) = tokio::sync::oneshot::channel();let p = tmp_dir.path().join(depl);debug!("launching {:?}", p);
}).await.unwrap()}
let mut cmd = tokio::process::Command::new(p).current_dir(tmp_dir.path()).stderr(Stdio::piped()).stdout(Stdio::piped()).stdin(Stdio::null()).spawn()
pub async fn sync_job(db: &mut diesel_async::AsyncPgConnection,client: reqwest::Client,ci: &crate::config_file::CiConfig,ci_n: usize,job: uuid::Uuid,) {let mut files = if let Some(ref f) = ci.filesystem {let stdout = OpenOptions::new().append(true).create(true).open(&f.join(&format!("{}.stdout", job))).await.unwrap();let stderr = OpenOptions::new().append(true).create(true).open(&f.join(&format!("{}.stderr", job))).await
use tokio::io::AsyncBufReadExt;let stdout = tokio::io::BufReader::new(cmd.stdout.take().unwrap());let mut stdout = stdout.lines();let mut stdout_ok = true;let stderr = tokio::io::BufReader::new(cmd.stderr.take().unwrap());let mut stderr = stderr.lines();self.jobs.lock().unwrap().insert(id, (kill_tx, status_tx.clone(), status_rx));let mut stderr_ok = true;let mut buf_stdout = String::new();let mut last_stdout = std::time::UNIX_EPOCH;let mut buf_stderr = String::new();let mut last_stderr = std::time::UNIX_EPOCH;let bound = std::time::Duration::from_secs(1);let mut files = if let Some(ref path) = self.ci.filesystem {Some((OpenOptions::new().append(true).create(true).open(&path.join(&format!("{}.stdout", id))).await.unwrap(),OpenOptions::new().append(true).create(true).open(&path.join(&format!("{}.stderr", id))).await.unwrap(),))
loop {debug!("Querying status");let status = client.get(format!("{}/status/{}", ci.url[ci_n], job)).send().await.unwrap();let status = if let reqwest::StatusCode::OK = status.status() {let status: ci::Status = status.json().await.unwrap();debug!("STATUS {:?}", status);use crate::db::jobs::dsl as jobs;diesel::update(jobs::jobs.find(job)).set((jobs::status.eq(status.code),jobs::ended.eq(status.finished),)).execute(db).await.unwrap();Some(status)
while stdout_ok || stderr_ok {debug!("stdout || stderr {:?} {:?}",buf_stdout.len(),buf_stderr.len()
if let Some((ref mut stdout, ref mut stderr)) = files {let stdout_len = stdout.metadata().await.unwrap().len();let stderr_len = stderr.metadata().await.unwrap().len();let (stdout_, stderr_) = tokio::join!(client.get(format!("{}/stdout/{}", ci.url[ci_n], job)).header("Range", format!("bytes={stdout_len}-*")).send(),client.get(format!("{}/stderr/{}", ci.url[ci_n], job)).header("Range", format!("bytes={stderr_len}-*")).send()
tokio::select! {line = stdout.next_line(), if stdout_ok => {let n = if let Some(line) = line? {buf_stdout.push_str(&line);buf_stdout.push('\n');line.len() + 1} else {0};if last_stdout.elapsed().unwrap() >= bound || n == 0 {debug!("sending stdout to db {:?} {:?}", buf_stdout.len(), buf_stderr.len());if let Some((ref mut stdout, _)) = files {stdout.write_all(buf_stdout.as_bytes()).await?;stdout.flush().await?;}buf_stdout.clear();debug!("stdout/stderr {:?} {:?}", buf_stdout.len(), buf_stderr.len());last_stdout = std::time::SystemTime::now();}if n == 0 {stdout_ok = false}}line = stderr.next_line(), if stderr_ok => {let n = if let Some(line) = line? {buf_stderr.push_str(&line);buf_stderr.push('\n');line.len() + 1} else {0};if last_stderr.elapsed().unwrap() >= bound || n == 0 {debug!("sending stderr to db {:?}", buf_stderr.len());if let Some((_, ref mut stderr)) = files {stderr.write_all(buf_stderr.as_bytes()).await?;stderr.flush().await?;}buf_stderr.clear();last_stderr = std::time::SystemTime::now();}debug!("{:?}", buf_stderr.len());if n == 0 {stderr_ok = false}}}
let stdout_ = stdout_.unwrap();let stderr_ = stderr_.unwrap();debug!("{:?} {:?}", stdout_, stderr_);let bytes = stdout_.bytes().await.unwrap();stdout.write(&bytes).await.unwrap();let bytes = stderr_.bytes().await.unwrap();stderr.write(&bytes).await.unwrap();
let status = tokio::select! {status = cmd.wait() => {status?.code()
if let Some(status) = status {let (mut status_tx, _status_rx) = tokio::sync::watch::channel(None);let (_kill_tx, mut kill_rx) = tokio::sync::oneshot::channel();for (script, s) in status.results.iter() {let mut cmd = tokio::process::Command::new("nix-store");cmd.arg("-r").arg(&s);let cmd = cmd.stderr(Stdio::piped()).stdout(Stdio::piped()).stdin(Stdio::null()).spawn().unwrap();ci::dump_cmd(cmd, &mut files, &mut kill_rx, &mut status_tx).await.unwrap();
_ = kill_rx => {cmd.kill().await?;None
for (script, s) in status.results.iter() {debug!("launching {:?} {:?}", s, script);let cmd = tokio::process::Command::new(format!("{}/bin/{script}", s.display())).stderr(Stdio::piped()).stdout(Stdio::piped()).stdin(Stdio::null()).spawn().unwrap();ci::dump_cmd(cmd, &mut files, &mut kill_rx, &mut status_tx).await.unwrap();
};debug!("process exited with {:?}", status);debug!("stderr {}", buf_stderr);debug!("stdout {}", buf_stdout);let now = chrono::Utc::now();use crate::db::jobs::dsl as jobs;diesel::update(jobs::jobs.find(id)).set((jobs::status.eq(status), jobs::ended.eq(&now))).execute(&mut self.db.get().await.unwrap()).await?;status_tx.send(Some((now, status))).unwrap();self.jobs.lock().unwrap().remove(&id);Ok::<_, Error>(())
break;}tokio::time::sleep(std::time::Duration::from_secs(1)).await
pub async fn redirect_http_to_https(http: u16, https: u16, mut app: Router<()>) {fn make_https(host: String,uri: http::Uri,http: u16,https: u16,) -> Result<http::Uri, axum::BoxError> {let mut parts = uri.into_parts();parts.scheme = Some(axum::http::uri::Scheme::HTTPS);
*/
if parts.path_and_query.is_none() {parts.path_and_query = Some("/".parse().unwrap());}let https_host = host.replace(&http.to_string(), &https.to_string());parts.authority = Some(https_host.parse()?);Ok(http::Uri::from_parts(parts)?)}use axum::extract::ConnectInfo;use axum::response::IntoResponse;use axum_extra::extract::Host;let redirect = move |Host(host): Host,uri: http::Uri,ConnectInfo(addr): ConnectInfo<SocketAddr>,r: http::Request<axum::body::Body>| async move {debug!("redirect {:?}", addr);if addr.ip().is_loopback() {debug!("is loopback");use tower_service::Service;Ok(app.call(r).await.into_response())} else {debug!("not loopback");match make_https(host, uri, http, https) {Ok(uri) => Ok(Redirect::permanent(&uri.to_string()).into_response()),Err(error) => {tracing::warn!(%error, "failed to convert URI to HTTPS");Err(StatusCode::BAD_REQUEST.into_response())}}}};let addr = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0);let http_addr = SocketAddr::V6(SocketAddrV6::new(addr, http, 0, 0));let listener = tokio::net::TcpListener::bind(http_addr).await.unwrap();tracing::debug!("listening on {}", listener.local_addr().unwrap());use axum::handler::HandlerWithoutStateExt;axum::serve(listener,redirect.into_make_service_with_connect_info::<SocketAddr>(),).await.unwrap();
async fn redirect_http_to_https(uri: Uri) -> Redirect {let uri = format!("https://{}{}", uri.host().unwrap(), uri.path());Redirect::temporary(&uri)
impl<E: Into<Error> + std::error::Error> From<libpijul::pristine::TxnErr<E>> for Error {fn from(e: libpijul::pristine::TxnErr<E>) -> Self {
impl<E: Into<Error> + std::error::Error> From<pijul_core::pristine::TxnErr<E>> for Error {fn from(e: pijul_core::pristine::TxnErr<E>) -> Self {
.route("/{owner}/{repo}/{job_id}/ws", any(ws_handler))
.route("/{owner}/{repo}/{job_id}/retry", post(retry)).route("/{owner}/{repo}/{job_id}/kill", post(kill)).route("/{owner}/{repo}/{job_id}/ws", any(job_ws_handler)).route("/{owner}/{repo}/ws", any(jobs_ws_handler))
pub async fn retry(State(config): State<Config>,jar: SignedCookieJar,Path(path): Path<JobPath>,) -> Result<Redirect, crate::Error> {let (uid, login) = if let Some((a, b)) = get_user_login(&jar, &config).await? {(Some(a), Some(b))} else {(None, None)};let mut db = config.db.get().await?;use crate::db::jobs::dsl as jobs;use crate::db::repositories::dsl as repos;use crate::db::users::dsl as users;
pub async fn ws_handler(
if let Some(repo) = repos::repositories.inner_join(jobs::jobs).inner_join(users::users).filter(users::login.eq(&path.owner)).filter(repos::name.eq(&path.repo)).filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(uid.unwrap_or(uuid::Uuid::nil()),repos::id,Perm::WRITE_JOBS.bits()))).filter(jobs::id.eq(path.job_id)).select(repos::id).get_result::<uuid::Uuid>(&mut db).await.optional()?{let mut db = config.db.get().await.unwrap();use crate::db::jobs::dsl as jobs;let (Some(state), channel) = jobs::jobs.find(&path.job_id).select((jobs::repo_state, jobs::channel)).get_result::<(Option<String>, String)>(&mut db).await.unwrap()else {return Err(crate::Error::NotFound);};let id = diesel::insert_into(jobs::jobs).values((jobs::repo.eq(repo), jobs::repo_state.eq(&state))).returning(jobs::id).get_result::<uuid::Uuid>(&mut db).await.unwrap();use crate::db::repositories::dsl as repositories;use crate::db::users::dsl as users;if let Some((state, deployment)) =crate::replication::get_config_state(&config.repo_locks, repo, channel).await.unwrap(){let (owner, repo) = repositories::repositories.find(repo).inner_join(users::users).select((users::login, repositories::name)).get_result::<(String, String)>(&mut db).await.unwrap();let mut targets = std::collections::HashMap::new();targets.insert(deployment.clone(), format!(".#{}", deployment));let body = serde_json::to_string(&ci::Trigger {id,owner,repo,state: state.to_base32(),targets,}).unwrap();debug!("{:?} {}", config.ci.url, body);tokio::spawn(async move {let client = reqwest::Client::new();for (n, ci_url) in config.ci.url.iter().enumerate() {let res = client.post(ci_url.clone() + "/trigger").header("Content-Type", "application/json").body(body.clone()).send().await.unwrap();if let reqwest::StatusCode::OK = res.status() {crate::replication::sync_job(&mut db, client, &config.ci, n, id).await;break;}}});}Ok(Redirect::to(&format!("/{}/{}/jobs", path.owner, path.repo)))} else {Err(crate::Error::NotFound)}}pub async fn 
kill(State(config): State<Config>,jar: SignedCookieJar,Path(path): Path<JobPath>,) -> Result<Redirect, crate::Error> {let (uid, login) = if let Some((a, b)) = get_user_login(&jar, &config).await? {(Some(a), Some(b))} else {(None, None)};let mut db = config.db.get().await?;use crate::db::jobs::dsl as jobs;use crate::db::repositories::dsl as repos;use crate::db::users::dsl as users;if let Some(repo) = repos::repositories.inner_join(jobs::jobs).inner_join(users::users).filter(users::login.eq(&path.owner)).filter(repos::name.eq(&path.repo)).filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(uid.unwrap_or(uuid::Uuid::nil()),repos::id,Perm::WRITE_JOBS.bits()))).filter(jobs::id.eq(path.job_id)).select(repos::id).get_result::<uuid::Uuid>(&mut db).await.optional()?{let client = reqwest::Client::new();for cl in config.ci.url.iter() {let res = client.post(format!("{}/kill/{}", cl, path.job_id)).header("Content-Type", "application/json").send().await.unwrap();}Ok(Redirect::to(&format!("/{}/{}/jobs", path.owner, path.repo)))} else {Err(crate::Error::NotFound)}}pub async fn job_ws_handler(
ws.on_upgrade(move |socket| handle_socket(config, path.job_id, socket))
use crate::db::repositories::dsl as repos;use crate::db::users::dsl as users;let (uid, login) = if let Ok(Some((a, b))) = get_user_login(&jar, &config).await {(Some(a), Some(b))} else {(None, None)};let mut db = config.db.get().await.unwrap();if repos::repositories.inner_join(users::users).filter(users::login.eq(path.owner)).filter(repos::name.eq(path.repo)).filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(uid.unwrap_or(uuid::Uuid::nil()),repos::id,Perm::READ_JOBS.bits()))).select(repos::id).get_result::<uuid::Uuid>(&mut db).await.optional().unwrap().is_none(){return crate::Error::NeedsAuth.into_response();}ws.on_upgrade(move |socket| job_handle_socket(config, path.job_id, socket))}pub async fn jobs_ws_handler(State(config): State<Config>,Path(path): Path<JobPath>,jar: SignedCookieJar,ws: WebSocketUpgrade,) -> Response {use crate::db::repositories::dsl as repos;use crate::db::users::dsl as users;let (uid, login) = if let Some((a, b)) = get_user_login(&jar, &config).await.unwrap() {(Some(a), Some(b))} else {(None, None)};let mut db = config.db.get().await.unwrap();let Some(repo_id) = repos::repositories.inner_join(users::users).filter(users::login.eq(path.owner)).filter(repos::name.eq(path.repo)).filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(uid.unwrap_or(uuid::Uuid::nil()),repos::id,Perm::READ_JOBS.bits()))).select(repos::id).get_result::<uuid::Uuid>(&mut db).await.optional().unwrap()else {return crate::Error::NeedsAuth.into_response();};ws.on_upgrade(move |socket| jobs_handle_socket(config, repo_id, socket))
}}#[derive(Debug, Deserialize, Serialize)]enum JobsMsg {Jobs(Vec<Job>),Heartbeat,}// Update jobs page list.async fn jobs_handle_socket(config: Config, repo_id: uuid::Uuid, mut socket: WebSocket) {use crate::db::jobs::dsl as jobs;let mut latest = {let mut latest = config.jobs_latest.lock().await;match latest.entry(repo_id) {Entry::Vacant(e) => {let last = jobs::jobs.filter(jobs::repo.eq(repo_id)).select(jobs::id).order_by(jobs::started.desc()).first::<uuid::Uuid>(&mut config.db.get().await.unwrap()).await.unwrap();let (sender, receiver) = tokio::sync::watch::channel(last);e.insert(sender);receiver}Entry::Occupied(e) => e.get().subscribe(),}};let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));loop {debug!("waiting job");tokio::select! {_ = interval.tick() => {socket.send(serde_json::to_string(&JobsMsg::Heartbeat).unwrap().into()).await.unwrap_or(());}x = latest.changed() => {debug!("status {:?}", x);let mut db = config.db.get().await.unwrap();let latest = latest.borrow_and_update().clone();let jobs = jobs::jobs.filter(jobs::repo.eq(repo_id)).select(Job::as_select()).order_by(jobs::started.desc()).get_results::<Job>(&mut db).await.unwrap();socket.send(serde_json::to_string(&JobsMsg::Jobs(jobs)).unwrap().into()).await.unwrap_or(());break}}
// colored signatureslet h = rng.gen::<f64>();let color = Hsv::new(RgbHue::from(h * 360f64), 0.8, 1.);let rgb = Rgb::from(color);write!(s, "\" stroke=\"rgb({},{},{})\" stroke-width=\"2\" fill=\"transparent\"/></svg>",
// colored signatureslet h = rng.gen::<f64>();let color = Hsv::new(RgbHue::from(h * 360f64), 0.8, 1.);let rgb = Rgb::from(color);write!(s, "\" stroke=\"rgb({},{},{})\" stroke-width=\"2\" fill=\"transparent\"/></svg>",
.get_header(&libpijul::Hash::from_base32(hash.as_bytes()).unwrap())?;let hash_ = libpijul::Hash::from_base32(hash.as_bytes()).unwrap();
.get_header(&pijul_core::Hash::from_base32(hash.as_bytes()).unwrap())?;let hash_ = pijul_core::Hash::from_base32(hash.as_bytes()).unwrap();
}}diesel::table! {use diesel::sql_types::*;use diesel::pg::sql_types::*;use crate::{Keyalgorithm};old_logins (login) {login -> Citext,user_id -> Nullable<Uuid>,retired -> Nullable<Timestamptz>,
repo_state -> Nullable<Text>,channel -> Text,script -> Nullable<Text>,
impl<'a> From<libpijul::changestore::FileMetadata<'a>> for Metadata {fn from(f: libpijul::changestore::FileMetadata<'a>) -> Metadata {
impl<'a> From<pijul_core::changestore::FileMetadata<'a>> for Metadata {fn from(f: pijul_core::changestore::FileMetadata<'a>) -> Metadata {
libpijul::change::Hunk::FileAdd { .. } => unreachable!(),libpijul::change::Hunk::FileMove {add: libpijul::change::Atom::NewVertex(ref add),
pijul_core::change::Hunk::FileAdd { .. } => unreachable!(),pijul_core::change::Hunk::FileMove {add: pijul_core::change::Atom::NewVertex(ref add),
libpijul::change::Hunk::Replacement {change: libpijul::change::Atom::EdgeMap(ref change),replacement: libpijul::change::Atom::NewVertex(ref replacement),
pijul_core::change::Hunk::Replacement {change: pijul_core::change::Atom::EdgeMap(ref change),replacement: pijul_core::change::Atom::NewVertex(ref replacement),
libpijul::change::Hunk::Replacement { .. } => unreachable!(),libpijul::change::Hunk::SolveOrderConflict { change, local } => {
pijul_core::change::Hunk::Replacement { .. } => unreachable!(),pijul_core::change::Hunk::SolveOrderConflict { change, local } => {
pos: &libpijul::pristine::Position<Option<Hash>>,) -> libpijul::pristine::Position<usize> {libpijul::pristine::Position {
pos: &pijul_core::pristine::Position<Option<Hash>>,) -> pijul_core::pristine::Position<usize> {pijul_core::pristine::Position {
n: &libpijul::change::NewVertex<Option<libpijul::pristine::Hash>>,contents: &Option<libpijul::change::Atom<Option<libpijul::pristine::Hash>>>,
n: &pijul_core::change::NewVertex<Option<pijul_core::pristine::Hash>>,contents: &Option<pijul_core::change::Atom<Option<pijul_core::pristine::Hash>>>,
add: &'a libpijul::change::NewVertex<Option<libpijul::pristine::Hash>>,del: &'a libpijul::change::Atom<Option<libpijul::pristine::Hash>>,
add: &'a pijul_core::change::NewVertex<Option<pijul_core::pristine::Hash>>,del: &'a pijul_core::change::Atom<Option<pijul_core::pristine::Hash>>,
let meta: Metadata = libpijul::changestore::FileMetadata::read(&buf).into();let (old, show_perms) = if let libpijul::change::Atom::EdgeMap(ref del) = del {
let meta: Metadata = pijul_core::changestore::FileMetadata::read(&buf).into();let (old, show_perms) = if let pijul_core::change::Atom::EdgeMap(ref del) = del {
del: &'a libpijul::change::Atom<Option<libpijul::pristine::Hash>>,contents: &'a Option<libpijul::change::Atom<Option<libpijul::pristine::Hash>>>,
del: &'a pijul_core::change::Atom<Option<pijul_core::pristine::Hash>>,contents: &'a Option<pijul_core::change::Atom<Option<pijul_core::pristine::Hash>>>,
undel: &'a libpijul::change::Atom<Option<libpijul::pristine::Hash>>,contents: &'a Option<libpijul::change::Atom<Option<libpijul::pristine::Hash>>>,
undel: &'a pijul_core::change::Atom<Option<pijul_core::pristine::Hash>>,contents: &'a Option<pijul_core::change::Atom<Option<pijul_core::pristine::Hash>>>,
change: &'a libpijul::change::EdgeMap<Option<libpijul::pristine::Hash>>,replacement: &'a libpijul::change::NewVertex<Option<libpijul::pristine::Hash>>,local: &'a libpijul::change::Local,
change: &'a pijul_core::change::EdgeMap<Option<pijul_core::pristine::Hash>>,replacement: &'a pijul_core::change::NewVertex<Option<pijul_core::pristine::Hash>>,local: &'a pijul_core::change::Local,
change: &'a libpijul::change::Atom<Option<libpijul::pristine::Hash>>,name: &'a libpijul::change::Atom<Option<libpijul::pristine::Hash>>,
change: &'a pijul_core::change::Atom<Option<pijul_core::pristine::Hash>>,name: &'a pijul_core::change::Atom<Option<pijul_core::pristine::Hash>>,
change: &'a libpijul::change::Atom<Option<libpijul::pristine::Hash>>,name: &'a libpijul::change::Atom<Option<libpijul::pristine::Hash>>,
change: &'a pijul_core::change::Atom<Option<pijul_core::pristine::Hash>>,name: &'a pijul_core::change::Atom<Option<pijul_core::pristine::Hash>>,
anyhow = "1.0.100"axum = { version = "0.8.7", features = ["macros", "ws"] }axum-extra = { version = "0.12.2", features = [ "cookie-signed", "cookie", "typed-header" ] }
anyhow.workspace = trueaxum.workspace = trueaxum-extra = { version = "0.12.6", features = [ "cookie-signed", "cookie", "typed-header" ] }
bytes = "1.11.0"chrono = { version = "0.4.42", features = ["serde"] }ci = { path = "../ci" }clap = { version = "4.5.53", features = ["derive"] }
bytes = "1.11.1"chrono.workspace = trueci.workspace = trueclap = { version = "4.6.1", features = ["derive"] }
diesel = { version = "2.3.3", features = [ "postgres_backend", "extras", "network-address" ] }diesel-async = { version = "0.7.4", features = [ "postgres", "deadpool" ] }diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }futures = "0.3.31"
diesel.workspace = truediesel-async.workspace = truediesel-derive-enum.workspace = truefutures = "0.3.32"
tokio = { version = "1.48.0", features = ["full"] }tokio-postgres = "0.7.15"tokio-stream = "0.1.17"toml = "0.9.8"tower-http = { version = "0.6.7", features = [ "trace", "cors", "compression-br", "compression-gzip", "compression-zstd", "compression-deflate" ] }
tokio = { version = "1.52.3", features = ["full"] }tokio-postgres = "0.7.17"tokio-stream = "0.1.18"toml = "0.9.12"tower-http = { version = "0.6.11", features = [ "trace", "cors", "compression-br", "compression-gzip", "compression-zstd", "compression-deflate", "fs" ] }
tracing = "0.1.41"tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }url = "2.5.7"uuid = "1.18.1"webauthn-rs = "0.5.3"
tracing = "0.1.44"tracing-subscriber = { version = "0.3.23", features = ["env-filter"] }url = "2.5.8"uuid = "1.23.2"webauthn-rs = "0.5.5"
tokio-util = "0.7.18"
[workspace.dependencies]pijul-core = { version = "1.0.0-beta.14", features = ["tarball"] }pijul-remote = { version = "1.0.0-beta.14" }pijul-repository = { version = "1.0.0-beta.13" }pijul-config = { version = "1.0.0-beta.12" }pijul-interaction = { version = "1.0.0-beta.12" }ci = { path = "ci" }anyhow = "1.0.102"axum = { version = "0.8.9", features = [ "macros", "ws" ] }bincode = "1.3"chrono = { version = "0.4.44", features = ["serde"] }clap = { version = "4.6.1", features = ["derive"] }env_logger = "0.11.10"futures = "0.3.32"log = "0.4.31"serde = "1.0.228"serde_derive = "1.0.228"serde_json = "1.0.150"thrussh = "0.40"thrussh-keys = "0.23"tokio = { version = "1.52", features = [ "process", "fs" ] }toml = { version = "0.9.12" }uuid = { version = "1.23.2", features = ["serde" ] }tracing = "0.1.44"tracing-subscriber = { version = "0.3.23", features = ["env-filter"] }byteorder = "1.5.0"thiserror = "1.0"reqwest = { version = "0.11", features = ["stream", "json"] }sanakirja = "2.0.0-beta"tempfile = "3.27.0"diesel = { version = "2.3.9", features = [ "postgres_backend", "extras", "network-address" ] }diesel-async = { version = "0.7.4", features = [ "postgres", "deadpool" ] }diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }axum-range = "1.0.0"axum-extra = "0.12.6"
"cpufeatures",
"cpufeatures 0.2.17",][[package]]name = "aes"version = "0.9.1"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "f1fc76eaeac4c9164506c466d4ffdd8ec9d0c5bf57ee97177c4d8eceb3a0e138"dependencies = ["cipher 0.5.2","cpubits","cpufeatures 0.3.0",
checksum = "dbfe9f610fe4e99cf0cfcd03ccf8c63c28c616fe714d80475ef731f3b13dd21b"
checksum = "9963ff19f40c6102c76756ef0a46004c0d58957d87259fc9208ff8441c12ab96"dependencies = ["axum","axum-core","bytes","futures-util","headers","http 1.4.1","http-body 1.0.1","http-body-util","mime","pin-project-lite","rustversion","serde_core","tower-layer","tower-service","tracing",][[package]]name = "axum-extra"version = "0.12.6"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "be44683b41ccb9ab2d23a5230015c9c3c55be97a25e4428366de8873103f7970"
"syn 2.0.111",
"syn 2.0.117",][[package]]name = "axum-range"version = "1.0.0"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "6a30717ba4cd96521a74b1b847660f2823bc47d8da41db49a024552849fe9f5b"dependencies = ["axum","axum-extra 0.10.3","bytes","futures","http-body 1.0.1","pin-project","tokio",
name = "bincode_derive"version = "2.0.1"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09"dependencies = ["virtue",][[package]]
"crypto-common","inout",
"crypto-common 0.1.6","inout 0.1.4",][[package]]name = "cipher"version = "0.5.2"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "e8cf2a2c93cd704877c0858356ed03480ff301ee950b43f1cbe4573b088bfa6c"dependencies = ["block-buffer 0.12.0","crypto-common 0.2.2","inout 0.2.2",
name = "cmov"version = "0.5.4"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "0c9ea0ac24bc397ab3c98583a3c9ba74fa56b09a4449bbe172b9b1ddb016027a"[[package]]name = "cobs"version = "0.3.0"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "0fa961b519f0b462e3a3b4a34b64d119eeaca1d59af726fe450bbba07a9fc0a1"dependencies = ["thiserror 2.0.18",][[package]]
version = "0.4.31"
version = "0.4.32"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "cc14f565cf027a105f7a44ccf9e5b424348421a1d8952a8fc9d499d313107789"[[package]]name = "console"version = "0.15.11"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "054ccb5b10f9f2cbf51eb355ca1d05c2d279ce1804688d0db74b4733a5aeafd8"dependencies = ["encode_unicode","libc","once_cell","unicode-width","windows-sys 0.59.0",][[package]]name = "const-oid"version = "0.10.2"
name = "ctr"version = "0.10.1"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "baaca1c4b237092596f64d571e9db6ce4109c4ef9742e27590f1709594461f21"dependencies = ["cipher 0.5.2",][[package]]name = "ctutils"version = "0.4.2"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "7d5515a3834141de9eafb9717ad39eea8247b5674e6066c404e8c4b365d2a29e"dependencies = ["cmov",][[package]]
"tokio",][[package]]name = "deadpool-postgres"version = "0.14.1"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "3d697d376cbfa018c23eb4caab1fd1883dd9c906a8c034e8d9a3cb06a7e0bef9"dependencies = ["async-trait","deadpool","getrandom 0.2.16",
[[package]]name = "embedded-io"version = "0.4.0"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "ef1a6892d9eef45c8fa6b9e0086428a2cca8491aca8f787c534a3d6d0bcb3ced"[[package]]name = "embedded-io"version = "0.6.1"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d"[[package]]name = "encode_unicode"version = "1.0.0"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0"
checksum = "f3c0b69cfcb4e1b9f1bf2f53f95f766e4661169728ec61cd3fe5a0166f2d1386"
checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d"dependencies = ["bytes","fnv","futures-core","futures-sink","futures-util","http 0.2.12","indexmap","slab","tokio","tokio-util","tracing",][[package]]name = "h2"version = "0.4.14"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "171fefbc92fe4a4de27e0698d6a5b392d6a0e333506bc49133760b3bcf948733"
version = "1.4.0"
version = "0.2.12"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1"dependencies = ["bytes","fnv","itoa",][[package]]name = "http"version = "1.4.1"
"http","hyper","hyper-util","rustls","rustls-pki-types",
"atomic-waker","bytes","futures-channel","futures-core","h2 0.4.14","http 1.4.1","http-body 1.0.1","httparse","httpdate","itoa","pin-project-lite","smallvec",
"futures-channel","futures-core","futures-util","http","http-body","hyper","ipnet","libc","percent-encoding",
"http 1.4.1","http-body 1.0.1","hyper 1.10.1",
][[package]]name = "indicatif"version = "0.17.11"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235"dependencies = ["console","number_prefix","portable-atomic","unicode-segmentation","unicode-width","web-time",
name = "iri-string"version = "0.7.9"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "4f867b9d1d896b67beb18518eda36fdb77a32ea590de864f1325b294a6d14397"dependencies = ["memchr","serde",][[package]]
checksum = "a3c2a6c0b4b5637c41719973ef40c6a1cf564f9db6958350de6193fbee9c23f5"
checksum = "eebcc3aff044e5944a8fbaf69eb277d11986064cba30c468730e8b9909fb551c"dependencies = ["byteorder","dbus-secret-service","linux-keyutils","log","security-framework 2.11.1","security-framework 3.7.0","windows-sys 0.60.2","zeroize",]
checksum = "f625e90234ef1fd164dfe7980ffb04a3036a08e31e051cc29e6273a0c9a7a7e7"dependencies = ["adler32","aes 0.7.5","bincode 1.3.3","bitflags 2.10.0","blake3","bs58 0.4.0","byteorder","canonical-path","chardetng","crossbeam-deque","curve25519-dalek 4.1.3","data-encoding","diffs","ed25519-dalek","encoding_rs","flate2","generic-array","getrandom 0.2.16","hmac 0.11.0","ignore","imara-diff","jiff","log","lru-cache","nom 7.1.3","parking_lot 0.12.5","path-slash","pbkdf2 0.9.0","pijul-macros","rand 0.9.2","regex","sanakirja","serde","serde_derive","serde_json","sha2 0.9.9","tar","tempfile","thiserror 2.0.17","toml 0.8.23","twox-hash","zstd-seekable",]
checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981"
version = "2.7.6"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"[[package]]name = "memmap"version = "0.7.0"
version = "2.8.1"
name = "objc2-core-foundation"version = "0.3.2"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536"dependencies = ["bitflags 2.12.1",][[package]]name = "objc2-system-configuration"version = "0.3.2"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "7216bd11cbda54ccabcab84d523dc93b858ec75ecfb3a7d89513fa22464da396"dependencies = ["objc2-core-foundation",][[package]]
"siphasher 1.0.1",
"siphasher 1.0.3",][[package]]name = "pijul-config"version = "1.0.0-beta.12"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "7920c9e7135f08f267de8539865c6a384a93613a64039afef51d7c6baa3fae72"dependencies = ["anyhow","dialoguer","dirs-next","figment","log","pijul-core","serde","serde_derive","toml 0.8.23","whoami 1.6.1",][[package]]name = "pijul-core"version = "1.0.0-beta.14"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "9e70164a087ed87f9ebda06a590afbe2443d5402c32fe66c0fe9e396de25a026"dependencies = ["adler32","aes 0.9.1","bincode","bitflags 2.12.1","blake3","bs58 0.4.0","byteorder","canonical-path","chardetng","crossbeam-deque","ctr 0.10.1","curve25519-dalek 4.1.3","data-encoding","diffs","ed25519-dalek","encoding_rs","flate2","generic-array","getrandom 0.2.17","hmac 0.11.0","ignore","imara-diff","jiff","log","lru-cache","nom 7.1.3","parking_lot 0.12.5","path-slash","pbkdf2 0.9.0","pijul-macros","rand 0.10.1","regex","sanakirja","serde","serde_derive","serde_json","sha2 0.9.9","tar","tempfile","thiserror 2.0.18","toml 0.8.23","twox-hash","zstd-seekable",
][[package]]name = "pijul-identity"version = "1.0.0-beta.12"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "f1fc424f2842866a9c434fedc515b92bdf0fe50c2b2ce39c27fa527975612717"dependencies = ["anyhow","dirs-next","jiff","keyring","log","pijul-config","pijul-core","pijul-interaction","serde","serde_derive","serde_json","thiserror 2.0.18","thrussh-keys","toml 0.8.23","validator","whoami 1.6.1",
name = "pijul-interaction"version = "1.0.0-beta.12"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "4787fd3e8d03df9f8e159690460d166a9a0578f5196b48cf5d81cc9b6253abcf"dependencies = ["dialoguer","duplicate","indicatif","log","pijul-config","thiserror 2.0.18",][[package]]
"syn 2.0.111",
"syn 2.0.117",][[package]]name = "pijul-remote"version = "1.0.0-beta.14"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "1161f7ea30608aa03ba7b3db3eaa91e810e33679dfa9b7dd74599b37d02e4b03"dependencies = ["anyhow","byteorder","bytes","dirs-next","futures","futures-util","keyring","log","pijul-config","pijul-core","pijul-identity","pijul-interaction","pijul-repository","regex","reqwest","sanakirja","serde","serde_derive","serde_json","thrussh","thrussh-config","thrussh-keys","tokio","url",][[package]]name = "pijul-repository"version = "1.0.0-beta.13"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "74d10e411ec5dc5255614df7f569068a5f9056c439beffd3c790041ed95451af"dependencies = ["anyhow","log","pijul-config","pijul-core","rlimit","toml 0.8.23",
name = "postgres-openssl"version = "0.5.2"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "8f86f073ad570f76e9e278ce6f05775fc723eed7daa6b4f9c2aa078080a564a0"dependencies = ["openssl","tokio","tokio-openssl","tokio-postgres",][[package]]
"rand_core 0.9.3",
"rand_core 0.9.5",][[package]]name = "rand"version = "0.10.1"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207"dependencies = ["chacha20","getrandom 0.4.2","rand_core 0.10.1",
"bitflags 2.10.0",
"bitflags 2.12.1",][[package]]name = "redox_syscall"version = "0.8.1"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "5b44b894f2a6e36457d665d1e08c3866add6ed5e70050c1b4ba8a8ddedb02ce7"dependencies = ["bitflags 2.12.1",
"indexmap","lazy_static","libc","libpijul","log","memmap","openssl","postgres-openssl","postgres-types","rand 0.9.2","regex",
"pijul-core",
"h2","http","http-body","http-body-util","hyper","hyper-rustls",
"futures-util","h2 0.3.27","http 0.2.12","http-body 0.4.6","hyper 0.14.32",
"bitflags 2.10.0","core-foundation",
"bitflags 2.12.1","core-foundation 0.9.4","core-foundation-sys","libc","security-framework-sys",][[package]]name = "security-framework"version = "3.7.0"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d"dependencies = ["bitflags 2.12.1","core-foundation 0.10.1",
"windows-sys 0.60.2",
"windows-sys 0.52.0",][[package]]name = "socket2"version = "0.6.4"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "52d1cfed4120b4d927bf7c0f86d2087a4a7d6027c906d9f9d525a80573b9be51"dependencies = ["libc","windows-sys 0.61.2",
name = "validator"version = "0.20.0"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "43fb22e1a008ece370ce08a3e9e4447a910e92621bb49b85d6e48a45397e7cfa"dependencies = ["idna","once_cell","regex","serde","serde_derive","serde_json","url",][[package]]
name = "wasm-streams"version = "0.4.2"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"dependencies = ["futures-util","js-sys","wasm-bindgen","wasm-bindgen-futures","web-sys",][[package]]
[[package]]name = "windows-registry"version = "0.6.1"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "02752bf7fbdcce7f2a27a742f798510f3e5ad88dbe84871e5168e2120c3d5720"dependencies = ["windows-link","windows-result","windows-strings",]
name = "wit-bindgen"version = "0.46.0"
name = "winnow"version = "1.0.3"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "0592e1c9d151f854e6fd382574c3a0855250e1d9b2f99d9281c6e6391af352f1"[[package]]name = "winreg"version = "0.50.0"