aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/intercopycontext.cpp6
-rw-r--r--src/keeper.cpp36
-rw-r--r--src/keeper.hpp13
-rw-r--r--src/lanes.lua5
-rw-r--r--src/linda.cpp461
-rw-r--r--src/linda.hpp5
-rw-r--r--src/lindafactory.cpp4
-rw-r--r--src/threading.cpp4
8 files changed, 258 insertions, 276 deletions
diff --git a/src/intercopycontext.cpp b/src/intercopycontext.cpp
index d6716a3..a93615b 100644
--- a/src/intercopycontext.cpp
+++ b/src/intercopycontext.cpp
@@ -770,7 +770,7 @@ bool InterCopyContext::tryCopyClonable() const
770 // we need to copy over the uservalues of the userdata as well 770 // we need to copy over the uservalues of the userdata as well
771 { 771 {
772 StackIndex const _mt{ luaG_absindex(L1, StackIndex{ -2 }) }; // L1: ... mt __lanesclone 772 StackIndex const _mt{ luaG_absindex(L1, StackIndex{ -2 }) }; // L1: ... mt __lanesclone
773 size_t const userdata_size{ lua_rawlen(L1, _L1_i) }; 773 auto const userdata_size{ static_cast<size_t>(lua_rawlen(L1, _L1_i)) }; // make 32-bits builds happy
774 // extract all the uservalues, but don't transfer them yet 774 // extract all the uservalues, but don't transfer them yet
775 UserValueCount const _nuv{ luaG_getalluservalues(L1, _L1_i) }; // L1: ... mt __lanesclone [uv]* 775 UserValueCount const _nuv{ luaG_getalluservalues(L1, _L1_i) }; // L1: ... mt __lanesclone [uv]*
776 // create the clone userdata with the required number of uservalue slots 776 // create the clone userdata with the required number of uservalue slots
@@ -930,7 +930,7 @@ bool InterCopyContext::interCopyFunction() const
930 _source = lua_touserdata(L1, -1); 930 _source = lua_touserdata(L1, -1);
931 void* _clone{ nullptr }; 931 void* _clone{ nullptr };
932 // get the number of bytes to allocate for the clone 932 // get the number of bytes to allocate for the clone
933 size_t const _userdata_size{ lua_rawlen(L1, kIdxTop) }; 933 auto const _userdata_size{ static_cast<size_t>(lua_rawlen(L1, kIdxTop)) }; // make 32-bits builds happy
934 { 934 {
935 // extract uservalues (don't transfer them yet) 935 // extract uservalues (don't transfer them yet)
936 UserValueCount const _nuv{ luaG_getalluservalues(L1, source_i) }; // L1: ... u [uv]* 936 UserValueCount const _nuv{ luaG_getalluservalues(L1, source_i) }; // L1: ... u [uv]*
@@ -994,7 +994,7 @@ bool InterCopyContext::interCopyLightuserdata() const
994 // recognize and print known UniqueKey names here 994 // recognize and print known UniqueKey names here
995 if constexpr (USE_DEBUG_SPEW()) { 995 if constexpr (USE_DEBUG_SPEW()) {
996 bool _found{ false }; 996 bool _found{ false };
997 static constexpr std::array<std::reference_wrapper<UniqueKey const>, 3> kKeysToCheck{ kLindaBatched, kCancelError, kNilSentinel }; 997 static constexpr std::array<std::reference_wrapper<UniqueKey const>, 2> kKeysToCheck{ kCancelError, kNilSentinel };
998 for (UniqueKey const& _key : kKeysToCheck) { 998 for (UniqueKey const& _key : kKeysToCheck) {
999 if (_key.equals(L1, L1_i)) { 999 if (_key.equals(L1, L1_i)) {
1000 DEBUGSPEW_CODE(DebugSpew(nullptr) << _key.debugName); 1000 DEBUGSPEW_CODE(DebugSpew(nullptr) << _key.debugName);
diff --git a/src/keeper.cpp b/src/keeper.cpp
index cad9207..c8c470f 100644
--- a/src/keeper.cpp
+++ b/src/keeper.cpp
@@ -811,22 +811,6 @@ KeeperCallResult keeper_call(KeeperState const K_, keeper_api_t const func_, lua
811// ################################################################################################# 811// #################################################################################################
812// ################################################################################################# 812// #################################################################################################
813 813
814void* Keeper::operator new[](size_t size_, Universe* U_) noexcept
815{
816 // size_ is the memory for the element count followed by the elements themselves
817 return U_->internalAllocator.alloc(size_);
818}
819
820// #################################################################################################
821
822// can't actually delete the operator because the compiler generates stack unwinding code that could call it in case of exception
823void Keeper::operator delete[](void* p_, Universe* U_)
824{
825 U_->internalAllocator.free(p_, *static_cast<size_t*>(p_) * sizeof(Keeper) + sizeof(size_t));
826}
827
828// #################################################################################################
829
830// only used by linda:dump() and linda:__towatch() for debugging purposes 814// only used by linda:dump() and linda:__towatch() for debugging purposes
831// table is populated as follows: 815// table is populated as follows:
832// { 816// {
@@ -927,7 +911,7 @@ void Keepers::DeleteKV::operator()(Keeper* const k_) const
927 for (auto& _k : std::span<Keeper>(k_, count)) { 911 for (auto& _k : std::span<Keeper>(k_, count)) {
928 _k.~Keeper(); 912 _k.~Keeper();
929 } 913 }
930 U->internalAllocator.free(k_, count * sizeof(Keeper)); 914 U.internalAllocator.free(k_, count * sizeof(Keeper));
931} 915}
932 916
933// ################################################################################################# 917// #################################################################################################
@@ -959,8 +943,8 @@ void Keepers::collectGarbage()
959 // when keeper N+1 is closed, object is GCed, linda operation is called, which attempts to acquire keeper N, whose Lua state no longer exists 943 // when keeper N+1 is closed, object is GCed, linda operation is called, which attempts to acquire keeper N, whose Lua state no longer exists
960 // in that case, the linda operation should do nothing. which means that these operations must check for keeper acquisition success 944 // in that case, the linda operation should do nothing. which means that these operations must check for keeper acquisition success
961 // which is early-outed with a keepers->nbKeepers null-check 945 // which is early-outed with a keepers->nbKeepers null-check
962 for (size_t const _i : std::ranges::iota_view{ size_t{ 0 }, _kv.nbKeepers }) { 946 for (Keeper& _k : std::span<Keeper>{ _kv.keepers.get(), _kv.nbKeepers }) {
963 _gcOneKeeper(_kv.keepers[_i]); 947 _gcOneKeeper(_k);
964 } 948 }
965 } 949 }
966} 950}
@@ -996,9 +980,8 @@ void Keepers::close()
996 // when keeper N+1 is closed, object is GCed, linda operation is called, which attempts to acquire keeper N, whose Lua state no longer exists 980 // when keeper N+1 is closed, object is GCed, linda operation is called, which attempts to acquire keeper N, whose Lua state no longer exists
997 // in that case, the linda operation should do nothing. which means that these operations must check for keeper acquisition success 981 // in that case, the linda operation should do nothing. which means that these operations must check for keeper acquisition success
998 // which is early-outed with a keepers->nbKeepers null-check 982 // which is early-outed with a keepers->nbKeepers null-check
999 size_t const _nbKeepers{ std::exchange(_kv.nbKeepers, size_t{ 0 }) }; 983 for (Keeper& _k : std::span<Keeper>{ _kv.keepers.get(), std::exchange(_kv.nbKeepers, size_t{ 0 }) }) {
1000 for (size_t const _i : std::ranges::iota_view{ size_t{ 0 }, _nbKeepers }) { 984 if (!_closeOneKeeper(_k)) {
1001 if (!_closeOneKeeper(_kv.keepers[_i])) {
1002 // detected partial init: destroy only the mutexes that got initialized properly 985 // detected partial init: destroy only the mutexes that got initialized properly
1003 break; 986 break;
1004 } 987 }
@@ -1137,11 +1120,14 @@ void Keepers::initialize(Universe& U_, lua_State* L_, size_t const nbKeepers_, i
1137 1120
1138 default: 1121 default:
1139 KV& _kv = keeper_array.emplace<KV>( 1122 KV& _kv = keeper_array.emplace<KV>(
1140 std::unique_ptr<Keeper[], DeleteKV>{ new(&U_) Keeper[nbKeepers_], DeleteKV{ &U_, nbKeepers_ } }, 1123 std::unique_ptr<Keeper, DeleteKV>{ static_cast<Keeper*>(U_.internalAllocator.alloc(sizeof(Keeper) * nbKeepers_)), DeleteKV{ U_, nbKeepers_ } },
1141 nbKeepers_ 1124 nbKeepers_
1142 ); 1125 );
1143 for (size_t const _i : std::ranges::iota_view{ size_t{ 0 }, nbKeepers_ }) { 1126 // fak. std::ranges::views::enumerate is c++23 (would help having item and index iterated over simultaneously)
1144 _initOneKeeper(_kv.keepers[_i], static_cast<int>(_i)); 1127 int _i{};
1128 for (Keeper& _k : std::span<Keeper>{ _kv.keepers.get(), nbKeepers_ }) {
1129 new (&_k) Keeper{};
1130 _initOneKeeper(_k, _i++);
1145 } 1131 }
1146 } 1132 }
1147} 1133}
diff --git a/src/keeper.hpp b/src/keeper.hpp
index 5cebe07..f1083b3 100644
--- a/src/keeper.hpp
+++ b/src/keeper.hpp
@@ -26,12 +26,6 @@ struct Keeper
26 std::mutex mutex; 26 std::mutex mutex;
27 KeeperState K{ static_cast<lua_State*>(nullptr) }; 27 KeeperState K{ static_cast<lua_State*>(nullptr) };
28 28
29 [[nodiscard]]
30 static void* operator new[](size_t size_, Universe* U_) noexcept;
31 // can't actually delete the operator because the compiler generates stack unwinding code that could call it in case of exception
32 static void operator delete[](void* p_, Universe* U_);
33
34
35 ~Keeper() = default; 29 ~Keeper() = default;
36 Keeper() = default; 30 Keeper() = default;
37 // non-copyable, non-movable 31 // non-copyable, non-movable
@@ -51,14 +45,15 @@ struct Keepers
51 private: 45 private:
52 struct DeleteKV 46 struct DeleteKV
53 { 47 {
54 Universe* U{}; 48 Universe& U;
55 size_t count{}; 49 size_t count{};
56 void operator()(Keeper* k_) const; 50 void operator()(Keeper* k_) const;
57 }; 51 };
58 // can't use std::vector<Keeper> because Keeper contains a mutex, so we need a raw memory buffer 52 // can't use std::unique_ptr<Keeper[]> because of interactions with placement new and custom deleters
53 // and I'm not using std::vector<Keeper> because I don't have an allocator to plug on the Universe (yet)
59 struct KV 54 struct KV
60 { 55 {
61 std::unique_ptr<Keeper[], DeleteKV> keepers; 56 std::unique_ptr<Keeper, DeleteKV> keepers;
62 size_t nbKeepers{}; 57 size_t nbKeepers{};
63 }; 58 };
64 std::variant<std::monostate, Keeper, KV> keeper_array; 59 std::variant<std::monostate, Keeper, KV> keeper_array;
diff --git a/src/lanes.lua b/src/lanes.lua
index 98f8c20..bd94a14 100644
--- a/src/lanes.lua
+++ b/src/lanes.lua
@@ -603,7 +603,6 @@ local configure_timers = function()
603 return next_wakeup -- may be 'nil' 603 return next_wakeup -- may be 'nil'
604 end -- check_timers() 604 end -- check_timers()
605 605
606 local timer_gateway_batched = timerLinda.batched
607 set_finalizer(function(err, stk) 606 set_finalizer(function(err, stk)
608 if err and type(err) ~= "userdata" then 607 if err and type(err) ~= "userdata" then
609 error("LanesTimer error: "..tostring(err)) 608 error("LanesTimer error: "..tostring(err))
@@ -628,7 +627,7 @@ local configure_timers = function()
628 627
629 if _timerKey == TGW_KEY then 628 if _timerKey == TGW_KEY then
630 assert(getmetatable(_what) == "Linda") -- '_what' should be a linda on which the client sets a timer 629 assert(getmetatable(_what) == "Linda") -- '_what' should be a linda on which the client sets a timer
631 local _, key, wakeup_at, period = timerLinda:receive(0, timer_gateway_batched, TGW_KEY, 3) 630 local _, key, wakeup_at, period = timerLinda:receive_batched(0, TGW_KEY, 3)
632 assert(key) 631 assert(key)
633 set_timer(_what, key, wakeup_at, period and period > 0 and period or nil) 632 set_timer(_what, key, wakeup_at, period and period > 0 and period or nil)
634 elseif _timerKey == TGW_QUERY then 633 elseif _timerKey == TGW_QUERY then
@@ -758,7 +757,7 @@ local genlock = function(linda_, key_, N)
758 -- 'nil' timeout allows 'key_' to be numeric 757 -- 'nil' timeout allows 'key_' to be numeric
759 return linda_:send(timeout, key_, trues(M_)) -- suspends until been able to push them 758 return linda_:send(timeout, key_, trues(M_)) -- suspends until been able to push them
760 else 759 else
761 local _k, _v = linda_:receive(nil, linda_.batched, key_, -M_) 760 local _k, _v = linda_:receive_batched(nil, key_, -M_)
762 -- propagate cancel_error if we got it, else return true or false 761 -- propagate cancel_error if we got it, else return true or false
763 return (_v == cancel_error and _v) or (_k and true or false) 762 return (_v == cancel_error and _v) or (_k and true or false)
764 end 763 end
diff --git a/src/linda.cpp b/src/linda.cpp
index a094a8f..0cdacfa 100644
--- a/src/linda.cpp
+++ b/src/linda.cpp
@@ -43,7 +43,6 @@ namespace {
43 // ############################################################################################# 43 // #############################################################################################
44 // ############################################################################################# 44 // #############################################################################################
45 45
46
47 static void CheckKeyTypes(lua_State* const L_, StackIndex const start_, StackIndex const end_) 46 static void CheckKeyTypes(lua_State* const L_, StackIndex const start_, StackIndex const end_)
48 { 47 {
49 STACK_CHECK_START_REL(L_, 0); 48 STACK_CHECK_START_REL(L_, 0);
@@ -63,7 +62,7 @@ namespace {
63 62
64 case LuaType::LIGHTUSERDATA: 63 case LuaType::LIGHTUSERDATA:
65 { 64 {
66 static constexpr std::array<std::reference_wrapper<UniqueKey const>, 3> kKeysToCheck{ kLindaBatched, kCancelError, kNilSentinel }; 65 static constexpr std::array<std::reference_wrapper<UniqueKey const>, 2> kKeysToCheck{ kCancelError, kNilSentinel };
67 for (UniqueKey const& _key : kKeysToCheck) { 66 for (UniqueKey const& _key : kKeysToCheck) {
68 if (_key.equals(L_, _i)) { 67 if (_key.equals(L_, _i)) {
69 raise_luaL_error(L_, "argument #%d: can't use %s as a slot", _i, _key.debugName.data()); 68 raise_luaL_error(L_, "argument #%d: can't use %s as a slot", _i, _key.debugName.data());
@@ -125,6 +124,165 @@ namespace {
125 } 124 }
126 125
127 // ############################################################################################# 126 // #############################################################################################
127
128 // a helper to process the timeout argument of linda:send() and linda:receive()
129 [[nodiscard]]
130 static auto ProcessTimeoutArg(lua_State* const L_)
131 {
132 StackIndex _key_i{ 2 }; // index of first slot, if timeout not there
133
134 std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() };
135 if (luaG_type(L_, StackIndex{ 2 }) == LuaType::NUMBER) { // we don't want to use lua_isnumber() because of autocoercion
136 lua_Duration const _duration{ lua_tonumber(L_, 2) };
137 if (_duration.count() >= 0.0) {
138 _until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(_duration);
139 } else {
140 raise_luaL_argerror(L_, StackIndex{ 2 }, "duration cannot be < 0");
141 }
142 ++_key_i;
143 } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the slot
144 ++_key_i;
145 }
146 return std::make_pair(_key_i, _until);
147 }
148
149 // #############################################################################################
150
151 // the implementation for linda:receive() and linda:receive_batched()
152 static int ReceiveInternal(lua_State* const L_, bool const batched_)
153 {
154 Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) };
155
156 auto const [_key_i, _until] = ProcessTimeoutArg(L_);
157
158 keeper_api_t _selected_keeper_receive{ nullptr };
159 int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 };
160 // are we in batched mode?
161 if (batched_) {
162 // make sure the keys are of a valid type
163 CheckKeyTypes(L_, _key_i, _key_i);
164 // receive multiple values from a single slot
165 _selected_keeper_receive = KEEPER_API(receive_batched);
166 // we expect a user-defined amount of return value
167 _expected_pushed_min = (int) luaL_checkinteger(L_, _key_i + 1);
168 if (_expected_pushed_min < 1) {
169 raise_luaL_argerror(L_, StackIndex{ _key_i + 1 }, "bad min count");
170 }
171 _expected_pushed_max = (int) luaL_optinteger(L_, _key_i + 2, _expected_pushed_min);
172 // don't forget to count the slot in addition to the values
173 ++_expected_pushed_min;
174 ++_expected_pushed_max;
175 if (_expected_pushed_min > _expected_pushed_max) {
176 raise_luaL_argerror(L_, StackIndex{ _key_i + 2 }, "batched min/max error");
177 }
178 } else {
179 // make sure the keys are of a valid type
180 CheckKeyTypes(L_, _key_i, StackIndex{ lua_gettop(L_) });
181 // receive a single value, checking multiple slots
182 _selected_keeper_receive = KEEPER_API(receive);
183 // we expect a single (value, slot) pair of returned values
184 _expected_pushed_min = _expected_pushed_max = 2;
185 }
186
187 Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) };
188 Keeper* const _keeper{ _linda->whichKeeper() };
189 KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } };
190 if (_K == nullptr)
191 return 0;
192
193 CancelRequest _cancel{ CancelRequest::None };
194 KeeperCallResult _pushed{};
195
196 STACK_CHECK_START_REL(_K, 0);
197 for (bool _try_again{ true };;) {
198 if (_lane != nullptr) {
199 _cancel = _lane->cancelRequest.load(std::memory_order_relaxed);
200 }
201 _cancel = (_cancel != CancelRequest::None)
202 ? _cancel
203 : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None);
204 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything
205 if (!_try_again || _cancel != CancelRequest::None) {
206 _pushed.emplace(0);
207 break;
208 }
209
210 // all arguments of receive() but the first are passed to the keeper's receive function
211 STACK_CHECK(_K, 0);
212 _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i);
213 if (!_pushed.has_value()) {
214 break;
215 }
216 if (_pushed.value() > 0) {
217 LUA_ASSERT(L_, _pushed.value() >= _expected_pushed_min && _pushed.value() <= _expected_pushed_max);
218 if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) {
219 raise_luaL_error(L_, "Key is restricted");
220 }
221 _linda->readHappened.notify_all();
222 break;
223 }
224
225 if (std::chrono::steady_clock::now() >= _until) {
226 break; /* instant timeout */
227 }
228
229 // nothing received, wait until timeout or signalled that we should try again
230 {
231 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
232 if (_lane != nullptr) {
233 // change status of lane to "waiting"
234 _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely
235 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case
236 LUA_ASSERT(L_, _lane->waiting_on == nullptr);
237 _lane->waiting_on = &_linda->writeHappened;
238 _lane->status.store(Lane::Waiting, std::memory_order_release);
239 }
240 // not enough data to read: wakeup when data was sent, or when timeout is reached
241 std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock };
242 std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until) };
243 _guard.release(); // we don't want to unlock the mutex on exit!
244 _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups
245 if (_lane != nullptr) {
246 _lane->waiting_on = nullptr;
247 _lane->status.store(_prev_status, std::memory_order_release);
248 }
249 }
250 }
251 STACK_CHECK(_K, 0);
252
253 if (!_pushed.has_value()) {
254 raise_luaL_error(L_, "tried to copy unsupported types");
255 }
256
257 switch (_cancel) {
258 case CancelRequest::None:
259 {
260 int const _nbPushed{ _pushed.value() };
261 if (_nbPushed == 0) {
262 // not enough data in the linda slot to fulfill the request, return nil, "timeout"
263 lua_pushnil(L_);
264 luaG_pushstring(L_, "timeout");
265 return 2;
266 }
267 return _nbPushed;
268 }
269
270 case CancelRequest::Soft:
271 // if user wants to soft-cancel, the call returns nil, kCancelError
272 lua_pushnil(L_);
273 kCancelError.pushKey(L_);
274 return 2;
275
276 case CancelRequest::Hard:
277 // raise an error interrupting execution only in case of hard cancel
278 raise_cancel_error(L_); // raises an error and doesn't return
279
280 default:
281 raise_luaL_error(L_, "internal error: unknown cancel request");
282 }
283 }
284
285 // #############################################################################################
128 // ############################################################################################# 286 // #############################################################################################
129} // namespace 287} // namespace
130// ################################################################################################# 288// #################################################################################################
@@ -630,163 +788,25 @@ LUAG_FUNC(linda_limit)
630// ################################################################################################# 788// #################################################################################################
631 789
632/* 790/*
633 * 2 modes of operation 791 * [val, slot] = linda:receive([timeout_secs_num=nil], key_num|str|bool|lightuserdata [, ...] )
634 * [val, slot]= linda:receive([timeout_secs_num=nil], key_num|str|bool|lightuserdata [, ...] )
635 * Consumes a single value from the Linda, in any slot. 792 * Consumes a single value from the Linda, in any slot.
636 * Returns: received value (which is consumed from the slot), and the slot which had it 793 * Returns: received value (which is consumed from the slot), and the slot which had it
637
638 * [val1, ... valCOUNT]= linda_receive( linda_ud, [timeout_secs_num=-1], linda.batched, key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT])
639 * Consumes between min_COUNT and max_COUNT values from the linda, from a single slot.
640 * returns the actual consumed values, or nil if there weren't enough values to consume
641 */ 794 */
642LUAG_FUNC(linda_receive) 795LUAG_FUNC(linda_receive)
643{ 796{
644 static constexpr lua_CFunction _receive{ 797 return Linda::ProtectedCall(L_, [](lua_State* const L_) { return ReceiveInternal(L_, false); });
645 +[](lua_State* const L_) { 798}
646 Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) };
647 StackIndex _key_i{ 2 }; // index of first slot, if timeout not there
648
649 std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() };
650 if (luaG_type(L_, StackIndex{ 2 }) == LuaType::NUMBER) { // we don't want to use lua_isnumber() because of autocoercion
651 lua_Duration const _duration{ lua_tonumber(L_, 2) };
652 if (_duration.count() >= 0.0) {
653 _until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(_duration);
654 } else {
655 raise_luaL_argerror(L_, StackIndex{ 2 }, "duration cannot be < 0");
656 }
657 ++_key_i;
658 } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the slot
659 ++_key_i;
660 }
661
662 keeper_api_t _selected_keeper_receive{ nullptr };
663 int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 };
664 // are we in batched mode?
665 if (kLindaBatched.equals(L_, _key_i)) {
666 // no need to pass linda.batched in the keeper state
667 ++_key_i;
668 // make sure the keys are of a valid type
669 CheckKeyTypes(L_, _key_i, _key_i);
670 // receive multiple values from a single slot
671 _selected_keeper_receive = KEEPER_API(receive_batched);
672 // we expect a user-defined amount of return value
673 _expected_pushed_min = (int) luaL_checkinteger(L_, _key_i + 1);
674 if (_expected_pushed_min < 1) {
675 raise_luaL_argerror(L_, StackIndex{ _key_i + 1 }, "bad min count");
676 }
677 _expected_pushed_max = (int) luaL_optinteger(L_, _key_i + 2, _expected_pushed_min);
678 // don't forget to count the slot in addition to the values
679 ++_expected_pushed_min;
680 ++_expected_pushed_max;
681 if (_expected_pushed_min > _expected_pushed_max) {
682 raise_luaL_argerror(L_, StackIndex{ _key_i + 2 }, "batched min/max error");
683 }
684 } else {
685 // make sure the keys are of a valid type
686 CheckKeyTypes(L_, _key_i, StackIndex{ lua_gettop(L_) });
687 // receive a single value, checking multiple slots
688 _selected_keeper_receive = KEEPER_API(receive);
689 // we expect a single (value, slot) pair of returned values
690 _expected_pushed_min = _expected_pushed_max = 2;
691 }
692
693 Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) };
694 Keeper* const _keeper{ _linda->whichKeeper() };
695 KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } };
696 if (_K == nullptr)
697 return 0;
698
699 CancelRequest _cancel{ CancelRequest::None };
700 KeeperCallResult _pushed{};
701 STACK_CHECK_START_REL(_K, 0);
702 for (bool _try_again{ true };;) {
703 if (_lane != nullptr) {
704 _cancel = _lane->cancelRequest.load(std::memory_order_relaxed);
705 }
706 _cancel = (_cancel != CancelRequest::None)
707 ? _cancel
708 : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None);
709 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything
710 if (!_try_again || _cancel != CancelRequest::None) {
711 _pushed.emplace(0);
712 break;
713 }
714
715 // all arguments of receive() but the first are passed to the keeper's receive function
716 _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i);
717 if (!_pushed.has_value()) {
718 break;
719 }
720 if (_pushed.value() > 0) {
721 LUA_ASSERT(L_, _pushed.value() >= _expected_pushed_min && _pushed.value() <= _expected_pushed_max);
722 if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) {
723 raise_luaL_error(L_, "Key is restricted");
724 }
725 _linda->readHappened.notify_all();
726 break;
727 }
728
729 if (std::chrono::steady_clock::now() >= _until) {
730 break; /* instant timeout */
731 }
732
733 // nothing received, wait until timeout or signalled that we should try again
734 {
735 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
736 if (_lane != nullptr) {
737 // change status of lane to "waiting"
738 _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely
739 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case
740 LUA_ASSERT(L_, _lane->waiting_on == nullptr);
741 _lane->waiting_on = &_linda->writeHappened;
742 _lane->status.store(Lane::Waiting, std::memory_order_release);
743 }
744 // not enough data to read: wakeup when data was sent, or when timeout is reached
745 std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock };
746 std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until) };
747 _guard.release(); // we don't want to unlock the mutex on exit!
748 _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups
749 if (_lane != nullptr) {
750 _lane->waiting_on = nullptr;
751 _lane->status.store(_prev_status, std::memory_order_release);
752 }
753 }
754 }
755 STACK_CHECK(_K, 0);
756
757 if (!_pushed.has_value()) {
758 raise_luaL_error(L_, "tried to copy unsupported types");
759 }
760
761 switch (_cancel) {
762 case CancelRequest::None:
763 {
764 int const _nbPushed{ _pushed.value() };
765 if (_nbPushed == 0) {
766 // not enough data in the linda slot to fulfill the request, return nil, "timeout"
767 lua_pushnil(L_);
768 luaG_pushstring(L_, "timeout");
769 return 2;
770 }
771 return _nbPushed;
772 }
773
774 case CancelRequest::Soft:
775 // if user wants to soft-cancel, the call returns nil, kCancelError
776 lua_pushnil(L_);
777 kCancelError.pushKey(L_);
778 return 2;
779 799
780 case CancelRequest::Hard: 800// #################################################################################################
781 // raise an error interrupting execution only in case of hard cancel
782 raise_cancel_error(L_); // raises an error and doesn't return
783 801
784 default: 802/*
785 raise_luaL_error(L_, "internal error: unknown cancel request"); 803 * [val1, ... valCOUNT] = linda_receive_batched( linda_ud, [timeout_secs_num=-1], key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT])
786 } 804 * Consumes between min_COUNT and max_COUNT values from the linda, from a single slot.
787 } 805 * returns the actual consumed values, or nil if there weren't enough values to consume
788 }; 806 */
789 return Linda::ProtectedCall(L_, _receive); 807LUAG_FUNC(linda_receive_batched)
808{
809 return Linda::ProtectedCall(L_, [](lua_State* const L_) { return ReceiveInternal(L_, true); });
790} 810}
791 811
792// ################################################################################################# 812// #################################################################################################
@@ -848,20 +868,8 @@ LUAG_FUNC(linda_send)
848 static constexpr lua_CFunction _send{ 868 static constexpr lua_CFunction _send{
849 +[](lua_State* const L_) { 869 +[](lua_State* const L_) {
850 Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; 870 Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) };
851 StackIndex _key_i{ 2 }; // index of first slot, if timeout not there
852 871
853 std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; 872 auto const [_key_i, _until] = ProcessTimeoutArg(L_);
854 if (luaG_type(L_, StackIndex{ 2 }) == LuaType::NUMBER) { // we don't want to use lua_isnumber() because of autocoercion
855 lua_Duration const _duration{ lua_tonumber(L_, 2) };
856 if (_duration.count() >= 0.0) {
857 _until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(_duration);
858 } else {
859 raise_luaL_argerror(L_, StackIndex{ 2 }, "duration cannot be < 0");
860 }
861 ++_key_i;
862 } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the slot
863 ++_key_i;
864 }
865 873
866 // make sure the slot is of a valid type 874 // make sure the slot is of a valid type
867 CheckKeyTypes(L_, _key_i, _key_i); 875 CheckKeyTypes(L_, _key_i, _key_i);
@@ -873,78 +881,78 @@ LUAG_FUNC(linda_send)
873 raise_luaL_error(L_, "no data to send"); 881 raise_luaL_error(L_, "no data to send");
874 } 882 }
875 883
884 Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) };
885 Keeper* const _keeper{ _linda->whichKeeper() };
886 KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } };
887 if (_K == nullptr)
888 return 0;
889
876 bool _ret{ false }; 890 bool _ret{ false };
877 CancelRequest _cancel{ CancelRequest::None }; 891 CancelRequest _cancel{ CancelRequest::None };
878 KeeperCallResult _pushed; 892 KeeperCallResult _pushed{};
879 {
880 Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) };
881 Keeper* const _keeper{ _linda->whichKeeper() };
882 KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } };
883 if (_K == nullptr)
884 return 0;
885 893
886 STACK_CHECK_START_REL(_K, 0); 894 STACK_CHECK_START_REL(_K, 0);
887 for (bool _try_again{ true };;) { 895 for (bool _try_again{ true };;) {
888 if (_lane != nullptr) { 896 if (_lane != nullptr) {
889 _cancel = _lane->cancelRequest.load(std::memory_order_relaxed); 897 _cancel = _lane->cancelRequest.load(std::memory_order_relaxed);
890 } 898 }
891 _cancel = (_cancel != CancelRequest::None) 899 _cancel = (_cancel != CancelRequest::None)
892 ? _cancel 900 ? _cancel
893 : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); 901 : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None);
894 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything 902 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything
895 if (!_try_again || _cancel != CancelRequest::None) { 903 if (!_try_again || _cancel != CancelRequest::None) {
896 _pushed.emplace(0); 904 _pushed.emplace(0);
897 break; 905 break;
898 } 906 }
899 907
900 STACK_CHECK(_K, 0); 908 // all arguments of send() but the first are passed to the keeper's send function
901 _pushed = keeper_call(_K, KEEPER_API(send), L_, _linda, _key_i); 909 STACK_CHECK(_K, 0);
902 if (!_pushed.has_value()) { 910 _pushed = keeper_call(_K, KEEPER_API(send), L_, _linda, _key_i);
903 break; 911 if (!_pushed.has_value()) {
904 } 912 break;
905 LUA_ASSERT(L_, _pushed.value() == 1); 913 }
914 LUA_ASSERT(L_, _pushed.value() == 1);
906 915
907 if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) { 916 if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) {
908 raise_luaL_error(L_, "Key is restricted"); 917 raise_luaL_error(L_, "Key is restricted");
909 } 918 }
910 _ret = lua_toboolean(L_, -1) ? true : false; 919 _ret = lua_toboolean(L_, -1) ? true : false;
911 lua_pop(L_, 1); 920 lua_pop(L_, 1);
912 921
913 if (_ret) { 922 if (_ret) {
914 // Wake up ALL waiting threads 923 // Wake up ALL waiting threads
915 _linda->writeHappened.notify_all(); 924 _linda->writeHappened.notify_all();
916 break; 925 break;
917 } 926 }
918 927
919 // instant timout to bypass the wait syscall 928 // instant timout to bypass the wait syscall
920 if (std::chrono::steady_clock::now() >= _until) { 929 if (std::chrono::steady_clock::now() >= _until) {
921 break; /* no wait; instant timeout */ 930 break; /* no wait; instant timeout */
922 } 931 }
923 932
924 // storage limit hit, wait until timeout or signalled that we should try again 933 // storage limit hit, wait until timeout or signalled that we should try again
925 { 934 {
926 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings 935 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
927 if (_lane != nullptr) { 936 if (_lane != nullptr) {
928 // change status of lane to "waiting" 937 // change status of lane to "waiting"
929 _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely 938 _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely
930 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case 939 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case
931 LUA_ASSERT(L_, _lane->waiting_on == nullptr); 940 LUA_ASSERT(L_, _lane->waiting_on == nullptr);
932 _lane->waiting_on = &_linda->readHappened; 941 _lane->waiting_on = &_linda->readHappened;
933 _lane->status.store(Lane::Waiting, std::memory_order_release); 942 _lane->status.store(Lane::Waiting, std::memory_order_release);
934 } 943 }
935 // could not send because no room: wait until some data was read before trying again, or until timeout is reached 944 // could not send because no room: wait until some data was read before trying again, or until timeout is reached
936 std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock }; 945 std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock };
937 std::cv_status const status{ _linda->readHappened.wait_until(_guard, _until) }; 946 std::cv_status const status{ _linda->readHappened.wait_until(_guard, _until) };
938 _guard.release(); // we don't want to unlock the mutex on exit! 947 _guard.release(); // we don't want to unlock the mutex on exit!
939 _try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups 948 _try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups
940 if (_lane != nullptr) { 949 if (_lane != nullptr) {
941 _lane->waiting_on = nullptr; 950 _lane->waiting_on = nullptr;
942 _lane->status.store(_prev_status, std::memory_order_release); 951 _lane->status.store(_prev_status, std::memory_order_release);
943 }
944 } 952 }
945 } 953 }
946 STACK_CHECK(_K, 0);
947 } 954 }
955 STACK_CHECK(_K, 0);
948 956
949 if (!_pushed.has_value()) { 957 if (!_pushed.has_value()) {
950 raise_luaL_error(L_, "tried to copy unsupported types"); 958 raise_luaL_error(L_, "tried to copy unsupported types");
@@ -1104,6 +1112,7 @@ namespace {
1104 { "get", LG_linda_get }, 1112 { "get", LG_linda_get },
1105 { "limit", LG_linda_limit }, 1113 { "limit", LG_linda_limit },
1106 { "receive", LG_linda_receive }, 1114 { "receive", LG_linda_receive },
1115 { "receive_batched", LG_linda_receive_batched },
1107 { "restrict", LG_linda_restrict }, 1116 { "restrict", LG_linda_restrict },
1108 { "send", LG_linda_send }, 1117 { "send", LG_linda_send },
1109 { "set", LG_linda_set }, 1118 { "set", LG_linda_set },
diff --git a/src/linda.hpp b/src/linda.hpp
index 01ca7e1..2d5c9dc 100644
--- a/src/linda.hpp
+++ b/src/linda.hpp
@@ -8,11 +8,6 @@ struct Keeper;
8 8
9// ################################################################################################# 9// #################################################################################################
10 10
11// xxh64 of string "kLindaBatched" generated at https://www.pelock.com/products/hash-calculator
12static constexpr UniqueKey kLindaBatched{ 0xB8234DF772646567ull, "linda.batched" };
13
14// #################################################################################################
15
16DECLARE_UNIQUE_TYPE(LindaGroup, int); 11DECLARE_UNIQUE_TYPE(LindaGroup, int);
17 12
18class Linda final 13class Linda final
diff --git a/src/lindafactory.cpp b/src/lindafactory.cpp
index 6e3f759..42d0984 100644
--- a/src/lindafactory.cpp
+++ b/src/lindafactory.cpp
@@ -53,10 +53,6 @@ void LindaFactory::createMetatable(lua_State* L_) const
53 // the linda functions 53 // the linda functions
54 luaG_registerlibfuncs(L_, mLindaMT); 54 luaG_registerlibfuncs(L_, mLindaMT);
55 55
56 // some constants
57 kLindaBatched.pushKey(L_); // L_: mt kLindaBatched
58 lua_setfield(L_, -2, "batched"); // L_: mt
59
60 kNilSentinel.pushKey(L_); // L_: mt kNilSentinel 56 kNilSentinel.pushKey(L_); // L_: mt kNilSentinel
61 lua_setfield(L_, -2, "null"); // L_: mt 57 lua_setfield(L_, -2, "null"); // L_: mt
62 58
diff --git a/src/threading.cpp b/src/threading.cpp
index 3e594ff..3435075 100644
--- a/src/threading.cpp
+++ b/src/threading.cpp
@@ -449,7 +449,9 @@ void THREAD_SETNAME(std::string_view const& name_)
449{ 449{
450 // exact API to set the thread name is platform-dependant 450 // exact API to set the thread name is platform-dependant
451 // if you need to fix the build, or if you know how to fill a hole, tell me (bnt.germain@gmail.com) so that I can submit the fix in github. 451 // if you need to fix the build, or if you know how to fill a hole, tell me (bnt.germain@gmail.com) so that I can submit the fix in github.
452#if defined PLATFORM_BSD && !defined __NetBSD__ 452#if defined PLATFORM_MINGW
453 pthread_setname_np(pthread_self(), name_.data());
454#elif defined PLATFORM_BSD && !defined __NetBSD__
453 pthread_set_name_np(pthread_self(), name_.data()); 455 pthread_set_name_np(pthread_self(), name_.data());
454#elif defined PLATFORM_BSD && defined __NetBSD__ 456#elif defined PLATFORM_BSD && defined __NetBSD__
455 pthread_setname_np(pthread_self(), "%s", (void*) name_.data()); 457 pthread_setname_np(pthread_self(), "%s", (void*) name_.data());