feat: propagate errors within tasks by restructuring task management

This commit is contained in:
talwat 2025-12-27 09:27:05 +01:00
parent 8ff0f477ac
commit 8a1f6dd214
8 changed files with 172 additions and 156 deletions

View File

@ -3,13 +3,9 @@ use std::{
time::Duration, time::Duration,
}; };
use reqwest::Client;
use tokio::{
sync::mpsc::{self, Receiver, Sender},
task::JoinHandle,
};
use crate::tracks; use crate::tracks;
use reqwest::Client;
use tokio::sync::mpsc::{self, Receiver, Sender};
/// Flag indicating whether the downloader is actively fetching a track. /// Flag indicating whether the downloader is actively fetching a track.
/// ///
@ -81,7 +77,7 @@ impl Downloader {
Ok(Handle { Ok(Handle {
queue: qrx, queue: qrx,
task: tokio::spawn(downloader.run()), task: crate::Tasks([tokio::spawn(downloader.run())]),
}) })
} }
@ -96,6 +92,7 @@ impl Downloader {
.tracks .tracks
.random(&self.client, &PROGRESS, &mut self.rng) .random(&self.client, &PROGRESS, &mut self.rng)
.await; .await;
match result { match result {
Ok(track) => { Ok(track) => {
self.queue.send(track).await?; self.queue.send(track).await?;
@ -123,7 +120,7 @@ pub struct Handle {
queue: Receiver<tracks::Queued>, queue: Receiver<tracks::Queued>,
/// The downloader task, which can be aborted. /// The downloader task, which can be aborted.
task: JoinHandle<crate::Result<()>>, task: crate::Tasks<crate::Error, 1>,
} }
/// The output when a track is requested from the downloader. /// The output when a track is requested from the downloader.
@ -149,10 +146,10 @@ impl Handle {
}, Output::Queued, }, Output::Queued,
) )
} }
}
impl Drop for Handle { /// Shuts down the downloader task, returning any errors.
fn drop(&mut self) { pub async fn close(self) -> crate::Result<()> {
self.task.abort(); let [result] = self.task.shutdown().await;
result
} }
} }

View File

@ -1,7 +1,6 @@
//! An extremely simple lofi player. //! An extremely simple lofi player.
use crate::player::Player; use crate::player::Player;
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use futures_util::TryFutureExt;
use std::path::PathBuf; use std::path::PathBuf;
pub mod audio; pub mod audio;
@ -12,6 +11,7 @@ pub mod message;
pub mod player; pub mod player;
#[cfg(feature = "scrape")] #[cfg(feature = "scrape")]
mod scrapers; mod scrapers;
pub mod tasks;
mod tests; mod tests;
pub mod tracks; pub mod tracks;
pub mod ui; pub mod ui;
@ -21,6 +21,7 @@ pub mod volume;
use crate::scrapers::Source; use crate::scrapers::Source;
pub use error::{Error, Result}; pub use error::{Error, Result};
pub use message::Message; pub use message::Message;
pub use tasks::Tasks;
/// An extremely simple lofi player. /// An extremely simple lofi player.
#[derive(Parser, Clone)] #[derive(Parser, Clone)]
@ -121,10 +122,13 @@ async fn main() -> eyre::Result<()> {
let stream = audio::stream()?; let stream = audio::stream()?;
let environment = ui::Environment::ready(args.alternate)?; let environment = ui::Environment::ready(args.alternate)?;
let result = Player::init(args, environment, stream.mixer()) let mut player = Player::init(args, stream.mixer())
.and_then(Player::run) .await
.await; .inspect_err(|_| environment.cleanup(false).unwrap())?;
let result = player.run().await;
environment.cleanup(result.is_ok())?; environment.cleanup(result.is_ok())?;
player.close().await?;
Ok(result?) Ok(result?)
} }

View File

@ -73,19 +73,7 @@ pub struct Player {
waiter: waiter::Handle, waiter: waiter::Handle,
} }
impl Drop for Player {
fn drop(&mut self) {
// Ensure playback is stopped when the player is dropped.
self.sink.stop();
}
}
impl Player { impl Player {
/// Returns the `Environment` currently used by the UI.
pub const fn environment(&self) -> ui::Environment {
self.ui.environment
}
/// Sets the in-memory current state and notifies the UI about the change. /// Sets the in-memory current state and notifies the UI about the change.
/// ///
/// If the new state is a `Track`, this will also update the bookmarked flag /// If the new state is a `Track`, this will also update the bookmarked flag
@ -115,11 +103,7 @@ impl Player {
/// This sets up the audio sink, UI, downloader, bookmarks and persistent /// This sets up the audio sink, UI, downloader, bookmarks and persistent
/// volume state. The function returns a fully constructed `Player` ready /// volume state. The function returns a fully constructed `Player` ready
/// to be driven via `run`. /// to be driven via `run`.
pub async fn init( pub async fn init(args: crate::Args, mixer: &rodio::mixer::Mixer) -> crate::Result<Self> {
args: crate::Args,
environment: ui::Environment,
mixer: &rodio::mixer::Mixer,
) -> crate::Result<Self> {
let (tx, rx) = mpsc::channel(8); let (tx, rx) = mpsc::channel(8);
if args.paused { if args.paused {
tx.send(Message::Pause).await?; tx.send(Message::Pause).await?;
@ -137,7 +121,7 @@ impl Player {
sink.set_volume(volume.float()); sink.set_volume(volume.float());
Ok(Self { Ok(Self {
ui: ui::Handle::init(tx.clone(), environment, urx, state, &args).await?, ui: ui::Handle::init(tx.clone(), urx, state, &args).await?,
downloader: Downloader::init( downloader: Downloader::init(
args.buffer_size as usize, args.buffer_size as usize,
args.timeout, args.timeout,
@ -153,10 +137,23 @@ impl Player {
}) })
} }
/// Persist state that should survive a run (bookmarks and volume). /// Close any outlying processes, as well as persist state that
pub async fn close(&self) -> crate::Result<()> { /// should survive such as bookmarks and volume.
self.bookmarks.save().await?; pub async fn close(self) -> crate::Result<()> {
PersistentVolume::save(self.sink.volume()).await?; // We should prioritize reporting UI/Downloader errors,
// but still save persistent state before so that if either one fails,
// state is saved.
let saves = (
self.bookmarks.save().await,
PersistentVolume::save(self.sink.volume()).await,
);
self.ui.close().await?;
self.downloader.close().await?;
self.sink.stop();
saves.0?;
saves.1?;
Ok(()) Ok(())
} }
@ -176,7 +173,7 @@ impl Player {
/// ///
/// This will return when a `Message::Quit` is received. It handles commands /// This will return when a `Message::Quit` is received. It handles commands
/// coming from the frontend and updates playback/UI state accordingly. /// coming from the frontend and updates playback/UI state accordingly.
pub async fn run(mut self) -> crate::Result<()> { pub async fn run(&mut self) -> crate::Result<()> {
while let Some(message) = self.rx.recv().await { while let Some(message) = self.rx.recv().await {
match message { match message {
Message::Next | Message::Init | Message::Loaded => { Message::Next | Message::Init | Message::Loaded => {
@ -229,7 +226,6 @@ impl Player {
self.ui.mpris.handle(&message).await?; self.ui.mpris.handle(&message).await?;
} }
self.close().await?;
Ok(()) Ok(())
} }
} }

47
src/tasks.rs Normal file
View File

@ -0,0 +1,47 @@
use std::{future::Future, mem::MaybeUninit};
trait AsyncArrayMap<T, const N: usize> {
async fn async_map<U, F, Fut>(self, f: F) -> [U; N]
where
F: FnMut(T) -> Fut,
Fut: Future<Output = U>;
}
impl<T, const N: usize> AsyncArrayMap<T, N> for [T; N] {
async fn async_map<U, F, Fut>(self, mut f: F) -> [U; N]
where
F: FnMut(T) -> Fut,
Fut: Future<Output = U>,
{
let mut out: [MaybeUninit<U>; N] = unsafe { MaybeUninit::uninit().assume_init() };
for (i, v) in self.into_iter().enumerate() {
out[i].write(f(v).await);
}
unsafe { std::mem::transmute_copy(&out) }
}
}
/// Wrapper around an array of JoinHandles to provide better error reporting & shutdown.
pub struct Tasks<E, const S: usize>(pub [tokio::task::JoinHandle<Result<(), E>>; S]);
impl<T: Send + 'static + Into<crate::Error>, const S: usize> Tasks<T, S> {
/// Abort tasks, and report either errors thrown from within each task
/// or from tokio about joining the task.
pub async fn shutdown(self) -> [crate::Result<()>; S] {
self.0
.async_map(async |handle| {
if !handle.is_finished() {
handle.abort();
}
match handle.await {
Ok(Err(error)) => Err(error.into()),
Err(error) if !error.is_cancelled() => Err(crate::Error::JoinError(error)),
Ok(Ok(())) | Err(_) => Ok(()),
}
})
.await
}
}

View File

@ -64,11 +64,11 @@ mod window {
#[test] #[test]
fn new_border_strings() { fn new_border_strings() {
let w = Window::new(10, false); let w = Window::new(10, false, false, false);
assert!(w.titlebar.content.starts_with('┌')); assert!(w.titlebar.content.starts_with('┌'));
assert!(w.statusbar.starts_with('└')); assert!(w.statusbar.starts_with('└'));
let w2 = Window::new(5, true); let w2 = Window::new(5, true, false, false);
assert!(w2.titlebar.content.is_empty()); assert!(w2.titlebar.content.is_empty());
assert!(w2.statusbar.is_empty()); assert!(w2.statusbar.is_empty());
} }
@ -79,8 +79,8 @@ mod window {
#[test] #[test]
fn simple() { fn simple() {
let w = Window::new(3, false); let w = Window::new(3, false, false, false);
let (render, height) = w.render(vec![String::from("abc")], false, true).unwrap(); let (render, height) = w.render(vec![String::from("abc")]).unwrap();
const MIDDLE: &str = "─────"; const MIDDLE: &str = "─────";
assert_eq!(format!("{MIDDLE}\n{}\n{MIDDLE}", sided("abc")), render); assert_eq!(format!("{MIDDLE}\n{}\n{MIDDLE}", sided("abc")), render);
@ -89,13 +89,13 @@ mod window {
#[test] #[test]
fn spaced() { fn spaced() {
let w = Window::new(3, false); let w = Window::new(3, false, true, false);
let (render, height) = w let (render, height) = w
.render( .render(vec![
vec![String::from("abc"), String::from(" b"), String::from("c")], String::from("abc"),
true, String::from(" b"),
true, String::from("c"),
) ])
.unwrap(); .unwrap();
const MIDDLE: &str = "─────"; const MIDDLE: &str = "─────";
@ -113,7 +113,7 @@ mod window {
#[test] #[test]
fn zero_width_window() { fn zero_width_window() {
let w = Window::new(0, false); let w = Window::new(0, false, false, false);
assert!(!w.titlebar.content.is_empty()); assert!(!w.titlebar.content.is_empty());
} }
} }
@ -224,25 +224,3 @@ mod interface {
); );
} }
} }
#[cfg(test)]
mod environment {
use crate::ui::Environment;
#[test]
fn ready_and_cleanup_no_panic() {
// Try to create the environment but don't fail the test if the
// terminal isn't available. We just assert the API exists.
if let Ok(env) = Environment::ready(false) {
// cleanup should succeed
let _ = env.cleanup(true);
}
}
#[test]
fn ready_with_alternate_screen() {
if let Ok(env) = Environment::ready(true) {
let _ = env.cleanup(false);
}
}
}

View File

@ -3,7 +3,6 @@ use std::sync::Arc;
use crate::{player::Current, ui, Args}; use crate::{player::Current, ui, Args};
use tokio::{ use tokio::{
sync::{broadcast, mpsc::Sender}, sync::{broadcast, mpsc::Sender},
task::JoinHandle,
time::Instant, time::Instant,
}; };
@ -82,6 +81,14 @@ impl State {
volume_timer: None, volume_timer: None,
} }
} }
/// Takes care of small updates, like resetting the volume timer.
pub fn tick(&mut self) {
let expired = |timer: Instant| timer.elapsed() > std::time::Duration::from_secs(1);
if self.volume_timer.is_some_and(expired) {
self.volume_timer = None;
}
}
} }
/// A UI update sent out by the main player thread, which may /// A UI update sent out by the main player thread, which may
@ -98,54 +105,43 @@ pub enum Update {
Quit, Quit,
} }
/// Just a simple wrapper for the two primary tasks that the UI
/// requires to function.
#[derive(Debug)]
struct Tasks {
/// The renderer, responsible for sending output to `stdout`.
render: JoinHandle<Result<()>>,
/// The input, which receives data from `stdin` via [`crossterm`].
input: JoinHandle<Result<()>>,
}
impl Tasks {
/// Actually takes care of spawning the tasks for the [`ui`].
pub fn spawn(
tx: Sender<crate::Message>,
updater: broadcast::Receiver<ui::Update>,
state: State,
params: interface::Params,
) -> Self {
Self {
render: tokio::spawn(Handle::ui(updater, state, params)),
input: tokio::spawn(input::listen(tx)),
}
}
}
impl Drop for Tasks {
fn drop(&mut self) {
self.input.abort();
self.render.abort();
}
}
/// The UI handle for controlling the state of the UI, as well as /// The UI handle for controlling the state of the UI, as well as
/// updating MPRIS information and other small interfacing tasks. /// updating MPRIS information and other small interfacing tasks.
pub struct Handle { pub struct Handle {
/// The terminal environment, which can be used for cleanup.
pub(crate) environment: Environment,
/// The MPRIS server, which is more or less a handle to the actual MPRIS thread. /// The MPRIS server, which is more or less a handle to the actual MPRIS thread.
#[cfg(feature = "mpris")] #[cfg(feature = "mpris")]
pub mpris: mpris::Server, pub mpris: mpris::Server,
/// The UI's running tasks. /// The UI's running tasks.
_tasks: Option<Tasks>, tasks: Option<crate::Tasks<ui::Error, 2>>,
} }
impl Handle { impl Handle {
/// Actually takes care of spawning the tasks for the UI.
fn spawn(
tx: Sender<crate::Message>,
updater: broadcast::Receiver<ui::Update>,
state: State,
params: interface::Params,
) -> crate::Tasks<Error, 2> {
crate::Tasks([
tokio::spawn(Handle::ui(updater, state, params)),
tokio::spawn(input::listen(tx)),
])
}
/// Shuts down the UI tasks, returning any encountered errors.
pub async fn close(self) -> crate::Result<()> {
let Some(tasks) = self.tasks else {
return Ok(());
};
for result in tasks.shutdown().await {
result?
}
Ok(())
}
/// The main UI process, which will both render the UI to the terminal /// The main UI process, which will both render the UI to the terminal
/// and also update state. /// and also update state.
/// ///
@ -159,7 +155,7 @@ impl Handle {
mut state: State, mut state: State,
params: interface::Params, params: interface::Params,
) -> Result<()> { ) -> Result<()> {
let mut interface = Interface::new(params); let mut interface = Interface::new(params)?;
loop { loop {
if let Ok(message) = updater.try_recv() { if let Ok(message) = updater.try_recv() {
@ -171,7 +167,8 @@ impl Handle {
} }
} }
interface.draw(&mut state).await?; interface.draw(&state).await?;
state.tick();
} }
Ok(()) Ok(())
@ -181,7 +178,6 @@ impl Handle {
#[allow(clippy::unused_async)] #[allow(clippy::unused_async)]
pub async fn init( pub async fn init(
tx: Sender<crate::Message>, tx: Sender<crate::Message>,
environment: Environment,
updater: broadcast::Receiver<ui::Update>, updater: broadcast::Receiver<ui::Update>,
state: State, state: State,
args: &Args, args: &Args,
@ -191,10 +187,9 @@ impl Handle {
Ok(Self { Ok(Self {
#[cfg(feature = "mpris")] #[cfg(feature = "mpris")]
mpris: mpris::Server::new(state.clone(), tx.clone(), updater.resubscribe()).await?, mpris: mpris::Server::new(state.clone(), tx.clone(), updater.resubscribe()).await?,
environment, tasks: params
_tasks: params
.enabled .enabled
.then(|| Tasks::spawn(tx, updater, state, params)), .then(|| Self::spawn(tx, updater, state, params)),
}) })
} }
} }

View File

@ -2,7 +2,7 @@ use crate::{
ui::{self, State}, ui::{self, State},
Args, Args,
}; };
use std::{env, time::Duration}; use std::{env, io::stdout, time::Duration};
pub mod clock; pub mod clock;
pub mod components; pub mod components;
@ -96,21 +96,21 @@ pub struct Interface {
impl Default for Interface { impl Default for Interface {
#[inline] #[inline]
fn default() -> Self { fn default() -> Self {
Self::new(Params::default()) Self::new(Params::default()).unwrap()
} }
} }
impl Interface { impl Interface {
/// Creates a new interface. /// Creates a new interface.
pub fn new(params: Params) -> Self { pub fn new(params: Params) -> ui::Result<Self> {
let mut window = Window::new(params.width, params.borderless); let mut window = Window::new(params.width, params.borderless, false, true);
Self { Ok(Self {
clock: params.clock.then(|| Clock::new(&mut window)), clock: params.clock.then(|| Clock::new(&mut window)),
interval: tokio::time::interval(params.delta), interval: tokio::time::interval(params.delta),
window, window,
params, params,
} })
} }
/// Creates a full "menu" from the [`ui::State`], which can be /// Creates a full "menu" from the [`ui::State`], which can be
@ -118,20 +118,16 @@ impl Interface {
/// ///
/// The menu really is just a [`Vec`] of the different components, /// The menu really is just a [`Vec`] of the different components,
/// with padding already added. /// with padding already added.
pub(crate) fn menu(&self, state: &mut State) -> Vec<String> { pub(crate) fn menu(&self, state: &State) -> Vec<String> {
let action = components::action(state, self.params.width); let action = components::action(state, self.params.width);
let middle = match state.volume_timer { let middle = if state.volume_timer.is_some() {
Some(timer) => {
let volume = state.sink.volume(); let volume = state.sink.volume();
let percentage = format!("{}%", (volume * 100.0).round().abs()); let percentage = format!("{}%", (volume * 100.0).round().abs());
if timer.elapsed() > Duration::from_secs(1) {
state.volume_timer = None;
}
components::audio_bar(self.params.width - 17, volume, &percentage) components::audio_bar(self.params.width - 17, volume, &percentage)
} } else {
None => components::progress_bar(state, self.params.width - 16), components::progress_bar(state, self.params.width - 16)
}; };
let controls = components::controls(self.params.width); let controls = components::controls(self.params.width);
@ -144,9 +140,11 @@ impl Interface {
/// Draws the terminal. This will also wait for the specified /// Draws the terminal. This will also wait for the specified
/// delta to pass before completing. /// delta to pass before completing.
pub async fn draw(&mut self, state: &mut State) -> super::Result<()> { pub async fn draw(&mut self, state: &State) -> super::Result<()> {
self.clock.as_mut().map(|x| x.update(&mut self.window)); self.clock.as_mut().map(|x| x.update(&mut self.window));
self.window.draw(self.menu(state), false)?;
let menu = self.menu(state);
self.window.draw(stdout().lock(), menu)?;
self.interval.tick().await; self.interval.tick().await;
Ok(()) Ok(())

View File

@ -1,5 +1,3 @@
use std::io::{stdout, Stdout};
use crate::ui::{self, interface::TitleBar}; use crate::ui::{self, interface::TitleBar};
use crossterm::{ use crossterm::{
cursor::{MoveToColumn, MoveUp}, cursor::{MoveToColumn, MoveUp},
@ -26,8 +24,11 @@ pub struct Window {
/// The inner width of the window. /// The inner width of the window.
width: usize, width: usize,
/// The output, currently just an [`Stdout`]. /// Whether content items should be automatically padded (spaced).
out: Stdout, spaced: bool,
/// Whether to cautiously handle ANSI sequences by adding [`style::Attribute::Reset`] generously.
fancy: bool,
} }
impl Window { impl Window {
@ -35,7 +36,7 @@ impl Window {
/// ///
/// * `width` - Inner width of the window. /// * `width` - Inner width of the window.
/// * `borderless` - Whether to include borders in the window, or not. /// * `borderless` - Whether to include borders in the window, or not.
pub fn new(width: usize, borderless: bool) -> Self { pub fn new(width: usize, borderless: bool, spaced: bool, fancy: bool) -> Self {
let statusbar = if borderless { let statusbar = if borderless {
String::new() String::new()
} else { } else {
@ -44,11 +45,12 @@ impl Window {
}; };
Self { Self {
spaced,
statusbar, statusbar,
borderless, borderless,
width, width,
fancy,
titlebar: TitleBar::new(width, borderless), titlebar: TitleBar::new(width, borderless),
out: stdout(),
} }
} }
@ -59,27 +61,22 @@ impl Window {
/// ///
/// This returns both the final rendered window and also the full /// This returns both the final rendered window and also the full
/// height of the rendered window. /// height of the rendered window.
pub(crate) fn render( pub(crate) fn render(&self, content: Vec<String>) -> ui::Result<(String, u16)> {
&self, const NEWLINE: &str = "\r\n";
content: Vec<String>,
space: bool,
testing: bool,
) -> ui::Result<(String, u16)> {
let linefeed = if testing { "\n" } else { "\r\n" };
let len: u16 = content.len().try_into()?; let len: u16 = content.len().try_into()?;
// Note that this will have a trailing newline, which we use later. // Note that this will have a trailing newline, which we use later.
let menu: String = content.into_iter().fold(String::new(), |mut output, x| { let menu: String = content.into_iter().fold(String::new(), |mut output, x| {
// Horizontal Padding & Border // Horizontal Padding & Border
let padding = if self.borderless { " " } else { "" }; let padding = if self.borderless { " " } else { "" };
let space = if space { let space = if self.spaced {
" ".repeat(self.width.saturating_sub(x.graphemes(true).count())) " ".repeat(self.width.saturating_sub(x.graphemes(true).count()))
} else { } else {
String::new() String::new()
}; };
let center = if testing { x } else { x.reset().to_string() }; let center = if self.fancy { x.reset().to_string() } else { x };
write!(output, "{padding} {center}{space} {padding}{linefeed}").unwrap(); write!(output, "{padding} {center}{space} {padding}{NEWLINE}").unwrap();
output output
}); });
@ -94,7 +91,7 @@ impl Window {
// There's no need for another newline after the main menu content, because it already has one. // There's no need for another newline after the main menu content, because it already has one.
Ok(( Ok((
format!( format!(
"{}{linefeed}{menu}{}{suffix}", "{}{NEWLINE}{menu}{}{suffix}",
self.titlebar.content, self.statusbar, self.titlebar.content, self.statusbar,
), ),
height, height,
@ -102,11 +99,15 @@ impl Window {
} }
/// Actually draws the window, with each element in `content` being on a new line. /// Actually draws the window, with each element in `content` being on a new line.
pub fn draw(&mut self, content: Vec<String>, space: bool) -> ui::Result<()> { pub fn draw(
let (rendered, height) = self.render(content, space, false)?; &mut self,
mut writer: impl std::io::Write,
content: Vec<String>,
) -> ui::Result<()> {
let (rendered, height) = self.render(content)?;
crossterm::execute!( crossterm::execute!(
self.out, writer,
Clear(ClearType::FromCursorDown), Clear(ClearType::FromCursorDown),
MoveToColumn(0), MoveToColumn(0),
Print(rendered), Print(rendered),