use background thread to pull frames

This commit is contained in:
jazzfool 2024-10-01 14:44:22 +10:00
parent 4ab464c7d5
commit be2b7d9257
2 changed files with 50 additions and 33 deletions

View file

@ -44,7 +44,8 @@ pub(crate) struct Internal {
pub(crate) bus: gst::Bus, pub(crate) bus: gst::Bus,
pub(crate) source: gst::Pipeline, pub(crate) source: gst::Pipeline,
pub(crate) app_sink: gst_app::AppSink, pub(crate) alive: Arc<AtomicBool>,
pub(crate) worker: Option<std::thread::JoinHandle<()>>,
pub(crate) width: i32, pub(crate) width: i32,
pub(crate) height: i32, pub(crate) height: i32,
@ -53,7 +54,7 @@ pub(crate) struct Internal {
pub(crate) speed: f64, pub(crate) speed: f64,
pub(crate) frame: Arc<Mutex<Vec<u8>>>, pub(crate) frame: Arc<Mutex<Vec<u8>>>,
pub(crate) upload_frame: AtomicBool, pub(crate) upload_frame: Arc<AtomicBool>,
pub(crate) paused: bool, pub(crate) paused: bool,
pub(crate) muted: bool, pub(crate) muted: bool,
pub(crate) looping: bool, pub(crate) looping: bool,
@ -136,27 +137,6 @@ impl Internal {
self.restart_stream = true; self.restart_stream = true;
} }
} }
pub(crate) fn read_frame(&self) -> Result<(), gst::FlowError> {
if self.source.state(None).1 != gst::State::Playing {
return Ok(());
}
let sample = self
.app_sink
.pull_sample()
.map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
let mut frame = self.frame.lock().map_err(|_| gst::FlowError::Error)?;
let frame_len = frame.len();
frame.copy_from_slice(&map.as_slice()[..frame_len]);
self.upload_frame.swap(true, Ordering::SeqCst);
Ok(())
}
} }
/// A multimedia video loaded from a URI (e.g., a local file path or HTTP stream). /// A multimedia video loaded from a URI (e.g., a local file path or HTTP stream).
@ -164,11 +144,17 @@ pub struct Video(pub(crate) RefCell<Internal>);
impl Drop for Video { impl Drop for Video {
fn drop(&mut self) { fn drop(&mut self) {
self.0 let inner = self.0.get_mut();
.borrow()
inner
.source .source
.set_state(gst::State::Null) .set_state(gst::State::Null)
.expect("failed to set state"); .expect("failed to set state");
inner.alive.store(false, Ordering::SeqCst);
if let Some(worker) = inner.worker.take() {
worker.join().expect("failed to stop video thread");
}
} }
} }
@ -232,14 +218,45 @@ impl Video {
); );
// NV12 = 12bpp // NV12 = 12bpp
let frame = vec![0u8; (width as usize * height as usize * 3).div_ceil(2)]; let frame = Arc::new(Mutex::new(vec![
0u8;
(width as usize * height as usize * 3)
.div_ceil(2)
]));
let upload_frame = Arc::new(AtomicBool::new(false));
let alive = Arc::new(AtomicBool::new(true));
let frame_ref = Arc::clone(&frame);
let upload_frame_ref = Arc::clone(&upload_frame);
let alive_ref = Arc::clone(&alive);
let worker = std::thread::spawn(move || {
while alive_ref.load(Ordering::SeqCst) {
if let Err(gst::FlowError::Error) = (|| -> Result<(), gst::FlowError> {
let sample = app_sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
let mut frame = frame_ref.lock().map_err(|_| gst::FlowError::Error)?;
let frame_len = frame.len();
frame.copy_from_slice(&map.as_slice()[..frame_len]);
upload_frame_ref.swap(true, Ordering::SeqCst);
Ok(())
})() {
log::error!("error pulling frame");
}
}
});
Ok(Video(RefCell::new(Internal { Ok(Video(RefCell::new(Internal {
id, id,
bus: pipeline.bus().unwrap(), bus: pipeline.bus().unwrap(),
source: pipeline, source: pipeline,
app_sink, alive,
worker: Some(worker),
width, width,
height, height,
@ -247,8 +264,8 @@ impl Video {
duration, duration,
speed: 1.0, speed: 1.0,
frame: Arc::new(Mutex::new(frame)), frame,
upload_frame: AtomicBool::new(false), upload_frame,
paused: false, paused: false,
muted: false, muted: false,
looping: false, looping: false,
@ -382,7 +399,10 @@ impl Video {
.iter() .iter()
.map(|&pos| { .map(|&pos| {
inner.seek(pos, true)?; inner.seek(pos, true)?;
inner.read_frame().map_err(|_| Error::Sync)?; inner.upload_frame.store(false, Ordering::SeqCst);
while !inner.upload_frame.load(Ordering::SeqCst) {
std::hint::spin_loop();
}
Ok(img::Handle::from_rgba( Ok(img::Handle::from_rgba(
inner.width as _, inner.width as _,
inner.height as _, inner.height as _,
@ -400,8 +420,6 @@ impl Video {
self.set_muted(muted); self.set_muted(muted);
self.seek(pos, true)?; self.seek(pos, true)?;
self.0.borrow().read_frame().map_err(|_| Error::Sync)?;
out out
} }
} }

View file

@ -143,7 +143,6 @@ where
_viewport: &iced::Rectangle, _viewport: &iced::Rectangle,
) { ) {
let inner = self.video.0.borrow_mut(); let inner = self.video.0.borrow_mut();
let _ = inner.read_frame();
// bounds based on `Image::draw` // bounds based on `Image::draw`
let image_size = iced::Size::new(inner.width as f32, inner.height as f32); let image_size = iced::Size::new(inner.width as f32, inner.height as f32);