From 6fadfe6304ba169099784da732fea700be85b339 Mon Sep 17 00:00:00 2001 From: Tal <83217276+talwat@users.noreply.github.com> Date: Wed, 2 Jul 2025 19:36:53 +0200 Subject: [PATCH] chore: restructure and clean up --- src/play.rs | 9 +- src/player.rs | 131 +++-------------------------- src/player/audio.rs | 42 +++++++++ src/{tracks => player}/bookmark.rs | 0 src/player/downloader.rs | 58 ++++++------- src/player/queue.rs | 81 ++++++++++++++++++ src/player/ui.rs | 29 +++++-- src/player/ui/input.rs | 7 +- src/tracks.rs | 3 +- 9 files changed, 194 insertions(+), 166 deletions(-) create mode 100644 src/player/audio.rs rename src/{tracks => player}/bookmark.rs (100%) create mode 100644 src/player/queue.rs diff --git a/src/play.rs b/src/play.rs index 0272c0c..fc2c31e 100644 --- a/src/play.rs +++ b/src/play.rs @@ -119,14 +119,7 @@ pub async fn play(args: Args) -> eyre::Result<()> { tx.send(Messages::Init).await?; // Actually starts the player. - Player::play( - Arc::clone(&player), - tx.clone(), - rx, - args.buffer_size, - args.debug, - ) - .await?; + Player::play(Arc::clone(&player), tx.clone(), rx, args.debug).await?; // Save the volume.txt file for the next session. PersistentVolume::save(player.sink.volume()).await?; diff --git a/src/player.rs b/src/player.rs index 670e03d..9ed8e34 100644 --- a/src/player.rs +++ b/src/player.rs @@ -22,7 +22,6 @@ use tokio::{ RwLock, }, task, - time::sleep, }; #[cfg(feature = "mpris")] @@ -31,11 +30,14 @@ use mpris_server::{PlaybackStatus, PlayerInterface, Property}; use crate::{ messages::Messages, play::{PersistentVolume, SendableOutputStream}, - tracks::{self, bookmark, list::List}, + tracks::{self, list::List}, Args, }; +pub mod audio; +pub mod bookmark; pub mod downloader; +pub mod queue; pub mod ui; #[cfg(feature = "mpris")] @@ -55,6 +57,9 @@ pub struct Player { /// [rodio]'s [`Sink`] which can control playback. pub sink: Sink, + /// The internal buffer size. + pub buffer_size: usize, + /// Whether the current track has been bookmarked. bookmarked: AtomicBool, @@ -85,46 +90,6 @@ pub struct Player { } impl Player { - /// This gets the output stream while also shutting up alsa with [libc]. - /// Uses raw libc calls, and therefore is functional only on Linux. - #[cfg(target_os = "linux")] - fn silent_get_output_stream() -> eyre::Result<(OutputStream, OutputStreamHandle)> { - use libc::freopen; - use std::ffi::CString; - - // Get the file descriptor to stderr from libc. - extern "C" { - static stderr: *mut libc::FILE; - } - - // This is a bit of an ugly hack that basically just uses `libc` to redirect alsa's - // output to `/dev/null` so that it wont be shoved down our throats. - - // The mode which to redirect terminal output with. - let mode = CString::new("w")?; - - // First redirect to /dev/null, which basically silences alsa. - let null = CString::new("/dev/null")?; - - // SAFETY: Simple enough to be impossible to fail. Hopefully. - unsafe { - freopen(null.as_ptr(), mode.as_ptr(), stderr); - } - - // Make the OutputStream while stderr is still redirected to /dev/null. - let (stream, handle) = OutputStream::try_default()?; - - // Redirect back to the current terminal, so that other output isn't silenced. - let tty = CString::new("/dev/tty")?; - - // SAFETY: See the first call to `freopen`. - unsafe { - freopen(tty.as_ptr(), mode.as_ptr(), stderr); - } - - Ok((stream, handle)) - } - /// Just a shorthand for setting `current`. fn set_current(&self, info: tracks::Info) { self.current.store(Some(Arc::new(info))); @@ -153,7 +118,7 @@ impl Player { // We should only shut up alsa forcefully on Linux if we really have to. #[cfg(target_os = "linux")] let (stream, handle) = if !args.alternate && !args.debug { - Self::silent_get_output_stream()? + audio::silent_get_output_stream()? } else { OutputStream::try_default()? }; @@ -178,6 +143,7 @@ impl Player { let player = Self { tracks: RwLock::new(VecDeque::with_capacity(args.buffer_size)), + buffer_size: args.buffer_size, current: ArcSwapOption::new(None), client, sink, @@ -190,80 +156,6 @@ impl Player { Ok((player, SendableOutputStream(stream))) } - /// This will play the next track, as well as refilling the buffer in the background. - /// - /// This will also set `current` to the newly loaded song. - pub async fn next(&self) -> Result { - // TODO: Consider replacing this with `unwrap_or_else` when async closures are stablized. - let track = self.tracks.write().await.pop_front(); - let track = if let Some(track) = track { - track - } else { - // If the queue is completely empty, then fallback to simply getting a new track. - // This is relevant particularly at the first song. - - // Serves as an indicator that the queue is "loading". - // We're doing it here so that we don't get the "loading" display - // for only a frame in the other case that the buffer is not empty. - self.current.store(None); - self.list.random(&self.client).await? - }; - - let decoded = track.decode()?; - - // Set the current track. - self.set_current(decoded.info.clone()); - - Ok(decoded) - } - - /// This basically just calls [`Player::next`], and then appends the new track to the player. - /// - /// This also notifies the background thread to get to work, and will send `TryAgain` - /// if it fails. This functions purpose is to be called in the background, so that - /// when the audio server recieves a `Next` signal it will still be able to respond to other - /// signals while it's loading. - /// - /// This also sends the `NewSong` signal to `tx` apon successful completion. - async fn handle_next( - player: Arc, - itx: Sender<()>, - tx: Sender, - debug: bool, - ) -> eyre::Result<()> { - // Stop the sink. - player.sink.stop(); - - let track = player.next().await; - - match track { - Ok(track) => { - // Start playing the new track. - player.sink.append(track.data); - - // Notify the background downloader that there's an empty spot - // in the buffer. - Downloader::notify(&itx).await?; - - // Notify the audio server that the next song has actually been downloaded. - tx.send(Messages::NewSong).await?; - } - Err(error) => { - if !error.is_timeout() { - if debug { - panic!("{:?}", error) - } - - sleep(TIMEOUT).await; - } - - tx.send(Messages::TryAgain).await?; - } - }; - - Ok(()) - } - /// This is the main "audio server". /// /// `rx` & `tx` are used to communicate with it, for example when to @@ -275,7 +167,6 @@ impl Player { player: Arc, tx: Sender, mut rx: Receiver, - buf_size: usize, debug: bool, ) -> eyre::Result<()> { // Initialize the mpris player. @@ -292,7 +183,7 @@ impl Player { })?; // `itx` is used to notify the `Downloader` when it needs to download new tracks. - let downloader = Downloader::new(Arc::clone(&player), buf_size); + let downloader = Downloader::new(Arc::clone(&player)); let (itx, downloader) = downloader.start(debug); // Start buffering tracks immediately. @@ -345,7 +236,7 @@ impl Player { // Handle the rest of the signal in the background, // as to not block the main audio server thread. - task::spawn(Self::handle_next( + task::spawn(Self::next( Arc::clone(&player), itx.clone(), tx.clone(), diff --git a/src/player/audio.rs b/src/player/audio.rs new file mode 100644 index 0000000..5942b6c --- /dev/null +++ b/src/player/audio.rs @@ -0,0 +1,42 @@ +#[cfg(target_os = "linux")] +use rodio::{OutputStream, OutputStreamHandle}; + +/// This gets the output stream while also shutting up alsa with [libc]. +/// Uses raw libc calls, and therefore is functional only on Linux. +#[cfg(target_os = "linux")] +pub fn silent_get_output_stream() -> eyre::Result<(OutputStream, OutputStreamHandle)> { + use libc::freopen; + use std::ffi::CString; + + // Get the file descriptor to stderr from libc. + extern "C" { + static stderr: *mut libc::FILE; + } + + // This is a bit of an ugly hack that basically just uses `libc` to redirect alsa's + // output to `/dev/null` so that it wont be shoved down our throats. + + // The mode which to redirect terminal output with. + let mode = CString::new("w")?; + + // First redirect to /dev/null, which basically silences alsa. + let null = CString::new("/dev/null")?; + + // SAFETY: Simple enough to be impossible to fail. Hopefully. + unsafe { + freopen(null.as_ptr(), mode.as_ptr(), stderr); + } + + // Make the OutputStream while stderr is still redirected to /dev/null. + let (stream, handle) = OutputStream::try_default()?; + + // Redirect back to the current terminal, so that other output isn't silenced. + let tty = CString::new("/dev/tty")?; + + // SAFETY: See the first call to `freopen`. + unsafe { + freopen(tty.as_ptr(), mode.as_ptr(), stderr); + } + + Ok((stream, handle)) +} diff --git a/src/tracks/bookmark.rs b/src/player/bookmark.rs similarity index 100% rename from src/tracks/bookmark.rs rename to src/player/bookmark.rs diff --git a/src/player/downloader.rs b/src/player/downloader.rs index 48f86a2..5b3407e 100644 --- a/src/player/downloader.rs +++ b/src/player/downloader.rs @@ -24,9 +24,6 @@ pub struct Downloader { /// A copy of the internal sender, which can be useful for keeping /// track of it. tx: Sender<()>, - - /// The size of the internal download buffer. - buf_size: usize, } impl Downloader { @@ -40,40 +37,41 @@ impl Downloader { /// /// This also sends a [`Sender`] which can be used to notify /// when the downloader needs to begin downloading more tracks. - pub fn new(player: Arc, buf_size: usize) -> Self { + pub fn new(player: Arc) -> Self { let (tx, rx) = mpsc::channel(8); - Self { - player, - rx, - tx, - buf_size, + Self { player, rx, tx } + } + + /// Push a new, random track onto the internal buffer. + pub async fn push_buffer(&self, debug: bool) { + let data = self.player.list.random(&self.player.client).await; + match data { + Ok(track) => self.player.tracks.write().await.push_back(track), + Err(error) if !error.is_timeout() => { + if debug { + panic!("{}", error) + } + + sleep(TIMEOUT).await; + } + _ => {} } } /// Actually starts & consumes the [Downloader]. pub fn start(mut self, debug: bool) -> (Sender<()>, JoinHandle<()>) { - ( - self.tx, - task::spawn(async move { - // Loop through each update notification. - while self.rx.recv().await == Some(()) { - // For each update notification, we'll push tracks until the buffer is completely full. - while self.player.tracks.read().await.len() < self.buf_size { - let data = self.player.list.random(&self.player.client).await; - match data { - Ok(track) => self.player.tracks.write().await.push_back(track), - Err(error) if !error.is_timeout() => { - if debug { - panic!("{}", error) - } + let tx = self.tx.clone(); - sleep(TIMEOUT).await; - } - _ => {} - } - } + let handle = task::spawn(async move { + // Loop through each update notification. + while self.rx.recv().await == Some(()) { + // For each update notification, we'll push tracks until the buffer is completely full. + while self.player.tracks.read().await.len() < self.player.buffer_size { + self.push_buffer(debug).await; } - }), - ) + } + }); + + return (tx, handle); } } diff --git a/src/player/queue.rs b/src/player/queue.rs new file mode 100644 index 0000000..6744c4d --- /dev/null +++ b/src/player/queue.rs @@ -0,0 +1,81 @@ +use std::sync::Arc; +use tokio::{sync::mpsc::Sender, time::sleep}; + +use crate::{ + messages::Messages, + player::{downloader::Downloader, Player, TIMEOUT}, + tracks, +}; + +impl Player { + /// Fetches the next track from the queue, or a random track if the queue is empty. + /// This will also set the current track to the fetched track's info. + async fn fetch(&self) -> Result { + // TODO: Consider replacing this with `unwrap_or_else` when async closures are stablized. + let track = self.tracks.write().await.pop_front(); + let track = if let Some(track) = track { + track + } else { + // If the queue is completely empty, then fallback to simply getting a new track. + // This is relevant particularly at the first song. + + // Serves as an indicator that the queue is "loading". + // We're doing it here so that we don't get the "loading" display + // for only a frame in the other case that the buffer is not empty. + self.current.store(None); + self.list.random(&self.client).await? + }; + + let decoded = track.decode()?; + + // Set the current track. + self.set_current(decoded.info.clone()); + + Ok(decoded) + } + + /// Gets, decodes, and plays the next track in the queue while also handling the downloader. + /// + /// This functions purpose is to be called in the background, so that when the audio server recieves a + /// `Next` signal it will still be able to respond to other signals while it's loading. + /// + /// This also sends the either a `NewSong` or `TryAgain` signal to `tx`. + pub async fn next( + player: Arc, + itx: Sender<()>, + tx: Sender, + debug: bool, + ) -> eyre::Result<()> { + // Stop the sink. + player.sink.stop(); + + let track = player.fetch().await; + + match track { + Ok(track) => { + // Start playing the new track. + player.sink.append(track.data); + + // Notify the background downloader that there's an empty spot + // in the buffer. + Downloader::notify(&itx).await?; + + // Notify the audio server that the next song has actually been downloaded. + tx.send(Messages::NewSong).await?; + } + Err(error) => { + if !error.is_timeout() { + if debug { + panic!("{:?}", error) + } + + sleep(TIMEOUT).await; + } + + tx.send(Messages::TryAgain).await?; + } + }; + + Ok(()) + } +} diff --git a/src/player/ui.rs b/src/player/ui.rs index 1a7768d..00b6879 100644 --- a/src/player/ui.rs +++ b/src/player/ui.rs @@ -28,6 +28,7 @@ use crossterm::{ }; use lazy_static::lazy_static; +use thiserror::Error; use tokio::{sync::mpsc::Sender, task, time::sleep}; use unicode_segmentation::UnicodeSegmentation; @@ -36,6 +37,20 @@ use super::{Messages, Player}; mod components; mod input; +/// The error type for the UI, which is used to handle errors that occur +/// while drawing the UI or handling input. +#[derive(Debug, Error)] +pub enum UIError { + #[error("unable to convert number")] + Conversion(#[from] std::num::TryFromIntError), + + #[error("unable to write output")] + Write(#[from] std::io::Error), + + #[error("sending message to backend failed")] + Communication(#[from] tokio::sync::mpsc::error::SendError), +} + /// How long the audio bar will be visible for when audio is adjusted. /// This is in frames. const AUDIO_BAR_DURATION: usize = 10; @@ -100,7 +115,7 @@ impl Window { } /// Actually draws the window, with each element in `content` being on a new line. - pub fn draw(&mut self, content: Vec, space: bool) -> eyre::Result<()> { + pub fn draw(&mut self, content: Vec, space: bool) -> eyre::Result<(), UIError> { let len: u16 = content.len().try_into()?; // Note that this will have a trailing newline, which we use later. @@ -151,7 +166,7 @@ async fn interface( borderless: bool, fps: u8, width: usize, -) -> eyre::Result<()> { +) -> eyre::Result<(), UIError> { let mut window = Window::new(width, borderless); loop { @@ -207,7 +222,7 @@ pub struct Environment { impl Environment { /// This prepares the terminal, returning an [Environment] helpful /// for cleaning up afterwards. - pub fn ready(alternate: bool) -> eyre::Result { + pub fn ready(alternate: bool) -> eyre::Result { let mut lock = stdout().lock(); crossterm::execute!(lock, Hide)?; @@ -234,7 +249,7 @@ impl Environment { /// Uses the information collected from initialization to safely close down /// the terminal & restore it to it's previous state. - pub fn cleanup(&self) -> eyre::Result<()> { + pub fn cleanup(&self) -> eyre::Result<(), UIError> { let mut lock = stdout().lock(); if self.alternate { @@ -267,7 +282,11 @@ impl Drop for Environment { /// /// `alternate` controls whether to use [`EnterAlternateScreen`] in order to hide /// previous terminal history. -pub async fn start(player: Arc, sender: Sender, args: Args) -> eyre::Result<()> { +pub async fn start( + player: Arc, + sender: Sender, + args: Args, +) -> eyre::Result<(), UIError> { let environment = Environment::ready(args.alternate)?; let interface = task::spawn(interface( Arc::clone(&player), diff --git a/src/player/ui/input.rs b/src/player/ui/input.rs index 6b021de..6b3d0a9 100644 --- a/src/player/ui/input.rs +++ b/src/player/ui/input.rs @@ -5,10 +5,13 @@ use crossterm::event::{self, EventStream, KeyCode, KeyEventKind, KeyModifiers}; use futures::{FutureExt as _, StreamExt as _}; use tokio::sync::mpsc::Sender; -use crate::player::{ui, Messages}; +use crate::player::{ + ui::{self, UIError}, + Messages, +}; /// Starts the listener to recieve input from the terminal for various events. -pub async fn listen(sender: Sender) -> eyre::Result<()> { +pub async fn listen(sender: Sender) -> eyre::Result<(), UIError> { let mut reader = EventStream::new(); loop { diff --git a/src/tracks.rs b/src/tracks.rs index 8ef9f29..1c33cff 100644 --- a/src/tracks.rs +++ b/src/tracks.rs @@ -25,9 +25,10 @@ use tokio::io; use unicode_segmentation::UnicodeSegmentation; use url::form_urlencoded; -pub mod bookmark; pub mod list; +/// The error type for the track system, which is used to handle errors that occur +/// while downloading, decoding, or playing tracks. #[derive(Debug, Error)] pub enum TrackError { #[error("timeout")]