diff --git a/src/pipeline.rs b/src/pipeline.rs index b718640..1ec213a 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -1,13 +1,13 @@ +use std::collections::btree_map::Entry; +use std::collections::BTreeMap; +use std::num::NonZero; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; + use iced_wgpu::primitive::Primitive; use iced_wgpu::wgpu; -use std::{ - collections::{btree_map::Entry, BTreeMap}, - num::NonZero, - sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, Mutex, - }, -}; + +use crate::video::Frame; #[repr(C)] struct Uniforms { @@ -203,7 +203,8 @@ impl VideoPipeline { let instances = device.create_buffer(&wgpu::BufferDescriptor { label: Some("iced_video_player uniform buffer"), - size: 256 * std::mem::size_of::() as u64, // max 256 video players per frame + size: 256 * std::mem::size_of::() as u64, /* max 256 video players per + * frame */ usage: wgpu::BufferUsages::COPY_DST | wgpu::BufferUsages::UNIFORM, mapped_at_creation: false, }); @@ -383,7 +384,7 @@ impl VideoPipeline { pub(crate) struct VideoPrimitive { video_id: u64, alive: Arc, - frame: Arc>>, + frame: Arc>, size: (u32, u32), upload_frame: bool, } @@ -392,7 +393,7 @@ impl VideoPrimitive { pub fn new( video_id: u64, alive: Arc, - frame: Arc>>, + frame: Arc>, size: (u32, u32), upload_frame: bool, ) -> Self { @@ -423,21 +424,23 @@ impl Primitive for VideoPrimitive { let pipeline = storage.get_mut::().unwrap(); if self.upload_frame { - pipeline.upload( - device, - queue, - self.video_id, - &self.alive, - self.size, - self.frame.lock().expect("lock frame mutex").as_slice(), - ); + if let Some(buffer) = self.frame.lock().expect("lock frame mutex").readable() { + pipeline.upload( + device, + queue, + self.video_id, + &self.alive, + self.size, + buffer.as_slice(), + ); + } } pipeline.prepare( queue, self.video_id, - &(*bounds - * iced::Transformation::orthographic( + &(*bounds * + iced::Transformation::orthographic( viewport.logical_size().width as _, viewport.logical_size().height as _, )), diff --git a/src/video.rs b/src/video.rs index 1d07d60..bd5dbeb 100644 --- a/src/video.rs +++ b/src/video.rs @@ -1,14 +1,15 @@ -use crate::Error; -use gstreamer as gst; -use gstreamer_app as gst_app; -use gstreamer_app::prelude::*; -use iced::widget::image as img; use std::num::NonZeroU8; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant}; +use gstreamer_app::prelude::*; +use iced::widget::image as img; +use {gstreamer as gst, gstreamer_app as gst_app}; + +use crate::Error; + /// Position in the media. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum Position { @@ -23,21 +24,34 @@ pub enum Position { impl From for gst::GenericFormattedValue { fn from(pos: Position) -> Self { match pos { - Position::Time(t) => gst::ClockTime::from_nseconds(t.as_nanos() as _).into(), - Position::Frame(f) => gst::format::Default::from_u64(f).into(), + | Position::Time(t) => gst::ClockTime::from_nseconds(t.as_nanos() as _).into(), + | Position::Frame(f) => gst::format::Default::from_u64(f).into(), } } } impl From for Position { - fn from(t: Duration) -> Self { - Position::Time(t) - } + fn from(t: Duration) -> Self { Position::Time(t) } } impl From for Position { - fn from(f: u64) -> Self { - Position::Frame(f) + fn from(f: u64) -> Self { Position::Frame(f) } +} + +#[derive(Debug)] +pub(crate) struct Frame(gst::Sample); +impl Frame { + pub fn new() -> Self { Self(gst::Sample::builder().build()) } + pub fn store(&mut self, sample: gst::Sample) -> Option<()> { + if sample.buffer().is_some() { + self.0 = sample; + Some(()) + } else { + None + } + } + pub fn readable(&self) -> Option> { + self.0.buffer().map(|x| x.map_readable().ok()).flatten() } } @@ -57,7 +71,7 @@ pub(crate) struct Internal { pub(crate) speed: f64, pub(crate) sync_av: bool, - pub(crate) frame: Arc>>, + pub(crate) frame: Arc>, pub(crate) upload_frame: Arc, pub(crate) last_frame_time: Arc>, pub(crate) looping: bool, @@ -76,32 +90,36 @@ impl Internal { // gstreamer complains if the start & end value types aren't the same match &position { - Position::Time(_) => self.source.seek( - self.speed, - gst::SeekFlags::FLUSH - | if accurate { - gst::SeekFlags::ACCURATE - } else { - gst::SeekFlags::empty() - }, - gst::SeekType::Set, - gst::GenericFormattedValue::from(position), - gst::SeekType::Set, - gst::ClockTime::NONE, - )?, - Position::Frame(_) => self.source.seek( - self.speed, - gst::SeekFlags::FLUSH - | if accurate { - gst::SeekFlags::ACCURATE - } else { - gst::SeekFlags::empty() - }, - gst::SeekType::Set, - gst::GenericFormattedValue::from(position), - gst::SeekType::Set, - gst::format::Default::NONE, - )?, + | Position::Time(_) => { + self.source.seek( + self.speed, + gst::SeekFlags::FLUSH | + if accurate { + gst::SeekFlags::ACCURATE + } else { + gst::SeekFlags::empty() + }, + gst::SeekType::Set, + gst::GenericFormattedValue::from(position), + gst::SeekType::Set, + gst::ClockTime::NONE, + )? + }, + | Position::Frame(_) => { + self.source.seek( + self.speed, + gst::SeekFlags::FLUSH | + if accurate { + gst::SeekFlags::ACCURATE + } else { + gst::SeekFlags::empty() + }, + gst::SeekType::Set, + gst::GenericFormattedValue::from(position), + gst::SeekType::Set, + gst::format::Default::NONE, + )? + }, }; Ok(()) @@ -150,7 +168,8 @@ impl Internal { }) .unwrap(/* state was changed in ctor; state errors caught there */); - // Set restart_stream flag to make the stream restart on the next Message::NextFrame + // Set restart_stream flag to make the stream restart on the next + // Message::NextFrame if self.is_eos && !paused { self.restart_stream = true; } @@ -160,12 +179,13 @@ impl Internal { self.source.state(gst::ClockTime::ZERO).1 == gst::State::Paused } - /// Syncs audio with video when there is (inevitably) latency presenting the frame. + /// Syncs audio with video when there is (inevitably) latency presenting the + /// frame. pub(crate) fn set_av_offset(&mut self, offset: Duration) { if self.sync_av { self.sync_av_counter += 1; - self.sync_av_avg = self.sync_av_avg * (self.sync_av_counter - 1) / self.sync_av_counter - + offset.as_nanos() as u64 / self.sync_av_counter; + self.sync_av_avg = self.sync_av_avg * (self.sync_av_counter - 1) / self.sync_av_counter + + offset.as_nanos() as u64 / self.sync_av_counter; if self.sync_av_counter % 128 == 0 { self.source .set_property("av-offset", -(self.sync_av_avg as i64)); @@ -174,7 +194,8 @@ impl Internal { } } -/// A multimedia video loaded from a URI (e.g., a local file path or HTTP stream). +/// A multimedia video loaded from a URI (e.g., a local file path or HTTP +/// stream). #[derive(Debug)] pub struct Video(pub(crate) RwLock); @@ -191,8 +212,8 @@ impl Drop for Video { if let Some(worker) = inner.worker.take() { if let Err(err) = worker.join() { match err.downcast_ref::() { - Some(e) => log::error!("Video thread panicked: {e}"), - None => log::error!("Video thread panicked with unknown reason"), + | Some(e) => log::error!("Video thread panicked: {e}"), + | None => log::error!("Video thread panicked with unknown reason"), } } } @@ -205,7 +226,12 @@ impl Video { pub fn new(uri: &url::Url) -> Result { gst::init()?; - let pipeline = format!("playbin uri=\"{}\" text-sink=\"appsink name=iced_text sync=true caps=text/x-raw\" video-sink=\"videoscale ! videoconvert ! appsink name=iced_video drop=true caps=video/x-raw,format=NV12,pixel-aspect-ratio=1/1\"", uri.as_str()); + let pipeline = format!( + "playbin uri=\"{}\" text-sink=\"appsink name=iced_text sync=true caps=text/x-raw\" \ + video-sink=\"videoscale ! videoconvert ! appsink name=iced_video drop=true \ + caps=video/x-raw,format=NV12,pixel-aspect-ratio=1/1\"", + uri.as_str() + ); let pipeline = gst::parse::launch(pipeline.as_ref())? .downcast::() .map_err(|_| Error::Cast)?; @@ -222,7 +248,7 @@ impl Video { let video_sink = video_sink.downcast::().unwrap(); let text_sink: gst::Element = pipeline.property("text-sink"); - //let pad = text_sink.pads().get(0).cloned().unwrap(); + // let pad = text_sink.pads().get(0).cloned().unwrap(); let text_sink = text_sink.downcast::().unwrap(); Self::from_gst_pipeline(pipeline, video_sink, Some(text_sink)) @@ -274,10 +300,10 @@ impl Video { let framerate = cleanup!(s.get::("framerate").map_err(|_| Error::Caps))?; let framerate = framerate.numer() as f64 / framerate.denom() as f64; - if framerate.is_nan() - || framerate.is_infinite() - || framerate < 0.0 - || framerate.abs() < f64::EPSILON + if framerate.is_nan() || + framerate.is_infinite() || + framerate < 0.0 || + framerate.abs() < f64::EPSILON { let _ = pipeline.set_state(gst::State::Null); return Err(Error::Framerate(framerate)); @@ -293,11 +319,7 @@ impl Video { let sync_av = pipeline.has_property("av-offset", None); // NV12 = 12bpp - let frame = Arc::new(Mutex::new(vec![ - 0u8; - (width as usize * height as usize * 3) - .div_ceil(2) - ])); + let frame = Arc::new(Mutex::new(Frame::new())); let upload_frame = Arc::new(AtomicBool::new(false)); let alive = Arc::new(AtomicBool::new(true)); let last_frame_time = Arc::new(Mutex::new(Instant::now())); @@ -336,12 +358,11 @@ impl Video { let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; let pts = buffer.pts().unwrap_or_default(); - let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; - - let mut frame = frame_ref.lock().map_err(|_| gst::FlowError::Error)?; - let frame_len = frame.len(); - frame.copy_from_slice(&map.as_slice()[..frame_len]); - + { + let mut frame_guard = + frame_ref.lock().map_err(|_| gst::FlowError::Error)?; + *frame_guard = Frame(sample); + } upload_frame_ref.swap(true, Ordering::SeqCst); if let Some(at) = clear_subtitles_at { @@ -412,9 +433,7 @@ impl Video { }))) } - pub(crate) fn read(&self) -> impl Deref + '_ { - self.0.read().expect("lock") - } + pub(crate) fn read(&self) -> impl Deref + '_ { self.0.read().expect("lock") } pub(crate) fn write(&self) -> impl DerefMut + '_ { self.0.write().expect("lock") @@ -425,81 +444,60 @@ impl Video { } /// Get the size/resolution of the video as `(width, height)`. - pub fn size(&self) -> (i32, i32) { - (self.read().width, self.read().height) - } + pub fn size(&self) -> (i32, i32) { (self.read().width, self.read().height) } /// Get the framerate of the video as frames per second. - pub fn framerate(&self) -> f64 { - self.read().framerate - } + pub fn framerate(&self) -> f64 { self.read().framerate } /// Set the volume multiplier of the audio. /// `0.0` = 0% volume, `1.0` = 100% volume. /// - /// This uses a linear scale, for example `0.5` is perceived as half as loud. + /// This uses a linear scale, for example `0.5` is perceived as half as + /// loud. pub fn set_volume(&mut self, volume: f64) { self.get_mut().source.set_property("volume", volume); - self.set_muted(self.muted()); // for some reason gstreamer unmutes when changing volume? + self.set_muted(self.muted()); // for some reason gstreamer unmutes when + // changing volume? } /// Get the volume multiplier of the audio. - pub fn volume(&self) -> f64 { - self.read().source.property("volume") - } + pub fn volume(&self) -> f64 { self.read().source.property("volume") } /// Set if the audio is muted or not, without changing the volume. - pub fn set_muted(&mut self, muted: bool) { - self.get_mut().source.set_property("mute", muted); - } + pub fn set_muted(&mut self, muted: bool) { self.get_mut().source.set_property("mute", muted); } /// Get if the audio is muted or not. - pub fn muted(&self) -> bool { - self.read().source.property("mute") - } + pub fn muted(&self) -> bool { self.read().source.property("mute") } /// Get if the stream ended or not. - pub fn eos(&self) -> bool { - self.read().is_eos - } + pub fn eos(&self) -> bool { self.read().is_eos } /// Get if the media will loop or not. - pub fn looping(&self) -> bool { - self.read().looping - } + pub fn looping(&self) -> bool { self.read().looping } /// Set if the media will loop or not. - pub fn set_looping(&mut self, looping: bool) { - self.get_mut().looping = looping; - } + pub fn set_looping(&mut self, looping: bool) { self.get_mut().looping = looping; } /// Set if the media is paused or not. - pub fn set_paused(&mut self, paused: bool) { - self.get_mut().set_paused(paused) - } + pub fn set_paused(&mut self, paused: bool) { self.get_mut().set_paused(paused) } /// Get if the media is paused or not. - pub fn paused(&self) -> bool { - self.read().paused() - } + pub fn paused(&self) -> bool { self.read().paused() } /// Jumps to a specific position in the media. - /// Passing `true` to the `accurate` parameter will result in more accurate seeking, - /// however, it is also slower. For most seeks (e.g., scrubbing) this is not needed. + /// Passing `true` to the `accurate` parameter will result in more accurate + /// seeking, however, it is also slower. For most seeks (e.g., + /// scrubbing) this is not needed. pub fn seek(&mut self, position: impl Into, accurate: bool) -> Result<(), Error> { self.get_mut().seek(position, accurate) } /// Set the playback speed of the media. /// The default speed is `1.0`. - pub fn set_speed(&mut self, speed: f64) -> Result<(), Error> { - self.get_mut().set_speed(speed) - } + pub fn set_speed(&mut self, speed: f64) -> Result<(), Error> { self.get_mut().set_speed(speed) } /// Get the current playback speed. - pub fn speed(&self) -> f64 { - self.read().speed - } + pub fn speed(&self) -> f64 { self.read().speed } /// Get the current playback position in time. pub fn position(&self) -> Duration { @@ -512,14 +510,11 @@ impl Video { } /// Get the media duration. - pub fn duration(&self) -> Duration { - self.read().duration - } + pub fn duration(&self) -> Duration { self.read().duration } - /// Restarts a stream; seeks to the first frame and unpauses, sets the `eos` flag to false. - pub fn restart_stream(&mut self) -> Result<(), Error> { - self.get_mut().restart_stream() - } + /// Restarts a stream; seeks to the first frame and unpauses, sets the `eos` + /// flag to false. + pub fn restart_stream(&mut self) -> Result<(), Error> { self.get_mut().restart_stream() } /// Set the subtitle URL to display. pub fn set_subtitle_url(&mut self, url: &url::Url) -> Result<(), Error> { @@ -537,14 +532,14 @@ impl Video { } /// Get the underlying GStreamer pipeline. - pub fn pipeline(&self) -> gst::Pipeline { - self.read().source.clone() - } + pub fn pipeline(&self) -> gst::Pipeline { self.read().source.clone() } - /// Generates a list of thumbnails based on a set of positions in the media, downscaled by a given factor. + /// Generates a list of thumbnails based on a set of positions in the media, + /// downscaled by a given factor. /// /// Slow; only needs to be called once for each instance. - /// It's best to call this at the very start of playback, otherwise the position may shift. + /// It's best to call this at the very start of playback, otherwise the + /// position may shift. pub fn thumbnails( &mut self, positions: I, @@ -574,15 +569,12 @@ impl Video { while !inner.upload_frame.load(Ordering::SeqCst) { std::hint::spin_loop(); } + let frame = inner.frame.lock().map_err(|_| Error::Lock)?; + let frame = frame.readable().ok_or(Error::Lock)?; Ok(img::Handle::from_rgba( inner.width as u32 / downscale, inner.height as u32 / downscale, - yuv_to_rgba( - &inner.frame.lock().map_err(|_| Error::Lock)?, - width as _, - height as _, - downscale, - ), + yuv_to_rgba(frame.as_slice(), width as _, height as _, downscale), )) }) .collect() @@ -618,7 +610,7 @@ fn yuv_to_rgba(yuv: &[u8], width: u32, height: u32, downscale: u32) -> Vec { rgba.push(r as u8); rgba.push(g as u8); rgba.push(b as u8); - rgba.push(0xFF); + rgba.push(0xff); } }