Rust 赋能前端: 视频抽帧
❝如果你能想得到,就能做得到
大家好,我是柒八九。一个专注于前端开发技术/Rust
及AI
应用知识分享的Coder
❝此篇文章所涉及到的技术有
WebAssembly
Rust
wasm-bindgen
线程池
Vite+React/Vue
(下面的内容,在各种前端框架中都用)
因为,行文字数所限,有些概念可能会一带而过亦或者提供对应的学习资料。请大家酌情观看。
前言
老粉都知道,我之前接手了一个内容审读的开发需求。它是个啥呢,它需要对各种文档资源进行解析
和展示
。
在上周呢,我们写了一篇Rust 赋能前端:PDF 分页/关键词标注/转图片/抽取文本/抽取图片/翻转...,在里面介绍如何在前端环境中(React/Vue
)中使用Mupdf
,用于执行各种PDF
的操作。
在我们系统中,有一个需求就是视频抽帧。也就是对一个视频资源基于某些特征将其关键帧抽离成图片信息。然后对其进行OCR
识别,并且基于关键字标注处理。
我们今天就来讲讲如何使用WebAssembly
对视频资源进行抽帧处理。对于OCR
部分我们会单开一篇。
效果展示
可以看到,我们将一个时长快5
分多的视频,仅用时8秒(波动在5-9秒之间)就将其抽成69
个关键帧。
好了,天不早了,干点正事哇。
我们能所学到的知识点
❝
项目初始化 技术选择的初衷 Rust + WebAssembly 抽帧处理
1. 项目初始化
还是一样的套路,我们还是基于f_cli_f[1]来构建的前端Vite+React+TS
项目。
当我们通过yarn/npm
安装好对应的包时。我们就可以在pages
新建一个Video2Img
的目录,然后直接构建一个index.tsx
即可。
随后,我们在src
目录下构建一个wasm
目录来存放在前端项目中要用到的各种wasm
。针对这个例子,我们构建一个video2Img
的目录,用于存放Rust
编译后的文件。
2. 技术选择的初衷
其实呢,针对视频抽帧的需求,我们可以不用WebAssembly
来处理。直接使用浏览器原生API
就可以实现。
用原生API实现视频抽帧
const extractFrames = async (file: File) => {
const video = document.createElement('video');
const src = URL.createObjectURL(file);
video.src = src;
await video.play();
video.pause(); // 暂停播放以进行逐帧处理
const canvas = document.createElement('canvas');
const ctx = canvas.getContext('2d');
if (!ctx) return;
canvas.width = video.videoWidth;
canvas.height = video.videoHeight;
const frameArray: string[] = [];
const totalFrames = Math.floor(video.duration * frameRate);
for (let i = 0; i < totalFrames; i++) {
video.currentTime = i / frameRate; // 设置视频当前时间
await new Promise<void>((resolve) => {
video.onseeked = () => {
ctx.drawImage(video, 0, 0, canvas.width, canvas.height);
frameArray.push(canvas.toDataURL('image/jpeg'));
resolve();
};
});
}
};
上面的代码就是利用原生API实现视频抽帧的简单版本。
它的主要核心点就是
-
创建视频和画布元素:使用
<video>
元素加载视频文件,并创建<canvas>
用于捕获视频帧。 -
逐帧提取图像:
-
通过 video.currentTime
设置视频的播放时间。 -
使用 onseeked 事件 [2]在视频跳转到特定时间后,捕获当前帧。
-
-
渲染帧到画布:将视频帧绘制到画布中,然后使用
canvas.toDataURL
将帧转换为 [Base64 编码]( "Base64 编码")的 JPEG 图像。 -
异步处理:使用
await
和Promise
确保每帧的提取和处理按顺序进行。
当然,上面的代码只是一个简单版本,其实还有很多的优化空间。
例如:上面的代码在直接调用 video.play()
后暂停并逐帧处理,但没有等待视频元数据[3](如时长、帧率、宽高等)加载完成。如果不等元数据加载,视频可能还没有完全准备好,导致一些延迟。
通过监听视频的 loadedmetadata 事件[4],可以确保在开始处理之前,所有必要的元数据(如视频时长、宽度、高度)都已加载完成。这能避免因元数据未加载完而导致的时间跳转缓慢或帧提取卡顿,从而提升整体处理效率。
选择使用Rust+WebAssembly的原因
大家从上面的代码核心点可知。在处理过程中,出现了几种数据类型。
-
视频资源 -
video元素 -
canvas -
image
还记得之前我们写过宝贝,带上WebAssembly,换个姿势来优化你的前端应用其中有一节就是讲到,如果在前端绘制内容比较复杂的图片资源时,可以往Rust+WebAssembly
上靠。
还记得这两张图吗?
上面的示例,可能在有些同学眼中有点牵强。然后,我们继续来说另外一个我选择使用Rust+WebAssembly
处理视频抽帧的。
如果大家写过Rust+WebAssembly
的程序的话,想必肯定听说过大名鼎鼎的 - wasm-bindgen[5]。如果想在Rust
使用浏览器的一些API
例如(Document/Window
等)还离不开它 - web-sys[6]
然后,不知道大家注意过这个例子不 - Parallel Raytracing[7]。它是利用Rust的多线程[8]操作同一份数据并将这些数据信息绘制到一个canvas
中。(强烈建议大家在浏览器实操一下)
下面是它的效果图
上面的绘制时间,其实远远不是这个比例。
看到上面的两个的示例,这就在心底埋下了种子 - 使用Rust+WebAssembly+多线程
进行视频抽帧。
❝至于是否能成功,你不试试咋知道成不成功呢。
下面,我们来尝试用Rust+WebAssembly
实现抽帧的逻辑。
3. Rust + WebAssembly 抽帧处理
Rust项目初始化
使用cargo new --lib audio2img
的Rust
的项目。
然后,使用cargo install
安装对应的wasm-bindgen/js-sys/web-sys
等包。
因为,我们在Rust
中需要用到Document/HtmlCanvasElement
等信息,我们还需要对web-sys
做一些配置。
最后在Cargo.toml
中有如下的安装信息。
[package]
name = "audio2img"
version = "0.1.0"
edition = "2021"
[dependencies]
futures = "0.3.30"
js-sys = "0.3.69"
serde-wasm-bindgen = "0.6.5"
wasm-bindgen = "0.2.92"
wasm-bindgen-futures = "0.4.42"
[lib]
crate-type = ["cdylib"]
[dependencies.web-sys]
version = "0.3"
features = [
'Window',
'Document',
'Element',
'HtmlCanvasElement',
'CanvasRenderingContext2d',
'HtmlVideoElement',
'Event'
]
实现JS版本的平替版本(简单版本)
在上一节中我们介绍过,我们完全可以用原生js来实现抽帧处理。那么,我们所要做的就是,实现一个最简单版本的Rust
视频抽帧版本。
话不多说,我们直接上代码。在src/lib.rs
中直接操作。
use futures::channel::oneshot;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
use web_sys::{ window, HtmlVideoElement, HtmlCanvasElement, CanvasRenderingContext2d };
use js_sys::Promise;
use serde_wasm_bindgen::to_value;
use std::rc::Rc;
use std::cell::RefCell;
#[wasm_bindgen]
pub async fn extract_frames_from_url(video_url: &str, frame_rate: f64) -> Result<JsValue, JsValue> {
// 创建 video 元素
let document = window().unwrap().document().unwrap();
let video_element = document.create_element("video")?.dyn_into::<HtmlVideoElement>()?;
// 设置视频来源
video_element.set_src(video_url);
video_element.set_cross_origin(Some("anonymous"));
// 加载视频元数据
let loaded_metadata_promise = Promise::new(
&mut (|resolve, _| {
let resolve = Rc::new(RefCell::new(Some(resolve)));
let onloadedmetadata_closure = Closure::wrap(
Box::new(move || {
if let Some(resolve) = resolve.borrow_mut().take() {
resolve.call0(&JsValue::NULL).unwrap();
}
}) as Box<dyn FnMut()>
);
video_element.set_onloadedmetadata(
Some(onloadedmetadata_closure.as_ref().unchecked_ref())
);
onloadedmetadata_closure.forget();
})
);
JsFuture::from(loaded_metadata_promise).await?;
// 暂停视频
video_element.pause()?;
// 创建 canvas 元素
let canvas_element = document.create_element("canvas")?.dyn_into::<HtmlCanvasElement>()?;
canvas_element.set_width(video_element.video_width() as u32);
canvas_element.set_height(video_element.video_height() as u32);
// 获取 canvas 上下文
let ctx = canvas_element.get_context("2d")?.unwrap().dyn_into::<CanvasRenderingContext2d>()?;
let mut frame_array = Vec::new();
let total_frames = (video_element.duration() * frame_rate) as i32;
for i in 0..total_frames {
video_element.set_current_time((i as f64) / frame_rate);
// 通过监听 onseeked 事件同步获取帧
let (sender, receiver) = oneshot::channel();
let sender = Rc::new(RefCell::new(Some(sender)));
let onseeked_closure = Closure::wrap(
Box::new(move || {
if let Some(sender) = sender.borrow_mut().take() {
let _ = sender.send(());
}
}) as Box<dyn FnMut()>
);
video_element.set_onseeked(Some(onseeked_closure.as_ref().unchecked_ref()));
onseeked_closure.forget();
// 等待 onseeked 事件触发
receiver.await.unwrap();
ctx.draw_image_with_html_video_element(&video_element, 0.0, 0.0).unwrap();
let frame_data = canvas_element.to_data_url().unwrap();
frame_array.push(frame_data);
}
Ok(to_value(&frame_array).unwrap())
}
由于这段代码比较简单,同时呢为了兼顾不同同学的Rust
审读能力。我们就对这段代码做一下比较详细的讲解。
主要逻辑总结
-
创建并配置 HTML 元素:创建 video
和canvas
元素,设置视频源并调整canvas
尺寸。 -
等待视频元数据加载:通过 onloadedmetadata
确保视频元数据加载完成,避免提前操作视频。 -
逐帧跳转并捕获帧: -
使用 set_current_time
来逐帧调整视频的当前时间。 -
通过监听 onseeked
事件确保每次时间跳转后处理帧。
-
-
绘制帧到 canvas:将每一帧绘制到 canvas
,然后转换为 Base64 格式的图像数据。 -
返回帧数据:将帧数据数组通过 WebAssembly
和Rust
返回给JavaScript
。
可以看到,我们除了第5步和之前的JS
逻辑不同,其他的代码思路都和之前的出奇的一致。
代码逻辑
然后,我们再对Rust
的代码做一次较为详细的解读。
-
创建
video
元素:let document = window().unwrap().document().unwrap();
let video_element = document.create_element("video")?.dyn_into::<HtmlVideoElement>()?;-
通过 window()
和document()
,在浏览器环境下创建一个 HTMLvideo
元素,用于加载和播放视频。
-
-
设置视频来源:
video_element.set_src(video_url);
video_element.set_cross_origin(Some("anonymous"));-
设置视频的 src
,从传入的video_url
加载视频。 -
设置 cross_origin
为"anonymous"
,用于跨域加载资源,确保视频可以被绘制到 canvas 中。
-
-
等待视频元数据加载完成:
let loaded_metadata_promise = Promise::new(
&mut (|resolve, _| {
let resolve = Rc::new(RefCell::new(Some(resolve)));
let onloadedmetadata_closure = Closure::wrap(
Box::new(move || {
if let Some(resolve) = resolve.borrow_mut().take() {
resolve.call0(&JsValue::NULL).unwrap();
}
}) as Box<dyn FnMut()>
);
video_element.set_onloadedmetadata(
Some(onloadedmetadata_closure.as_ref().unchecked_ref())
);
onloadedmetadata_closure.forget();
})
);
JsFuture::from(loaded_metadata_promise).await?;-
创建一个 JavaScript Promise
,并监听video
元素的onloadedmetadata
事件。 -
只有当视频元数据(例如时长、尺寸等)加载完成后,才能开始进一步操作。 -
Promise
是异步的,等到onloadedmetadata
事件触发后继续代码执行。
-
-
暂停视频播放:
video_element.pause()?;
-
暂停视频,防止自动播放,确保可以逐帧处理视频。
-
-
创建
canvas
元素并设置大小:let canvas_element = document.create_element("canvas")?.dyn_into::<HtmlCanvasElement>()?;
canvas_element.set_width(video_element.video_width() as u32);
canvas_element.set_height(video_element.video_height() as u32);-
创建一个 HTML canvas
元素,并设置其宽高与视频的宽高一致,以便绘制视频帧。
-
-
获取 canvas 的绘图上下文:
let ctx = canvas_element.get_context("2d")?.unwrap().dyn_into::<CanvasRenderingContext2d>()?;
-
获取 canvas
的 2D 绘图上下文 (CanvasRenderingContext2d
),用于在 canvas 上绘制视频帧。
-
-
逐帧处理视频:
let total_frames = (video_element.duration() * frame_rate) as i32;
for i in 0..total_frames {
video_element.set_current_time((i as f64) / frame_rate);
let (sender, receiver) = oneshot::channel();
let sender = Rc::new(RefCell::new(Some(sender)));
let onseeked_closure = Closure::wrap(
Box::new(move || {
if let Some(sender) = sender.borrow_mut().take() {
let _ = sender.send(());
}
}) as Box<dyn FnMut()>
);
video_element.set_onseeked(Some(onseeked_closure.as_ref().unchecked_ref()));
onseeked_closure.forget();
receiver.await.unwrap();
ctx.draw_image_with_html_video_element(&video_element, 0.0, 0.0).unwrap();
let frame_data = canvas_element.to_data_url().unwrap();
frame_array.push(frame_data);
}-
计算总帧数 total_frames
。 -
每次循环设置 video_element.set_current_time
调整视频的当前时间。 -
使用 oneshot::channel()
实现同步等待onseeked
事件触发,确保在时间跳转完毕后再进行下一步操作。 -
使用 ctx.draw_image_with_html_video_element
绘制视频帧到canvas
。 -
使用 canvas_element.to_data_url()
将当前帧转换为 Base64 编码的图像数据,保存到frame_array
数组中。
-
-
返回帧数组:
Ok(to_value(&frame_array).unwrap())
-
使用 serde_wasm_bindgen::to_value
将frame_array
转换为 JavaScript 值JsValue
,并返回帧数据数组。
-
涉及的重要 Rust 概念
-
wasm_bindgen:
-
宏和工具,用于在 Rust
中与JavaScript
交互,特别是在WebAssembly
中调用JavaScript
API。 -
如 #[wasm_bindgen]
标记的函数可以被JavaScript
调用。
-
-
futures::channel::oneshot:
-
Rust
的异步工具,提供了一个简单的oneshot
通道,用于线程间或异步任务之间进行一次性消息传递。在这里用于同步等待onseeked
事件触发。
-
-
Closure::wrap:
-
将 Rust
的闭包转换为JavaScript
的闭包,并传递给 DOM 事件处理器。在处理onloadedmetadata
和onseeked
时使用。
-
-
Rc 和 RefCell:
-
Rc
(Reference Counted
)是引用计数智能指针,用于在多个地方共享数据。 -
RefCell
允许在运行时进行可变借用,配合Rc
使用,解决共享状态下的可变性问题。
-
-
JsFuture::from(Promise):
-
将 JavaScript
的Promise
转换为Rust
的Future
,以便在异步代码中使用await
。
-
有些概念,例如Closure::wrap
和Rc
和 RefCell
在我们之前的Rust学习笔记中都有涉猎。
运行效果
上面的效果就是我们把编译好的Rust
代码在前端环境执行的效果。能达到抽帧的效果,但是有几点瑕疵。
-
只有在视频解析完成后,我们才会拿到最后的数据信息,也就是我们页面中有很长的一段空窗期。这很不好,我们要那种抽离出一个页面就像前端返回,要有及时性 -
处理的时间过长,对比文章刚开始,相同的视频,抽离69张图片,需要耗时13秒。这也是我们不能容忍的。
所以接下来,我们来优化上面的代码
新增callback
首先,我们来解决上面的第一个问题,我们不要在视频处理完后才向前端返回信息,我们要的是没处理完一批数据,前端就可以先渲染。也就是在文章刚开始的那个效果。
话不多说,我们直接上代码。
use futures::channel::oneshot;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
use web_sys::{ window, HtmlVideoElement, HtmlCanvasElement, CanvasRenderingContext2d };
use js_sys::{ Function, Promise };
use std::rc::Rc;
use std::cell::RefCell;
#[wasm_bindgen]
pub async fn extract_frames_from_url(
video_url: &str,
frame_rate: f64,
callback: &JsValue
) -> Result<(), JsValue> {
let document = window().unwrap().document().unwrap();
let video_element = document.create_element("video")?.dyn_into::<HtmlVideoElement>()?;
video_element.set_src(video_url);
video_element.set_cross_origin(Some("anonymous"));
let loaded_metadata_promise = Promise::new(
&mut (|resolve, _| {
let resolve = Rc::new(RefCell::new(Some(resolve)));
let onloadedmetadata_closure = Closure::wrap(
Box::new(move || {
if let Some(resolve) = resolve.borrow_mut().take() {
resolve.call0(&JsValue::NULL).unwrap();
}
}) as Box<dyn FnMut()>
);
video_element.set_onloadedmetadata(
Some(onloadedmetadata_closure.as_ref().unchecked_ref())
);
onloadedmetadata_closure.forget();
})
);
JsFuture::from(loaded_metadata_promise).await?;
video_element.pause()?;
let canvas_element = document.create_element("canvas")?.dyn_into::<HtmlCanvasElement>()?;
canvas_element.set_width(video_element.video_width() as u32);
canvas_element.set_height(video_element.video_height() as u32);
let ctx = canvas_element.get_context("2d")?.unwrap().dyn_into::<CanvasRenderingContext2d>()?;
let total_frames = (video_element.duration() * frame_rate) as i32;
let onseeked_closure = Rc::new(RefCell::new(None));
let video_element_clone = video_element.clone();
// 将 JsValue 转换为 JavaScript 的回调函数
let callback_function = callback.dyn_ref::<Function>().ok_or("callback function is not valid")?;
for i in 0..total_frames {
let (sender, receiver) = oneshot::channel();
let sender = Rc::new(RefCell::new(Some(sender)));
*onseeked_closure.borrow_mut() = Some(
Closure::wrap(
Box::new({
let sender = sender.clone();
move || {
if let Some(sender) = sender.borrow_mut().take() {
let _ = sender.send(());
}
}
}) as Box<dyn FnMut()>
)
);
video_element_clone.set_onseeked(
Some(onseeked_closure.borrow().as_ref().unwrap().as_ref().unchecked_ref())
);
video_element_clone.set_current_time((i as f64) / frame_rate);
receiver.await.unwrap();
ctx.draw_image_with_html_video_element(&video_element_clone, 0.0, 0.0).unwrap();
let frame_data = canvas_element.to_data_url().unwrap();
// 每次生成frame_data后,立即调用回调函数,将frame_data传递给前端
let js_frame_data = JsValue::from_str(&frame_data);
callback_function.call1(&JsValue::NULL, &js_frame_data)?;
}
video_element.set_onseeked(None);
Ok(())
}
对比之前代码的改变
-
添加了 callback 参数:
-
新增了 callback: &JsValue
参数,允许调用 JavaScript 回调函数将每一帧的图像数据实时传递给前端,而不是像之前那样返回整个帧数组。 -
通过 callback_function.call1()
方法,在每一帧生成后立即将帧数据传递给回调函数。
-
-
去除了 frame_array:
-
之前的代码将所有帧数据保存到一个数组 frame_array
中,最后一次性返回整个数组。 -
新代码去掉了 frame_array
,改为每次生成帧数据后立即调用回调函数,因此不需要在 Rust 端保存全部帧数据。
-
-
优化 onseeked 事件处理:
-
原代码每次生成帧时都创建一个新的 Closure
监听onseeked
事件。 -
优化后, onseeked_closure
是通过Rc<RefCell<>>
来缓存的,因此可以在循环中复用它,避免每次都生成新的闭包对象。 -
这一优化减少了闭包的创建和内存分配,提升了代码的性能。
-
-
JsValue 转 Function:
-
在新代码中,通过 callback.dyn_ref::<Function>()
将传入的JsValue
转换为 JavaScript 回调函数 (Function
),并在每一帧处理完后调用该回调。 -
这一点增强了 Rust 端与 JavaScript 端的交互,提供了实时处理的能力。
-
优点
-
实时传递帧数据:
-
提升了内存效率:不再需要将所有帧数据存入一个大数组,而是每处理一帧立即发送给前端。这样减少了内存占用,尤其是对于处理长视频时,帧数据不再需要在 Rust 端缓存。 -
更流畅的用户体验:帧数据可以实时传递给前端,这意味着视频的帧捕获和渲染可以并行进行,不必等待所有帧处理完毕后再返回结果。用户可以立即看到处理中的帧。
-
-
复用闭包,减少性能开销:
-
通过 Rc<RefCell<>>
复用onseeked_closure
,减少了闭包的创建和销毁,降低了闭包生成的开销,提高了代码的性能。
-
-
灵活性增强:
-
新的 callback
参数使得这段代码更加灵活,用户可以自定义如何处理帧数据(例如显示在页面上,存储到服务器,或者进行其他操作)。通过回调机制,处理每帧数据的逻辑可以完全在前端控制。
-
-
更简洁的结构:
-
由于去掉了帧数据数组 frame_array
,整个代码逻辑更加清晰,减少了无用的变量管理,提升了可读性。
-
效果展示
可以看到,我们通过加入了callback
后,不仅在页面交互上有所优化,在渲染速度上也有一定的提升。
也就是说,我们通过上面的改动,都解决了上面的两个顽疾。
使用多线程
其实吧,在经过优化后,上面的代码已经能够达成我们的需求了。之前文章也写了,我们之所以选用Rust
来处理视频抽帧,是看中了它的多线程能力。
然后,我们就尝试往这边靠。具体实现思路呢,还是和raytrace-parallel[9]一致。
所幸,它也有完整的代码实现。然后,咱也照猫画虎实现一遍,然后如果有性能不可靠的地方,我们在见招拆招。
❝先说结果,本来我们想使用rayon[10]来初始化多个线程,并且实例化多个video实例,每个video处理一部分视频的解析处理,从而达到缩短处理时间的目的,但是呢由于
document/video
的实例无法在多线程中共享数据,然后达不到这种效果,后来不了了之。
部分代码如下所示:(这是一个不成功的案例,但是我们的思路就是分而治之)
use futures::channel::oneshot;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
use web_sys::{window, HtmlVideoElement, HtmlCanvasElement, CanvasRenderingContext2d};
use js_sys::{Function, Promise};
use std::rc::Rc;
use std::cell::RefCell;
use rayon::prelude::*;
use std::sync::{Arc, Mutex};
use wasm_bindgen_rayon::init_thread_pool;
#[wasm_bindgen]
pub async fn extract_frames_from_url(
video_url: &str,
frame_rate: f64,
callback: &JsValue
) -> Result<(), JsValue> {
init_thread_pool(8).expect("Failed to initialize thread pool");
let document = Arc::new(window().unwrap().document().unwrap());
let video_element1 = document.create_element("video")?.dyn_into::<HtmlVideoElement>()?;
let video_element2 = document.create_element("video")?.dyn_into::<HtmlVideoElement>()?;
video_element1.set_src(video_url);
video_element2.set_src(video_url);
video_element1.set_cross_origin(Some("anonymous"));
video_element2.set_cross_origin(Some("anonymous"));
// 省略部分代码
video_element1.pause()?;
video_element2.pause()?;
let total_frames = (video_element1.duration() * frame_rate) as i32;
let midpoint = total_frames / 2;
let document_clone1 = Arc::clone(&document);
let document_clone2 = Arc::clone(&document);
let process_frames = move |
video_element: HtmlVideoElement,
start_frame: i32,
end_frame: i32,
document: Arc<web_sys::Document>
| {
let video_element_arc = Arc::new(Mutex::new(video_element));
let ctx = {
let canvas_element = document.create_element("canvas").unwrap().dyn_into::<HtmlCanvasElement>().unwrap();
canvas_element.set_width(video_element_arc.lock().unwrap().video_width() as u32);
canvas_element.set_height(video_element_arc.lock().unwrap().video_height() as u32);
canvas_element.get_context("2d").unwrap().unwrap().dyn_into::<CanvasRenderingContext2d>().unwrap()
};
let callback_function = Arc::new(callback.dyn_ref::<Function>().unwrap().clone());
(start_frame..end_frame).into_par_iter().for_each(|i| {
let video_element = Arc::clone(&video_element_arc);
let callback_function = Arc::clone(&callback_function);
let mut video_element = video_element.lock().unwrap();
let (sender, receiver) = oneshot::channel();
let sender = Rc::new(RefCell::new(Some(sender)));
let onseeked_closure = Closure::wrap(
Box::new({
let sender = sender.clone();
move || {
if let Some(sender) = sender.borrow_mut().take() {
let _ = sender.send(());
web_sys::console::log_1(&format!("Frame {} processed", i).into());
}
}
}) as Box<dyn FnMut()>
);
video_element.set_onseeked(Some(onseeked_closure.as_ref().unchecked_ref()));
video_element.set_current_time((i as f64) / frame_rate);
receiver.await.unwrap();
ctx.draw_image_with_html_video_element(&*video_element, 0.0, 0.0).unwrap();
let frame_data = ctx.canvas().unwrap().to_data_url().unwrap();
let js_frame_data = JsValue::from_str(&frame_data);
callback_function.call1(&JsValue::NULL, &js_frame_data).unwrap();
});
};
let task1 = process_frames(video_element1, 0, midpoint, document_clone1);
let task2 = process_frames(video_element2, midpoint, total_frames, document_clone2);
futures::join!(task1, task2);
Ok(())
}
虽然,结果很无奈,但是其实也是给了我们很多思路,就像raytrace-parallel
利用多线程实现对一批数据实现多线程处理,并且将结果返回的思路。
当后面我们遇到类似的功能,我们也是可以往这边靠拢的。
创建一个 web worker 池
下面的代码,我们是从raytrace-parallel
中cv
下来的,但是也是一个完成的代码片段,以后如果有需要,我们可以直接使用。
// 关闭编译器警告:当目标架构不是 wasm 时,忽略 Work.func 和 child_entry_point 未使用的警告
#![cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
// 演示了如何创建一个 web worker 池,以便执行类似于 `rayon` 的任务。
use std::cell::RefCell;
use std::rc::Rc;
use wasm_bindgen::prelude::*;
use web_sys::{ DedicatedWorkerGlobalScope, MessageEvent, WorkerOptions };
use web_sys::{ ErrorEvent, Event, Worker, WorkerType };
#[wasm_bindgen]
pub struct WorkerPool {
state: Rc<PoolState>,
worker_path: String,
}
struct PoolState {
workers: RefCell<Vec<Worker>>,
callback: Closure<dyn FnMut(Event)>,
}
struct Work {
func: Box<dyn FnOnce() + Send>,
}
#[wasm_bindgen]
impl WorkerPool {
/// 创建一个新的 `WorkerPool`,该池立即创建 `initial` 个 worker。
///
/// 该池可以长期使用,并且会初始填充 `initial` 个 worker。
/// 目前,除非整个池被销毁,否则不会释放或回收 worker。
///
/// # 错误
///
/// 如果在创建 JS web worker 或发送消息时发生任何错误,将返回该错误。
#[wasm_bindgen(constructor)]
pub fn new(initial: usize, worker_path: String) -> Result<WorkerPool, JsValue> {
let pool = WorkerPool {
state: Rc::new(PoolState {
workers: RefCell::new(Vec::with_capacity(initial)),
callback: Closure::new(|event: Event| {
console_log!("未处理的事件: {}", event.type_());
crate::logv(&event);
}),
}),
worker_path: worker_path.to_string(),
};
for _ in 0..initial {
let worker = pool.spawn(worker_path.clone())?;
pool.state.push(worker);
}
Ok(pool)
}
/// 无条件地生成一个新的 worker
///
/// 该 worker 不会被注册到此 `WorkerPool`,但能够为此 wasm 模块执行任务。
///
/// # 错误
///
/// 如果在创建 JS web worker 或发送消息时发生任何错误,将返回该错误。
fn spawn(&self, worker_path: String) -> Result<Worker, JsValue> {
console_log!("生成新 worker");
/// 通过在调用处,传人worker_path,来构建一个module 类型的worker
let mut opts = WorkerOptions::new();
opts.set_type(WorkerType::Module);
// 创建 Worker
let worker = Worker::new_with_options(&worker_path, &opts).map_err(|err|
JsValue::from(err)
)?;
// 在生成 worker 后,发送模块/内存以便它可以开始实例化 wasm 模块。
// 稍后,它可能会收到有关在 wasm 模块上运行代码的进一步消息。
let array = js_sys::Array::new();
array.push(&wasm_bindgen::module());
array.push(&wasm_bindgen::memory());
worker.post_message(&array)?;
Ok(worker)
}
/// 从此池中获取一个 worker,必要时生成一个新的。
///
/// 如果有可用的已生成的 web worker,这将尝试从缓存中提取一个,否则将生成一个新的 worker 并返回新生成的 worker。
///
/// # 错误
///
/// 如果在创建 JS web worker 或发送消息时发生任何错误,将返回该错误。
fn worker(&self) -> Result<Worker, JsValue> {
match self.state.workers.borrow_mut().pop() {
Some(worker) => Ok(worker),
None => self.spawn(self.worker_path.clone()),
}
}
/// 在 web worker 中执行任务 `f`,必要时生成 web worker。
///
/// 这将获取一个 web worker,然后将闭包 `f` 发送给 worker 执行。在 `f` 执行期间,该 worker 将不可用于其他任务,且不会为 worker 完成时注册回调。
///
/// # 错误
///
/// 如果在创建 JS web worker 或发送消息时发生任何错误,将返回该错误。
fn execute(&self, f: impl FnOnce() + Send + 'static) -> Result<Worker, JsValue> {
let worker = self.worker()?;
let work = Box::new(Work { func: Box::new(f) });
let ptr = Box::into_raw(work);
match worker.post_message(&JsValue::from(ptr as u32)) {
Ok(()) => Ok(worker),
Err(e) => {
unsafe {
drop(Box::from_raw(ptr));
}
Err(e)
}
}
}
/// 为指定的 `worker` 配置一个 `onmessage` 回调,以便在收到消息时回收并重新插入此池。
///
/// 当前,此 `WorkerPool` 抽象用于执行一次性任务,其中任务本身不会发送任何通知,任务完成后 worker 准备执行更多任务。
/// 此方法用于所有生成的 worker,以确保任务完成后 worker 被回收到此池中。
fn reclaim_on_message(&self, worker: Worker) {
let state = Rc::downgrade(&self.state);
let worker2 = worker.clone();
let reclaim_slot = Rc::new(RefCell::new(None));
let slot2 = reclaim_slot.clone();
let reclaim = Closure::<dyn FnMut(_)>::new(move |event: Event| {
if let Some(error) = event.dyn_ref::<ErrorEvent>() {
console_log!("worker 中的错误: {}", error.message());
// TODO: 这可能会导致内存泄漏?目前尚不清楚如何处理 worker 中的错误。
return;
}
// 如果这是一个完成事件,可以通过清空 `slot2` 来释放自己的回调,其中包含我们的闭包。
if let Some(_msg) = event.dyn_ref::<MessageEvent>() {
if let Some(state) = state.upgrade() {
state.push(worker2.clone());
}
*slot2.borrow_mut() = None;
return;
}
console_log!("未处理的事件: {}", event.type_());
crate::logv(&event);
// TODO: 与上面类似,这里可能也存在内存泄漏?
});
worker.set_onmessage(Some(reclaim.as_ref().unchecked_ref()));
*reclaim_slot.borrow_mut() = Some(reclaim);
}
}
impl WorkerPool {
/// 在 web worker 中执行 `f`。
///
/// 此池管理一组可供使用的 web worker,如果 worker 空闲,`f` 将被快速分配给一个 worker。如果没有空闲的 worker 可用,则会生成一个新的 web worker。
///
/// 一旦 `f` 返回,分配给 `f` 的 worker 将自动被此 `WorkerPool` 回收。此方法不提供了解 `f` 何时完成的方法,对于此类需求,你需要使用 `run_notify`。
///
/// # 错误
///
/// 如果在生成 web worker 或向其发送消息时发生错误,将返回该错误。
pub fn run(&self, f: impl FnOnce() + Send + 'static) -> Result<(), JsValue> {
let worker = self.execute(f)?;
self.reclaim_on_message(worker);
Ok(())
}
}
impl PoolState {
fn push(&self, worker: Worker) {
worker.set_onmessage(Some(self.callback.as_ref().unchecked_ref()));
worker.set_onerror(Some(self.callback.as_ref().unchecked_ref()));
let mut workers = self.workers.borrow_mut();
for prev in workers.iter() {
let prev: &JsValue = prev;
let worker: &JsValue = &worker;
assert!(prev != worker);
}
workers.push(worker);
}
}
/// 由 `worker.js` 调用的入口点
#[wasm_bindgen(js_name = childEntryPoint)]
pub fn child_entry_point(ptr: u32) -> Result<(), JsValue> {
let ptr = unsafe { Box::from_raw(ptr as *mut Work) };
let global = js_sys::global().unchecked_into::<DedicatedWorkerGlobalScope>();
(ptr.func)();
global.post_message(&JsValue::undefined())?;
Ok(())
}
后记
分享是一种态度。
全文完,既然看到这里了,如果觉得不错,随手点个赞和“在看”吧。
f_cli_f: https://www.npmjs.com/package/f_cli_f
[2]onseeked 事件: https://developer.mozilla.org/en-US/docs/Web/API/HTMLMediaElement/seeked_event
[3]视频元数据: https://wistia.com/learn/marketing/video-metadata
[4]loadedmetadata 事件: loadedmetadata_event
[5]wasm-bindgen: https://rustwasm.github.io/wasm-bindgen/introduction.html
[6]web-sys: https://rustwasm.github.io/wasm-bindgen/examples/dom.html
[7]Parallel Raytracing: https://rustwasm.github.io/wasm-bindgen/examples/raytrace.html
[8]Rust的多线程: https://docs.rs/threadpool/latest/threadpool/
[9]raytrace-parallel: https://wasm-bindgen.netlify.app/exbuild/raytrace-parallel/
[10]rayon: https://github.com/rayon-rs/rayon
本文由 mdnice 多平台发布