feat: add audio waiter

This commit is contained in:
Tal 2025-11-17 22:28:43 +01:00
parent 9439866f52
commit b035061fd0
12 changed files with 294 additions and 148 deletions

11
Cargo.lock generated
View File

@ -110,6 +110,12 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "arc-swap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "arrayvec"
version = "0.7.6"
@ -1342,9 +1348,9 @@ dependencies = [
[[package]]
name = "indenter"
version = "0.3.3"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683"
checksum = "964de6e86d545b246d84badc0fef527924ace5134f30641c203ef52ba83f58d5"
[[package]]
name = "indexmap"
@ -1485,6 +1491,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
name = "lowfi"
version = "1.7.2"
dependencies = [
"arc-swap",
"bytes",
"clap",
"convert_case 0.8.0",

View File

@ -30,6 +30,7 @@ thiserror = "2.0.12"
# Async
tokio = { version = "1.41.1", features = ["macros", "rt-multi-thread", "fs"], default-features = false }
arc-swap = "1.7.1"
futures = "0.3.31"
# Data

View File

@ -1,3 +1,12 @@
use std::{
sync::{atomic::AtomicBool, Arc},
thread::sleep,
time::Duration,
};
use rodio::Sink;
use tokio::sync::mpsc;
/// This gets the output stream while also shutting up alsa with [libc].
/// Uses raw libc calls, and therefore is functional only on Linux.
#[cfg(target_os = "linux")]
@ -38,3 +47,19 @@ pub fn silent_get_output_stream() -> eyre::Result<rodio::OutputStream, crate::Er
Ok(stream)
}
static LISTEN: AtomicBool = AtomicBool::new(false);
pub fn playing(status: bool) {
LISTEN.store(status, std::sync::atomic::Ordering::Relaxed);
}
pub fn waiter(sink: Arc<Sink>, tx: mpsc::Sender<crate::Message>) -> crate::Result<()> {
loop {
sleep(Duration::from_millis(100));
sink.sleep_until_end();
if LISTEN.load(std::sync::atomic::Ordering::Relaxed) {
tx.blocking_send(crate::Message::Next)?;
}
}
}

View File

@ -46,14 +46,4 @@ pub enum Error {
#[error("ui failure")]
UI(#[from] ui::Error),
#[cfg(feature = "mpris")]
#[error("mpris bus error")]
ZBus(#[from] mpris_server::zbus::Error),
// TODO: This has a terrible error message, mainly because I barely understand
// what this error even represents. What does fdo mean?!?!? Why, MPRIS!?!?
#[cfg(feature = "mpris")]
#[error("mpris fdo (zbus interface) error")]
Fdo(#[from] mpris_server::zbus::fdo::Error),
}

View File

@ -15,12 +15,12 @@ pub mod audio;
pub mod bookmark;
pub mod download;
pub mod player;
#[allow(clippy::all, clippy::pedantic, clippy::nursery, clippy::restriction)]
#[cfg(feature = "scrape")]
mod scrapers;
pub mod tracks;
pub mod volume;
#[cfg(feature = "scrape")]
mod scrapers;
#[cfg(feature = "scrape")]
use crate::scrapers::Source;
@ -108,7 +108,11 @@ async fn main() -> eyre::Result<()> {
}
} else {
let player = Player::init(args).await?;
player.run().await?;
let environment = player.environment();
let result = player.run().await;
environment.cleanup(result.is_ok())?;
result?;
};
Ok(())

View File

@ -1,17 +1,22 @@
/// Handles communication between different parts of the program.
#[allow(dead_code, reason = "this code may not be dead depending on features")]
#[derive(PartialEq, Debug, Clone)]
pub enum Message {
/// Notifies the audio server that it should update the track.
/// Deliberate user request to go to the next song.
Next,
/// When a track is loaded after a caller previously being told to wait.
/// Sent by the audio waiter whenever it believes a track has ended.
End,
/// When a track is loaded after the caller previously being told to wait.
/// If a track is taken from the queue, then there is no waiting, so this
/// is never actually sent.
Loaded,
/// Similar to Next, but specific to the first track.
Init,
/// Unpause the [Sink].
#[allow(dead_code, reason = "this code may not be dead depending on features")]
Play,
/// Pauses the [Sink].
@ -23,6 +28,9 @@ pub enum Message {
/// Change the volume of playback.
ChangeVolume(f32),
/// Set the volume of playback, rather than changing it.
SetVolume(f32),
/// Bookmark the current track.
Bookmark,

View File

@ -1,11 +1,15 @@
use std::sync::Arc;
use tokio::sync::{
broadcast,
mpsc::{self, Receiver, Sender},
use tokio::{
sync::{
broadcast,
mpsc::{self, Receiver},
},
task::JoinHandle,
};
use crate::{
audio,
bookmark::Bookmarks,
download::{self, Downloader},
tracks::{self, List},
@ -20,26 +24,36 @@ pub enum Current {
Track(tracks::Info),
}
impl Current {
pub fn loading(&self) -> bool {
return matches!(self, Current::Loading(_));
}
}
pub struct Player {
downloader: download::Handle,
volume: PersistentVolume,
bookmarks: Bookmarks,
sink: Arc<rodio::Sink>,
rx: Receiver<crate::Message>,
broadcast: broadcast::Sender<ui::Update>,
current: Current,
_ui: ui::Handle,
_tx: Sender<crate::Message>,
ui: ui::Handle,
waiter: JoinHandle<crate::Result<()>>,
_stream: rodio::OutputStream,
}
impl Drop for Player {
fn drop(&mut self) {
self.sink.stop();
self.waiter.abort();
}
}
impl Player {
pub fn environment(&self) -> ui::Environment {
self.ui.environment
}
pub async fn set_current(&mut self, current: Current) -> crate::Result<()> {
self.current = current.clone();
self.update(ui::Update::Track(current)).await?;
@ -61,7 +75,7 @@ impl Player {
pub async fn init(args: crate::Args) -> crate::Result<Self> {
#[cfg(target_os = "linux")]
let mut stream = audio::silent_get_output_stream()?;
let mut stream = crate::audio::silent_get_output_stream()?;
#[cfg(not(target_os = "linux"))]
let mut stream = rodio::OutputStreamBuilder::open_default_stream()?;
stream.log_on_drop(false);
@ -72,15 +86,18 @@ impl Player {
let (utx, urx) = broadcast::channel(8);
let current = Current::Loading(download::progress());
let state = ui::State::initial(sink.clone(), &args, current.clone());
let list = List::load(args.track_list.as_ref()).await?;
let state = ui::State::initial(sink.clone(), &args, current.clone(), list.name.clone());
let ui = ui::Handle::init(tx.clone(), urx, state.clone(), &args).await?;
let volume = PersistentVolume::load().await?;
sink.set_volume(volume.float());
let bookmarks = Bookmarks::load().await?;
let list = List::load(args.track_list.as_ref()).await?;
let downloader = Downloader::init(args.buffer_size, list, tx.clone()).await;
let clone = sink.clone();
let waiter = tokio::task::spawn_blocking(move || audio::waiter(clone, tx));
Ok(Self {
current,
downloader,
@ -88,16 +105,15 @@ impl Player {
rx,
sink,
bookmarks,
volume,
_ui: ui,
ui,
waiter,
_stream: stream,
_tx: tx,
})
}
pub async fn close(&self) -> crate::Result<()> {
self.bookmarks.save().await?;
self.volume.save().await?;
PersistentVolume::save(self.sink.volume() as f32).await?;
Ok(())
}
@ -113,26 +129,29 @@ impl Player {
pub async fn run(mut self) -> crate::Result<()> {
while let Some(message) = self.rx.recv().await {
match message {
Message::Next | Message::Init | Message::Loaded => {
Message::Next | Message::Init | Message::Loaded | Message::End => {
if message == Message::Next && self.current.loading() {
continue;
}
audio::playing(false);
self.sink.stop();
match self.downloader.track().await {
download::Output::Loading(progress) => {
self.set_current(Current::Loading(progress)).await?
self.set_current(Current::Loading(progress)).await?;
}
download::Output::Queued(queued) => {
self.play(queued).await?;
audio::playing(true);
}
download::Output::Queued(queued) => self.play(queued).await?,
};
}
Message::Play => {
self.sink.play();
// #[cfg(feature = "mpris")]
// mpris.playback(PlaybackStatus::Playing).await?;
}
Message::Pause => {
self.sink.pause();
// #[cfg(feature = "mpris")]
// mpris.playback(PlaybackStatus::Paused).await?;
}
Message::PlayPause => {
if self.sink.is_paused() {
@ -140,32 +159,43 @@ impl Player {
} else {
self.sink.pause();
}
// #[cfg(feature = "mpris")]
// mpris
// .playback(mpris.player().playback_status().await?)
// .await?;
}
Message::ChangeVolume(change) => {
self.sink.set_volume(self.sink.volume() + change);
// #[cfg(feature = "mpris")]
// mpris
// .changed(vec![Property::Volume(player.sink.volume().into())])
// .await?;
self.sink
.set_volume((self.sink.volume() + change).clamp(0.0, 1.0));
self.update(ui::Update::Volume).await?;
}
Message::SetVolume(set) => {
self.sink.set_volume(set.clamp(0.0, 1.0));
self.update(ui::Update::Volume).await?;
}
Message::Bookmark => {
let Current::Track(current) = &self.current else {
continue;
};
self.bookmarks.bookmark(current).await?;
let bookmarked = self.bookmarks.bookmark(current).await?;
self.update(ui::Update::Bookmarked(bookmarked)).await?;
}
Message::Quit => break,
}
#[cfg(feature = "mpris")]
match message {
Message::ChangeVolume(_) | Message::SetVolume(_) => {
self.ui.mpris.update_volume().await?
}
Message::Play | Message::Pause | Message::PlayPause => {
self.ui.mpris.update_playback().await?
}
Message::Init | Message::Loaded | Message::Next => {
self.ui.mpris.update_metadata().await?
}
_ => (),
}
}
// self.close().await?;
self.close().await?;
Ok(())
}
}

View File

@ -2,7 +2,7 @@ use std::sync::Arc;
use crate::{
player::Current,
ui::{self, environment::Environment, window::Window},
ui::{self, window::Window},
Args,
};
use tokio::{
@ -12,10 +12,14 @@ use tokio::{
};
mod components;
mod environment;
pub use environment::Environment;
mod input;
mod interface;
mod window;
#[cfg(feature = "mpris")]
pub mod mpris;
type Result<T> = std::result::Result<T, Error>;
/// The error type for the UI, which is used to handle errors that occur
@ -33,6 +37,14 @@ pub enum Error {
#[error("sharing state between backend and frontend failed")]
UiSend(#[from] tokio::sync::broadcast::error::SendError<Update>),
#[cfg(feature = "mpris")]
#[error("mpris bus error")]
ZBus(#[from] mpris_server::zbus::Error),
#[cfg(feature = "mpris")]
#[error("mpris fdo (zbus interface) error")]
Fdo(#[from] mpris_server::zbus::fdo::Error),
}
#[derive(Clone)]
@ -40,17 +52,19 @@ pub struct State {
pub sink: Arc<rodio::Sink>,
pub current: Current,
pub bookmarked: bool,
list: String,
timer: Option<Instant>,
width: usize,
}
impl State {
pub fn initial(sink: Arc<rodio::Sink>, args: &Args, current: Current) -> Self {
pub fn initial(sink: Arc<rodio::Sink>, args: &Args, current: Current, list: String) -> Self {
let width = 21 + args.width.min(32) * 2;
Self {
width,
sink,
current,
list,
bookmarked: false,
timer: None,
}
@ -61,6 +75,7 @@ impl State {
pub enum Update {
Track(Current),
Bookmarked(bool),
Volume,
Quit,
}
@ -70,10 +85,11 @@ struct Tasks {
input: JoinHandle<Result<()>>,
}
#[derive(Debug)]
pub struct Handle {
tasks: Tasks,
_environment: Environment,
pub environment: Environment,
#[cfg(feature = "mpris")]
pub mpris: mpris::Server,
}
impl Drop for Handle {
@ -99,6 +115,7 @@ impl Handle {
match message {
Update::Track(track) => state.current = track,
Update::Bookmarked(bookmarked) => state.bookmarked = bookmarked,
Update::Volume => state.timer = Some(Instant::now()),
Update::Quit => break,
}
};
@ -117,7 +134,9 @@ impl Handle {
) -> Result<Self> {
let environment = Environment::ready(args.alternate)?;
Ok(Self {
_environment: environment,
#[cfg(feature = "mpris")]
mpris: mpris::Server::new(state.clone(), tx.clone(), updater.resubscribe()).await?,
environment,
tasks: Tasks {
render: tokio::spawn(Self::ui(updater, state, interface::Params::from(args))),
input: tokio::spawn(input::listen(tx)),

View File

@ -1,4 +1,4 @@
use std::io::stdout;
use std::{io::stdout, panic};
use crossterm::{
cursor::{Hide, MoveTo, Show},
@ -8,7 +8,7 @@ use crossterm::{
/// Represents the terminal environment, and is used to properly
/// initialize and clean up the terminal.
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub struct Environment {
/// Whether keyboard enhancements are enabled.
enhancement: bool,
@ -38,15 +38,22 @@ impl Environment {
)?;
}
Ok(Self {
let environment = Self {
enhancement,
alternate,
})
};
panic::set_hook(Box::new(move |info| {
let _ = environment.cleanup(false);
eprintln!("panic: {}", info);
}));
Ok(environment)
}
/// Uses the information collected from initialization to safely close down
/// the terminal & restore it to it's previous state.
pub fn cleanup(&self) -> super::Result<()> {
pub fn cleanup(&self, elegant: bool) -> super::Result<()> {
let mut lock = stdout().lock();
if self.alternate {
@ -60,16 +67,10 @@ impl Environment {
}
terminal::disable_raw_mode()?;
eprintln!("bye! :)");
if elegant {
eprintln!("bye! :)");
}
Ok(())
}
}
impl Drop for Environment {
/// Just a wrapper for [`Environment::cleanup`] which ignores any errors thrown.
fn drop(&mut self) {
// Well, we're dropping it, so it doesn't really matter if there's an error.
let _ = self.cleanup();
}
}

View File

@ -31,17 +31,17 @@ impl From<&Args> for Params {
pub async fn draw(state: &mut ui::State, window: &mut Window, params: Params) -> super::Result<()> {
let action = components::action(&state, state.width);
let volume = state.sink.volume();
let percentage = format!("{}%", (volume * 100.0).round().abs());
let middle = match state.timer {
Some(timer) => {
if timer.elapsed() > Duration::from_secs(3) {
let volume = state.sink.volume();
let percentage = format!("{}%", (volume * 100.0).round().abs());
if timer.elapsed() > Duration::from_secs(1) {
state.timer = None;
};
components::progress_bar(&state, state.width - 16)
components::audio_bar(state.width - 17, volume, &percentage)
}
None => components::audio_bar(state.width - 17, volume, &percentage),
None => components::progress_bar(&state, state.width - 16),
};
let controls = components::controls(state.width);

View File

@ -1,27 +1,62 @@
//! Contains the code for the MPRIS server & other helper functions.
use std::{env, process, sync::Arc};
use std::{
env,
hash::{DefaultHasher, Hash, Hasher},
process,
sync::Arc,
};
use arc_swap::ArcSwap;
use mpris_server::{
zbus::{self, fdo, Result},
LoopStatus, Metadata, PlaybackRate, PlaybackStatus, PlayerInterface, Property, RootInterface,
Time, TrackId, Volume,
};
use tokio::sync::mpsc::Sender;
use rodio::Sink;
use tokio::sync::{broadcast, mpsc};
use super::ui;
use super::Message;
use crate::{player::Current, ui::Update};
use crate::{ui, Message};
const ERROR: fdo::Error = fdo::Error::Failed(String::new());
struct Sender {
inner: mpsc::Sender<Message>,
}
impl Sender {
pub fn new(inner: mpsc::Sender<Message>) -> Self {
Self { inner }
}
pub async fn send(&self, message: Message) -> fdo::Result<()> {
self.inner
.send(message)
.await
.map_err(|x| fdo::Error::Failed(x.to_string()))
}
pub async fn zbus(&self, message: Message) -> zbus::Result<()> {
self.inner
.send(message)
.await
.map_err(|x| zbus::Error::Failure(x.to_string()))
}
}
impl Into<fdo::Error> for crate::Error {
fn into(self) -> fdo::Error {
fdo::Error::Failed(self.to_string())
}
}
/// The actual MPRIS player.
pub struct Player {
/// A reference to the [`super::Player`] itself.
pub player: Arc<super::Player>,
/// The audio server sender, which is used to communicate with
/// the audio sender for skips and a few other inputs.
pub sender: Sender<Message>,
sink: Arc<Sink>,
current: ArcSwap<Current>,
list: String,
sender: Sender,
}
impl RootInterface for Player {
@ -30,10 +65,7 @@ impl RootInterface for Player {
}
async fn quit(&self) -> fdo::Result<()> {
self.sender
.send(Message::Quit)
.await
.map_err(|_error| ERROR)
self.sender.send(Message::Quit).await
}
async fn can_quit(&self) -> fdo::Result<bool> {
@ -79,10 +111,7 @@ impl RootInterface for Player {
impl PlayerInterface for Player {
async fn next(&self) -> fdo::Result<()> {
self.sender
.send(Message::Next)
.await
.map_err(|_error| ERROR)
self.sender.send(Message::Next).await
}
async fn previous(&self) -> fdo::Result<()> {
@ -90,17 +119,11 @@ impl PlayerInterface for Player {
}
async fn pause(&self) -> fdo::Result<()> {
self.sender
.send(Message::Pause)
.await
.map_err(|_error| ERROR)
self.sender.send(Message::Pause).await
}
async fn play_pause(&self) -> fdo::Result<()> {
self.sender
.send(Message::PlayPause)
.await
.map_err(|_error| ERROR)
self.sender.send(Message::PlayPause).await
}
async fn stop(&self) -> fdo::Result<()> {
@ -108,10 +131,7 @@ impl PlayerInterface for Player {
}
async fn play(&self) -> fdo::Result<()> {
self.sender
.send(Message::Play)
.await
.map_err(|_error| ERROR)
self.sender.send(Message::Play).await
}
async fn seek(&self, _offset: Time) -> fdo::Result<()> {
@ -127,9 +147,9 @@ impl PlayerInterface for Player {
}
async fn playback_status(&self) -> fdo::Result<PlaybackStatus> {
Ok(if !self.player.current_exists() {
Ok(if self.current.load().loading() {
PlaybackStatus::Stopped
} else if self.player.sink.is_paused() {
} else if self.sink.is_paused() {
PlaybackStatus::Paused
} else {
PlaybackStatus::Playing
@ -145,11 +165,11 @@ impl PlayerInterface for Player {
}
async fn rate(&self) -> fdo::Result<PlaybackRate> {
Ok(self.player.sink.speed().into())
Ok(self.sink.speed().into())
}
async fn set_rate(&self, rate: PlaybackRate) -> Result<()> {
self.player.sink.set_speed(rate as f32);
self.sink.set_speed(rate as f32);
Ok(())
}
@ -162,15 +182,23 @@ impl PlayerInterface for Player {
}
async fn metadata(&self) -> fdo::Result<Metadata> {
let metadata = self
.player
.current
.load()
.as_ref()
.map_or_else(Metadata::new, |track| {
Ok(match self.current.load().as_ref() {
Current::Loading(_) => Metadata::new(),
Current::Track(track) => {
let mut hasher = DefaultHasher::new();
track.path.hash(&mut hasher);
let id = mpris_server::zbus::zvariant::ObjectPath::try_from(format!(
"/com/talwat/lowfi/{}/{}",
self.list,
hasher.finish()
))
.unwrap();
let mut metadata = Metadata::builder()
.title(track.display_name.clone())
.album(self.player.list.name.clone())
.trackid(id)
.title(track.display.clone())
.album(self.list.clone())
.build();
metadata.set_length(
@ -180,26 +208,20 @@ impl PlayerInterface for Player {
);
metadata
});
Ok(metadata)
}
})
}
async fn volume(&self) -> fdo::Result<Volume> {
Ok(self.player.sink.volume().into())
Ok(self.sink.volume().into())
}
async fn set_volume(&self, volume: Volume) -> Result<()> {
self.player.set_volume(volume as f32);
ui::flash_audio();
Ok(())
self.sender.zbus(Message::SetVolume(volume as f32)).await
}
async fn position(&self) -> fdo::Result<Time> {
Ok(Time::from_micros(
self.player.sink.get_pos().as_micros() as i64
))
Ok(Time::from_micros(self.sink.get_pos().as_micros() as i64))
}
async fn minimum_rate(&self) -> fdo::Result<PlaybackRate> {
@ -240,22 +262,47 @@ impl PlayerInterface for Player {
pub struct Server {
/// The inner MPRIS server.
inner: mpris_server::Server<Player>,
/// Broadcast reciever.
reciever: broadcast::Receiver<Update>,
}
impl Server {
/// Shorthand to emit a `PropertiesChanged` signal, like when pausing/unpausing.
pub async fn changed(
&self,
&mut self,
properties: impl IntoIterator<Item = mpris_server::Property> + Send + Sync,
) -> zbus::Result<()> {
self.inner.properties_changed(properties).await
) -> ui::Result<()> {
while let Ok(update) = self.reciever.try_recv() {
if let Update::Track(current) = update {
self.player().current.swap(Arc::new(current));
}
}
self.inner.properties_changed(properties).await?;
Ok(())
}
pub async fn update_volume(&mut self) -> ui::Result<()> {
self.changed(vec![Property::Volume(self.player().sink.volume().into())])
.await?;
Ok(())
}
/// Shorthand to emit a `PropertiesChanged` signal, specifically about playback.
pub async fn playback(&self, new: PlaybackStatus) -> zbus::Result<()> {
self.inner
.properties_changed(vec![Property::PlaybackStatus(new)])
.await
pub async fn update_playback(&mut self) -> ui::Result<()> {
let status = self.player().playback_status().await?;
self.changed(vec![Property::PlaybackStatus(status)]).await?;
Ok(())
}
pub async fn update_metadata(&mut self) -> ui::Result<()> {
let metadata = self.player().metadata().await?;
self.changed(vec![Property::Metadata(metadata)]).await?;
Ok(())
}
/// Shorthand to get the inner mpris player object.
@ -265,17 +312,30 @@ impl Server {
/// Creates a new MPRIS server.
pub async fn new(
player: Arc<super::Player>,
sender: Sender<Message>,
) -> eyre::Result<Self, zbus::Error> {
state: ui::State,
sender: mpsc::Sender<Message>,
reciever: broadcast::Receiver<Update>,
) -> ui::Result<Server> {
let suffix = if env::var("LOWFI_FIXED_MPRIS_NAME").is_ok_and(|x| x == "1") {
String::from("lowfi")
} else {
format!("lowfi.{}.instance{}", player.list.name, process::id())
format!("lowfi.{}.instance{}", state.list, process::id())
};
let server = mpris_server::Server::new(&suffix, Player { player, sender }).await?;
let server = mpris_server::Server::new(
&suffix,
Player {
sender: Sender::new(sender),
sink: state.sink,
current: ArcSwap::new(Arc::new(state.current)),
list: state.list,
},
)
.await?;
Ok(Self { inner: server })
Ok(Self {
inner: server,
reciever,
})
}
}

View File

@ -62,7 +62,7 @@ impl PersistentVolume {
}
/// Saves `volume` to `volume.txt`.
pub async fn save(&self) -> Result<()> {
pub async fn save(volume: f32) -> Result<()> {
let config = Self::config().await?;
let path = config.join(PathBuf::from("volume.txt"));
@ -72,7 +72,8 @@ impl PersistentVolume {
clippy::cast_sign_loss,
clippy::cast_possible_truncation
)]
fs::write(path, self.inner.to_string()).await?;
let percentage = (volume * 100.0).abs().round() as u16;
fs::write(path, percentage.to_string()).await?;
Ok(())
}