diff --git a/Cargo.lock b/Cargo.lock index 607f66b..0636c11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,6 +110,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "arrayvec" version = "0.7.6" @@ -1342,9 +1348,9 @@ dependencies = [ [[package]] name = "indenter" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" +checksum = "964de6e86d545b246d84badc0fef527924ace5134f30641c203ef52ba83f58d5" [[package]] name = "indexmap" @@ -1485,6 +1491,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" name = "lowfi" version = "1.7.2" dependencies = [ + "arc-swap", "bytes", "clap", "convert_case 0.8.0", diff --git a/Cargo.toml b/Cargo.toml index d6222bb..4f9f2bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ thiserror = "2.0.12" # Async tokio = { version = "1.41.1", features = ["macros", "rt-multi-thread", "fs"], default-features = false } +arc-swap = "1.7.1" futures = "0.3.31" # Data diff --git a/src/audio.rs b/src/audio.rs index 7f93526..bd72aca 100644 --- a/src/audio.rs +++ b/src/audio.rs @@ -1,3 +1,12 @@ +use std::{ + sync::{atomic::AtomicBool, Arc}, + thread::sleep, + time::Duration, +}; + +use rodio::Sink; +use tokio::sync::mpsc; + /// This gets the output stream while also shutting up alsa with [libc]. /// Uses raw libc calls, and therefore is functional only on Linux. #[cfg(target_os = "linux")] @@ -38,3 +47,19 @@ pub fn silent_get_output_stream() -> eyre::Result, tx: mpsc::Sender) -> crate::Result<()> { + loop { + sleep(Duration::from_millis(100)); + sink.sleep_until_end(); + + if LISTEN.load(std::sync::atomic::Ordering::Relaxed) { + tx.blocking_send(crate::Message::Next)?; + } + } +} diff --git a/src/error.rs b/src/error.rs index 1edafe0..e198752 100644 --- a/src/error.rs +++ b/src/error.rs @@ -46,14 +46,4 @@ pub enum Error { #[error("ui failure")] UI(#[from] ui::Error), - - #[cfg(feature = "mpris")] - #[error("mpris bus error")] - ZBus(#[from] mpris_server::zbus::Error), - - // TODO: This has a terrible error message, mainly because I barely understand - // what this error even represents. What does fdo mean?!?!? Why, MPRIS!?!? - #[cfg(feature = "mpris")] - #[error("mpris fdo (zbus interface) error")] - Fdo(#[from] mpris_server::zbus::fdo::Error), } diff --git a/src/main.rs b/src/main.rs index 73b8560..774c14a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,12 +15,12 @@ pub mod audio; pub mod bookmark; pub mod download; pub mod player; -#[allow(clippy::all, clippy::pedantic, clippy::nursery, clippy::restriction)] -#[cfg(feature = "scrape")] -mod scrapers; pub mod tracks; pub mod volume; +#[cfg(feature = "scrape")] +mod scrapers; + #[cfg(feature = "scrape")] use crate::scrapers::Source; @@ -108,7 +108,11 @@ async fn main() -> eyre::Result<()> { } } else { let player = Player::init(args).await?; - player.run().await?; + let environment = player.environment(); + let result = player.run().await; + + environment.cleanup(result.is_ok())?; + result?; }; Ok(()) diff --git a/src/message.rs b/src/message.rs index 8ac7d8f..8be1224 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,17 +1,22 @@ /// Handles communication between different parts of the program. +#[allow(dead_code, reason = "this code may not be dead depending on features")] #[derive(PartialEq, Debug, Clone)] pub enum Message { - /// Notifies the audio server that it should update the track. + /// Deliberate user request to go to the next song. Next, - /// When a track is loaded after a caller previously being told to wait. + /// Sent by the audio waiter whenever it believes a track has ended. + End, + + /// When a track is loaded after the caller previously being told to wait. + /// If a track is taken from the queue, then there is no waiting, so this + /// is never actually sent. Loaded, /// Similar to Next, but specific to the first track. Init, /// Unpause the [Sink]. - #[allow(dead_code, reason = "this code may not be dead depending on features")] Play, /// Pauses the [Sink]. @@ -23,6 +28,9 @@ pub enum Message { /// Change the volume of playback. ChangeVolume(f32), + /// Set the volume of playback, rather than changing it. + SetVolume(f32), + /// Bookmark the current track. Bookmark, diff --git a/src/player.rs b/src/player.rs index 527b5ad..a0216f3 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,11 +1,15 @@ use std::sync::Arc; -use tokio::sync::{ - broadcast, - mpsc::{self, Receiver, Sender}, +use tokio::{ + sync::{ + broadcast, + mpsc::{self, Receiver}, + }, + task::JoinHandle, }; use crate::{ + audio, bookmark::Bookmarks, download::{self, Downloader}, tracks::{self, List}, @@ -20,26 +24,36 @@ pub enum Current { Track(tracks::Info), } +impl Current { + pub fn loading(&self) -> bool { + return matches!(self, Current::Loading(_)); + } +} + pub struct Player { downloader: download::Handle, - volume: PersistentVolume, bookmarks: Bookmarks, sink: Arc, rx: Receiver, broadcast: broadcast::Sender, current: Current, - _ui: ui::Handle, - _tx: Sender, + ui: ui::Handle, + waiter: JoinHandle>, _stream: rodio::OutputStream, } impl Drop for Player { fn drop(&mut self) { self.sink.stop(); + self.waiter.abort(); } } impl Player { + pub fn environment(&self) -> ui::Environment { + self.ui.environment + } + pub async fn set_current(&mut self, current: Current) -> crate::Result<()> { self.current = current.clone(); self.update(ui::Update::Track(current)).await?; @@ -61,7 +75,7 @@ impl Player { pub async fn init(args: crate::Args) -> crate::Result { #[cfg(target_os = "linux")] - let mut stream = audio::silent_get_output_stream()?; + let mut stream = crate::audio::silent_get_output_stream()?; #[cfg(not(target_os = "linux"))] let mut stream = rodio::OutputStreamBuilder::open_default_stream()?; stream.log_on_drop(false); @@ -72,15 +86,18 @@ impl Player { let (utx, urx) = broadcast::channel(8); let current = Current::Loading(download::progress()); - let state = ui::State::initial(sink.clone(), &args, current.clone()); + let list = List::load(args.track_list.as_ref()).await?; + let state = ui::State::initial(sink.clone(), &args, current.clone(), list.name.clone()); let ui = ui::Handle::init(tx.clone(), urx, state.clone(), &args).await?; let volume = PersistentVolume::load().await?; + sink.set_volume(volume.float()); let bookmarks = Bookmarks::load().await?; - - let list = List::load(args.track_list.as_ref()).await?; let downloader = Downloader::init(args.buffer_size, list, tx.clone()).await; + let clone = sink.clone(); + let waiter = tokio::task::spawn_blocking(move || audio::waiter(clone, tx)); + Ok(Self { current, downloader, @@ -88,16 +105,15 @@ impl Player { rx, sink, bookmarks, - volume, - _ui: ui, + ui, + waiter, _stream: stream, - _tx: tx, }) } pub async fn close(&self) -> crate::Result<()> { self.bookmarks.save().await?; - self.volume.save().await?; + PersistentVolume::save(self.sink.volume() as f32).await?; Ok(()) } @@ -113,26 +129,29 @@ impl Player { pub async fn run(mut self) -> crate::Result<()> { while let Some(message) = self.rx.recv().await { match message { - Message::Next | Message::Init | Message::Loaded => { + Message::Next | Message::Init | Message::Loaded | Message::End => { + if message == Message::Next && self.current.loading() { + continue; + } + + audio::playing(false); self.sink.stop(); + match self.downloader.track().await { download::Output::Loading(progress) => { - self.set_current(Current::Loading(progress)).await? + self.set_current(Current::Loading(progress)).await?; + } + download::Output::Queued(queued) => { + self.play(queued).await?; + audio::playing(true); } - download::Output::Queued(queued) => self.play(queued).await?, }; } Message::Play => { self.sink.play(); - - // #[cfg(feature = "mpris")] - // mpris.playback(PlaybackStatus::Playing).await?; } Message::Pause => { self.sink.pause(); - - // #[cfg(feature = "mpris")] - // mpris.playback(PlaybackStatus::Paused).await?; } Message::PlayPause => { if self.sink.is_paused() { @@ -140,32 +159,43 @@ impl Player { } else { self.sink.pause(); } - - // #[cfg(feature = "mpris")] - // mpris - // .playback(mpris.player().playback_status().await?) - // .await?; } Message::ChangeVolume(change) => { - self.sink.set_volume(self.sink.volume() + change); - - // #[cfg(feature = "mpris")] - // mpris - // .changed(vec![Property::Volume(player.sink.volume().into())]) - // .await?; + self.sink + .set_volume((self.sink.volume() + change).clamp(0.0, 1.0)); + self.update(ui::Update::Volume).await?; + } + Message::SetVolume(set) => { + self.sink.set_volume(set.clamp(0.0, 1.0)); + self.update(ui::Update::Volume).await?; } Message::Bookmark => { let Current::Track(current) = &self.current else { continue; }; - self.bookmarks.bookmark(current).await?; + let bookmarked = self.bookmarks.bookmark(current).await?; + self.update(ui::Update::Bookmarked(bookmarked)).await?; } Message::Quit => break, } + + #[cfg(feature = "mpris")] + match message { + Message::ChangeVolume(_) | Message::SetVolume(_) => { + self.ui.mpris.update_volume().await? + } + Message::Play | Message::Pause | Message::PlayPause => { + self.ui.mpris.update_playback().await? + } + Message::Init | Message::Loaded | Message::Next => { + self.ui.mpris.update_metadata().await? + } + _ => (), + } } - // self.close().await?; + self.close().await?; Ok(()) } } diff --git a/src/ui.rs b/src/ui.rs index 09d3679..4888d09 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use crate::{ player::Current, - ui::{self, environment::Environment, window::Window}, + ui::{self, window::Window}, Args, }; use tokio::{ @@ -12,10 +12,14 @@ use tokio::{ }; mod components; mod environment; +pub use environment::Environment; mod input; mod interface; mod window; +#[cfg(feature = "mpris")] +pub mod mpris; + type Result = std::result::Result; /// The error type for the UI, which is used to handle errors that occur @@ -33,6 +37,14 @@ pub enum Error { #[error("sharing state between backend and frontend failed")] UiSend(#[from] tokio::sync::broadcast::error::SendError), + + #[cfg(feature = "mpris")] + #[error("mpris bus error")] + ZBus(#[from] mpris_server::zbus::Error), + + #[cfg(feature = "mpris")] + #[error("mpris fdo (zbus interface) error")] + Fdo(#[from] mpris_server::zbus::fdo::Error), } #[derive(Clone)] @@ -40,17 +52,19 @@ pub struct State { pub sink: Arc, pub current: Current, pub bookmarked: bool, + list: String, timer: Option, width: usize, } impl State { - pub fn initial(sink: Arc, args: &Args, current: Current) -> Self { + pub fn initial(sink: Arc, args: &Args, current: Current, list: String) -> Self { let width = 21 + args.width.min(32) * 2; Self { width, sink, current, + list, bookmarked: false, timer: None, } @@ -61,6 +75,7 @@ impl State { pub enum Update { Track(Current), Bookmarked(bool), + Volume, Quit, } @@ -70,10 +85,11 @@ struct Tasks { input: JoinHandle>, } -#[derive(Debug)] pub struct Handle { tasks: Tasks, - _environment: Environment, + pub environment: Environment, + #[cfg(feature = "mpris")] + pub mpris: mpris::Server, } impl Drop for Handle { @@ -99,6 +115,7 @@ impl Handle { match message { Update::Track(track) => state.current = track, Update::Bookmarked(bookmarked) => state.bookmarked = bookmarked, + Update::Volume => state.timer = Some(Instant::now()), Update::Quit => break, } }; @@ -117,7 +134,9 @@ impl Handle { ) -> Result { let environment = Environment::ready(args.alternate)?; Ok(Self { - _environment: environment, + #[cfg(feature = "mpris")] + mpris: mpris::Server::new(state.clone(), tx.clone(), updater.resubscribe()).await?, + environment, tasks: Tasks { render: tokio::spawn(Self::ui(updater, state, interface::Params::from(args))), input: tokio::spawn(input::listen(tx)), diff --git a/src/ui/environment.rs b/src/ui/environment.rs index f3d376f..13b5851 100644 --- a/src/ui/environment.rs +++ b/src/ui/environment.rs @@ -1,4 +1,4 @@ -use std::io::stdout; +use std::{io::stdout, panic}; use crossterm::{ cursor::{Hide, MoveTo, Show}, @@ -8,7 +8,7 @@ use crossterm::{ /// Represents the terminal environment, and is used to properly /// initialize and clean up the terminal. -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub struct Environment { /// Whether keyboard enhancements are enabled. enhancement: bool, @@ -38,15 +38,22 @@ impl Environment { )?; } - Ok(Self { + let environment = Self { enhancement, alternate, - }) + }; + + panic::set_hook(Box::new(move |info| { + let _ = environment.cleanup(false); + eprintln!("panic: {}", info); + })); + + Ok(environment) } /// Uses the information collected from initialization to safely close down /// the terminal & restore it to it's previous state. - pub fn cleanup(&self) -> super::Result<()> { + pub fn cleanup(&self, elegant: bool) -> super::Result<()> { let mut lock = stdout().lock(); if self.alternate { @@ -60,16 +67,10 @@ impl Environment { } terminal::disable_raw_mode()?; - eprintln!("bye! :)"); + if elegant { + eprintln!("bye! :)"); + } Ok(()) } } - -impl Drop for Environment { - /// Just a wrapper for [`Environment::cleanup`] which ignores any errors thrown. - fn drop(&mut self) { - // Well, we're dropping it, so it doesn't really matter if there's an error. - let _ = self.cleanup(); - } -} diff --git a/src/ui/interface.rs b/src/ui/interface.rs index 60ff22d..8e26bc2 100644 --- a/src/ui/interface.rs +++ b/src/ui/interface.rs @@ -31,17 +31,17 @@ impl From<&Args> for Params { pub async fn draw(state: &mut ui::State, window: &mut Window, params: Params) -> super::Result<()> { let action = components::action(&state, state.width); - let volume = state.sink.volume(); - let percentage = format!("{}%", (volume * 100.0).round().abs()); - let middle = match state.timer { Some(timer) => { - if timer.elapsed() > Duration::from_secs(3) { + let volume = state.sink.volume(); + let percentage = format!("{}%", (volume * 100.0).round().abs()); + if timer.elapsed() > Duration::from_secs(1) { state.timer = None; }; - components::progress_bar(&state, state.width - 16) + + components::audio_bar(state.width - 17, volume, &percentage) } - None => components::audio_bar(state.width - 17, volume, &percentage), + None => components::progress_bar(&state, state.width - 16), }; let controls = components::controls(state.width); diff --git a/src/mpris.rs b/src/ui/mpris.rs similarity index 57% rename from src/mpris.rs rename to src/ui/mpris.rs index d260d72..d26e108 100644 --- a/src/mpris.rs +++ b/src/ui/mpris.rs @@ -1,27 +1,62 @@ //! Contains the code for the MPRIS server & other helper functions. -use std::{env, process, sync::Arc}; +use std::{ + env, + hash::{DefaultHasher, Hash, Hasher}, + process, + sync::Arc, +}; +use arc_swap::ArcSwap; use mpris_server::{ zbus::{self, fdo, Result}, LoopStatus, Metadata, PlaybackRate, PlaybackStatus, PlayerInterface, Property, RootInterface, Time, TrackId, Volume, }; -use tokio::sync::mpsc::Sender; +use rodio::Sink; +use tokio::sync::{broadcast, mpsc}; -use super::ui; -use super::Message; +use crate::{player::Current, ui::Update}; +use crate::{ui, Message}; const ERROR: fdo::Error = fdo::Error::Failed(String::new()); +struct Sender { + inner: mpsc::Sender, +} + +impl Sender { + pub fn new(inner: mpsc::Sender) -> Self { + Self { inner } + } + + pub async fn send(&self, message: Message) -> fdo::Result<()> { + self.inner + .send(message) + .await + .map_err(|x| fdo::Error::Failed(x.to_string())) + } + + pub async fn zbus(&self, message: Message) -> zbus::Result<()> { + self.inner + .send(message) + .await + .map_err(|x| zbus::Error::Failure(x.to_string())) + } +} + +impl Into for crate::Error { + fn into(self) -> fdo::Error { + fdo::Error::Failed(self.to_string()) + } +} + /// The actual MPRIS player. pub struct Player { - /// A reference to the [`super::Player`] itself. - pub player: Arc, - - /// The audio server sender, which is used to communicate with - /// the audio sender for skips and a few other inputs. - pub sender: Sender, + sink: Arc, + current: ArcSwap, + list: String, + sender: Sender, } impl RootInterface for Player { @@ -30,10 +65,7 @@ impl RootInterface for Player { } async fn quit(&self) -> fdo::Result<()> { - self.sender - .send(Message::Quit) - .await - .map_err(|_error| ERROR) + self.sender.send(Message::Quit).await } async fn can_quit(&self) -> fdo::Result { @@ -79,10 +111,7 @@ impl RootInterface for Player { impl PlayerInterface for Player { async fn next(&self) -> fdo::Result<()> { - self.sender - .send(Message::Next) - .await - .map_err(|_error| ERROR) + self.sender.send(Message::Next).await } async fn previous(&self) -> fdo::Result<()> { @@ -90,17 +119,11 @@ impl PlayerInterface for Player { } async fn pause(&self) -> fdo::Result<()> { - self.sender - .send(Message::Pause) - .await - .map_err(|_error| ERROR) + self.sender.send(Message::Pause).await } async fn play_pause(&self) -> fdo::Result<()> { - self.sender - .send(Message::PlayPause) - .await - .map_err(|_error| ERROR) + self.sender.send(Message::PlayPause).await } async fn stop(&self) -> fdo::Result<()> { @@ -108,10 +131,7 @@ impl PlayerInterface for Player { } async fn play(&self) -> fdo::Result<()> { - self.sender - .send(Message::Play) - .await - .map_err(|_error| ERROR) + self.sender.send(Message::Play).await } async fn seek(&self, _offset: Time) -> fdo::Result<()> { @@ -127,9 +147,9 @@ impl PlayerInterface for Player { } async fn playback_status(&self) -> fdo::Result { - Ok(if !self.player.current_exists() { + Ok(if self.current.load().loading() { PlaybackStatus::Stopped - } else if self.player.sink.is_paused() { + } else if self.sink.is_paused() { PlaybackStatus::Paused } else { PlaybackStatus::Playing @@ -145,11 +165,11 @@ impl PlayerInterface for Player { } async fn rate(&self) -> fdo::Result { - Ok(self.player.sink.speed().into()) + Ok(self.sink.speed().into()) } async fn set_rate(&self, rate: PlaybackRate) -> Result<()> { - self.player.sink.set_speed(rate as f32); + self.sink.set_speed(rate as f32); Ok(()) } @@ -162,15 +182,23 @@ impl PlayerInterface for Player { } async fn metadata(&self) -> fdo::Result { - let metadata = self - .player - .current - .load() - .as_ref() - .map_or_else(Metadata::new, |track| { + Ok(match self.current.load().as_ref() { + Current::Loading(_) => Metadata::new(), + Current::Track(track) => { + let mut hasher = DefaultHasher::new(); + track.path.hash(&mut hasher); + + let id = mpris_server::zbus::zvariant::ObjectPath::try_from(format!( + "/com/talwat/lowfi/{}/{}", + self.list, + hasher.finish() + )) + .unwrap(); + let mut metadata = Metadata::builder() - .title(track.display_name.clone()) - .album(self.player.list.name.clone()) + .trackid(id) + .title(track.display.clone()) + .album(self.list.clone()) .build(); metadata.set_length( @@ -180,26 +208,20 @@ impl PlayerInterface for Player { ); metadata - }); - - Ok(metadata) + } + }) } async fn volume(&self) -> fdo::Result { - Ok(self.player.sink.volume().into()) + Ok(self.sink.volume().into()) } async fn set_volume(&self, volume: Volume) -> Result<()> { - self.player.set_volume(volume as f32); - ui::flash_audio(); - - Ok(()) + self.sender.zbus(Message::SetVolume(volume as f32)).await } async fn position(&self) -> fdo::Result