VR6XFOE7TKQQYWIE26QTRB2XWYCZTNBWZ32SB43M2JVOW6EN5KJQC (* | Some ("join", vchan) -> (let guild_id = Option.get_exn guild_id inlet vchan = M.Snowflake.of_string vchan inlet voice = Client.voice client inlet call = Voice.Manager.get ~guild_id voice inVoice.Call.join call ~channel_id:vchan >>= function| Error e ->let msg =Msg.fmt "⚠️ Couldn't join voice channel: %a" Voice.Error.pp einClient.send_message channel_id msg client| Ok () -> Lwt.return_unit)| Some ("leave", _) ->let guild_id = Option.get_exn guild_id inlet voice = Client.voice client inlet call = Voice.Manager.get ~guild_id voice inVoice.Call.leave call *)
| Some ("join", vchan) -> (let guild_id = Option.get_exn guild_id inlet vchan = M.Snowflake.of_string vchan inlet voice = Client.voice client inlet call = Voice.Manager.get ~guild_id voice intry Voice.Call.join call ~channel_id:vchanwith exn ->let msg =Msg.fmt "⚠️ Couldn't join voice channel: %s"(Printexc.to_string exn)inClient.send_message channel_id msg client)| Some ("leave", _) ->let guild_id = Option.get_exn guild_id inlet voice = Client.voice client inlet call = Voice.Manager.get ~guild_id voice inVoice.Call.leave call
(* open! Disco_core.Globalsopen Lwt.Infixmodule Snowflake = Disco_models.Snowflakemodule E = Disco_core.Eventsmodule Gateway = Disco_core.Gatewaymodule F = Relog.Field
open! Disco_core.Globalsmodule Snowflake = Disco_models.Snowflakemodule E = Disco_core.Eventsmodule Gateway = Disco_core.Gatewaymodule F = Relog.Fieldmodule L = (val Relog.logger ~namespace:__MODULE__ ())
and gw_update = Srv of srv_info | Sess of sess_info | Dc
(** [wait_for_session conn] returns the live session stored in [conn],
    suspending the caller until one is available.

    While [conn] is in the [Init] or [Detached] state, the request callback
    held in that state is replaced (by compare-and-set) with one that first
    runs the previous callback and then resolves a fresh promise. The caller
    waits on that promise and re-reads [conn] afterwards. A lost
    compare-and-set backs off and retries. *)
let wait_for_session conn =
  let backoff = Backoff.create () in
  (* Callback that forwards the result to [prev] and then wakes the waiter. *)
  let chain prev resolver res =
    prev res;
    Promise.resolve resolver ()
  in
  (* Install [next] in place of [seen]; wait on success, back off otherwise. *)
  let park ~seen ~next promise =
    if Atomic.compare_and_set conn seen next then Promise.await promise
    else Backoff.once backoff
  in
  let rec poll () =
    match Atomic.get conn with
    | Live session -> session
    | Init ({ req; _ } as init) as seen ->
      let promise, resolver = Promise.create () in
      park ~seen ~next:(Init { init with req = chain req resolver }) promise;
      poll ()
    | Detached req as seen ->
      let promise, resolver = Promise.create () in
      park ~seen ~next:(Detached (chain req resolver)) promise;
      poll ()
  in
  poll ()
and conn_info = Empty | Got_srv of srv_info | Got_sess of sess_info
(** [manage ~net ~dmgr t] runs the supervisor of a voice call inside its own
    switch. Two jobs run concurrently:

    - [mixer_thread] runs [Mixer.run] through [Eio.Domain_manager.run]; its
      [play] and [stop] callbacks obtain the session with
      [wait_for_session].
    - [main_thread] takes events from [t.evs] one at a time and drives the
      connection state stored in [t.conn] (Detached, Init, Live).

    State changes that may compete with [wait_for_session] go through [cas],
    which re-handles the same event after a back-off when the
    compare-and-set loses.

    When both jobs have finished, the mixer is destroyed and [Exit] is
    raised; callers are expected to catch it.

    @raise Exit always, once both jobs have returned. *)
let manage ~net ~dmgr t =
  Switch.run @@ fun sw ->
  let user_id = (Gateway.user t.gw).id in
  let connect ~srv ~sess =
    Session.create ~sw ~net ~guild_id:t.guild_id ~user_id
      ~channel_id:sess.channel_id ~session_id:sess.session_id ~token:srv.token
      srv.endpoint
  in
  let mixer_thread () =
    Switch.on_release sw (fun () -> Mixer.destroy t.mixer);
    Eio.Domain_manager.run dmgr @@ fun () ->
    let play i f =
      let s = wait_for_session t.conn in
      Session.start_speaking s;
      Session.send_rtp s f;
      (* Returning false asks the mixer to pause after this frame. *)
      i < 15
    in
    let stop () = wait_for_session t.conn |> Session.stop_speaking in
    Mixer.run ~play ~stop t.mixer
  in
  let b = Backoff.create () in
  let rec main_thread () =
    let ev = Eio.Stream.take t.evs in
    handle ev
  and cas ~ev ost nst fn =
    if Atomic.compare_and_set t.conn ost nst then fn ()
    else (
      Backoff.once b;
      handle ev)
  and handle ev =
    match (Atomic.get t.conn, ev) with
    | (Detached o_req as st), Req_join (cid, n_req) ->
      let req res =
        o_req res;
        n_req res
      in
      cas ~ev st (Init { cid; req; info = Empty }) @@ fun () ->
      (* TODO timeout *)
      L.info (fun m -> m "join request while detached");
      Gateway.send_voice_state_update t.gw ~channel_id:cid ~self_mute:t.muted
        ~self_deaf:t.deafened t.guild_id;
      main_thread ()
    | Detached _, (Gw_dc | Gw_sess _ | Gw_srv _) ->
      L.warn (fun m -> m "gateway event on detached call");
      main_thread ()
    | Init { info = Got_sess sess; req; _ }, Gw_srv srv
    | Init { info = Got_srv srv; req; _ }, Gw_sess sess ->
      (* any race conditions with mixer that require a CAS & extra state ?
         NOTE(review): [req] is read before [connect] and the state is then
         overwritten with [Atomic.set]; a waiter that [wait_for_session]
         chains onto the Init state in between would not be notified.
         Confirm and switch to a compare-and-set loop if so. *)
      let res =
        match connect ~srv ~sess with
        | sess ->
          Atomic.set t.conn @@ Live sess;
          Ok ()
        | exception exn ->
          Atomic.set t.conn @@ Detached stub;
          Error exn
      in
      req res;
      main_thread ()
    | (Init ({ info = Got_sess _ | Empty; _ } as init) as st), Gw_sess sess ->
      cas ~ev st (Init { init with info = Got_sess sess }) @@ main_thread
    | (Init ({ info = Got_srv _ | Empty; _ } as init) as st), Gw_srv srv ->
      cas ~ev st (Init { init with info = Got_srv srv }) @@ main_thread
    | (Init { req; _ } as st), Gw_dc ->
      cas ~ev st (Detached stub) @@ fun () ->
      req @@ Error Detached;
      (* Keep serving events; returning here would end the event loop. *)
      main_thread ()
    | (Init ({ cid; req = o_req; _ } as init) as st), Req_join (n_cid, n_req)
      when Snowflake.(cid = n_cid) ->
      let req res =
        o_req res;
        n_req res
      in
      cas ~ev st (Init { init with req }) @@ main_thread
    | (Init ({ req = o_req; _ } as init) as st), Req_join _ ->
      (* Different channel: replay the request once the pending join ends. *)
      let req res =
        o_req res;
        Eio.Stream.add t.evs ev
      in
      cas ~ev st (Init { init with req }) @@ main_thread
    | Live sess, Req_join (cid, req)
      when Snowflake.(cid = Session.channel_id sess) ->
      req @@ Ok ();
      (* Already in the requested channel: answer and keep serving events. *)
      main_thread ()
    | (Live sess as st), Req_join _ ->
      cas ~ev st (Detached stub) @@ fun () ->
      L.info (fun m -> m "join request while live, switching channels");
      Session.disconnect sess;
      (* Re-handle the same request, now from the Detached state. *)
      handle ev
    | (Live sess as st), Gw_sess nsess
      when Snowflake.(Session.channel_id sess <> nsess.channel_id)
           || String.(Session.session_id sess <> nsess.session_id) ->
      let srv = Session.{ token = token sess; endpoint = endpoint sess } in
      cas ~ev st
        (Init { req = stub; cid = nsess.channel_id; info = Got_srv srv })
      @@ fun () ->
      L.warn (fun m -> m "new voice session info, reconnecting");
      Session.disconnect sess;
      (* Re-handle the event so the Init arm above performs the connect. *)
      handle ev
    | (Live sess as st), Gw_dc ->
      cas ~ev st (Detached stub) @@ fun () ->
      L.info (fun m -> m "disconnected");
      Session.disconnect sess;
      main_thread ()
    | Live _, (Gw_sess _ | Gw_srv _) -> main_thread ()
  in
  Fiber.both main_thread mixer_thread;
  Mixer.destroy t.mixer;
  raise Exit
let make ?(muted = false) ?(deafened = false) ~guild_id gw =let ( let+ ) = Result.( let+ ) inlet op_rx = Lwt_pipe.create () inlet mixer_rx = Lwt_pipe.create () inlet+ mixer = Mixer.create (Lwt_pipe.write mixer_rx) inlet t = { gw; guild_id; muted; deafened; tx = op_rx; mixer } in
(** [make ~sw ~net ~dmgr ?muted ?deafened ~guild_id gw] builds a call for
    [guild_id] in the detached state and forks its supervisor ([manage]) on
    [sw]. The [Exit] raised by [manage] when it finishes is absorbed by the
    forked fiber. [muted] and [deafened] both default to [false]. *)
let make ~sw ~net ~dmgr ?(muted = false) ?(deafened = false) ~guild_id gw =
  let evs = Eio.Stream.create max_int in
  let conn = Atomic.make (Detached stub) in
  let mixer = Mixer.create () in
  let t = { gw; guild_id; mixer; conn; muted; deafened; evs } in
  let supervise () =
    match manage ~net ~dmgr t with
    | () -> ()
    | exception Exit -> ()
  in
  Fiber.fork ~sw supervise;
  t
let read_op () =Lwt_pipe.read op_rx >|= Option.get_exn >|= function| Req r -> `Op r| Gw g -> `Gw ginlet _read_mixer () =Lwt_pipe.read mixer_rx >|= Option.get_exn >|= fun mix -> `Mixer mixin
(** [update_server ~token ~endpoint t] queues a [Gw_srv] event carrying the
    voice server [token] and [endpoint] on the event stream of [t]. *)
let update_server ~token ~endpoint t =
  let ev = Gw_srv { token; endpoint } in
  Eio.Stream.add t.evs ev
let do_connect ~srv ~sess =let user_id = (Gateway.user t.gw).id inlet guild_id = t.guild_id inSession.create ~guild_id ~user_id ~channel_id:sess.channel_id~session_id:sess.session_id ~token:srv.token srv.endpointin
(** [update_session ?channel_id ~session_id t] queues a gateway event on the
    event stream of [t]: [Gw_sess] when [channel_id] is given, [Gw_dc]
    (disconnect) when it is absent. *)
let update_session ?channel_id ~session_id t =
  let to_event = function
    | Some channel_id -> Gw_sess { channel_id; session_id }
    | None -> Gw_dc
  in
  Eio.Stream.add t.evs (to_event channel_id)
let run () =let conn = ref Detached inlet rec poll q =Lwt.nchoose_split q >>= function| [], [] -> Lwt_result.return ()| rs, ps -> (match handle ps rs with| Ok [] -> Lwt_result.return ()| Ok q -> poll q| Error _ as err -> Lwt.return err)and handle out rs =match (rs, !conn) with| [], _ -> Ok out| `Noop :: xs, _ -> handle out xs| `Cancel :: xs, Init { req; _ } ->Lwt.wakeup_later (snd req) @@ Error.msg "timed out";conn := Detached;handle out xs| `Cancel :: xs, _ -> handle out xs| `Connected (Ok session) :: xs, (Init { req; _ } | Connecting { req; _ })->conn := Live session;L.info (fun m -> m "connected");Lwt.wakeup_later (snd req) (Ok ());handle out xs| ( `Connected (Error _ as err) :: xs,(Init { req; _ } | Connecting { req; _ }) ) ->conn := Detached;L.err (fun m -> m "error connecting");Lwt.wakeup_later (snd req) err;handle out xs| `Connected (Ok session) :: xs, _ ->conn := Live session;handle out xs| `Connected (Error _) :: xs, _ ->conn := Detached;handle out xs| ( (`Op (Join (cid, nreq)) as op) :: xs,(Init { req; channel_id; _ } | Connecting { req; channel_id; _ }) ) ->L.info (fun m -> m "join request while connecting");if Snowflake.(cid = channel_id) then (Lwt.async (fun () ->fst req >|= fun res -> Lwt.wakeup_later (snd nreq) res);handle (read_op () :: out) xs)else handle (Lwt.return op :: read_op () :: out) xs| `Op (Join (channel_id, req)) :: xs, Live session ->L.info (fun m -> m "join request while connected");if Snowflake.(channel_id = Session.channel_id session) then (Lwt.wakeup_later (snd req) (Ok ());handle (read_op () :: out) xs)else (conn := Init { channel_id; req; info = Empty };let reconn =Session.disconnect session >>= fun () ->Gateway.send_voice_state_update t.gw ~channel_id~self_mute:t.muted ~self_deaf:t.deafened t.guild_id>|= fun () -> `Noopinhandle (reconn :: read_op () :: out) xs)| `Op (Join (channel_id, req)) :: xs, Detached ->L.info (fun m -> m "join request while detached");conn := Init { channel_id; req; info = Empty };let send 
=Gateway.send_voice_state_update t.gw ~channel_id ~self_mute:t.muted~self_deaf:t.deafened t.guild_id>|= fun () -> `Noopinlet timeout =Lwt.pick[(Lwt_unix.sleep 5. >|= fun () -> `Cancel);(fst req >|= fun _ -> `Noop);]inhandle (send :: timeout :: read_op () :: out) xs| `Gw Dc :: xs, Live session ->L.info (fun m -> m "disconnected");conn := Detached;let dc = Session.disconnect session >|= fun () -> `Noop inhandle (dc :: read_op () :: out) xs| `Gw (Srv srv) :: xs, Live s ->ifString.(Session.endpoint s <> srv.endpoint || Session.token s <> srv.token)then (L.warn (fun m -> m "new voice server info, reconnecting");let req = Lwt.wait () inlet sess ={session_id = Session.session_id s;channel_id = Session.channel_id s;}inconn := Connecting { channel_id = sess.channel_id; req };let reconn =Session.disconnect s >>= fun () ->do_connect ~srv ~sess >|= fun res -> `Connected resinhandle (reconn :: read_op () :: out) xs)else handle (read_op () :: out) xs| `Gw (Sess sess) :: xs, Live s ->ifSnowflake.(Session.channel_id s <> sess.channel_id)|| String.(Session.session_id s <> sess.session_id)then (L.warn (fun m -> m "new voice session info, reconnecting");let req = Lwt.wait () inlet srv ={ token = Session.token s; endpoint = Session.endpoint s }inconn := Connecting { channel_id = sess.channel_id; req };let reconn =Session.disconnect s >>= fun () ->do_connect ~srv ~sess >|= fun res -> `Connected resinhandle (reconn :: read_op () :: out) xs)else handle (read_op () :: out) xs| `Gw _ :: xs, Detached ->L.warn (fun m -> m "gateway event on detached call");handle (read_op () :: out) xs| (`Gw _ as ev) :: xs, Connecting _ ->L.info (fun m -> m "buffering gw event while connecting");handle (Lwt.return ev :: read_op () :: out) xs| `Gw (Srv srv) :: xs, Init { info = Got_sess sess; channel_id; req }| `Gw (Sess sess) :: xs, Init { info = Got_srv srv; channel_id; req } ->conn := Connecting { channel_id; req };let connect = do_connect ~srv ~sess >|= fun res -> `Connected res inhandle (connect :: 
read_op () :: out) xs| `Gw (Srv nsrv) :: xs, Init ({ info = Got_srv _ | Empty; _ } as init) ->conn := Init { init with info = Got_srv nsrv };handle (read_op () :: out) xs| `Gw (Sess nsess) :: xs, Init ({ info = Got_sess _ | Empty; _ } as init)->conn := Init { init with info = Got_sess nsess };handle (read_op () :: out) xs| `Gw Dc :: xs, Init { req; _ } ->Lwt.wakeup_later (snd req) (Error.msg "dc'ed when connecting?");handle (read_op () :: out) xsinpoll [ read_op () ]inlet evloop =run () >|= function| Ok () -> ()| Error e -> L.err (fun m -> m "call evloop crashed: %a" Error.pp e)inLwt_pipe.keep op_rx evloop;Lwt.on_termination evloop (fun () ->Lwt_pipe.close_nonblock op_rx;Mixer.destroy mixer;Lwt_pipe.close_nonblock mixer_rx);tlet update_server ~token ~endpoint t =Lwt_pipe.write_exn t.tx @@ Gw (Srv { token; endpoint })let update_session ?channel_id ~session_id t =let upd =match channel_id with| None -> Dc| Some channel_id -> Sess { channel_id; session_id }inLwt_pipe.write_exn t.tx @@ Gw upd
(** [join ~channel_id t] queues a join request for [channel_id] and waits for
    the reply, going through [Promise.await_exn] so an [Error] reply
    surfaces as an exception. *)
let join ~channel_id t =
  let outcome, resolver = Promise.create () in
  let reply = Promise.resolve resolver in
  Eio.Stream.add t.evs (Req_join (channel_id, reply));
  Promise.await_exn outcome
let join ~channel_id t =let req = Lwt.wait () inLwt_pipe.write_exn t.tx @@ Req (Join (channel_id, req)) >>= fun () -> fst reqlet leave t =Lwt_pipe.write t.tx @@ Gw Dc >>= fun _ ->Gateway.send_voice_state_update t.gw ~self_mute:t.muted ~self_deaf:t.deafenedt.guild_id *)
(** [leave t] queues a [Gw_dc] event on the call and then sends a voice
    state update without a channel id for the guild of [t]. *)
let leave t =
  let () = Eio.Stream.add t.evs Gw_dc in
  Gateway.send_voice_state_update t.gw ~self_deaf:t.deafened
    ~self_mute:t.muted t.guild_id
(* open! Disco_core.Globalsmodule Snowflake = Disco_models.Snowflakemodule E = Disco_core.Eventsmodule Sf_map = Map.Make (Snowflake)module Gateway = Disco_core.Gatewaymodule L = (val Relog.logger ~namespace:__MODULE__ ())module F = Relog.Field
open! Disco_core.Globalsmodule Snowflake = Disco_models.Snowflakemodule E = Disco_core.Eventsmodule Sf_map = Map.Make (Snowflake)module Gateway = Disco_core.Gatewaymodule L = (val Relog.logger ~namespace:__MODULE__ ())module F = Relog.Field
let server_update t srv =Eio_mutex.with_ t.mtx @@ fun () ->L.info (fun m -> m "srv update");Sf_map.get srv.E.Voice_server_update.guild_id t.calls|> Option.iter (fun call ->Call.update_server ~token:srv.token ~endpoint:srv.endpoint call)
(** [server_update t srv] forwards a voice server update to the call
    registered for the guild of [srv], if there is one. The lookup and the
    forwarding both run while holding the manager mutex. *)
let server_update t srv =
  let forward call =
    Call.update_server ~token:srv.E.Voice_server_update.token
      ~endpoint:srv.endpoint call
  in
  Eio_mutex.with_ t.mtx (fun () ->
      L.info (fun m -> m "srv update");
      match Sf_map.get srv.guild_id t.calls with
      | Some call -> forward call
      | None -> ())
let state_update t vst =Eio_mutex.with_ t.mtx @@ fun () ->L.info (fun m -> m "state update");match vst.E.Voice_state.guild_id with| Some guild_id when Snowflake.(user_id t = vst.user_id) ->Sf_map.get guild_id t.calls|> Option.iter (fun call ->Call.update_session ?channel_id:vst.channel_id~session_id:vst.session_id call)| _ -> ()
(** [state_update t vst] forwards a voice state update to the matching call.
    Updates without a guild id, or whose user id differs from [user_id t],
    are ignored, as are guilds with no registered call. Runs while holding
    the manager mutex. *)
let state_update t vst =
  let relay call =
    Call.update_session ?channel_id:vst.E.Voice_state.channel_id
      ~session_id:vst.session_id call
  in
  Eio_mutex.with_ t.mtx (fun () ->
      L.info (fun m -> m "state update");
      match vst.guild_id with
      | None -> ()
      | Some guild_id ->
        if Snowflake.(user_id t = vst.user_id) then
          Option.iter relay (Sf_map.get guild_id t.calls))
let make gw =let t = { mtx = Eio_mutex.make (); calls = Sf_map.empty; gw } inGateway.sub gw ~fn:(function| E.Voice_server_update v -> server_update t v| Voice_state_update v -> state_update t| _ -> ());t
(** [make ~env ~sw gw] creates a voice manager bound to gateway [gw].

    It subscribes to voice server and voice state events on [gw] and forks a
    worker on [sw] that serves two kinds of message from an internal stream:

    - a spawn request returns the existing call for the guild, or creates
      one with [Call.make], registers it and replies with the result;
    - a poison message makes every registered call leave, clears the
      registry and ends the worker.

    The [spawn] and [poison] fields of the result post those messages. *)
let make ~env ~sw gw =
  let net = Eio.Stdenv.net env in
  let dmgr = Eio.Stdenv.domain_mgr env in
  let evs = Eio.Stream.create 0 in
  let poison () = Eio.Stream.add evs `Poison in
  let spawn guild_id =
    let reply, resolver = Promise.create () in
    Eio.Stream.add evs (`Spawn (guild_id, Promise.resolve resolver));
    Promise.await_exn reply
  in
  let t =
    {
      mtx = Eio_mutex.make ();
      calls = Sf_map.empty;
      gw;
      spawn;
      poison;
      dead = false;
    }
  in
  Gateway.sub gw ~fn:(function
    | E.Voice_server_update v -> server_update t v
    | Voice_state_update v -> state_update t v
    | _ -> ());
  let run () =
    Switch.run @@ fun sw ->
    (* Make every call leave concurrently, then forget them all. *)
    let leave_all () =
      Eio_mutex.with_ t.mtx (fun () ->
          Sf_map.to_seq t.calls
          |> Seq.map (fun (_, call) () -> Call.leave call)
          |> Seq.to_list |> Fiber.all;
          t.calls <- Sf_map.empty)
    in
    (* Reply with the registered call, creating and registering it first
       when the guild has none. Creation failures are replied as errors. *)
    let spawn_call guild_id reply =
      Eio_mutex.with_ t.mtx (fun () ->
          match Sf_map.get guild_id t.calls with
          | Some call -> reply (Ok call)
          | None ->
            let res =
              Result.guard (fun () -> Call.make ~sw ~net ~dmgr ~guild_id gw)
            in
            Result.iter
              (fun call -> t.calls <- Sf_map.add guild_id call t.calls)
              res;
            reply res)
    in
    let rec loop () =
      match Eio.Stream.take evs with
      | `Poison -> leave_all ()
      | `Spawn (guild_id, reply) ->
        spawn_call guild_id reply;
        loop ()
    in
    loop ();
    raise Exit
  in
  Fiber.fork ~sw (fun () -> try run () with Exit -> ());
  t
let get t ~guild_id =Eio_mutex.with_ t.mtx @@ fun () ->match Sf_map.get guild_id t.calls with| Some call -> call| None ->let call = Call.make t.gw ~guild_id |> Result.get_exn int.calls <- Sf_map.add guild_id call t.calls;call *)
(** [get t ~guild_id] returns the call registered for [guild_id], asking the
    manager worker to spawn one when none is registered yet. The mutex is
    released before [t.spawn] is called. *)
let get t ~guild_id =
  Eio_mutex.lock t.mtx;
  let existing = Sf_map.get guild_id t.calls in
  Eio_mutex.unlock t.mtx;
  match existing with
  | Some call -> call
  | None -> t.spawn guild_id
(** One-dimensional, C-layout [Bigarray] whose elements are 32-bit floats.
    NOTE(review): presumably one frame of PCM samples, going by the name;
    confirm the sample count and channel layout against the producers. *)
type pcm_f32_frame =(float, Bigarray.float32_elt, Bigarray.c_layout) Bigarray.Array1.t
(** [spin_sleep ?acc dur] blocks the calling thread for about [dur]
    nanoseconds.

    When [dur] exceeds [acc] (default [100_000L] ns), the first [dur - acc]
    nanoseconds are spent in [Unix.sleepf]; the remainder is spent in a
    back-off loop that polls the monotonic clock until [dur] has elapsed
    since the call started. *)
let spin_sleep ?(acc = 100_000L) dur =
  let backoff = Backoff.create () in
  let started = Mtime_clock.now () in
  let elapsed () =
    Mtime_clock.now () |> Mtime.span started |> Mtime.Span.to_uint64_ns
  in
  if Int64.(dur > acc) then Unix.sleepf (Int64.(to_float (dur - acc)) /. 1e9);
  while Int64.(elapsed () < dur) do
    Backoff.once backoff
  done
and state = Idle | Playing of streamand stream = { waker : waker; rx : bigstring Lwt_pipe.Reader.t }and waker = bool Lwt.t * bool Lwt.u
and 'a res = ('a, exn) result -> unitand state = Idle | Playing of streamand stream = { waker : bool -> unit; rx : Audio_stream.t }
let waker : unit -> waker = Lwt.tasklet make_silence ?n () = Audio_stream.n_silence_pipe ?n ()let create () = { state = Idle; silence = None; dead = false }let now () = Mtime_clock.elapsed_ns ()let playing t = match t.state with Playing s -> Some s | _ -> Nonelet is_playing t = match t.state with Playing _ -> true | _ -> false
(** [make_silence ?n ()] returns the stream produced by
    [Audio_stream.silence_frames], passing [n] through unchanged. *)
let make_silence ?n () =
  let frames = Audio_stream.silence_frames ?n () in
  frames
let destroy_active t =match t.state with| Playing s ->Lwt.wakeup_later (snd s.waker) false;t.state <- Idle| Idle -> ()
(** [create ()] returns a fresh mixer: idle, with an empty request queue, no
    pending silence, not dead, and no wakeup callback registered. *)
let create () =
  let wakeup = Atomic.make None in
  let dead = Atomic.make false in
  let reqs = Lf_queue.create () in
  { reqs; state = Idle; silence = None; dead; wakeup }
let stop = destroy_active
(** [wakeup t] takes the wakeup callback registered in [t], if any, and calls
    it. The callback is removed with a compare-and-set so it runs at most
    once; a lost compare-and-set backs off and retries. Does nothing when no
    callback is registered. *)
let wakeup t =
  let backoff = Backoff.create () in
  let rec attempt () =
    match Atomic.get t.wakeup with
    | None -> ()
    | Some notify as seen ->
      if Atomic.compare_and_set t.wakeup seen None then notify ()
      else (
        Backoff.once backoff;
        attempt ())
  in
  attempt ()
let play ~s t =match t.state with| Playing os ->Lwt.wakeup_later (snd os.waker) false;if Option.is_none t.silence then t.silence <- Some (make_silence ());t.state <- Playing s| Idle -> t.state <- Playing s
(** [check t] does nothing while [t] is alive.
    @raise Destroyed if the dead flag of [t] is set. *)
let check t =
  match Atomic.get t.dead with
  | true -> raise Destroyed
  | false -> ()
let run ~yield ~play ~stop t =let rec exhaust ?(i = 0) p =Lwt_pipe.read p >>= function| Some frame ->play (i + 1) frame >>= fun ok ->let i = i + 1 inif ok then exhaust ~i p else Lwt.return (`Yield i)| None -> Lwt.return (`Closed i)inlet framelen = float Rtp._FRAME_LEN /. 1e3 inlet schedule_next ?drift ?(frames = 1) () =let timeout = float frames *. framelen inlet with_drift d =let delta = Int64.(now () - d |> to_float) /. 1e9 intimeout -. Float.min delta timeoutinlet timeout =Option.map with_drift drift |> Option.get_or ~default:timeoutinL.trace (fun m ->m "sent %d frames, next tick in %f seconds" frames timeout);Lwt_unix.sleep timeout >|= fun () -> `Tickinlet last_yield = ref None inlet last_tick = ref (now ()) inlet rec yield_till tick =let y =match !last_yield with| Some y -> y| None ->let y = yield t inlast_yield := Some y;yinLwt.choose [ tick; (y >|= fun () -> `Yield) ] >>= function| `Yield when t.dead ->Lwt.cancel tick;Lwt.return_unit| `Yield ->last_yield := None;yield_till tick| `Tick ->let t = now () inL.trace (fun m ->let delta = Int64.(t - !last_tick |> to_float) inm "tick +%2fms" (delta /. 1e6));last_tick := t;Lwt.return_unitinlet rec f' ?(drift = now ()) ?(i = 0) () =match (t.silence, t.state) with| _ when t.dead ->destroy_active t;Lwt.return_unit| Some s, st -> (exhaust ~i s >>= function| `Closed i ->t.silence <- None;(match st with Idle -> stop () | _ -> Lwt.return_unit)>>= f' ~drift ~i| `Yield frames -> yield_till (schedule_next ~drift ~frames ()) >>= f')| None, Idle ->(match !last_yield with| Some y ->last_yield := None;y| None -> yield t)>>= f'| None, Playing s -> (exhaust ~i s.rx >>= function| `Closed i ->t.silence <- Some (make_silence ());t.state <- Idle;Lwt.wakeup_later (snd s.waker) true;f' ~drift ~i ()| `Yield frames ->yield_till (schedule_next ~drift ~frames ()) >>= fun () -> f' ())inf' ()end
(** [now_playing t] submits a [Now_playing] request and waits for its reply
    through [Promise.await_exn]. *)
let now_playing t =
  let reply, resolver = Promise.create () in
  req t (Now_playing (Promise.resolve resolver));
  Promise.await_exn reply

(** [is_playing t] is [true] when [now_playing t] reports a stream. *)
let is_playing t =
  match now_playing t with
  | Some _ -> true
  | None -> false

(** [stop t] submits a [Stop] request and waits for its reply through
    [Promise.await_exn]. *)
let stop t =
  let reply, resolver = Promise.create () in
  req t (Stop (Promise.resolve resolver));
  Promise.await_exn reply
type t = { driver : Driver.t; evloop_tx : evloop_msg Lwt_pipe.Writer.t }and evloop_msg = Stop | Play of pcm_s16_frame Lwt_pipe.Reader.t * Driver.waker
(** [play ~s t] submits a [Play] request for stream [s] and waits for its
    reply through [Promise.await_exn]. *)
let play ~s t =
  let reply, resolver = Promise.create () in
  let request = Play (s, Promise.resolve resolver) in
  req t request;
  Promise.await_exn reply
let create ?(burst = 15) out =let chan = Lwt_pipe.create () inlet evloop_tx = Lwt_pipe.Writer.map ~f:(fun msg -> `Req msg) chan inlet driver = Driver.create () inlet yield driver =Lwt_pipe.read chan >|= Option.get_or ~default:`Poison >|= function| `Req (Play (s, waker)) ->let s = Lwt_pipe.Reader.filter_map ~f:encode s inDriver.play driver ~s:{ waker; rx = s }| `Req Stop -> Driver.stop driver| `Poison -> Driver.destroy driverinlet play i frame = out (`Play frame) >|= fun r -> r && i < burst inlet stop () = out `Stop >|= ignore inlet f = Driver.run ~yield ~play ~stop driver inLwt_pipe.keep chan f;Lwt_pipe.link_close chan ~after:evloop_tx;Lwt.on_termination f (fun () -> Lwt_pipe.close_nonblock chan);{ driver; evloop_tx }
(** [destroy t] sets the dead flag of [t] and then calls [wakeup t] so a
    parked mixer loop can observe the flag. *)
let destroy t =
  let () = Atomic.set t.dead true in
  wakeup t
let play ?(k = fun ~status:_ s -> Lwt_pipe.close_nonblock s) t stream =let ((wp, _) as waker) = Driver.waker () inLwt.on_success wp (fun status -> k ~status stream);Lwt_pipe.write_exn t.evloop_tx (Play (stream, waker))
(** [destroy_active t] ends the stream being played, if any: its waker is
    called with [false] and the mixer goes back to [Idle]. Does nothing when
    already idle. *)
let destroy_active t =
  match t.state with
  | Idle -> ()
  | Playing { waker; _ } ->
    waker false;
    t.state <- Idle
let destroy t = Lwt_pipe.close_nonblock t.evloop_tx *)
(** [run ~play ~stop t] is the mixer loop. It returns once [t] has been
    marked dead.

    Each iteration serves all queued requests and then either drains the
    pending silence stream, drains the stream being played, or parks until
    [wakeup] is called.

    @param play called as [play i frame] for every frame read, where [i] is
      the 1-based index of the frame within the current burst. Returning
      [false] ends the burst; the loop then sleeps for the duration of the
      frames sent, less the time already spent sending them.
    @param stop called when the silence stream runs out while the mixer is
      idle. *)
let run ~play ~stop t =
  let now () = Mtime_clock.elapsed_ns () in
  let rec exhaust ?(i = 0) src =
    Audio_stream.read src |> function
    | Some frame ->
      let i = i + 1 in
      (* [i] is already the 1-based index; the previous code passed [i + 1],
         which made the burst limit trip one frame early. *)
      if play i frame then exhaust ~i src else `Yield i
    | None -> `Closed i
  in
  (* Frame length in nanoseconds. Assumes [Rtp._FRAME_LEN] is expressed in
     milliseconds, as the former Lwt implementation did when it divided it
     by 1e3 to get seconds; the previous factor of 100_000 gave a tick ten
     times too short. TODO confirm the unit against [Rtp]. *)
  let framelen = Int64.(of_int Rtp._FRAME_LEN * 1_000_000L) in
  let schedule_next ?drift ?(frames = 1) () =
    let timeout = Int64.(of_int frames * framelen) in
    (* Deduct the time elapsed since [d], never going below zero. *)
    let with_drift d =
      let delta = Int64.(now () - d) in
      Int64.(timeout - min delta timeout)
    in
    let timeout =
      Option.map with_drift drift |> Option.get_or ~default:timeout
    in
    L.trace (fun m ->
        m "sent %d frames, next tick in %Ld ns (%f s)" frames timeout
          (Int64.to_float timeout /. 1e9));
    spin_sleep timeout
  in
  let rec loop ?(drift = now ()) ?(i = 0) () =
    if Atomic.get t.dead then (
      destroy_active t;
      (* TODO @quartz55: cancel/cleanup queued requests (how?) *)
      raise Exit);
    handle_reqs ();
    process ~drift ~i
  and handle_reqs () =
    match Lf_queue.pop t.reqs with
    | Some (Play (s, res)) ->
      (match t.state with
      | Playing os ->
        (* Replace the current stream, padding the switch with silence. *)
        os.waker false;
        if Option.is_none t.silence then t.silence <- Some (make_silence ());
        t.state <- Playing s
      | Idle -> t.state <- Playing s);
      res (Ok ());
      handle_reqs ()
    | Some (Stop res) ->
      destroy_active t;
      res (Ok ());
      handle_reqs ()
    | Some (Now_playing res) ->
      res @@ Ok (match t.state with Idle -> None | Playing s -> Some s);
      handle_reqs ()
    | None -> ()
  and process ~drift ~i =
    match (t.silence, t.state) with
    | Some s, st -> (
      exhaust ~i s |> function
      | `Closed i ->
        t.silence <- None;
        (match st with Idle -> stop () | _ -> ());
        loop ~drift ~i ()
      | `Yield frames ->
        schedule_next ~drift ~frames ();
        loop ())
    | None, Playing s -> (
      exhaust ~i s.rx |> function
      | `Closed i ->
        t.silence <- Some (make_silence ());
        t.state <- Idle;
        s.waker true;
        loop ~drift ~i ()
      | `Yield frames ->
        schedule_next ~drift ~frames ();
        loop ())
    | None, Idle ->
      assert (Option.is_none @@ Atomic.get t.wakeup);
      let p, u = Promise.create () in
      Atomic.set t.wakeup (Some (Promise.resolve u));
      (* The dead flag may have been set after the check in [loop] but
         before the callback was published, in which case the destroyer
         found nothing to wake. Re-check and wake ourselves; [wakeup]
         removes the callback atomically, so it runs at most once.
         NOTE(review): a request enqueued in that same window may need the
         same treatment; confirm against the request producer. *)
      if Atomic.get t.dead then wakeup t;
      Promise.await p;
      loop ()
  in
  try loop () with Exit -> ()
(** Field accessors for a session record. *)

(** [guild_id t] is the guild id stored in [t]. *)
let guild_id t = t.guild_id

(** [user_id t] is the user id stored in [t]. *)
let user_id t = t.user_id

(** [channel_id t] is the channel id stored in [t]. *)
let channel_id t = t.channel_id

(** [session_id t] is the session id stored in [t]. *)
let session_id t = t.session_id

(** [token t] is the token stored in [t]. *)
let token t = t.token

(** [endpoint t] is the endpoint stored in [t]. *)
let endpoint t = t.endpoint
let discord_addr = `Udp (Eio.Net.Ipaddr.of_raw ip, port) inlet sock = Eio.Net.datagram_socket ~sw net discord_addr inL.info (fun m ->m "discovering external ip using discord's voice server: %s:%d" ip port);
let ipaddr = Eio_unix.Ipaddr.of_unix @@ Unix.inet_addr_of_string ip inlet discord_addr = `Udp (ipaddr, port) inL.dbg (fun m ->m "discovering external ip using discord's voice server: %a"Eio.Net.Sockaddr.pp discord_addr);let sock =Eio.Net.datagram_socket ~sw net @@ `Udp (Eio.Net.Ipaddr.V4.any, 0)in
L.info (fun m -> m "discovered ip and port: %s:%d" ip_d port_d);let local_addr = `Udp (Eio.Net.Ipaddr.of_raw ip_d, port_d) in
L.dbg (fun m -> m "discovered ip and port: %s:%d" ip_d port_d);let ipaddr_d = Eio_unix.Ipaddr.of_unix @@ Unix.inet_addr_of_string ip_d inlet local_addr = `Udp (ipaddr_d, port_d) in