K7NUNACPGSZU4TMH5OIPO74ANZZK2P2VVZMPJ46Q3NTZJIU4H6NAC AE4PHS6EEAG647L3MTTR6IKZJZGRTP6TGOUEBSTUYKKDFIOPF47QC WHBCEBCDWG6ZK57VRW2IPOVRR4AYOMNPAAVKAFXGRT7QEVQUBJ4AC MGOB3WXKVDFPE5PPKWC4EEST2CR4NQ7B2FASVYHO5JNNGXC5NNPAC FEHWT3MOW3STJXNBQWXSY6ZDSJIJWVU6EWCXA6KARH7DCYFNM5NQC SM5TELBWYEM67PHIK2RSGJ3E4TCUT2XLVEZIJGM2GEI45RHSBH6AC AW3IHN2SW4JYIT4Y26Q45BLD5RFJ3POVSTECR4GQ3ZQVB463EZVQC VR6XFOE7TKQQYWIE26QTRB2XWYCZTNBWZ32SB43M2JVOW6EN5KJQC LILEG6KRUBQN7AIFHDK5N3X4JPBOLK2ATIO5JGRU3AEG5CCIWBNAC G4U6J7TOHDXUL6ECIGJUSTQPURB54PC4VIAW2UGXDJPPFQLR64LQC P35HCXDOT4NKKPTM6OMKEVKBLB3VLEODS7LNKRG7777RLOFDGU2AC # pijul.ignore.pijul
let play ~audio t =let p, u = Promise.create () inEio.Stream.add t.evs @@ Req_play (audio, Promise.resolve u);Promise.await_exn plet stop t =let p, u = Promise.create () inEio.Stream.add t.evs @@ Req_stop (Promise.resolve u);Promise.await_exn p
module Parser = structlet frame ~sample_p ~channels ~size ~kind =let buf = Bigarray.Array1.create kind Bigarray.c_layout (channels * size) inlet p =let open Angstrom inlet sample = count channels sample_p inlet rec samples ?(i = 0) () =end_of_input >>| (fun () -> `eof) <|> (sample >>| fun s -> `sample s)>>= function| `eof -> return ()| `sample chans ->List.iteri (fun ci c -> buf.{i + ci} <- c) chans;samples ~i:(i + channels) ()insamples () >>| fun () -> bufinfun buf ->Angstrom.parse_bigstring ~consume:Angstrom.Consume.All p buf |> function| Ok frame -> frame| Error str -> failwith @@ "error parsing raw audio frame " ^ str
exception Invalid_input of string
let s16le ?(channels = 2) ?(frame_size = 48000) =frame ~sample_p:Angstrom.LE.any_int16 ~channels ~size:frame_size~kind:Bigarray.int16_signed
module Parser = structlet frame ~sample_p ~channels ~size ~kind =let buf = Bigarray.Array1.create kind Bigarray.c_layout (channels * size) inlet p =let open Angstrom inlet sample = count channels sample_p inlet rec samples ?(i = 0) () =end_of_input >>| (fun () -> `eof) <|> (sample >>| fun s -> `sample s)>>= function| `eof -> return ()| `sample chans ->List.iteri (fun ci c -> buf.{i + ci} <- c) chans;samples ~i:(i + channels) ()insamples () >>| fun () -> bufinfun buf ->Angstrom.parse_bigstring ~consume:Angstrom.Consume.All p buf |> function| Ok frame -> frame| Error str -> failwith @@ "error parsing raw audio frame " ^ str
let s16be ?(channels = 2) ?(frame_size = 48000) =frame ~sample_p:Angstrom.BE.any_int16 ~channels ~size:frame_size~kind:Bigarray.int16_signed
let s16le ?(channels = 2) ?(frame_size = 48000) =frame ~sample_p:Angstrom.LE.any_int16 ~channels ~size:frame_size~kind:Bigarray.int16_signed
let f32le ?(channels = 2) ?(frame_size = 48000) =frame ~sample_p:Angstrom.LE.any_float ~channels ~size:frame_size~kind:Bigarray.float32
let s16be ?(channels = 2) ?(frame_size = 48000) =frame ~sample_p:Angstrom.BE.any_int16 ~channels ~size:frame_size~kind:Bigarray.int16_signed
let f32be ?(channels = 2) ?(frame_size = 48000) =frame ~sample_p:Angstrom.BE.any_float ~channels ~size:frame_size~kind:Bigarray.float32
let f32le ?(channels = 2) ?(frame_size = 48000) =frame ~sample_p:Angstrom.LE.any_float ~channels ~size:frame_size~kind:Bigarray.float32
let f64le ?(channels = 2) ?(frame_size = 48000) =frame ~sample_p:Angstrom.LE.any_double ~channels ~size:frame_size~kind:Bigarray.float64
let f32be ?(channels = 2) ?(frame_size = 48000) =frame ~sample_p:Angstrom.BE.any_float ~channels ~size:frame_size~kind:Bigarray.float32
let f64be ?(channels = 2) ?(frame_size = 48000) =frame ~sample_p:Angstrom.BE.any_double ~channels ~size:frame_size~kind:Bigarray.float64end
let f64le ?(channels = 2) ?(frame_size = 48000) =frame ~sample_p:Angstrom.LE.any_double ~channels ~size:frame_size~kind:Bigarray.float64
let pcm_args =["-nostdin";"-analyzeduration";"0";"-f";"s16le";"-ar";"48000";"-ac";"2";"-flush_packets";"1";]
let f64be ?(channels = 2) ?(frame_size = 48000) =frame ~sample_p:Angstrom.BE.any_double ~channels ~size:frame_size~kind:Bigarray.float64end
let frame_reader () =let buf = Bigstringaf.create (Rtp._FRAME_SIZE * Rtp._CHANNELS * 2) inlet blen = Bigstringaf.length buf inlet parse_frame =let parser =Parser.s16le ~channels:Rtp._CHANNELS ~frame_size:Rtp._FRAME_SIZEinfun () -> parser bufinfun ic ->Lwt_io.read_into_exactly_bigstring ic buf 0 blen |> Lwt_result.catch>|= function| Ok () -> `Frame (parse_frame ())| Error End_of_file -> `Eof| Error exn -> `Exn exnlet writer = function| `Stream s ->let f oc =Lwt_pipe.read s >>= function| Some d -> (Lwt_io.write_from_exactly_bigstring oc d 0 (Bigstringaf.length d)|> Lwt_result.catch>|= function| Ok () -> true| _ -> false)| None -> Lwt_io.close oc >|= Fun.const falseinf| _ -> fun oc -> Lwt_io.close oc >|= Fun.const falselet close_input = function `Stream s -> Lwt_pipe.close_nonblock s | _ -> ()let create input =let cmd = "ffmpeg" inlet i =match input with| `Stream _ -> [ "-i"; "-" ]| `File path -> [ "-i"; path ]| `Url url -> [ "-i"; url ]inlet args = List.concat [ i; pcm_args; stdout_args ] inSwitch.run @@ fun sw ->let cmd_str = cmd :: args |> List.to_string ~sep:" " Fun.id inL.debug (fun m -> m "running cmd: %s" cmd_str);let init () = Eio_proc.spawn ~sw inlet stop proc = Eio_proc.close proc |> ignore inlet pull proc = None inlet p = Lwt_pipe.create () inlet p_p, u_p = Lwt.wait () inlet spawn () =let cmd_str = cmd :: args |> List.to_string ~sep:" " Fun.id inL.debug (fun m -> m "running cmd: %s" cmd_str);Lwt_process.with_process_full("", Array.of_list @@ (cmd :: args))(fun proc ->L.debug (fun m -> m "ffmpeg running with pid=%d" proc#pid);let read_logs () =Lwt_io.read ~count:0x100 proc#stderr >|= fun s ->`Logs (String.length s = 0)inlet read_frame () = frame_reader () proc#stdout inlet write () = writer input proc#stdin >|= fun r -> `Write r inlet rec poll' q =Lwt.nchoose_split q >>= function| [], [] -> Lwt_result.return ()| rs, ps -> (match handle ~out:ps rs with| Ok [] -> Lwt_result.return ()| Ok q -> poll' q| Error _ as e -> Lwt.return e)and handle ?(out = []) = function| [] -> Ok out| (`Write false | `Eof | `Logs true) :: xs -> handle ~out xs| `Logs false :: xs -> handle ~out:(read_logs () :: out) xs| `Frame frame :: xs ->if Lwt.is_sleeping p_p then Lwt.wakeup_later u_p (Ok p);let fwd = Lwt_pipe.write p frame >|= fun r -> `Fwd r inhandle ~out:(fwd :: out) xs| `Fwd true :: xs ->let r = read_frame () inhandle ~out:(r :: out) xs| `Fwd false :: _ ->L.warn (fun m -> m "receiving end of pipe closed, cleaning up...");List.iter Lwt.cancel out;let clean =Lwt_io.abort proc#stdin >>= fun () ->Lwt_io.abort proc#stdout >>= fun () ->Lwt_io.abort proc#stderr >|= fun () ->proc#terminate;`EofinOk [ clean ]| `Write true :: xs ->let w = write () inhandle ~out:(w :: out) xs| (`Exn _ as e) :: _ ->List.iter Lwt.cancel out;L.err (fun m -> m "boop %a" Error.pp e);Error ein
let stdout_args = [ "pipe:" ]
let res =let open Lwt_result.Syntax inlet* () =poll' [ read_frame (); write () ] >>= function| Error _ as e ->Lwt_pipe.close_nonblock p;close_input input;proc#close >|= Fun.const e| Ok () as o -> Lwt.return oinL.info (fun m -> m "closing the process");proc#close >|= function| Unix.WEXITED 0 when Lwt.is_sleeping p_p ->Error.msg "0 byte stream?"| Unix.WEXITED 0 -> Ok ()| Unix.WEXITED n -> Error.msgf "non 0 status code: %d" n| Unix.WSIGNALED n | Unix.WSTOPPED n ->L.warn (fun m -> m "stopped with code: %d" n);Ok ()in
let frame_reader () =let buf = Bigstringaf.create (Rtp._FRAME_SIZE * Rtp._CHANNELS * 2) inlet c_buf = Cstruct.of_bigarray buf inlet parse_frame =let parser =Parser.s16le ~channels:Rtp._CHANNELS ~frame_size:Rtp._FRAME_SIZEinfun () -> parser bufinfun flow ->match Eio.Flow.read_exact flow c_buf with| () -> Some (parse_frame ())| exception End_of_file -> None
res >|= fun r ->L.err (fun m -> m "oyy m8");match (Lwt.is_sleeping p_p, r) with| true, (Error _ as e) -> Lwt.wakeup_later u_p e| true, Ok () -> failwith "unreachable"| false, Error e -> L.err (fun m -> m "%s" (Error.to_string e))| false, Ok () -> ())inlet k =Lwt.catch spawn (fun e ->L.err (fun m -> m "spawn error: %a" Error.pp (`Exn e));Lwt.return_unit)inLwt_pipe.keep p k;Lwt.on_termination k (fun () ->L.err (fun m -> m "HALP");Lwt_pipe.close_nonblock p);p_p |> Lwt_result.map of_pipe *)
let process ~sw input =(* TODO @quartz55: better handle this for use cases like nix *)let cmd = "ffmpeg" inlet i =match input with| `Stream _ -> [ "-analyzeduration"; "0"; "-i"; "-" ]| `File path -> [ "-analyzeduration"; "-i"; path ]| `Url url -> [ "-i"; url ]inlet args = List.concat [ i; pcm_args; stdout_args ] inlet p, u = Promise.create () inlet spawn () =Switch.run @@ fun sw ->let cmd_str = cmd :: args |> List.to_string ~sep:" " Fun.id inL.debug (fun m -> m "running cmd: %s" cmd_str);let proc = Eio_proc.spawn ~sw cmd ~args inL.debug (fun m -> m "ffmpeg running with pid=%d" (Eio_proc.pid proc));let stdin, stderr, stdout =Eio_proc.(stdin proc, stderr proc, stdout proc)inlet read = frame_reader () inlet logs_thread =Fiber.fork_promise ~sw @@ fun () ->let logs_b = Buffer.create (80 * 40) inlet logs = Eio.Flow.buffer_sink logs_b inEio.Flow.copy stderr logs;Buffer.to_bytes logs_b |> Bytes.unsafe_to_stringinlet cleanup () =let status = Eio_proc.close proc inmatch Promise.await_exn logs_thread with| logs -> (status, logs)| exception exc ->let reason = Printexc.to_string exc inL.warn (fun m -> m "couldn't get ffmpeg logs: %s" reason);(status, Format.sprintf "<no logs>: reason: %s" reason)inlet write_thread () =match input with| `Stream flow -> Eio.Flow.copy flow stdin| _ -> Eio.Flow.close stdininFiber.fork ~sw write_thread;(* try to read at least one frame to check if valid input *)match read stdout with| Some f ->let done_p, done_u = Promise.create () inlet init () = true inlet pull first =(* TODO error handling *)if first then Some (f, false)else read stdout |> Option.map (fun f -> (f, false))inlet stop _ =let _ = cleanup () inif not @@ Promise.is_resolved done_p then Promise.resolve done_u ()inlet src = Streaming.Source.make ~init ~pull ~stop () inPromise.resolve_ok u src;Promise.await done_p;raise Exit| None ->let status, logs = cleanup () inlet status_str =match status with| Unix.WEXITED n -> "exited " ^ string_of_int n| Unix.WSIGNALED n -> "signaled " ^ string_of_int n| Unix.WSTOPPED n -> "stopped " ^ string_of_int ninlet exc =Invalid_input (Format.sprintf "status: %s logs: %s" status_str logs)inPromise.resolve_error u exc| exception exc ->let status, logs = cleanup () inlet status_str =match status with| Unix.WEXITED n -> "exited " ^ string_of_int n| Unix.WSIGNALED n -> "signaled " ^ string_of_int n| Unix.WSTOPPED n -> "stopped " ^ string_of_int ninlet exc =Invalid_input(Format.sprintf "exc: %s status: %s logs: %s"(Printexc.to_string exc) status_str logs)inPromise.resolve_error u excinFiber.fork ~sw (fun () -> try spawn () with Exit -> ());Promise.await_exn p |> Audio_stream.of_raw_s16
(* module Make (C : Mirage_clock.MCLOCK) = structmodule Platform (M : Alcotest_engine.Monad.S) = structlet time () = Duration.to_f @@ C.elapsed_ns ()let getcwd () = ""let stdout_isatty () = truelet stdout_columns () = Nonelet setup_std_outputs ?style_renderer:_ ?utf_8:_ () = ()(* Pre-4.07 doesn't support empty variant types. *)type file_descriptor = { empty : 'a. 'a }let log_trap_supported = falselet prepare_log_trap ~root:_ = assert falselet file_exists _ = assert falselet open_write_only _ = assert falselet close = function (fd : file_descriptor) -> fd.emptylet with_redirect = function (fd : file_descriptor) -> fd.emptylet home_directory () =Error (`Msg "Home directory not available for the MirageOS platform")endmodule Tester = Alcotest_engine.V1.Cli.Make (Platform) (Lwt)include Testerlet test_case_sync n s f = test_case n s (fun x -> Lwt.return (f x))let run_test fn args =let async_ex, async_waker = Lwt.wait () inlet handle_exn ex =Logs.debug (fun f -> f "Uncaught async exception: %a" Fmt.exn ex);if Lwt.state async_ex = Lwt.Sleep then Lwt.wakeup_exn async_waker exinLwt.async_exception_hook := handle_exn;Lwt_switch.with_switch (fun sw -> Lwt.pick [ fn sw args; async_ex ])let test_case n s f = test_case n s (run_test f)end *)
(* The testslet test_lowercase () =Alcotest.(check string) "same string" "hello!" (To_test.lowercase "hELLO!")let test_capitalize () =Alcotest.(check string) "same string" "World." (To_test.capitalize "world.")let test_str_concat () =Alcotest.(check string) "same string" "foobar" (To_test.str_concat ["foo"; "bar"])let test_list_concat () =Alcotest.(check (list int)) "same lists" [1; 2; 3] (To_test.list_concat [1] [2; 3])(* Run it *)let () =let open Alcotest inrun "Utils" ["string-case", [test_case "Lower case" `Quick test_lowercase;test_case "Capitalization" `Quick test_capitalize;];"string-concat", [ test_case "String mashing" `Quick test_str_concat ];"list-concat", [ test_case "List mashing" `Slow test_list_concat ];] *)