From 8361c52d675712a94684b86a8820e0669ca0ae4c Mon Sep 17 00:00:00 2001 From: jazzfool Date: Sat, 22 Aug 2020 22:35:34 +1000 Subject: [PATCH] rewrite in GStreamer --- Cargo.toml | 14 +- README.md | 37 ++-- examples/minimal.rs | 25 +-- src/lib.rs | 456 +++++++++++++++++++++++++------------------- 4 files changed, 309 insertions(+), 223 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b6c757b..588d512 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,14 @@ authors = ["jazzfool"] edition = "2018" [dependencies] -iced = { git = "https://github.com/hecrj/iced", features = ["image", "tokio"] } -ffmpeg-next = "4" -tokio = { version = "0.2", features = ["rt-threaded", "macros"] } +iced = { version = "0.1", features = ["image", "tokio"] } +iced_native = "0.2" +gstreamer = "0.16" +gstreamer-app = "0.16" # appsink +glib = "0.10" # gobject traits and error type +tokio = { version = "0.2", features = ["time", "stream"] }# time subscription (every frame) +thiserror = "1" +crossbeam-channel = "0.4" +url = "2" # media uri +num-rational = "0.3" # framerates come in rationals +num-traits = "0.2" # convert rationals to floats (ToPrimitive) diff --git a/README.md b/README.md index d2069db..abfc30e 100644 --- a/README.md +++ b/README.md @@ -1,28 +1,26 @@ # Iced Video Player Widget -Composable component to play videos in any Iced application. +Composable component to play videos in any Iced application built on the excellent GStreamer library. ## Overview +In general, this supports anything that [`gstreamer/playbin`](https://gstreamer.freedesktop.org/documentation/playback/playbin.html?gi-language=c) supports. + Features: - Load video files from any file path **or URL** (support for streaming over network). -- Non-blocking (off-thread) automatic buffering. -- Programmatic play/pause/jump. -- Small (around 250 lines). +- Video buffering when streaming on a network. +- Audio support. +- Programmatic control. +- Small (around 300 lines). -Limitations (to be hopefully fixed): -- Cannot load in-memory video data. -- Audio playback is not supported. -- Buffering does not support seeking arbitrarily - you can only seek to buffered frames. -- FFmpeg is a heavy dependency and overkill (open to recommendations for similar *cross-platform* Rust libraries). +Limitations (hopefully to be fixed): +- GStreamer is a bit annoying to set up on Windows. -The player **does not** come with any surrounding GUI controls, but they should be quite easy to implement should you need them; -- Play/pause/stop can just be buttons. -- Seeking can be a slider with an overlay of the thumbnail at the seek time. -Specifically, the player exposes the buffered frames as images which can be used as thumbnails. -Through the same API, you can show the user which portions of the video have been buffered. +This is a "composable" instead of a `iced::Widget`. This is because `Widget`s don't support subscriptions (yet?). Once Iced gets animation support (i.e. widgets scheduling a time to update), this can become a widget. + +The player **does not** come with any surrounding GUI controls, but they should be quite easy to implement should you need them. ## Example Usage @@ -51,7 +49,7 @@ impl Application for App { fn new(_flags: ()) -> (Self, Command) { ( App { - video: VideoPlayer::new(&"my_video.mp4").unwrap(), + video: VideoPlayer::new(&url::Url::parse("file:///C:/my_video.mp4").unwrap()).unwrap(), }, Command::none(), ) @@ -63,8 +61,9 @@ impl Application for App { fn update(&mut self, message: Message) -> Command { match message { - Message::VideoPlayerMessage(msg) => self.video.update(msg).map(Message::VideoPlayerMessage), + Message::VideoPlayerMessage(msg) => self.video.update(msg), } + Command::none() } fn subscription(&self) -> Subscription { @@ -72,11 +71,15 @@ impl Application for App { } fn view(&mut self) -> Element { - self.video.view() + self.video.frame_view().into() } } ``` +## Building + +Follow the [GStreamer build instructions](https://github.com/sdroege/gstreamer-rs#installation). This should be able to compile on MSVC, MinGW, Linux, and MacOS. + ## License Licensed under either diff --git a/examples/minimal.rs b/examples/minimal.rs index 109244c..90bb42d 100644 --- a/examples/minimal.rs +++ b/examples/minimal.rs @@ -25,10 +25,15 @@ impl Application for App { ( App { video: VideoPlayer::new( - &std::path::PathBuf::from(file!()) - .parent() - .unwrap() - .join("../.media/test.mp4"), + &url::Url::from_file_path( + std::path::PathBuf::from(file!()) + .parent() + .unwrap() + .join("../.media/test.mp4") + .canonicalize() + .unwrap(), + ) + .unwrap(), ) .unwrap(), pause_btn: Default::default(), @@ -44,14 +49,12 @@ impl Application for App { fn update(&mut self, message: Message) -> Command { match message { Message::TogglePause => { - self.video.paused = !self.video.paused; - - Command::none() - } - Message::VideoPlayerMessage(msg) => { - self.video.update(msg).map(Message::VideoPlayerMessage) + self.video.set_paused(!self.video.paused()); } + Message::VideoPlayerMessage(msg) => self.video.update(msg), } + + Command::none() } fn subscription(&self) -> Subscription { @@ -60,7 +63,7 @@ impl Application for App { fn view(&mut self) -> Element { Column::new() - .push(self.video.view()) + .push(self.video.frame_view()) .push( Button::new(&mut self.pause_btn, Text::new("Toggle Pause")) .on_press(Message::TogglePause), diff --git a/src/lib.rs b/src/lib.rs index 6eb8b6b..c4a1a44 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,247 +1,319 @@ -use ffmpeg_next as ffmpeg; -use iced::{image, time, Command, Image, Subscription}; -use std::sync::{Arc, Mutex}; +use gst::prelude::*; +use gstreamer as gst; +use gstreamer_app as gst_app; +use iced::{image as img, Image, Subscription}; +use num_traits::ToPrimitive; use std::time::Duration; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("{0}")] + Glib(#[from] glib::Error), + #[error("{0}")] + Bool(#[from] glib::BoolError), + #[error("failed to get the gstreamer bus")] + Bus, + #[error("{0}")] + StateChange(#[from] gst::StateChangeError), + #[error("failed to cast gstreamer element")] + Cast, + #[error("{0}")] + Io(#[from] std::io::Error), + #[error("invalid URI")] + Uri, + #[error("failed to get media capabilities")] + Caps, + #[error("failed to query media duration or position")] + Duration, +} #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum VideoPlayerMessage { NextFrame, - BufferingComplete { - fully_buffered: bool, - packet_count: usize, - }, } -/// Video player component which can playback videos from files or URLs. +/// Video player which handles multimedia playback. pub struct VideoPlayer { - /// When the number of remaining buffered frames goes below this number, buffering automatically begins. Default is 100. - pub buffer_threshold: usize, - /// Number of packets (not frames) to read on each buffer. Default is 1000. - pub buffer_size: usize, - /// Whether the video is paused or not. - pub paused: bool, + bus: gst::Bus, + source: gst::Bin, - frame: Option, - path: std::path::PathBuf, - buffered: Arc>>, - buffering: bool, - fully_buffered: bool, - current_frame: usize, - packet_count: usize, + width: i32, + height: i32, + framerate: f64, + duration: std::time::Duration, - framerate: f32, + frame_rx: crossbeam_channel::Receiver, + frame: Option, + pause: bool, +} + +impl Drop for VideoPlayer { + fn drop(&mut self) { + self.source + .set_state(gst::State::Null) + .expect("failed to set state"); + } } impl VideoPlayer { - pub fn new>(path: &P) -> Result { - let video_data = VideoData::new(path)?; - let framerate = video_data - .decoder - .frame_rate() - .expect("failed to get framerate"); + /// Create a new video player from a given video which loads from `uri`. + pub fn new(uri: &url::Url) -> Result { + gst::init()?; - let buffered = Vec::new(); + let (frame_tx, frame_rx) = crossbeam_channel::unbounded(); + + let source = gst::parse_launch(&format!("playbin uri=\"{}\" video-sink=\"videoconvert ! videoscale ! appsink name=app_sink caps=video/x-raw,format=BGRA,pixel-aspect-ratio=1/1\"", uri.as_str()))?; + let source = source.downcast::().unwrap(); + + let video_sink: gst::Element = source + .get_property("video-sink") + .unwrap() + .get() + .unwrap() + .unwrap(); + let pad = video_sink.get_pads().get(0).cloned().unwrap(); + let pad = pad.dynamic_cast::().unwrap(); + let bin = pad + .get_parent_element() + .unwrap() + .downcast::() + .unwrap(); + + let app_sink = bin.get_by_name("app_sink").unwrap(); + let app_sink = app_sink.downcast::().unwrap(); + + app_sink.set_callbacks( + gst_app::AppSinkCallbacks::builder() + .new_sample(move |sink| { + let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; + let buffer = sample.get_buffer().ok_or(gst::FlowError::Error)?; + let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; + + let pad = sink.get_static_pad("sink").ok_or(gst::FlowError::Error)?; + + let caps = pad.get_current_caps().ok_or(gst::FlowError::Error)?; + let s = caps.get_structure(0).ok_or(gst::FlowError::Error)?; + let width = s + .get::("width") + .map_err(|_| gst::FlowError::Error)? + .ok_or(gst::FlowError::Error)?; + let height = s + .get::("height") + .map_err(|_| gst::FlowError::Error)? + .ok_or(gst::FlowError::Error)?; + + frame_tx + .send(img::Handle::from_pixels( + width as _, + height as _, + map.as_slice().to_owned(), + )) + .map_err(|_| gst::FlowError::Error)?; + + Ok(gst::FlowSuccess::Ok) + }) + .build(), + ); + + source.set_state(gst::State::Playing)?; + + // wait for up to 1 second until the decoder gets the source capabilities + source.get_state(gst::ClockTime::from_seconds(1)).0?; + + // extract resolution and framerate + // TODO(jazzfool): maybe we want to extract some other information too? + let caps = pad.get_current_caps().ok_or(Error::Caps)?; + let s = caps.get_structure(0).ok_or(Error::Caps)?; + let width = s + .get::("width") + .map_err(|_| Error::Caps)? + .ok_or(Error::Caps)?; + let height = s + .get::("height") + .map_err(|_| Error::Caps)? + .ok_or(Error::Caps)?; + let framerate = s + .get::("framerate") + .map_err(|_| Error::Caps)? + .ok_or(Error::Caps)?; + + let duration = std::time::Duration::from_nanos( + source + .query_duration::() + .ok_or(Error::Duration)? + .nanoseconds() + .ok_or(Error::Duration)?, + ); Ok(VideoPlayer { - buffer_threshold: 100, - buffer_size: 1000, - paused: false, + bus: source.get_bus().unwrap(), + source, + width, + height, + framerate: num_rational::Rational::new( + *framerate.numer() as _, + *framerate.denom() as _, + ) + .to_f64().unwrap(/* if the video framerate is bad then it would've been implicitly caught far earlier */), + duration, + + frame_rx, frame: None, - path: path.as_ref().to_owned(), - buffered: Arc::new(Mutex::new(buffered)), - buffering: false, - fully_buffered: false, - current_frame: 0, - packet_count: 0, - - framerate: framerate.0 as f32 / framerate.1 as f32, + pause: false, }) } - fn buffer(&mut self) -> Command { - assert!(self.buffer_size > 0); - - if !self.fully_buffered && !self.buffering { - self.buffering = true; - let path = self.path.clone(); - let buffered = Arc::clone(&self.buffered); - let buffer_size = self.buffer_size; - let prev_packet_count = self.packet_count; - Command::perform( - tokio::spawn(async move { - let mut video_data = - VideoData::new(&path).expect("failed to initialize decoder"); - - let mut packet_count = 0; - for (stream, packet) in video_data.ictx.packets().skip(prev_packet_count) { - if packet_count >= buffer_size { - return (false, prev_packet_count + packet_count - 1); - } - - if stream.index() == video_data.video_stream_index { - video_data.decoder.send_packet(&packet).unwrap(); - let mut decoded = ffmpeg::util::frame::Video::empty(); - while video_data.decoder.receive_frame(&mut decoded).is_ok() { - let mut rgb = ffmpeg::util::frame::Video::empty(); - video_data - .scaler - .run(&decoded, &mut rgb) - .expect("failed to convert frame color space"); - buffered - .lock() - .expect("failed to unlock buffered frames for buffering") - .push(image::Handle::from_pixels( - rgb.width(), - rgb.height(), - rgb.data(0).to_owned(), - )); - } - } - packet_count += 1; - } - - (true, prev_packet_count + packet_count - 1) - }), - |o| { - let (fully_buffered, packet_count) = o.expect("async error"); - VideoPlayerMessage::BufferingComplete { - fully_buffered, - packet_count, - } - }, - ) - } else { - Command::none() - } + /// Get the size/resolution of the video as `(width, height)`. + pub fn size(&self) -> (i32, i32) { + (self.width, self.height) } - /// Whether buffering is currently taking place in another thread. - pub fn is_buffering(&self) -> bool { - self.buffering + /// Get the framerate of the video as frames per second. + pub fn framerate(&self) -> f64 { + self.framerate } - /// Returns the number of buffered frames. - pub fn buffered_frames_len(&self) -> usize { - self.buffered - .lock() - .expect("failed to lock buffered frames") - .len() - } - - /// Returns a list of all the buffered frames as Iced image handles. + /// Set the volume multiplier of the audio. + /// `0.0` = 0% volume, `1.0` = 100% volume. /// - /// This may block if buffering is occurring. - pub fn buffered_frames(&self) -> Vec { - self.buffered - .lock() - .expect("failed to lock buffered frames") - .clone() // image::Handle data is stored in Arc, so this isn't heavy + /// This uses a linear scale, for example `0.5` is perceived as half as loud. + pub fn set_volume(&mut self, volume: f64) { + self.source.set_property("volume", &volume).unwrap(/* this property is guaranteed to exist */); } - /// Seeks to a specified frame number. + /// Set if the audio is muted or not, without changing the volume. + pub fn set_muted(&mut self, mute: bool) { + self.source.set_property("mute", &mute).unwrap(); + } + + /// Get if the audio is muted or not. + pub fn muted(&self) -> bool { + // guaranteed to be a boolean + self.source + .get_property("mute") + .unwrap() + .get() + .unwrap() + .unwrap() + } + + /// Set if the media is paused or not. + pub fn set_paused(&mut self, pause: bool) { + self.pause = pause; + self.source + .set_state(if pause { + gst::State::Paused + } else { + gst::State::Playing + }) + .unwrap(/* state was changed in ctor; state errors caught there */); + } + + /// Get if the media is paused or not. + pub fn paused(&self) -> bool { + self.pause + } + + /// Jumps to a specific time in the media. + /// The seeking is not perfectly accurate. /// - /// Panics if `frame >= buffered_frames_len()` - pub fn seek(&mut self, frame: usize) { - assert!(frame < self.buffered_frames_len()); - self.current_frame = frame; + /// The position is converted to nanoseconds, so any duration with values more significant that nanoseconds is truncated. + pub fn seek(&mut self, position: std::time::Duration) -> Result<(), Error> { + self.source.seek_simple( + gst::SeekFlags::empty(), + gst::GenericFormattedValue::Time(gst::ClockTime::from_nseconds( + position.as_nanos() as _ + )), + )?; + Ok(()) } - pub fn update(&mut self, message: VideoPlayerMessage) -> Command { + /// Get the current playback position. + pub fn position(&self) -> Option { + std::time::Duration::from_nanos( + self.source + .query_position::()? + .nanoseconds()?, + ) + .into() + } + + /// Get the media duration. + pub fn duration(&self) -> std::time::Duration { + self.duration + } + + pub fn update(&mut self, message: VideoPlayerMessage) { match message { VideoPlayerMessage::NextFrame => { - if self.paused { - return Command::none(); - } - - let (next_frame, len) = { - let buffered = self - .buffered - .lock() - .expect("failed to unlock buffered frames"); - (buffered.get(self.current_frame).cloned(), buffered.len()) - }; - - if let Some(img) = next_frame { - self.frame = Some(img.clone()); - - if self.current_frame < len - 1 { - self.current_frame += 1; - if len - self.current_frame < self.buffer_threshold { - self.buffer() - } else { - Command::none() - } - } else { - Command::none() + for msg in self.bus.iter() { + if let gst::MessageView::Error(err) = msg.view() { + panic!("{:#?}", err); } - } else { - // no more frames - self.buffer() } - } - VideoPlayerMessage::BufferingComplete { - fully_buffered, - packet_count, - } => { - self.buffering = false; - self.fully_buffered = fully_buffered; - self.packet_count = packet_count; - Command::none() + + if let Ok(frame) = self.frame_rx.try_recv() { + self.frame = Some(frame); + } } } } pub fn subscription(&self) -> Subscription { - if !self.paused { - time::every(Duration::from_secs_f32(1.0 / self.framerate)) + if !self.pause { + time::every(Duration::from_secs_f64(1.0 / self.framerate)) .map(|_| VideoPlayerMessage::NextFrame) } else { Subscription::none() } } - pub fn view(&mut self) -> Image { - Image::new( - self.frame - .clone() - .unwrap_or_else(|| image::Handle::from_pixels(0, 0, vec![])), - ) - .into() + /// Get the image handle of the current frame. + pub fn frame_image(&self) -> img::Handle { + self.frame + .clone() + .unwrap_or_else(|| img::Handle::from_pixels(0, 0, vec![])) + } + + /// Wrap the output of `frame_image` in an `Image` widget. + pub fn frame_view(&mut self) -> Image { + Image::new(self.frame_image()) } } -struct VideoData { - ictx: ffmpeg::format::context::Input, - video_stream_index: usize, - decoder: ffmpeg::codec::decoder::Video, - scaler: ffmpeg::software::scaling::Context, -} +mod time { + use iced::futures; -impl VideoData { - fn new>(path: &P) -> Result { - ffmpeg::init()?; + pub fn every(duration: std::time::Duration) -> iced::Subscription { + iced::Subscription::from_recipe(Every(duration)) + } - let ictx = ffmpeg::format::input(path)?; - let input = ictx.streams().best(ffmpeg::media::Type::Video).unwrap(); - let video_stream_index = input.index(); - let decoder = input.codec().decoder().video()?; + struct Every(std::time::Duration); - let scaler = ffmpeg::software::scaling::Context::get( - decoder.format(), - decoder.width(), - decoder.height(), - ffmpeg::format::Pixel::BGRA, - decoder.width(), - decoder.height(), - ffmpeg::software::scaling::Flags::BILINEAR, - )?; + impl iced_native::subscription::Recipe for Every + where + H: std::hash::Hasher, + { + type Output = std::time::Instant; - Ok(VideoData { - ictx, - video_stream_index, - decoder, - scaler, - }) + fn hash(&self, state: &mut H) { + use std::hash::Hash; + + std::any::TypeId::of::().hash(state); + self.0.hash(state); + } + + fn stream( + self: Box, + _input: futures::stream::BoxStream<'static, I>, + ) -> futures::stream::BoxStream<'static, Self::Output> { + use futures::stream::StreamExt; + + tokio::time::interval(self.0) + .map(|_| std::time::Instant::now()) + .boxed() + } } }