fix: make audio waiter actually work

This commit is contained in:
talwat 2025-12-02 14:09:44 +01:00
parent 20a87d5363
commit 1f3a751a90
6 changed files with 70 additions and 54 deletions

View File

@ -1,11 +1,4 @@
use std::{ pub mod waiter;
sync::{atomic::AtomicBool, Arc},
thread::sleep,
time::Duration,
};
use rodio::Sink;
use tokio::sync::mpsc;
/// This gets the output stream while also shutting up alsa with [libc]. /// This gets the output stream while also shutting up alsa with [libc].
/// Uses raw libc calls, and therefore is functional only on Linux. /// Uses raw libc calls, and therefore is functional only on Linux.
@ -47,23 +40,3 @@ pub fn silent_get_output_stream() -> eyre::Result<rodio::OutputStream, crate::Er
Ok(stream) Ok(stream)
} }
static LISTEN: AtomicBool = AtomicBool::new(false);
pub fn playing(status: bool) {
LISTEN.store(status, std::sync::atomic::Ordering::Relaxed);
}
pub fn waiter(sink: Arc<Sink>, tx: mpsc::Sender<crate::Message>) -> crate::Result<()> {
loop {
if Arc::strong_count(&sink) == 1 {
break Ok(());
}
sleep(Duration::from_millis(100));
sink.sleep_until_end();
if LISTEN.load(std::sync::atomic::Ordering::Relaxed) {
tx.blocking_send(crate::Message::Next)?;
}
}
}

49
src/audio/waiter.rs Normal file
View File

@ -0,0 +1,49 @@
use std::{sync::Arc, thread::sleep, time::Duration};
use rodio::Sink;
use tokio::{
sync::{broadcast, mpsc},
task::{self, JoinHandle},
};
use crate::ui::{self, Update};
pub struct Handle {
_task: JoinHandle<()>,
}
impl Handle {
pub fn new(
sink: Arc<Sink>,
tx: mpsc::Sender<crate::Message>,
rx: broadcast::Receiver<ui::Update>,
) -> Self {
Self {
_task: task::spawn_blocking(|| Self::waiter(sink, tx, rx)),
}
}
fn waiter(
sink: Arc<Sink>,
tx: mpsc::Sender<crate::Message>,
mut rx: broadcast::Receiver<ui::Update>,
) {
'main: loop {
if !matches!(rx.blocking_recv(), Ok(Update::Track(_))) {
continue;
}
while !sink.empty() {
if matches!(rx.try_recv(), Ok(Update::Quit)) {
break 'main;
}
sleep(Duration::from_millis(8));
}
if let Err(_) = tx.try_send(crate::Message::Next) {
break;
};
}
}
}

View File

@ -46,4 +46,7 @@ pub enum Error {
#[error("ui failure")] #[error("ui failure")]
UI(#[from] ui::Error), UI(#[from] ui::Error),
#[error("join error")]
JoinError(#[from] tokio::task::JoinError),
} }

View File

@ -5,9 +5,6 @@ pub enum Message {
/// Deliberate user request to go to the next song. /// Deliberate user request to go to the next song.
Next, Next,
/// Sent by the audio waiter whenever it believes a track has ended.
End,
/// When a track is loaded after the caller previously being told to wait. /// When a track is loaded after the caller previously being told to wait.
/// If a track is taken from the queue, then there is no waiting, so this /// If a track is taken from the queue, then there is no waiting, so this
/// is never actually sent. /// is never actually sent.

View File

@ -1,15 +1,12 @@
use std::sync::Arc; use std::sync::Arc;
use tokio::{ use tokio::sync::{
sync::{ broadcast,
broadcast, mpsc::{self, Receiver, Sender},
mpsc::{self, Receiver},
},
task::JoinHandle,
}; };
use crate::{ use crate::{
audio, audio::waiter,
bookmark::Bookmarks, bookmark::Bookmarks,
download::{self, Downloader}, download::{self, Downloader},
tracks::{self, List}, tracks::{self, List},
@ -38,14 +35,15 @@ pub struct Player {
broadcast: broadcast::Sender<ui::Update>, broadcast: broadcast::Sender<ui::Update>,
current: Current, current: Current,
ui: ui::Handle, ui: ui::Handle,
waiter: JoinHandle<crate::Result<()>>, _tx: Sender<crate::Message>,
_waiter: waiter::Handle,
_stream: rodio::OutputStream, _stream: rodio::OutputStream,
} }
impl Drop for Player { impl Drop for Player {
fn drop(&mut self) { fn drop(&mut self) {
self.sink.stop(); self.sink.stop();
self.waiter.abort(); self.broadcast.send(ui::Update::Quit).unwrap();
} }
} }
@ -88,25 +86,20 @@ impl Player {
let list = List::load(args.track_list.as_ref()).await?; let list = List::load(args.track_list.as_ref()).await?;
let state = ui::State::initial(sink.clone(), &args, current.clone(), list.name.clone()); let state = ui::State::initial(sink.clone(), &args, current.clone(), list.name.clone());
let ui = ui::Handle::init(tx.clone(), urx, state.clone(), &args).await?;
let volume = PersistentVolume::load().await?; let volume = PersistentVolume::load().await?;
sink.set_volume(volume.float()); sink.set_volume(volume.float());
let bookmarks = Bookmarks::load().await?;
let downloader = Downloader::init(args.buffer_size, list, tx.clone()).await;
let clone = sink.clone();
let waiter = tokio::task::spawn_blocking(move || audio::waiter(clone, tx));
Ok(Self { Ok(Self {
downloader: Downloader::init(args.buffer_size, list, tx.clone()).await,
ui: ui::Handle::init(tx.clone(), urx.resubscribe(), state.clone(), &args).await?,
_waiter: waiter::Handle::new(sink.clone(), tx.clone(), urx),
bookmarks: Bookmarks::load().await?,
current, current,
downloader,
broadcast: utx, broadcast: utx,
rx, rx,
sink, sink,
bookmarks, _tx: tx,
ui,
waiter,
_stream: stream, _stream: stream,
}) })
} }
@ -129,21 +122,18 @@ impl Player {
pub async fn run(mut self) -> crate::Result<()> { pub async fn run(mut self) -> crate::Result<()> {
while let Some(message) = self.rx.recv().await { while let Some(message) = self.rx.recv().await {
match message { match message {
Message::Next | Message::Init | Message::Loaded | Message::End => { Message::Next | Message::Init | Message::Loaded => {
if message == Message::Next && self.current.loading() { if message == Message::Next && self.current.loading() {
continue; continue;
} }
audio::playing(false);
self.sink.stop(); self.sink.stop();
match self.downloader.track().await { match self.downloader.track().await {
download::Output::Loading(progress) => { download::Output::Loading(progress) => {
self.set_current(Current::Loading(progress)).await?; self.set_current(Current::Loading(progress)).await?;
} }
download::Output::Queued(queued) => { download::Output::Queued(queued) => {
self.play(queued).await?; self.play(queued).await?;
audio::playing(true);
} }
}; };
} }

View File

@ -94,6 +94,10 @@ impl List {
x.to_owned() x.to_owned()
}; };
if let Some(progress) = progress {
progress.store(100, Ordering::Relaxed);
}
let result = tokio::fs::read(path.clone()).await.track(x)?; let result = tokio::fs::read(path.clone()).await.track(x)?;
result.into() result.into()
} else { } else {