From 13f7f505375f7c1afd3a7e479a64cc147501b01d Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Tue, 7 May 2024 17:56:10 +0200 Subject: Linda API changes * timeout clarifications (negative values are no longer accepted, use nil instead) * linda(send, linda.null, key, ...) removed, if you want to send a nil, just do it as usual --- src/cancel.cpp | 35 ++++++++++++++++++++--------------- src/cancel.h | 2 +- src/lanes.cpp | 46 ++++++++++++++++++++++++++-------------------- src/lanes.lua | 17 ++++++++++------- src/lanes_private.h | 2 +- src/linda.cpp | 25 +++++++++---------------- src/lindafactory.cpp | 1 + src/universe.cpp | 3 +-- 8 files changed, 69 insertions(+), 62 deletions(-) (limited to 'src') diff --git a/src/cancel.cpp b/src/cancel.cpp index dd848a7..fe1623b 100644 --- a/src/cancel.cpp +++ b/src/cancel.cpp @@ -107,7 +107,7 @@ LUAG_FUNC(cancel_test) // ################################################################################################# -[[nodiscard]] static CancelResult thread_cancel_soft(Lane* lane_, lua_Duration duration_, bool wakeLane_) +[[nodiscard]] static CancelResult thread_cancel_soft(Lane* lane_, std::chrono::time_point until_, bool wakeLane_) { lane_->cancelRequest = CancelRequest::Soft; // it's now signaled to stop // negative timeout: we don't want to truly abort the lane, we just want it to react to cancel_test() on its own @@ -118,12 +118,12 @@ LUAG_FUNC(cancel_test) } } - return lane_->waitForCompletion(duration_) ? CancelResult::Cancelled : CancelResult::Timeout; + return lane_->waitForCompletion(until_) ? CancelResult::Cancelled : CancelResult::Timeout; } // ################################################################################################# -[[nodiscard]] static CancelResult thread_cancel_hard(Lane* lane_, lua_Duration duration_, bool wakeLane_) +[[nodiscard]] static CancelResult thread_cancel_hard(Lane* lane_, std::chrono::time_point until_, bool wakeLane_) { lane_->cancelRequest = CancelRequest::Hard; // it's now signaled to stop // lane_->thread.get_stop_source().request_stop(); @@ -134,13 +134,13 @@ LUAG_FUNC(cancel_test) } } - CancelResult result{ lane_->waitForCompletion(duration_) ? CancelResult::Cancelled : CancelResult::Timeout }; + CancelResult result{ lane_->waitForCompletion(until_) ? CancelResult::Cancelled : CancelResult::Timeout }; return result; } // ################################################################################################# -CancelResult thread_cancel(Lane* lane_, CancelOp op_, int hookCount_, lua_Duration duration_, bool wakeLane_) +CancelResult thread_cancel(Lane* lane_, CancelOp op_, int hookCount_, std::chrono::time_point until_, bool wakeLane_) { // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here // We can read 'lane_->status' without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN) @@ -152,12 +152,12 @@ CancelResult thread_cancel(Lane* lane_, CancelOp op_, int hookCount_, lua_Durati // signal the linda the wake up the thread so that it can react to the cancel query // let us hope we never land here with a pointer on a linda that has been destroyed... if (op_ == CancelOp::Soft) { - return thread_cancel_soft(lane_, duration_, wakeLane_); + return thread_cancel_soft(lane_, until_, wakeLane_); } else if (static_cast(op_) > static_cast(CancelOp::Soft)) { lua_sethook(lane_->L, cancel_hook, static_cast(op_), hookCount_); } - return thread_cancel_hard(lane_, duration_, wakeLane_); + return thread_cancel_hard(lane_, until_, wakeLane_); } // ################################################################################################# @@ -200,7 +200,7 @@ CancelOp which_cancel_op(char const* opString_) // ################################################################################################# -// bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, wake_lindas]) +// bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, wake_lane]) LUAG_FUNC(thread_cancel) { Lane* const lane{ ToLane(L_, 1) }; @@ -215,14 +215,19 @@ LUAG_FUNC(thread_cancel) } } - lua_Duration wait_timeout{ 0.0 }; - if (lua_type(L_, 2) == LUA_TNUMBER) { - wait_timeout = lua_Duration{ lua_tonumber(L_, 2) }; - lua_remove(L_, 2); // argument is processed, remove it - if (wait_timeout.count() < 0.0) { - raise_luaL_error(L_, "cancel timeout cannot be < 0"); + std::chrono::time_point until{ std::chrono::time_point::max() }; + if (lua_type(L_, 2) == LUA_TNUMBER) { // we don't want to use lua_isnumber() because of autocoercion + lua_Duration const duration{ lua_tonumber(L_, 2) }; + if (duration.count() >= 0.0) { + until = std::chrono::steady_clock::now() + std::chrono::duration_cast(duration); + } else { + raise_luaL_argerror(L_, 2, "duration cannot be < 0"); } + lua_remove(L_, 2); // argument is processed, remove it + } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the key + lua_remove(L_, 2); // argument is processed, remove it } + // we wake by default in "hard" mode (remember that hook is hard too), but this can be turned off if desired bool wake_lane{ op != CancelOp::Soft }; if (lua_gettop(L_) >= 2) { @@ -233,7 +238,7 @@ LUAG_FUNC(thread_cancel) lua_remove(L_, 2); // argument is processed, remove it } STACK_CHECK_START_REL(L_, 0); - switch (thread_cancel(lane, op, hook_count, wait_timeout, wake_lane)) { + switch (thread_cancel(lane, op, hook_count, until, wake_lane)) { default: // should never happen unless we added a case and forgot to handle it LUA_ASSERT(L_, false); break; diff --git a/src/cancel.h b/src/cancel.h index 3df5252..1918df3 100644 --- a/src/cancel.h +++ b/src/cancel.h @@ -49,7 +49,7 @@ enum class CancelOp static constexpr UniqueKey kCancelError{ 0x0630345FEF912746ull, "lanes.cancel_error" }; // 'raise_cancel_error' sentinel [[nodiscard]] CancelOp which_cancel_op(char const* opString_); -[[nodiscard]] CancelResult thread_cancel(Lane* lane_, CancelOp op_, int hookCount_, lua_Duration secs_, bool wakeLane_); +[[nodiscard]] CancelResult thread_cancel(Lane* lane_, CancelOp op_, int hookCount_, std::chrono::time_point until_, bool wakeLane_); [[noreturn]] static inline void raise_cancel_error(lua_State* L_) { diff --git a/src/lanes.cpp b/src/lanes.cpp index 90f0f9f..d211b6a 100644 --- a/src/lanes.cpp +++ b/src/lanes.cpp @@ -169,17 +169,12 @@ Lane::Lane(Universe* U_, lua_State* L_) // ################################################################################################# -bool Lane::waitForCompletion(lua_Duration duration_) +bool Lane::waitForCompletion(std::chrono::time_point until_) { - std::chrono::time_point until{ std::chrono::time_point::max() }; - if (duration_.count() >= 0.0) { - until = std::chrono::steady_clock::now() + std::chrono::duration_cast(duration_); - } - std::unique_lock lock{ doneMutex }; // std::stop_token token{ thread.get_stop_token() }; // return doneCondVar.wait_until(lock, token, secs_, [this](){ return status >= Lane::Done; }); - return doneCondVar.wait_until(lock, until, [this]() { return status >= Lane::Done; }); + return doneCondVar.wait_until(lock, until_, [this]() { return status >= Lane::Done; }); } // ################################################################################################# @@ -1209,22 +1204,33 @@ void Lane::pushThreadStatus(lua_State* L_) LUAG_FUNC(thread_join) { Lane* const lane{ ToLane(L_, 1) }; - lua_Duration const duration{ luaL_optnumber(L_, 2, -1.0) }; lua_State* const L2{ lane->L }; - bool const done{ !lane->thread.joinable() || lane->waitForCompletion(duration) }; + std::chrono::time_point until{ std::chrono::time_point::max() }; + if (lua_type(L_, 2) == LUA_TNUMBER) { // we don't want to use lua_isnumber() because of autocoercion + lua_Duration const duration{ lua_tonumber(L_, 2) }; + if (duration.count() >= 0.0) { + until = std::chrono::steady_clock::now() + std::chrono::duration_cast(duration); + } else { + raise_luaL_argerror(L_, 2, "duration cannot be < 0"); + } + + } else if (!lua_isnoneornil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the key + raise_luaL_argerror(L_, 2, "incorrect duration type"); + } + + bool const done{ !lane->thread.joinable() || lane->waitForCompletion(until) }; + lua_settop(L_, 1); // L_: lane if (!done || !L2) { - STACK_GROW(L_, 2); - lua_pushnil(L_); // L_: lane timeout? nil - lua_pushliteral(L_, "timeout"); // L_: lane timeout? nil "timeout" + lua_pushnil(L_); // L_: lane nil + lua_pushliteral(L_, "timeout"); // L_: lane nil "timeout" return 2; } - STACK_CHECK_START_REL(L_, 0); + STACK_CHECK_START_REL(L_, 0); // L_: lane // Thread is Done/Error/Cancelled; all ours now int ret{ 0 }; - Universe* const U{ lane->U }; // debugName is a pointer to string possibly interned in the lane's state, that no longer exists when the state is closed // so store it in the userdata uservalue at a key that can't possibly collide lane->securizeDebugName(L_); @@ -1234,8 +1240,8 @@ LUAG_FUNC(thread_join) int const n{ lua_gettop(L2) }; // whole L2 stack if ( (n > 0) && - (InterCopyContext{ U, DestState{ L_ }, SourceState{ L2 }, {}, {}, {}, {}, {} }.inter_move(n) != InterCopyResult::Success) - ) { // L_: lane timeout? results L2: + (InterCopyContext{ lane->U, DestState{ L_ }, SourceState{ L2 }, {}, {}, {}, {}, {} }.inter_move(n) != InterCopyResult::Success) + ) { // L_: lane results L2: raise_luaL_error(L_, "tried to copy unsupported types"); } ret = n; @@ -1244,12 +1250,12 @@ LUAG_FUNC(thread_join) case Lane::Error: { - int const n{ lua_gettop(L2) }; // L_: lane timeout? L2: "err" [trace] + int const n{ lua_gettop(L2) }; // L_: lane L2: "err" [trace] STACK_GROW(L_, 3); - lua_pushnil(L_); // L_: lane timeout? nil + lua_pushnil(L_); // L_: lane nil // even when ERROR_FULL_STACK, if the error is not LUA_ERRRUN, the handler wasn't called, and we only have 1 error message on the stack ... - InterCopyContext c{ U, DestState{ L_ }, SourceState{ L2 }, {}, {}, {}, {}, {} }; - if (c.inter_move(n) != InterCopyResult::Success) { // L_: lane timeout? nil "err" [trace] L2: + InterCopyContext c{ lane->U, DestState{ L_ }, SourceState{ L2 }, {}, {}, {}, {}, {} }; + if (c.inter_move(n) != InterCopyResult::Success) { // L_: lane nil "err" [trace] L2: raise_luaL_error(L_, "tried to copy unsupported types: %s", lua_tostring(L_, -n)); } ret = 1 + n; diff --git a/src/lanes.lua b/src/lanes.lua index caa8818..0ab6661 100644 --- a/src/lanes.lua +++ b/src/lanes.lua @@ -51,16 +51,17 @@ local lanes = setmetatable({}, lanesMeta) -- and 'table' visible. -- local assert = assert(assert) +local error = assert(error) local io = assert(io) +local pairs = assert(pairs) local string_gmatch = assert(string.gmatch) local string_format = assert(string.format) local select = assert(select) local setmetatable = assert(setmetatable) local table_insert = assert(table.insert) -local type = assert(type) -local pairs = assert(pairs) +local tonumber = assert(tonumber) local tostring = assert(tostring) -local error = assert(error) +local type = assert(type) -- ################################################################################################# @@ -625,10 +626,12 @@ end -- -- PUBLIC LANES API local sleep = function(seconds_) - seconds_ = seconds_ or 0.0 -- this causes false and nil to be a valid input, equivalent to 0.0, but that's ok - if seconds_ == 'indefinitely' then - seconds_ = nil - elseif type(seconds_) ~= "number" then + local type = type(seconds_) + if type == "string" then + seconds_ = (seconds_ ~= 'indefinitely') and tonumber(seconds_) or nil + elseif type == "nil" then + seconds_ = 0 + elseif type ~= "number" then error("invalid duration " .. string_format("%q", tostring(seconds_))) end -- receive data on a channel no-one ever sends anything, thus blocking for the specified duration diff --git a/src/lanes_private.h b/src/lanes_private.h index 196a346..a756c42 100644 --- a/src/lanes_private.h +++ b/src/lanes_private.h @@ -94,7 +94,7 @@ class Lane Lane(Universe* U_, lua_State* L_); ~Lane(); - [[nodiscard]] bool waitForCompletion(lua_Duration duration_); + [[nodiscard]] bool waitForCompletion(std::chrono::time_point until_); void startThread(int priority_); void pushThreadStatus(lua_State* L_); void changeDebugName(int nameIdx_); diff --git a/src/linda.cpp b/src/linda.cpp index bbfbd69..40ef6c7 100644 --- a/src/linda.cpp +++ b/src/linda.cpp @@ -180,7 +180,7 @@ int Linda::ProtectedCall(lua_State* L_, lua_CFunction f_) // ################################################################################################# /* - * bool= linda_send( linda_ud, [timeout_secs=-1,] [linda.null,] key_num|str|bool|lightuserdata, ... ) + * bool= linda:linda_send([timeout_secs=nil,] key_num|str|bool|lightuserdata, ...) * * Send one or more values to a Linda. If there is a limit, all values must fit. * @@ -192,25 +192,21 @@ LUAG_FUNC(linda_send) { auto send = [](lua_State* L_) { Linda* const linda{ ToLinda(L_, 1) }; - std::chrono::time_point until{ std::chrono::time_point::max() }; int key_i{ 2 }; // index of first key, if timeout not there + std::chrono::time_point until{ std::chrono::time_point::max() }; if (lua_type(L_, 2) == LUA_TNUMBER) { // we don't want to use lua_isnumber() because of autocoercion lua_Duration const duration{ lua_tonumber(L_, 2) }; if (duration.count() >= 0.0) { until = std::chrono::steady_clock::now() + std::chrono::duration_cast(duration); + } else { + raise_luaL_argerror(L_, 2, "duration cannot be < 0"); } ++key_i; } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the key ++key_i; } - bool const as_nil_sentinel{ kNilSentinel.equals(L_, key_i) }; // if not nullptr, send() will silently send a single nil if nothing is provided - if (as_nil_sentinel) { - // the real key to send data to is after the kNilSentinel marker - ++key_i; - } - // make sure the key is of a valid type check_key_types(L_, key_i, key_i); @@ -218,12 +214,7 @@ LUAG_FUNC(linda_send) // make sure there is something to send if (lua_gettop(L_) == key_i) { - if (as_nil_sentinel) { - // send a single nil if nothing is provided - kNilSentinel.pushKey(L_); - } else { - raise_luaL_error(L_, "no data to send"); - } + raise_luaL_error(L_, "no data to send"); } // convert nils to some special non-nil sentinel in sent values @@ -322,7 +313,7 @@ LUAG_FUNC(linda_send) /* * 2 modes of operation - * [val, key]= linda_receive( linda_ud, [timeout_secs_num=-1], key_num|str|bool|lightuserdata [, ...] ) + * [val, key]= linda_receive( linda_ud, [timeout_secs_num=nil], key_num|str|bool|lightuserdata [, ...] ) * Consumes a single value from the Linda, in any key. * Returns: received value (which is consumed from the slot), and the key which had it @@ -335,13 +326,15 @@ LUAG_FUNC(linda_receive) { auto receive = [](lua_State* L_) { Linda* const linda{ ToLinda(L_, 1) }; - std::chrono::time_point until{ std::chrono::time_point::max() }; int key_i{ 2 }; // index of first key, if timeout not there + std::chrono::time_point until{ std::chrono::time_point::max() }; if (lua_type(L_, 2) == LUA_TNUMBER) { // we don't want to use lua_isnumber() because of autocoercion lua_Duration const duration{ lua_tonumber(L_, 2) }; if (duration.count() >= 0.0) { until = std::chrono::steady_clock::now() + std::chrono::duration_cast(duration); + } else { + raise_luaL_argerror(L_, 2, "duration cannot be < 0"); } ++key_i; } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the key diff --git a/src/lindafactory.cpp b/src/lindafactory.cpp index 0ec5a0a..917d949 100644 --- a/src/lindafactory.cpp +++ b/src/lindafactory.cpp @@ -32,6 +32,7 @@ THE SOFTWARE. #include "lindafactory.h" +#include "lanes_private.h" #include "linda.h" // must be a #define instead of a constexpr to work with lua_pushliteral (until I templatize it) diff --git a/src/universe.cpp b/src/universe.cpp index 6adc314..becffdd 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -89,13 +89,12 @@ void Universe::terminateFreeRunningLanes(lua_State* L_, lua_Duration shutdownTim { std::lock_guard guard{ selfdestructMutex }; Lane* lane{ selfdestructFirst }; - lua_Duration timeout{ 1us }; while (lane != SELFDESTRUCT_END) { // attempt the requested cancel with a small timeout. // if waiting on a linda, they will raise a cancel_error. // if a cancellation hook is desired, it will be installed to try to raise an error if (lane->thread.joinable()) { - std::ignore = thread_cancel(lane, op_, 1, timeout, true); + std::ignore = thread_cancel(lane, op_, 1, std::chrono::steady_clock::now() + 1us, true); } lane = lane->selfdestruct_next; } -- cgit v1.2.3-55-g6feb