diff --git a/Cargo.lock b/Cargo.lock index 04ff2fd..091133d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1350,7 +1350,7 @@ checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" [[package]] name = "lowfi" -version = "2.0.0" +version = "2.0.1" dependencies = [ "arc-swap", "bytes", diff --git a/Cargo.toml b/Cargo.toml index be7302c..50aba1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lowfi" -version = "2.0.0" +version = "2.0.1" rust-version = "1.83.0" edition = "2021" description = "An extremely simple lofi player." diff --git a/src/main.rs b/src/main.rs index f71fc78..4de1758 100644 --- a/src/main.rs +++ b/src/main.rs @@ -130,11 +130,15 @@ async fn main() -> eyre::Result<()> { let stream = audio::stream()?; let environment = ui::Environment::ready(&args)?; - let (mut player, mut tasks) = Player::init(args, stream.mixer()) + let (mut player, tasks) = Player::init(args, stream.mixer()) .await .inspect_err(|_| environment.cleanup(false).unwrap())?; - let result = tasks.wait(player.run()).await; + let result = tokio::select! { + r = player.run() => r, + r = tasks => r, + }; + environment.cleanup(result.is_ok())?; player.close().await?; diff --git a/src/tasks.rs b/src/tasks.rs index 61926a6..2bac72d 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -3,15 +3,15 @@ //! This file aims to abstract a lot of potentially annoying Rust async logic, which may be //! subject to change. -use futures_util::TryFutureExt; -use std::future::Future; -use tokio::{select, sync::mpsc, task::JoinSet}; +use futures_util::{FutureExt, TryFutureExt}; +use std::{future::Future, pin::Pin, task::Poll}; +use tokio::{sync::mpsc, task::JoinHandle}; /// Handles all of the processes within lowfi. /// This entails initializing/closing tasks, and handling any potential errors that arise. pub struct Tasks { - /// The [`JoinSet`], which contains all of the task handles. - pub set: JoinSet>, + /// A simple [`Vec`] of [`JoinHandle`]s. + pub handles: Vec>>, /// A sender, which is kept for convenience to be used when /// initializing various other tasks. @@ -20,45 +20,39 @@ pub struct Tasks { impl Tasks { /// Creates a new task manager. - pub fn new(tx: mpsc::Sender) -> Self { + pub const fn new(tx: mpsc::Sender) -> Self { Self { tx, - set: JoinSet::new(), + handles: Vec::new(), } } - /// Processes a task, and adds it to the internal [`JoinSet`]. + /// Processes a task, and adds it to the internal buffer. pub fn spawn + Send + Sync + 'static>( &mut self, future: impl Future> + Send + 'static, ) { - self.set.spawn(future.map_err(Into::into)); + self.handles.push(tokio::spawn(future.map_err(Into::into))); } /// Gets a copy of the internal [`mpsc::Sender`]. pub fn tx(&self) -> mpsc::Sender { self.tx.clone() } +} - /// Actively polls all of the handles previously added. - /// - /// An additional `runner` is for the main player future, which - /// can't be added as a "task" because it shares data with the - /// main thread. - /// - /// This either returns when the runner completes, or if an error occurs - /// in any of the internally held tasks. - pub async fn wait( - &mut self, - runner: impl Future> + std::marker::Send, - ) -> crate::Result<()> { - select! { - result = runner => result, - Some(result) = self.set.join_next() => match result { - Ok(res) => res, - Err(e) if !e.is_cancelled() => Err(crate::Error::JoinError(e)), - Err(_) => Ok(()), +impl Future for Tasks { + type Output = crate::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + for handle in &mut self.get_mut().handles { + match handle.poll_unpin(cx) { + Poll::Ready(Ok(x)) => return Poll::Ready(x), + Poll::Ready(Err(x)) => return Poll::Ready(Err(crate::Error::JoinError(x))), + Poll::Pending => (), } } + + Poll::Pending } }