From e0d13792e28610c1dae49e565d94947a553bf88a Mon Sep 17 00:00:00 2001 From: talwat <83217276+talwat@users.noreply.github.com> Date: Wed, 25 Sep 2024 14:39:26 +0200 Subject: [PATCH] feat: improve inevitable communication between frontend & backend --- src/main.rs | 2 -- src/player.rs | 58 +++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 42 insertions(+), 18 deletions(-) diff --git a/src/main.rs b/src/main.rs index eee1a4e..8e257b0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,3 @@ -use std::process::exit; - use clap::{Parser, Subcommand}; mod scrape; diff --git a/src/player.rs b/src/player.rs index 5f895e6..ef2171e 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,10 +1,12 @@ -use std::{collections::VecDeque, sync::Arc, time::Duration}; +use std::{collections::VecDeque, sync::Arc}; use reqwest::Client; -use rodio::{source::SineWave, Decoder, OutputStream, OutputStreamHandle, Sink, Source}; +use rodio::{Decoder, OutputStream, Sink}; use tokio::{ - sync::RwLock, - task, time::sleep, + select, sync::{ + mpsc::{self, Receiver}, + RwLock, + }, task }; /// The amount of songs to buffer up. @@ -12,6 +14,11 @@ const BUFFER_SIZE: usize = 5; use crate::tracks::Track; +/// Handles communication between the frontend & audio player. +pub enum Messages { + Skip, +} + /// Main struct responsible for queuing up tracks. /// /// Internally tracks are stored in an [Arc], @@ -21,9 +28,6 @@ pub struct Queue { tracks: Arc>>, } -unsafe impl Send for Queue {} -unsafe impl Sync for Queue {} - impl Queue { pub async fn new() -> Self { Self { @@ -57,28 +61,48 @@ impl Queue { Ok(track) } - pub async fn play(self, sink: Sink) -> eyre::Result<()> { - let client = Client::builder().build()?; + /// This is the main "audio server". + /// + /// `rx` is used to communicate with it, for example when to + /// skip tracks or pause. + pub async fn play( + self, + sink: Sink, + client: Client, + mut rx: Receiver + ) -> eyre::Result<()> { let sink = Arc::new(sink); loop { - sink.stop(); + let clone = sink.clone(); + let msg = select! { + Some(x) = rx.recv() => x, - let track = self.next(&client).await?; - sink.append(Decoder::new(track.data)?); + // This future will finish only at the end of the current track. + Ok(()) = task::spawn_blocking(move || clone.sleep_until_end()) => Messages::Skip, + }; - let sink = sink.clone(); - task::spawn_blocking(move || sink.sleep_until_end()).await?; + match msg { + Messages::Skip => { + sink.stop(); + + let track = self.next(&client).await?; + sink.append(Decoder::new(track.data)?); + } + } } } } pub async fn play() -> eyre::Result<()> { let queue = Queue::new().await; - let (stream, handle) = OutputStream::try_default()?; + let (tx, rx) = mpsc::channel(8); + let (_stream, handle) = OutputStream::try_default()?; let sink = Sink::try_new(&handle)?; + let client = Client::builder().build()?; - let audio = task::spawn(queue.clone().play(sink)); + let audio = task::spawn(queue.clone().play(sink, client.clone(), rx)); + tx.send(Messages::Skip).await?; // This is responsible for the initial track being played. crossterm::terminal::enable_raw_mode()?; @@ -88,6 +112,8 @@ pub async fn play() -> eyre::Result<()> { crossterm::event::KeyCode::Char(x) => { if x == 'q' { break 'a; + } else if x == 's' { + tx.send(Messages::Skip).await?; } } _ => (),