feat: fix progress

This commit is contained in:
talwat 2025-11-16 20:20:21 +01:00
parent bf1b5f4f4e
commit bd525b0813
13 changed files with 318 additions and 203 deletions

View File

@ -21,10 +21,6 @@ pub enum Error {
pub struct Bookmarks {
/// The different entries in the bookmarks file.
entries: Vec<String>,
/// The internal bookmarked register, which keeps track
/// of whether a track is bookmarked or not.
bookmarked: bool,
}
impl Bookmarks {
@ -55,10 +51,7 @@ impl Bookmarks {
})
.collect();
Ok(Self {
entries,
bookmarked: false,
})
Ok(Self { entries })
}
// Saves the bookmarks to the `bookmarks.txt` file.
@ -71,7 +64,7 @@ impl Bookmarks {
/// Bookmarks a given track with a full path and optional custom name.
///
/// Returns whether the track is now bookmarked, or not.
pub async fn bookmark(&mut self, track: &tracks::Info) -> Result<()> {
pub async fn bookmark(&mut self, track: &tracks::Info) -> Result<bool> {
let entry = track.to_entry();
let idx = self.entries.iter().position(|x| **x == entry);
@ -81,19 +74,12 @@ impl Bookmarks {
self.entries.push(entry);
};
self.bookmarked = idx.is_none();
Ok(())
}
/// Returns whether a track is bookmarked or not by using the internal
/// bookmarked register.
pub fn bookmarked(&self) -> bool {
self.bookmarked
Ok(idx.is_none())
}
/// Sets the internal bookmarked register by checking against
/// the current track's info.
pub async fn set_bookmarked(&mut self, track: &tracks::Info) {
self.bookmarked = self.entries.contains(&track.to_entry());
pub fn bookmarked(&mut self, track: &tracks::Info) -> bool {
self.entries.contains(&track.to_entry())
}
}

View File

@ -1,5 +1,5 @@
use std::{
sync::{atomic::AtomicU8, Arc},
sync::atomic::{self, AtomicU8},
time::Duration,
};
@ -11,61 +11,90 @@ use tokio::{
use crate::tracks;
static PROGRESS: AtomicU8 = AtomicU8::new(0);
pub type Progress = &'static AtomicU8;
pub fn progress() -> Progress {
&PROGRESS
}
pub struct Downloader {
/// TODO: Actually have a track type here.
pub progress: Arc<AtomicU8>,
queue: Sender<tracks::Queued>,
tx: Sender<crate::Message>,
tracks: tracks::List,
client: Client,
timeout: Duration,
}
impl Downloader {
pub async fn init(size: usize, tracks: tracks::List, tx: Sender<crate::Message>) -> Handle {
let client = Client::new();
let (qtx, qrx) = mpsc::channel(size);
let downloader = Self {
queue: qtx,
tx,
tracks,
client,
timeout: Duration::from_secs(1),
};
Handle {
queue: qrx,
handle: tokio::spawn(downloader.run()),
}
}
async fn run(self) -> crate::Result<()> {
loop {
let progress = if PROGRESS.load(atomic::Ordering::Relaxed) == 0 {
Some(&PROGRESS)
} else {
None
};
let result = self.tracks.random(&self.client, progress).await;
match result {
Ok(track) => {
self.queue.send(track).await?;
if progress.is_some() {
self.tx.send(crate::Message::Loaded).await?;
}
}
Err(error) => {
PROGRESS.store(0, atomic::Ordering::Relaxed);
if !error.timeout() {
tokio::time::sleep(self.timeout).await;
}
}
}
}
}
}
pub struct Handle {
queue: Receiver<tracks::Queued>,
handle: JoinHandle<crate::Result<()>>,
}
impl Downloader {
pub async fn track(&mut self) -> Option<tracks::Queued> {
return self.queue.recv().await;
}
pub enum Output {
Loading(Progress),
Queued(tracks::Queued),
}
async fn downloader(
tx: Sender<tracks::Queued>,
tracks: tracks::List,
client: Client,
progress: Arc<AtomicU8>,
timeout: Duration,
) -> crate::Result<()> {
loop {
let result = tracks.random(&client, progress.as_ref()).await;
match result {
Ok(track) => tx.send(track).await?,
Err(error) => {
if !error.timeout() {
tokio::time::sleep(timeout).await;
impl Handle {
pub async fn track(&mut self) -> Output {
match self.queue.try_recv() {
Ok(queued) => Output::Queued(queued),
Err(_) => {
PROGRESS.store(0, atomic::Ordering::Relaxed);
Output::Loading(&PROGRESS)
}
}
}
}
}
pub async fn init(
size: usize,
tracks: tracks::List,
client: Client,
progress: Arc<AtomicU8>,
) -> Self {
let (tx, rx) = mpsc::channel(size);
Self {
queue: rx,
progress: progress.clone(),
handle: tokio::spawn(Self::downloader(
tx,
tracks,
client,
progress,
Duration::from_secs(1),
)),
}
}
}
impl Drop for Downloader {
impl Drop for Handle {
fn drop(&mut self) {
self.handle.abort();
}

View File

@ -1,4 +1,4 @@
use tokio::sync::mpsc;
use tokio::sync::{broadcast, mpsc};
use crate::{bookmark, tracks, ui, volume};
@ -26,19 +26,25 @@ pub enum Error {
#[error("couldn't add track to the queue: {0}")]
Queue(#[from] mpsc::error::SendError<tracks::Queued>),
#[error("couldn't update UI state: {0}")]
Broadcast(#[from] broadcast::error::SendError<ui::Update>),
#[error("io error: {0}")]
Io(#[from] std::io::Error),
#[error("directory not found")]
Directory,
#[error("couldn't fetch track from downloader")]
Download,
#[error("couldn't parse integer: {0}")]
Parse(#[from] std::num::ParseIntError),
#[error("track error: {0}")]
#[error("track failure")]
Track(#[from] tracks::Error),
#[error("ui error: {0}")]
#[error("ui failure")]
UI(#[from] ui::Error),
#[cfg(feature = "mpris")]

View File

@ -108,7 +108,7 @@ async fn main() -> eyre::Result<()> {
}
} else {
let player = Player::init(args).await?;
player.play().await?;
player.run().await?;
};
Ok(())

View File

@ -1,14 +1,12 @@
use crate::ui;
/// Handles communication between different parts of the program.
#[derive(PartialEq, Debug, Clone)]
pub enum Message {
/// Sent to update the UI with new information.
Render(ui::Update),
/// Notifies the audio server that it should update the track.
Next,
/// When a track is loaded after a caller previously being told to wait.
Loaded,
/// Similar to Next, but specific to the first track.
Init,

View File

@ -1,21 +1,36 @@
use std::sync::{atomic::AtomicU8, Arc};
use std::sync::Arc;
use reqwest::Client;
use tokio::sync::mpsc::{self, Receiver};
use tokio::sync::{
broadcast,
mpsc::{self, Receiver, Sender},
};
use crate::{
bookmark::Bookmarks, download::Downloader, tracks::List, ui::UI, volume::PersistentVolume,
bookmark::Bookmarks,
download::{self, Downloader},
tracks::{self, List},
ui,
volume::PersistentVolume,
Message,
};
#[derive(Clone, Debug)]
pub enum Current {
Loading(download::Progress),
Track(tracks::Info),
}
pub struct Player {
ui: UI,
ui: ui::Handle,
downloader: download::Handle,
volume: PersistentVolume,
bookmarks: Bookmarks,
downloader: Downloader,
sink: Arc<rodio::Sink>,
stream: rodio::OutputStream,
rx: Receiver<crate::Message>,
broadcast: broadcast::Sender<ui::Update>,
current: Current,
_tx: Sender<crate::Message>,
_stream: rodio::OutputStream,
}
impl Drop for Player {
@ -25,6 +40,25 @@ impl Drop for Player {
}
impl Player {
pub async fn set_current(&mut self, current: Current) -> crate::Result<()> {
self.current = current.clone();
self.update(ui::Update::Track(current)).await?;
let Current::Track(track) = &self.current else {
return Ok(());
};
let bookmarked = self.bookmarks.bookmarked(&track);
self.update(ui::Update::Bookmarked(bookmarked)).await?;
Ok(())
}
pub async fn update(&mut self, update: ui::Update) -> crate::Result<()> {
self.broadcast.send(update)?;
Ok(())
}
pub async fn init(args: crate::Args) -> crate::Result<Self> {
#[cfg(target_os = "linux")]
let mut stream = audio::silent_get_output_stream()?;
@ -33,42 +67,105 @@ impl Player {
stream.log_on_drop(false);
let sink = Arc::new(rodio::Sink::connect_new(stream.mixer()));
let progress = Arc::new(AtomicU8::new(0));
let (tx, rx) = mpsc::channel(8);
let ui = UI::init(tx, progress.clone(), sink.clone(), &args).await?;
tx.send(Message::Init).await?;
let (utx, urx) = broadcast::channel(8);
let current = Current::Loading(download::progress());
let state = ui::State::initial(sink.clone(), &args, current.clone());
let ui = ui::Handle::init(tx.clone(), urx, state.clone(), &args).await?;
let volume = PersistentVolume::load().await?;
let bookmarks = Bookmarks::load().await?;
let client = Client::new();
let list = List::load(args.track_list.as_ref()).await?;
let downloader = Downloader::init(args.buffer_size, list, client, progress).await;
let downloader = Downloader::init(args.buffer_size, list, tx.clone()).await;
Ok(Self {
current,
downloader,
ui,
broadcast: utx,
rx,
sink,
stream,
bookmarks,
volume,
_stream: stream,
_tx: tx,
})
}
pub async fn play(mut self) -> crate::Result<()> {
// self.ui
// .render(ui::Update {
// track: None,
// bookmarked: false,
// })
// .await?;
while let Some(message) = self.rx.recv().await {
if message == Message::Quit {
break;
};
}
pub async fn close(&self) -> crate::Result<()> {
self.bookmarks.save().await?;
self.volume.save().await?;
Ok(())
}
pub async fn play(&mut self, queued: tracks::Queued) -> crate::Result<()> {
let decoded = queued.decode()?;
self.sink.append(decoded.data);
self.set_current(Current::Track(decoded.info)).await?;
Ok(())
}
pub async fn run(mut self) -> crate::Result<()> {
while let Some(message) = self.rx.recv().await {
match message {
Message::Next | Message::Init | Message::Loaded => {
self.sink.stop();
match self.downloader.track().await {
download::Output::Loading(progress) => {
self.set_current(Current::Loading(progress)).await?
}
download::Output::Queued(queued) => self.play(queued).await?,
};
}
Message::Play => {
self.sink.play();
// #[cfg(feature = "mpris")]
// mpris.playback(PlaybackStatus::Playing).await?;
}
Message::Pause => {
self.sink.pause();
// #[cfg(feature = "mpris")]
// mpris.playback(PlaybackStatus::Paused).await?;
}
Message::PlayPause => {
if self.sink.is_paused() {
self.sink.play();
} else {
self.sink.pause();
}
// #[cfg(feature = "mpris")]
// mpris
// .playback(mpris.player().playback_status().await?)
// .await?;
}
Message::ChangeVolume(change) => {
self.sink.set_volume(self.sink.volume() + change);
// #[cfg(feature = "mpris")]
// mpris
// .changed(vec![Property::Volume(player.sink.volume().into())])
// .await?;
}
Message::Bookmark => {
let Current::Track(current) = &self.current else {
continue;
};
self.bookmarks.bookmark(current).await?;
}
Message::Quit => break,
}
}
// self.close().await?;
Ok(())
}
}

View File

@ -15,7 +15,7 @@
//! 2. [`Info`] created from decoded data.
//! 3. [`Decoded`] made from [`Info`] and the original decoded data.
use std::{io::Cursor, time::Duration};
use std::{fmt::Debug, io::Cursor, time::Duration};
use bytes::Bytes;
use rodio::{Decoder, Source as _};
@ -35,6 +35,7 @@ pub type DecodedData = Decoder<Cursor<Bytes>>;
/// Tracks which are still waiting in the queue, and can't be played yet.
///
/// This means that only the data & track name are included.
#[derive(PartialEq)]
pub struct Queued {
/// Display name of the track.
pub display: String,
@ -47,6 +48,16 @@ pub struct Queued {
pub data: Bytes,
}
impl Debug for Queued {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Queued")
.field("display", &self.display)
.field("path", &self.path)
.field("data", &self.data.len())
.finish()
}
}
impl Queued {
/// This will actually decode and format the track,
/// returning a [`DecodedTrack`] which can be played

View File

@ -25,8 +25,6 @@ pub enum Kind {
#[error("{kind} (track: {track:?})")]
pub struct Error {
pub track: Option<String>,
#[source]
pub kind: Kind,
}

View File

@ -70,16 +70,16 @@ impl List {
&self,
track: &str,
client: &Client,
progress: &AtomicU8,
progress: Option<&AtomicU8>,
) -> tracks::Result<(Bytes, String)> {
// If the track has a protocol, then we should ignore the base for it.
let full_path = if track.contains("://") {
let path = if track.contains("://") {
track.to_owned()
} else {
format!("{}{}", self.base(), track)
};
let data: Bytes = if let Some(x) = full_path.strip_prefix("file://") {
let data: Bytes = if let Some(x) = path.strip_prefix("file://") {
let path = if x.starts_with('~') {
let home_path = dirs::home_dir()
.ok_or(error::Kind::InvalidPath)
@ -97,7 +97,11 @@ impl List {
let result = tokio::fs::read(path.clone()).await.track(x)?;
result.into()
} else {
let response = client.get(full_path.clone()).send().await.track(track)?;
let response = client.get(path.clone()).send().await.track(track)?;
let Some(progress) = progress else {
let bytes = response.bytes().await.track(track)?;
return Ok((bytes, path));
};
let total = response
.content_length()
@ -119,14 +123,18 @@ impl List {
bytes.into()
};
Ok((data, full_path))
Ok((data, path))
}
/// Fetches and downloads a random track from the [List].
///
/// The Result's error is a bool, which is true if a timeout error occured,
/// and false otherwise. This tells lowfi if it shouldn't wait to try again.
pub async fn random(&self, client: &Client, progress: &AtomicU8) -> tracks::Result<Queued> {
pub async fn random(
&self,
client: &Client,
progress: Option<&AtomicU8>,
) -> tracks::Result<Queued> {
let (path, display) = self.random_path();
let (data, path) = self.download(&path, client, progress).await?;

110
src/ui.rs
View File

@ -1,16 +1,12 @@
use std::{
sync::{atomic::AtomicU8, Arc},
time::Duration,
};
use std::sync::Arc;
use crate::{
tracks,
ui::{environment::Environment, window::Window},
Args, Message,
player::Current,
ui::{self, environment::Environment, window::Window},
Args,
};
use rodio::Sink;
use tokio::{
sync::mpsc::{self, Receiver, Sender},
sync::{broadcast, mpsc::Sender},
task::JoinHandle,
};
mod components;
@ -32,74 +28,64 @@ pub enum Error {
Write(#[from] std::io::Error),
#[error("sending message to backend from ui failed")]
Communication(#[from] tokio::sync::mpsc::error::SendError<Message>),
CrateSend(#[from] tokio::sync::mpsc::error::SendError<crate::Message>),
#[error("sharing state between backend and frontend failed")]
UiSend(#[from] tokio::sync::broadcast::error::SendError<Update>),
}
#[derive(Clone)]
pub struct State {
pub sink: Arc<rodio::Sink>,
pub progress: Arc<AtomicU8>,
pub track: Option<tracks::Info>,
pub current: Current,
pub bookmarked: bool,
width: usize,
}
impl State {
pub fn update(&mut self, update: Update) {
self.track = update.track;
self.bookmarked = update.bookmarked;
}
pub fn initial(sink: Arc<rodio::Sink>, width: usize, progress: Arc<AtomicU8>) -> Self {
pub fn initial(sink: Arc<rodio::Sink>, args: &Args, current: Current) -> Self {
let width = 21 + args.width.min(32) * 2;
Self {
width,
sink,
progress,
track: None,
current,
bookmarked: false,
}
}
}
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Update {
pub track: Option<tracks::Info>,
pub bookmarked: bool,
#[derive(Debug, Clone)]
pub enum Update {
Track(Current),
Bookmarked(bool),
Quit,
}
#[derive(Debug)]
struct Handles {
struct Tasks {
render: JoinHandle<Result<()>>,
input: JoinHandle<Result<()>>,
}
#[derive(Copy, Clone, Debug)]
struct Params {
borderless: bool,
minimalist: bool,
delta: Duration,
}
#[derive(Debug)]
pub struct UI {
pub utx: Sender<Message>,
handles: Handles,
pub struct Handle {
tasks: Tasks,
_environment: Environment,
}
impl Drop for UI {
impl Drop for Handle {
fn drop(&mut self) {
self.handles.input.abort();
self.handles.render.abort();
self.tasks.input.abort();
self.tasks.render.abort();
}
}
impl UI {
pub async fn render(&mut self, data: Update) -> Result<()> {
self.utx.send(Message::Render(data)).await?;
Ok(())
}
async fn ui(mut rx: Receiver<Message>, mut state: State, params: Params) -> Result<()> {
impl Handle {
async fn ui(
mut rx: broadcast::Receiver<Update>,
mut state: State,
params: interface::Params,
) -> Result<()> {
let mut interval = tokio::time::interval(params.delta);
let mut window = Window::new(state.width, params.borderless);
@ -108,45 +94,29 @@ impl UI {
if let Ok(message) = rx.try_recv() {
match message {
Message::Render(update) => state.update(update),
Message::Quit => break,
_ => continue,
Update::Track(track) => state.current = track,
Update::Bookmarked(bookmarked) => state.bookmarked = bookmarked,
Update::Quit => break,
}
};
interval.tick().await;
}
// environment.cleanup()?;
Ok(())
}
pub async fn init(
tx: Sender<Message>,
progress: Arc<AtomicU8>,
sink: Arc<Sink>,
tx: Sender<crate::Message>,
updater: broadcast::Receiver<ui::Update>,
state: State,
args: &Args,
) -> Result<Self> {
let environment = Environment::ready(args.alternate)?;
let (utx, urx) = mpsc::channel(8);
let delta = 1.0 / f32::from(args.fps);
let delta = Duration::from_secs_f32(delta);
let width = 21 + args.width.min(32) * 2;
Ok(Self {
utx,
_environment: environment,
handles: Handles {
render: tokio::spawn(Self::ui(
urx,
State::initial(sink, width, progress),
Params {
delta,
minimalist: args.minimalist,
borderless: args.borderless,
},
)),
tasks: Tasks {
render: tokio::spawn(Self::ui(updater, state, interface::Params::from(args))),
input: tokio::spawn(input::listen(tx)),
},
})

View File

@ -6,7 +6,7 @@ use std::time::Duration;
use crossterm::style::Stylize as _;
use unicode_segmentation::UnicodeSegmentation as _;
use crate::{tracks, ui};
use crate::{player::Current, tracks, ui};
/// Small helper function to format durations.
pub fn format_duration(duration: &Duration) -> String {
@ -19,14 +19,14 @@ pub fn format_duration(duration: &Duration) -> String {
/// Creates the progress bar, as well as all the padding needed.
pub fn progress_bar(state: &ui::State, width: usize) -> String {
let mut duration = Duration::new(0, 0);
let elapsed = if state.track.is_some() {
let elapsed = if matches!(&state.current, Current::Track(_)) {
state.sink.get_pos()
} else {
Duration::new(0, 0)
};
let mut filled = 0;
if let Some(current) = &state.track {
if let Current::Track(current) = &state.current {
if let Some(x) = current.duration {
duration = x;
@ -106,33 +106,22 @@ impl ActionBar {
/// Creates the top/action bar, which has the name of the track and it's status.
/// This also creates all the needed padding.
pub fn action(state: &ui::State, width: usize) -> String {
let (main, len) = state
.track
.as_ref()
.map_or_else(
|| {
ActionBar::Loading(
state
.progress
.load(std::sync::atomic::Ordering::Acquire)
.into(),
)
},
|info| {
if state.sink.volume() < 0.01 {
return ActionBar::Muted;
let action = match state.current.clone() {
Current::Loading(progress) => {
ActionBar::Loading(progress.load(std::sync::atomic::Ordering::Relaxed))
}
let info = info.clone();
if state.sink.is_paused() {
Current::Track(info) => {
if state.sink.volume() < 0.01 {
ActionBar::Muted
} else if state.sink.is_paused() {
ActionBar::Paused(info)
} else {
ActionBar::Playing(info)
}
},
)
.format(state.bookmarked);
}
};
let (main, len) = action.format(state.bookmarked);
if len > width {
let chopped: String = main.graphemes(true).take(width + 1).collect();

View File

@ -1,9 +1,34 @@
use crate::ui::{self, components, window::Window};
use std::time::Duration;
use crate::{
ui::{self, components, window::Window},
Args,
};
#[derive(Copy, Clone, Debug)]
pub struct Params {
pub borderless: bool,
pub minimalist: bool,
pub delta: Duration,
}
impl From<&Args> for Params {
fn from(args: &Args) -> Self {
let delta = 1.0 / f32::from(args.fps);
let delta = Duration::from_secs_f32(delta);
Self {
delta,
minimalist: args.minimalist,
borderless: args.borderless,
}
}
}
/// The code for the terminal interface itself.
///
/// * `minimalist` - All this does is hide the bottom control bar.
pub async fn draw(state: &ui::State, window: &mut Window, params: ui::Params) -> super::Result<()> {
pub async fn draw(state: &ui::State, window: &mut Window, params: Params) -> super::Result<()> {
let action = components::action(&state, state.width);
let volume = state.sink.volume();
@ -25,7 +50,7 @@ pub async fn draw(state: &ui::State, window: &mut Window, params: ui::Params) ->
let controls = components::controls(state.width);
let menu = match (params.minimalist, &state.track) {
let menu = match (params.minimalist, &state.current) {
(true, _) => vec![action, middle],
// (false, Some(x)) => vec![x.path.clone(), action, middle, controls],
_ => vec![action, middle, controls],

View File

@ -62,7 +62,7 @@ impl PersistentVolume {
}
/// Saves `volume` to `volume.txt`.
pub async fn save(volume: f32) -> Result<()> {
pub async fn save(&self) -> Result<()> {
let config = Self::config().await?;
let path = config.join(PathBuf::from("volume.txt"));
@ -72,9 +72,7 @@ impl PersistentVolume {
clippy::cast_sign_loss,
clippy::cast_possible_truncation
)]
let percentage = (volume * 100.0).abs().round() as u16;
fs::write(path, percentage.to_string()).await?;
fs::write(path, self.inner.to_string()).await?;
Ok(())
}