aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBenoit Germain <benoit.germain@ubisoft.com>2025-04-15 18:21:41 +0200
committerBenoit Germain <benoit.germain@ubisoft.com>2025-04-15 18:21:41 +0200
commit1bff784b474261212a996ac9fc59389d53a69590 (patch)
tree5fb048bcc6963fabbce57a6f8e7ec351bed6f828 /src
parent706d064e17d19c888c6d1e4d404ad34ac808589c (diff)
downloadlanes-1bff784b474261212a996ac9fc59389d53a69590.tar.gz
lanes-1bff784b474261212a996ac9fc59389d53a69590.tar.bz2
lanes-1bff784b474261212a996ac9fc59389d53a69590.zip
Linda batched mode rework
* linda.batched special value is removed * new function linda:receive_batched
Diffstat (limited to '')
-rw-r--r--src/intercopycontext.cpp2
-rw-r--r--src/lanes.lua5
-rw-r--r--src/linda.cpp293
-rw-r--r--src/linda.hpp5
-rw-r--r--src/lindafactory.cpp4
5 files changed, 155 insertions, 154 deletions
diff --git a/src/intercopycontext.cpp b/src/intercopycontext.cpp
index 93a8160..a93615b 100644
--- a/src/intercopycontext.cpp
+++ b/src/intercopycontext.cpp
@@ -994,7 +994,7 @@ bool InterCopyContext::interCopyLightuserdata() const
994 // recognize and print known UniqueKey names here 994 // recognize and print known UniqueKey names here
995 if constexpr (USE_DEBUG_SPEW()) { 995 if constexpr (USE_DEBUG_SPEW()) {
996 bool _found{ false }; 996 bool _found{ false };
997 static constexpr std::array<std::reference_wrapper<UniqueKey const>, 3> kKeysToCheck{ kLindaBatched, kCancelError, kNilSentinel }; 997 static constexpr std::array<std::reference_wrapper<UniqueKey const>, 2> kKeysToCheck{ kCancelError, kNilSentinel };
998 for (UniqueKey const& _key : kKeysToCheck) { 998 for (UniqueKey const& _key : kKeysToCheck) {
999 if (_key.equals(L1, L1_i)) { 999 if (_key.equals(L1, L1_i)) {
1000 DEBUGSPEW_CODE(DebugSpew(nullptr) << _key.debugName); 1000 DEBUGSPEW_CODE(DebugSpew(nullptr) << _key.debugName);
diff --git a/src/lanes.lua b/src/lanes.lua
index 98f8c20..bd94a14 100644
--- a/src/lanes.lua
+++ b/src/lanes.lua
@@ -603,7 +603,6 @@ local configure_timers = function()
603 return next_wakeup -- may be 'nil' 603 return next_wakeup -- may be 'nil'
604 end -- check_timers() 604 end -- check_timers()
605 605
606 local timer_gateway_batched = timerLinda.batched
607 set_finalizer(function(err, stk) 606 set_finalizer(function(err, stk)
608 if err and type(err) ~= "userdata" then 607 if err and type(err) ~= "userdata" then
609 error("LanesTimer error: "..tostring(err)) 608 error("LanesTimer error: "..tostring(err))
@@ -628,7 +627,7 @@ local configure_timers = function()
628 627
629 if _timerKey == TGW_KEY then 628 if _timerKey == TGW_KEY then
630 assert(getmetatable(_what) == "Linda") -- '_what' should be a linda on which the client sets a timer 629 assert(getmetatable(_what) == "Linda") -- '_what' should be a linda on which the client sets a timer
631 local _, key, wakeup_at, period = timerLinda:receive(0, timer_gateway_batched, TGW_KEY, 3) 630 local _, key, wakeup_at, period = timerLinda:receive_batched(0, TGW_KEY, 3)
632 assert(key) 631 assert(key)
633 set_timer(_what, key, wakeup_at, period and period > 0 and period or nil) 632 set_timer(_what, key, wakeup_at, period and period > 0 and period or nil)
634 elseif _timerKey == TGW_QUERY then 633 elseif _timerKey == TGW_QUERY then
@@ -758,7 +757,7 @@ local genlock = function(linda_, key_, N)
758 -- 'nil' timeout allows 'key_' to be numeric 757 -- 'nil' timeout allows 'key_' to be numeric
759 return linda_:send(timeout, key_, trues(M_)) -- suspends until been able to push them 758 return linda_:send(timeout, key_, trues(M_)) -- suspends until been able to push them
760 else 759 else
761 local _k, _v = linda_:receive(nil, linda_.batched, key_, -M_) 760 local _k, _v = linda_:receive_batched(nil, key_, -M_)
762 -- propagate cancel_error if we got it, else return true or false 761 -- propagate cancel_error if we got it, else return true or false
763 return (_v == cancel_error and _v) or (_k and true or false) 762 return (_v == cancel_error and _v) or (_k and true or false)
764 end 763 end
diff --git a/src/linda.cpp b/src/linda.cpp
index 1119d71..0cdacfa 100644
--- a/src/linda.cpp
+++ b/src/linda.cpp
@@ -62,7 +62,7 @@ namespace {
62 62
63 case LuaType::LIGHTUSERDATA: 63 case LuaType::LIGHTUSERDATA:
64 { 64 {
65 static constexpr std::array<std::reference_wrapper<UniqueKey const>, 3> kKeysToCheck{ kLindaBatched, kCancelError, kNilSentinel }; 65 static constexpr std::array<std::reference_wrapper<UniqueKey const>, 2> kKeysToCheck{ kCancelError, kNilSentinel };
66 for (UniqueKey const& _key : kKeysToCheck) { 66 for (UniqueKey const& _key : kKeysToCheck) {
67 if (_key.equals(L_, _i)) { 67 if (_key.equals(L_, _i)) {
68 raise_luaL_error(L_, "argument #%d: can't use %s as a slot", _i, _key.debugName.data()); 68 raise_luaL_error(L_, "argument #%d: can't use %s as a slot", _i, _key.debugName.data());
@@ -123,6 +123,8 @@ namespace {
123 return 0; 123 return 0;
124 } 124 }
125 125
126 // #############################################################################################
127
126 // a helper to process the timeout argument of linda:send() and linda:receive() 128 // a helper to process the timeout argument of linda:send() and linda:receive()
127 [[nodiscard]] 129 [[nodiscard]]
128 static auto ProcessTimeoutArg(lua_State* const L_) 130 static auto ProcessTimeoutArg(lua_State* const L_)
@@ -145,6 +147,142 @@ namespace {
145 } 147 }
146 148
147 // ############################################################################################# 149 // #############################################################################################
150
151 // the implementation for linda:receive() and linda:receive_batched()
152 static int ReceiveInternal(lua_State* const L_, bool const batched_)
153 {
154 Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) };
155
156 auto const [_key_i, _until] = ProcessTimeoutArg(L_);
157
158 keeper_api_t _selected_keeper_receive{ nullptr };
159 int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 };
160 // are we in batched mode?
161 if (batched_) {
162 // make sure the keys are of a valid type
163 CheckKeyTypes(L_, _key_i, _key_i);
164 // receive multiple values from a single slot
165 _selected_keeper_receive = KEEPER_API(receive_batched);
166 // we expect a user-defined amount of return value
167 _expected_pushed_min = (int) luaL_checkinteger(L_, _key_i + 1);
168 if (_expected_pushed_min < 1) {
169 raise_luaL_argerror(L_, StackIndex{ _key_i + 1 }, "bad min count");
170 }
171 _expected_pushed_max = (int) luaL_optinteger(L_, _key_i + 2, _expected_pushed_min);
172 // don't forget to count the slot in addition to the values
173 ++_expected_pushed_min;
174 ++_expected_pushed_max;
175 if (_expected_pushed_min > _expected_pushed_max) {
176 raise_luaL_argerror(L_, StackIndex{ _key_i + 2 }, "batched min/max error");
177 }
178 } else {
179 // make sure the keys are of a valid type
180 CheckKeyTypes(L_, _key_i, StackIndex{ lua_gettop(L_) });
181 // receive a single value, checking multiple slots
182 _selected_keeper_receive = KEEPER_API(receive);
183 // we expect a single (value, slot) pair of returned values
184 _expected_pushed_min = _expected_pushed_max = 2;
185 }
186
187 Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) };
188 Keeper* const _keeper{ _linda->whichKeeper() };
189 KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } };
190 if (_K == nullptr)
191 return 0;
192
193 CancelRequest _cancel{ CancelRequest::None };
194 KeeperCallResult _pushed{};
195
196 STACK_CHECK_START_REL(_K, 0);
197 for (bool _try_again{ true };;) {
198 if (_lane != nullptr) {
199 _cancel = _lane->cancelRequest.load(std::memory_order_relaxed);
200 }
201 _cancel = (_cancel != CancelRequest::None)
202 ? _cancel
203 : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None);
204 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything
205 if (!_try_again || _cancel != CancelRequest::None) {
206 _pushed.emplace(0);
207 break;
208 }
209
210 // all arguments of receive() but the first are passed to the keeper's receive function
211 STACK_CHECK(_K, 0);
212 _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i);
213 if (!_pushed.has_value()) {
214 break;
215 }
216 if (_pushed.value() > 0) {
217 LUA_ASSERT(L_, _pushed.value() >= _expected_pushed_min && _pushed.value() <= _expected_pushed_max);
218 if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) {
219 raise_luaL_error(L_, "Key is restricted");
220 }
221 _linda->readHappened.notify_all();
222 break;
223 }
224
225 if (std::chrono::steady_clock::now() >= _until) {
226 break; /* instant timeout */
227 }
228
229 // nothing received, wait until timeout or signalled that we should try again
230 {
231 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
232 if (_lane != nullptr) {
233 // change status of lane to "waiting"
234 _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely
235 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case
236 LUA_ASSERT(L_, _lane->waiting_on == nullptr);
237 _lane->waiting_on = &_linda->writeHappened;
238 _lane->status.store(Lane::Waiting, std::memory_order_release);
239 }
240 // not enough data to read: wakeup when data was sent, or when timeout is reached
241 std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock };
242 std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until) };
243 _guard.release(); // we don't want to unlock the mutex on exit!
244 _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups
245 if (_lane != nullptr) {
246 _lane->waiting_on = nullptr;
247 _lane->status.store(_prev_status, std::memory_order_release);
248 }
249 }
250 }
251 STACK_CHECK(_K, 0);
252
253 if (!_pushed.has_value()) {
254 raise_luaL_error(L_, "tried to copy unsupported types");
255 }
256
257 switch (_cancel) {
258 case CancelRequest::None:
259 {
260 int const _nbPushed{ _pushed.value() };
261 if (_nbPushed == 0) {
262 // not enough data in the linda slot to fulfill the request, return nil, "timeout"
263 lua_pushnil(L_);
264 luaG_pushstring(L_, "timeout");
265 return 2;
266 }
267 return _nbPushed;
268 }
269
270 case CancelRequest::Soft:
271 // if user wants to soft-cancel, the call returns nil, kCancelError
272 lua_pushnil(L_);
273 kCancelError.pushKey(L_);
274 return 2;
275
276 case CancelRequest::Hard:
277 // raise an error interrupting execution only in case of hard cancel
278 raise_cancel_error(L_); // raises an error and doesn't return
279
280 default:
281 raise_luaL_error(L_, "internal error: unknown cancel request");
282 }
283 }
284
285 // #############################################################################################
148 // ############################################################################################# 286 // #############################################################################################
149} // namespace 287} // namespace
150// ################################################################################################# 288// #################################################################################################
@@ -650,153 +788,25 @@ LUAG_FUNC(linda_limit)
650// ################################################################################################# 788// #################################################################################################
651 789
652/* 790/*
653 * 2 modes of operation 791 * [val, slot] = linda:receive([timeout_secs_num=nil], key_num|str|bool|lightuserdata [, ...] )
654 * [val, slot]= linda:receive([timeout_secs_num=nil], key_num|str|bool|lightuserdata [, ...] )
655 * Consumes a single value from the Linda, in any slot. 792 * Consumes a single value from the Linda, in any slot.
656 * Returns: received value (which is consumed from the slot), and the slot which had it 793 * Returns: received value (which is consumed from the slot), and the slot which had it
657
658 * [val1, ... valCOUNT]= linda_receive( linda_ud, [timeout_secs_num=-1], linda.batched, key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT])
659 * Consumes between min_COUNT and max_COUNT values from the linda, from a single slot.
660 * returns the actual consumed values, or nil if there weren't enough values to consume
661 */ 794 */
662LUAG_FUNC(linda_receive) 795LUAG_FUNC(linda_receive)
663{ 796{
664 static constexpr lua_CFunction _receive{ 797 return Linda::ProtectedCall(L_, [](lua_State* const L_) { return ReceiveInternal(L_, false); });
665 +[](lua_State* const L_) { 798}
666 Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) };
667
668 auto [_key_i, _until] = ProcessTimeoutArg(L_);
669
670 keeper_api_t _selected_keeper_receive{ nullptr };
671 int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 };
672 // are we in batched mode?
673 if (kLindaBatched.equals(L_, _key_i)) {
674 // no need to pass linda.batched in the keeper state
675 ++_key_i;
676 // make sure the keys are of a valid type
677 CheckKeyTypes(L_, _key_i, _key_i);
678 // receive multiple values from a single slot
679 _selected_keeper_receive = KEEPER_API(receive_batched);
680 // we expect a user-defined amount of return value
681 _expected_pushed_min = (int) luaL_checkinteger(L_, _key_i + 1);
682 if (_expected_pushed_min < 1) {
683 raise_luaL_argerror(L_, StackIndex{ _key_i + 1 }, "bad min count");
684 }
685 _expected_pushed_max = (int) luaL_optinteger(L_, _key_i + 2, _expected_pushed_min);
686 // don't forget to count the slot in addition to the values
687 ++_expected_pushed_min;
688 ++_expected_pushed_max;
689 if (_expected_pushed_min > _expected_pushed_max) {
690 raise_luaL_argerror(L_, StackIndex{ _key_i + 2 }, "batched min/max error");
691 }
692 } else {
693 // make sure the keys are of a valid type
694 CheckKeyTypes(L_, _key_i, StackIndex{ lua_gettop(L_) });
695 // receive a single value, checking multiple slots
696 _selected_keeper_receive = KEEPER_API(receive);
697 // we expect a single (value, slot) pair of returned values
698 _expected_pushed_min = _expected_pushed_max = 2;
699 }
700
701 Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) };
702 Keeper* const _keeper{ _linda->whichKeeper() };
703 KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } };
704 if (_K == nullptr)
705 return 0;
706
707 CancelRequest _cancel{ CancelRequest::None };
708 KeeperCallResult _pushed{};
709
710 STACK_CHECK_START_REL(_K, 0);
711 for (bool _try_again{ true };;) {
712 if (_lane != nullptr) {
713 _cancel = _lane->cancelRequest.load(std::memory_order_relaxed);
714 }
715 _cancel = (_cancel != CancelRequest::None)
716 ? _cancel
717 : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None);
718 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything
719 if (!_try_again || _cancel != CancelRequest::None) {
720 _pushed.emplace(0);
721 break;
722 }
723
724 // all arguments of receive() but the first are passed to the keeper's receive function
725 STACK_CHECK(_K, 0);
726 _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i);
727 if (!_pushed.has_value()) {
728 break;
729 }
730 if (_pushed.value() > 0) {
731 LUA_ASSERT(L_, _pushed.value() >= _expected_pushed_min && _pushed.value() <= _expected_pushed_max);
732 if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) {
733 raise_luaL_error(L_, "Key is restricted");
734 }
735 _linda->readHappened.notify_all();
736 break;
737 }
738
739 if (std::chrono::steady_clock::now() >= _until) {
740 break; /* instant timeout */
741 }
742
743 // nothing received, wait until timeout or signalled that we should try again
744 {
745 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
746 if (_lane != nullptr) {
747 // change status of lane to "waiting"
748 _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely
749 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case
750 LUA_ASSERT(L_, _lane->waiting_on == nullptr);
751 _lane->waiting_on = &_linda->writeHappened;
752 _lane->status.store(Lane::Waiting, std::memory_order_release);
753 }
754 // not enough data to read: wakeup when data was sent, or when timeout is reached
755 std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock };
756 std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until) };
757 _guard.release(); // we don't want to unlock the mutex on exit!
758 _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups
759 if (_lane != nullptr) {
760 _lane->waiting_on = nullptr;
761 _lane->status.store(_prev_status, std::memory_order_release);
762 }
763 }
764 }
765 STACK_CHECK(_K, 0);
766
767 if (!_pushed.has_value()) {
768 raise_luaL_error(L_, "tried to copy unsupported types");
769 }
770
771 switch (_cancel) {
772 case CancelRequest::None:
773 {
774 int const _nbPushed{ _pushed.value() };
775 if (_nbPushed == 0) {
776 // not enough data in the linda slot to fulfill the request, return nil, "timeout"
777 lua_pushnil(L_);
778 luaG_pushstring(L_, "timeout");
779 return 2;
780 }
781 return _nbPushed;
782 }
783
784 case CancelRequest::Soft:
785 // if user wants to soft-cancel, the call returns nil, kCancelError
786 lua_pushnil(L_);
787 kCancelError.pushKey(L_);
788 return 2;
789 799
790 case CancelRequest::Hard: 800// #################################################################################################
791 // raise an error interrupting execution only in case of hard cancel
792 raise_cancel_error(L_); // raises an error and doesn't return
793 801
794 default: 802/*
795 raise_luaL_error(L_, "internal error: unknown cancel request"); 803 * [val1, ... valCOUNT] = linda_receive_batched( linda_ud, [timeout_secs_num=-1], key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT])
796 } 804 * Consumes between min_COUNT and max_COUNT values from the linda, from a single slot.
797 } 805 * returns the actual consumed values, or nil if there weren't enough values to consume
798 }; 806 */
799 return Linda::ProtectedCall(L_, _receive); 807LUAG_FUNC(linda_receive_batched)
808{
809 return Linda::ProtectedCall(L_, [](lua_State* const L_) { return ReceiveInternal(L_, true); });
800} 810}
801 811
802// ################################################################################################# 812// #################################################################################################
@@ -1102,6 +1112,7 @@ namespace {
1102 { "get", LG_linda_get }, 1112 { "get", LG_linda_get },
1103 { "limit", LG_linda_limit }, 1113 { "limit", LG_linda_limit },
1104 { "receive", LG_linda_receive }, 1114 { "receive", LG_linda_receive },
1115 { "receive_batched", LG_linda_receive_batched },
1105 { "restrict", LG_linda_restrict }, 1116 { "restrict", LG_linda_restrict },
1106 { "send", LG_linda_send }, 1117 { "send", LG_linda_send },
1107 { "set", LG_linda_set }, 1118 { "set", LG_linda_set },
diff --git a/src/linda.hpp b/src/linda.hpp
index 01ca7e1..2d5c9dc 100644
--- a/src/linda.hpp
+++ b/src/linda.hpp
@@ -8,11 +8,6 @@ struct Keeper;
8 8
9// ################################################################################################# 9// #################################################################################################
10 10
11// xxh64 of string "kLindaBatched" generated at https://www.pelock.com/products/hash-calculator
12static constexpr UniqueKey kLindaBatched{ 0xB8234DF772646567ull, "linda.batched" };
13
14// #################################################################################################
15
16DECLARE_UNIQUE_TYPE(LindaGroup, int); 11DECLARE_UNIQUE_TYPE(LindaGroup, int);
17 12
18class Linda final 13class Linda final
diff --git a/src/lindafactory.cpp b/src/lindafactory.cpp
index 6e3f759..42d0984 100644
--- a/src/lindafactory.cpp
+++ b/src/lindafactory.cpp
@@ -53,10 +53,6 @@ void LindaFactory::createMetatable(lua_State* L_) const
53 // the linda functions 53 // the linda functions
54 luaG_registerlibfuncs(L_, mLindaMT); 54 luaG_registerlibfuncs(L_, mLindaMT);
55 55
56 // some constants
57 kLindaBatched.pushKey(L_); // L_: mt kLindaBatched
58 lua_setfield(L_, -2, "batched"); // L_: mt
59
60 kNilSentinel.pushKey(L_); // L_: mt kNilSentinel 56 kNilSentinel.pushKey(L_); // L_: mt kNilSentinel
61 lua_setfield(L_, -2, "null"); // L_: mt 57 lua_setfield(L_, -2, "null"); // L_: mt
62 58