diff --git a/src/pipeline.rs b/src/pipeline.rs index 1ec213a..b718640 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -1,13 +1,13 @@ -use std::collections::btree_map::Entry; -use std::collections::BTreeMap; -use std::num::NonZero; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; - use iced_wgpu::primitive::Primitive; use iced_wgpu::wgpu; - -use crate::video::Frame; +use std::{ + collections::{btree_map::Entry, BTreeMap}, + num::NonZero, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, Mutex, + }, +}; #[repr(C)] struct Uniforms { @@ -203,8 +203,7 @@ impl VideoPipeline { let instances = device.create_buffer(&wgpu::BufferDescriptor { label: Some("iced_video_player uniform buffer"), - size: 256 * std::mem::size_of::() as u64, /* max 256 video players per - * frame */ + size: 256 * std::mem::size_of::() as u64, // max 256 video players per frame usage: wgpu::BufferUsages::COPY_DST | wgpu::BufferUsages::UNIFORM, mapped_at_creation: false, }); @@ -384,7 +383,7 @@ impl VideoPipeline { pub(crate) struct VideoPrimitive { video_id: u64, alive: Arc, - frame: Arc>, + frame: Arc>>, size: (u32, u32), upload_frame: bool, } @@ -393,7 +392,7 @@ impl VideoPrimitive { pub fn new( video_id: u64, alive: Arc, - frame: Arc>, + frame: Arc>>, size: (u32, u32), upload_frame: bool, ) -> Self { @@ -424,23 +423,21 @@ impl Primitive for VideoPrimitive { let pipeline = storage.get_mut::().unwrap(); if self.upload_frame { - if let Some(buffer) = self.frame.lock().expect("lock frame mutex").readable() { - pipeline.upload( - device, - queue, - self.video_id, - &self.alive, - self.size, - buffer.as_slice(), - ); - } + pipeline.upload( + device, + queue, + self.video_id, + &self.alive, + self.size, + self.frame.lock().expect("lock frame mutex").as_slice(), + ); } pipeline.prepare( queue, self.video_id, - &(*bounds * - iced::Transformation::orthographic( + &(*bounds + * iced::Transformation::orthographic( viewport.logical_size().width as _, viewport.logical_size().height as _, )), diff --git a/src/video.rs b/src/video.rs index bd5dbeb..1d07d60 100644 --- a/src/video.rs +++ b/src/video.rs @@ -1,15 +1,14 @@ +use crate::Error; +use gstreamer as gst; +use gstreamer_app as gst_app; +use gstreamer_app::prelude::*; +use iced::widget::image as img; use std::num::NonZeroU8; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant}; -use gstreamer_app::prelude::*; -use iced::widget::image as img; -use {gstreamer as gst, gstreamer_app as gst_app}; - -use crate::Error; - /// Position in the media. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum Position { @@ -24,34 +23,21 @@ pub enum Position { impl From for gst::GenericFormattedValue { fn from(pos: Position) -> Self { match pos { - | Position::Time(t) => gst::ClockTime::from_nseconds(t.as_nanos() as _).into(), - | Position::Frame(f) => gst::format::Default::from_u64(f).into(), + Position::Time(t) => gst::ClockTime::from_nseconds(t.as_nanos() as _).into(), + Position::Frame(f) => gst::format::Default::from_u64(f).into(), } } } impl From for Position { - fn from(t: Duration) -> Self { Position::Time(t) } + fn from(t: Duration) -> Self { + Position::Time(t) + } } impl From for Position { - fn from(f: u64) -> Self { Position::Frame(f) } -} - -#[derive(Debug)] -pub(crate) struct Frame(gst::Sample); -impl Frame { - pub fn new() -> Self { Self(gst::Sample::builder().build()) } - pub fn store(&mut self, sample: gst::Sample) -> Option<()> { - if sample.buffer().is_some() { - self.0 = sample; - Some(()) - } else { - None - } - } - pub fn readable(&self) -> Option> { - self.0.buffer().map(|x| x.map_readable().ok()).flatten() + fn from(f: u64) -> Self { + Position::Frame(f) } } @@ -71,7 +57,7 @@ pub(crate) struct Internal { pub(crate) speed: f64, pub(crate) sync_av: bool, - pub(crate) frame: Arc>, + pub(crate) frame: Arc>>, pub(crate) upload_frame: Arc, pub(crate) last_frame_time: Arc>, pub(crate) looping: bool, @@ -90,36 +76,32 @@ impl Internal { // gstreamer complains if the start & end value types aren't the same match &position { - | Position::Time(_) => { - self.source.seek( - self.speed, - gst::SeekFlags::FLUSH | - if accurate { - gst::SeekFlags::ACCURATE - } else { - gst::SeekFlags::empty() - }, - gst::SeekType::Set, - gst::GenericFormattedValue::from(position), - gst::SeekType::Set, - gst::ClockTime::NONE, - )? - }, - | Position::Frame(_) => { - self.source.seek( - self.speed, - gst::SeekFlags::FLUSH | - if accurate { - gst::SeekFlags::ACCURATE - } else { - gst::SeekFlags::empty() - }, - gst::SeekType::Set, - gst::GenericFormattedValue::from(position), - gst::SeekType::Set, - gst::format::Default::NONE, - )? - }, + Position::Time(_) => self.source.seek( + self.speed, + gst::SeekFlags::FLUSH + | if accurate { + gst::SeekFlags::ACCURATE + } else { + gst::SeekFlags::empty() + }, + gst::SeekType::Set, + gst::GenericFormattedValue::from(position), + gst::SeekType::Set, + gst::ClockTime::NONE, + )?, + Position::Frame(_) => self.source.seek( + self.speed, + gst::SeekFlags::FLUSH + | if accurate { + gst::SeekFlags::ACCURATE + } else { + gst::SeekFlags::empty() + }, + gst::SeekType::Set, + gst::GenericFormattedValue::from(position), + gst::SeekType::Set, + gst::format::Default::NONE, + )?, }; Ok(()) @@ -168,8 +150,7 @@ impl Internal { }) .unwrap(/* state was changed in ctor; state errors caught there */); - // Set restart_stream flag to make the stream restart on the next - // Message::NextFrame + // Set restart_stream flag to make the stream restart on the next Message::NextFrame if self.is_eos && !paused { self.restart_stream = true; } @@ -179,13 +160,12 @@ impl Internal { self.source.state(gst::ClockTime::ZERO).1 == gst::State::Paused } - /// Syncs audio with video when there is (inevitably) latency presenting the - /// frame. + /// Syncs audio with video when there is (inevitably) latency presenting the frame. pub(crate) fn set_av_offset(&mut self, offset: Duration) { if self.sync_av { self.sync_av_counter += 1; - self.sync_av_avg = self.sync_av_avg * (self.sync_av_counter - 1) / self.sync_av_counter + - offset.as_nanos() as u64 / self.sync_av_counter; + self.sync_av_avg = self.sync_av_avg * (self.sync_av_counter - 1) / self.sync_av_counter + + offset.as_nanos() as u64 / self.sync_av_counter; if self.sync_av_counter % 128 == 0 { self.source .set_property("av-offset", -(self.sync_av_avg as i64)); @@ -194,8 +174,7 @@ impl Internal { } } -/// A multimedia video loaded from a URI (e.g., a local file path or HTTP -/// stream). +/// A multimedia video loaded from a URI (e.g., a local file path or HTTP stream). #[derive(Debug)] pub struct Video(pub(crate) RwLock); @@ -212,8 +191,8 @@ impl Drop for Video { if let Some(worker) = inner.worker.take() { if let Err(err) = worker.join() { match err.downcast_ref::() { - | Some(e) => log::error!("Video thread panicked: {e}"), - | None => log::error!("Video thread panicked with unknown reason"), + Some(e) => log::error!("Video thread panicked: {e}"), + None => log::error!("Video thread panicked with unknown reason"), } } } @@ -226,12 +205,7 @@ impl Video { pub fn new(uri: &url::Url) -> Result { gst::init()?; - let pipeline = format!( - "playbin uri=\"{}\" text-sink=\"appsink name=iced_text sync=true caps=text/x-raw\" \ - video-sink=\"videoscale ! videoconvert ! appsink name=iced_video drop=true \ - caps=video/x-raw,format=NV12,pixel-aspect-ratio=1/1\"", - uri.as_str() - ); + let pipeline = format!("playbin uri=\"{}\" text-sink=\"appsink name=iced_text sync=true caps=text/x-raw\" video-sink=\"videoscale ! videoconvert ! appsink name=iced_video drop=true caps=video/x-raw,format=NV12,pixel-aspect-ratio=1/1\"", uri.as_str()); let pipeline = gst::parse::launch(pipeline.as_ref())? .downcast::() .map_err(|_| Error::Cast)?; @@ -248,7 +222,7 @@ impl Video { let video_sink = video_sink.downcast::().unwrap(); let text_sink: gst::Element = pipeline.property("text-sink"); - // let pad = text_sink.pads().get(0).cloned().unwrap(); + //let pad = text_sink.pads().get(0).cloned().unwrap(); let text_sink = text_sink.downcast::().unwrap(); Self::from_gst_pipeline(pipeline, video_sink, Some(text_sink)) @@ -300,10 +274,10 @@ impl Video { let framerate = cleanup!(s.get::("framerate").map_err(|_| Error::Caps))?; let framerate = framerate.numer() as f64 / framerate.denom() as f64; - if framerate.is_nan() || - framerate.is_infinite() || - framerate < 0.0 || - framerate.abs() < f64::EPSILON + if framerate.is_nan() + || framerate.is_infinite() + || framerate < 0.0 + || framerate.abs() < f64::EPSILON { let _ = pipeline.set_state(gst::State::Null); return Err(Error::Framerate(framerate)); @@ -319,7 +293,11 @@ impl Video { let sync_av = pipeline.has_property("av-offset", None); // NV12 = 12bpp - let frame = Arc::new(Mutex::new(Frame::new())); + let frame = Arc::new(Mutex::new(vec![ + 0u8; + (width as usize * height as usize * 3) + .div_ceil(2) + ])); let upload_frame = Arc::new(AtomicBool::new(false)); let alive = Arc::new(AtomicBool::new(true)); let last_frame_time = Arc::new(Mutex::new(Instant::now())); @@ -358,11 +336,12 @@ impl Video { let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; let pts = buffer.pts().unwrap_or_default(); - { - let mut frame_guard = - frame_ref.lock().map_err(|_| gst::FlowError::Error)?; - *frame_guard = Frame(sample); - } + let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; + + let mut frame = frame_ref.lock().map_err(|_| gst::FlowError::Error)?; + let frame_len = frame.len(); + frame.copy_from_slice(&map.as_slice()[..frame_len]); + upload_frame_ref.swap(true, Ordering::SeqCst); if let Some(at) = clear_subtitles_at { @@ -433,7 +412,9 @@ impl Video { }))) } - pub(crate) fn read(&self) -> impl Deref + '_ { self.0.read().expect("lock") } + pub(crate) fn read(&self) -> impl Deref + '_ { + self.0.read().expect("lock") + } pub(crate) fn write(&self) -> impl DerefMut + '_ { self.0.write().expect("lock") @@ -444,60 +425,81 @@ impl Video { } /// Get the size/resolution of the video as `(width, height)`. - pub fn size(&self) -> (i32, i32) { (self.read().width, self.read().height) } + pub fn size(&self) -> (i32, i32) { + (self.read().width, self.read().height) + } /// Get the framerate of the video as frames per second. - pub fn framerate(&self) -> f64 { self.read().framerate } + pub fn framerate(&self) -> f64 { + self.read().framerate + } /// Set the volume multiplier of the audio. /// `0.0` = 0% volume, `1.0` = 100% volume. /// - /// This uses a linear scale, for example `0.5` is perceived as half as - /// loud. + /// This uses a linear scale, for example `0.5` is perceived as half as loud. pub fn set_volume(&mut self, volume: f64) { self.get_mut().source.set_property("volume", volume); - self.set_muted(self.muted()); // for some reason gstreamer unmutes when - // changing volume? + self.set_muted(self.muted()); // for some reason gstreamer unmutes when changing volume? } /// Get the volume multiplier of the audio. - pub fn volume(&self) -> f64 { self.read().source.property("volume") } + pub fn volume(&self) -> f64 { + self.read().source.property("volume") + } /// Set if the audio is muted or not, without changing the volume. - pub fn set_muted(&mut self, muted: bool) { self.get_mut().source.set_property("mute", muted); } + pub fn set_muted(&mut self, muted: bool) { + self.get_mut().source.set_property("mute", muted); + } /// Get if the audio is muted or not. - pub fn muted(&self) -> bool { self.read().source.property("mute") } + pub fn muted(&self) -> bool { + self.read().source.property("mute") + } /// Get if the stream ended or not. - pub fn eos(&self) -> bool { self.read().is_eos } + pub fn eos(&self) -> bool { + self.read().is_eos + } /// Get if the media will loop or not. - pub fn looping(&self) -> bool { self.read().looping } + pub fn looping(&self) -> bool { + self.read().looping + } /// Set if the media will loop or not. - pub fn set_looping(&mut self, looping: bool) { self.get_mut().looping = looping; } + pub fn set_looping(&mut self, looping: bool) { + self.get_mut().looping = looping; + } /// Set if the media is paused or not. - pub fn set_paused(&mut self, paused: bool) { self.get_mut().set_paused(paused) } + pub fn set_paused(&mut self, paused: bool) { + self.get_mut().set_paused(paused) + } /// Get if the media is paused or not. - pub fn paused(&self) -> bool { self.read().paused() } + pub fn paused(&self) -> bool { + self.read().paused() + } /// Jumps to a specific position in the media. - /// Passing `true` to the `accurate` parameter will result in more accurate - /// seeking, however, it is also slower. For most seeks (e.g., - /// scrubbing) this is not needed. + /// Passing `true` to the `accurate` parameter will result in more accurate seeking, + /// however, it is also slower. For most seeks (e.g., scrubbing) this is not needed. pub fn seek(&mut self, position: impl Into, accurate: bool) -> Result<(), Error> { self.get_mut().seek(position, accurate) } /// Set the playback speed of the media. /// The default speed is `1.0`. - pub fn set_speed(&mut self, speed: f64) -> Result<(), Error> { self.get_mut().set_speed(speed) } + pub fn set_speed(&mut self, speed: f64) -> Result<(), Error> { + self.get_mut().set_speed(speed) + } /// Get the current playback speed. - pub fn speed(&self) -> f64 { self.read().speed } + pub fn speed(&self) -> f64 { + self.read().speed + } /// Get the current playback position in time. pub fn position(&self) -> Duration { @@ -510,11 +512,14 @@ impl Video { } /// Get the media duration. - pub fn duration(&self) -> Duration { self.read().duration } + pub fn duration(&self) -> Duration { + self.read().duration + } - /// Restarts a stream; seeks to the first frame and unpauses, sets the `eos` - /// flag to false. - pub fn restart_stream(&mut self) -> Result<(), Error> { self.get_mut().restart_stream() } + /// Restarts a stream; seeks to the first frame and unpauses, sets the `eos` flag to false. + pub fn restart_stream(&mut self) -> Result<(), Error> { + self.get_mut().restart_stream() + } /// Set the subtitle URL to display. pub fn set_subtitle_url(&mut self, url: &url::Url) -> Result<(), Error> { @@ -532,14 +537,14 @@ impl Video { } /// Get the underlying GStreamer pipeline. - pub fn pipeline(&self) -> gst::Pipeline { self.read().source.clone() } + pub fn pipeline(&self) -> gst::Pipeline { + self.read().source.clone() + } - /// Generates a list of thumbnails based on a set of positions in the media, - /// downscaled by a given factor. + /// Generates a list of thumbnails based on a set of positions in the media, downscaled by a given factor. /// /// Slow; only needs to be called once for each instance. - /// It's best to call this at the very start of playback, otherwise the - /// position may shift. + /// It's best to call this at the very start of playback, otherwise the position may shift. pub fn thumbnails( &mut self, positions: I, @@ -569,12 +574,15 @@ impl Video { while !inner.upload_frame.load(Ordering::SeqCst) { std::hint::spin_loop(); } - let frame = inner.frame.lock().map_err(|_| Error::Lock)?; - let frame = frame.readable().ok_or(Error::Lock)?; Ok(img::Handle::from_rgba( inner.width as u32 / downscale, inner.height as u32 / downscale, - yuv_to_rgba(frame.as_slice(), width as _, height as _, downscale), + yuv_to_rgba( + &inner.frame.lock().map_err(|_| Error::Lock)?, + width as _, + height as _, + downscale, + ), )) }) .collect() @@ -610,7 +618,7 @@ fn yuv_to_rgba(yuv: &[u8], width: u32, height: u32, downscale: u32) -> Vec { rgba.push(r as u8); rgba.push(g as u8); rgba.push(b as u8); - rgba.push(0xff); + rgba.push(0xFF); } }