Reading the Async++ Source: the task Module

1. task_base.h

I have added my own understanding to the code as comments, so that I can review it later when needed. Techniques used in this file:

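  • The classes in this file do not use virtual. Instead they define a hand-written virtual function table. The pointer to that table is not a hidden compiler-generated field at the start of each object; it is an ordinary data member (vtable) through which the functions are looked up.
  • EBO (empty base optimization). A standalone object of an empty class still occupies at least one byte, but when an empty class is used as a base class the compiler may give that base subobject zero size. Holding an empty function object through a base class therefore costs no extra space.
  • Explicit memory alignment for class members and for whole classes.
  • The framework models a task as the combination of a function and a result: task_result stores the task's result, func_holder stores the function that actually runs, and task_func combines the two.
  • The template<typename Func, typename = void> idiom; its usage is covered in detail in a separate blog post.
  • The framework uses a great deal of template metaprogramming, which I find very impressive. I could only just follow it with the help of a large language model, and I do not know when I will be able to design a framework like this as fluently as the author.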
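
To make the first bullet concrete, here is a minimal sketch of a hand-written function table. It is not Async++ code: shape_base, shape_vtable, square, rect, area_of and destroy_shape are hypothetical names invented for this illustration. The only thing it shares with task_base is the layout idea, namely a const table of plain function pointers per concrete type plus an ordinary pointer member in the base struct.

```cpp
#include <cassert>

// Hypothetical example types; none of these names come from Async++.
struct shape_base;

// Hand-written function table: one plain function pointer per "virtual" operation.
struct shape_vtable {
	int (*area)(const shape_base*);
	void (*destroy)(shape_base*);
};

struct shape_base {
	// Ordinary data member that points at the table of the concrete type.
	const shape_vtable* vtable;
};

struct square : shape_base {
	int side;
	explicit square(int s) : side(s) { vtable = &vtable_impl; }

	// Each entry casts the base pointer back to the concrete type.
	static int area(const shape_base* b)
	{
		const square* self = static_cast<const square*>(b);
		return self->side * self->side;
	}
	static void destroy(shape_base* b) { delete static_cast<square*>(b); }

	static const shape_vtable vtable_impl;
};
const shape_vtable square::vtable_impl = { square::area, square::destroy };

struct rect : shape_base {
	int w, h;
	rect(int w_, int h_) : w(w_), h(h_) { vtable = &vtable_impl; }

	static int area(const shape_base* b)
	{
		const rect* self = static_cast<const rect*>(b);
		return self->w * self->h;
	}
	static void destroy(shape_base* b) { delete static_cast<rect*>(b); }

	static const shape_vtable vtable_impl;
};
const shape_vtable rect::vtable_impl = { rect::area, rect::destroy };

// Callers only see shape_base and dispatch through the table.
inline int area_of(const shape_base* s) { return s->vtable->area(s); }
inline void destroy_shape(shape_base* s) { s->vtable->destroy(s); }
```

A caller holding only a shape_base* can call area_of(p) and destroy_shape(p) without knowing the concrete type, which is the role that vtable->run and vtable->destroy play for task_base in the listing that follows.
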
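// Task states. The least obvious one is unwrapped.
// unwrapped: the task's own function has already run and returned another task;
// the outer task is now waiting for that inner task to finish
// completed and canceled are both final states: completed means normal completion,
// canceled means the task ended with an exception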
enum class task_state: unsigned char {
	pending, // Task has not completed yet
	locked, // Task is locked (used by event_task to prevent double set)
	unwrapped, // Task is waiting for an unwrapped task to finish
	completed, // Task has finished execution and a result is available
	canceled // Task has been canceled and an exception is available
};

// Determine whether a task is in a final state
// i.e. whether the task has finished (completed or canceled)
inline bool is_finished(task_state s)
{
	return s == task_state::completed || s == task_state::canceled;
}

// Virtual function table used to allow dynamic dispatch for task objects.
// While this is very similar to what a compiler would generate with virtual
// functions, this scheme was found to result in significantly smaller
// generated code size.
// This is the hand-written virtual function table
struct task_base_vtable {
	// Destroy the function and result
	void (*destroy)(task_base*) LIBASYNC_NOEXCEPT;

	// Run the associated function
	void (*run)(task_base*) LIBASYNC_NOEXCEPT;

	// Cancel the task with an exception
	void (*cancel)(task_base*, std::exception_ptr&&) LIBASYNC_NOEXCEPT;

	// Schedule the task using its scheduler
	void (*schedule)(task_base* parent, task_ptr t);
};

// Type-generic base task object
// Base class of all tasks
struct task_base_deleter;
struct LIBASYNC_CACHELINE_ALIGN task_base: public ref_count_base<task_base, task_base_deleter> {
	// Task state
	std::atomic<task_state> state;

	// Whether get_task() was already called on an event_task
	// Only used by event_task
	bool event_task_got_task;

	// Vector of continuations
	// The tasks to run after this task finishes
	continuation_vector continuations;

	// Virtual function table used for dynamic dispatch
	const task_base_vtable* vtable;

	// Use aligned memory allocation
	static void* operator new(std::size_t size)
	{
		return aligned_alloc(size, LIBASYNC_CACHELINE_SIZE);
	}
	static void operator delete(void* ptr)
	{
		aligned_free(ptr);
	}

	// Initialize task state
	task_base()
		: state(task_state::pending) {}

	// Check whether the task is ready and include an acquire barrier if it is
	// Returns true if the task has finished, false otherwise
	bool ready() const
	{
		return is_finished(state.load(std::memory_order_acquire));
	}

	// Run a single continuation
	template<typename Sched>
	void run_continuation(Sched& sched, task_ptr&& cont)
	{
		LIBASYNC_TRY {
			detail::schedule_task(sched, cont);
		} LIBASYNC_CATCH(...) {
			// This is suboptimal, but better than letting the exception leak
			cont->vtable->cancel(cont.get(), std::current_exception());
		}
	}

	// Run all of the task's continuations after it has completed or canceled.
	// The list of continuations is emptied and locked to prevent any further
	// continuations from being added.
	// After the task finishes, schedule its continuations; the list is locked while this happens
	void run_continuations()
	{
		continuations.flush_and_lock([this](task_ptr t) {
			const task_base_vtable* vtable_ptr = t->vtable;
			vtable_ptr->schedule(this, std::move(t));
		});
	}

	// Add a continuation to this task
	// The sched argument is only used when this task has already finished
	template<typename Sched>
	void add_continuation(Sched& sched, task_ptr cont)
	{
		// Check for task completion
		// If this task has not finished yet, add the continuation to the list;
		// otherwise run the continuation right away
		task_state current_state = state.load(std::memory_order_relaxed);
		if (!is_finished(current_state)) {
			// Try to add the task to the continuation list. This can fail only
			// if the task has just finished, in which case we run it directly.
			if (continuations.try_add(std::move(cont)))
				return;
		}

		// Otherwise run the continuation directly
		std::atomic_thread_fence(std::memory_order_acquire);
		run_continuation(sched, std::move(cont));
	}

	// Finish the task after it has been executed and the result set
	// Called when this task finishes: mark it completed, then run its continuations
	void finish()
	{
		state.store(task_state::completed, std::memory_order_release);
		run_continuations();
	}

	// Wait for the task to finish executing
	// Block until this task has finished and return its final state
	task_state wait()
	{
		task_state s = state.load(std::memory_order_acquire);
		if (!is_finished(s)) {
			wait_for_task(this);
			s = state.load(std::memory_order_relaxed);
		}
		return s;
	}
};

// Deleter for task_ptr
struct task_base_deleter {
	static void do_delete(task_base* p)
	{
		// Go through the vtable to delete p with its proper type
		p->vtable->destroy(p);
	}
};

// Result type-specific task object
// A task that can hold a result of type Result
template<typename Result>
struct task_result_holder: public task_base {
	union {
		alignas(Result) std::uint8_t result[sizeof(Result)];
		alignas(std::exception_ptr) std::uint8_t except[sizeof(std::exception_ptr)];

		// Scheduler that should be used to schedule this task. The scheduler
		// type has been erased and is held by vtable->schedule.
		void* sched;
	};

	template<typename T>
	void set_result(T&& t)
	{
		new(&result) Result(std::forward<T>(t));
	}

	// Return a result using an lvalue or rvalue reference depending on the task
	// type. The task parameter is not used, it is just there for overload resolution.
	// The parameter is unused; it only selects the overload
	template<typename T>
	Result&& get_result(const task<T>&)
	{
		return std::move(*reinterpret_cast<Result*>(&result));
	}
	template<typename T>
	const Result& get_result(const shared_task<T>&)
	{
		return *reinterpret_cast<Result*>(&result);
	}

	// Destroy the result
	~task_result_holder()
	{
		// Result is only present if the task completed successfully
		if (state.load(std::memory_order_relaxed) == task_state::completed)
			reinterpret_cast<Result*>(&result)->~Result();
	}
};

// Specialization for references
// Partial specialization for reference results
template<typename Result>
struct task_result_holder<Result&>: public task_base {
	union {
		// Store as pointer internally
		Result* result;
		alignas(std::exception_ptr) std::uint8_t except[sizeof(std::exception_ptr)];
		void* sched;
	};

	void set_result(Result& obj)
	{
		result = std::addressof(obj);
	}

	template<typename T>
	Result& get_result(const task<T>&)
	{
		return *result;
	}
	template<typename T>
	Result& get_result(const shared_task<T>&)
	{
		return *result;
	}
};

// Specialization for void
template<>
struct task_result_holder<fake_void>: public task_base {
	union {
		alignas(std::exception_ptr) std::uint8_t except[sizeof(std::exception_ptr)];
		void* sched;
	};

	void set_result(fake_void) {}

	// Get the result as fake_void so that it can be passed to set_result and
	// continuations
	template<typename T>
	fake_void get_result(const task<T>&)
	{
		return fake_void();
	}
	template<typename T>
	fake_void get_result(const shared_task<T>&)
	{
		return fake_void();
	}
};
// Outer class: a task that holds a result (or an exception)
template<typename Result>
struct task_result: public task_result_holder<Result> {
	// Virtual function table for task_result
	static const task_base_vtable vtable_impl;
	task_result()
	{
		this->vtable = &vtable_impl;
	}

	// Destroy the exception
	~task_result()
	{
		// Exception is only present if the task was canceled
		if (this->state.load(std::memory_order_relaxed) == task_state::canceled)
			reinterpret_cast<std::exception_ptr*>(&this->except)->~exception_ptr();
	}

	// Cancel a task with the given exception
	// Cancel the task and store the given exception
	void cancel_base(std::exception_ptr&& except_)
	{
		set_exception(std::move(except_));
		this->state.store(task_state::canceled, std::memory_order_release);
		this->run_continuations();
	}

	// Set the exception value of the task
	void set_exception(std::exception_ptr&& except_)
	{
		new(&this->except) std::exception_ptr(std::move(except_));
	}

	// Get the exception a task was canceled with
	std::exception_ptr& get_exception()
	{
		return *reinterpret_cast<std::exception_ptr*>(&this->except);
	}

	// Wait and throw the exception if the task was canceled
	// In this framework a task is always canceled by setting an exception
	void wait_and_throw()
	{
		if (this->wait() == task_state::canceled)
			LIBASYNC_RETHROW_EXCEPTION(get_exception());
	}

	// Delete the task using its proper type
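	// The cast restores the concrete type so that delete runs ~task_result and
	// ~task_result_holder. result and except share one union, so at most one of them
	// is alive; each destructor checks state to decide whether it has anything to destroy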
	static void destroy(task_base* t) LIBASYNC_NOEXCEPT
	{
		delete static_cast<task_result<Result>*>(t);
	}
};
template<typename Result>
const task_base_vtable task_result<Result>::vtable_impl = {
	task_result<Result>::destroy, // destroy
	nullptr, // run
	nullptr, // cancel
	nullptr // schedule
};

// Class to hold a function object, with empty base class optimization
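// The second template parameter defaults to void. Together with std::enable_if (SFINAE)
// it decides whether the partial specialization below is selected for a given Func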
template<typename Func, typename = void>
struct func_base {
	Func func;

	template<typename F>
	explicit func_base(F&& f)
		: func(std::forward<F>(f)) {}
	Func& get_func()
	{
		return func;
	}
};
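// Partial specialization for empty Func types.
// It exists to enable the empty base optimization (EBO) and so reduce memory use.
// For a type with no data members (such as an empty struct), func_base declares no member at all:
// the Func object is constructed in place at this, so func_base stays empty and takes no space as a base class.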
template<typename Func>
struct func_base<Func, typename std::enable_if<std::is_empty<Func>::value>::type> {
	template<typename F>
	explicit func_base(F&& f)
	{
		new(this) Func(std::forward<F>(f));
	}
	~func_base()
	{
		get_func().~Func();
	}
	Func& get_func()
	{
		return *reinterpret_cast<Func*>(this);
	}
};

// Class to hold a function object and initialize/destroy it at any time
template<typename Func, typename = void>
struct func_holder {
	alignas(Func) std::uint8_t func[sizeof(Func)];

	Func& get_func()
	{
		return *reinterpret_cast<Func*>(&func);
	}
	template<typename... Args>
	void init_func(Args&&... args)
	{
		new(&func) Func(std::forward<Args>(args)...);
	}
	void destroy_func()
	{
		get_func().~Func();
	}
};
template<typename Func>
struct func_holder<Func, typename std::enable_if<std::is_empty<Func>::value>::type> {
	Func& get_func()
	{
		return *reinterpret_cast<Func*>(this);
	}
	template<typename... Args>
	void init_func(Args&&... args)
	{
		new(this) Func(std::forward<Args>(args)...);
	}
	void destroy_func()
	{
		get_func().~Func();
	}
};

// Task object with an associated function object
// Using private inheritance so empty Func doesn't take up space
// This is the struct that actually owns the function, the task state and the task result
template<typename Sched, typename Func, typename Result>
struct task_func: public task_result<Result>, func_holder<Func> {
	// Virtual function table for task_func
	static const task_base_vtable vtable_impl;
	template<typename... Args>
	explicit task_func(Args&&... args)
	{
		this->vtable = &vtable_impl;
		this->init_func(std::forward<Args>(args)...);
	}

	// Run the stored function
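	// Run the task. t actually points to a task_func<Sched, Func, Result>.
	// Func is fixed at compile time; calling it runs the task and stores the result in t
	// (that part is implemented by Func itself, i.e. by whoever instantiates this template)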
	static void run(task_base* t) LIBASYNC_NOEXCEPT
	{
		LIBASYNC_TRY {
			// Dispatch to execution function
			static_cast<task_func<Sched, Func, Result>*>(t)->get_func()(t);
		} LIBASYNC_CATCH(...) {
			cancel(t, std::current_exception());
		}
	}

	// Cancel the task
	static void cancel(task_base* t, std::exception_ptr&& except) LIBASYNC_NOEXCEPT
	{
		// Destroy the function object when canceling since it won't be
		// used anymore.
		static_cast<task_func<Sched, Func, Result>*>(t)->destroy_func();
		static_cast<task_func<Sched, Func, Result>*>(t)->cancel_base(std::move(except));
	}

	// Schedule a continuation task using its scheduler
	// Called once the parent task has finished: schedules continuation t on t's own scheduler
	static void schedule(task_base* parent, task_ptr t)
	{
		void* sched = static_cast<task_func<Sched, Func, Result>*>(t.get())->sched;
		parent->run_continuation(*static_cast<Sched*>(sched), std::move(t));
	}

	// Free the function
	~task_func()
	{
		// If the task hasn't completed yet, destroy the function object. Note
		// that an unwrapped task has already destroyed its function object.
		if (this->state.load(std::memory_order_relaxed) == task_state::pending)
			this->destroy_func();
	}

	// Delete the task using its proper type
	static void destroy(task_base* t) LIBASYNC_NOEXCEPT
	{
		delete static_cast<task_func<Sched, Func, Result>*>(t);
	}
};
template<typename Sched, typename Func, typename Result>
const task_base_vtable task_func<Sched, Func, Result>::vtable_impl = {
	task_func<Sched, Func, Result>::destroy, // destroy
	task_func<Sched, Func, Result>::run, // run
	task_func<Sched, Func, Result>::cancel, // cancel
	task_func<Sched, Func, Result>::schedule // schedule
};

// Helper functions to access the internal_task member of a task object, which
// avoids us having to specify half of the functions in the detail namespace
// as friend. Also, internal_task is downcast to the appropriate task_result<>.
template<typename Task>
typename Task::internal_task_type* get_internal_task(const Task& t)
{
	return static_cast<typename Task::internal_task_type*>(t.internal_task.get());
}
template<typename Task>
void set_internal_task(Task& t, task_ptr p)
{
	t.internal_task = std::move(p);
}

// Common code for task unwrapping
template<typename Result, typename Child>
struct unwrapped_func {
	explicit unwrapped_func(task_ptr t)
		: parent_task(std::move(t)) {}
	// Runs when the child task finishes. Its job is to forward the child's outcome
	// (result or exception) to the parent task.
	void operator()(Child child_task) const
	{
		// Forward completion state and result to parent task
		task_result<Result>* parent = static_cast<task_result<Result>*>(parent_task.get());
		LIBASYNC_TRY {
			if (get_internal_task(child_task)->state.load(std::memory_order_relaxed) == task_state::completed) {
				parent->set_result(get_internal_task(child_task)->get_result(child_task));
				parent->finish();
			} else {
				// We don't call the generic cancel function here because
				// the function of the parent task has already been destroyed.
				parent->cancel_base(std::exception_ptr(get_internal_task(child_task)->get_exception()));
			}
		} LIBASYNC_CATCH(...) {
			// If the copy/move constructor of the result threw, propagate the exception
			parent->cancel_base(std::current_exception());
		}
	}
	task_ptr parent_task;
};
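// Sched: scheduler type
// Result: result type of the parent task
// Func: type of the function run by the parent task
// Child: type of the child task
// This function marks the parent as unwrapped and attaches a continuation to the child
// that forwards the child's result to the parent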
template<typename Sched, typename Result, typename Func, typename Child>
void unwrapped_finish(task_base* parent_base, Child child_task)
{
	// Destroy the parent task's function since it has been executed
	// At this point the parent's function has already run; the parent only has to wait for the child's result
	parent_base->state.store(task_state::unwrapped, std::memory_order_relaxed);
	static_cast<task_func<Sched, Func, Result>*>(parent_base)->destroy_func();

	// Set up a continuation on the child to set the result of the parent
	LIBASYNC_TRY {
		parent_base->add_ref();
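		// Attach a continuation to the child so that, when the child finishes, unwrapped_func passes
		// its result to the parent. then() registers a function that runs when the child task completes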
		child_task.then(inline_scheduler(), unwrapped_func<Result, Child>(task_ptr(parent_base)));
	} LIBASYNC_CATCH(...) {
		// Use cancel_base here because the function object is already destroyed.
		static_cast<task_result<Result>*>(parent_base)->cancel_base(std::current_exception());
	}
}

// Execution functions for root tasks:
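// - With and without task unwrapping
// Sched: scheduler type, which defines how the task is scheduled.
// Result: result type of the task, i.e. the type produced by running it.
// Func: type of the function or callable object that the task runs.
// Unwrap: bool parameter, true if the function returns a task that must be unwrapped.
// As I understand it, the t passed to operator() points to a
// task_func<Sched, root_exec_func, Result> whose stored function object is this
// root_exec_func itself, while this->get_func() is the user's function wrapped inside it.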
template<typename Sched, typename Result, typename Func, bool Unwrap>
struct root_exec_func: private func_base<Func> {
	template<typename F>
	explicit root_exec_func(F&& f)
		: func_base<Func>(std::forward<F>(f)) {}
	void operator()(task_base* t)
	{
		static_cast<task_result<Result>*>(t)->set_result(detail::invoke_fake_void(std::move(this->get_func())));
		static_cast<task_func<Sched, root_exec_func, Result>*>(t)->destroy_func();
		t->finish();
	}
};
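// Specialization for functions that return a task (unwrapping). The constructor stores the
// user's function; calling it yields the child task, and the t passed to operator() is the
// parent task. unwrapped_finish then arranges for the child's result to be set on the parent.
// As I understand it, t points to the parent's task_func<Sched, root_exec_func, Result>,
// while this->get_func()() produces the child task that the parent hands its work to.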
template<typename Sched, typename Result, typename Func>
struct root_exec_func<Sched, Result, Func, true>: private func_base<Func> {
	template<typename F>
	explicit root_exec_func(F&& f)
		: func_base<Func>(std::forward<F>(f)) {}
	void operator()(task_base* t)
	{
		unwrapped_finish<Sched, Result, root_exec_func>(t, std::move(this->get_func())());
	}
};

// Execution functions for continuation tasks:
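// - With and without task unwrapping
// - For void, value-based and task-based continuations
// Sched: scheduler type, which decides how the task is scheduled.
// Parent: type of the parent task, i.e. the task this continuation runs after.
// Result: result type of the continuation task itself (what set_result stores in t).
// Func: type of the function that the continuation runs.
// ValueCont: std::true_type if Func is called with the parent's result, fake_void if Func is
// called without a real argument (the void case), std::false_type if Func is called with the parent task itself.
// Unwrap: true if Func returns a task whose result must be unwrapped into this task, false otherwise.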
template<typename Sched, typename Parent, typename Result, typename Func, typename ValueCont, bool Unwrap>
struct continuation_exec_func: private func_base<Func> {
	template<typename F, typename P>
	continuation_exec_func(F&& f, P&& p)
		: func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}
	void operator()(task_base* t)
	{
		static_cast<task_result<Result>*>(t)->set_result(detail::invoke_fake_void(std::move(this->get_func()), std::move(parent)));
		static_cast<task_func<Sched, continuation_exec_func, Result>*>(t)->destroy_func();
		t->finish();
	}
	Parent parent;
};
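// Value-based continuation without unwrapping: Func is called with the parent's result and its
// return value becomes this task's result. If the parent was canceled, this task is canceled too.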
template<typename Sched, typename Parent, typename Result, typename Func>
struct continuation_exec_func<Sched, Parent, Result, Func, std::true_type, false>: private func_base<Func> {
	template<typename F, typename P>
	continuation_exec_func(F&& f, P&& p)
		: func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}
	void operator()(task_base* t)
	{
		if (get_internal_task(parent)->state.load(std::memory_order_relaxed) == task_state::canceled)
			task_func<Sched, continuation_exec_func, Result>::cancel(t, std::exception_ptr(get_internal_task(parent)->get_exception()));
		else {
			static_cast<task_result<Result>*>(t)->set_result(detail::invoke_fake_void(std::move(this->get_func()), get_internal_task(parent)->get_result(parent)));
			static_cast<task_func<Sched, continuation_exec_func, Result>*>(t)->destroy_func();
			t->finish();
		}
	}
	Parent parent;
};

template<typename Sched, typename Parent, typename Result, typename Func>
struct continuation_exec_func<Sched, Parent, Result, Func, fake_void, false>: private func_base<Func> {
	template<typename F, typename P>
	continuation_exec_func(F&& f, P&& p)
		: func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}
	void operator()(task_base* t)
	{
		if (get_internal_task(parent)->state.load(std::memory_order_relaxed) == task_state::canceled)
			task_func<Sched, continuation_exec_func, Result>::cancel(t, std::exception_ptr(get_internal_task(parent)->get_exception()));
		else {
			static_cast<task_result<Result>*>(t)->set_result(detail::invoke_fake_void(std::move(this->get_func()), fake_void()));
			static_cast<task_func<Sched, continuation_exec_func, Result>*>(t)->destroy_func();
			t->finish();
		}
	}
	Parent parent;
};
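// Task-based continuation with unwrapping: Func receives the parent task itself and returns a
// child task, whose result is forwarded to this task by unwrapped_finish.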
template<typename Sched, typename Parent, typename Result, typename Func>
struct continuation_exec_func<Sched, Parent, Result, Func, std::false_type, true>: private func_base<Func> {
	template<typename F, typename P>
	continuation_exec_func(F&& f, P&& p)
		: func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}
	void operator()(task_base* t)
	{
		unwrapped_finish<Sched, Result, continuation_exec_func>(t, detail::invoke_fake_void(std::move(this->get_func()), std::move(parent)));
	}
	Parent parent;
};
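// Value-based continuation with unwrapping: Func receives the parent's result and returns a
// child task, whose result becomes this task's result. If the parent was canceled, this task is canceled too.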
template<typename Sched, typename Parent, typename Result, typename Func>
struct continuation_exec_func<Sched, Parent, Result, Func, std::true_type, true>: private func_base<Func> {
	template<typename F, typename P>
	continuation_exec_func(F&& f, P&& p)
		: func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}
	void operator()(task_base* t)
	{
		if (get_internal_task(parent)->state.load(std::memory_order_relaxed) == task_state::canceled)
			task_func<Sched, continuation_exec_func, Result>::cancel(t, std::exception_ptr(get_internal_task(parent)->get_exception()));
		else
			unwrapped_finish<Sched, Result, continuation_exec_func>(t, detail::invoke_fake_void(std::move(this->get_func()), get_internal_task(parent)->get_result(parent)));
	}
	Parent parent;
};
template<typename Sched, typename Parent, typename Result, typename Func>
struct continuation_exec_func<Sched, Parent, Result, Func, fake_void, true>: private func_base<Func> {
	template<typename F, typename P>
	continuation_exec_func(F&& f, P&& p)
		: func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}
	void operator()(task_base* t)
	{
		if (get_internal_task(parent)->state.load(std::memory_order_relaxed) == task_state::canceled)
			task_func<Sched, continuation_exec_func, Result>::cancel(t, std::exception_ptr(get_internal_task(parent)->get_exception()));
		else
			unwrapped_finish<Sched, Result, continuation_exec_func>(t, detail::invoke_fake_void(std::move(this->get_func()), fake_void()));
	}
	Parent parent;
};
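
The func_base and func_holder specializations above combine two of the techniques from the list at the top: the defaulted typename = void parameter with std::enable_if, and the empty base optimization. The sketch below shows the same idea in isolation. It is an illustration under stated assumptions, not Async++ code: holder, with_payload, empty_fn and fat_fn are hypothetical names, and the empty case is implemented by inheriting from F, which is the more conventional way to get EBO, whereas Async++ constructs the object in place at this.

```cpp
#include <cassert>
#include <type_traits>
#include <utility>

// Hypothetical names (holder, with_payload, empty_fn, fat_fn); not Async++ code.

// Primary template: stores F as a data member.
template<typename F, typename = void>
struct holder {
	F f;
	explicit holder(F fn) : f(std::move(fn)) {}
	F& get() { return f; }
};

// Partial specialization, selected only when F is an empty class.
// enable_if<...>::type is void exactly in that case, so it matches the default argument.
// Inheriting from F keeps holder itself empty.
template<typename F>
struct holder<F, typename std::enable_if<std::is_empty<F>::value>::type> : private F {
	explicit holder(F fn) : F(std::move(fn)) {}
	F& get() { return *this; }
};

// A stateless callable and a callable with state.
struct empty_fn {
	int operator()() const { return 42; }
};
struct fat_fn {
	int bias;
	int operator()() const { return bias; }
};

// Mirrors task_func: real data plus a holder base that may or may not be empty.
template<typename F>
struct with_payload : holder<F> {
	int payload;
	with_payload(F fn, int p) : holder<F>(std::move(fn)), payload(p) {}
};
```

With mainstream compilers, with_payload<empty_fn> is expected to be no larger than its int member, while with_payload<fat_fn> also has to store bias. That is the saving task_func gets when the stored function object is a captureless lambda.
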

2. task.h
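
One pattern in this file is worth isolating before reading the listing: set_internal and set_exception in basic_event both guard the write with a compare-and-swap from task_state::pending to task_state::locked, so that only one caller can ever set the event. The following is a minimal sketch of that set-once guard. It assumes a simplified stand-in type: once_slot and slot_state are hypothetical names, and the sketch stores a plain int rather than a task result.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical example type; a simplified stand-in, not the Async++ event_task.
enum class slot_state : unsigned char { pending, locked, completed };

struct once_slot {
	std::atomic<slot_state> state{slot_state::pending};
	int value = 0;

	// Returns true only for the single caller that wins the pending -> locked transition.
	bool set(int v)
	{
		slot_state expected = slot_state::pending;
		if (!state.compare_exchange_strong(expected, slot_state::locked,
				std::memory_order_acquire, std::memory_order_relaxed))
			return false; // another caller has already set, or is setting, the value

		value = v; // only the winner writes the payload
		state.store(slot_state::completed, std::memory_order_release);
		return true;
	}

	// The acquire load pairs with the release store in set().
	bool ready() const
	{
		return state.load(std::memory_order_acquire) == slot_state::completed;
	}
};
```

The intermediate locked state matters because the payload is written after the winner has been decided but before the slot is published as completed; a second caller that arrives in between sees a state other than pending and simply gets false.
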

Without further ado, here is the annotated code:

namespace detail {

// Common code for task and shared_task
// basic_task is mainly a wrapper around task_base
template<typename Result>
class basic_task {
	// Reference counted internal task object
	detail::task_ptr internal_task;

	// Real result type, with void turned into fake_void
	typedef typename void_to_fake_void<Result>::type internal_result;

	// Type-specific task object
	typedef task_result<internal_result> internal_task_type;

	// Friend access
	friend async::task<Result>;
	friend async::shared_task<Result>;
	template<typename T>
	friend typename T::internal_task_type* get_internal_task(const T& t);
	template<typename T>
	friend void set_internal_task(T& t, task_ptr p);

	// Common code for get()
	// This also waits for the task to finish
	void get_internal() const
	{
		LIBASYNC_ASSERT(internal_task, std::invalid_argument, "Use of empty task object");

		// If the task was canceled, throw the associated exception
		get_internal_task(*this)->wait_and_throw();
	}

	// Common code for then()
	template<typename Sched, typename Func, typename Parent>
	// The return value is itself a task, so then() calls can be chained
	typename continuation_traits<Parent, Func>::task_type then_internal(Sched& sched, Func&& f, Parent&& parent) const
	{
		LIBASYNC_ASSERT(internal_task, std::invalid_argument, "Use of empty task object");

		// Save a copy of internal_task because it might get moved into exec_func
		task_base* my_internal = internal_task.get();

		// Create continuation
		typedef continuation_traits<Parent, Func> traits;
		typedef typename void_to_fake_void<typename traits::task_type::result_type>::type cont_internal_result;
		typedef continuation_exec_func<Sched, typename std::decay<Parent>::type, cont_internal_result, typename traits::decay_func, typename traits::is_value_cont, is_task<typename traits::result_type>::value> exec_func;
		typename traits::task_type cont;
		// Create a new task, which is then added to this task's continuation list
		set_internal_task(cont, task_ptr(new task_func<Sched, exec_func, cont_internal_result>(std::forward<Func>(f), std::forward<Parent>(parent))));

		// Add the continuation to this task
		// Avoid an expensive ref-count modification since the task isn't shared yet
		get_internal_task(cont)->add_ref_unlocked();
		get_internal_task(cont)->sched = std::addressof(sched);
		my_internal->add_continuation(sched, task_ptr(get_internal_task(cont)));

		return cont;
	}

public:
	// Task result type
	typedef Result result_type;

	// Check if this task is not empty
	bool valid() const
	{
		return internal_task != nullptr;
	}

	// Query whether the task has finished executing
	bool ready() const
	{
		LIBASYNC_ASSERT(internal_task, std::invalid_argument, "Use of empty task object");
		return internal_task->ready();
	}

	// Query whether the task has been canceled with an exception
	bool canceled() const
	{
		LIBASYNC_ASSERT(internal_task, std::invalid_argument, "Use of empty task object");
		return internal_task->state.load(std::memory_order_acquire) == task_state::canceled;
	}

	// Wait for the task to complete
	void wait() const
	{
		LIBASYNC_ASSERT(internal_task, std::invalid_argument, "Use of empty task object");
		internal_task->wait();
	}

	// Get the exception associated with a canceled task
	// This waits for the task to finish
	std::exception_ptr get_exception() const
	{
		LIBASYNC_ASSERT(internal_task, std::invalid_argument, "Use of empty task object");
		if (internal_task->wait() == task_state::canceled)
			return get_internal_task(*this)->get_exception();
		else
			return std::exception_ptr();
	}
};

// Common code for event_task specializations
// An event: the user can obtain a task<Result> from it through get_task(),
// and the event is then used to store that task's result
template<typename Result>
class basic_event {
	// Reference counted internal task object
	detail::task_ptr internal_task;

	// Real result type, with void turned into fake_void
	typedef typename detail::void_to_fake_void<Result>::type internal_result;

	// Type-specific task object
	typedef detail::task_result<internal_result> internal_task_type;

	// Friend access
	friend async::event_task<Result>;
	template<typename T>
	friend typename T::internal_task_type* get_internal_task(const T& t);

	// Common code for set()
	template<typename T>
	// Store the event's result in the internal task object
	bool set_internal(T&& result) const
	{
		LIBASYNC_ASSERT(internal_task, std::invalid_argument, "Use of empty event_task object");

		// Only allow setting the value once
		detail::task_state expected = detail::task_state::pending;
		if (!internal_task->state.compare_exchange_strong(expected, detail::task_state::locked, std::memory_order_acquire, std::memory_order_relaxed))
			return false;

		LIBASYNC_TRY {
			// Store the result and finish
			get_internal_task(*this)->set_result(std::forward<T>(result));
			internal_task->finish();
		} LIBASYNC_CATCH(...) {
			// At this point we have already committed to setting a value, so
			// we can't return the exception to the caller. If we did then it
			// could cause concurrent set() calls to fail, thinking a value has
			// already been set. Instead, we simply cancel the task with the
			// exception we just got.
			get_internal_task(*this)->cancel_base(std::current_exception());
		}
		return true;
	}

public:
	// Movable but not copyable
	basic_event(basic_event&& other) LIBASYNC_NOEXCEPT
		: internal_task(std::move(other.internal_task)) {}
	basic_event& operator=(basic_event&& other) LIBASYNC_NOEXCEPT
	{
		internal_task = std::move(other.internal_task);
		return *this;
	}

	// Main constructor
	basic_event()
		: internal_task(new internal_task_type)
	{
		internal_task->event_task_got_task = false;
	}

	// Cancel events if they are destroyed before they are set
	~basic_event()
	{
		// This check isn't thread-safe but set_exception does a proper check
		if (internal_task && !internal_task->ready() && !internal_task->is_unique_ref(std::memory_order_relaxed)) {
#ifdef LIBASYNC_NO_EXCEPTIONS
			// This will result in an abort if the task result is read
			set_exception(std::exception_ptr());
#else
			set_exception(std::make_exception_ptr(abandoned_event_task()));
#endif
		}
	}

	// Get the task linked to this event. This can only be called once.
	// The task can be taken from the event only once
	task<Result> get_task()
	{
		LIBASYNC_ASSERT(internal_task, std::invalid_argument, "Use of empty event_task object");
		LIBASYNC_ASSERT(!internal_task->event_task_got_task, std::logic_error, "get_task() called twice on event_task");

		// Even if we didn't trigger an assert, don't return a task if one has
		// already been returned.
		task<Result> out;
		if (!internal_task->event_task_got_task)
			set_internal_task(out, internal_task);
		internal_task->event_task_got_task = true;
		return out;
	}

	// Cancel the event with an exception and cancel continuations
	// Store the exception in the internal task object
	bool set_exception(std::exception_ptr except) const
	{
		LIBASYNC_ASSERT(internal_task, std::invalid_argument, "Use of empty event_task object");

		// Only allow setting the value once
		detail::task_state expected = detail::task_state::pending;
		if (!internal_task->state.compare_exchange_strong(expected, detail::task_state::locked, std::memory_order_acquire, std::memory_order_relaxed))
			return false;

		// Cancel the task
		get_internal_task(*this)->cancel_base(std::move(except));
		return true;
	}
};

} // namespace detail

template<typename Result>
class task: public detail::basic_task<Result> {
public:
	// Movable but not copyable
	task() = default;
	task(task&& other) LIBASYNC_NOEXCEPT
		: detail::basic_task<Result>(std::move(other)) {}
	task& operator=(task&& other) LIBASYNC_NOEXCEPT
	{
		detail::basic_task<Result>::operator=(std::move(other));
		return *this;
	}

	// Get the result of the task
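	// Wait for the task to finish and return its result. As I understand it, this relies on move
	// semantics: once the result has been taken, the task releases its internal state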
	Result get()
	{
		this->get_internal();

		// Move the internal state pointer so that the task becomes invalid,
		// even if an exception is thrown.
		detail::task_ptr my_internal = std::move(this->internal_task);
		return detail::fake_void_to_void(static_cast<typename task::internal_task_type*>(my_internal.get())->get_result(*this));
	}

	// Add a continuation to the task
	// Attach a continuation that runs after this task; the task itself is moved into the continuation
	template<typename Sched, typename Func>
	typename detail::continuation_traits<task, Func>::task_type then(Sched& sched, Func&& f)
	{
		return this->then_internal(sched, std::forward<Func>(f), std::move(*this));
	}
	// Same as above, but the continuation is scheduled on the default scheduler
	template<typename Func>
	typename detail::continuation_traits<task, Func>::task_type then(Func&& f)
	{
		return then(::async::default_scheduler(), std::forward<Func>(f));
	}

	// Create a shared_task from this task
	shared_task<Result> share()
	{
		LIBASYNC_ASSERT(this->internal_task, std::invalid_argument, "Use of empty task object");

		shared_task<Result> out;
		detail::set_internal_task(out, std::move(this->internal_task));
		return out;
	}
};

template<typename Result>
class shared_task: public detail::basic_task<Result> {
	// get() return value: const Result& -or- void
	typedef typename std::conditional<
		std::is_void<Result>::value,
		void,
		typename std::add_lvalue_reference<
			typename std::add_const<Result>::type
		>::type
	>::type get_result;

public:
	// Movable and copyable
	shared_task() = default;

	// Get the result of the task
	// The result of a shared_task can be fetched multiple times
	get_result get() const
	{
		this->get_internal();
		return detail::fake_void_to_void(detail::get_internal_task(*this)->get_result(*this));
	}

	// Add a continuation to the task
	// Attach a continuation that runs after this task; the continuation receives a copy of this shared_task
	template<typename Sched, typename Func>
	typename detail::continuation_traits<shared_task, Func>::task_type then(Sched& sched, Func&& f) const
	{
		return this->then_internal(sched, std::forward<Func>(f), *this);
	}
	// Same as above, but the continuation is scheduled on the default scheduler
	template<typename Func>
	typename detail::continuation_traits<shared_task, Func>::task_type then(Func&& f) const
	{
		return then(::async::default_scheduler(), std::forward<Func>(f));
	}
};

// Special task type which can be triggered manually rather than when a function executes.
// An event task that is triggered manually
template<typename Result>
class event_task: public detail::basic_event<Result> {
public:
	// Movable but not copyable
	event_task() = default;
	event_task(event_task&& other) LIBASYNC_NOEXCEPT
		: detail::basic_event<Result>(std::move(other)) {}
	event_task& operator=(event_task&& other) LIBASYNC_NOEXCEPT
	{
		detail::basic_event<Result>::operator=(std::move(other));
		return *this;
	}

	// Set the result of the task, mark it as completed and run its continuations
	bool set(const Result& result) const
	{
		return this->set_internal(result);
	}
	bool set(Result&& result) const
	{
		return this->set_internal(std::move(result));
	}
};

// Specialization for references
template<typename Result>
class event_task<Result&>: public detail::basic_event<Result&> {
public:
	// Movable but not copyable
	event_task() = default;
	event_task(event_task&& other) LIBASYNC_NOEXCEPT
		: detail::basic_event<Result&>(std::move(other)) {}
	event_task& operator=(event_task&& other) LIBASYNC_NOEXCEPT
	{
		detail::basic_event<Result&>::operator=(std::move(other));
		return *this;
	}

	// Set the result of the task, mark it as completed and run its continuations
	bool set(Result& result) const
	{
		return this->set_internal(result);
	}
};

// Specialization for void
template<>
class event_task<void>: public detail::basic_event<void> {
public:
	// Movable but not copyable
	event_task() = default;
	event_task(event_task&& other) LIBASYNC_NOEXCEPT
		: detail::basic_event<void>(std::move(other)) {}
	event_task& operator=(event_task&& other) LIBASYNC_NOEXCEPT
	{
		detail::basic_event<void>::operator=(std::move(other));
		return *this;
	}

	// Set the result of the task, mark it as completed and run its continuations
	bool set()
	{
		return this->set_internal(detail::fake_void());
	}
};

// Task type returned by local_spawn()
template<typename Sched, typename Func>
class local_task {
	// Make sure the function type is callable
	typedef typename std::decay<Func>::type decay_func;
	static_assert(detail::is_callable<decay_func()>::value, "Invalid function type passed to local_spawn()");

	// Task result type
	typedef typename detail::remove_task<decltype(std::declval<decay_func>()())>::type result_type;
	typedef typename detail::void_to_fake_void<result_type>::type internal_result;

	// Task execution function type
	typedef detail::root_exec_func<Sched, internal_result, decay_func, detail::is_task<decltype(std::declval<decay_func>()())>::value> exec_func;

	// Task object embedded directly. The ref-count is initialized to 1 so it
	// will never be freed using delete, only when the local_task is destroyed.
	detail::task_func<Sched, exec_func, internal_result> internal_task;

	// Friend access for local_spawn
	template<typename S, typename F>
	friend local_task<S, F> local_spawn(S& sched, F&& f);
	template<typename F>
	friend local_task<detail::default_scheduler_type, F> local_spawn(F&& f);

	// Constructor, used by local_spawn
	local_task(Sched& sched, Func&& f)
		: internal_task(std::forward<Func>(f))
	{
		// Avoid an expensive ref-count modification since the task isn't shared yet
		internal_task.add_ref_unlocked();
		detail::schedule_task(sched, detail::task_ptr(&internal_task));
	}

public:
	// Non-movable and non-copyable
	local_task(const local_task&) = delete;
	local_task& operator=(const local_task&) = delete;

	// Wait for the task to complete when destroying
	~local_task()
	{
		wait();

		// Now spin until the reference count drops to 1, since the scheduler
		// may still have a reference to the task.
		while (!internal_task.is_unique_ref(std::memory_order_acquire)) {
#if defined(__GLIBCXX__) && __GLIBCXX__ <= 20140612
			// Some versions of libstdc++ (4.7 and below) don't include a
			// definition of std::this_thread::yield().
			sched_yield();
#else
			std::this_thread::yield();
#endif
		}
	}

	// Query whether the task has finished executing
	bool ready() const
	{
		return internal_task.ready();
	}

	// Query whether the task has been canceled with an exception
	bool canceled() const
	{
		return internal_task.state.load(std::memory_order_acquire) == detail::task_state::canceled;
	}

	// Wait for the task to complete
	void wait()
	{
		internal_task.wait();
	}

	// Get the result of the task
	result_type get()
	{
		internal_task.wait_and_throw();
		return detail::fake_void_to_void(internal_task.get_result(task<result_type>()));
	}

	// Get the exception associated with a canceled task
	std::exception_ptr get_exception() const
	{
		if (internal_task.wait() == detail::task_state::canceled)
			return internal_task.get_exception();
		else
			return std::exception_ptr();
	}
};

// Spawn a function asynchronously
#if (__cplusplus >= 201703L)
// Use std::invoke_result instead of std::result_of for C++17 or greater because std::result_of was deprecated in C++17 and removed in C++20
template<typename Sched, typename Func>
task<typename detail::remove_task<std::invoke_result_t<std::decay_t<Func>>>::type> spawn(Sched& sched, Func&& f)
#else
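// If Func is a function type (e.g. void(int)), std::decay<Func>::type becomes the corresponding function pointer type (e.g. void (*)(int)).
// The return value is a task<Result> object, the handle through which the task and its result are accessed.
// This function template creates an asynchronous task and schedules it.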
template<typename Sched, typename Func>
task<typename detail::remove_task<typename std::result_of<typename std::decay<Func>::type()>::type>::type> spawn(Sched& sched, Func&& f)
#endif
{
	// Using result_of in the function return type to work around bugs in the Intel
	// C++ compiler.

	// Make sure the function type is callable
	typedef typename std::decay<Func>::type decay_func;
	static_assert(detail::is_callable<decay_func()>::value, "Invalid function type passed to spawn()");

	// Create task
	typedef typename detail::void_to_fake_void<typename detail::remove_task<decltype(std::declval<decay_func>()())>::type>::type internal_result;
	typedef detail::root_exec_func<Sched, internal_result, decay_func, detail::is_task<decltype(std::declval<decay_func>()())>::value> exec_func;
	task<typename detail::remove_task<decltype(std::declval<decay_func>()())>::type> out;
	detail::set_internal_task(out, detail::task_ptr(new detail::task_func<Sched, exec_func, internal_result>(std::forward<Func>(f))));

	// Avoid an expensive ref-count modification since the task isn't shared yet
	detail::get_internal_task(out)->add_ref_unlocked();
	detail::schedule_task(sched, detail::task_ptr(detail::get_internal_task(out)));

	return out;
}
// Overload that uses the default scheduler; the return type is deduced with decltype
template<typename Func>
decltype(async::spawn(::async::default_scheduler(), std::declval<Func>())) spawn(Func&& f)
{
	return async::spawn(::async::default_scheduler(), std::forward<Func>(f));
}

// Create a completed task containing a value
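// The make_task functions create a task that is already completed and store the result in the task object
// (make_exception_task, further down, stores an exception instead).
// My understanding is that the goal is uniformity: a ready value is wrapped in the same task type, so it can be returned and used like any other task.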
template<typename T>
task<typename std::decay<T>::type> make_task(T&& value)
{
	task<typename std::decay<T>::type> out;

	detail::set_internal_task(out, detail::task_ptr(new detail::task_result<typename std::decay<T>::type>));
	detail::get_internal_task(out)->set_result(std::forward<T>(value));
	detail::get_internal_task(out)->state.store(detail::task_state::completed, std::memory_order_relaxed);

	return out;
}
template<typename T>
task<T&> make_task(std::reference_wrapper<T> value)
{
	task<T&> out;

	detail::set_internal_task(out, detail::task_ptr(new detail::task_result<T&>));
	detail::get_internal_task(out)->set_result(value.get());
	detail::get_internal_task(out)->state.store(detail::task_state::completed, std::memory_order_relaxed);

	return out;
}
inline task<void> make_task()
{
	task<void> out;

	detail::set_internal_task(out, detail::task_ptr(new detail::task_result<detail::fake_void>));
	detail::get_internal_task(out)->state.store(detail::task_state::completed, std::memory_order_relaxed);

	return out;
}

// Create a canceled task containing an exception
template<typename T>
task<T> make_exception_task(std::exception_ptr except)
{
	task<T> out;

	detail::set_internal_task(out, detail::task_ptr(new detail::task_result<typename detail::void_to_fake_void<T>::type>));
	detail::get_internal_task(out)->set_exception(std::move(except));
	detail::get_internal_task(out)->state.store(detail::task_state::canceled, std::memory_order_relaxed);

	return out;
}

// Spawn a very limited task which is restricted to the current function and
// joins on destruction. Because local_task is not movable, the result must
// be captured in a reference, like this:
// auto&& x = local_spawn(...);
template<typename Sched, typename Func>
#ifdef __GNUC__
__attribute__((warn_unused_result))
#endif
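// Unlike a regular task, a local task's lifetime is limited to the current function, and the task object cannot be moved after it is created.
// Local tasks are typically used to run a task within the scope of the current function, ensuring that the task has completed by the time the function exits.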
local_task<Sched, Func> local_spawn(Sched& sched, Func&& f)
{
	// Since local_task is not movable, we construct it in-place and let the
	// caller extend the lifetime of the returned object using a reference.
	return {sched, std::forward<Func>(f)};
}
template<typename Func>
#ifdef __GNUC__
__attribute__((warn_unused_result))
#endif
local_task<detail::default_scheduler_type, Func> local_spawn(Func&& f)
{
	return {::async::default_scheduler(), std::forward<Func>(f)};
}
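
The `return {sched, std::forward<Func>(f)};` in `local_spawn()` and the `auto&& x = local_spawn(...);` usage pattern rely on two language rules: copy-list-initialization builds the return object in place, and binding a temporary to a reference extends its lifetime to the end of the enclosing scope. The sketch below shows both rules in isolation using only the standard library. `scoped_job`, `start_job`, and `demo` are hypothetical names for illustration and are not part of async++; the real `local_task` waits for its task in the destructor, whereas this stand-in only records when the destructor runs.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for local_task: neither copyable nor movable, and it
// does its cleanup in the destructor (local_task waits for the task there).
class scoped_job {
	std::string& log;

public:
	// Non-explicit so that `return {log, name};` can select this constructor.
	scoped_job(std::string& log_, const char* name)
		: log(log_)
	{
		log += name;
		log += " started;";
	}

	// Declaring the copy operations as deleted also suppresses the moves.
	scoped_job(const scoped_job&) = delete;
	scoped_job& operator=(const scoped_job&) = delete;

	~scoped_job()
	{
		log += " joined;";
	}
};

// Same shape as local_spawn(): the return object is built in place by
// copy-list-initialization, so no copy or move constructor is required.
scoped_job start_job(std::string& log, const char* name)
{
	return {log, name};
}

std::string demo()
{
	std::string log;
	{
		// Binding the temporary to a reference extends its lifetime to the
		// end of this block, as in `auto&& x = local_spawn(...);`.
		auto&& job = start_job(log, "job");
		(void)job;
		log += " body ran;";
	} // ~scoped_job runs here, not at the end of the declaration
	return log;
}
```

Calling `demo()` yields a log in which the destructor entry comes after the body entry, which is the property `local_task` depends on: the join happens when the enclosing scope ends.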

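`event_task::set()` returns `bool`, and the `task_state::locked` comment at the top of the file says that state is "used by event_task to prevent double set". The listing above does not show `set_internal()`, so the following is only a guess at the general idea, not the library's implementation: a compare-and-swap from `pending` to `locked` lets exactly one caller store the value. `manual_event` and `event_state` are hypothetical names, and this sketch additionally assumes that `T` is default-constructible.

```cpp
#include <atomic>
#include <cassert>
#include <utility>

// Hypothetical, reduced version of the task_state enum.
enum class event_state : unsigned char { pending, locked, completed };

// Hypothetical single-shot event, not the async++ basic_event.
template<typename T>
class manual_event {
	std::atomic<event_state> state;
	T value;

public:
	manual_event() : state(event_state::pending), value() {}

	// Returns true for the first caller only; later calls leave the stored
	// value untouched and return false.
	bool set(T v)
	{
		event_state expected = event_state::pending;
		if (!state.compare_exchange_strong(expected, event_state::locked,
				std::memory_order_acquire, std::memory_order_relaxed))
			return false;

		// Only the caller that won the exchange reaches this point.
		value = std::move(v);
		state.store(event_state::completed, std::memory_order_release);
		return true;
	}

	bool ready() const
	{
		return state.load(std::memory_order_acquire) == event_state::completed;
	}

	// Only meaningful once ready() has returned true.
	const T& get() const
	{
		return value;
	}
};
```

With a `manual_event<int> e`, the first `e.set(1)` succeeds and a later `e.set(2)` is rejected, so `e.get()` keeps returning the first value.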