From 6802db1a1e53194e9e39b682925649dfcf0149a6 Mon Sep 17 00:00:00 2001 From: Tal <83217276+talwat@users.noreply.github.com> Date: Sun, 28 Dec 2025 17:12:40 +0100 Subject: [PATCH] feat: better task management (#113) * docs: attempt to explain tasks.rs * fix: switch to using a joinset * chore: remove unused tokio-utils import * style: reorganize code --- src/audio/waiter.rs | 62 ++++++++++------------ src/download.rs | 83 ++++++++++++++---------------- src/main.rs | 4 +- src/player.rs | 85 +++++++++++------------------- src/tasks.rs | 97 ++++++++++++++++++++--------------- src/ui.rs | 122 +++++++++++++++----------------------------- src/ui/interface.rs | 4 +- src/ui/task.rs | 27 ++++++++++ 8 files changed, 225 insertions(+), 259 deletions(-) create mode 100644 src/ui/task.rs diff --git a/src/audio/waiter.rs b/src/audio/waiter.rs index ed13ada..5c12586 100644 --- a/src/audio/waiter.rs +++ b/src/audio/waiter.rs @@ -3,57 +3,51 @@ use std::{sync::Arc, time::Duration}; use rodio::Sink; use tokio::{ sync::{mpsc, Notify}, - task::{self, JoinHandle}, time, }; +/// Background loop that waits for the sink to drain and then attempts +/// to send a `Message::Next` to the provided channel. +async fn waiter( + sink: Arc, + tx: mpsc::Sender, + notify: Arc, +) -> crate::Result<()> { + loop { + notify.notified().await; + + while !sink.empty() { + time::sleep(Duration::from_millis(16)).await; + } + + if tx.try_send(crate::Message::Next).is_err() { + break Ok(()); + } + } +} + /// Lightweight helper that waits for the current sink to drain and then /// notifies the player to advance to the next track. pub struct Handle { - /// Background task monitoring the sink. - task: JoinHandle<()>, - /// Notification primitive used to wake the waiter. notify: Arc, } -impl Drop for Handle { - fn drop(&mut self) { - self.task.abort(); - } -} - impl Handle { - /// Create a new `Handle` which watches the provided `sink` and sends - /// `Message::Next` down `tx` when the sink becomes empty. - pub fn new(sink: Arc, tx: mpsc::Sender) -> Self { - let notify = Arc::new(Notify::new()); - - Self { - task: task::spawn(Self::waiter(sink, tx, Arc::clone(¬ify))), - notify, - } - } - /// Notify the waiter that playback state may have changed and it should /// re-check the sink emptiness condition. pub fn notify(&self) { self.notify.notify_one(); } +} - /// Background loop that waits for the sink to drain and then attempts - /// to send a `Message::Next` to the provided channel. - async fn waiter(sink: Arc, tx: mpsc::Sender, notify: Arc) { - loop { - notify.notified().await; +impl crate::Tasks { + /// Create a new `Handle` which watches the provided `sink` and sends + /// `Message::Next` down `tx` when the sink becomes empty. + pub fn waiter(&mut self, sink: Arc) -> Handle { + let notify = Arc::new(Notify::new()); + self.spawn(waiter(sink, self.tx(), notify.clone())); - while !sink.empty() { - time::sleep(Duration::from_millis(8)).await; - } - - if tx.try_send(crate::Message::Next).is_err() { - break; - } - } + Handle { notify } } } diff --git a/src/download.rs b/src/download.rs index b58eac1..906aa36 100644 --- a/src/download.rs +++ b/src/download.rs @@ -5,7 +5,7 @@ use std::{ use crate::tracks; use reqwest::Client; -use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::sync::mpsc; /// Flag indicating whether the downloader is actively fetching a track. /// @@ -28,14 +28,14 @@ pub struct Downloader { /// The track queue itself, which in this case is actually /// just an asynchronous sender. /// - /// It is a [`Sender`] because the tracks will have to be + /// It is a [`mpsc::Sender`] because the tracks will have to be /// received by a completely different thread, so this avoids /// the need to use an explicit [`tokio::sync::Mutex`]. - queue: Sender, + queue: mpsc::Sender, - /// The [`Sender`] which is used to inform the + /// The [`mpsc::Sender`] which is used to inform the /// [`crate::Player`] with [`crate::Message::Loaded`]. - tx: Sender, + tx: mpsc::Sender, /// The list of tracks to download from. tracks: tracks::List, @@ -48,39 +48,6 @@ pub struct Downloader { } impl Downloader { - /// Initializes the downloader with a track list. - /// - /// `tx` specifies the [`Sender`] to be notified with [`crate::Message::Loaded`]. - pub fn init( - size: usize, - timeout: u64, - tracks: tracks::List, - tx: Sender, - ) -> crate::Result { - let client = Client::builder() - .user_agent(concat!( - env!("CARGO_PKG_NAME"), - "/", - env!("CARGO_PKG_VERSION") - )) - .timeout(Duration::from_secs(timeout)) - .build()?; - - let (qtx, qrx) = mpsc::channel(size - 1); - let downloader = Self { - queue: qtx, - tx, - tracks, - client, - rng: fastrand::Rng::new(), - }; - - Ok(Handle { - queue: qrx, - task: crate::Tasks([tokio::spawn(downloader.run())]), - }) - } - /// Actually runs the downloader, consuming it and beginning /// the cycle of downloading tracks and reporting to the /// rest of the program. @@ -117,10 +84,7 @@ impl Downloader { pub struct Handle { /// The queue receiver, which can be used to actually /// fetch a track from the queue. - queue: Receiver, - - /// The downloader task, which can be aborted. - task: crate::Tasks, + queue: mpsc::Receiver, } /// The output when a track is requested from the downloader. @@ -146,10 +110,37 @@ impl Handle { }, Output::Queued, ) } +} - /// Shuts down the downloader task, returning any errors. - pub async fn close(self) -> crate::Result<()> { - let [result] = self.task.shutdown().await; - result +impl crate::Tasks { + /// Initializes the downloader with a track list. + /// + /// `tx` specifies the [`Sender`] to be notified with [`crate::Message::Loaded`]. + pub fn downloader( + &mut self, + size: usize, + timeout: u64, + tracks: tracks::List, + ) -> crate::Result { + let client = Client::builder() + .user_agent(concat!( + env!("CARGO_PKG_NAME"), + "/", + env!("CARGO_PKG_VERSION") + )) + .timeout(Duration::from_secs(timeout)) + .build()?; + + let (qtx, qrx) = mpsc::channel(size - 1); + let downloader = Downloader { + queue: qtx, + tx: self.tx(), + tracks, + client, + rng: fastrand::Rng::new(), + }; + + self.spawn(downloader.run()); + Ok(Handle { queue: qrx }) } } diff --git a/src/main.rs b/src/main.rs index 0c3ab6c..97d1405 100644 --- a/src/main.rs +++ b/src/main.rs @@ -122,11 +122,11 @@ async fn main() -> eyre::Result<()> { let stream = audio::stream()?; let environment = ui::Environment::ready(args.alternate)?; - let mut player = Player::init(args, stream.mixer()) + let (mut player, mut tasks) = Player::init(args, stream.mixer()) .await .inspect_err(|_| environment.cleanup(false).unwrap())?; - let result = player.run().await; + let result = tasks.wait(player.run()).await; environment.cleanup(result.is_ok())?; player.close().await?; diff --git a/src/player.rs b/src/player.rs index a108299..6f32744 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,25 +1,22 @@ use std::sync::Arc; -use tokio::sync::{ - broadcast, - mpsc::{self, Receiver}, -}; +use tokio::sync::mpsc::{self, Receiver}; use crate::{ audio::waiter, bookmark::Bookmarks, - download::{self, Downloader}, + download, tracks::{self, List}, ui, volume::PersistentVolume, - Message, + Message, Tasks, }; -#[derive(Clone, Debug)] /// Represents the currently known playback state. /// /// * [`Current::Loading`] indicates the player is waiting for data. /// * [`Current::Track`] indicates the player has a decoded track available. +#[derive(Clone, Debug)] pub enum Current { /// Waiting for a track to arrive. The optional `Progress` is used to /// indicate global download progress when present. @@ -48,23 +45,20 @@ impl Current { /// `Player` composes the downloader, UI, audio sink and bookkeeping state. /// It owns background `Handle`s and drives the main message loop in `run`. pub struct Player { - /// Background downloader that fills the internal queue. - downloader: download::Handle, - /// Persistent bookmark storage used by the player. bookmarks: Bookmarks, - /// Shared audio sink used for playback. - sink: Arc, + /// Current playback state (loading or track). + current: Current, + + /// Background downloader that fills the internal queue. + downloader: download::Handle, /// Receiver for incoming `Message` commands. rx: Receiver, - /// Broadcast channel used to send UI updates. - updater: broadcast::Sender, - - /// Current playback state (loading or track). - current: Current, + /// Shared audio sink used for playback. + sink: Arc, /// UI handle for rendering and input. ui: ui::Handle, @@ -80,38 +74,34 @@ impl Player { /// based on persistent bookmarks. pub fn set_current(&mut self, current: Current) -> crate::Result<()> { self.current = current.clone(); - self.update(ui::Update::Track(current))?; + self.ui.update(ui::Update::Track(current))?; let Current::Track(track) = &self.current else { return Ok(()); }; let bookmarked = self.bookmarks.bookmarked(track); - self.update(ui::Update::Bookmarked(bookmarked))?; + self.ui.update(ui::Update::Bookmarked(bookmarked))?; Ok(()) } - /// Sends a `ui::Update` to the broadcast channel. - pub fn update(&mut self, update: ui::Update) -> crate::Result<()> { - self.updater.send(update)?; - Ok(()) - } - /// Initialize a `Player` with the provided CLI `args` and audio `mixer`. /// /// This sets up the audio sink, UI, downloader, bookmarks and persistent /// volume state. The function returns a fully constructed `Player` ready /// to be driven via `run`. - pub async fn init(args: crate::Args, mixer: &rodio::mixer::Mixer) -> crate::Result { + pub async fn init( + args: crate::Args, + mixer: &rodio::mixer::Mixer, + ) -> crate::Result<(Self, crate::Tasks)> { let (tx, rx) = mpsc::channel(8); + let mut tasks = Tasks::new(tx.clone()); if args.paused { tx.send(Message::Pause).await?; } tx.send(Message::Init).await?; - - let (utx, urx) = broadcast::channel(8); let list = List::load(args.track_list.as_ref()).await?; let sink = Arc::new(rodio::Sink::connect_new(mixer)); @@ -120,40 +110,25 @@ impl Player { let volume = PersistentVolume::load().await?; sink.set_volume(volume.float()); - Ok(Self { - ui: ui::Handle::init(tx.clone(), urx, state, &args).await?, - downloader: Downloader::init( - args.buffer_size as usize, - args.timeout, - list, - tx.clone(), - )?, - waiter: waiter::Handle::new(Arc::clone(&sink), tx), + let player = Self { + ui: tasks.ui(state, &args).await?, + downloader: tasks.downloader(args.buffer_size as usize, args.timeout, list)?, + waiter: tasks.waiter(Arc::clone(&sink)), bookmarks: Bookmarks::load().await?, current: Current::default(), - updater: utx, rx, sink, - }) + }; + + Ok((player, tasks)) } /// Close any outlying processes, as well as persist state that /// should survive such as bookmarks and volume. pub async fn close(self) -> crate::Result<()> { - // We should prioritize reporting UI/Downloader errors, - // but still save persistent state before so that if either one fails, - // state is saved. - let saves = ( - self.bookmarks.save().await, - PersistentVolume::save(self.sink.volume()).await, - ); - - self.ui.close().await?; - self.downloader.close().await?; self.sink.stop(); - - saves.0?; - saves.1?; + self.bookmarks.save().await?; + PersistentVolume::save(self.sink.volume()).await?; Ok(()) } @@ -205,11 +180,11 @@ impl Player { Message::ChangeVolume(change) => { self.sink .set_volume((self.sink.volume() + change).clamp(0.0, 1.0)); - self.update(ui::Update::Volume)?; + self.ui.update(ui::Update::Volume)?; } Message::SetVolume(set) => { self.sink.set_volume(set.clamp(0.0, 1.0)); - self.update(ui::Update::Volume)?; + self.ui.update(ui::Update::Volume)?; } Message::Bookmark => { let Current::Track(current) = &self.current else { @@ -217,7 +192,7 @@ impl Player { }; let bookmarked = self.bookmarks.bookmark(current)?; - self.update(ui::Update::Bookmarked(bookmarked))?; + self.ui.update(ui::Update::Bookmarked(bookmarked))?; } Message::Quit => break, } diff --git a/src/tasks.rs b/src/tasks.rs index ed5ddde..a8afbe0 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -1,47 +1,64 @@ -use std::{future::Future, mem::MaybeUninit}; +//! Task management. +//! +//! This file aims to abstract a lot of annoying Rust async logic, which may be subject to change. +//! For those who are not intimately familiar with async rust, this will be very confusing. -trait AsyncArrayMap { - async fn async_map(self, f: F) -> [U; N] - where - F: FnMut(T) -> Fut, - Fut: Future; +use futures_util::TryFutureExt; +use std::future::Future; +use tokio::{select, sync::mpsc, task::JoinSet}; + +/// Handles all of the processes within lowfi. +/// This entails initializing/closing tasks, and handling any potential errors that arise. +pub struct Tasks { + /// The [`JoinSet`], which contains all of the task handles. + pub set: JoinSet>, + + /// A sender, which is kept for convenience to be used when + /// initializing various other tasks. + tx: mpsc::Sender, } -impl AsyncArrayMap for [T; N] { - async fn async_map(self, mut f: F) -> [U; N] - where - F: FnMut(T) -> Fut, - Fut: Future, - { - let mut out: [MaybeUninit; N] = unsafe { MaybeUninit::uninit().assume_init() }; - - for (i, v) in self.into_iter().enumerate() { - out[i].write(f(v).await); +impl Tasks { + /// Creates a new task manager. + pub fn new(tx: mpsc::Sender) -> Self { + Self { + tx, + set: JoinSet::new(), } + } - unsafe { std::mem::transmute_copy(&out) } - } -} - -/// Wrapper around an array of JoinHandles to provide better error reporting & shutdown. -pub struct Tasks(pub [tokio::task::JoinHandle>; S]); - -impl, const S: usize> Tasks { - /// Abort tasks, and report either errors thrown from within each task - /// or from tokio about joining the task. - pub async fn shutdown(self) -> [crate::Result<()>; S] { - self.0 - .async_map(async |handle| { - if !handle.is_finished() { - handle.abort(); - } - - match handle.await { - Ok(Err(error)) => Err(error.into()), - Err(error) if !error.is_cancelled() => Err(crate::Error::JoinError(error)), - Ok(Ok(())) | Err(_) => Ok(()), - } - }) - .await + /// Processes a task, and adds it to the internal [`JoinSet`]. + pub fn spawn + Send + Sync + 'static>( + &mut self, + future: impl Future> + Send + 'static, + ) { + self.set.spawn(future.map_err(Into::into)); + } + + /// Gets a copy of the internal [`mpsc::Sender`]. + pub fn tx(&self) -> mpsc::Sender { + self.tx.clone() + } + + /// Actively polls all of the handles previously added. + /// + /// An additional `runner` is for the main player future, which + /// can't be added as a "task" because it shares data with the + /// main thread. + /// + /// This either returns when the runner completes, or if an error occurs + /// in any of the internally held tasks. + pub async fn wait( + &mut self, + runner: impl Future> + std::marker::Send, + ) -> crate::Result<()> { + select! { + result = runner => result, + Some(result) = self.set.join_next() => match result { + Ok(res) => res, + Err(e) if !e.is_cancelled() => Err(crate::Error::JoinError(e)), + Err(_) => Ok(()), + } + } } } diff --git a/src/ui.rs b/src/ui.rs index 32617fd..1c045cb 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -1,12 +1,10 @@ use std::sync::Arc; -use crate::{player::Current, ui, Args}; -use tokio::{ - sync::{broadcast, mpsc::Sender}, - time::Instant, -}; +use crate::player::Current; +use tokio::{sync::broadcast, time::Instant}; pub mod environment; +pub mod task; pub use environment::Environment; pub mod input; pub mod interface; @@ -108,88 +106,50 @@ pub enum Update { /// The UI handle for controlling the state of the UI, as well as /// updating MPRIS information and other small interfacing tasks. pub struct Handle { + /// Broadcast channel used to send UI updates. + updater: broadcast::Sender, + /// The MPRIS server, which is more or less a handle to the actual MPRIS thread. #[cfg(feature = "mpris")] pub mpris: mpris::Server, - - /// The UI's running tasks. - tasks: Option>, } impl Handle { - /// Actually takes care of spawning the tasks for the UI. - fn spawn( - tx: Sender, - updater: broadcast::Receiver, - state: State, - params: interface::Params, - ) -> crate::Tasks { - crate::Tasks([ - tokio::spawn(Handle::ui(updater, state, params)), - tokio::spawn(input::listen(tx)), - ]) - } - - /// Shuts down the UI tasks, returning any encountered errors. - pub async fn close(self) -> crate::Result<()> { - let Some(tasks) = self.tasks else { - return Ok(()); - }; - for result in tasks.shutdown().await { - result? - } - + /// Sends a `ui::Update` to the broadcast channel. + pub fn update(&mut self, update: Update) -> crate::Result<()> { + self.updater.send(update)?; Ok(()) } - - /// The main UI process, which will both render the UI to the terminal - /// and also update state. - /// - /// It does both of these things at a fixed interval, due to things - /// like the track duration changing too frequently. - /// - /// `rx` is the receiver for state updates, `state` the initial state, - /// and `params` specifies aesthetic options that are specified by the user. - async fn ui( - mut updater: broadcast::Receiver, - mut state: State, - params: interface::Params, - ) -> Result<()> { - let mut interface = Interface::new(params)?; - - loop { - if let Ok(message) = updater.try_recv() { - match message { - Update::Track(track) => state.current = track, - Update::Bookmarked(bookmarked) => state.bookmarked = bookmarked, - Update::Volume => state.volume_timer = Some(Instant::now()), - Update::Quit => break, - } - } - - interface.draw(&state).await?; - state.tick(); - } - - Ok(()) - } - - /// Initializes the UI itself, along with all of the tasks that are related to it. - #[allow(clippy::unused_async)] - pub async fn init( - tx: Sender, - updater: broadcast::Receiver, - state: State, - args: &Args, - ) -> Result { - let params = interface::Params::try_from(args)?; - - Ok(Self { - #[cfg(feature = "mpris")] - mpris: mpris::Server::new(state.clone(), tx.clone(), updater.resubscribe()).await?, - tasks: params - .enabled - .then(|| Self::spawn(tx, updater, state, params)), - }) - } +} + +/// The main UI process, which will both render the UI to the terminal +/// and also update state. +/// +/// It does both of these things at a fixed interval, due to things +/// like the track duration changing too frequently. +/// +/// `rx` is the receiver for state updates, `state` the initial state, +/// and `params` specifies aesthetic options that are specified by the user. +pub async fn run( + mut updater: broadcast::Receiver, + mut state: State, + params: interface::Params, +) -> Result<()> { + let mut interface = Interface::new(params)?; + + loop { + if let Ok(message) = updater.try_recv() { + match message { + Update::Track(track) => state.current = track, + Update::Bookmarked(bookmarked) => state.bookmarked = bookmarked, + Update::Volume => state.volume_timer = Some(Instant::now()), + Update::Quit => break, + } + } + + interface.draw(&state).await?; + state.tick(); + } + + Ok(()) } diff --git a/src/ui/interface.rs b/src/ui/interface.rs index f316c4a..b82113a 100644 --- a/src/ui/interface.rs +++ b/src/ui/interface.rs @@ -141,7 +141,9 @@ impl Interface { /// Draws the terminal. This will also wait for the specified /// delta to pass before completing. pub async fn draw(&mut self, state: &State) -> super::Result<()> { - self.clock.as_mut().map(|x| x.update(&mut self.window)); + if let Some(x) = self.clock.as_mut() { + x.update(&mut self.window); + } let menu = self.menu(state); self.window.draw(stdout().lock(), menu)?; diff --git a/src/ui/task.rs b/src/ui/task.rs new file mode 100644 index 0000000..fe02613 --- /dev/null +++ b/src/ui/task.rs @@ -0,0 +1,27 @@ +//! Contains the code for initializing the UI and creating a [`ui::Handle`]. + +use crate::ui::{self, input, interface}; +use tokio::sync::broadcast; + +impl crate::Tasks { + /// Initializes the UI itself, along with all of the tasks that are related to it. + #[allow(clippy::unused_async)] + pub async fn ui(&mut self, state: ui::State, args: &crate::Args) -> crate::Result { + let (utx, urx) = broadcast::channel(8); + + #[cfg(feature = "mpris")] + let mpris = ui::mpris::Server::new(state.clone(), self.tx(), urx.resubscribe()).await?; + + let params = interface::Params::try_from(args)?; + if params.enabled { + self.spawn(ui::run(urx, state, params)); + self.spawn(input::listen(self.tx())); + } + + Ok(ui::Handle { + updater: utx, + #[cfg(feature = "mpris")] + mpris, + }) + } +}