这篇文章将为大家详细讲解有关如何用nodejs源码分析线程,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
创新互联是一家专业提供港闸企业网站建设,专注与成都做网站、网站制作、成都h5网站建设、小程序制作等业务。10年已为港闸众多企业、政府机构等服务。创新互联专业网站制作公司优惠进行中。
我们先看一下一般的使用例子。
const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
worker.once('message', (message) => {
...
});
worker.postMessage('Hello, world!');
} else {
// 做点耗时的事情
parentPort.once('message', (message) => {
parentPort.postMessage(message);
});
}
我们先分析一下这个代码的意思。因为上面的代码在主线程和子线程都会被执行一遍。所以首先通过isMainThread判断当前是主线程还是子线程。主线程的话,就创建一个子线程,然后监听子线程发过来的消息。子线程的话,首先执行业务相关的代码,还可以监听主线程传过来的消息。下面我们开始分析源码。分析完,会对上面的代码有更多的理解。
首先我们从worker_threads模块开始分析。这是一个c++模块。我们看一下他导出的功能。require("work_threads")的时候就是引用了InitWorker函数导出的功能。
void InitWorker(Local
翻译成js大概是
function c++Worker(object) {
// 关联起来,后续在js层调用c++层函数时,取出来,拿到c++层真正的worker对象
object[0] = this;
...
}
function New(object) {
const worker = new c++Worker(object);
}
function Worker() {
New(this);
}
Worker.prototype = {
startThread,StartThread,
StopThread: StopThread,
...
}
module.exports = {
Worker: Worker,
getEnvMessagePort: GetEnvMessagePort,
isMainThread: true | false
...
}
了解work_threads模块导出的功能后,我们看new Worker的时候的逻辑。根据上面代码导出的逻辑,我们知道这时候首先会新建一个c++对象。对应上面的Worker函数中的this。然后执行New回调,并传入tihs。我们看New函数的逻辑。我们省略一系列的参数处理,主要代码如下。
// args.This()就是我们刚才传进来的this
Worker* worker = new Worker(env, args.This(),
url, per_isolate_opts,
std::move(exec_argv_out));
我们再看Worker类。
Worker::Worker(Environment* env,
Local
新建一个Worker,结构如下
了解了new Worker的逻辑后,我们看在js层是如何使用的。我们看js层Worker类的构造函数。
constructor(filename, options = {}) {
super();
// 忽略一系列参数处理,new Worker就是上面提到的c++层的
this[kHandle] = new Worker(url, options.execArgv, parseResourceLimits(options.resourceLimits));
// messagePort就是上面图中的messagePort,指向_parent_port
this[kPort] = this[kHandle].messagePort;
this[kPort].on('message', (data) => this[kOnMessage](data));
// 开始接收消息,我们这里不深入messagePort,后续单独分析
this[kPort].start();
// 申请一个通信管道,两个端口
const { port1, port2 } = new MessageChannel();
this[kPublicPort] = port1;
this[kPublicPort].on('message', (message) => this.emit('message', message));
// 向另一端发送消息
this[kPort].postMessage({
argv,
type: messageTypes.LOAD_SCRIPT,
filename,
doEval: !!options.eval,
cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
workerData: options.workerData,
publicPort: port2,
manifestSrc: getOptionValue('--experimental-policy') ?
require('internal/process/policy').src :
null,
hasStdin: !!options.stdin
}, [port2]);
// 开启线程
this[kHandle].startThread();
}
上面的代码主要逻辑如下
1 保存messagePort,然后给messagePort的对端(看上面的图)发送消息,但是这时候还没有接收者,所以消息会缓存到MessagePortData,即child_port_data_ 中。
2 申请一个通信管道,用于主线程和子线程通信。_parent_port和child_port是给nodejs使用的,新申请的管道是给用户使用的。
3 创建子线程。
我们看创建线程的时候,做了什么。
void Worker::StartThread(const FunctionCallbackInfo& args) {
Worker* w;
// 解包出对应的Worker对象
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
// 新建一个子线程,然后执行Run函数,从此在子线程里执行
uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
w->Run();
}, static_cast(w))
}
我们继续看Run
void Worker::Run() {
{
// 新建一个env
env_.reset(new Environment(data.isolate_data_.get(),
context,
std::move(argv_),
std::move(exec_argv_),
Environment::kNoFlags,
thread_id_));
// 初始化libuv,往libuv注册
env_->InitializeLibuv(start_profiler_idle_notifier_);
// 创建一个MessagePort
CreateEnvMessagePort(env_.get());
// 执行internal/main/worker_thread.js
StartExecution(env_.get(), "internal/main/worker_thread");
// 开始事件循环
do {
uv_run(&data.loop_, UV_RUN_DEFAULT);
platform_->DrainTasks(isolate_);
more = uv_loop_alive(&data.loop_);
if (more && !is_stopped()) continue;
more = uv_loop_alive(&data.loop_);
} while (more == true && !is_stopped());
}
}
我们分步骤分析上面的代码
1 CreateEnvMessagePort
void Worker::CreateEnvMessagePort(Environment* env) {
child_port_ = MessagePort::New(env,
env->context(),
std::move(child_port_data_));
if (child_port_ != nullptr)
env->set_message_port(child_port_->object(isolate_));
}
child_port_data_这个变量我们应该很熟悉,在这里首先申请一个新的端口。负责端口中数据管理的对象是child_port_data_。然后在env缓存起来。一会要用。
2 执行internal/main/worker_thread.js
// 设置process对象
patchProcessObject();
// 获取刚才缓存的端口
onst port = getEnvMessagePort();
port.on('message', (message) => {
// 加载脚本
if (message.type === LOAD_SCRIPT) {
const {
argv,
cwdCounter,
filename,
doEval,
workerData,
publicPort,
manifestSrc,
manifestURL,
hasStdin
} = message;
const CJSLoader = require('internal/modules/cjs/loader');
loadPreloadModules();
/*
由主线程申请的MessageChannel管道中,某一端的端口,
设置publicWorker的parentPort字段,publicWorker就是worker_threads导出的对象,后面需要用
*/
publicWorker.parentPort = publicPort;
// 执行时使用的数据
publicWorker.workerData = workerData;
// 通知主线程,正在执行脚本
port.postMessage({ type: UP_AND_RUNNING });
// 执行new Worker(filename)时传入的文件
CJSLoader.Module.runMain(filename);
})
// 开始接收消息
port.start()
这时候我们再回头看一下,我们调用new Worker(filename),然后在子线程里执行我们的filename时的场景。我们再次回顾前面的代码。
const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
worker.once('message', (message) => {
...
});
worker.postMessage('Hello, world!');
} else {
// 做点耗时的事情
parentPort.once('message', (message) => {
parentPort.postMessage(message);
});
}
我们知道isMainThread在子线程里是false,parentPort 就是就是messageChannel中的一端。所以parentPort.postMessage给对端发送消息,就是给主线程发送消息,我们再看看worker.postMessage('Hello, world!')。
postMessage(...args) {
this[kPublicPort].postMessage(...args);
}
kPublicPort指向的就是messageChannel的另一端。即给子线程发送消息。那么on('message')就是接收对端发过来的消息。
关于如何用nodejs源码分析线程就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
本文名称:如何用nodejs源码分析线程
网页地址:http://lswzjz.com/article/pdosod.html