aboutsummaryrefslogtreecommitdiff
path: root/src/lane.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/lane.cpp')
-rw-r--r--src/lane.cpp79
1 files changed, 62 insertions, 17 deletions
diff --git a/src/lane.cpp b/src/lane.cpp
index c6ea358..f8685e9 100644
--- a/src/lane.cpp
+++ b/src/lane.cpp
@@ -129,8 +129,13 @@ static LUAG_FUNC(lane_join)
129 raise_luaL_argerror(L_, StackIndex{ 2 }, "incorrect duration type"); 129 raise_luaL_argerror(L_, StackIndex{ 2 }, "incorrect duration type");
130 } 130 }
131 131
132 // it is forbidden to join a suspended coroutine
133 if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) {
134 raise_luaL_error(L_, "cannot join a suspended coroutine");
135 }
136
132 lua_settop(L_, 1); // L_: lane 137 lua_settop(L_, 1); // L_: lane
133 bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, true) }; 138 bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, false) };
134 139
135 if (!_done) { 140 if (!_done) {
136 lua_pushnil(L_); // L_: lane nil 141 lua_pushnil(L_); // L_: lane nil
@@ -145,7 +150,11 @@ static LUAG_FUNC(lane_join)
145 int const _stored{ _lane->storeResults(L_) }; 150 int const _stored{ _lane->storeResults(L_) };
146 STACK_GROW(L_, std::max(3, _stored + 1)); 151 STACK_GROW(L_, std::max(3, _stored + 1));
147 switch (_lane->status.load(std::memory_order_acquire)) { 152 switch (_lane->status.load(std::memory_order_acquire)) {
148 case Lane::Suspended: // got yielded values 153 case Lane::Suspended:
154 // TODO: maybe this can happen if we have a to-be-closed handle on a suspended lane? TO BE TESTED!
155 raise_luaL_error(L_, "INTERNAL ERROR: should not join a suspended coroutine");
156 [[fallthrough]];
157
149 case Lane::Done: // got regular return values 158 case Lane::Done: // got regular return values
150 if (_stored > 0) { 159 if (_stored > 0) {
151 lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv} 160 lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv}
@@ -210,7 +219,7 @@ LUAG_FUNC(lane_resume)
210 Lane* const _lane{ ToLane(L_, kIdxSelf) }; 219 Lane* const _lane{ ToLane(L_, kIdxSelf) };
211 lua_State* const _L2{ _lane->L }; 220 lua_State* const _L2{ _lane->L };
212 221
213// wait until the lane yields or returns 222 // wait until the lane yields or returns
214 std::ignore = _lane->waitForCompletion(std::chrono::time_point<std::chrono::steady_clock>::max(), true); 223 std::ignore = _lane->waitForCompletion(std::chrono::time_point<std::chrono::steady_clock>::max(), true);
215 224
216 if (_lane->status.load(std::memory_order_acquire) != Lane::Suspended) { 225 if (_lane->status.load(std::memory_order_acquire) != Lane::Suspended) {
@@ -259,11 +268,17 @@ static int lane_index_number(lua_State* L_)
259 int const _key{ static_cast<int>(lua_tointeger(L_, 2)) }; 268 int const _key{ static_cast<int>(lua_tointeger(L_, 2)) };
260 lua_pop(L_, 1); // L_: lane 269 lua_pop(L_, 1); // L_: lane
261 270
271 // wait until the lane finishes or is suspended
262 std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; 272 std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() };
263 if (!_lane->waitForCompletion(_until, true)) { 273 if (!_lane->waitForCompletion(_until, true)) {
264 raise_luaL_error(L_, "INTERNAL ERROR: Failed to join"); 274 raise_luaL_error(L_, "INTERNAL ERROR: Failed to join");
265 } 275 }
266 276
277 // it is forbidden to index a suspended coroutine. if you want to read yielded values, use lane:resume()
278 if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) {
279 raise_luaL_error(L_, "cannot index a suspended coroutine");
280 }
281
267 // make sure results are stored 282 // make sure results are stored
268 int const _stored{ _lane->storeResults(L_) }; 283 int const _stored{ _lane->storeResults(L_) };
269 if (_key > _stored) { 284 if (_key > _stored) {
@@ -485,6 +500,7 @@ static int PushStackTrace(lua_State* const L_, Lane::ErrorTraceLevel const error
485 StackIndex const _top{ lua_gettop(L_) }; 500 StackIndex const _top{ lua_gettop(L_) };
486 switch (rc_) { 501 switch (rc_) {
487 case LuaError::OK: // no error, body return values are on the stack 502 case LuaError::OK: // no error, body return values are on the stack
503 case LuaError::YIELD:
488 break; 504 break;
489 505
490 case LuaError::ERRRUN: // cancellation or a runtime error 506 case LuaError::ERRRUN: // cancellation or a runtime error
@@ -707,7 +723,7 @@ static void lane_main(Lane* const lane_)
707 LuaError _rc{ LuaError::ERRRUN }; 723 LuaError _rc{ LuaError::ERRRUN };
708 if (lane_->status.load(std::memory_order_acquire) == Lane::Pending) { // nothing wrong happened during preparation, we can work 724 if (lane_->status.load(std::memory_order_acquire) == Lane::Pending) { // nothing wrong happened during preparation, we can work
709 // At this point, the lane function and arguments are on the stack, possibly preceded by the error handler 725 // At this point, the lane function and arguments are on the stack, possibly preceded by the error handler
710 int const _errorHandlerCount{ lane_->errorHandlerCount() }; 726 int const _errorHandlerCount{ lane_->errorHandlerCount() }; // no error handler for coroutines, ever.
711 int _nargs{ lua_gettop(_L) - 1 - _errorHandlerCount }; 727 int _nargs{ lua_gettop(_L) - 1 - _errorHandlerCount };
712 { 728 {
713 std::unique_lock _guard{ lane_->doneMutex }; 729 std::unique_lock _guard{ lane_->doneMutex };
@@ -720,27 +736,40 @@ static void lane_main(Lane* const lane_)
720 lane_->nresults = lua_gettop(_L) - _errorHandlerCount; 736 lane_->nresults = lua_gettop(_L) - _errorHandlerCount;
721 } else { 737 } else {
722 // S and L are different: we run as a coroutine in Lua thread L created in state S 738 // S and L are different: we run as a coroutine in Lua thread L created in state S
739 bool _again{ true };
723 do { 740 do {
724 // starting with Lua 5.4, lua_resume can leave more stuff on the stack below the actual yielded values. 741 // starting with Lua 5.4, lua_resume can leave more stuff on the stack below the actual yielded values.
725 // that's why we have lane_->nresults 742 // that's why we have lane_->nresults
726 _rc = luaG_resume(_L, nullptr, _nargs, &lane_->nresults); // L: eh? ... retvals|err... 743 _rc = luaG_resume(_L, nullptr, _nargs, &lane_->nresults); // L: ... retvals|err...
727 if (_rc == LuaError::YIELD) { 744 if (_rc == LuaError::YIELD) {
745 // on the stack we find the values pushed by lane:resume()
746 _nargs = lua_gettop(_L);
747 if (std::unique_lock _guard{ lane_->doneMutex }; true)
748 {
728 // change our status to suspended, and wait until someone wants us to resume 749 // change our status to suspended, and wait until someone wants us to resume
729 std::unique_lock _guard{ lane_->doneMutex };
730 lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended 750 lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended
731 lane_->doneCondVar.notify_one(); 751 lane_->doneCondVar.notify_one();
732 // wait until the user wants us to resume 752 // wait until the user wants us to resume
733 // TODO: do I update waiting_on or not, so that the lane can be woken by cancellation requests here? 753 // update waiting_on, so that the lane can be woken by cancellation requests here?
734 // lane_->waiting_on = &lane_->doneCondVar; 754 lane_->waiting_on = &lane_->doneCondVar;
735 lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming; }); 755 lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming || lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None; });
736 // here lane_->doneMutex is locked again 756 // here lane_->doneMutex is locked again
737 // lane_->waiting_on = nullptr; 757 lane_->waiting_on = nullptr;
738 lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running 758 lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running
739 // on the stack we find the values pushed by lane:resume()
740 _nargs = lua_gettop(_L);
741 } 759 }
742 } while (_rc == LuaError::YIELD); 760 // wait was interrupted because of a cancellation, finish the lane
743 if (_rc != LuaError::OK) { // : err... 761 _again = (lane_->cancelRequest.load(std::memory_order_relaxed) == CancelRequest::None);
762 } else {
763 _again = false;
764 }
765 } while (_again);
766#if LUA_VERSION_NUM >= 504
767 if (_rc == LuaError::YIELD) {
768 // lane is cancelled before completion (for example at Lanes shutdown), close everything
769 _rc = static_cast<LuaError>(lua_closethread(_L, nullptr));
770 }
771#endif // LUA_VERSION_NUM
772 if (_rc != LuaError::OK) { // an error occurred // L: err...
744 // for some reason, in my tests with Lua 5.4, when the coroutine raises an error, I have 3 copies of it on the stack 773 // for some reason, in my tests with Lua 5.4, when the coroutine raises an error, I have 3 copies of it on the stack
745 // or false + the error message when running Lua 5.1 774 // or false + the error message when running Lua 5.1
746 // since the rest of our code wants only the error message, let us keep only the latter. 775 // since the rest of our code wants only the error message, let us keep only the latter.
@@ -805,7 +834,9 @@ static void lane_main(Lane* const lane_)
805// ################################################################################################# 834// #################################################################################################
806 835
807#if LUA_VERSION_NUM >= 504 836#if LUA_VERSION_NUM >= 504
808static LUAG_FUNC(lane_close) 837
838// __close(lane_ud, <err>)
839static LUAG_FUNC(lane___close)
809{ 840{
810 [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil 841 [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil
811 // drop the error if any 842 // drop the error if any
@@ -818,6 +849,16 @@ static LUAG_FUNC(lane_close)
818 lua_call(L_, 1, LUA_MULTRET); // L_: join() results 849 lua_call(L_, 1, LUA_MULTRET); // L_: join() results
819 return lua_gettop(L_); 850 return lua_gettop(L_);
820} 851}
852
853// #################################################################################################
854
855// close(lane_ud)
856static LUAG_FUNC(lane_close)
857{
858 [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil
859 raise_luaL_error(L_, "not implemented"); // TODO!
860 return 0;
861}
821#endif // LUA_VERSION_NUM >= 504 862#endif // LUA_VERSION_NUM >= 504
822 863
823// ################################################################################################# 864// #################################################################################################
@@ -966,7 +1007,8 @@ CancelResult Lane::internalCancel(CancelRequest const rq_, std::chrono::time_poi
966 // lane_->thread.get_stop_source().request_stop(); 1007 // lane_->thread.get_stop_source().request_stop();
967 } 1008 }
968 if (wakeLane_ == WakeLane::Yes) { // wake the thread so that execution returns from any pending linda operation if desired 1009 if (wakeLane_ == WakeLane::Yes) { // wake the thread so that execution returns from any pending linda operation if desired
969 if (status.load(std::memory_order_acquire) == Lane::Waiting) { // waiting_on is updated under control of status acquire/release semantics 1010 auto const _status{ status.load(std::memory_order_acquire) };
1011 if (_status == Lane::Waiting || _status == Lane::Suspended) { // waiting_on is updated under control of status acquire/release semantics
970 if (std::condition_variable* const _waiting_on{ waiting_on }) { 1012 if (std::condition_variable* const _waiting_on{ waiting_on }) {
971 _waiting_on->notify_all(); 1013 _waiting_on->notify_all();
972 } 1014 }
@@ -1007,11 +1049,14 @@ namespace {
1007 namespace local { 1049 namespace local {
1008 static struct luaL_Reg const sLaneFunctions[] = { 1050 static struct luaL_Reg const sLaneFunctions[] = {
1009#if LUA_VERSION_NUM >= 504 1051#if LUA_VERSION_NUM >= 504
1010 { "__close", LG_lane_close }, 1052 { "__close", LG_lane___close },
1011#endif // LUA_VERSION_NUM >= 504 1053#endif // LUA_VERSION_NUM >= 504
1012 { "__gc", LG_lane_gc }, 1054 { "__gc", LG_lane_gc },
1013 { "__index", LG_lane_index }, 1055 { "__index", LG_lane_index },
1014 { "cancel", LG_lane_cancel }, 1056 { "cancel", LG_lane_cancel },
1057#if LUA_VERSION_NUM >= 504
1058 { "close", LG_lane_close },
1059#endif // LUA_VERSION_NUM >= 504
1015 { "get_threadname", LG_lane_get_threadname }, 1060 { "get_threadname", LG_lane_get_threadname },
1016 { "join", LG_lane_join }, 1061 { "join", LG_lane_join },
1017 { "resume", LG_lane_resume }, 1062 { "resume", LG_lane_resume },