diff --git a/src/bookmark.rs b/src/bookmark.rs index 8974d73..83a33f4 100644 --- a/src/bookmark.rs +++ b/src/bookmark.rs @@ -21,10 +21,6 @@ pub enum Error { pub struct Bookmarks { /// The different entries in the bookmarks file. entries: Vec, - - /// The internal bookmarked register, which keeps track - /// of whether a track is bookmarked or not. - bookmarked: bool, } impl Bookmarks { @@ -55,10 +51,7 @@ impl Bookmarks { }) .collect(); - Ok(Self { - entries, - bookmarked: false, - }) + Ok(Self { entries }) } // Saves the bookmarks to the `bookmarks.txt` file. @@ -71,7 +64,7 @@ impl Bookmarks { /// Bookmarks a given track with a full path and optional custom name. /// /// Returns whether the track is now bookmarked, or not. - pub async fn bookmark(&mut self, track: &tracks::Info) -> Result<()> { + pub async fn bookmark(&mut self, track: &tracks::Info) -> Result { let entry = track.to_entry(); let idx = self.entries.iter().position(|x| **x == entry); @@ -81,19 +74,12 @@ impl Bookmarks { self.entries.push(entry); }; - self.bookmarked = idx.is_none(); - Ok(()) - } - - /// Returns whether a track is bookmarked or not by using the internal - /// bookmarked register. - pub fn bookmarked(&self) -> bool { - self.bookmarked + Ok(idx.is_none()) } /// Sets the internal bookmarked register by checking against /// the current track's info. - pub async fn set_bookmarked(&mut self, track: &tracks::Info) { - self.bookmarked = self.entries.contains(&track.to_entry()); + pub fn bookmarked(&mut self, track: &tracks::Info) -> bool { + self.entries.contains(&track.to_entry()) } } diff --git a/src/download.rs b/src/download.rs index 2c02c5f..c1cad15 100644 --- a/src/download.rs +++ b/src/download.rs @@ -1,5 +1,5 @@ use std::{ - sync::{atomic::AtomicU8, Arc}, + sync::atomic::{self, AtomicU8}, time::Duration, }; @@ -11,61 +11,90 @@ use tokio::{ use crate::tracks; +static PROGRESS: AtomicU8 = AtomicU8::new(0); +pub type Progress = &'static AtomicU8; + +pub fn progress() -> Progress { + &PROGRESS +} + pub struct Downloader { - /// TODO: Actually have a track type here. - pub progress: Arc, - queue: Receiver, - handle: JoinHandle>, + queue: Sender, + tx: Sender, + tracks: tracks::List, + client: Client, + timeout: Duration, } impl Downloader { - pub async fn track(&mut self) -> Option { - return self.queue.recv().await; + pub async fn init(size: usize, tracks: tracks::List, tx: Sender) -> Handle { + let client = Client::new(); + + let (qtx, qrx) = mpsc::channel(size); + let downloader = Self { + queue: qtx, + tx, + tracks, + client, + timeout: Duration::from_secs(1), + }; + + Handle { + queue: qrx, + handle: tokio::spawn(downloader.run()), + } } - async fn downloader( - tx: Sender, - tracks: tracks::List, - client: Client, - progress: Arc, - timeout: Duration, - ) -> crate::Result<()> { + async fn run(self) -> crate::Result<()> { loop { - let result = tracks.random(&client, progress.as_ref()).await; + let progress = if PROGRESS.load(atomic::Ordering::Relaxed) == 0 { + Some(&PROGRESS) + } else { + None + }; + + let result = self.tracks.random(&self.client, progress).await; match result { - Ok(track) => tx.send(track).await?, + Ok(track) => { + self.queue.send(track).await?; + + if progress.is_some() { + self.tx.send(crate::Message::Loaded).await?; + } + } Err(error) => { + PROGRESS.store(0, atomic::Ordering::Relaxed); if !error.timeout() { - tokio::time::sleep(timeout).await; + tokio::time::sleep(self.timeout).await; } } } } } +} +pub struct Handle { + queue: Receiver, + handle: JoinHandle>, +} - pub async fn init( - size: usize, - tracks: tracks::List, - client: Client, - progress: Arc, - ) -> Self { - let (tx, rx) = mpsc::channel(size); +pub enum Output { + Loading(Progress), + Queued(tracks::Queued), +} - Self { - queue: rx, - progress: progress.clone(), - handle: tokio::spawn(Self::downloader( - tx, - tracks, - client, - progress, - Duration::from_secs(1), - )), +impl Handle { + pub async fn track(&mut self) -> Output { + match self.queue.try_recv() { + Ok(queued) => Output::Queued(queued), + Err(_) => { + PROGRESS.store(0, atomic::Ordering::Relaxed); + Output::Loading(&PROGRESS) + } } } } -impl Drop for Downloader { +impl Drop for Handle { fn drop(&mut self) { self.handle.abort(); } diff --git a/src/error.rs b/src/error.rs index 2faba65..1edafe0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,4 @@ -use tokio::sync::mpsc; +use tokio::sync::{broadcast, mpsc}; use crate::{bookmark, tracks, ui, volume}; @@ -26,19 +26,25 @@ pub enum Error { #[error("couldn't add track to the queue: {0}")] Queue(#[from] mpsc::error::SendError), + #[error("couldn't update UI state: {0}")] + Broadcast(#[from] broadcast::error::SendError), + #[error("io error: {0}")] Io(#[from] std::io::Error), #[error("directory not found")] Directory, + #[error("couldn't fetch track from downloader")] + Download, + #[error("couldn't parse integer: {0}")] Parse(#[from] std::num::ParseIntError), - #[error("track error: {0}")] + #[error("track failure")] Track(#[from] tracks::Error), - #[error("ui error: {0}")] + #[error("ui failure")] UI(#[from] ui::Error), #[cfg(feature = "mpris")] diff --git a/src/main.rs b/src/main.rs index 8129e25..73b8560 100644 --- a/src/main.rs +++ b/src/main.rs @@ -108,7 +108,7 @@ async fn main() -> eyre::Result<()> { } } else { let player = Player::init(args).await?; - player.play().await?; + player.run().await?; }; Ok(()) diff --git a/src/message.rs b/src/message.rs index 11beed7..8ac7d8f 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,14 +1,12 @@ -use crate::ui; - /// Handles communication between different parts of the program. #[derive(PartialEq, Debug, Clone)] pub enum Message { - /// Sent to update the UI with new information. - Render(ui::Update), - /// Notifies the audio server that it should update the track. Next, + /// When a track is loaded after a caller previously being told to wait. + Loaded, + /// Similar to Next, but specific to the first track. Init, diff --git a/src/player.rs b/src/player.rs index 0131f9b..400cef4 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,21 +1,36 @@ -use std::sync::{atomic::AtomicU8, Arc}; +use std::sync::Arc; -use reqwest::Client; -use tokio::sync::mpsc::{self, Receiver}; +use tokio::sync::{ + broadcast, + mpsc::{self, Receiver, Sender}, +}; use crate::{ - bookmark::Bookmarks, download::Downloader, tracks::List, ui::UI, volume::PersistentVolume, + bookmark::Bookmarks, + download::{self, Downloader}, + tracks::{self, List}, + ui, + volume::PersistentVolume, Message, }; +#[derive(Clone, Debug)] +pub enum Current { + Loading(download::Progress), + Track(tracks::Info), +} + pub struct Player { - ui: UI, + ui: ui::Handle, + downloader: download::Handle, volume: PersistentVolume, bookmarks: Bookmarks, - downloader: Downloader, sink: Arc, - stream: rodio::OutputStream, rx: Receiver, + broadcast: broadcast::Sender, + current: Current, + _tx: Sender, + _stream: rodio::OutputStream, } impl Drop for Player { @@ -25,6 +40,25 @@ impl Drop for Player { } impl Player { + pub async fn set_current(&mut self, current: Current) -> crate::Result<()> { + self.current = current.clone(); + self.update(ui::Update::Track(current)).await?; + + let Current::Track(track) = &self.current else { + return Ok(()); + }; + + let bookmarked = self.bookmarks.bookmarked(&track); + self.update(ui::Update::Bookmarked(bookmarked)).await?; + + Ok(()) + } + + pub async fn update(&mut self, update: ui::Update) -> crate::Result<()> { + self.broadcast.send(update)?; + Ok(()) + } + pub async fn init(args: crate::Args) -> crate::Result { #[cfg(target_os = "linux")] let mut stream = audio::silent_get_output_stream()?; @@ -33,42 +67,105 @@ impl Player { stream.log_on_drop(false); let sink = Arc::new(rodio::Sink::connect_new(stream.mixer())); - let progress = Arc::new(AtomicU8::new(0)); let (tx, rx) = mpsc::channel(8); - let ui = UI::init(tx, progress.clone(), sink.clone(), &args).await?; + tx.send(Message::Init).await?; + let (utx, urx) = broadcast::channel(8); + let current = Current::Loading(download::progress()); + + let state = ui::State::initial(sink.clone(), &args, current.clone()); + let ui = ui::Handle::init(tx.clone(), urx, state.clone(), &args).await?; let volume = PersistentVolume::load().await?; let bookmarks = Bookmarks::load().await?; - let client = Client::new(); let list = List::load(args.track_list.as_ref()).await?; - let downloader = Downloader::init(args.buffer_size, list, client, progress).await; + let downloader = Downloader::init(args.buffer_size, list, tx.clone()).await; Ok(Self { + current, downloader, ui, + broadcast: utx, rx, sink, - stream, bookmarks, volume, + _stream: stream, + _tx: tx, }) } - pub async fn play(mut self) -> crate::Result<()> { - // self.ui - // .render(ui::Update { - // track: None, - // bookmarked: false, - // }) - // .await?; - - while let Some(message) = self.rx.recv().await { - if message == Message::Quit { - break; - }; - } + pub async fn close(&self) -> crate::Result<()> { + self.bookmarks.save().await?; + self.volume.save().await?; Ok(()) } + + pub async fn play(&mut self, queued: tracks::Queued) -> crate::Result<()> { + let decoded = queued.decode()?; + self.sink.append(decoded.data); + self.set_current(Current::Track(decoded.info)).await?; + + Ok(()) + } + + pub async fn run(mut self) -> crate::Result<()> { + while let Some(message) = self.rx.recv().await { + match message { + Message::Next | Message::Init | Message::Loaded => { + self.sink.stop(); + match self.downloader.track().await { + download::Output::Loading(progress) => { + self.set_current(Current::Loading(progress)).await? + } + download::Output::Queued(queued) => self.play(queued).await?, + }; + } + Message::Play => { + self.sink.play(); + + // #[cfg(feature = "mpris")] + // mpris.playback(PlaybackStatus::Playing).await?; + } + Message::Pause => { + self.sink.pause(); + + // #[cfg(feature = "mpris")] + // mpris.playback(PlaybackStatus::Paused).await?; + } + Message::PlayPause => { + if self.sink.is_paused() { + self.sink.play(); + } else { + self.sink.pause(); + } + + // #[cfg(feature = "mpris")] + // mpris + // .playback(mpris.player().playback_status().await?) + // .await?; + } + Message::ChangeVolume(change) => { + self.sink.set_volume(self.sink.volume() + change); + + // #[cfg(feature = "mpris")] + // mpris + // .changed(vec![Property::Volume(player.sink.volume().into())]) + // .await?; + } + Message::Bookmark => { + let Current::Track(current) = &self.current else { + continue; + }; + + self.bookmarks.bookmark(current).await?; + } + Message::Quit => break, + } + } + + // self.close().await?; + Ok(()) + } } diff --git a/src/tracks.rs b/src/tracks.rs index 69cdc4c..9b115e9 100644 --- a/src/tracks.rs +++ b/src/tracks.rs @@ -15,7 +15,7 @@ //! 2. [`Info`] created from decoded data. //! 3. [`Decoded`] made from [`Info`] and the original decoded data. -use std::{io::Cursor, time::Duration}; +use std::{fmt::Debug, io::Cursor, time::Duration}; use bytes::Bytes; use rodio::{Decoder, Source as _}; @@ -35,6 +35,7 @@ pub type DecodedData = Decoder>; /// Tracks which are still waiting in the queue, and can't be played yet. /// /// This means that only the data & track name are included. +#[derive(PartialEq)] pub struct Queued { /// Display name of the track. pub display: String, @@ -47,6 +48,16 @@ pub struct Queued { pub data: Bytes, } +impl Debug for Queued { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Queued") + .field("display", &self.display) + .field("path", &self.path) + .field("data", &self.data.len()) + .finish() + } +} + impl Queued { /// This will actually decode and format the track, /// returning a [`DecodedTrack`] which can be played diff --git a/src/tracks/error.rs b/src/tracks/error.rs index 58d0b8b..0eb885e 100644 --- a/src/tracks/error.rs +++ b/src/tracks/error.rs @@ -25,8 +25,6 @@ pub enum Kind { #[error("{kind} (track: {track:?})")] pub struct Error { pub track: Option, - - #[source] pub kind: Kind, } diff --git a/src/tracks/list.rs b/src/tracks/list.rs index 6695f93..196ebc0 100644 --- a/src/tracks/list.rs +++ b/src/tracks/list.rs @@ -70,16 +70,16 @@ impl List { &self, track: &str, client: &Client, - progress: &AtomicU8, + progress: Option<&AtomicU8>, ) -> tracks::Result<(Bytes, String)> { // If the track has a protocol, then we should ignore the base for it. - let full_path = if track.contains("://") { + let path = if track.contains("://") { track.to_owned() } else { format!("{}{}", self.base(), track) }; - let data: Bytes = if let Some(x) = full_path.strip_prefix("file://") { + let data: Bytes = if let Some(x) = path.strip_prefix("file://") { let path = if x.starts_with('~') { let home_path = dirs::home_dir() .ok_or(error::Kind::InvalidPath) @@ -97,7 +97,11 @@ impl List { let result = tokio::fs::read(path.clone()).await.track(x)?; result.into() } else { - let response = client.get(full_path.clone()).send().await.track(track)?; + let response = client.get(path.clone()).send().await.track(track)?; + let Some(progress) = progress else { + let bytes = response.bytes().await.track(track)?; + return Ok((bytes, path)); + }; let total = response .content_length() @@ -119,14 +123,18 @@ impl List { bytes.into() }; - Ok((data, full_path)) + Ok((data, path)) } /// Fetches and downloads a random track from the [List]. /// /// The Result's error is a bool, which is true if a timeout error occured, /// and false otherwise. This tells lowfi if it shouldn't wait to try again. - pub async fn random(&self, client: &Client, progress: &AtomicU8) -> tracks::Result { + pub async fn random( + &self, + client: &Client, + progress: Option<&AtomicU8>, + ) -> tracks::Result { let (path, display) = self.random_path(); let (data, path) = self.download(&path, client, progress).await?; diff --git a/src/ui.rs b/src/ui.rs index 028701b..9daccc4 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -1,16 +1,12 @@ -use std::{ - sync::{atomic::AtomicU8, Arc}, - time::Duration, -}; +use std::sync::Arc; use crate::{ - tracks, - ui::{environment::Environment, window::Window}, - Args, Message, + player::Current, + ui::{self, environment::Environment, window::Window}, + Args, }; -use rodio::Sink; use tokio::{ - sync::mpsc::{self, Receiver, Sender}, + sync::{broadcast, mpsc::Sender}, task::JoinHandle, }; mod components; @@ -32,74 +28,64 @@ pub enum Error { Write(#[from] std::io::Error), #[error("sending message to backend from ui failed")] - Communication(#[from] tokio::sync::mpsc::error::SendError), + CrateSend(#[from] tokio::sync::mpsc::error::SendError), + + #[error("sharing state between backend and frontend failed")] + UiSend(#[from] tokio::sync::broadcast::error::SendError), } +#[derive(Clone)] pub struct State { pub sink: Arc, - pub progress: Arc, - pub track: Option, + pub current: Current, pub bookmarked: bool, width: usize, } impl State { - pub fn update(&mut self, update: Update) { - self.track = update.track; - self.bookmarked = update.bookmarked; - } - - pub fn initial(sink: Arc, width: usize, progress: Arc) -> Self { + pub fn initial(sink: Arc, args: &Args, current: Current) -> Self { + let width = 21 + args.width.min(32) * 2; Self { width, sink, - progress, - track: None, + current, bookmarked: false, } } } -#[derive(Debug, Clone, PartialEq, Default)] -pub struct Update { - pub track: Option, - pub bookmarked: bool, + +#[derive(Debug, Clone)] +pub enum Update { + Track(Current), + Bookmarked(bool), + Quit, } #[derive(Debug)] -struct Handles { +struct Tasks { render: JoinHandle>, input: JoinHandle>, } -#[derive(Copy, Clone, Debug)] -struct Params { - borderless: bool, - minimalist: bool, - delta: Duration, -} - #[derive(Debug)] -pub struct UI { - pub utx: Sender, - handles: Handles, +pub struct Handle { + tasks: Tasks, _environment: Environment, } -impl Drop for UI { +impl Drop for Handle { fn drop(&mut self) { - self.handles.input.abort(); - self.handles.render.abort(); + self.tasks.input.abort(); + self.tasks.render.abort(); } } -impl UI { - pub async fn render(&mut self, data: Update) -> Result<()> { - self.utx.send(Message::Render(data)).await?; - - Ok(()) - } - - async fn ui(mut rx: Receiver, mut state: State, params: Params) -> Result<()> { +impl Handle { + async fn ui( + mut rx: broadcast::Receiver, + mut state: State, + params: interface::Params, + ) -> Result<()> { let mut interval = tokio::time::interval(params.delta); let mut window = Window::new(state.width, params.borderless); @@ -108,45 +94,29 @@ impl UI { if let Ok(message) = rx.try_recv() { match message { - Message::Render(update) => state.update(update), - Message::Quit => break, - _ => continue, + Update::Track(track) => state.current = track, + Update::Bookmarked(bookmarked) => state.bookmarked = bookmarked, + Update::Quit => break, } }; interval.tick().await; } - // environment.cleanup()?; Ok(()) } pub async fn init( - tx: Sender, - progress: Arc, - sink: Arc, + tx: Sender, + updater: broadcast::Receiver, + state: State, args: &Args, ) -> Result { let environment = Environment::ready(args.alternate)?; - - let (utx, urx) = mpsc::channel(8); - let delta = 1.0 / f32::from(args.fps); - let delta = Duration::from_secs_f32(delta); - let width = 21 + args.width.min(32) * 2; - Ok(Self { - utx, _environment: environment, - handles: Handles { - render: tokio::spawn(Self::ui( - urx, - State::initial(sink, width, progress), - Params { - delta, - minimalist: args.minimalist, - borderless: args.borderless, - }, - )), + tasks: Tasks { + render: tokio::spawn(Self::ui(updater, state, interface::Params::from(args))), input: tokio::spawn(input::listen(tx)), }, }) diff --git a/src/ui/components.rs b/src/ui/components.rs index fa3ae94..20a5d31 100644 --- a/src/ui/components.rs +++ b/src/ui/components.rs @@ -6,7 +6,7 @@ use std::time::Duration; use crossterm::style::Stylize as _; use unicode_segmentation::UnicodeSegmentation as _; -use crate::{tracks, ui}; +use crate::{player::Current, tracks, ui}; /// Small helper function to format durations. pub fn format_duration(duration: &Duration) -> String { @@ -19,14 +19,14 @@ pub fn format_duration(duration: &Duration) -> String { /// Creates the progress bar, as well as all the padding needed. pub fn progress_bar(state: &ui::State, width: usize) -> String { let mut duration = Duration::new(0, 0); - let elapsed = if state.track.is_some() { + let elapsed = if matches!(&state.current, Current::Track(_)) { state.sink.get_pos() } else { Duration::new(0, 0) }; let mut filled = 0; - if let Some(current) = &state.track { + if let Current::Track(current) = &state.current { if let Some(x) = current.duration { duration = x; @@ -106,33 +106,22 @@ impl ActionBar { /// Creates the top/action bar, which has the name of the track and it's status. /// This also creates all the needed padding. pub fn action(state: &ui::State, width: usize) -> String { - let (main, len) = state - .track - .as_ref() - .map_or_else( - || { - ActionBar::Loading( - state - .progress - .load(std::sync::atomic::Ordering::Acquire) - .into(), - ) - }, - |info| { - if state.sink.volume() < 0.01 { - return ActionBar::Muted; - } - - let info = info.clone(); - if state.sink.is_paused() { - ActionBar::Paused(info) - } else { - ActionBar::Playing(info) - } - }, - ) - .format(state.bookmarked); + let action = match state.current.clone() { + Current::Loading(progress) => { + ActionBar::Loading(progress.load(std::sync::atomic::Ordering::Relaxed)) + } + Current::Track(info) => { + if state.sink.volume() < 0.01 { + ActionBar::Muted + } else if state.sink.is_paused() { + ActionBar::Paused(info) + } else { + ActionBar::Playing(info) + } + } + }; + let (main, len) = action.format(state.bookmarked); if len > width { let chopped: String = main.graphemes(true).take(width + 1).collect(); diff --git a/src/ui/interface.rs b/src/ui/interface.rs index f91a736..d13965d 100644 --- a/src/ui/interface.rs +++ b/src/ui/interface.rs @@ -1,9 +1,34 @@ -use crate::ui::{self, components, window::Window}; +use std::time::Duration; + +use crate::{ + ui::{self, components, window::Window}, + Args, +}; + +#[derive(Copy, Clone, Debug)] +pub struct Params { + pub borderless: bool, + pub minimalist: bool, + pub delta: Duration, +} + +impl From<&Args> for Params { + fn from(args: &Args) -> Self { + let delta = 1.0 / f32::from(args.fps); + let delta = Duration::from_secs_f32(delta); + + Self { + delta, + minimalist: args.minimalist, + borderless: args.borderless, + } + } +} /// The code for the terminal interface itself. /// /// * `minimalist` - All this does is hide the bottom control bar. -pub async fn draw(state: &ui::State, window: &mut Window, params: ui::Params) -> super::Result<()> { +pub async fn draw(state: &ui::State, window: &mut Window, params: Params) -> super::Result<()> { let action = components::action(&state, state.width); let volume = state.sink.volume(); @@ -25,7 +50,7 @@ pub async fn draw(state: &ui::State, window: &mut Window, params: ui::Params) -> let controls = components::controls(state.width); - let menu = match (params.minimalist, &state.track) { + let menu = match (params.minimalist, &state.current) { (true, _) => vec![action, middle], // (false, Some(x)) => vec![x.path.clone(), action, middle, controls], _ => vec![action, middle, controls], diff --git a/src/volume.rs b/src/volume.rs index 94f857e..6d9508d 100644 --- a/src/volume.rs +++ b/src/volume.rs @@ -62,7 +62,7 @@ impl PersistentVolume { } /// Saves `volume` to `volume.txt`. - pub async fn save(volume: f32) -> Result<()> { + pub async fn save(&self) -> Result<()> { let config = Self::config().await?; let path = config.join(PathBuf::from("volume.txt")); @@ -72,9 +72,7 @@ impl PersistentVolume { clippy::cast_sign_loss, clippy::cast_possible_truncation )] - let percentage = (volume * 100.0).abs().round() as u16; - - fs::write(path, percentage.to_string()).await?; + fs::write(path, self.inner.to_string()).await?; Ok(()) }