libgo源码剖析(2.libgo调度策略源码实现)
本文将从源码实现上对 libgo 的调度策略进行分析,主要涉及到上一篇文章中的三个结构体的定义:
为河北等地区用户提供了全套网页设计制作服务,及河北网站建设行业解决方案。主营业务为成都做网站、网站建设、河北网站设计,以传统方式定制建设网站,并提供域名空间备案等一条龙服务,秉承以专业、用心的态度为用户提供真诚的服务。我们深信只要达到每一位用户的要求,就会得到认可,从而选择与我们长期合作。这样,我们也可以走得更远!
- 调度器 Scheduler(简称 S)
- 执行器 Processer(简称 P)
- 协程 Task(简称 T)
三者的关系如下图所示:
本文会列出类内的主要成员和主要函数做以分析。
1. 协程调度器:class Scheduler
libgo/scheduler/scheduler.h
class Scheduler{
public:
/*
* 创建一个调度器,初始化 libgo
* 创建主线程的执行器,如果后续 STart 的时候没有参数,默认只有一个执行器去做
* 当仅使用一个线程进行协程调度时, 协程地执行会严格地遵循其创建顺序.
* */
static Scheduler* Create();
/*
* 创建一个协程 Task 对象,并添加到当前的执行器 processer 的任务队列中,
* 调度器的任务数 taskCount_ +1
* */
void CreateTask(TaskF const& fn, TaskOpt const& opt);
/* 启动调度器
* @minThreadNumber : 最小调度线程数, 为0时, 设置为cpu核心数.
* @maxThreadNumber : 最大调度线程数, 为0时, 设置为minThreadNumber.
* 如果maxThreadNumber大于minThreadNumber, 则当协程产生长时间阻塞时,
* 可以自动扩展调度线程数.
* 唤醒定时器线程
* 每个调度线程都会调用 Process 开始调度,最后开启 id 为 0 的调度线程
* 如果 maxThreadNumber_ > 1 的话,会开启调度线程 DispatcherThread
* */
void Start(int minThreadNumber = 1, int maxThreadNumber = 0);
/*
* 停止调度,停止后无法恢复, 仅用于安全退出main函数
* 如果某个调度线程被协程阻塞, 必须等待阻塞结束才能退出.
* */
void Stop();
private:
/*
* 调度线程,主要为平衡多个 processer 的负载将高负载或阻塞的 p 中的协程 steal 给低负载的 p
* 如果全部阻塞但是还有协程待执行,会起新线程,线程数不超过
maxThreadNumber_
* 会将阻塞 P 中的协程分摊给负载较少的 P
* */
void DispatcherThread();
/*
* 创建一个新的 Processer,并添加到双端队列 processers_ 中
* */
void NewProcessThread();
private:
atomic_t taskCount_{0}; // 用来统计协程数量
Deque processers_; // DispatcherThread双端队列,用来存放所有的执行器,每个执行器都会单独开一个线程去执行,线程中回调 Process() 方法。
LFLock started_; // libgo 提供的自选锁
};
调度器负责管理 1~N 个调度线程,每个调度线程一个执行器 Processer。调度器仅负责均衡各个执行器的负载,防止全部卡住的情况,并不涉及协程的切换等工作。
使用
ligbo提供了默认的协程调度器 co_sched
#define g_Scheduler ::co::Scheduler::getInstance()
#define co_sched g_Scheduler
用户也可以创建自己的协程调度器
co::Scheduler* my_sched = co::Scheduler::Create();
启动调度
std::thread t([my_sched]{mysched->Start();});
t.detach();
调度器原理
schedule 负责整个系统的协程调度,协程的运行依赖于执行器 Processer(简称 P),因此在调度器初始化的时候会选择创建 P 的数量(支持动态增长),所有的执行器会添加到双端队列中。主线程也作为一个执行器,在创建 Scheduler 对象的时候创建,位于双端队列下标为 0 的位置(注意:只是创建对象,并没有开始运行);
当调用了 Start() 函数后,会正式开始运行。在 Start 函数内部,会创建指定数量的执行器 P,具体数量取决于参数,默认会创建 minThreadNumber 个,当全部执行器都阻塞之后,会动态扩展,最多 maxThreadNumber 个执行器。每个执行器都会运行于一个单独的线程,执行器负责该线程内部协程的切换和执行;
当创建协程时,会将协程添加到某一个处于活跃状态的执行器,如果恰好都不活跃,也会添加到某一个 P 中,这并不影响执行器的正常工作,因为调度器的调度线程会去处理它;
Start 函数内部,除了上述执行器所在线程,还会开启调度线程 DispatcherThread,调度线程会平衡各个 P 的协程数量和负载,进行 steal,如果所有 P 都阻塞,会根据 maxThreadNumber 动态增加 P 的数量,如果仅仅部分 P 阻塞,会将阻塞的 P 中的协程全部拿出(steal),均摊到负载最小的 P 中;
Schedule 也会选择性开启协程的定时器线程;
- 开启 FastSteadyClock 线程。
关于定时器以及时钟的实现,会在之后的文章中讨论。
2. 协程执行器:class Processer
libgo/scheduler/processer.h
每个协程执行器对应一个线程,负责本线程的协程调度,但并非线程安全的,是协程调度的核心。
class Processer
{
public:
// 协程挂起标识,用于后续进行唤醒和超时判断
struct SuspendEntry {
// ...
};
// 协程切出
ALWAYS_INLINE static void StaticCoYield();
// 挂起当前协程
static SuspendEntry Suspend();
// 挂起当前协程, 并在指定时间后自动唤醒
static SuspendEntry Suspend(FastSteadyClock::duration dur);
// 唤醒协程
static bool Wakeup(SuspendEntry const& entry);
private:
/*
* 执行器对协程的调度,也是执行器所在现在的主处理逻辑
* */
void Process();
/*
* 从当前执行器中偷 n 个协程并返回
* n 为0则全部偷出来,否则取出相应的个数
* */
SList Steal(std::size_t n);
/*
* 给当前执行器打标记,用于检测协程是否阻塞
* */
void Mark();
private:
int id_; // 线程 id,与 shcedule 中的 _processer 下标对应
Scheduler * scheduler_; // 该执行器依赖的调度器
volatile bool active_ = true; // 该执行器的活跃状态,活跃表明该执行器未被阻塞,由调度器的调度线程控制
volatile int64_t markTick_ = 0; // mark 的时间戳
volatile uint64_t markSwitch_ = 0; // mark 的时候处于第几次协程调度
volatile uint64_t switchCount_ = 0; // 协程调度的次数
// 当前正在运行的协程
Task* runningTask_{nullptr};
Task* nextTask_{nullptr};
// 协程队列
typedef TSQueue TaskQueue;
TaskQueue runnableQueue_; // 运行协程队列
TaskQueue waitQueue_; // 等待协程队列
TSQueue gcQueue_; // 待回收的协程队列,协程运行完毕之后,会被添加到该队列中,等待回收
TaskQueue newQueue_; // 新添加到该执行器中的协程,包括刚刚 steal 过来的协程,该队列中的协程暂不会执行,会由 Process() 函数将该队列中的协程不断添加到 runnableQueue_ 中
volatile uint64_t switchCount_ = 0; // 协程调度的次数
// 执行器等待的条件变量
std::mutex cvMutex_;
std::condition_variable cv_;
std::atomic_bool waiting_{false};
};
执行器对协程的调度 Process()
执行器 Processer 维护了三个线程安全的协程队列:
- runnableQueue_:可运行协程队列;
- waitQueue_:存放挂起的协程;
- newQueue_:该队列中存放的是新加入的协程,包括新创建的协程,唤醒挂起的协程,还有 steal 来的协程;
void Processer::Process()
{
GetCurrentProcesser() = this;
bool & isStop = *stop_;
while (!isStop)
{
runnableQueue_.front(runningTask_);
// 获取一个可以运行对协程对象
if (!runningTask_) {
if (AddNewTasks())
runnableQueue_.front(runningTask_);
if (!runningTask_) {
WaitCondition(); // 没有可以执行的协程,wait 条件变量
AddNewTasks();
continue;
}
}
addNewQuota_ = 1;
while (runningTask_ && !isStop) {
runningTask_->state_ = TaskState::runnable;
runningTask_->proc_ = this;
++switchCount_;
runningTask_->SwapIn();
switch (runningTask_->state_) {
case TaskState::runnable:
{
std::unique_lock lock(runnableQueue_.LockRef());
auto next = (Task*)runningTask_->next;
if (next) {
runningTask_ = next;
runningTask_->check_ = runnableQueue_.check_;
break;
}
if (addNewQuota_ < 1 || newQueue_.emptyUnsafe()) {
runningTask_ = nullptr;
} else {
lock.unlock();
if (AddNewTasks()) {
runnableQueue_.next(runningTask_, runningTask_);
-- addNewQuota_;
} else {
std::unique_lock lock2(runnableQueue_.LockRef());
runningTask_ = nullptr;
}
}
}
break;
case TaskState::block:
{
std::unique_lock lock(runnableQueue_.LockRef());
runningTask_ = nextTask_;
nextTask_ = nullptr;
}
break;
case TaskState::done:
default:
{
runnableQueue_.next(runningTask_, nextTask_);
if (!nextTask_ && addNewQuota_ > 0) {
if (AddNewTasks()) {
runnableQueue_.next(runningTask_, nextTask_);
-- addNewQuota_;
}
}
DebugPrint(dbg_task, "task(%s) done.", runningTask_->DebugInfo());
runnableQueue_.erase(runningTask_);
if (gcQueue_.size() > 16) // 执行完毕的协程,需要回收资源
GC();
gcQueue_.push(runningTask_);
if (runningTask_->eptr_) {
std::exception_ptr ep = runningTask_->eptr_;
std::rethrow_exception(ep);
}
std::unique_lock lock(runnableQueue_.LockRef());
runningTask_ = nextTask_;
nextTask_ = nullptr;
}
break;
}
}
}
}
在调度器 Schedule 执行 Stop() 函数之前,执行器 P 会一直处于调度协程阶段 Process()。在期间,执行器 P 会将运行队列 runnableQueue 中的第一个协程获取进行执行,如果可运行队列为空,执行器会尝试将处于 newQueue 中的协程添加到可运行队列中去,如果 newQueue_ 为空,说明此时该执行器处于无协程可调度状态,通过设置条件变量,将执行器设置为等待状态;
当获取到一个可执行协程之后,会执行该协程的任务。协程的执行流程是通过状态机来实现的。(协程有三个状态:运行中,阻塞,执行完毕)
- 对于运行中的协程,我们只需要确定下一个要执行的协程对象即可;
- 对于阻塞的协程,只有当协程挂起时(调用了 Suspend 方法),状态才会切换到这里,因此,这时候只需要去执行 nextTask 即可;
- 对于运行完毕的协程,只有当 Task 处理函数执行完成之后,状态才会切换到这里,因此,需要考虑对该协程资源进行回收;
条件变量
Processer 使用了 std::mutex,并且提供了条件变量用来唤醒。当调度器尝试获取下一个可运行的协程对象时,若此时无可用协程对象,就会主动去等待该条件变量,默认100毫秒的超时时间。
void Processer::WaitCondition()
{
GC();
std::unique_lock lock(cvMutex_);
waiting_ = true;
cv_.wait_for(lock, std::chrono::milliseconds(100));
waiting_ = false;
}
void Processer::NotifyCondition()
{
cv_.notify_all();
}
当调度器向该执行器中增加了新的协程对象时,会唤醒该条件变量,继续执行 Process 流程。使用条件变量唤醒的效率,要远远高于不断去轮询。
为什么在使用了条件变量后还要设置超时时间,定时轮询,即使条件变量没有被唤醒也希望它返回呢?
因为我们不希望线程会在这里阻塞,只要没有新的协程加入,就一直在死等。我们希望线程在等待的同时,也可以定时跳出,执行一些其它的检测工作等。
从执行器中偷指定数量的协程出来 -> steal()
简单来说,从执行器中取协程出来,就是从执行器维护的双端队列中获取执行个数的结点。
为什么要取出来?前面提到过,要么该执行器负载过大,要么该执行器处于阻塞的状态。
SList Processer::Steal(std::size_t n)
{
if (n > 0) {
// steal 指定个数协程
newQueue_.AssertLink();
auto slist = newQueue_.pop_back(n);
newQueue_.AssertLink();
if (slist.size() >= n)
return slist;
std::unique_lock lock(runnableQueue_.LockRef());
bool pushRunningTask = false, pushNextTask = false;
if (runningTask_)
pushRunningTask = runnableQueue_.eraseWithoutLock(runningTask_, true) || slist.erase(runningTask_, newQueue_.check_);
if (nextTask_)
pushNextTask = runnableQueue_.eraseWithoutLock(nextTask_, true) || slist.erase(nextTask_, newQueue_.check_);
auto slist2 = runnableQueue_.pop_backWithoutLock(n - slist.size());
if (pushRunningTask)
runnableQueue_.pushWithoutLock(runningTask_);
if (pushNextTask)
runnableQueue_.pushWithoutLock(nextTask_);
lock.unlock();
slist2.append(std::move(slist));
if (!slist2.empty())
DebugPrint(dbg_scheduler, "Proc(%d).Stealed = %d", id_, (int)slist2.size());
return slist2;
} else {
// steal all
newQueue_.AssertLink();
auto slist = newQueue_.pop_all();
newQueue_.AssertLink();
std::unique_lock lock(runnableQueue_.LockRef());
bool pushRunningTask = false, pushNextTask = false;
if (runningTask_)
pushRunningTask = runnableQueue_.eraseWithoutLock(runningTask_, true) || slist.erase(runningTask_, newQueue_.check_);
if (nextTask_)
pushNextTask = runnableQueue_.eraseWithoutLock(nextTask_, true) || slist.erase(nextTask_, newQueue_.check_);
auto slist2 = runnableQueue_.pop_allWithoutLock();
if (pushRunningTask)
runnableQueue_.pushWithoutLock(runningTask_);
if (pushNextTask)
runnableQueue_.pushWithoutLock(nextTask_);
lock.unlock();
slist2.append(std::move(slist));
if (!slist2.empty())
DebugPrint(dbg_scheduler, "Proc(%d).Stealed all = %d", id_, (int)slist2.size());
return slist2;
}
}
首先,会从 newQueue 队列中获取协程结点,因为 newQueue 中的结点还没有添加到运行队列中,因此可以直接取出;如果 newQueue 中协程数量不足,会从 runnableQueue 队列尾部中继续获取结点。由于 runnableQueue 队列中我们记录了正在执行的协程和下一次将执行的协程(runningTask & nextTask),需要特殊处理。在从 runnableQueue 偷协程之前,会将 runningTask & nextTask 从队列删除,待偷完结点之后再次添加到当前 runnableQueue_ 队列中。
简单说,偷协程的工作,不会从队列中获取到 runningTask & nextTask 标识的协程。
阻塞判断
void Processer::Mark()
{
if (runningTask_ && markSwitch_ != switchCount_) {
markSwitch_ = switchCount_;
markTick_ = NowMicrosecond();
}
}
uint32_t cycle_timeout_us = 10 * 1000;
bool Processer::IsBlocking()
{
if (!markSwitch_ || markSwitch_ != switchCount_) return false;
return NowMicrosecond() > markTick_ + CoroutineOptions::getInstance().cycle_timeout_us;
}
Mark 函数会在调度器的调度函数中被调用,需要注意的是,只有执行器处于活跃状态时才会调用。Mark 顾名思义,是给该执行打标记,会记录mark的时间戳,并记录下是在第多少次协程调度的过程中做了标记,Mark 的作用是用来进行执行器的阻塞检测。
处于活跃状态的执行器,总是在执行着协程的切换,因此,会不断自增 switchCount_ 的值,根据 IsBlocking 函数得知,当我们此时标签记录的协程调度次数超过10ms没有发生改变,我们认为该执行器发生阻塞,Scheduler 会进行 Steal 操作。
协程挂起 Suspend
static SuspendEntry Suspend();
一种方式是直接挂起,会将该协程状态转换为 TaskState::block,然后将该协程从 runnableQueue 中删除,再添加到 waitQueue 中;
另外一种方式是挂起之后(第一种方式执行完毕之后),允许配置一个时间段之后去自动唤醒该协程。
wakeup
用于唤醒协程
唤醒协程要做的,就是讲待唤醒的协程从 waitQueue_ 中删除并重新添加到 newQueue_中去。
StaticCoYield
用于在一个执行器中切出当前协程
有两种可能,一种是协程被阻塞需要挂起;另外一种是协程执行完毕,主动切出。
具体实现是通过获取当前执行器正在执行的协程 Task,调用 SwapOut() 方法实现。
ALWAYS_INLINE void Processer::StaticCoYield()
{
auto proc = GetCurrentProcesser();
if (proc) proc->CoYield();
}
ALWAYS_INLINE void Processer::CoYield()
{
Task *tk = GetCurrentTask();
assert(tk);
++ tk->yieldCount_;
#if ENABLE_DEBUGGER
DebugPrint(dbg_yield, "yield task(%s) state = %s", tk->DebugInfo(), GetTaskStateName(tk->state_));
if (Listener::GetTaskListener())
Listener::GetTaskListener()->onSwapOut(tk->id_);
#endif
tk->SwapOut();
}
几个需要注意的问题
> 可能会切出协程上下文的几种情况:
- 协程被挂起;
- 协程执行完毕;
- 用户主动切出 co_yield。
#define co_yield do { ::co::Processer::StaticCoYield(); } while (0)
> 协程被挂起的几种情况:
- 系统函数被 hook;
- libgo_poll (被 hook 的 io 操作函数会调用 libgo_poll 实现切换)
- select
- sleep、usleep、nanosleep
- 调用了协程锁 CoMutex(co_mutex),协程读写锁 CoRWMutex(co_rwmutex),或者使用了 channel。
> 切入协程上下文的几种情况:
- 执行器在调度(Process)期间;
- 唤醒挂起协程不会切入上下文,只是从等待队列中重新添加到 newQueue_。
3. 协程对象:struct Task
# 协程状态
enum class TaskState
{
runnable, // 可运行
block, // 阻塞
done, // 协程运行完毕
};
typedef std::function TaskF; // c++11提供的函数模板
struct Task
{
TaskState state_ = TaskState::runnable;
uint64_t id_; // 当前调度器下协程编号,从0开始
TaskF fn_; // 协程运行的函数
uint64_t yieldCount_ = 0; // 协程切出的次数
Context ctx_; // 上下文信息
Processer* proc_ = nullptr; // 归属于哪个执行器
// 提供了协程切入、切出、切换到指定线程三个函数
ALWAYS_INLINE void SwapIn();
ALWAYS_INLINE void SwapTo(Task* other);
ALWAYS_INLINE void SwapOut();
private:
static void StaticRun(intptr_t vp); // 参数为 Task*,函数会去执行该 Task 的 fn_(),执行完毕后,协程状态改为 TaskState::done,并在执行器 P 中切出
};
每个 Task 对象是一个协程,在使用过程中,创建一个协程实际就是创建了一个 Task 对象,再添加到对应的执行器 P 中。之前提到过,执行器进行协程调度是通过一个状态机来实现的,这里的 TaskState 就是协程状态,协程函数 fn_ 会在 StaticRun 静态方法中调用,该静态方法注册到了协程上下文 _ctx 中。
除此之外,Task 类内部,也提供了协程的切入切出方法,本质也是调用了上下文的切换。
StaticRun
控制协程的运行,内部调用了 Task::Run() 方法,会在协程函数 fn_ 执行完毕之后,将协程状态转换为 TaskState::done,并将协程切出。
void Task::Run()
{
auto call_fn = [this]() {
this->fn_();
this->fn_ = TaskF(); //让协程function对象的析构也在协程中执行
};
\\ ...
call_fn();
\\ ...
state_ = TaskState::done;
Processer::StaticCoYield();
}
void Task::StaticRun(intptr_t vp)
{
Task* tk = (Task*)vp;
tk->Run();
}
这里就是对 libgo 调度相关实现的描述,本文跳过了对定时器和时钟部分的实现,这个会在之后单独叙述。本文涉及到的代码在源码目录下的
libgo-master/libgo/scheduler/processer.cpp
libgo-master/libgo/scheduler/processer.h
libgo-master/libgo/scheduler/scheduler.cpp
libgo-master/libgo/scheduler/scheduler.h
有兴趣的读者可以对照源码学习,欢迎讨论学习
网站题目:libgo源码剖析(2.libgo调度策略源码实现)
网页地址:http://pwwzsj.com/article/jeiipj.html