feat: better task management (#113)

* docs: attempt to explain tasks.rs

* fix: switch to using a joinset

* chore: remove unused tokio-utils import

* style: reorganize code
This commit is contained in:
Tal 2025-12-28 17:12:40 +01:00 committed by GitHub
parent fa236439e3
commit 6802db1a1e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 225 additions and 259 deletions

View File

@ -3,57 +3,51 @@ use std::{sync::Arc, time::Duration};
use rodio::Sink; use rodio::Sink;
use tokio::{ use tokio::{
sync::{mpsc, Notify}, sync::{mpsc, Notify},
task::{self, JoinHandle},
time, time,
}; };
/// Background loop that waits for the sink to drain and then attempts
/// to send a `Message::Next` to the provided channel.
async fn waiter(
sink: Arc<Sink>,
tx: mpsc::Sender<crate::Message>,
notify: Arc<Notify>,
) -> crate::Result<()> {
loop {
notify.notified().await;
while !sink.empty() {
time::sleep(Duration::from_millis(16)).await;
}
if tx.try_send(crate::Message::Next).is_err() {
break Ok(());
}
}
}
/// Lightweight helper that waits for the current sink to drain and then /// Lightweight helper that waits for the current sink to drain and then
/// notifies the player to advance to the next track. /// notifies the player to advance to the next track.
pub struct Handle { pub struct Handle {
/// Background task monitoring the sink.
task: JoinHandle<()>,
/// Notification primitive used to wake the waiter. /// Notification primitive used to wake the waiter.
notify: Arc<Notify>, notify: Arc<Notify>,
} }
impl Drop for Handle {
fn drop(&mut self) {
self.task.abort();
}
}
impl Handle { impl Handle {
/// Create a new `Handle` which watches the provided `sink` and sends
/// `Message::Next` down `tx` when the sink becomes empty.
pub fn new(sink: Arc<Sink>, tx: mpsc::Sender<crate::Message>) -> Self {
let notify = Arc::new(Notify::new());
Self {
task: task::spawn(Self::waiter(sink, tx, Arc::clone(&notify))),
notify,
}
}
/// Notify the waiter that playback state may have changed and it should /// Notify the waiter that playback state may have changed and it should
/// re-check the sink emptiness condition. /// re-check the sink emptiness condition.
pub fn notify(&self) { pub fn notify(&self) {
self.notify.notify_one(); self.notify.notify_one();
} }
}
/// Background loop that waits for the sink to drain and then attempts impl crate::Tasks {
/// to send a `Message::Next` to the provided channel. /// Create a new `Handle` which watches the provided `sink` and sends
async fn waiter(sink: Arc<Sink>, tx: mpsc::Sender<crate::Message>, notify: Arc<Notify>) { /// `Message::Next` down `tx` when the sink becomes empty.
loop { pub fn waiter(&mut self, sink: Arc<Sink>) -> Handle {
notify.notified().await; let notify = Arc::new(Notify::new());
self.spawn(waiter(sink, self.tx(), notify.clone()));
while !sink.empty() { Handle { notify }
time::sleep(Duration::from_millis(8)).await;
}
if tx.try_send(crate::Message::Next).is_err() {
break;
}
}
} }
} }

View File

@ -5,7 +5,7 @@ use std::{
use crate::tracks; use crate::tracks;
use reqwest::Client; use reqwest::Client;
use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::mpsc;
/// Flag indicating whether the downloader is actively fetching a track. /// Flag indicating whether the downloader is actively fetching a track.
/// ///
@ -28,14 +28,14 @@ pub struct Downloader {
/// The track queue itself, which in this case is actually /// The track queue itself, which in this case is actually
/// just an asynchronous sender. /// just an asynchronous sender.
/// ///
/// It is a [`Sender`] because the tracks will have to be /// It is a [`mpsc::Sender`] because the tracks will have to be
/// received by a completely different thread, so this avoids /// received by a completely different thread, so this avoids
/// the need to use an explicit [`tokio::sync::Mutex`]. /// the need to use an explicit [`tokio::sync::Mutex`].
queue: Sender<tracks::Queued>, queue: mpsc::Sender<tracks::Queued>,
/// The [`Sender`] which is used to inform the /// The [`mpsc::Sender`] which is used to inform the
/// [`crate::Player`] with [`crate::Message::Loaded`]. /// [`crate::Player`] with [`crate::Message::Loaded`].
tx: Sender<crate::Message>, tx: mpsc::Sender<crate::Message>,
/// The list of tracks to download from. /// The list of tracks to download from.
tracks: tracks::List, tracks: tracks::List,
@ -48,39 +48,6 @@ pub struct Downloader {
} }
impl Downloader { impl Downloader {
/// Initializes the downloader with a track list.
///
/// `tx` specifies the [`Sender`] to be notified with [`crate::Message::Loaded`].
pub fn init(
size: usize,
timeout: u64,
tracks: tracks::List,
tx: Sender<crate::Message>,
) -> crate::Result<Handle> {
let client = Client::builder()
.user_agent(concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION")
))
.timeout(Duration::from_secs(timeout))
.build()?;
let (qtx, qrx) = mpsc::channel(size - 1);
let downloader = Self {
queue: qtx,
tx,
tracks,
client,
rng: fastrand::Rng::new(),
};
Ok(Handle {
queue: qrx,
task: crate::Tasks([tokio::spawn(downloader.run())]),
})
}
/// Actually runs the downloader, consuming it and beginning /// Actually runs the downloader, consuming it and beginning
/// the cycle of downloading tracks and reporting to the /// the cycle of downloading tracks and reporting to the
/// rest of the program. /// rest of the program.
@ -117,10 +84,7 @@ impl Downloader {
pub struct Handle { pub struct Handle {
/// The queue receiver, which can be used to actually /// The queue receiver, which can be used to actually
/// fetch a track from the queue. /// fetch a track from the queue.
queue: Receiver<tracks::Queued>, queue: mpsc::Receiver<tracks::Queued>,
/// The downloader task, which can be aborted.
task: crate::Tasks<crate::Error, 1>,
} }
/// The output when a track is requested from the downloader. /// The output when a track is requested from the downloader.
@ -146,10 +110,37 @@ impl Handle {
}, Output::Queued, }, Output::Queued,
) )
} }
}
/// Shuts down the downloader task, returning any errors. impl crate::Tasks {
pub async fn close(self) -> crate::Result<()> { /// Initializes the downloader with a track list.
let [result] = self.task.shutdown().await; ///
result /// `tx` specifies the [`Sender`] to be notified with [`crate::Message::Loaded`].
pub fn downloader(
&mut self,
size: usize,
timeout: u64,
tracks: tracks::List,
) -> crate::Result<Handle> {
let client = Client::builder()
.user_agent(concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION")
))
.timeout(Duration::from_secs(timeout))
.build()?;
let (qtx, qrx) = mpsc::channel(size - 1);
let downloader = Downloader {
queue: qtx,
tx: self.tx(),
tracks,
client,
rng: fastrand::Rng::new(),
};
self.spawn(downloader.run());
Ok(Handle { queue: qrx })
} }
} }

View File

@ -122,11 +122,11 @@ async fn main() -> eyre::Result<()> {
let stream = audio::stream()?; let stream = audio::stream()?;
let environment = ui::Environment::ready(args.alternate)?; let environment = ui::Environment::ready(args.alternate)?;
let mut player = Player::init(args, stream.mixer()) let (mut player, mut tasks) = Player::init(args, stream.mixer())
.await .await
.inspect_err(|_| environment.cleanup(false).unwrap())?; .inspect_err(|_| environment.cleanup(false).unwrap())?;
let result = player.run().await; let result = tasks.wait(player.run()).await;
environment.cleanup(result.is_ok())?; environment.cleanup(result.is_ok())?;
player.close().await?; player.close().await?;

View File

@ -1,25 +1,22 @@
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{ use tokio::sync::mpsc::{self, Receiver};
broadcast,
mpsc::{self, Receiver},
};
use crate::{ use crate::{
audio::waiter, audio::waiter,
bookmark::Bookmarks, bookmark::Bookmarks,
download::{self, Downloader}, download,
tracks::{self, List}, tracks::{self, List},
ui, ui,
volume::PersistentVolume, volume::PersistentVolume,
Message, Message, Tasks,
}; };
#[derive(Clone, Debug)]
/// Represents the currently known playback state. /// Represents the currently known playback state.
/// ///
/// * [`Current::Loading`] indicates the player is waiting for data. /// * [`Current::Loading`] indicates the player is waiting for data.
/// * [`Current::Track`] indicates the player has a decoded track available. /// * [`Current::Track`] indicates the player has a decoded track available.
#[derive(Clone, Debug)]
pub enum Current { pub enum Current {
/// Waiting for a track to arrive. The optional `Progress` is used to /// Waiting for a track to arrive. The optional `Progress` is used to
/// indicate global download progress when present. /// indicate global download progress when present.
@ -48,23 +45,20 @@ impl Current {
/// `Player` composes the downloader, UI, audio sink and bookkeeping state. /// `Player` composes the downloader, UI, audio sink and bookkeeping state.
/// It owns background `Handle`s and drives the main message loop in `run`. /// It owns background `Handle`s and drives the main message loop in `run`.
pub struct Player { pub struct Player {
/// Background downloader that fills the internal queue.
downloader: download::Handle,
/// Persistent bookmark storage used by the player. /// Persistent bookmark storage used by the player.
bookmarks: Bookmarks, bookmarks: Bookmarks,
/// Shared audio sink used for playback. /// Current playback state (loading or track).
sink: Arc<rodio::Sink>, current: Current,
/// Background downloader that fills the internal queue.
downloader: download::Handle,
/// Receiver for incoming `Message` commands. /// Receiver for incoming `Message` commands.
rx: Receiver<crate::Message>, rx: Receiver<crate::Message>,
/// Broadcast channel used to send UI updates. /// Shared audio sink used for playback.
updater: broadcast::Sender<ui::Update>, sink: Arc<rodio::Sink>,
/// Current playback state (loading or track).
current: Current,
/// UI handle for rendering and input. /// UI handle for rendering and input.
ui: ui::Handle, ui: ui::Handle,
@ -80,38 +74,34 @@ impl Player {
/// based on persistent bookmarks. /// based on persistent bookmarks.
pub fn set_current(&mut self, current: Current) -> crate::Result<()> { pub fn set_current(&mut self, current: Current) -> crate::Result<()> {
self.current = current.clone(); self.current = current.clone();
self.update(ui::Update::Track(current))?; self.ui.update(ui::Update::Track(current))?;
let Current::Track(track) = &self.current else { let Current::Track(track) = &self.current else {
return Ok(()); return Ok(());
}; };
let bookmarked = self.bookmarks.bookmarked(track); let bookmarked = self.bookmarks.bookmarked(track);
self.update(ui::Update::Bookmarked(bookmarked))?; self.ui.update(ui::Update::Bookmarked(bookmarked))?;
Ok(()) Ok(())
} }
/// Sends a `ui::Update` to the broadcast channel.
pub fn update(&mut self, update: ui::Update) -> crate::Result<()> {
self.updater.send(update)?;
Ok(())
}
/// Initialize a `Player` with the provided CLI `args` and audio `mixer`. /// Initialize a `Player` with the provided CLI `args` and audio `mixer`.
/// ///
/// This sets up the audio sink, UI, downloader, bookmarks and persistent /// This sets up the audio sink, UI, downloader, bookmarks and persistent
/// volume state. The function returns a fully constructed `Player` ready /// volume state. The function returns a fully constructed `Player` ready
/// to be driven via `run`. /// to be driven via `run`.
pub async fn init(args: crate::Args, mixer: &rodio::mixer::Mixer) -> crate::Result<Self> { pub async fn init(
args: crate::Args,
mixer: &rodio::mixer::Mixer,
) -> crate::Result<(Self, crate::Tasks)> {
let (tx, rx) = mpsc::channel(8); let (tx, rx) = mpsc::channel(8);
let mut tasks = Tasks::new(tx.clone());
if args.paused { if args.paused {
tx.send(Message::Pause).await?; tx.send(Message::Pause).await?;
} }
tx.send(Message::Init).await?; tx.send(Message::Init).await?;
let (utx, urx) = broadcast::channel(8);
let list = List::load(args.track_list.as_ref()).await?; let list = List::load(args.track_list.as_ref()).await?;
let sink = Arc::new(rodio::Sink::connect_new(mixer)); let sink = Arc::new(rodio::Sink::connect_new(mixer));
@ -120,40 +110,25 @@ impl Player {
let volume = PersistentVolume::load().await?; let volume = PersistentVolume::load().await?;
sink.set_volume(volume.float()); sink.set_volume(volume.float());
Ok(Self { let player = Self {
ui: ui::Handle::init(tx.clone(), urx, state, &args).await?, ui: tasks.ui(state, &args).await?,
downloader: Downloader::init( downloader: tasks.downloader(args.buffer_size as usize, args.timeout, list)?,
args.buffer_size as usize, waiter: tasks.waiter(Arc::clone(&sink)),
args.timeout,
list,
tx.clone(),
)?,
waiter: waiter::Handle::new(Arc::clone(&sink), tx),
bookmarks: Bookmarks::load().await?, bookmarks: Bookmarks::load().await?,
current: Current::default(), current: Current::default(),
updater: utx,
rx, rx,
sink, sink,
}) };
Ok((player, tasks))
} }
/// Close any outlying processes, as well as persist state that /// Close any outlying processes, as well as persist state that
/// should survive such as bookmarks and volume. /// should survive such as bookmarks and volume.
pub async fn close(self) -> crate::Result<()> { pub async fn close(self) -> crate::Result<()> {
// We should prioritize reporting UI/Downloader errors,
// but still save persistent state before so that if either one fails,
// state is saved.
let saves = (
self.bookmarks.save().await,
PersistentVolume::save(self.sink.volume()).await,
);
self.ui.close().await?;
self.downloader.close().await?;
self.sink.stop(); self.sink.stop();
self.bookmarks.save().await?;
saves.0?; PersistentVolume::save(self.sink.volume()).await?;
saves.1?;
Ok(()) Ok(())
} }
@ -205,11 +180,11 @@ impl Player {
Message::ChangeVolume(change) => { Message::ChangeVolume(change) => {
self.sink self.sink
.set_volume((self.sink.volume() + change).clamp(0.0, 1.0)); .set_volume((self.sink.volume() + change).clamp(0.0, 1.0));
self.update(ui::Update::Volume)?; self.ui.update(ui::Update::Volume)?;
} }
Message::SetVolume(set) => { Message::SetVolume(set) => {
self.sink.set_volume(set.clamp(0.0, 1.0)); self.sink.set_volume(set.clamp(0.0, 1.0));
self.update(ui::Update::Volume)?; self.ui.update(ui::Update::Volume)?;
} }
Message::Bookmark => { Message::Bookmark => {
let Current::Track(current) = &self.current else { let Current::Track(current) = &self.current else {
@ -217,7 +192,7 @@ impl Player {
}; };
let bookmarked = self.bookmarks.bookmark(current)?; let bookmarked = self.bookmarks.bookmark(current)?;
self.update(ui::Update::Bookmarked(bookmarked))?; self.ui.update(ui::Update::Bookmarked(bookmarked))?;
} }
Message::Quit => break, Message::Quit => break,
} }

View File

@ -1,47 +1,64 @@
use std::{future::Future, mem::MaybeUninit}; //! Task management.
//!
//! This file aims to abstract a lot of annoying Rust async logic, which may be subject to change.
//! For those who are not intimately familiar with async rust, this will be very confusing.
trait AsyncArrayMap<T, const N: usize> { use futures_util::TryFutureExt;
async fn async_map<U, F, Fut>(self, f: F) -> [U; N] use std::future::Future;
where use tokio::{select, sync::mpsc, task::JoinSet};
F: FnMut(T) -> Fut,
Fut: Future<Output = U>; /// Handles all of the processes within lowfi.
/// This entails initializing/closing tasks, and handling any potential errors that arise.
pub struct Tasks {
/// The [`JoinSet`], which contains all of the task handles.
pub set: JoinSet<crate::Result<()>>,
/// A sender, which is kept for convenience to be used when
/// initializing various other tasks.
tx: mpsc::Sender<crate::Message>,
} }
impl<T, const N: usize> AsyncArrayMap<T, N> for [T; N] { impl Tasks {
async fn async_map<U, F, Fut>(self, mut f: F) -> [U; N] /// Creates a new task manager.
where pub fn new(tx: mpsc::Sender<crate::Message>) -> Self {
F: FnMut(T) -> Fut, Self {
Fut: Future<Output = U>, tx,
{ set: JoinSet::new(),
let mut out: [MaybeUninit<U>; N] = unsafe { MaybeUninit::uninit().assume_init() }; }
for (i, v) in self.into_iter().enumerate() {
out[i].write(f(v).await);
} }
unsafe { std::mem::transmute_copy(&out) } /// Processes a task, and adds it to the internal [`JoinSet`].
} pub fn spawn<E: Into<crate::Error> + Send + Sync + 'static>(
} &mut self,
future: impl Future<Output = Result<(), E>> + Send + 'static,
/// Wrapper around an array of JoinHandles to provide better error reporting & shutdown. ) {
pub struct Tasks<E, const S: usize>(pub [tokio::task::JoinHandle<Result<(), E>>; S]); self.set.spawn(future.map_err(Into::into));
}
impl<T: Send + 'static + Into<crate::Error>, const S: usize> Tasks<T, S> {
/// Abort tasks, and report either errors thrown from within each task /// Gets a copy of the internal [`mpsc::Sender`].
/// or from tokio about joining the task. pub fn tx(&self) -> mpsc::Sender<crate::Message> {
pub async fn shutdown(self) -> [crate::Result<()>; S] { self.tx.clone()
self.0 }
.async_map(async |handle| {
if !handle.is_finished() { /// Actively polls all of the handles previously added.
handle.abort(); ///
} /// An additional `runner` is for the main player future, which
/// can't be added as a "task" because it shares data with the
match handle.await { /// main thread.
Ok(Err(error)) => Err(error.into()), ///
Err(error) if !error.is_cancelled() => Err(crate::Error::JoinError(error)), /// This either returns when the runner completes, or if an error occurs
Ok(Ok(())) | Err(_) => Ok(()), /// in any of the internally held tasks.
} pub async fn wait(
}) &mut self,
.await runner: impl Future<Output = Result<(), crate::Error>> + std::marker::Send,
) -> crate::Result<()> {
select! {
result = runner => result,
Some(result) = self.set.join_next() => match result {
Ok(res) => res,
Err(e) if !e.is_cancelled() => Err(crate::Error::JoinError(e)),
Err(_) => Ok(()),
}
}
} }
} }

View File

@ -1,12 +1,10 @@
use std::sync::Arc; use std::sync::Arc;
use crate::{player::Current, ui, Args}; use crate::player::Current;
use tokio::{ use tokio::{sync::broadcast, time::Instant};
sync::{broadcast, mpsc::Sender},
time::Instant,
};
pub mod environment; pub mod environment;
pub mod task;
pub use environment::Environment; pub use environment::Environment;
pub mod input; pub mod input;
pub mod interface; pub mod interface;
@ -108,53 +106,35 @@ pub enum Update {
/// The UI handle for controlling the state of the UI, as well as /// The UI handle for controlling the state of the UI, as well as
/// updating MPRIS information and other small interfacing tasks. /// updating MPRIS information and other small interfacing tasks.
pub struct Handle { pub struct Handle {
/// Broadcast channel used to send UI updates.
updater: broadcast::Sender<Update>,
/// The MPRIS server, which is more or less a handle to the actual MPRIS thread. /// The MPRIS server, which is more or less a handle to the actual MPRIS thread.
#[cfg(feature = "mpris")] #[cfg(feature = "mpris")]
pub mpris: mpris::Server, pub mpris: mpris::Server,
/// The UI's running tasks.
tasks: Option<crate::Tasks<ui::Error, 2>>,
} }
impl Handle { impl Handle {
/// Actually takes care of spawning the tasks for the UI. /// Sends a `ui::Update` to the broadcast channel.
fn spawn( pub fn update(&mut self, update: Update) -> crate::Result<()> {
tx: Sender<crate::Message>, self.updater.send(update)?;
updater: broadcast::Receiver<ui::Update>,
state: State,
params: interface::Params,
) -> crate::Tasks<Error, 2> {
crate::Tasks([
tokio::spawn(Handle::ui(updater, state, params)),
tokio::spawn(input::listen(tx)),
])
}
/// Shuts down the UI tasks, returning any encountered errors.
pub async fn close(self) -> crate::Result<()> {
let Some(tasks) = self.tasks else {
return Ok(());
};
for result in tasks.shutdown().await {
result?
}
Ok(()) Ok(())
} }
}
/// The main UI process, which will both render the UI to the terminal /// The main UI process, which will both render the UI to the terminal
/// and also update state. /// and also update state.
/// ///
/// It does both of these things at a fixed interval, due to things /// It does both of these things at a fixed interval, due to things
/// like the track duration changing too frequently. /// like the track duration changing too frequently.
/// ///
/// `rx` is the receiver for state updates, `state` the initial state, /// `rx` is the receiver for state updates, `state` the initial state,
/// and `params` specifies aesthetic options that are specified by the user. /// and `params` specifies aesthetic options that are specified by the user.
async fn ui( pub async fn run(
mut updater: broadcast::Receiver<Update>, mut updater: broadcast::Receiver<Update>,
mut state: State, mut state: State,
params: interface::Params, params: interface::Params,
) -> Result<()> { ) -> Result<()> {
let mut interface = Interface::new(params)?; let mut interface = Interface::new(params)?;
loop { loop {
@ -172,24 +152,4 @@ impl Handle {
} }
Ok(()) Ok(())
}
/// Initializes the UI itself, along with all of the tasks that are related to it.
#[allow(clippy::unused_async)]
pub async fn init(
tx: Sender<crate::Message>,
updater: broadcast::Receiver<ui::Update>,
state: State,
args: &Args,
) -> Result<Self> {
let params = interface::Params::try_from(args)?;
Ok(Self {
#[cfg(feature = "mpris")]
mpris: mpris::Server::new(state.clone(), tx.clone(), updater.resubscribe()).await?,
tasks: params
.enabled
.then(|| Self::spawn(tx, updater, state, params)),
})
}
} }

View File

@ -141,7 +141,9 @@ impl Interface {
/// Draws the terminal. This will also wait for the specified /// Draws the terminal. This will also wait for the specified
/// delta to pass before completing. /// delta to pass before completing.
pub async fn draw(&mut self, state: &State) -> super::Result<()> { pub async fn draw(&mut self, state: &State) -> super::Result<()> {
self.clock.as_mut().map(|x| x.update(&mut self.window)); if let Some(x) = self.clock.as_mut() {
x.update(&mut self.window);
}
let menu = self.menu(state); let menu = self.menu(state);
self.window.draw(stdout().lock(), menu)?; self.window.draw(stdout().lock(), menu)?;

27
src/ui/task.rs Normal file
View File

@ -0,0 +1,27 @@
//! Contains the code for initializing the UI and creating a [`ui::Handle`].
use crate::ui::{self, input, interface};
use tokio::sync::broadcast;
impl crate::Tasks {
/// Initializes the UI itself, along with all of the tasks that are related to it.
#[allow(clippy::unused_async)]
pub async fn ui(&mut self, state: ui::State, args: &crate::Args) -> crate::Result<ui::Handle> {
let (utx, urx) = broadcast::channel(8);
#[cfg(feature = "mpris")]
let mpris = ui::mpris::Server::new(state.clone(), self.tx(), urx.resubscribe()).await?;
let params = interface::Params::try_from(args)?;
if params.enabled {
self.spawn(ui::run(urx, state, params));
self.spawn(input::listen(self.tx()));
}
Ok(ui::Handle {
updater: utx,
#[cfg(feature = "mpris")]
mpris,
})
}
}