2GEILJZTYWEJEMN4PWFQYLQXZZ44BVCJFM5XW35EUGS3GQZB3CPQC
static ref ACTORS: Arc<Mutex<HashMap<Uuid, Arc<Mutex<Actor>>>>> = Arc::new(Mutex::new(HashMap::new()));
static ref SPAWN_REQUESTS: Arc<Mutex<Vec<(Uuid, SpawnActor)>>> = Arc::new(Mutex::new(Vec::new()));
static ref ACTORS: Arc<Mutex<HashMap<Uuid, UnboundedSender<Message>>>> = Arc::new(Mutex::new(HashMap::new()));
async fn spawn_actor(executable: PathBuf, block: bool) -> anyhow::Result<Uuid> {
if !executable.is_file() {
bail!("{} is not a file", executable.display());
async fn spawn_actor(
spawn_actor: SpawnActor,
block: bool,
tx: UnboundedSender<(Uuid, SpawnActor)>,
) -> anyhow::Result<Uuid> {
if !spawn_actor.executable.is_file() {
bail!("{} is not a file", spawn_actor.executable.display());
let init_message = Message {
to: uuid,
from: None,
payload: serde_json::json!({
"id": uuid,
}),
};
let msg = serde_json::to_string(&init_message)?;
stdin
.write_all(msg.as_bytes())
.await
.expect("Failed to write to child stdin");
spawn(async move {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Message>();
spawn(async move {
while let Some(message) = rx.recv().await {
let msg = serde_json::to_string(&message).unwrap();
stdin
.write_all(format!("{msg}\n").as_bytes())
.await
.expect("Failed to write to child stdin");
stdin.flush().await.expect("Failed to flush child stdin");
}
});
let init_message = Message {
to: uuid,
from: None,
payload: serde_json::json!({
"id": uuid,
}),
};
tx.send(init_message)
.expect("Failed to write to child stdin");
spawn(async {
let mut spawn_requests = SPAWN_REQUESTS.lock().await;
loop {
if let Some((uuid, request)) = spawn_requests.pop() {
let new_uid = spawn_actor(request.executable, false).await.unwrap();
let message = Message {
from: None,
to: uuid,
payload: serde_json::json!({
"type": "spawned",
"status": "ok",
"id": new_uid,
}),
};
let actors = ACTORS.lock().await;
let mut actor = actors.get(&uuid).unwrap().lock().await;
actor
.stdin
.write_all(serde_json::to_string(&message).unwrap().as_bytes())
.await
.unwrap();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(Uuid, SpawnActor)>();
{
let tx = tx.clone();
spawn(async move {
loop {
if let Some((uuid, request)) = rx.recv().await {
match spawn_actor(request, false, tx.clone()).await {
Ok(new_uid) => {
let message = Message {
from: None,
to: uuid,
payload: serde_json::json!({
"type": "spawned",
"status": "ok",
"id": new_uid,
}),
};
let actors = ACTORS.lock().await;
let actor = actors.get(&uuid).unwrap();
actor.send(message).unwrap();
}
Err(e) => {
let message = Message {
from: None,
to: uuid,
payload: serde_json::json!({
"type": "spawned",
"status": "error",
"error": format!("{e}"),
}),
};
let actors = ACTORS.lock().await;
let actor = actors.get(&uuid).unwrap();
actor.send(message).unwrap();
}
}
}
tokio::time::sleep(Duration::from_millis(500)).await;
0{"type":"Message","from":null,"to":"35c87090-2182-4d5a-9a64-6cd105722066","payload":{"id":"35c87090-2182-4d5a-9a64-6cd105722066"}}
{"type":"Message","from":null,"to":"35c87090-2182-4d5a-9a64-6cd105722066","payload":{"id":"35c87090-2182-4d5a-9a64-6cd105722066"}}
{"type":"Message","from":"35c87090-2182-4d5a-9a64-6cd105722066","to":"35c87090-2182-4d5a-9a64-6cd105722066","payload":{"status":"ok","count":1}}