diff options
| -rw-r--r-- | src/lane.cpp | 76 | ||||
| -rw-r--r-- | unit_tests/scripts/coro/yielding_function.lua | 45 |
2 files changed, 75 insertions, 46 deletions
diff --git a/src/lane.cpp b/src/lane.cpp index f8685e9..5ec6bcf 100644 --- a/src/lane.cpp +++ b/src/lane.cpp | |||
| @@ -128,14 +128,11 @@ static LUAG_FUNC(lane_join) | |||
| 128 | } else if (!lua_isnoneornil(L_, 2)) { | 128 | } else if (!lua_isnoneornil(L_, 2)) { |
| 129 | raise_luaL_argerror(L_, StackIndex{ 2 }, "incorrect duration type"); | 129 | raise_luaL_argerror(L_, StackIndex{ 2 }, "incorrect duration type"); |
| 130 | } | 130 | } |
| 131 | lua_settop(L_, 1); // L_: lane | ||
| 131 | 132 | ||
| 132 | // it is forbidden to join a suspended coroutine | ||
| 133 | if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { | ||
| 134 | raise_luaL_error(L_, "cannot join a suspended coroutine"); | ||
| 135 | } | ||
| 136 | 133 | ||
| 137 | lua_settop(L_, 1); // L_: lane | 134 | // wait until suspended or done |
| 138 | bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, false) }; | 135 | bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, true) }; |
| 139 | 136 | ||
| 140 | if (!_done) { | 137 | if (!_done) { |
| 141 | lua_pushnil(L_); // L_: lane nil | 138 | lua_pushnil(L_); // L_: lane nil |
| @@ -143,6 +140,19 @@ static LUAG_FUNC(lane_join) | |||
| 143 | return 2; | 140 | return 2; |
| 144 | } | 141 | } |
| 145 | 142 | ||
| 143 | // if lane is suspended, force a cancellation that will cause the yield loop to break, and the termination of the thread | ||
| 144 | if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { | ||
| 145 | auto const _cr{ _lane->cancel(CancelOp{ CancelRequest::Soft, LuaHookMask::None }, std::chrono::time_point<std::chrono::steady_clock>::max(), WakeLane::Yes, 0) }; | ||
| 146 | if (_cr == CancelResult::Timeout) { | ||
| 147 | lua_pushnil(L_); // L_: lane nil | ||
| 148 | luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout" | ||
| 149 | return 2; | ||
| 150 | } | ||
| 151 | if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { | ||
| 152 | raise_luaL_error(L_, "INTERNAL ERROR: Lane should not be suspended here"); | ||
| 153 | } | ||
| 154 | } | ||
| 155 | |||
| 146 | STACK_CHECK_START_REL(L_, 0); // L_: lane | 156 | STACK_CHECK_START_REL(L_, 0); // L_: lane |
| 147 | // Thread is Suspended or Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can. | 157 | // Thread is Suspended or Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can. |
| 148 | 158 | ||
| @@ -151,9 +161,8 @@ static LUAG_FUNC(lane_join) | |||
| 151 | STACK_GROW(L_, std::max(3, _stored + 1)); | 161 | STACK_GROW(L_, std::max(3, _stored + 1)); |
| 152 | switch (_lane->status.load(std::memory_order_acquire)) { | 162 | switch (_lane->status.load(std::memory_order_acquire)) { |
| 153 | case Lane::Suspended: | 163 | case Lane::Suspended: |
| 154 | // TODO: maybe this can happen if we have a to-be-closed handle on a suspended lane? TO BE TESTED! | 164 | raise_luaL_error(L_, "INTERNAL ERROR: SHOULD NEVER BE SUSPENDED HERE"); |
| 155 | raise_luaL_error(L_, "INTERNAL ERROR: should not join a suspended coroutine"); | 165 | break; |
| 156 | [[fallthrough]]; | ||
| 157 | 166 | ||
| 158 | case Lane::Done: // got regular return values | 167 | case Lane::Done: // got regular return values |
| 159 | if (_stored > 0) { | 168 | if (_stored > 0) { |
| @@ -274,11 +283,6 @@ static int lane_index_number(lua_State* L_) | |||
| 274 | raise_luaL_error(L_, "INTERNAL ERROR: Failed to join"); | 283 | raise_luaL_error(L_, "INTERNAL ERROR: Failed to join"); |
| 275 | } | 284 | } |
| 276 | 285 | ||
| 277 | // it is forbidden to index a suspended coroutine. if you want to read yielded values, use lane:resume() | ||
| 278 | if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { | ||
| 279 | raise_luaL_error(L_, "cannot index a suspended coroutine"); | ||
| 280 | } | ||
| 281 | |||
| 282 | // make sure results are stored | 286 | // make sure results are stored |
| 283 | int const _stored{ _lane->storeResults(L_) }; | 287 | int const _stored{ _lane->storeResults(L_) }; |
| 284 | if (_key > _stored) { | 288 | if (_key > _stored) { |
| @@ -287,6 +291,7 @@ static int lane_index_number(lua_State* L_) | |||
| 287 | } else { | 291 | } else { |
| 288 | _lane->pushIndexedResult(L_, _key); // L_: lane result | 292 | _lane->pushIndexedResult(L_, _key); // L_: lane result |
| 289 | } | 293 | } |
| 294 | |||
| 290 | return 1; | 295 | return 1; |
| 291 | } | 296 | } |
| 292 | 297 | ||
| @@ -744,31 +749,45 @@ static void lane_main(Lane* const lane_) | |||
| 744 | if (_rc == LuaError::YIELD) { | 749 | if (_rc == LuaError::YIELD) { |
| 745 | // on the stack we find the values pushed by lane:resume() | 750 | // on the stack we find the values pushed by lane:resume() |
| 746 | _nargs = lua_gettop(_L); | 751 | _nargs = lua_gettop(_L); |
| 747 | if (std::unique_lock _guard{ lane_->doneMutex }; true) | 752 | if (std::unique_lock _guard{ lane_->doneMutex }; true) { |
| 748 | { | 753 | // change our status to suspended, and wait until someone wants us to resume |
| 749 | // change our status to suspended, and wait until someone wants us to resume | 754 | lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended |
| 750 | lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended | 755 | lane_->doneCondVar.notify_one(); |
| 751 | lane_->doneCondVar.notify_one(); | 756 | // wait until the user wants us to resume |
| 752 | // wait until the user wants us to resume | ||
| 753 | // update waiting_on, so that the lane can be woken by cancellation requests here? | 757 | // update waiting_on, so that the lane can be woken by cancellation requests here? |
| 754 | lane_->waiting_on = &lane_->doneCondVar; | 758 | lane_->waiting_on = &lane_->doneCondVar; |
| 755 | lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming || lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None; }); | 759 | lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming || lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None; }); |
| 756 | // here lane_->doneMutex is locked again | 760 | // here lane_->doneMutex is locked again |
| 757 | lane_->waiting_on = nullptr; | 761 | lane_->waiting_on = nullptr; |
| 758 | lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running | 762 | lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running |
| 759 | } | 763 | } |
| 760 | // wait was interrupted because of a cancellation, finish the lane | 764 | // wait was interrupted because of a cancellation, finish the lane |
| 761 | _again = (lane_->cancelRequest.load(std::memory_order_relaxed) == CancelRequest::None); | 765 | _again = (lane_->cancelRequest.load(std::memory_order_relaxed) == CancelRequest::None); |
| 762 | } else { | 766 | } else { |
| 763 | _again = false; | 767 | _again = false; |
| 764 | } | 768 | } |
| 765 | } while (_again); | 769 | } while (_again); |
| 766 | #if LUA_VERSION_NUM >= 504 | ||
| 767 | if (_rc == LuaError::YIELD) { | 770 | if (_rc == LuaError::YIELD) { |
| 771 | #if LUA_VERSION_NUM >= 504 | ||
| 772 | lua_State* const _S{ lane_->S }; | ||
| 773 | STACK_CHECK_START_REL(_S, 0); | ||
| 774 | // lua_closethread cleans the stack, meaning we lose the yielded values! -> store | ||
| 775 | lua_xmove(_L, _S, lane_->nresults); | ||
| 768 | // lane is cancelled before completion (for example at Lanes shutdown), close everything | 776 | // lane is cancelled before completion (for example at Lanes shutdown), close everything |
| 769 | _rc = static_cast<LuaError>(lua_closethread(_L, nullptr)); | 777 | _rc = static_cast<LuaError>(lua_closethread(_L, nullptr)); // L: ... retvals|err <close_err> |
| 770 | } | 778 | // then restore the yielded values |
| 779 | if (_rc == LuaError::OK) { | ||
| 780 | lua_xmove(_S, _L, lane_->nresults); | ||
| 781 | } else { | ||
| 782 | lua_pop(_S, lane_->nresults); | ||
| 783 | } | ||
| 784 | STACK_CHECK(_S, 0); | ||
| 785 | |||
| 786 | #else // LUA_VERSION_NUM | ||
| 787 | // Lua prior to 5.4 do not have lua_closethread. | ||
| 788 | _rc = LuaError::OK; | ||
| 771 | #endif // LUA_VERSION_NUM | 789 | #endif // LUA_VERSION_NUM |
| 790 | } | ||
| 772 | if (_rc != LuaError::OK) { // an error occurred // L: err... | 791 | if (_rc != LuaError::OK) { // an error occurred // L: err... |
| 773 | // for some reason, in my tests with Lua 5.4, when the coroutine raises an error, I have 3 copies of it on the stack | 792 | // for some reason, in my tests with Lua 5.4, when the coroutine raises an error, I have 3 copies of it on the stack |
| 774 | // or false + the error message when running Lua 5.1 | 793 | // or false + the error message when running Lua 5.1 |
| @@ -980,7 +999,8 @@ CancelResult Lane::cancel(CancelOp const op_, std::chrono::time_point<std::chron | |||
| 980 | 999 | ||
| 981 | // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here | 1000 | // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here |
| 982 | // We can read status without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN) | 1001 | // We can read status without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN) |
| 983 | if (status.load(std::memory_order_acquire) >= Lane::Done) { | 1002 | auto const _status{ status.load(std::memory_order_acquire) }; |
| 1003 | if (_status == Lane::Done || _status == Lane::Error || _status == Lane::Cancelled) { | ||
| 984 | // say "ok" by default, including when lane is already done | 1004 | // say "ok" by default, including when lane is already done |
| 985 | return CancelResult::Cancelled; | 1005 | return CancelResult::Cancelled; |
| 986 | } | 1006 | } |
| @@ -1015,7 +1035,7 @@ CancelResult Lane::internalCancel(CancelRequest const rq_, std::chrono::time_poi | |||
| 1015 | } | 1035 | } |
| 1016 | } | 1036 | } |
| 1017 | // wait until the lane stops working with its state (either Suspended or Done+) | 1037 | // wait until the lane stops working with its state (either Suspended or Done+) |
| 1018 | CancelResult const result{ waitForCompletion(until_, true) ? CancelResult::Cancelled : CancelResult::Timeout }; | 1038 | CancelResult const result{ waitForCompletion(until_, false) ? CancelResult::Cancelled : CancelResult::Timeout }; |
| 1019 | return result; | 1039 | return result; |
| 1020 | } | 1040 | } |
| 1021 | 1041 | ||
diff --git a/unit_tests/scripts/coro/yielding_function.lua b/unit_tests/scripts/coro/yielding_function.lua index e7367ea..636f094 100644 --- a/unit_tests/scripts/coro/yielding_function.lua +++ b/unit_tests/scripts/coro/yielding_function.lua | |||
| @@ -17,13 +17,13 @@ local yielder = function(...) | |||
| 17 | local _ack = coroutine.yield(_val) | 17 | local _ack = coroutine.yield(_val) |
| 18 | assert(_ack == _i) | 18 | assert(_ack == _i) |
| 19 | end | 19 | end |
| 20 | return "done!" | 20 | return "bye!" |
| 21 | end | 21 | end |
| 22 | 22 | ||
| 23 | -------------------------------------------------------------------------------------------------- | 23 | -------------------------------------------------------------------------------------------------- |
| 24 | -- TEST: if we start a non-coroutine lane with a yielding function, we should get an error, right? | 24 | -- TEST: if we start a non-coroutine lane with a yielding function, we should get an error, right? |
| 25 | -------------------------------------------------------------------------------------------------- | 25 | -------------------------------------------------------------------------------------------------- |
| 26 | if true then | 26 | if false then |
| 27 | local fun_g = lanes.gen("*", { name = 'auto' }, yielder) | 27 | local fun_g = lanes.gen("*", { name = 'auto' }, yielder) |
| 28 | local h = fun_g("hello", "world", "!") | 28 | local h = fun_g("hello", "world", "!") |
| 29 | local err, status, stack = h:join() | 29 | local err, status, stack = h:join() |
| @@ -48,7 +48,7 @@ local coro_g = lanes.coro("*", {name = "auto"}, yielder) | |||
| 48 | ------------------------------------------------------------------------------------------------- | 48 | ------------------------------------------------------------------------------------------------- |
| 49 | -- TEST: we can resume as many times as the lane yields, then read the returned value on indexing | 49 | -- TEST: we can resume as many times as the lane yields, then read the returned value on indexing |
| 50 | ------------------------------------------------------------------------------------------------- | 50 | ------------------------------------------------------------------------------------------------- |
| 51 | if true then | 51 | if false then |
| 52 | -- launch coroutine lane | 52 | -- launch coroutine lane |
| 53 | local h = coro_g("hello", "world", "!") | 53 | local h = coro_g("hello", "world", "!") |
| 54 | -- read the yielded values, sending back the expected index | 54 | -- read the yielded values, sending back the expected index |
| @@ -57,13 +57,13 @@ if true then | |||
| 57 | assert(h:resume(3) == "!") | 57 | assert(h:resume(3) == "!") |
| 58 | -- the lane return value is available as usual | 58 | -- the lane return value is available as usual |
| 59 | local r = h[1] | 59 | local r = h[1] |
| 60 | assert(r == "done!") | 60 | assert(r == "bye!") |
| 61 | end | 61 | end |
| 62 | 62 | ||
| 63 | --------------------------------------------------------------------------------------------- | 63 | --------------------------------------------------------------------------------------------- |
| 64 | -- TEST: we can resume as many times as the lane yields, then read the returned value on join | 64 | -- TEST: we can resume as many times as the lane yields, then read the returned value on join |
| 65 | --------------------------------------------------------------------------------------------- | 65 | --------------------------------------------------------------------------------------------- |
| 66 | if true then | 66 | if false then |
| 67 | -- launch coroutine lane | 67 | -- launch coroutine lane |
| 68 | local h = coro_g("hello", "world", "!") | 68 | local h = coro_g("hello", "world", "!") |
| 69 | -- read the yielded values, sending back the expected index | 69 | -- read the yielded values, sending back the expected index |
| @@ -72,7 +72,7 @@ if true then | |||
| 72 | assert(h:resume(3) == "!") | 72 | assert(h:resume(3) == "!") |
| 73 | -- the lane return value is available as usual | 73 | -- the lane return value is available as usual |
| 74 | local s, r = h:join() | 74 | local s, r = h:join() |
| 75 | assert(h.status == "done" and s == true and r == "done!") | 75 | assert(h.status == "done" and s == true and r == "bye!") |
| 76 | end | 76 | end |
| 77 | 77 | ||
| 78 | --------------------------------------------------------------------------------------------------- | 78 | --------------------------------------------------------------------------------------------------- |
| @@ -83,23 +83,32 @@ if true then | |||
| 83 | local h = coro_g("hello", "world", "!") | 83 | local h = coro_g("hello", "world", "!") |
| 84 | -- read the first yielded value, sending back the expected index | 84 | -- read the first yielded value, sending back the expected index |
| 85 | assert(h:resume(1) == "hello") | 85 | assert(h:resume(1) == "hello") |
| 86 | -- join the lane. since it will reach a yield point, it remains suspended, and we should get a timeout | 86 | -- join the lane. since it will reach a yield point, it unblocks and ends. last yielded values are returned normally |
| 87 | local b, r = h:join(0.5) | 87 | local b, r = h:join(0.5) |
| 88 | local s = h.status | 88 | local s = h.status |
| 89 | assert(s == "suspended" and b == nil and r == "timeout", "got " .. s .. " " .. tostring(b) .. " " .. r) | 89 | assert(s == "done" and b == true and r == "world", "got " .. s .. " " .. tostring(b) .. " " .. tostring(r)) |
| 90 | -- trying to resume again should proceed normally, since nothing changed | ||
| 91 | assert(h:resume(2) == "world") | ||
| 92 | assert(h:resume(3) == "!") | ||
| 93 | -- the lane return value is available as usual | ||
| 94 | local s, r = h:join() | ||
| 95 | assert(h.status == "done" and s == true and r == "done!") | ||
| 96 | end | 90 | end |
| 97 | 91 | ||
| 98 | --------------------------------------------------------- | 92 | ----------------------------------------------------------------------- |
| 99 | -- TEST: if we index yielded lane, we should get an error | 93 | -- TEST: if we index yielded lane, we should get the last yielded value |
| 100 | --------------------------------------------------------- | 94 | ----------------------------------------------------------------------- |
| 101 | -- TODO: implement this test! | 95 | if false then |
| 96 | -- launch coroutine lane | ||
| 97 | local h = coro_g("hello", "world", "!") | ||
| 98 | -- read the first yielded value, sending back the expected index | ||
| 99 | assert(h:resume(1) == "hello") | ||
| 100 | -- indexing multiple times gives back the same us the same yielded value | ||
| 101 | local r1 = h[1] | ||
| 102 | local r2 = h[1] | ||
| 103 | local r3 = h[1] | ||
| 104 | assert(r1 == "world" and r2 == "world" and r3 == "world", "got " .. r1 .. " " .. r2 .. " " .. r3) | ||
| 105 | assert(h:resume(2) == "world") | ||
| 102 | 106 | ||
| 107 | -- THERE IS AN INCONSISTENCY: h:resume pulls the yielded values directly out of the lane's stack | ||
| 108 | -- but h[n] removes them and stores them in the internal values storage table | ||
| 109 | -- TODO: so we need to decide: should indexing a yielded lane work like resume()? | ||
| 110 | assert(h:resume(3) == "!") | ||
| 111 | end | ||
| 103 | 112 | ||
| 104 | ------------------------------------------------------------------------------------- | 113 | ------------------------------------------------------------------------------------- |
| 105 | -- TEST: if we close yielded lane, we can join it and get the last yielded values out | 114 | -- TEST: if we close yielded lane, we can join it and get the last yielded values out |
