From f81fe873dd24f93306f0f667fc47766990a9321b Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Wed, 3 Jul 2024 09:28:33 +0200 Subject: Add minimal coroutine support: no doc, no error handling, no cancellation handling (yet) --- src/lane.cpp | 142 ++++++++++++++++++++++++++++++++++++++++++++++++---------- src/lane.h | 26 ++++++++--- src/lanes.cpp | 28 +++++++++--- 3 files changed, 160 insertions(+), 36 deletions(-) (limited to 'src') diff --git a/src/lane.cpp b/src/lane.cpp index ba24af3..dd038a3 100644 --- a/src/lane.cpp +++ b/src/lane.cpp @@ -186,6 +186,55 @@ static LUAG_FUNC(thread_join) // ################################################################################################# +LUAG_FUNC(thread_resume) +{ + static constexpr int kSelf{ 1 }; + Lane* const _lane{ ToLane(L_, kSelf) }; + lua_State* const _L2{ _lane->L }; + + // wait until the lane yields + std::optional _hadToWait{}; // for debugging, if we ever raise the error just below + { + std::unique_lock _guard{ _lane->doneMutex }; + if (_lane->status == Lane::Pending || _lane->status == Lane::Running || _lane->status == Lane::Resuming) { + _hadToWait = _lane->status; + _lane->doneCondVar.wait(_guard, [_lane]() { return _lane->status == Lane::Suspended; }); + } + } + if (_lane->status != Lane::Suspended) { + if (_hadToWait) { + raise_luaL_error(L_, "INTERNAL ERROR: Lane status is %s instead of 'suspended'", _lane->threadStatusString().data()); + } else { + raise_luaL_error(L_, "Can't resume a non-suspended coroutine-type Lane"); + } + } + int const _nargs{ lua_gettop(L_) - 1 }; + int const _nresults{ lua_gettop(_L2) }; + STACK_CHECK_START_ABS(L_, 1 + _nargs); // L_: self args... _L2: results... + STACK_CHECK_START_ABS(_L2, _nresults); + + // to retrieve the yielded value of the coroutine on our stack + InterCopyContext _cin{ _lane->U, DestState{ L_ }, SourceState{ _L2 }, {}, {}, {}, {}, {} }; + if (_cin.interMove(_nresults) != InterCopyResult::Success) { // L_: self args... results... _L2: + raise_luaL_error(L_, "Failed to retrieve yielded values"); + } + + // to send our args on the coroutine stack + InterCopyContext _cout{ _lane->U, DestState{ _L2 }, SourceState{ L_ }, {}, SourceIndex{ 2 }, {}, {}, {} }; + if (_cout.interCopy(_nargs) != InterCopyResult::Success) { // L_: self args... results... _L2: args... + raise_luaL_error(L_, "Failed to send resumed values"); + } + + STACK_CHECK(_L2, _nargs); // we should have removed everything from the lane's stack, and pushed our args + STACK_CHECK(L_, 1 + _nargs + _nresults); // and the results of the coroutine are on top here + std::unique_lock _guard{ _lane->doneMutex }; + _lane->status = Lane::Resuming; + _lane->doneCondVar.notify_one(); + return _nresults; +} + +// ################################################################################################# + // key is numeric, wait until the thread returns and populate the environment with the return values // If the return values signal an error, propagate it // Else If key is found in the environment, return it @@ -697,12 +746,37 @@ static void lane_main(Lane* const lane_) if (lane_->status == Lane::Pending) { // nothing wrong happened during preparation, we can work // At this point, the lane function and arguments are on the stack, possibly preceded by the error handler int const _errorHandlerCount{ lane_->errorTraceLevel == Lane::Minimal ? 0 : 1}; - int const _nargs{ lua_gettop(_L) - 1 - _errorHandlerCount }; - lane_->status = Lane::Running; // Pending -> Running + int _nargs{ lua_gettop(_L) - 1 - _errorHandlerCount }; + { + std::unique_lock _guard{ lane_->doneMutex }; + lane_->status = Lane::Running; // Pending -> Running + } PrepareLaneHelpers(lane_); - - _rc = ToLuaError(lua_pcall(_L, _nargs, LUA_MULTRET, _errorHandlerCount)); // L: eh? retvals|err + if (lane_->S == lane_->L) { // L: eh? f args... + _rc = ToLuaError(lua_pcall(_L, _nargs, LUA_MULTRET, _errorHandlerCount)); // L: eh? retvals|err + } else { + // S and L are different: we run as a coroutine in Lua thread L created in state S + do { + int _nresults{}; + _rc = luaG_resume(_L, nullptr, _nargs, &_nresults); // L: eh? retvals|err + if (_rc == LuaError::YIELD) { + // change our status to suspended, and wait until someone wants us to resume + std::unique_lock _guard{ lane_->doneMutex }; + lane_->status = Lane::Suspended; // Running -> Suspended + lane_->doneCondVar.notify_one(); + // wait until the user wants us to resume + // TODO: do I update waiting_on or not, so that the lane can be woken by cancellation requests here? + // lane_->waiting_on = &lane_->doneCondVar; + lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status == Lane::Resuming; }); + // here lane_->doneMutex is locked again + // lane_->waiting_on = nullptr; + lane_->status = Lane::Running; // Resuming -> Running + // on the stack we find the values pushed by lane:resume() + _nargs = lua_gettop(_L); + } + } while (_rc == LuaError::YIELD); + } if (_errorHandlerCount) { lua_remove(_L, 1); // L: retvals|error @@ -713,7 +787,6 @@ static void lane_main(Lane* const lane_) DEBUGSPEW_CODE(DebugSpew(lane_->U) << "Lane " << _L << " body: " << GetErrcodeName(_rc) << " (" << (kCancelError.equals(_L, 1) ? "cancelled" : luaG_typename(_L, 1)) << ")" << std::endl); // Call finalizers, if the script has set them up. - // LuaError const _rc2{ run_finalizers(_L, lane_->errorTraceLevel, _rc) }; DEBUGSPEW_CODE(DebugSpew(lane_->U) << "Lane " << _L << " finalizer: " << GetErrcodeName(_rc2) << std::endl); if (_rc2 != LuaError::OK) { // Error within a finalizer! @@ -722,7 +795,7 @@ static void lane_main(Lane* const lane_) } lane_->waiting_on = nullptr; // just in case if (selfdestruct_remove(lane_)) { // check and remove (under lock!) - // We're a free-running thread and no-one's there to clean us up. + // We're a free-running thread and no-one is there to clean us up. lane_->closeState(); lane_->U->selfdestructMutex.lock(); // done with lua_close(), terminal shutdown sequence may proceed @@ -739,7 +812,7 @@ static void lane_main(Lane* const lane_) // leave results (1..top) or error message + stack trace (1..2) on the stack - master will copy them Lane::Status const _st{ (_rc == LuaError::OK) ? Lane::Done : kCancelError.equals(_L, 1) ? Lane::Cancelled : Lane::Error }; - // 'doneMutex' protects the -> Done|Error|Cancelled state change + // 'doneMutex' protects the -> Done|Error|Cancelled state change, and the Running|Suspended|Resuming state change too std::lock_guard _guard{ lane_->doneMutex }; lane_->status = _st; lane_->doneCondVar.notify_one(); // wake up master (while 'lane_->doneMutex' is on) @@ -825,14 +898,21 @@ static LUAG_FUNC(lane_gc) // #################################### Lane implementation ######################################## // ################################################################################################# -Lane::Lane(Universe* U_, lua_State* L_, ErrorTraceLevel errorTraceLevel_) +Lane::Lane(Universe* const U_, lua_State* const L_, ErrorTraceLevel const errorTraceLevel_, bool const asCoroutine_) : U{ U_ } +, S{ L_ } , L{ L_ } , errorTraceLevel{ errorTraceLevel_ } { + STACK_CHECK_START_REL(S, 0); assert(errorTraceLevel == ErrorTraceLevel::Minimal || errorTraceLevel == ErrorTraceLevel::Basic || errorTraceLevel == ErrorTraceLevel::Extended); - kExtendedStackTraceRegKey.setValue(L_, [yes = errorTraceLevel == ErrorTraceLevel::Extended ? 1 : 0](lua_State* L_) { lua_pushboolean(L_, yes); }); + kExtendedStackTraceRegKey.setValue(S, [yes = errorTraceLevel == ErrorTraceLevel::Extended ? 1 : 0](lua_State* L_) { lua_pushboolean(L_, yes); }); U->tracker.tracking_add(this); + if (asCoroutine_) { + L = lua_newthread(S); // S: thread + //kCoroutineRegKey.setValue(S, [](lua_State* const L_) { lua_insert(L_, -2); }); // S: + } + STACK_CHECK(S, asCoroutine_ ? 1 : 0); } // ################################################################################################# @@ -970,6 +1050,7 @@ namespace { { "cancel", LG_thread_cancel }, { "get_debug_threadname", LG_get_debug_threadname }, { "join", LG_thread_join }, + { "resume", LG_thread_resume }, { nullptr, nullptr } }; } // namespace local @@ -1059,25 +1140,38 @@ void Lane::startThread(int priority_) //--- // str= thread_status( lane ) // -// Returns: "pending" not started yet -// -> "running" started, doing its work.. -// <-> "waiting" blocked in a receive() -// -> "done" finished, results are there -// / "error" finished at an error, error value is there -// / "cancelled" execution cancelled by M (state gone) +// "pending" -> | ("running" <-> "waiting") <-> "suspended" <-> "resuming" | -> "done"/"error"/"cancelled" + +// "pending" not started yet +// "running" started, doing its work.. +// "suspended" returned from a lua_resume +// "resuming" told by its parent state to resume +// "waiting" blocked in a send()/receive() +// "done" finished, results are there +// "error" finished at an error, error value is there +// "cancelled" execution cancelled (state gone) // [[nodiscard]] std::string_view Lane::threadStatusString() const { - std::string_view const _str{ - (status == Lane::Pending) ? "pending" : - (status == Lane::Running) ? "running" : // like in 'co.status()' - (status == Lane::Waiting) ? "waiting" : - (status == Lane::Done) ? "done" : - (status == Lane::Error) ? "error" : - (status == Lane::Cancelled) ? "cancelled" : - "" + static constexpr std::string_view kStrs[] = { + "pending", + "running", "suspended", "resuming", + "waiting", + "done", "error", "cancelled" }; - return _str; + static_assert(0 == static_cast>(Pending)); + static_assert(1 == static_cast>(Running)); + static_assert(2 == static_cast>(Suspended)); + static_assert(3 == static_cast>(Resuming)); + static_assert(4 == static_cast>(Waiting)); + static_assert(5 == static_cast>(Done)); + static_assert(6 == static_cast>(Error)); + static_assert(7 == static_cast>(Cancelled)); + auto const _status{ static_cast>(status) }; + if (_status < 0 || _status > 7) { // should never happen, but better safe than sorry + return ""; + } + return kStrs[_status]; } // ################################################################################################# diff --git a/src/lane.h b/src/lane.h index 0426240..753c230 100644 --- a/src/lane.h +++ b/src/lane.h @@ -17,6 +17,9 @@ static constexpr RegistryUniqueKey kExtendedStackTraceRegKey{ 0x38147AD48FB426E2 * error (and maybe stack trace) arguments to the finalizer functions would * anyways complicate that approach. */ +// xxh64 of string "kCoroutineRegKey" generated at https://www.pelock.com/products/hash-calculator +static constexpr RegistryUniqueKey kCoroutineRegKey{ 0x72B049B0D130F009ull }; + // xxh64 of string "kFinalizerRegKey" generated at https://www.pelock.com/products/hash-calculator static constexpr RegistryUniqueKey kFinalizerRegKey{ 0xFE936BFAA718FEEAull }; @@ -45,13 +48,17 @@ class Lane public: /* Pending: The Lua VM hasn't done anything yet. - Running, Waiting: Thread is inside the Lua VM. If the thread is forcefully stopped, we can't lua_close() the Lua State. + Resuming: The user requested the lane to resume execution from Suspended state. + Suspended: returned from lua_resume, waiting for the client to request a lua_resume. + Running, Suspended, Waiting: Thread is inside the Lua VM. Done, Error, Cancelled: Thread execution is outside the Lua VM. It can be lua_close()d. */ enum class Status { Pending, Running, + Suspended, + Resuming, Waiting, Done, Error, @@ -84,8 +91,9 @@ class Lane std::string_view debugName{ "" }; - Universe* const U; - lua_State* L; + Universe* const U{}; + lua_State* S{}; // the master state of the lane + lua_State* L{}; // the state we run things in (either S or a lua_newthread() state if we run in coroutine mode) // // M: prepares the state, and reads results // S: while S is running, M must keep out of modifying the state @@ -93,7 +101,7 @@ class Lane Status volatile status{ Pending }; // // M: sets to Pending (before launching) - // S: updates -> Running/Waiting -> Done/Error/Cancelled + // S: updates -> Running/Waiting/Suspended -> Done/Error/Cancelled std::condition_variable* volatile waiting_on{ nullptr }; // @@ -121,7 +129,7 @@ class Lane // this one is for us, to make sure memory is freed by the correct allocator static void operator delete(void* p_) { static_cast(p_)->U->internalAllocator.free(p_, sizeof(Lane)); } - Lane(Universe* U_, lua_State* L_, ErrorTraceLevel errorTraceLevel_); + Lane(Universe* U_, lua_State* L_, ErrorTraceLevel errorTraceLevel_, bool asCoroutine_); ~Lane(); private: @@ -133,7 +141,13 @@ class Lane CancelResult cancel(CancelOp op_, int hookCount_, std::chrono::time_point until_, bool wakeLane_); void changeDebugName(int const nameIdx_); - void closeState() { lua_State* _L{ L }; L = nullptr; lua_close(_L); } + void closeState() + { + lua_State* _L{ S }; + S = nullptr; + L = nullptr; + lua_close(_L); // this collects our coroutine thread at the same time + } [[nodiscard]] std::string_view errorTraceLevelString() const; [[nodiscard]] int pushErrorHandler() const; [[nodiscard]] std::string_view pushErrorTraceLevel(lua_State* L_) const; diff --git a/src/lanes.cpp b/src/lanes.cpp index 473e150..c11dd6a 100644 --- a/src/lanes.cpp +++ b/src/lanes.cpp @@ -268,11 +268,16 @@ LUAG_FUNC(lane_new) DEBUGSPEW_CODE(DebugSpew(_U) << "lane_new: setup" << std::endl); std::optional _libs_str{ lua_isnil(L_, kLibsIdx) ? std::nullopt : std::make_optional(luaG_tostring(L_, kLibsIdx)) }; - lua_State* const _L2{ state::NewLaneState(_U, SourceState{ L_ }, _libs_str) }; // L_: [fixed] ... L2: - STACK_CHECK_START_REL(_L2, 0); + lua_State* const _S{ state::NewLaneState(_U, SourceState{ L_ }, _libs_str) }; // L_: [fixed] ... L2: + STACK_CHECK_START_REL(_S, 0); // 'lane' is allocated from heap, not Lua, since its life span may surpass the handle's (if free running thread) - Lane* const _lane{ new (_U) Lane{ _U, _L2, static_cast(lua_tointeger(L_, kErTlIdx)) } }; + Lane::ErrorTraceLevel const _errorTraceLevel{ static_cast(lua_tointeger(L_, kErTlIdx)) }; + bool const _asCoroutine{ lua_toboolean(L_, kAsCoro) ? true : false }; + Lane* const _lane{ new (_U) Lane{ _U, _S, _errorTraceLevel, _asCoroutine } }; + STACK_CHECK(_S, _asCoroutine ? 1 : 0); // the Lane's thread is on the Lane's state stack + lua_State* const _L2{ _lane->L }; + STACK_CHECK_START_REL(_L2, 0); if (_lane == nullptr) { raise_luaL_error(L_, "could not create lane: out of memory"); } @@ -347,7 +352,7 @@ LUAG_FUNC(lane_new) lua_setiuservalue(L, -2, 1); // L: ... lane - lua_State* _L2{ lane->L }; + lua_State* const _L2{ lane->L }; STACK_CHECK_START_REL(_L2, 0); int const _name_idx{ lua_isnoneornil(L, kNameIdx) ? 0 : kNameIdx }; std::string_view const _debugName{ (_name_idx > 0) ? luaG_tostring(L, _name_idx) : std::string_view{} }; @@ -357,7 +362,7 @@ LUAG_FUNC(lane_new) luaG_pushstring(_L2, _debugName); // L: ... lane L2: "" } else { lua_Debug _ar; - lua_pushvalue(L, 1); // L: ... lane func + lua_pushvalue(L, kFuncIdx); // L: ... lane func lua_getinfo(L, ">S", &_ar); // L: ... lane luaG_pushstring(_L2, "%s:%d", _ar.short_src, _ar.linedefined); // L: ... lane L2: "" } @@ -415,6 +420,8 @@ LUAG_FUNC(lane_new) [[maybe_unused]] InterCopyResult const _ret{ _c.interCopyPackage() }; LUA_ASSERT(L_, _ret == InterCopyResult::Success); // either all went well, or we should not even get here } + STACK_CHECK(L_, 0); + STACK_CHECK(_L2, 0); // modules to require in the target lane *before* the function is transfered! int const _required_idx{ lua_isnoneornil(L_, kRequIdx) ? 0 : kRequIdx }; @@ -526,6 +533,7 @@ LUAG_FUNC(lane_new) } STACK_CHECK(L_, -_nargs); LUA_ASSERT(L_, lua_gettop(L_) == kFixedArgsIdx); + STACK_CHECK(_L2, _errorHandlerCount + 1 + _nargs); // Store 'lane' in the lane's registry, for 'cancel_test()' (we do cancel tests at pending send/receive). kLanePointerRegKey.setValue( @@ -533,6 +541,14 @@ LUAG_FUNC(lane_new) ); STACK_CHECK(_L2, _errorHandlerCount + 1 + _nargs); + // if in coroutine mode, the Lane's master state stack should contain the thread + if (_asCoroutine) { + LUA_ASSERT(L_, _S != _L2); + STACK_CHECK(_S, 1); + } + // and the thread's stack has whatever is needed to run + STACK_CHECK(_L2, _errorHandlerCount + 1 + _nargs); + STACK_CHECK_RESET_REL(L_, 0); // all went well, the lane's thread can start working _onExit.success(); // L_: [fixed] lane L2: @@ -541,7 +557,7 @@ LUAG_FUNC(lane_new) return 1; } -// ################################################################################################ +// ################################################################################################# // threads() -> {}|nil // Return a list of all known lanes -- cgit v1.2.3-55-g6feb