feat: make lowfi more resilient to network issues

This commit is contained in:
talwat 2024-09-28 01:59:46 +02:00
parent 672b0c5c98
commit f38e1a8a76
5 changed files with 42 additions and 15 deletions

2
Cargo.lock generated
View File

@ -982,7 +982,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "lowfi"
version = "1.0.2"
version = "1.1.0"
dependencies = [
"Inflector",
"arc-swap",

View File

@ -1,6 +1,6 @@
[package]
name = "lowfi"
version = "1.0.2"
version = "1.1.0"
edition = "2021"
description = "An extremely simple lofi player."
license = "MIT"

View File

@ -1,5 +1,3 @@
#![warn(clippy::cargo)]
use clap::{Parser, Subcommand};
mod play;

View File

@ -16,7 +16,7 @@ pub async fn play() -> eyre::Result<()> {
let (tx, rx) = mpsc::channel(8);
let player = Arc::new(Player::new().await?);
let audio = task::spawn(Player::play(Arc::clone(&player), rx));
let audio = task::spawn(Player::play(Arc::clone(&player), tx.clone(), rx));
tx.send(Messages::Init).await?;
ui::start(Arc::clone(&player), tx.clone()).await?;

View File

@ -2,7 +2,7 @@
//! This also has the code for the underlying
//! audio server which adds new tracks.
use std::{collections::VecDeque, sync::Arc};
use std::{collections::VecDeque, sync::Arc, time::Duration};
use arc_swap::ArcSwapOption;
use reqwest::Client;
@ -10,7 +10,7 @@ use rodio::{OutputStream, OutputStreamHandle, Sink};
use tokio::{
select,
sync::{
mpsc::{self, Receiver},
mpsc::{self, Receiver, Sender},
RwLock,
},
task,
@ -25,6 +25,10 @@ pub enum Messages {
/// Notifies the audio server that it should update the track.
Next,
/// This signal is only sent if a track timed out. In that case,
/// lowfi will try again and again to retrieve the track.
TryAgain,
/// Similar to Next, but specific to the first track.
Init,
@ -32,6 +36,8 @@ pub enum Messages {
Pause,
}
const TIMEOUT: Duration = Duration::from_secs(3);
/// The amount of songs to buffer up.
const BUFFER_SIZE: usize = 5;
@ -78,7 +84,14 @@ impl Player {
Ok(Self {
tracks: RwLock::new(VecDeque::with_capacity(5)),
current: ArcSwapOption::new(None),
client: Client::builder().build()?,
client: Client::builder()
.user_agent(concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION")
))
.timeout(TIMEOUT)
.build()?,
sink,
_handle: handle,
_stream,
@ -111,7 +124,11 @@ impl Player {
///
/// `rx` is used to communicate with it, for example when to
/// skip tracks or pause.
pub async fn play(queue: Arc<Self>, mut rx: Receiver<Messages>) -> eyre::Result<()> {
pub async fn play(
queue: Arc<Self>,
tx: Sender<Messages>,
mut rx: Receiver<Messages>,
) -> eyre::Result<()> {
// This is an internal channel which serves pretty much only one purpose,
// which is to notify the buffer refiller to get back to work.
// This channel is useful to prevent needing to check with some infinite loop.
@ -146,7 +163,7 @@ impl Player {
};
match msg {
Messages::Next | Messages::Init => {
Messages::Next | Messages::Init | Messages::TryAgain => {
// Skip as early as possible so that music doesn't play
// while lowfi is "loading".
queue.sink.stop();
@ -155,12 +172,24 @@ impl Player {
// This is also set by Player::next.
queue.current.store(None);
// Notify the background downloader that there's an empty spot
// in the buffer.
itx.send(()).await?;
let track = Self::next(Arc::clone(&queue)).await;
let track = Self::next(Arc::clone(&queue)).await?;
queue.sink.append(track.data);
match track {
Ok(track) => {
queue.sink.append(track.data);
// Notify the background downloader that there's an empty spot
// in the buffer.
itx.send(()).await?;
}
Err(error) => {
if !error.downcast::<reqwest::Error>()?.is_timeout() {
tokio::time::sleep(TIMEOUT).await;
}
tx.send(Messages::TryAgain).await?
}
};
}
Messages::Pause => {
if queue.sink.is_paused() {