rewrite in GStreamer

jazzfool 2020-08-22 22:35:34 +10:00
parent 7d05e3f36b
commit 8361c52d67
4 changed files with 309 additions and 223 deletions

@@ -1,247 +1,319 @@
use ffmpeg_next as ffmpeg;
use iced::{image, time, Command, Image, Subscription};
use std::sync::{Arc, Mutex};
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use iced::{image as img, Image, Subscription};
use num_traits::ToPrimitive;
use std::time::Duration;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("{0}")]
Glib(#[from] glib::Error),
#[error("{0}")]
Bool(#[from] glib::BoolError),
#[error("failed to get the gstreamer bus")]
Bus,
#[error("{0}")]
StateChange(#[from] gst::StateChangeError),
#[error("failed to cast gstreamer element")]
Cast,
#[error("{0}")]
Io(#[from] std::io::Error),
#[error("invalid URI")]
Uri,
#[error("failed to get media capabilities")]
Caps,
#[error("failed to query media duration or position")]
Duration,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum VideoPlayerMessage {
NextFrame,
BufferingComplete {
fully_buffered: bool,
packet_count: usize,
},
}
/// Video player component which can play back videos from files or URLs.
/// Video player which handles multimedia playback.
pub struct VideoPlayer {
/// When the number of remaining buffered frames goes below this number, buffering automatically begins. Default is 100.
pub buffer_threshold: usize,
/// Number of packets (not frames) to read on each buffer. Default is 1000.
pub buffer_size: usize,
/// Whether the video is paused or not.
pub paused: bool,
bus: gst::Bus,
source: gst::Bin,
frame: Option<image::Handle>,
path: std::path::PathBuf,
buffered: Arc<Mutex<Vec<image::Handle>>>,
buffering: bool,
fully_buffered: bool,
current_frame: usize,
packet_count: usize,
width: i32,
height: i32,
framerate: f64,
duration: std::time::Duration,
framerate: f32,
frame_rx: crossbeam_channel::Receiver<img::Handle>,
frame: Option<img::Handle>,
pause: bool,
}
impl Drop for VideoPlayer {
fn drop(&mut self) {
self.source
.set_state(gst::State::Null)
.expect("failed to set state");
}
}
impl VideoPlayer {
pub fn new<P: AsRef<std::path::Path>>(path: &P) -> Result<Self, ffmpeg::Error> {
let video_data = VideoData::new(path)?;
let framerate = video_data
.decoder
.frame_rate()
.expect("failed to get framerate");
/// Create a new video player that loads its media from `uri`.
pub fn new(uri: &url::Url) -> Result<Self, Error> {
gst::init()?;
let buffered = Vec::new();
let (frame_tx, frame_rx) = crossbeam_channel::unbounded();
let source = gst::parse_launch(&format!("playbin uri=\"{}\" video-sink=\"videoconvert ! videoscale ! appsink name=app_sink caps=video/x-raw,format=BGRA,pixel-aspect-ratio=1/1\"", uri.as_str()))?;
let source = source.downcast::<gst::Bin>().unwrap();
let video_sink: gst::Element = source
.get_property("video-sink")
.unwrap()
.get()
.unwrap()
.unwrap();
let pad = video_sink.get_pads().get(0).cloned().unwrap();
let pad = pad.dynamic_cast::<gst::GhostPad>().unwrap();
let bin = pad
.get_parent_element()
.unwrap()
.downcast::<gst::Bin>()
.unwrap();
let app_sink = bin.get_by_name("app_sink").unwrap();
let app_sink = app_sink.downcast::<gst_app::AppSink>().unwrap();
app_sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |sink| {
let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.get_buffer().ok_or(gst::FlowError::Error)?;
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
let pad = sink.get_static_pad("sink").ok_or(gst::FlowError::Error)?;
let caps = pad.get_current_caps().ok_or(gst::FlowError::Error)?;
let s = caps.get_structure(0).ok_or(gst::FlowError::Error)?;
let width = s
.get::<i32>("width")
.map_err(|_| gst::FlowError::Error)?
.ok_or(gst::FlowError::Error)?;
let height = s
.get::<i32>("height")
.map_err(|_| gst::FlowError::Error)?
.ok_or(gst::FlowError::Error)?;
frame_tx
.send(img::Handle::from_pixels(
width as _,
height as _,
map.as_slice().to_owned(),
))
.map_err(|_| gst::FlowError::Error)?;
Ok(gst::FlowSuccess::Ok)
})
.build(),
);
source.set_state(gst::State::Playing)?;
// wait for up to 1 second until the decoder gets the source capabilities
source.get_state(gst::ClockTime::from_seconds(1)).0?;
// extract resolution and framerate
// TODO(jazzfool): maybe we want to extract some other information too?
let caps = pad.get_current_caps().ok_or(Error::Caps)?;
let s = caps.get_structure(0).ok_or(Error::Caps)?;
let width = s
.get::<i32>("width")
.map_err(|_| Error::Caps)?
.ok_or(Error::Caps)?;
let height = s
.get::<i32>("height")
.map_err(|_| Error::Caps)?
.ok_or(Error::Caps)?;
let framerate = s
.get::<gst::Fraction>("framerate")
.map_err(|_| Error::Caps)?
.ok_or(Error::Caps)?;
let duration = std::time::Duration::from_nanos(
source
.query_duration::<gst::ClockTime>()
.ok_or(Error::Duration)?
.nanoseconds()
.ok_or(Error::Duration)?,
);
Ok(VideoPlayer {
buffer_threshold: 100,
buffer_size: 1000,
paused: false,
bus: source.get_bus().unwrap(),
source,
width,
height,
framerate: num_rational::Rational::new(
*framerate.numer() as _,
*framerate.denom() as _,
)
.to_f64().unwrap(/* if the video framerate is bad then it would've been implicitly caught far earlier */),
duration,
frame_rx,
frame: None,
path: path.as_ref().to_owned(),
buffered: Arc::new(Mutex::new(buffered)),
buffering: false,
fully_buffered: false,
current_frame: 0,
packet_count: 0,
framerate: framerate.0 as f32 / framerate.1 as f32,
pause: false,
})
}
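As a side note, the constructor now takes a `url::Url` instead of a filesystem path, so local files have to be wrapped in a `file://` URI first. Below is a minimal sketch (not part of this commit) of doing that with the `url` crate; the `open_local` helper and the media path are hypothetical:

```rust
use std::path::Path;

// Hypothetical helper (not in this commit): wrap a local path in a
// `file://` URI, which is what `VideoPlayer::new` / playbin expects.
fn open_local(path: &Path) -> Result<VideoPlayer, Error> {
    // `Url::from_file_path` only accepts absolute paths, hence the canonicalize step.
    let absolute = path.canonicalize()?;
    let uri = url::Url::from_file_path(&absolute).map_err(|_| Error::Uri)?;
    VideoPlayer::new(&uri)
}

// Usage: let player = open_local(Path::new("video.mp4"))?;
```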
fn buffer(&mut self) -> Command<VideoPlayerMessage> {
assert!(self.buffer_size > 0);
if !self.fully_buffered && !self.buffering {
self.buffering = true;
let path = self.path.clone();
let buffered = Arc::clone(&self.buffered);
let buffer_size = self.buffer_size;
let prev_packet_count = self.packet_count;
Command::perform(
tokio::spawn(async move {
let mut video_data =
VideoData::new(&path).expect("failed to initialize decoder");
let mut packet_count = 0;
for (stream, packet) in video_data.ictx.packets().skip(prev_packet_count) {
if packet_count >= buffer_size {
return (false, prev_packet_count + packet_count - 1);
}
if stream.index() == video_data.video_stream_index {
video_data.decoder.send_packet(&packet).unwrap();
let mut decoded = ffmpeg::util::frame::Video::empty();
while video_data.decoder.receive_frame(&mut decoded).is_ok() {
let mut rgb = ffmpeg::util::frame::Video::empty();
video_data
.scaler
.run(&decoded, &mut rgb)
.expect("failed to convert frame color space");
buffered
.lock()
.expect("failed to unlock buffered frames for buffering")
.push(image::Handle::from_pixels(
rgb.width(),
rgb.height(),
rgb.data(0).to_owned(),
));
}
}
packet_count += 1;
}
(true, prev_packet_count + packet_count - 1)
}),
|o| {
let (fully_buffered, packet_count) = o.expect("async error");
VideoPlayerMessage::BufferingComplete {
fully_buffered,
packet_count,
}
},
)
} else {
Command::none()
}
/// Get the size/resolution of the video as `(width, height)`.
pub fn size(&self) -> (i32, i32) {
(self.width, self.height)
}
/// Whether buffering is currently taking place in another thread.
pub fn is_buffering(&self) -> bool {
self.buffering
/// Get the framerate of the video as frames per second.
pub fn framerate(&self) -> f64 {
self.framerate
}
/// Returns the number of buffered frames.
pub fn buffered_frames_len(&self) -> usize {
self.buffered
.lock()
.expect("failed to lock buffered frames")
.len()
}
/// Returns a list of all the buffered frames as Iced image handles.
/// Set the volume multiplier of the audio.
/// `0.0` = 0% volume, `1.0` = 100% volume.
///
/// This may block if buffering is occurring.
pub fn buffered_frames(&self) -> Vec<image::Handle> {
self.buffered
.lock()
.expect("failed to lock buffered frames")
.clone() // image::Handle data is stored in Arc, so this isn't heavy
/// This uses a linear scale, for example `0.5` is perceived as half as loud.
pub fn set_volume(&mut self, volume: f64) {
self.source.set_property("volume", &volume).unwrap(/* this property is guaranteed to exist */);
}
/// Seeks to a specified frame number.
/// Set if the audio is muted or not, without changing the volume.
pub fn set_muted(&mut self, mute: bool) {
self.source.set_property("mute", &mute).unwrap();
}
/// Get if the audio is muted or not.
pub fn muted(&self) -> bool {
// guaranteed to be a boolean
self.source
.get_property("mute")
.unwrap()
.get()
.unwrap()
.unwrap()
}
/// Set if the media is paused or not.
pub fn set_paused(&mut self, pause: bool) {
self.pause = pause;
self.source
.set_state(if pause {
gst::State::Paused
} else {
gst::State::Playing
})
.unwrap(/* state was changed in ctor; state errors caught there */);
}
/// Get if the media is paused or not.
pub fn paused(&self) -> bool {
self.pause
}
/// Jumps to a specific time in the media.
/// The seeking is not perfectly accurate.
///
/// Panics if `frame >= buffered_frames_len()`
pub fn seek(&mut self, frame: usize) {
assert!(frame < self.buffered_frames_len());
self.current_frame = frame;
/// The position is converted to nanoseconds, so any duration with values more significant than nanoseconds is truncated.
pub fn seek(&mut self, position: std::time::Duration) -> Result<(), Error> {
self.source.seek_simple(
gst::SeekFlags::empty(),
gst::GenericFormattedValue::Time(gst::ClockTime::from_nseconds(
position.as_nanos() as _
)),
)?;
Ok(())
}
pub fn update(&mut self, message: VideoPlayerMessage) -> Command<VideoPlayerMessage> {
/// Get the current playback position.
pub fn position(&self) -> Option<std::time::Duration> {
std::time::Duration::from_nanos(
self.source
.query_position::<gst::ClockTime>()?
.nanoseconds()?,
)
.into()
}
/// Get the media duration.
pub fn duration(&self) -> std::time::Duration {
self.duration
}
pub fn update(&mut self, message: VideoPlayerMessage) {
match message {
VideoPlayerMessage::NextFrame => {
if self.paused {
return Command::none();
}
let (next_frame, len) = {
let buffered = self
.buffered
.lock()
.expect("failed to unlock buffered frames");
(buffered.get(self.current_frame).cloned(), buffered.len())
};
if let Some(img) = next_frame {
self.frame = Some(img.clone());
if self.current_frame < len - 1 {
self.current_frame += 1;
if len - self.current_frame < self.buffer_threshold {
self.buffer()
} else {
Command::none()
}
} else {
Command::none()
for msg in self.bus.iter() {
if let gst::MessageView::Error(err) = msg.view() {
panic!("{:#?}", err);
}
} else {
// no more frames
self.buffer()
}
}
VideoPlayerMessage::BufferingComplete {
fully_buffered,
packet_count,
} => {
self.buffering = false;
self.fully_buffered = fully_buffered;
self.packet_count = packet_count;
Command::none()
if let Ok(frame) = self.frame_rx.try_recv() {
self.frame = Some(frame);
}
}
}
}
pub fn subscription(&self) -> Subscription<VideoPlayerMessage> {
if !self.paused {
time::every(Duration::from_secs_f32(1.0 / self.framerate))
if !self.pause {
time::every(Duration::from_secs_f64(1.0 / self.framerate))
.map(|_| VideoPlayerMessage::NextFrame)
} else {
Subscription::none()
}
}
pub fn view(&mut self) -> Image {
Image::new(
self.frame
.clone()
.unwrap_or_else(|| image::Handle::from_pixels(0, 0, vec![])),
)
.into()
/// Get the image handle of the current frame.
pub fn frame_image(&self) -> img::Handle {
self.frame
.clone()
.unwrap_or_else(|| img::Handle::from_pixels(0, 0, vec![]))
}
/// Wrap the output of `frame_image` in an `Image` widget.
pub fn frame_view(&mut self) -> Image {
Image::new(self.frame_image())
}
}
struct VideoData {
ictx: ffmpeg::format::context::Input,
video_stream_index: usize,
decoder: ffmpeg::codec::decoder::Video,
scaler: ffmpeg::software::scaling::Context,
}
mod time {
use iced::futures;
impl VideoData {
fn new<P: AsRef<std::path::Path>>(path: &P) -> Result<Self, ffmpeg::Error> {
ffmpeg::init()?;
pub fn every(duration: std::time::Duration) -> iced::Subscription<std::time::Instant> {
iced::Subscription::from_recipe(Every(duration))
}
let ictx = ffmpeg::format::input(path)?;
let input = ictx.streams().best(ffmpeg::media::Type::Video).unwrap();
let video_stream_index = input.index();
let decoder = input.codec().decoder().video()?;
struct Every(std::time::Duration);
let scaler = ffmpeg::software::scaling::Context::get(
decoder.format(),
decoder.width(),
decoder.height(),
ffmpeg::format::Pixel::BGRA,
decoder.width(),
decoder.height(),
ffmpeg::software::scaling::Flags::BILINEAR,
)?;
impl<H, I> iced_native::subscription::Recipe<H, I> for Every
where
H: std::hash::Hasher,
{
type Output = std::time::Instant;
Ok(VideoData {
ictx,
video_stream_index,
decoder,
scaler,
})
fn hash(&self, state: &mut H) {
use std::hash::Hash;
std::any::TypeId::of::<Self>().hash(state);
self.0.hash(state);
}
fn stream(
self: Box<Self>,
_input: futures::stream::BoxStream<'static, I>,
) -> futures::stream::BoxStream<'static, Self::Output> {
use futures::stream::StreamExt;
tokio::time::interval(self.0)
.map(|_| std::time::Instant::now())
.boxed()
}
}
}
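
To round out the picture, here is a rough sketch of how the reworked player could be plugged into an iced application. The `App` and `Message` names and the media URI are made up, and the exact `Application` trait shape (flags, return types) varies between iced releases, so treat this as an outline rather than the crate's documented API:

```rust
use iced::{executor, Application, Command, Element, Settings, Subscription};

struct App {
    video: VideoPlayer,
}

#[derive(Debug, Clone, Copy)]
enum Message {
    Video(VideoPlayerMessage),
}

impl Application for App {
    type Executor = executor::Default;
    type Message = Message;
    type Flags = ();

    fn new(_flags: ()) -> (Self, Command<Message>) {
        // Hypothetical URI; any URI understood by playbin (file://, http://, ...) works.
        let uri = url::Url::parse("file:///path/to/video.mp4").unwrap();
        let video = VideoPlayer::new(&uri).expect("failed to open media");
        (App { video }, Command::none())
    }

    fn title(&self) -> String {
        String::from("iced video player")
    }

    fn update(&mut self, message: Message) -> Command<Message> {
        match message {
            // Forward the timer tick so the player pulls the latest decoded frame.
            Message::Video(msg) => self.video.update(msg),
        }
        Command::none()
    }

    fn subscription(&self) -> Subscription<Message> {
        // Drives `NextFrame` at the video's framerate while the player is not paused.
        self.video.subscription().map(Message::Video)
    }

    fn view(&mut self) -> Element<'_, Message> {
        self.video.frame_view().into()
    }
}

fn main() {
    let _ = App::run(Settings::default());
}
```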