feat: support for mpris (#14)

* feat: initial support for mpris

* feat: overhaul app flow, making audio server control main thread instead of the ui thread

* fix: remove useless extra thread

* fix: last touches

* fix: call interface with minimalist flag

* fix: fix oversight when silencing alsa
This commit is contained in:
Tal 2024-10-06 17:46:47 +02:00 committed by GitHub
parent baa2e095d9
commit 901bf0e871
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1102 additions and 200 deletions

666
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -16,6 +16,9 @@ documentation = "https://github.com/talwat/lowfi"
homepage = "https://github.com/talwat/lowfi"
repository = "https://github.com/talwat/lowfi"
[features]
mpris = ["dep:mpris-server"]
[dependencies]
# Basics
clap = { version = "4.5.18", features = ["derive", "cargo"] }
@ -37,7 +40,8 @@ bytes = "1.7.2"
# Misc
scraper = "0.20.0"
rodio = { version = "0.19.0", features = ["symphonia-mp3"], default-features = false }
crossterm = "0.28.1"
crossterm = { version = "0.28.1", features = ["event-stream"] }
Inflector = "0.11.4"
lazy_static = "1.5.0"
libc = "0.2.159"
mpris-server = { version = "0.8.1", optional = true }

View File

@ -12,15 +12,14 @@ use crate::Args;
/// it when the frontend quits.
pub async fn play(args: Args) -> eyre::Result<()> {
let (tx, rx) = mpsc::channel(8);
let player = Arc::new(Player::new(!args.alternate).await?);
let ui = task::spawn(ui::start(Arc::clone(&player), tx.clone(), args));
let player = Arc::new(Player::new(args.alternate).await?);
let audio = task::spawn(Player::play(Arc::clone(&player), tx.clone(), rx));
tx.send(Messages::Init).await?;
ui::start(Arc::clone(&player), tx.clone(), args).await?;
audio.abort();
Player::play(Arc::clone(&player), tx.clone(), rx).await?;
player.sink.stop();
ui.abort();
Ok(())
}

View File

@ -23,12 +23,20 @@ use crate::tracks::{DecodedTrack, Track, TrackInfo};
pub mod downloader;
pub mod ui;
#[cfg(feature = "mpris")]
pub mod mpris;
/// Handles communication between the frontend & audio player.
#[derive(PartialEq)]
#[derive(PartialEq, Debug)]
pub enum Messages {
/// Notifies the audio server that it should update the track.
Next,
/// Special in that this isn't sent in a "client to server" sort of way,
/// but rather is sent by a child of the server when a song has not only
/// been requested but also downloaded aswell.
NewSong,
/// This signal is only sent if a track timed out. In that case,
/// lowfi will try again and again to retrieve the track.
TryAgain,
@ -37,10 +45,13 @@ pub enum Messages {
Init,
/// Pauses the [Sink]. This will also unpause it if it is paused.
Pause,
PlayPause,
/// Change the volume of playback.
ChangeVolume(f32),
/// Quits gracefully.
Quit,
}
const TIMEOUT: Duration = Duration::from_secs(8);
@ -54,9 +65,14 @@ pub struct Player {
pub sink: Sink,
/// The [`TrackInfo`] of the current track.
/// This is [`None`] when lowfi is buffering.
/// This is [`None`] when lowfi is buffering/loading.
pub current: ArcSwapOption<TrackInfo>,
/// This is the MPRIS server, which is initialized later on in the
/// user interface.
#[cfg(feature = "mpris")]
pub mpris: tokio::sync::OnceCell<mpris_server::Server<mpris::Player>>,
/// The tracks, which is a [VecDeque] that holds
/// *undecoded* [Track]s.
tracks: RwLock<VecDeque<Track>>,
@ -105,6 +121,18 @@ impl Player {
Ok((stream, handle))
}
/// Just a shorthand for setting `current`.
async fn set_current(&self, info: TrackInfo) -> eyre::Result<()> {
self.current.store(Some(Arc::new(info)));
Ok(())
}
/// A shorthand for checking if `self.current` is [Some].
pub fn current_exists(&self) -> bool {
self.current.load().is_some()
}
/// Initializes the entire player, including audio devices & sink.
///
/// `silent` can control whether alsa's output should be redirected,
@ -133,33 +161,75 @@ impl Player {
sink,
_handle: handle,
_stream,
#[cfg(feature = "mpris")]
mpris: tokio::sync::OnceCell::new(),
};
Ok(player)
}
/// Just a shorthand for setting `current`.
async fn set_current(&self, info: TrackInfo) -> eyre::Result<()> {
self.current.store(Some(Arc::new(info)));
Ok(())
}
/// This will play the next track, as well as refilling the buffer in the background.
pub async fn next(queue: Arc<Self>) -> eyre::Result<DecodedTrack> {
let track = match queue.tracks.write().await.pop_front() {
pub async fn next(&self) -> eyre::Result<DecodedTrack> {
let track = match self.tracks.write().await.pop_front() {
Some(x) => x,
// If the queue is completely empty, then fallback to simply getting a new track.
// This is relevant particularly at the first song.
None => Track::random(&queue.client).await?,
None => Track::random(&self.client).await?,
};
let decoded = track.decode()?;
queue.set_current(decoded.info.clone()).await?;
Ok(decoded)
}
/// This basically just calls [`Player::next`], and then appends the new track to the player.
///
/// This also notifies the background thread to get to work, and will send `TryAgain`
/// if it fails. This functions purpose is to be called in the background, so that
/// when the audio server recieves a `Next` signal it will still be able to respond to other
/// signals while it's loading.
async fn handle_next(
player: Arc<Self>,
itx: Sender<()>,
tx: Sender<Messages>,
) -> eyre::Result<()> {
// Serves as an indicator that the queue is "loading".
player.current.store(None);
player.sink.stop();
let track = player.next().await;
match track {
Ok(track) => {
// Set the current track.
player.set_current(track.info.clone()).await?;
// Actually start playing it, this is done later so that the amount
// or times where "loading" appears briefly even though the track is
// from the buffer is minimized.
player.sink.append(track.data);
// Notify the background downloader that there's an empty spot
// in the buffer.
itx.send(()).await?;
// Notify the audio server that the next song has actually been downloaded.
tx.send(Messages::NewSong).await?
}
Err(error) => {
if !error.downcast::<reqwest::Error>()?.is_timeout() {
tokio::time::sleep(TIMEOUT).await;
}
tx.send(Messages::TryAgain).await?
}
};
Ok(())
}
/// This is the main "audio server".
///
/// `rx` & `tx` are used to communicate with it, for example when to
@ -171,50 +241,48 @@ impl Player {
) -> eyre::Result<()> {
// `itx` is used to notify the `Downloader` when it needs to download new tracks.
let (downloader, itx) = Downloader::new(player.clone());
downloader.start().await;
let downloader = downloader.start().await;
// Start buffering tracks immediately.
itx.send(()).await?;
loop {
let clone = Arc::clone(&player);
let msg = select! {
Some(x) = rx.recv() => x,
let msg = select! {
biased;
Some(x) = rx.recv() => x,
// This future will finish only at the end of the current track.
Ok(_) = task::spawn_blocking(move || clone.sink.sleep_until_end()) => Messages::Next,
// The condition is a kind-of hack which gets around the quirks
// of `sleep_until_end`.
//
// That's because `sleep_until_end` will return instantly if the sink
// is uninitialized. That's why we put a check to make sure that we're
// only considering it if the sink is empty, but a song is specified by the
// player.
//
// This makes sense since at the end of a song, the sink will be empty,
// but `player.current` still has yet to be cycled.
//
// This is in contrast to a typical `Next` signal, where the sink will
// not be empty.
Ok(_) = task::spawn_blocking(move || clone.sink.sleep_until_end()),
if player.sink.empty() && player.current_exists() => Messages::Next,
};
match msg {
Messages::Next | Messages::Init | Messages::TryAgain => {
// Skip as early as possible so that music doesn't play
// while lowfi is "loading".
player.sink.stop();
// This basically just prevents `Next` while a song is still currently loading.
if msg == Messages::Next && !player.current_exists() {
continue;
}
// Serves as an indicator that the queue is "loading".
// This is also set by Player::next.
player.current.store(None);
let track = Self::next(Arc::clone(&player)).await;
match track {
Ok(track) => {
player.sink.append(track.data);
// Notify the background downloader that there's an empty spot
// in the buffer.
itx.send(()).await?;
}
Err(error) => {
if !error.downcast::<reqwest::Error>()?.is_timeout() {
tokio::time::sleep(TIMEOUT).await;
}
tx.send(Messages::TryAgain).await?
}
};
// Handle the rest of the signal in the background,
// as to not block the main audio thread.
task::spawn(Self::handle_next(player.clone(), itx.clone(), tx.clone()));
}
Messages::Pause => {
Messages::PlayPause => {
if player.sink.is_paused() {
player.sink.play();
} else {
@ -226,7 +294,16 @@ impl Player {
.sink
.set_volume((player.sink.volume() + change).clamp(0.0, 1.0));
}
// This basically just continues, but more importantly, it'll re-evaluate
// the select macro at the beginning of the loop.
// See the top section to find out why this matters.
Messages::NewSong => continue,
Messages::Quit => break,
}
}
downloader.abort();
Ok(())
}
}

View File

@ -4,7 +4,7 @@ use std::sync::Arc;
use tokio::{
sync::mpsc::{self, Receiver, Sender},
task,
task::{self, JoinHandle},
};
use crate::tracks::Track;
@ -34,7 +34,7 @@ impl Downloader {
}
/// Actually starts & consumes the [Downloader].
pub async fn start(mut self) {
pub async fn start(mut self) -> JoinHandle<()> {
task::spawn(async move {
// Loop through each update notification.
while self.rx.recv().await == Some(()) {
@ -47,6 +47,6 @@ impl Downloader {
self.player.tracks.write().await.push_back(track);
}
}
});
})
}
}

214
src/player/mpris.rs Normal file
View File

@ -0,0 +1,214 @@
use std::sync::Arc;
use mpris_server::{
zbus::{fdo, Result},
LoopStatus, Metadata, PlaybackRate, PlaybackStatus, PlayerInterface, RootInterface, Time,
TrackId, Volume,
};
use tokio::sync::mpsc::Sender;
use super::Messages;
const ERROR: fdo::Error = fdo::Error::Failed(String::new());
/// The actual MPRIS server.
pub struct Player {
pub player: Arc<super::Player>,
pub sender: Sender<Messages>,
}
impl RootInterface for Player {
async fn raise(&self) -> fdo::Result<()> {
Err(ERROR)
}
async fn quit(&self) -> fdo::Result<()> {
self.sender.send(Messages::Quit).await.map_err(|_| ERROR)
}
async fn can_quit(&self) -> fdo::Result<bool> {
Ok(true)
}
async fn fullscreen(&self) -> fdo::Result<bool> {
Ok(false)
}
async fn set_fullscreen(&self, _: bool) -> Result<()> {
Ok(())
}
async fn can_set_fullscreen(&self) -> fdo::Result<bool> {
Ok(false)
}
async fn can_raise(&self) -> fdo::Result<bool> {
Ok(false)
}
async fn has_track_list(&self) -> fdo::Result<bool> {
Ok(false)
}
async fn identity(&self) -> fdo::Result<String> {
Ok("lowfi".to_string())
}
async fn desktop_entry(&self) -> fdo::Result<String> {
Ok("dev.talwat.lowfi".to_string())
}
async fn supported_uri_schemes(&self) -> fdo::Result<Vec<String>> {
Ok(vec!["https".to_string()])
}
async fn supported_mime_types(&self) -> fdo::Result<Vec<String>> {
Ok(vec!["audio/mpeg".to_string()])
}
}
impl PlayerInterface for Player {
async fn next(&self) -> fdo::Result<()> {
self.sender.send(Messages::Next).await.map_err(|_| ERROR)
}
async fn previous(&self) -> fdo::Result<()> {
Err(ERROR)
}
async fn pause(&self) -> fdo::Result<()> {
self.sender
.send(Messages::PlayPause)
.await
.map_err(|_| ERROR)
}
async fn play_pause(&self) -> fdo::Result<()> {
self.sender
.send(Messages::PlayPause)
.await
.map_err(|_| ERROR)
}
async fn stop(&self) -> fdo::Result<()> {
self.play_pause().await
}
async fn play(&self) -> fdo::Result<()> {
self.play_pause().await
}
async fn seek(&self, _offset: Time) -> fdo::Result<()> {
Err(ERROR)
}
async fn set_position(&self, _track_id: TrackId, _position: Time) -> fdo::Result<()> {
Err(ERROR)
}
async fn open_uri(&self, _uri: String) -> fdo::Result<()> {
Err(ERROR)
}
async fn playback_status(&self) -> fdo::Result<PlaybackStatus> {
Ok(if !self.player.current_exists() {
PlaybackStatus::Stopped
} else if self.player.sink.is_paused() {
PlaybackStatus::Paused
} else {
PlaybackStatus::Playing
})
}
async fn loop_status(&self) -> fdo::Result<LoopStatus> {
Err(ERROR)
}
async fn set_loop_status(&self, _loop_status: LoopStatus) -> Result<()> {
Ok(())
}
async fn rate(&self) -> fdo::Result<PlaybackRate> {
Ok(self.player.sink.speed().into())
}
async fn set_rate(&self, rate: PlaybackRate) -> Result<()> {
self.player.sink.set_speed(rate as f32);
Ok(())
}
async fn shuffle(&self) -> fdo::Result<bool> {
Ok(true)
}
async fn set_shuffle(&self, _shuffle: bool) -> Result<()> {
Ok(())
}
async fn metadata(&self) -> fdo::Result<Metadata> {
let metadata = match self.player.current.load().as_ref() {
Some(track) => {
let mut metadata = Metadata::builder().title(track.name.clone()).build();
metadata.set_length(
track
.duration
.and_then(|x| Some(Time::from_micros(x.as_micros() as i64))),
);
metadata
}
None => Metadata::new(),
};
Ok(metadata)
}
async fn volume(&self) -> fdo::Result<Volume> {
Ok(self.player.sink.volume().into())
}
async fn set_volume(&self, volume: Volume) -> Result<()> {
self.player.sink.set_volume(volume as f32);
Ok(())
}
async fn position(&self) -> fdo::Result<Time> {
Ok(Time::from_micros(
self.player.sink.get_pos().as_micros() as i64
))
}
async fn minimum_rate(&self) -> fdo::Result<PlaybackRate> {
Ok(0.2)
}
async fn maximum_rate(&self) -> fdo::Result<PlaybackRate> {
Ok(3.0)
}
async fn can_go_next(&self) -> fdo::Result<bool> {
Ok(true)
}
async fn can_go_previous(&self) -> fdo::Result<bool> {
Ok(false)
}
async fn can_play(&self) -> fdo::Result<bool> {
Ok(true)
}
async fn can_pause(&self) -> fdo::Result<bool> {
Ok(true)
}
async fn can_seek(&self) -> fdo::Result<bool> {
Ok(false)
}
async fn can_control(&self) -> fdo::Result<bool> {
Ok(true)
}
}

View File

@ -11,24 +11,21 @@ use std::{
use crate::Args;
use super::Player;
use crossterm::{
cursor::{Hide, MoveTo, MoveToColumn, MoveUp, Show},
event::{
self, KeyCode, KeyModifiers, KeyboardEnhancementFlags, PopKeyboardEnhancementFlags,
PushKeyboardEnhancementFlags,
self, EventStream, KeyCode, KeyModifiers, KeyboardEnhancementFlags,
PopKeyboardEnhancementFlags, PushKeyboardEnhancementFlags,
},
style::{Print, Stylize},
terminal::{self, Clear, ClearType, EnterAlternateScreen, LeaveAlternateScreen},
};
use lazy_static::lazy_static;
use tokio::{
sync::mpsc::Sender,
task::{self},
time::sleep,
};
use super::Messages;
use futures::{FutureExt, StreamExt};
use lazy_static::lazy_static;
use tokio::{sync::mpsc::Sender, task, time::sleep};
use super::{Messages, Player};
mod components;
@ -53,7 +50,65 @@ lazy_static! {
static ref VOLUME_TIMER: AtomicUsize = AtomicUsize::new(0);
}
/// The code for the interface itself.
async fn input(sender: Sender<Messages>) -> eyre::Result<()> {
let mut reader = EventStream::new();
loop {
let Some(Ok(event::Event::Key(event))) = reader.next().fuse().await else {
continue;
};
let messages = match event.code {
// Arrow key volume controls.
KeyCode::Up => Messages::ChangeVolume(0.1),
KeyCode::Right => Messages::ChangeVolume(0.01),
KeyCode::Down => Messages::ChangeVolume(-0.1),
KeyCode::Left => Messages::ChangeVolume(-0.01),
KeyCode::Char(character) => match character.to_ascii_lowercase() {
// Ctrl+C
'c' if event.modifiers == KeyModifiers::CONTROL => Messages::Quit,
// Quit
'q' => Messages::Quit,
// Skip/Next
's' | 'n' => Messages::Next,
// Pause
'p' => Messages::PlayPause,
// Volume up & down
'+' | '=' => Messages::ChangeVolume(0.1),
'-' | '_' => Messages::ChangeVolume(-0.1),
_ => continue,
},
// Media keys
KeyCode::Media(media) => match media {
event::MediaKeyCode::Play => Messages::PlayPause,
event::MediaKeyCode::Pause => Messages::PlayPause,
event::MediaKeyCode::PlayPause => Messages::PlayPause,
event::MediaKeyCode::Stop => Messages::PlayPause,
event::MediaKeyCode::TrackNext => Messages::Next,
event::MediaKeyCode::LowerVolume => Messages::ChangeVolume(-0.1),
event::MediaKeyCode::RaiseVolume => Messages::ChangeVolume(0.1),
event::MediaKeyCode::MuteVolume => Messages::ChangeVolume(-1.0),
_ => continue,
},
_ => continue,
};
// If it's modifying the volume, then we'll set the `VOLUME_TIMER` to 1
// so that the UI thread will know that it should show the audio bar.
if let Messages::ChangeVolume(_) = messages {
VOLUME_TIMER.store(1, Ordering::Relaxed);
}
sender.send(messages).await?;
}
}
/// The code for the terminal interface itself.
///
/// `volume_timer` is a bit strange, but it tracks how long the `volume` bar
/// has been displayed for, so that it's only displayed for a certain amount of frames.
@ -105,93 +160,92 @@ async fn interface(player: Arc<Player>, minimalist: bool) -> eyre::Result<()> {
}
}
#[cfg(feature = "mpris")]
async fn mpris(
player: Arc<Player>,
sender: Sender<Messages>,
) -> mpris_server::Server<crate::player::mpris::Player> {
mpris_server::Server::new("lowfi", crate::player::mpris::Player { player, sender })
.await
.unwrap()
}
pub struct Environment {
enhancement: bool,
alternate: bool,
}
impl Environment {
pub fn ready(alternate: bool) -> eyre::Result<Self> {
crossterm::execute!(stdout(), Hide)?;
if alternate {
crossterm::execute!(stdout(), EnterAlternateScreen, MoveTo(0, 0))?;
}
terminal::enable_raw_mode()?;
let enhancement = terminal::supports_keyboard_enhancement()?;
if enhancement {
crossterm::execute!(
stdout(),
PushKeyboardEnhancementFlags(KeyboardEnhancementFlags::DISAMBIGUATE_ESCAPE_CODES)
)?;
}
Ok(Self {
enhancement,
alternate,
})
}
pub fn cleanup(&self) -> eyre::Result<()> {
if self.alternate {
crossterm::execute!(stdout(), LeaveAlternateScreen)?;
}
crossterm::execute!(stdout(), Clear(ClearType::FromCursorDown), Show)?;
if self.enhancement {
crossterm::execute!(stdout(), PopKeyboardEnhancementFlags)?;
}
terminal::disable_raw_mode()?;
eprintln!("bye! :)");
Ok(())
}
}
impl Drop for Environment {
fn drop(&mut self) {
// Well, we're dropping it, so it doesn't really matter if there's an error.
let _ = self.cleanup();
}
}
/// Initializes the UI, this will also start taking input from the user.
///
/// `alternate` controls whether to use [EnterAlternateScreen] in order to hide
/// previous terminal history.
pub async fn start(queue: Arc<Player>, sender: Sender<Messages>, args: Args) -> eyre::Result<()> {
crossterm::execute!(stdout(), Hide)?;
pub async fn start(player: Arc<Player>, sender: Sender<Messages>, args: Args) -> eyre::Result<()> {
let environment = Environment::ready(args.alternate)?;
if args.alternate {
crossterm::execute!(stdout(), EnterAlternateScreen, MoveTo(0, 0))?;
#[cfg(feature = "mpris")]
{
player
.mpris
.get_or_init(|| mpris(player.clone(), sender.clone()))
.await;
}
terminal::enable_raw_mode()?;
let enhancement = terminal::supports_keyboard_enhancement()?;
let interface = task::spawn(interface(Arc::clone(&player), args.minimalist));
if enhancement {
crossterm::execute!(
stdout(),
PushKeyboardEnhancementFlags(KeyboardEnhancementFlags::DISAMBIGUATE_ESCAPE_CODES)
)?;
}
input(sender.clone()).await?;
interface.abort();
task::spawn(interface(Arc::clone(&queue), args.minimalist));
loop {
let event::Event::Key(event) = event::read()? else {
continue;
};
let messages = match event.code {
// Arrow key volume controls.
KeyCode::Up => Messages::ChangeVolume(0.1),
KeyCode::Right => Messages::ChangeVolume(0.01),
KeyCode::Down => Messages::ChangeVolume(-0.1),
KeyCode::Left => Messages::ChangeVolume(-0.01),
KeyCode::Char(character) => match character.to_ascii_lowercase() {
// Ctrl+C
'c' if event.modifiers == KeyModifiers::CONTROL => break,
// Quit
'q' => break,
// Skip/Next
's' | 'n' if !queue.current.load().is_none() => Messages::Next,
// Pause
'p' => Messages::Pause,
// Volume up & down
'+' | '=' => Messages::ChangeVolume(0.1),
'-' | '_' => Messages::ChangeVolume(-0.1),
_ => continue,
},
// Media keys
KeyCode::Media(media) => match media {
event::MediaKeyCode::Play => Messages::Pause,
event::MediaKeyCode::Pause => Messages::Pause,
event::MediaKeyCode::PlayPause => Messages::Pause,
event::MediaKeyCode::Stop => Messages::Pause,
event::MediaKeyCode::TrackNext => Messages::Next,
event::MediaKeyCode::LowerVolume => Messages::ChangeVolume(-0.1),
event::MediaKeyCode::RaiseVolume => Messages::ChangeVolume(0.1),
event::MediaKeyCode::MuteVolume => Messages::ChangeVolume(-1.0),
_ => continue,
},
_ => continue,
};
// If it's modifying the volume, then we'll set the `VOLUME_TIMER` to 1
// so that the ui thread will know that it should show the audio bar.
if let Messages::ChangeVolume(_) = messages {
VOLUME_TIMER.store(1, Ordering::Relaxed);
}
sender.send(messages).await?;
}
if args.alternate {
crossterm::execute!(stdout(), LeaveAlternateScreen)?;
}
crossterm::execute!(stdout(), Clear(ClearType::FromCursorDown), Show)?;
if enhancement {
crossterm::execute!(stdout(), PopKeyboardEnhancementFlags)?;
}
terminal::disable_raw_mode()?;
environment.cleanup()?;
Ok(())
}

View File

@ -13,7 +13,7 @@ pub fn format_duration(duration: &Duration) -> String {
}
/// Creates the progress bar, as well as all the padding needed.
pub fn progress_bar(player: &Arc<Player>, width: usize) -> String {
pub fn progress_bar(player: &Player, width: usize) -> String {
let mut duration = Duration::new(0, 0);
let elapsed = player.sink.get_pos();
@ -80,7 +80,7 @@ impl ActionBar {
/// Creates the top/action bar, which has the name of the track and it's status.
/// This also creates all the needed padding.
pub fn action(player: &Arc<Player>, width: usize) -> String {
pub fn action(player: &Player, width: usize) -> String {
let (main, len) = player
.current
.load()