积分4 / 贡献0

提问0答案被采纳0文章2

[经验分享] Worker源码走读

中移-zhangzhou 显示全部楼层 发表于 2024-7-4 11:02:25
# 前言

Worker主要作用是为应用程序提供一个多线程的运行环境,可满足应用程序在执行过程中与主线程分离,在后台线程中运行一个脚本操作耗时操作,极大避免类似于计算密集型或高延迟的任务阻塞主线程的运行。在js中也有Worker实现多线程,OpenHarmony中的Worker从原理到用法也是对js中的Worker继承。

# 构造初始化

```
##worker.cpp

//ThreadWorker
napi_value Worker::ThreadWorkerConstructor(napi_env env, napi_callback_info cbinfo)
{
    if (CanCreateWorker(env, WorkerVersion::NEW)) {
        return Constructor(env, cbinfo, false, WorkerVersion::NEW);
    }
    return nullptr;
}

//Worker(deprecated)
napi_value Worker::WorkerConstructor(napi_env env, napi_callback_info cbinfo)
{
    if (CanCreateWorker(env, WorkerVersion::OLD)) {
        return Constructor(env, cbinfo, false, WorkerVersion::OLD);
    }
    return nullptr;
}
```

```
napi_value Worker::Constructor(napi_env env, napi_callback_info cbinfo, bool limitSign, WorkerVersion version)
{
    //参数校验
    ......
  
    HILOG_INFO("worker:: Worker start constructor");
    Worker* worker = nullptr;
    if (limitSign) {
        //RestrictedWorker的处理逻辑
        ......
    } else {
        //最大worker数
        int maxWorkers = (version == WorkerVersion::NEW) ? MAX_THREAD_WORKERS : MAX_WORKERS;
    #if defined(OHOS_PLATFORM)
        maxWorkers = OHOS::system::GetIntParameter<int>("persist.commonlibrary.maxworkers", maxWorkers);
    #endif
        std::lock_guard<std::mutex> lock(g_workersMutex);
        // 当前worker数达到最大限制worker数,抛出异常
        if (static_cast<int>(g_workers.size()) >= maxWorkers) {
            HILOG_ERROR("worker:: the number of workers exceeds the maximum");
            ErrorHelper::ThrowError(env,
                ErrorHelper::ERR_WORKER_INITIALIZATION, "the number of workers exceeds the maximum.");
            return nullptr;
        }

        // 2. new worker instance
        worker = new Worker(env, nullptr);
       ......
        // worker实例加入list
        g_workers.push_back(worker);
    }
    ......

    // 3. execute in thread
    char* script = NapiHelper::GetString(env, args[0]);
    if (script == nullptr) {
        HILOG_ERROR("worker:: the file path is invaild, maybe path is null");
        ErrorHelper::ThrowError(env,
            ErrorHelper::ERR_WORKER_INVALID_FILEPATH, "the file path is invaild, maybe path is null.");
        return nullptr;
    }
    napi_wrap(
        env, thisVar, worker,
        // js对象准备gc时,终止worker
        [](napi_env env, void* data, void* hint) {
            Worker* worker = reinterpret_cast<Worker*>(data);
            {
                std::lock_guard<std::recursive_mutex> lock(worker->liveStatusLock_);
                if (worker->UpdateHostState(INACTIVE)) {
#if defined(ENABLE_WORKER_EVENTHANDLER)
                    if (!worker->isMainThreadWorker_) {
                        worker->CloseHostHandle();
                    }
#else
                    worker->CloseHostHandle();
#endif
                    worker->ReleaseHostThreadContent();
                }
                if (!worker->IsRunning()) {
                    HILOG_DEBUG("worker:: worker is not in running");
                    return;
                }
                worker->TerminateInner();
            }
        },
        nullptr, &worker->workerRef_);
    // worker线程启动执行
    worker->StartExecuteInThread(env, script);
    return thisVar;
}

```

启动worker线程

```
void Worker::StartExecuteInThread(napi_env env, const char* script)
{
    HILOG_INFO("worker:: Start execute in the thread!");
    // 1. init hostHandle in host loop
    uv_loop_t* loop = NapiHelper::GetLibUV(env);
    if (loop == nullptr) {
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_NOT_RUNNING, "engine loop is null");
        CloseHelp:eletePointer(script, true);
        return;
    }
    GetContainerScopeId(env);
//初始化uv句柄,宿主的loop handle
#if defined(ENABLE_WORKER_EVENTHANDLER)
    if (!OHOS::AppExecFwk::EventRunner::IsAppMainThread()) {
        isMainThreadWorker_ = false;
        InitHostHandle(loop);
    } else {
        HILOG_DEBUG("worker:: eventrunner should be nullptr if the current thread is not the main thread");
    }
#else
    InitHostHandle(loop);
#endif

    // 2. copy the script
    //寻找worker线程文件
    script_ = std::string(script);
    // isBundle : FA mode and BundlePack.
    bool isBundle = reinterpret_cast<NativeEngine*>(env)->GetIsBundle();
    // if worker file is packed in har, need find moduleName in hostVM, and concat new recordName.
    bool isHar = script_.find_first_of(PathHelper::NAME_SPACE_TAG) == 0;
    if ((isHar && script_.find(PathHelper:REFIX_BUNDLE) == std::string::npos) ||
        (!isBundle && script_.find_first_of(PathHelper:OINT_TAG) == 0)) {
        PathHelper::ConcatFileNameForWorker(env, script_, fileName_, isRelativePath_);
        HILOG_DEBUG("worker:: Concated worker recordName: %{public}s, fileName: %{public}s",
                    script_.c_str(), fileName_.c_str());
    }
    // check the path is vaild.
    bool isNormalizedOhmUrlPack = reinterpret_cast<NativeEngine*>(env)->GetIsNormalizedOhmUrlPack();
    if (!isNormalizedOhmUrlPack && !isBundle) {
        if (!PathHelper::CheckWorkerPath(env, script_, isHar, isRelativePath_)) {
            HILOG_ERROR("worker:: the file path is invaild, can't find the file : %{public}s.", script);
            CloseHelp:eletePointer(script, true);
            ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_INVALID_FILEPATH,
                "the file path is invaild, can't find the file.");
            return;
        }
    }

    // 3. create WorkerRunner to Execute
    if (!runner_) {
        // 创建WorkerRunner线程,线程启动后回调ExecuteThread
        runner_ = std::make_unique<WorkerRunner>(WorkerStartCallback(ExecuteInThread, this));
    }
    if (runner_) {
        //Worker线程启动
        runner_->Execute(); // start a new thread
    } else {
        HILOG_ERROR("runner_ is nullptr");
    }
    CloseHelp:eletePointer(script, true);
}
```

Worker线程创建后执行Executethread

```
void Worker::ExecuteInThread(const void* data)
{
    HITRACE_HELPER_START_TRACE(__PRETTY_FUNCTION__);
    HILOG_INFO("worker:: Execute in the thread!");
    auto worker = reinterpret_cast<Worker*>(const_cast<void*>(data));
    // 1. create a runtime, nativeengine
    napi_env workerEnv = nullptr;
    {
        std::lock_guard<std::recursive_mutex> lock(worker->liveStatusLock_);
        if (worker->HostIsStop()) {
            HILOG_ERROR("worker:: host thread is stop");
            CloseHelp:eletePointer(worker, false);
            return;
        }
        napi_env env = worker->GetHostEnv();
        //创建runtime,为worker创建ArkNativeEngine
        napi_create_runtime(env, &workerEnv);
        if (workerEnv == nullptr) {
            HILOG_ERROR("worker:: Worker create runtime error");
            ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_NOT_RUNNING, "Worker create runtime error");
            return;
        }
            // mark worker env is workerThread
reinterpret_cast<NativeEngine*>(workerEnv)->MarkWorkerThread();
        // for load balance in taskpool
        reinterpret_cast<NativeEngine*>(env)->IncreaseSubEnvCounter();

        worker->SetWorkerEnv(workerEnv);
    }
    //根据workerEnv得到worker线程的loop
    uv_loop_t* loop = worker->GetWorkerLoop();
    if (loop == nullptr) {
        HILOG_ERROR("worker:: Worker loop is nullptr");
        return;
    }

    // 2. add some preparation for the worker
    //执行worker初始化操作
    if (worker->repareForWorkerInstance()) {
        worker->workerOnMessageSignal_ = new uv_async_t;
        // worker loop,初始化workerOnMessageSignal_ handle,handle回调执行WorkerOnMessage
        uv_async_init(loop, worker->workerOnMessageSignal_, reinterpret_cast<uv_async_cb>(Worker::WorkerOnMessage));
        worker->workerOnMessageSignal_->data = worker;
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
        uv_async_init(loop, &worker->debuggerOnPostTaskSignal_, reinterpret_cast<uv_async_cb>(
            Worker::HandleDebuggerTask));
#endif
        //worker状态置为RUNNING
        worker->UpdateWorkerState(RUNNING);
        // in order to invoke worker send before subThread start
        uv_async_send(worker->workerOnMessageSignal_);
        HITRACE_HELPER_FINISH_TRACE;
        //运行事件循环
        // 3. start worker loop
        worker->Loop();
    } else {
        HILOG_ERROR("worker:: worker PrepareForWorkerInstance fail");
        worker->UpdateWorkerState(TERMINATED);
        HITRACE_HELPER_FINISH_TRACE;
    }
    // worker初始化失败或worker的loop结束,释放worker线程资源:删除监听、端口,清空worker消息队列
    worker->ReleaseWorkerThreadContent();
    std::lock_guard<std::recursive_mutex> lock(worker->liveStatusLock_);
    //如果宿主stop,直接释放worker;否则向宿主消息队列添加nullptr,宿主接收信号后关闭回调
    if (worker->HostIsStop()) {
        HILOG_INFO("worker:: host is stopped");
        CloseHelp:eletePointer(worker, false);
    } else {
        worker->ublishWorkerOverSignal();
    }
}

```

执行worker初始化操作

```
bool Worker:repareForWorkerInstance()
{
    std::string rawFileName = script_;
    uint8_t* scriptContent = nullptr;
    size_t scriptContentSize = 0;
    std::vector<uint8_t> content;
    std::string workerAmi;
    {
        std::lock_guard<std::recursive_mutex> lock(liveStatusLock_);
        if (HostIsStop()) {
            HILOG_INFO("worker:: host is stopped");
            return false;
        }
        // 1. init worker async func
        auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);

        auto hostEngine = reinterpret_cast<NativeEngine*>(hostEnv_);
        // 2. init worker environment
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
        workerEngine->SetDebuggerPostTaskFunc(
            std::bind(&Worker:ebuggerOnPostTask, this, std::placeholders::_1));
#endif
        if (!hostEngine->CallInitWorkerFunc(workerEngine)) {
            HILOG_ERROR("worker:: CallInitWorkerFunc error");
            return false;
        }
        // 3. get uril content
        if (isRelativePath_) {
            rawFileName = fileName_;
        }
        //worker线程文件
        if (!hostEngine->GetAbcBuffer(rawFileName.c_str(), &scriptContent, &scriptContentSize, content, workerAmi)) {
            HILOG_ERROR("worker:: GetAbcBuffer error");
            return false;
        }
    }
    // add timer interface
    Timer::RegisterTime(workerEnv_);
    HILOG_DEBUG("worker:: stringContent size is %{public}zu", scriptContentSize);
    napi_value execScriptResult = nullptr;
    //run actor,JSNApi执行worker线程文件
    napi_status status = napi_run_actor(workerEnv_, scriptContent, scriptContentSize,
        workerAmi.c_str(), &execScriptResult, const_cast<char*>(script_.c_str()));
    if (status != napi_ok || execScriptResult == nullptr) {
        // An exception occurred when running the script.
        HILOG_ERROR("worker:: run script exception occurs, will handle exception");
        HandleException();
        return false;
    }

    // 4. register worker name in DedicatedWorkerGlobalScope
    if (!uv_async_init_.empty()) {
        napi_value nameValue = nullptr;
        napi_create_string_utf8(workerEnv_, name_.c_str(), name_.length(), &nameValue);
        NapiHelper::SetNamePropertyInGlobal(workerEnv_, "name", nameValue);
    }
    return true;
}

```

# 消息传递

宿主线程向worker线程发送消息PostMessage

```
napi_value Worker:ostMessage(napi_env env, napi_callback_info cbinfo)
{
    HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
    return CommonPostMessage(env, cbinfo, true);
}

napi_value Worker::CommonPostMessage(napi_env env, napi_callback_info cbinfo, bool cloneSendable)
{
    ......
    if (worker == nullptr || worker->IsTerminated() || worker->IsTerminating()) {
        HILOG_ERROR("worker:: worker is nullptr when PostMessage, maybe worker is terminated");
        WorkerThrowError(env, ErrorHelper::ERR_WORKER_NOT_RUNNING, "maybe worker is terminated when PostMessage");
        return nullptr;
    }
    //序列化数据
    MessageDataType data = nullptr;
    napi_status serializeStatus = napi_ok;
    bool defaultClone = cloneSendable ? true : false;
    napi_value undefined = NapiHelper::GetUndefinedValue(env);
    if (argc >= NUM_WORKER_ARGS) {
        if (!NapiHelper::IsArray(env, argv[1])) {
            ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "transfer list must be an Array");
            return nullptr;
        }
        serializeStatus = napi_serialize_inner(env, argv[0], argv[1], undefined, false, defaultClone, &data);
    } else {
        serializeStatus = napi_serialize_inner(env, argv[0], undefined, undefined, false, defaultClone, &data);
    }
    if (serializeStatus != napi_ok || data == nullptr) {
        worker->HostOnMessageErrorInner();
        WorkerThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, "failed to serialize message.");
        return nullptr;
    }
    worker->ostMessageInner(data);
    return NapiHelper::GetUndefinedValue(env);
}

void Worker:ostMessageInner(MessageDataType data)
{
    if (IsTerminated()) {
        HILOG_DEBUG("worker:: worker has been terminated when PostMessageInner.");
        return;
    }
    // 加锁数据加入worker消息队列
    workerMessageQueue_.EnQueue(data);
    std::lock_guard<std::mutex> lock(workerOnmessageMutex_);
    if (workerOnMessageSignal_ != nullptr && uv_is_active((uv_handle_t*)workerOnMessageSignal_)) {
        //uv_async_send通知worker线程执行WorkerOnMessage
        uv_async_send(workerOnMessageSignal_);
    }
}

```

worker loop执行WorkerOnMessage

```
void Worker::WorkerOnMessage(const uv_async_t* req)
{
    HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
    Worker* worker = static_cast<Worker*>(req->data);
    if (worker == nullptr) {
        HILOG_ERROR("worker::worker is null");
        return;
    }
    worker->WorkerOnMessageInner();
}

void Worker::WorkerOnMessageInner()
{
    if (IsTerminated()) {
        return;
    }
    napi_status status;
    napi_handle_scope scope = nullptr;
    status = napi_open_handle_scope(workerEnv_, &scope);
    if (status != napi_ok || scope == nullptr) {
        HILOG_ERROR("worker:: WorkerOnMessage open handle scope failed.");
        return;
    }
    MessageDataType data = nullptr;
    //worker线程没有终结且worker消息队列有数据,处理出列数据
    while (!IsTerminated() && workerMessageQueue_.DeQueue(&data)) {
        // nullptr表明宿主发送的是终止信号
        if (data == nullptr) {
            HILOG_DEBUG("worker:: worker reveive terminate signal");
            // Close handlescope need before TerminateWorker
            napi_close_handle_scope(workerEnv_, scope);
            TerminateWorker();
            return;
        }
        //反序列化数据
        napi_value result = nullptr;
        status = napi_deserialize(workerEnv_, data, &result);
        napi_delete_serialization_data(workerEnv_, data);
        if (status != napi_ok || result == nullptr) {
            WorkerOnMessageErrorInner();
            continue;
        }

        napi_value event = nullptr;
        napi_create_object(workerEnv_, &event);
        napi_set_named_property(workerEnv_, event, "data", result);
        napi_value argv[1] = { event };
        //调用worker线程文件中定义的onmessagee方法
        CallWorkerFunction(1, argv, "onmessage", true);

        napi_value obj = NapiHelper::GetReferenceValue(workerEnv_, this->workerPort_);
        ParentPortHandleEventListeners(workerEnv_, obj, 1, argv, "message", true);
    }
    napi_close_handle_scope(workerEnv_, scope);
}

```

# 关闭worker

```
napi_value Worker::Terminate(napi_env env, napi_callback_info cbinfo)
{
    HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
    napi_value thisVar = nullptr;
    napi_get_cb_info(env, cbinfo, nullptr, nullptr, &thisVar, nullptr);
    Worker* worker = nullptr;
    napi_unwrap(env, thisVar, reinterpret_cast<void**>(&worker));
    if (worker == nullptr) {
        HILOG_ERROR("worker:: worker is nullptr when Terminate, maybe worker is terminated");
        WorkerThrowError(env, ErrorHelper::ERR_WORKER_NOT_RUNNING, "worker is nullptr when Terminate");
        return nullptr;
    }
    if (worker->IsTerminated() || worker->IsTerminating()) {
        HILOG_DEBUG("worker:: worker is not in running when Terminate");
        return nullptr;
    }
    worker->TerminateInner();
    return NapiHelper::GetUndefinedValue(env);
}

void Worker::TerminateInner()
{
    if (IsTerminated() || IsTerminating()) {
        HILOG_INFO("worker:: worker is not in running when TerminateInner");
        return;
    }
    //worker状态置为TERMINATEING
    // 1. Update State
    UpdateWorkerState(TERMINATEING);
    //发送nullptr信号,worker线程接收到后执行TerminateWorker
    // 2. send null signal
    PostMessageInner(nullptr);
}

```

# 总结

简单的worker的创建、消息向worker线程传递、销毁流程:

1.通过NAPI,ts层的worker.ThreadWorker、worker.workertPort和c++层的worker相互调用;

2.构造时创建worker实例,初始化宿主线程loop uv句柄;加载worker线程文件;

3‘创建并启动worker线程,worker状态置为RUNNING,运行worker loop事件循环;

4.ts层宿主调用ThreadWorker向PostMessage向worker线程传递消息,数据包装后加入worker线程消息队列,uv发送消息;

5.worker线程接收消息,从消息队列取数据,调用worker线程文件中的onmessage方法执行;

6.宿主调用terminate方法终结worker,worker状态置为TERMINATEING,nullptr加入消息队列,uv发送消息;

7.worker线程接收消息,从消息队列取数据为nullptr,worker线程关闭事件循环,worker状态置为TERMINATED。
无用

©著作权归作者所有,转载或内容合作请联系作者

您尚未登录,无法参与评论,登录后可以:
参与开源共建问题交流
认同或收藏高质量问答
获取积分成为开源共建先驱

Copyright   ©2023  OpenHarmony开发者论坛  京ICP备2020036654号-3 |技术支持 Discuz!

返回顶部