chore: restructure and clean up

This commit is contained in:
Tal 2025-07-02 19:36:53 +02:00
parent b6a81c9634
commit 6fadfe6304
9 changed files with 194 additions and 166 deletions

View File

@ -119,14 +119,7 @@ pub async fn play(args: Args) -> eyre::Result<()> {
tx.send(Messages::Init).await?;
// Actually starts the player.
Player::play(
Arc::clone(&player),
tx.clone(),
rx,
args.buffer_size,
args.debug,
)
.await?;
Player::play(Arc::clone(&player), tx.clone(), rx, args.debug).await?;
// Save the volume.txt file for the next session.
PersistentVolume::save(player.sink.volume()).await?;

View File

@ -22,7 +22,6 @@ use tokio::{
RwLock,
},
task,
time::sleep,
};
#[cfg(feature = "mpris")]
@ -31,11 +30,14 @@ use mpris_server::{PlaybackStatus, PlayerInterface, Property};
use crate::{
messages::Messages,
play::{PersistentVolume, SendableOutputStream},
tracks::{self, bookmark, list::List},
tracks::{self, list::List},
Args,
};
pub mod audio;
pub mod bookmark;
pub mod downloader;
pub mod queue;
pub mod ui;
#[cfg(feature = "mpris")]
@ -55,6 +57,9 @@ pub struct Player {
/// [rodio]'s [`Sink`] which can control playback.
pub sink: Sink,
/// The internal buffer size.
pub buffer_size: usize,
/// Whether the current track has been bookmarked.
bookmarked: AtomicBool,
@ -85,46 +90,6 @@ pub struct Player {
}
impl Player {
/// This gets the output stream while also shutting up alsa with [libc].
/// Uses raw libc calls, and therefore is functional only on Linux.
#[cfg(target_os = "linux")]
fn silent_get_output_stream() -> eyre::Result<(OutputStream, OutputStreamHandle)> {
use libc::freopen;
use std::ffi::CString;
// Get the file descriptor to stderr from libc.
extern "C" {
static stderr: *mut libc::FILE;
}
// This is a bit of an ugly hack that basically just uses `libc` to redirect alsa's
// output to `/dev/null` so that it wont be shoved down our throats.
// The mode which to redirect terminal output with.
let mode = CString::new("w")?;
// First redirect to /dev/null, which basically silences alsa.
let null = CString::new("/dev/null")?;
// SAFETY: Simple enough to be impossible to fail. Hopefully.
unsafe {
freopen(null.as_ptr(), mode.as_ptr(), stderr);
}
// Make the OutputStream while stderr is still redirected to /dev/null.
let (stream, handle) = OutputStream::try_default()?;
// Redirect back to the current terminal, so that other output isn't silenced.
let tty = CString::new("/dev/tty")?;
// SAFETY: See the first call to `freopen`.
unsafe {
freopen(tty.as_ptr(), mode.as_ptr(), stderr);
}
Ok((stream, handle))
}
/// Just a shorthand for setting `current`.
fn set_current(&self, info: tracks::Info) {
self.current.store(Some(Arc::new(info)));
@ -153,7 +118,7 @@ impl Player {
// We should only shut up alsa forcefully on Linux if we really have to.
#[cfg(target_os = "linux")]
let (stream, handle) = if !args.alternate && !args.debug {
Self::silent_get_output_stream()?
audio::silent_get_output_stream()?
} else {
OutputStream::try_default()?
};
@ -178,6 +143,7 @@ impl Player {
let player = Self {
tracks: RwLock::new(VecDeque::with_capacity(args.buffer_size)),
buffer_size: args.buffer_size,
current: ArcSwapOption::new(None),
client,
sink,
@ -190,80 +156,6 @@ impl Player {
Ok((player, SendableOutputStream(stream)))
}
/// This will play the next track, as well as refilling the buffer in the background.
///
/// This will also set `current` to the newly loaded song.
pub async fn next(&self) -> Result<tracks::Decoded, tracks::TrackError> {
// TODO: Consider replacing this with `unwrap_or_else` when async closures are stablized.
let track = self.tracks.write().await.pop_front();
let track = if let Some(track) = track {
track
} else {
// If the queue is completely empty, then fallback to simply getting a new track.
// This is relevant particularly at the first song.
// Serves as an indicator that the queue is "loading".
// We're doing it here so that we don't get the "loading" display
// for only a frame in the other case that the buffer is not empty.
self.current.store(None);
self.list.random(&self.client).await?
};
let decoded = track.decode()?;
// Set the current track.
self.set_current(decoded.info.clone());
Ok(decoded)
}
/// This basically just calls [`Player::next`], and then appends the new track to the player.
///
/// This also notifies the background thread to get to work, and will send `TryAgain`
/// if it fails. This functions purpose is to be called in the background, so that
/// when the audio server recieves a `Next` signal it will still be able to respond to other
/// signals while it's loading.
///
/// This also sends the `NewSong` signal to `tx` apon successful completion.
async fn handle_next(
player: Arc<Self>,
itx: Sender<()>,
tx: Sender<Messages>,
debug: bool,
) -> eyre::Result<()> {
// Stop the sink.
player.sink.stop();
let track = player.next().await;
match track {
Ok(track) => {
// Start playing the new track.
player.sink.append(track.data);
// Notify the background downloader that there's an empty spot
// in the buffer.
Downloader::notify(&itx).await?;
// Notify the audio server that the next song has actually been downloaded.
tx.send(Messages::NewSong).await?;
}
Err(error) => {
if !error.is_timeout() {
if debug {
panic!("{:?}", error)
}
sleep(TIMEOUT).await;
}
tx.send(Messages::TryAgain).await?;
}
};
Ok(())
}
/// This is the main "audio server".
///
/// `rx` & `tx` are used to communicate with it, for example when to
@ -275,7 +167,6 @@ impl Player {
player: Arc<Self>,
tx: Sender<Messages>,
mut rx: Receiver<Messages>,
buf_size: usize,
debug: bool,
) -> eyre::Result<()> {
// Initialize the mpris player.
@ -292,7 +183,7 @@ impl Player {
})?;
// `itx` is used to notify the `Downloader` when it needs to download new tracks.
let downloader = Downloader::new(Arc::clone(&player), buf_size);
let downloader = Downloader::new(Arc::clone(&player));
let (itx, downloader) = downloader.start(debug);
// Start buffering tracks immediately.
@ -345,7 +236,7 @@ impl Player {
// Handle the rest of the signal in the background,
// as to not block the main audio server thread.
task::spawn(Self::handle_next(
task::spawn(Self::next(
Arc::clone(&player),
itx.clone(),
tx.clone(),

42
src/player/audio.rs Normal file
View File

@ -0,0 +1,42 @@
#[cfg(target_os = "linux")]
use rodio::{OutputStream, OutputStreamHandle};
/// This gets the output stream while also shutting up alsa with [libc].
/// Uses raw libc calls, and therefore is functional only on Linux.
#[cfg(target_os = "linux")]
pub fn silent_get_output_stream() -> eyre::Result<(OutputStream, OutputStreamHandle)> {
use libc::freopen;
use std::ffi::CString;
// Get the file descriptor to stderr from libc.
extern "C" {
static stderr: *mut libc::FILE;
}
// This is a bit of an ugly hack that basically just uses `libc` to redirect alsa's
// output to `/dev/null` so that it wont be shoved down our throats.
// The mode which to redirect terminal output with.
let mode = CString::new("w")?;
// First redirect to /dev/null, which basically silences alsa.
let null = CString::new("/dev/null")?;
// SAFETY: Simple enough to be impossible to fail. Hopefully.
unsafe {
freopen(null.as_ptr(), mode.as_ptr(), stderr);
}
// Make the OutputStream while stderr is still redirected to /dev/null.
let (stream, handle) = OutputStream::try_default()?;
// Redirect back to the current terminal, so that other output isn't silenced.
let tty = CString::new("/dev/tty")?;
// SAFETY: See the first call to `freopen`.
unsafe {
freopen(tty.as_ptr(), mode.as_ptr(), stderr);
}
Ok((stream, handle))
}

View File

@ -24,9 +24,6 @@ pub struct Downloader {
/// A copy of the internal sender, which can be useful for keeping
/// track of it.
tx: Sender<()>,
/// The size of the internal download buffer.
buf_size: usize,
}
impl Downloader {
@ -40,40 +37,41 @@ impl Downloader {
///
/// This also sends a [`Sender`] which can be used to notify
/// when the downloader needs to begin downloading more tracks.
pub fn new(player: Arc<Player>, buf_size: usize) -> Self {
pub fn new(player: Arc<Player>) -> Self {
let (tx, rx) = mpsc::channel(8);
Self {
player,
rx,
tx,
buf_size,
Self { player, rx, tx }
}
/// Push a new, random track onto the internal buffer.
pub async fn push_buffer(&self, debug: bool) {
let data = self.player.list.random(&self.player.client).await;
match data {
Ok(track) => self.player.tracks.write().await.push_back(track),
Err(error) if !error.is_timeout() => {
if debug {
panic!("{}", error)
}
sleep(TIMEOUT).await;
}
_ => {}
}
}
/// Actually starts & consumes the [Downloader].
pub fn start(mut self, debug: bool) -> (Sender<()>, JoinHandle<()>) {
(
self.tx,
task::spawn(async move {
// Loop through each update notification.
while self.rx.recv().await == Some(()) {
// For each update notification, we'll push tracks until the buffer is completely full.
while self.player.tracks.read().await.len() < self.buf_size {
let data = self.player.list.random(&self.player.client).await;
match data {
Ok(track) => self.player.tracks.write().await.push_back(track),
Err(error) if !error.is_timeout() => {
if debug {
panic!("{}", error)
}
let tx = self.tx.clone();
sleep(TIMEOUT).await;
}
_ => {}
}
}
let handle = task::spawn(async move {
// Loop through each update notification.
while self.rx.recv().await == Some(()) {
// For each update notification, we'll push tracks until the buffer is completely full.
while self.player.tracks.read().await.len() < self.player.buffer_size {
self.push_buffer(debug).await;
}
}),
)
}
});
return (tx, handle);
}
}

81
src/player/queue.rs Normal file
View File

@ -0,0 +1,81 @@
use std::sync::Arc;
use tokio::{sync::mpsc::Sender, time::sleep};
use crate::{
messages::Messages,
player::{downloader::Downloader, Player, TIMEOUT},
tracks,
};
impl Player {
/// Fetches the next track from the queue, or a random track if the queue is empty.
/// This will also set the current track to the fetched track's info.
async fn fetch(&self) -> Result<tracks::Decoded, tracks::TrackError> {
// TODO: Consider replacing this with `unwrap_or_else` when async closures are stablized.
let track = self.tracks.write().await.pop_front();
let track = if let Some(track) = track {
track
} else {
// If the queue is completely empty, then fallback to simply getting a new track.
// This is relevant particularly at the first song.
// Serves as an indicator that the queue is "loading".
// We're doing it here so that we don't get the "loading" display
// for only a frame in the other case that the buffer is not empty.
self.current.store(None);
self.list.random(&self.client).await?
};
let decoded = track.decode()?;
// Set the current track.
self.set_current(decoded.info.clone());
Ok(decoded)
}
/// Gets, decodes, and plays the next track in the queue while also handling the downloader.
///
/// This functions purpose is to be called in the background, so that when the audio server recieves a
/// `Next` signal it will still be able to respond to other signals while it's loading.
///
/// This also sends the either a `NewSong` or `TryAgain` signal to `tx`.
pub async fn next(
player: Arc<Self>,
itx: Sender<()>,
tx: Sender<Messages>,
debug: bool,
) -> eyre::Result<()> {
// Stop the sink.
player.sink.stop();
let track = player.fetch().await;
match track {
Ok(track) => {
// Start playing the new track.
player.sink.append(track.data);
// Notify the background downloader that there's an empty spot
// in the buffer.
Downloader::notify(&itx).await?;
// Notify the audio server that the next song has actually been downloaded.
tx.send(Messages::NewSong).await?;
}
Err(error) => {
if !error.is_timeout() {
if debug {
panic!("{:?}", error)
}
sleep(TIMEOUT).await;
}
tx.send(Messages::TryAgain).await?;
}
};
Ok(())
}
}

View File

@ -28,6 +28,7 @@ use crossterm::{
};
use lazy_static::lazy_static;
use thiserror::Error;
use tokio::{sync::mpsc::Sender, task, time::sleep};
use unicode_segmentation::UnicodeSegmentation;
@ -36,6 +37,20 @@ use super::{Messages, Player};
mod components;
mod input;
/// The error type for the UI, which is used to handle errors that occur
/// while drawing the UI or handling input.
#[derive(Debug, Error)]
pub enum UIError {
#[error("unable to convert number")]
Conversion(#[from] std::num::TryFromIntError),
#[error("unable to write output")]
Write(#[from] std::io::Error),
#[error("sending message to backend failed")]
Communication(#[from] tokio::sync::mpsc::error::SendError<Messages>),
}
/// How long the audio bar will be visible for when audio is adjusted.
/// This is in frames.
const AUDIO_BAR_DURATION: usize = 10;
@ -100,7 +115,7 @@ impl Window {
}
/// Actually draws the window, with each element in `content` being on a new line.
pub fn draw(&mut self, content: Vec<String>, space: bool) -> eyre::Result<()> {
pub fn draw(&mut self, content: Vec<String>, space: bool) -> eyre::Result<(), UIError> {
let len: u16 = content.len().try_into()?;
// Note that this will have a trailing newline, which we use later.
@ -151,7 +166,7 @@ async fn interface(
borderless: bool,
fps: u8,
width: usize,
) -> eyre::Result<()> {
) -> eyre::Result<(), UIError> {
let mut window = Window::new(width, borderless);
loop {
@ -207,7 +222,7 @@ pub struct Environment {
impl Environment {
/// This prepares the terminal, returning an [Environment] helpful
/// for cleaning up afterwards.
pub fn ready(alternate: bool) -> eyre::Result<Self> {
pub fn ready(alternate: bool) -> eyre::Result<Self, UIError> {
let mut lock = stdout().lock();
crossterm::execute!(lock, Hide)?;
@ -234,7 +249,7 @@ impl Environment {
/// Uses the information collected from initialization to safely close down
/// the terminal & restore it to it's previous state.
pub fn cleanup(&self) -> eyre::Result<()> {
pub fn cleanup(&self) -> eyre::Result<(), UIError> {
let mut lock = stdout().lock();
if self.alternate {
@ -267,7 +282,11 @@ impl Drop for Environment {
///
/// `alternate` controls whether to use [`EnterAlternateScreen`] in order to hide
/// previous terminal history.
pub async fn start(player: Arc<Player>, sender: Sender<Messages>, args: Args) -> eyre::Result<()> {
pub async fn start(
player: Arc<Player>,
sender: Sender<Messages>,
args: Args,
) -> eyre::Result<(), UIError> {
let environment = Environment::ready(args.alternate)?;
let interface = task::spawn(interface(
Arc::clone(&player),

View File

@ -5,10 +5,13 @@ use crossterm::event::{self, EventStream, KeyCode, KeyEventKind, KeyModifiers};
use futures::{FutureExt as _, StreamExt as _};
use tokio::sync::mpsc::Sender;
use crate::player::{ui, Messages};
use crate::player::{
ui::{self, UIError},
Messages,
};
/// Starts the listener to recieve input from the terminal for various events.
pub async fn listen(sender: Sender<Messages>) -> eyre::Result<()> {
pub async fn listen(sender: Sender<Messages>) -> eyre::Result<(), UIError> {
let mut reader = EventStream::new();
loop {

View File

@ -25,9 +25,10 @@ use tokio::io;
use unicode_segmentation::UnicodeSegmentation;
use url::form_urlencoded;
pub mod bookmark;
pub mod list;
/// The error type for the track system, which is used to handle errors that occur
/// while downloading, decoding, or playing tracks.
#[derive(Debug, Error)]
pub enum TrackError {
#[error("timeout")]