From 1f3a751a90ac0ec1c73ed68f190f30bf70c3b5eb Mon Sep 17 00:00:00 2001 From: talwat <83217276+talwat@users.noreply.github.com> Date: Tue, 2 Dec 2025 14:09:44 +0100 Subject: [PATCH] fix: make audio waiter actually work --- src/audio.rs | 29 +-------------------------- src/audio/waiter.rs | 49 +++++++++++++++++++++++++++++++++++++++++++++ src/error.rs | 3 +++ src/message.rs | 3 --- src/player.rs | 36 ++++++++++++--------------------- src/tracks/list.rs | 4 ++++ 6 files changed, 70 insertions(+), 54 deletions(-) create mode 100644 src/audio/waiter.rs diff --git a/src/audio.rs b/src/audio.rs index fa5ee94..3818419 100644 --- a/src/audio.rs +++ b/src/audio.rs @@ -1,11 +1,4 @@ -use std::{ - sync::{atomic::AtomicBool, Arc}, - thread::sleep, - time::Duration, -}; - -use rodio::Sink; -use tokio::sync::mpsc; +pub mod waiter; /// This gets the output stream while also shutting up alsa with [libc]. /// Uses raw libc calls, and therefore is functional only on Linux. @@ -47,23 +40,3 @@ pub fn silent_get_output_stream() -> eyre::Result, tx: mpsc::Sender) -> crate::Result<()> { - loop { - if Arc::strong_count(&sink) == 1 { - break Ok(()); - } - - sleep(Duration::from_millis(100)); - sink.sleep_until_end(); - - if LISTEN.load(std::sync::atomic::Ordering::Relaxed) { - tx.blocking_send(crate::Message::Next)?; - } - } -} diff --git a/src/audio/waiter.rs b/src/audio/waiter.rs new file mode 100644 index 0000000..7124484 --- /dev/null +++ b/src/audio/waiter.rs @@ -0,0 +1,49 @@ +use std::{sync::Arc, thread::sleep, time::Duration}; + +use rodio::Sink; +use tokio::{ + sync::{broadcast, mpsc}, + task::{self, JoinHandle}, +}; + +use crate::ui::{self, Update}; + +pub struct Handle { + _task: JoinHandle<()>, +} + +impl Handle { + pub fn new( + sink: Arc, + tx: mpsc::Sender, + rx: broadcast::Receiver, + ) -> Self { + Self { + _task: task::spawn_blocking(|| Self::waiter(sink, tx, rx)), + } + } + + fn waiter( + sink: Arc, + tx: mpsc::Sender, + mut rx: broadcast::Receiver, + ) { + 'main: loop { + if !matches!(rx.blocking_recv(), Ok(Update::Track(_))) { + continue; + } + + while !sink.empty() { + if matches!(rx.try_recv(), Ok(Update::Quit)) { + break 'main; + } + + sleep(Duration::from_millis(8)); + } + + if let Err(_) = tx.try_send(crate::Message::Next) { + break; + }; + } + } +} diff --git a/src/error.rs b/src/error.rs index e198752..92fd3e3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -46,4 +46,7 @@ pub enum Error { #[error("ui failure")] UI(#[from] ui::Error), + + #[error("join error")] + JoinError(#[from] tokio::task::JoinError), } diff --git a/src/message.rs b/src/message.rs index 8be1224..c704625 100644 --- a/src/message.rs +++ b/src/message.rs @@ -5,9 +5,6 @@ pub enum Message { /// Deliberate user request to go to the next song. Next, - /// Sent by the audio waiter whenever it believes a track has ended. - End, - /// When a track is loaded after the caller previously being told to wait. /// If a track is taken from the queue, then there is no waiting, so this /// is never actually sent. diff --git a/src/player.rs b/src/player.rs index a0216f3..67f0e4b 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,15 +1,12 @@ use std::sync::Arc; -use tokio::{ - sync::{ - broadcast, - mpsc::{self, Receiver}, - }, - task::JoinHandle, +use tokio::sync::{ + broadcast, + mpsc::{self, Receiver, Sender}, }; use crate::{ - audio, + audio::waiter, bookmark::Bookmarks, download::{self, Downloader}, tracks::{self, List}, @@ -38,14 +35,15 @@ pub struct Player { broadcast: broadcast::Sender, current: Current, ui: ui::Handle, - waiter: JoinHandle>, + _tx: Sender, + _waiter: waiter::Handle, _stream: rodio::OutputStream, } impl Drop for Player { fn drop(&mut self) { self.sink.stop(); - self.waiter.abort(); + self.broadcast.send(ui::Update::Quit).unwrap(); } } @@ -88,25 +86,20 @@ impl Player { let list = List::load(args.track_list.as_ref()).await?; let state = ui::State::initial(sink.clone(), &args, current.clone(), list.name.clone()); - let ui = ui::Handle::init(tx.clone(), urx, state.clone(), &args).await?; let volume = PersistentVolume::load().await?; sink.set_volume(volume.float()); - let bookmarks = Bookmarks::load().await?; - let downloader = Downloader::init(args.buffer_size, list, tx.clone()).await; - - let clone = sink.clone(); - let waiter = tokio::task::spawn_blocking(move || audio::waiter(clone, tx)); Ok(Self { + downloader: Downloader::init(args.buffer_size, list, tx.clone()).await, + ui: ui::Handle::init(tx.clone(), urx.resubscribe(), state.clone(), &args).await?, + _waiter: waiter::Handle::new(sink.clone(), tx.clone(), urx), + bookmarks: Bookmarks::load().await?, current, - downloader, broadcast: utx, rx, sink, - bookmarks, - ui, - waiter, + _tx: tx, _stream: stream, }) } @@ -129,21 +122,18 @@ impl Player { pub async fn run(mut self) -> crate::Result<()> { while let Some(message) = self.rx.recv().await { match message { - Message::Next | Message::Init | Message::Loaded | Message::End => { + Message::Next | Message::Init | Message::Loaded => { if message == Message::Next && self.current.loading() { continue; } - audio::playing(false); self.sink.stop(); - match self.downloader.track().await { download::Output::Loading(progress) => { self.set_current(Current::Loading(progress)).await?; } download::Output::Queued(queued) => { self.play(queued).await?; - audio::playing(true); } }; } diff --git a/src/tracks/list.rs b/src/tracks/list.rs index 196ebc0..ca31f47 100644 --- a/src/tracks/list.rs +++ b/src/tracks/list.rs @@ -94,6 +94,10 @@ impl List { x.to_owned() }; + if let Some(progress) = progress { + progress.store(100, Ordering::Relaxed); + } + let result = tokio::fs::read(path.clone()).await.track(x)?; result.into() } else {