From 8a1f6dd214f67e5e99c2f727699bc6f2a516c7c7 Mon Sep 17 00:00:00 2001 From: talwat <83217276+talwat@users.noreply.github.com> Date: Sat, 27 Dec 2025 09:27:05 +0100 Subject: [PATCH] feat: propagate errors within tasks by restructuring task management --- src/download.rs | 21 +++++----- src/main.rs | 12 ++++-- src/player.rs | 44 +++++++++----------- src/tasks.rs | 47 +++++++++++++++++++++ src/tests/ui.rs | 44 +++++--------------- src/ui.rs | 83 ++++++++++++++++++-------------------- src/ui/interface.rs | 36 ++++++++--------- src/ui/interface/window.rs | 41 ++++++++++--------- 8 files changed, 172 insertions(+), 156 deletions(-) create mode 100644 src/tasks.rs diff --git a/src/download.rs b/src/download.rs index 8796144..b58eac1 100644 --- a/src/download.rs +++ b/src/download.rs @@ -3,13 +3,9 @@ use std::{ time::Duration, }; -use reqwest::Client; -use tokio::{ - sync::mpsc::{self, Receiver, Sender}, - task::JoinHandle, -}; - use crate::tracks; +use reqwest::Client; +use tokio::sync::mpsc::{self, Receiver, Sender}; /// Flag indicating whether the downloader is actively fetching a track. /// @@ -81,7 +77,7 @@ impl Downloader { Ok(Handle { queue: qrx, - task: tokio::spawn(downloader.run()), + task: crate::Tasks([tokio::spawn(downloader.run())]), }) } @@ -96,6 +92,7 @@ impl Downloader { .tracks .random(&self.client, &PROGRESS, &mut self.rng) .await; + match result { Ok(track) => { self.queue.send(track).await?; @@ -123,7 +120,7 @@ pub struct Handle { queue: Receiver, /// The downloader task, which can be aborted. - task: JoinHandle>, + task: crate::Tasks, } /// The output when a track is requested from the downloader. @@ -149,10 +146,10 @@ impl Handle { }, Output::Queued, ) } -} -impl Drop for Handle { - fn drop(&mut self) { - self.task.abort(); + /// Shuts down the downloader task, returning any errors. + pub async fn close(self) -> crate::Result<()> { + let [result] = self.task.shutdown().await; + result } } diff --git a/src/main.rs b/src/main.rs index 27b42bd..14d24dd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,6 @@ //! An extremely simple lofi player. use crate::player::Player; use clap::{Parser, Subcommand}; -use futures_util::TryFutureExt; use std::path::PathBuf; pub mod audio; @@ -12,6 +11,7 @@ pub mod message; pub mod player; #[cfg(feature = "scrape")] mod scrapers; +pub mod tasks; mod tests; pub mod tracks; pub mod ui; @@ -21,6 +21,7 @@ pub mod volume; use crate::scrapers::Source; pub use error::{Error, Result}; pub use message::Message; +pub use tasks::Tasks; /// An extremely simple lofi player. #[derive(Parser, Clone)] @@ -121,10 +122,13 @@ async fn main() -> eyre::Result<()> { let stream = audio::stream()?; let environment = ui::Environment::ready(args.alternate)?; - let result = Player::init(args, environment, stream.mixer()) - .and_then(Player::run) - .await; + let mut player = Player::init(args, stream.mixer()) + .await + .inspect_err(|_| environment.cleanup(false).unwrap())?; + let result = player.run().await; environment.cleanup(result.is_ok())?; + player.close().await?; + Ok(result?) } diff --git a/src/player.rs b/src/player.rs index 9c892c9..a108299 100644 --- a/src/player.rs +++ b/src/player.rs @@ -73,19 +73,7 @@ pub struct Player { waiter: waiter::Handle, } -impl Drop for Player { - fn drop(&mut self) { - // Ensure playback is stopped when the player is dropped. - self.sink.stop(); - } -} - impl Player { - /// Returns the `Environment` currently used by the UI. - pub const fn environment(&self) -> ui::Environment { - self.ui.environment - } - /// Sets the in-memory current state and notifies the UI about the change. /// /// If the new state is a `Track`, this will also update the bookmarked flag @@ -115,11 +103,7 @@ impl Player { /// This sets up the audio sink, UI, downloader, bookmarks and persistent /// volume state. The function returns a fully constructed `Player` ready /// to be driven via `run`. - pub async fn init( - args: crate::Args, - environment: ui::Environment, - mixer: &rodio::mixer::Mixer, - ) -> crate::Result { + pub async fn init(args: crate::Args, mixer: &rodio::mixer::Mixer) -> crate::Result { let (tx, rx) = mpsc::channel(8); if args.paused { tx.send(Message::Pause).await?; @@ -137,7 +121,7 @@ impl Player { sink.set_volume(volume.float()); Ok(Self { - ui: ui::Handle::init(tx.clone(), environment, urx, state, &args).await?, + ui: ui::Handle::init(tx.clone(), urx, state, &args).await?, downloader: Downloader::init( args.buffer_size as usize, args.timeout, @@ -153,10 +137,23 @@ impl Player { }) } - /// Persist state that should survive a run (bookmarks and volume). - pub async fn close(&self) -> crate::Result<()> { - self.bookmarks.save().await?; - PersistentVolume::save(self.sink.volume()).await?; + /// Close any outlying processes, as well as persist state that + /// should survive such as bookmarks and volume. + pub async fn close(self) -> crate::Result<()> { + // We should prioritize reporting UI/Downloader errors, + // but still save persistent state before so that if either one fails, + // state is saved. + let saves = ( + self.bookmarks.save().await, + PersistentVolume::save(self.sink.volume()).await, + ); + + self.ui.close().await?; + self.downloader.close().await?; + self.sink.stop(); + + saves.0?; + saves.1?; Ok(()) } @@ -176,7 +173,7 @@ impl Player { /// /// This will return when a `Message::Quit` is received. It handles commands /// coming from the frontend and updates playback/UI state accordingly. - pub async fn run(mut self) -> crate::Result<()> { + pub async fn run(&mut self) -> crate::Result<()> { while let Some(message) = self.rx.recv().await { match message { Message::Next | Message::Init | Message::Loaded => { @@ -229,7 +226,6 @@ impl Player { self.ui.mpris.handle(&message).await?; } - self.close().await?; Ok(()) } } diff --git a/src/tasks.rs b/src/tasks.rs new file mode 100644 index 0000000..ed5ddde --- /dev/null +++ b/src/tasks.rs @@ -0,0 +1,47 @@ +use std::{future::Future, mem::MaybeUninit}; + +trait AsyncArrayMap { + async fn async_map(self, f: F) -> [U; N] + where + F: FnMut(T) -> Fut, + Fut: Future; +} + +impl AsyncArrayMap for [T; N] { + async fn async_map(self, mut f: F) -> [U; N] + where + F: FnMut(T) -> Fut, + Fut: Future, + { + let mut out: [MaybeUninit; N] = unsafe { MaybeUninit::uninit().assume_init() }; + + for (i, v) in self.into_iter().enumerate() { + out[i].write(f(v).await); + } + + unsafe { std::mem::transmute_copy(&out) } + } +} + +/// Wrapper around an array of JoinHandles to provide better error reporting & shutdown. +pub struct Tasks(pub [tokio::task::JoinHandle>; S]); + +impl, const S: usize> Tasks { + /// Abort tasks, and report either errors thrown from within each task + /// or from tokio about joining the task. + pub async fn shutdown(self) -> [crate::Result<()>; S] { + self.0 + .async_map(async |handle| { + if !handle.is_finished() { + handle.abort(); + } + + match handle.await { + Ok(Err(error)) => Err(error.into()), + Err(error) if !error.is_cancelled() => Err(crate::Error::JoinError(error)), + Ok(Ok(())) | Err(_) => Ok(()), + } + }) + .await + } +} diff --git a/src/tests/ui.rs b/src/tests/ui.rs index bb34284..545a7da 100644 --- a/src/tests/ui.rs +++ b/src/tests/ui.rs @@ -64,11 +64,11 @@ mod window { #[test] fn new_border_strings() { - let w = Window::new(10, false); + let w = Window::new(10, false, false, false); assert!(w.titlebar.content.starts_with('┌')); assert!(w.statusbar.starts_with('└')); - let w2 = Window::new(5, true); + let w2 = Window::new(5, true, false, false); assert!(w2.titlebar.content.is_empty()); assert!(w2.statusbar.is_empty()); } @@ -79,8 +79,8 @@ mod window { #[test] fn simple() { - let w = Window::new(3, false); - let (render, height) = w.render(vec![String::from("abc")], false, true).unwrap(); + let w = Window::new(3, false, false, false); + let (render, height) = w.render(vec![String::from("abc")]).unwrap(); const MIDDLE: &str = "─────"; assert_eq!(format!("┌{MIDDLE}┐\n{}\n└{MIDDLE}┘", sided("abc")), render); @@ -89,13 +89,13 @@ mod window { #[test] fn spaced() { - let w = Window::new(3, false); + let w = Window::new(3, false, true, false); let (render, height) = w - .render( - vec![String::from("abc"), String::from(" b"), String::from("c")], - true, - true, - ) + .render(vec![ + String::from("abc"), + String::from(" b"), + String::from("c"), + ]) .unwrap(); const MIDDLE: &str = "─────"; @@ -113,7 +113,7 @@ mod window { #[test] fn zero_width_window() { - let w = Window::new(0, false); + let w = Window::new(0, false, false, false); assert!(!w.titlebar.content.is_empty()); } } @@ -224,25 +224,3 @@ mod interface { ); } } - -#[cfg(test)] -mod environment { - use crate::ui::Environment; - - #[test] - fn ready_and_cleanup_no_panic() { - // Try to create the environment but don't fail the test if the - // terminal isn't available. We just assert the API exists. - if let Ok(env) = Environment::ready(false) { - // cleanup should succeed - let _ = env.cleanup(true); - } - } - - #[test] - fn ready_with_alternate_screen() { - if let Ok(env) = Environment::ready(true) { - let _ = env.cleanup(false); - } - } -} diff --git a/src/ui.rs b/src/ui.rs index df1bcf6..32617fd 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use crate::{player::Current, ui, Args}; use tokio::{ sync::{broadcast, mpsc::Sender}, - task::JoinHandle, time::Instant, }; @@ -82,6 +81,14 @@ impl State { volume_timer: None, } } + + /// Takes care of small updates, like resetting the volume timer. + pub fn tick(&mut self) { + let expired = |timer: Instant| timer.elapsed() > std::time::Duration::from_secs(1); + if self.volume_timer.is_some_and(expired) { + self.volume_timer = None; + } + } } /// A UI update sent out by the main player thread, which may @@ -98,54 +105,43 @@ pub enum Update { Quit, } -/// Just a simple wrapper for the two primary tasks that the UI -/// requires to function. -#[derive(Debug)] -struct Tasks { - /// The renderer, responsible for sending output to `stdout`. - render: JoinHandle>, - - /// The input, which receives data from `stdin` via [`crossterm`]. - input: JoinHandle>, -} - -impl Tasks { - /// Actually takes care of spawning the tasks for the [`ui`]. - pub fn spawn( - tx: Sender, - updater: broadcast::Receiver, - state: State, - params: interface::Params, - ) -> Self { - Self { - render: tokio::spawn(Handle::ui(updater, state, params)), - input: tokio::spawn(input::listen(tx)), - } - } -} - -impl Drop for Tasks { - fn drop(&mut self) { - self.input.abort(); - self.render.abort(); - } -} - /// The UI handle for controlling the state of the UI, as well as /// updating MPRIS information and other small interfacing tasks. pub struct Handle { - /// The terminal environment, which can be used for cleanup. - pub(crate) environment: Environment, - /// The MPRIS server, which is more or less a handle to the actual MPRIS thread. #[cfg(feature = "mpris")] pub mpris: mpris::Server, /// The UI's running tasks. - _tasks: Option, + tasks: Option>, } impl Handle { + /// Actually takes care of spawning the tasks for the UI. + fn spawn( + tx: Sender, + updater: broadcast::Receiver, + state: State, + params: interface::Params, + ) -> crate::Tasks { + crate::Tasks([ + tokio::spawn(Handle::ui(updater, state, params)), + tokio::spawn(input::listen(tx)), + ]) + } + + /// Shuts down the UI tasks, returning any encountered errors. + pub async fn close(self) -> crate::Result<()> { + let Some(tasks) = self.tasks else { + return Ok(()); + }; + for result in tasks.shutdown().await { + result? + } + + Ok(()) + } + /// The main UI process, which will both render the UI to the terminal /// and also update state. /// @@ -159,7 +155,7 @@ impl Handle { mut state: State, params: interface::Params, ) -> Result<()> { - let mut interface = Interface::new(params); + let mut interface = Interface::new(params)?; loop { if let Ok(message) = updater.try_recv() { @@ -171,7 +167,8 @@ impl Handle { } } - interface.draw(&mut state).await?; + interface.draw(&state).await?; + state.tick(); } Ok(()) @@ -181,7 +178,6 @@ impl Handle { #[allow(clippy::unused_async)] pub async fn init( tx: Sender, - environment: Environment, updater: broadcast::Receiver, state: State, args: &Args, @@ -191,10 +187,9 @@ impl Handle { Ok(Self { #[cfg(feature = "mpris")] mpris: mpris::Server::new(state.clone(), tx.clone(), updater.resubscribe()).await?, - environment, - _tasks: params + tasks: params .enabled - .then(|| Tasks::spawn(tx, updater, state, params)), + .then(|| Self::spawn(tx, updater, state, params)), }) } } diff --git a/src/ui/interface.rs b/src/ui/interface.rs index 36da665..f316c4a 100644 --- a/src/ui/interface.rs +++ b/src/ui/interface.rs @@ -2,7 +2,7 @@ use crate::{ ui::{self, State}, Args, }; -use std::{env, time::Duration}; +use std::{env, io::stdout, time::Duration}; pub mod clock; pub mod components; @@ -96,21 +96,21 @@ pub struct Interface { impl Default for Interface { #[inline] fn default() -> Self { - Self::new(Params::default()) + Self::new(Params::default()).unwrap() } } impl Interface { /// Creates a new interface. - pub fn new(params: Params) -> Self { - let mut window = Window::new(params.width, params.borderless); + pub fn new(params: Params) -> ui::Result { + let mut window = Window::new(params.width, params.borderless, false, true); - Self { + Ok(Self { clock: params.clock.then(|| Clock::new(&mut window)), interval: tokio::time::interval(params.delta), window, params, - } + }) } /// Creates a full "menu" from the [`ui::State`], which can be @@ -118,20 +118,16 @@ impl Interface { /// /// The menu really is just a [`Vec`] of the different components, /// with padding already added. - pub(crate) fn menu(&self, state: &mut State) -> Vec { + pub(crate) fn menu(&self, state: &State) -> Vec { let action = components::action(state, self.params.width); - let middle = match state.volume_timer { - Some(timer) => { - let volume = state.sink.volume(); - let percentage = format!("{}%", (volume * 100.0).round().abs()); - if timer.elapsed() > Duration::from_secs(1) { - state.volume_timer = None; - } + let middle = if state.volume_timer.is_some() { + let volume = state.sink.volume(); + let percentage = format!("{}%", (volume * 100.0).round().abs()); - components::audio_bar(self.params.width - 17, volume, &percentage) - } - None => components::progress_bar(state, self.params.width - 16), + components::audio_bar(self.params.width - 17, volume, &percentage) + } else { + components::progress_bar(state, self.params.width - 16) }; let controls = components::controls(self.params.width); @@ -144,9 +140,11 @@ impl Interface { /// Draws the terminal. This will also wait for the specified /// delta to pass before completing. - pub async fn draw(&mut self, state: &mut State) -> super::Result<()> { + pub async fn draw(&mut self, state: &State) -> super::Result<()> { self.clock.as_mut().map(|x| x.update(&mut self.window)); - self.window.draw(self.menu(state), false)?; + + let menu = self.menu(state); + self.window.draw(stdout().lock(), menu)?; self.interval.tick().await; Ok(()) diff --git a/src/ui/interface/window.rs b/src/ui/interface/window.rs index 3503028..cff7084 100644 --- a/src/ui/interface/window.rs +++ b/src/ui/interface/window.rs @@ -1,5 +1,3 @@ -use std::io::{stdout, Stdout}; - use crate::ui::{self, interface::TitleBar}; use crossterm::{ cursor::{MoveToColumn, MoveUp}, @@ -26,8 +24,11 @@ pub struct Window { /// The inner width of the window. width: usize, - /// The output, currently just an [`Stdout`]. - out: Stdout, + /// Whether content items should be automatically padded (spaced). + spaced: bool, + + /// Whether to cautiously handle ANSI sequences by adding [`style::Attribute::Reset`] generously. + fancy: bool, } impl Window { @@ -35,7 +36,7 @@ impl Window { /// /// * `width` - Inner width of the window. /// * `borderless` - Whether to include borders in the window, or not. - pub fn new(width: usize, borderless: bool) -> Self { + pub fn new(width: usize, borderless: bool, spaced: bool, fancy: bool) -> Self { let statusbar = if borderless { String::new() } else { @@ -44,11 +45,12 @@ impl Window { }; Self { + spaced, statusbar, borderless, width, + fancy, titlebar: TitleBar::new(width, borderless), - out: stdout(), } } @@ -59,27 +61,22 @@ impl Window { /// /// This returns both the final rendered window and also the full /// height of the rendered window. - pub(crate) fn render( - &self, - content: Vec, - space: bool, - testing: bool, - ) -> ui::Result<(String, u16)> { - let linefeed = if testing { "\n" } else { "\r\n" }; + pub(crate) fn render(&self, content: Vec) -> ui::Result<(String, u16)> { + const NEWLINE: &str = "\r\n"; let len: u16 = content.len().try_into()?; // Note that this will have a trailing newline, which we use later. let menu: String = content.into_iter().fold(String::new(), |mut output, x| { // Horizontal Padding & Border let padding = if self.borderless { " " } else { "│" }; - let space = if space { + let space = if self.spaced { " ".repeat(self.width.saturating_sub(x.graphemes(true).count())) } else { String::new() }; - let center = if testing { x } else { x.reset().to_string() }; - write!(output, "{padding} {center}{space} {padding}{linefeed}").unwrap(); + let center = if self.fancy { x.reset().to_string() } else { x }; + write!(output, "{padding} {center}{space} {padding}{NEWLINE}").unwrap(); output }); @@ -94,7 +91,7 @@ impl Window { // There's no need for another newline after the main menu content, because it already has one. Ok(( format!( - "{}{linefeed}{menu}{}{suffix}", + "{}{NEWLINE}{menu}{}{suffix}", self.titlebar.content, self.statusbar, ), height, @@ -102,11 +99,15 @@ impl Window { } /// Actually draws the window, with each element in `content` being on a new line. - pub fn draw(&mut self, content: Vec, space: bool) -> ui::Result<()> { - let (rendered, height) = self.render(content, space, false)?; + pub fn draw( + &mut self, + mut writer: impl std::io::Write, + content: Vec, + ) -> ui::Result<()> { + let (rendered, height) = self.render(content)?; crossterm::execute!( - self.out, + writer, Clear(ClearType::FromCursorDown), MoveToColumn(0), Print(rendered),