diff options
author | Benoit Germain <benoit.germain@ubisoft.com> | 2025-04-15 18:21:41 +0200 |
---|---|---|
committer | Benoit Germain <benoit.germain@ubisoft.com> | 2025-04-15 18:21:41 +0200 |
commit | 1bff784b474261212a996ac9fc59389d53a69590 (patch) | |
tree | 5fb048bcc6963fabbce57a6f8e7ec351bed6f828 | |
parent | 706d064e17d19c888c6d1e4d404ad34ac808589c (diff) | |
download | lanes-1bff784b474261212a996ac9fc59389d53a69590.tar.gz lanes-1bff784b474261212a996ac9fc59389d53a69590.tar.bz2 lanes-1bff784b474261212a996ac9fc59389d53a69590.zip |
Linda batched mode rework
* linda.batched special value is removed
* new function linda:receive_batched
Diffstat (limited to '')
-rw-r--r-- | CHANGES | 1 | ||||
-rw-r--r-- | docs/index.html | 10 | ||||
-rw-r--r-- | src/intercopycontext.cpp | 2 | ||||
-rw-r--r-- | src/lanes.lua | 5 | ||||
-rw-r--r-- | src/linda.cpp | 293 | ||||
-rw-r--r-- | src/linda.hpp | 5 | ||||
-rw-r--r-- | src/lindafactory.cpp | 4 | ||||
-rw-r--r-- | tests/basic.lua | 4 | ||||
-rw-r--r-- | tests/errhangtest.lua | 9 | ||||
-rw-r--r-- | tests/linda_perf.lua | 8 | ||||
-rw-r--r-- | tests/protect_allocator.lua | 2 | ||||
-rw-r--r-- | unit_tests/linda_tests.cpp | 1 | ||||
-rw-r--r-- | unit_tests/scripts/lane/tasking_cancelling.lua | 2 | ||||
-rw-r--r-- | unit_tests/scripts/lane/tasking_communications.lua | 2 |
14 files changed, 174 insertions, 174 deletions
@@ -45,6 +45,7 @@ CHANGE 2: BGe 27-Nov-24 | |||
45 | - Providing "auto" as name when constructing a Linda cause Lanes to provide a name built from the source location of the construction. | 45 | - Providing "auto" as name when constructing a Linda cause Lanes to provide a name built from the source location of the construction. |
46 | - Specifying a group to lanes.linda() is mandatory when Lanes is configured with user Keepers. | 46 | - Specifying a group to lanes.linda() is mandatory when Lanes is configured with user Keepers. |
47 | - linda:deep() result no longer contains the raw C pointer of the Linda object. | 47 | - linda:deep() result no longer contains the raw C pointer of the Linda object. |
48 | - new function linda:receive_batched() to replace linda:receive(linda.batched). linda.batched special value is removed. | ||
48 | - linda :receive(), :send(), :get(), :set(), :limit() return nil, error in case of problem. Returned values in case of success change too. | 49 | - linda :receive(), :send(), :get(), :set(), :limit() return nil, error in case of problem. Returned values in case of success change too. |
49 | - linda:limit() can be used to read the value if no new limit is provided. | 50 | - linda:limit() can be used to read the value if no new limit is provided. |
50 | - linda:restrict() can restrain the use of send/receive or set/get on any key. | 51 | - linda:restrict() can restrain the use of send/receive or set/get on any key. |
diff --git a/docs/index.html b/docs/index.html index 8826b57..4dd5848 100644 --- a/docs/index.html +++ b/docs/index.html | |||
@@ -1107,7 +1107,7 @@ | |||
1107 | b = f() | 1107 | b = f() |
1108 | c = f() | 1108 | c = f() |
1109 | 1109 | ||
1110 | sync_linda:receive(nil, sync_linda.batched, "done", 3) -- wait for 3 lanes to write something in "done" slot of sync_linda | 1110 | sync_linda:receive_batched(nil, "done", 3) -- wait for 3 lanes to write something in "done" slot of sync_linda |
1111 | </pre></td></tr></table> | 1111 | </pre></td></tr></table> |
1112 | 1112 | ||
1113 | <!-- cancelling +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ --> | 1113 | <!-- cancelling +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ --> |
@@ -1260,7 +1260,7 @@ | |||
1260 | <li>Two producer-side methods: <tt>:send</tt> and <tt>:set</tt> (not out).</li> | 1260 | <li>Two producer-side methods: <tt>:send</tt> and <tt>:set</tt> (not out).</li> |
1261 | <li><tt>send</tt> allows for sending multiple values -atomically- to a given slot.</li> | 1261 | <li><tt>send</tt> allows for sending multiple values -atomically- to a given slot.</li> |
1262 | <li><tt>receive</tt> can wait for multiple slots at once.</li> | 1262 | <li><tt>receive</tt> can wait for multiple slots at once.</li> |
1263 | <li><tt>receive</tt> has a batched mode to consume more than one value from a single slot, as in <tt>linda:receive(1.0, linda.batched, "slot", 3, 6).</tt></li> | 1263 | <li><tt>receive_batched</tt> can be used to consume more than one value from a single slot, as in <tt>linda:receive_batched(1.0, "slot", 3, 6).</tt></li> |
1264 | <li><tt>restrict</tt> can restrain a particular slot to function either with <tt>send/receive</tt> or <tt>set/get</tt>.</li> | 1264 | <li><tt>restrict</tt> can restrain a particular slot to function either with <tt>send/receive</tt> or <tt>set/get</tt>.</li> |
1265 | <li>Individual slots' queue length can be limited, balancing speed differences in a producer/consumer scenario (making <tt>:send</tt> wait).</li> | 1265 | <li>Individual slots' queue length can be limited, balancing speed differences in a producer/consumer scenario (making <tt>:send</tt> wait).</li> |
1266 | <li><tt>tostring(linda)</tt> returns a string of the form <tt>"Linda: <opt_name>"</tt></li> | 1266 | <li><tt>tostring(linda)</tt> returns a string of the form <tt>"Linda: <opt_name>"</tt></li> |
@@ -1349,12 +1349,12 @@ | |||
1349 | <table border="1" bgcolor="#E0E0FF" cellpadding="10" style="width:50%"><tr><td><pre> | 1349 | <table border="1" bgcolor="#E0E0FF" cellpadding="10" style="width:50%"><tr><td><pre> |
1350 | slot, val = h:receive([timeout_secs,] slot [, slot...]) | 1350 | slot, val = h:receive([timeout_secs,] slot [, slot...]) |
1351 | 1351 | ||
1352 | slot, val [, val...] = h:receive([timeout,] h.batched, slot, n_uint_min[, n_uint_max]) | 1352 | slot, val [, val...] = h:receive_batched([timeout,] slot, n_uint_min[, n_uint_max]) |
1353 | </pre></td></tr></table> | 1353 | </pre></td></tr></table> |
1354 | 1354 | ||
1355 | <p> | 1355 | <p> |
1356 | <tt>receive()</tt> raises an error if called when a restriction forbids its use on any provided slot.<br /> | 1356 | <tt>receive()</tt> and <tt>receive_batched()</tt> raise an error if called when a restriction forbids their use on any provided slot.<br /> |
1357 | In batched mode, <tt>receive()</tt> will raise an error if <tt>min_count < 1</tt> or <tt>max_count < min_count</tt>. | 1357 | <tt>receive_batched()</tt> will raise an error if <tt>min_count < 1</tt> or <tt>max_count < min_count</tt>. |
1358 | </p> | 1358 | </p> |
1359 | 1359 | ||
1360 | <p> | 1360 | <p> |
diff --git a/src/intercopycontext.cpp b/src/intercopycontext.cpp index 93a8160..a93615b 100644 --- a/src/intercopycontext.cpp +++ b/src/intercopycontext.cpp | |||
@@ -994,7 +994,7 @@ bool InterCopyContext::interCopyLightuserdata() const | |||
994 | // recognize and print known UniqueKey names here | 994 | // recognize and print known UniqueKey names here |
995 | if constexpr (USE_DEBUG_SPEW()) { | 995 | if constexpr (USE_DEBUG_SPEW()) { |
996 | bool _found{ false }; | 996 | bool _found{ false }; |
997 | static constexpr std::array<std::reference_wrapper<UniqueKey const>, 3> kKeysToCheck{ kLindaBatched, kCancelError, kNilSentinel }; | 997 | static constexpr std::array<std::reference_wrapper<UniqueKey const>, 2> kKeysToCheck{ kCancelError, kNilSentinel }; |
998 | for (UniqueKey const& _key : kKeysToCheck) { | 998 | for (UniqueKey const& _key : kKeysToCheck) { |
999 | if (_key.equals(L1, L1_i)) { | 999 | if (_key.equals(L1, L1_i)) { |
1000 | DEBUGSPEW_CODE(DebugSpew(nullptr) << _key.debugName); | 1000 | DEBUGSPEW_CODE(DebugSpew(nullptr) << _key.debugName); |
diff --git a/src/lanes.lua b/src/lanes.lua index 98f8c20..bd94a14 100644 --- a/src/lanes.lua +++ b/src/lanes.lua | |||
@@ -603,7 +603,6 @@ local configure_timers = function() | |||
603 | return next_wakeup -- may be 'nil' | 603 | return next_wakeup -- may be 'nil' |
604 | end -- check_timers() | 604 | end -- check_timers() |
605 | 605 | ||
606 | local timer_gateway_batched = timerLinda.batched | ||
607 | set_finalizer(function(err, stk) | 606 | set_finalizer(function(err, stk) |
608 | if err and type(err) ~= "userdata" then | 607 | if err and type(err) ~= "userdata" then |
609 | error("LanesTimer error: "..tostring(err)) | 608 | error("LanesTimer error: "..tostring(err)) |
@@ -628,7 +627,7 @@ local configure_timers = function() | |||
628 | 627 | ||
629 | if _timerKey == TGW_KEY then | 628 | if _timerKey == TGW_KEY then |
630 | assert(getmetatable(_what) == "Linda") -- '_what' should be a linda on which the client sets a timer | 629 | assert(getmetatable(_what) == "Linda") -- '_what' should be a linda on which the client sets a timer |
631 | local _, key, wakeup_at, period = timerLinda:receive(0, timer_gateway_batched, TGW_KEY, 3) | 630 | local _, key, wakeup_at, period = timerLinda:receive_batched(0, TGW_KEY, 3) |
632 | assert(key) | 631 | assert(key) |
633 | set_timer(_what, key, wakeup_at, period and period > 0 and period or nil) | 632 | set_timer(_what, key, wakeup_at, period and period > 0 and period or nil) |
634 | elseif _timerKey == TGW_QUERY then | 633 | elseif _timerKey == TGW_QUERY then |
@@ -758,7 +757,7 @@ local genlock = function(linda_, key_, N) | |||
758 | -- 'nil' timeout allows 'key_' to be numeric | 757 | -- 'nil' timeout allows 'key_' to be numeric |
759 | return linda_:send(timeout, key_, trues(M_)) -- suspends until been able to push them | 758 | return linda_:send(timeout, key_, trues(M_)) -- suspends until been able to push them |
760 | else | 759 | else |
761 | local _k, _v = linda_:receive(nil, linda_.batched, key_, -M_) | 760 | local _k, _v = linda_:receive_batched(nil, key_, -M_) |
762 | -- propagate cancel_error if we got it, else return true or false | 761 | -- propagate cancel_error if we got it, else return true or false |
763 | return (_v == cancel_error and _v) or (_k and true or false) | 762 | return (_v == cancel_error and _v) or (_k and true or false) |
764 | end | 763 | end |
diff --git a/src/linda.cpp b/src/linda.cpp index 1119d71..0cdacfa 100644 --- a/src/linda.cpp +++ b/src/linda.cpp | |||
@@ -62,7 +62,7 @@ namespace { | |||
62 | 62 | ||
63 | case LuaType::LIGHTUSERDATA: | 63 | case LuaType::LIGHTUSERDATA: |
64 | { | 64 | { |
65 | static constexpr std::array<std::reference_wrapper<UniqueKey const>, 3> kKeysToCheck{ kLindaBatched, kCancelError, kNilSentinel }; | 65 | static constexpr std::array<std::reference_wrapper<UniqueKey const>, 2> kKeysToCheck{ kCancelError, kNilSentinel }; |
66 | for (UniqueKey const& _key : kKeysToCheck) { | 66 | for (UniqueKey const& _key : kKeysToCheck) { |
67 | if (_key.equals(L_, _i)) { | 67 | if (_key.equals(L_, _i)) { |
68 | raise_luaL_error(L_, "argument #%d: can't use %s as a slot", _i, _key.debugName.data()); | 68 | raise_luaL_error(L_, "argument #%d: can't use %s as a slot", _i, _key.debugName.data()); |
@@ -123,6 +123,8 @@ namespace { | |||
123 | return 0; | 123 | return 0; |
124 | } | 124 | } |
125 | 125 | ||
126 | // ############################################################################################# | ||
127 | |||
126 | // a helper to process the timeout argument of linda:send() and linda:receive() | 128 | // a helper to process the timeout argument of linda:send() and linda:receive() |
127 | [[nodiscard]] | 129 | [[nodiscard]] |
128 | static auto ProcessTimeoutArg(lua_State* const L_) | 130 | static auto ProcessTimeoutArg(lua_State* const L_) |
@@ -145,6 +147,142 @@ namespace { | |||
145 | } | 147 | } |
146 | 148 | ||
147 | // ############################################################################################# | 149 | // ############################################################################################# |
150 | |||
151 | // the implementation for linda:receive() and linda:receive_batched() | ||
152 | static int ReceiveInternal(lua_State* const L_, bool const batched_) | ||
153 | { | ||
154 | Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; | ||
155 | |||
156 | auto const [_key_i, _until] = ProcessTimeoutArg(L_); | ||
157 | |||
158 | keeper_api_t _selected_keeper_receive{ nullptr }; | ||
159 | int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 }; | ||
160 | // are we in batched mode? | ||
161 | if (batched_) { | ||
162 | // make sure the keys are of a valid type | ||
163 | CheckKeyTypes(L_, _key_i, _key_i); | ||
164 | // receive multiple values from a single slot | ||
165 | _selected_keeper_receive = KEEPER_API(receive_batched); | ||
166 | // we expect a user-defined amount of return value | ||
167 | _expected_pushed_min = (int) luaL_checkinteger(L_, _key_i + 1); | ||
168 | if (_expected_pushed_min < 1) { | ||
169 | raise_luaL_argerror(L_, StackIndex{ _key_i + 1 }, "bad min count"); | ||
170 | } | ||
171 | _expected_pushed_max = (int) luaL_optinteger(L_, _key_i + 2, _expected_pushed_min); | ||
172 | // don't forget to count the slot in addition to the values | ||
173 | ++_expected_pushed_min; | ||
174 | ++_expected_pushed_max; | ||
175 | if (_expected_pushed_min > _expected_pushed_max) { | ||
176 | raise_luaL_argerror(L_, StackIndex{ _key_i + 2 }, "batched min/max error"); | ||
177 | } | ||
178 | } else { | ||
179 | // make sure the keys are of a valid type | ||
180 | CheckKeyTypes(L_, _key_i, StackIndex{ lua_gettop(L_) }); | ||
181 | // receive a single value, checking multiple slots | ||
182 | _selected_keeper_receive = KEEPER_API(receive); | ||
183 | // we expect a single (value, slot) pair of returned values | ||
184 | _expected_pushed_min = _expected_pushed_max = 2; | ||
185 | } | ||
186 | |||
187 | Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) }; | ||
188 | Keeper* const _keeper{ _linda->whichKeeper() }; | ||
189 | KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } }; | ||
190 | if (_K == nullptr) | ||
191 | return 0; | ||
192 | |||
193 | CancelRequest _cancel{ CancelRequest::None }; | ||
194 | KeeperCallResult _pushed{}; | ||
195 | |||
196 | STACK_CHECK_START_REL(_K, 0); | ||
197 | for (bool _try_again{ true };;) { | ||
198 | if (_lane != nullptr) { | ||
199 | _cancel = _lane->cancelRequest.load(std::memory_order_relaxed); | ||
200 | } | ||
201 | _cancel = (_cancel != CancelRequest::None) | ||
202 | ? _cancel | ||
203 | : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); | ||
204 | // if user wants to cancel, or looped because of a timeout, the call returns without sending anything | ||
205 | if (!_try_again || _cancel != CancelRequest::None) { | ||
206 | _pushed.emplace(0); | ||
207 | break; | ||
208 | } | ||
209 | |||
210 | // all arguments of receive() but the first are passed to the keeper's receive function | ||
211 | STACK_CHECK(_K, 0); | ||
212 | _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i); | ||
213 | if (!_pushed.has_value()) { | ||
214 | break; | ||
215 | } | ||
216 | if (_pushed.value() > 0) { | ||
217 | LUA_ASSERT(L_, _pushed.value() >= _expected_pushed_min && _pushed.value() <= _expected_pushed_max); | ||
218 | if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) { | ||
219 | raise_luaL_error(L_, "Key is restricted"); | ||
220 | } | ||
221 | _linda->readHappened.notify_all(); | ||
222 | break; | ||
223 | } | ||
224 | |||
225 | if (std::chrono::steady_clock::now() >= _until) { | ||
226 | break; /* instant timeout */ | ||
227 | } | ||
228 | |||
229 | // nothing received, wait until timeout or signalled that we should try again | ||
230 | { | ||
231 | Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings | ||
232 | if (_lane != nullptr) { | ||
233 | // change status of lane to "waiting" | ||
234 | _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely | ||
235 | LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case | ||
236 | LUA_ASSERT(L_, _lane->waiting_on == nullptr); | ||
237 | _lane->waiting_on = &_linda->writeHappened; | ||
238 | _lane->status.store(Lane::Waiting, std::memory_order_release); | ||
239 | } | ||
240 | // not enough data to read: wakeup when data was sent, or when timeout is reached | ||
241 | std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock }; | ||
242 | std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until) }; | ||
243 | _guard.release(); // we don't want to unlock the mutex on exit! | ||
244 | _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups | ||
245 | if (_lane != nullptr) { | ||
246 | _lane->waiting_on = nullptr; | ||
247 | _lane->status.store(_prev_status, std::memory_order_release); | ||
248 | } | ||
249 | } | ||
250 | } | ||
251 | STACK_CHECK(_K, 0); | ||
252 | |||
253 | if (!_pushed.has_value()) { | ||
254 | raise_luaL_error(L_, "tried to copy unsupported types"); | ||
255 | } | ||
256 | |||
257 | switch (_cancel) { | ||
258 | case CancelRequest::None: | ||
259 | { | ||
260 | int const _nbPushed{ _pushed.value() }; | ||
261 | if (_nbPushed == 0) { | ||
262 | // not enough data in the linda slot to fulfill the request, return nil, "timeout" | ||
263 | lua_pushnil(L_); | ||
264 | luaG_pushstring(L_, "timeout"); | ||
265 | return 2; | ||
266 | } | ||
267 | return _nbPushed; | ||
268 | } | ||
269 | |||
270 | case CancelRequest::Soft: | ||
271 | // if user wants to soft-cancel, the call returns nil, kCancelError | ||
272 | lua_pushnil(L_); | ||
273 | kCancelError.pushKey(L_); | ||
274 | return 2; | ||
275 | |||
276 | case CancelRequest::Hard: | ||
277 | // raise an error interrupting execution only in case of hard cancel | ||
278 | raise_cancel_error(L_); // raises an error and doesn't return | ||
279 | |||
280 | default: | ||
281 | raise_luaL_error(L_, "internal error: unknown cancel request"); | ||
282 | } | ||
283 | } | ||
284 | |||
285 | // ############################################################################################# | ||
148 | // ############################################################################################# | 286 | // ############################################################################################# |
149 | } // namespace | 287 | } // namespace |
150 | // ################################################################################################# | 288 | // ################################################################################################# |
@@ -650,153 +788,25 @@ LUAG_FUNC(linda_limit) | |||
650 | // ################################################################################################# | 788 | // ################################################################################################# |
651 | 789 | ||
652 | /* | 790 | /* |
653 | * 2 modes of operation | 791 | * [val, slot] = linda:receive([timeout_secs_num=nil], key_num|str|bool|lightuserdata [, ...] ) |
654 | * [val, slot]= linda:receive([timeout_secs_num=nil], key_num|str|bool|lightuserdata [, ...] ) | ||
655 | * Consumes a single value from the Linda, in any slot. | 792 | * Consumes a single value from the Linda, in any slot. |
656 | * Returns: received value (which is consumed from the slot), and the slot which had it | 793 | * Returns: received value (which is consumed from the slot), and the slot which had it |
657 | |||
658 | * [val1, ... valCOUNT]= linda_receive( linda_ud, [timeout_secs_num=-1], linda.batched, key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT]) | ||
659 | * Consumes between min_COUNT and max_COUNT values from the linda, from a single slot. | ||
660 | * returns the actual consumed values, or nil if there weren't enough values to consume | ||
661 | */ | 794 | */ |
662 | LUAG_FUNC(linda_receive) | 795 | LUAG_FUNC(linda_receive) |
663 | { | 796 | { |
664 | static constexpr lua_CFunction _receive{ | 797 | return Linda::ProtectedCall(L_, [](lua_State* const L_) { return ReceiveInternal(L_, false); }); |
665 | +[](lua_State* const L_) { | 798 | } |
666 | Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; | ||
667 | |||
668 | auto [_key_i, _until] = ProcessTimeoutArg(L_); | ||
669 | |||
670 | keeper_api_t _selected_keeper_receive{ nullptr }; | ||
671 | int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 }; | ||
672 | // are we in batched mode? | ||
673 | if (kLindaBatched.equals(L_, _key_i)) { | ||
674 | // no need to pass linda.batched in the keeper state | ||
675 | ++_key_i; | ||
676 | // make sure the keys are of a valid type | ||
677 | CheckKeyTypes(L_, _key_i, _key_i); | ||
678 | // receive multiple values from a single slot | ||
679 | _selected_keeper_receive = KEEPER_API(receive_batched); | ||
680 | // we expect a user-defined amount of return value | ||
681 | _expected_pushed_min = (int) luaL_checkinteger(L_, _key_i + 1); | ||
682 | if (_expected_pushed_min < 1) { | ||
683 | raise_luaL_argerror(L_, StackIndex{ _key_i + 1 }, "bad min count"); | ||
684 | } | ||
685 | _expected_pushed_max = (int) luaL_optinteger(L_, _key_i + 2, _expected_pushed_min); | ||
686 | // don't forget to count the slot in addition to the values | ||
687 | ++_expected_pushed_min; | ||
688 | ++_expected_pushed_max; | ||
689 | if (_expected_pushed_min > _expected_pushed_max) { | ||
690 | raise_luaL_argerror(L_, StackIndex{ _key_i + 2 }, "batched min/max error"); | ||
691 | } | ||
692 | } else { | ||
693 | // make sure the keys are of a valid type | ||
694 | CheckKeyTypes(L_, _key_i, StackIndex{ lua_gettop(L_) }); | ||
695 | // receive a single value, checking multiple slots | ||
696 | _selected_keeper_receive = KEEPER_API(receive); | ||
697 | // we expect a single (value, slot) pair of returned values | ||
698 | _expected_pushed_min = _expected_pushed_max = 2; | ||
699 | } | ||
700 | |||
701 | Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) }; | ||
702 | Keeper* const _keeper{ _linda->whichKeeper() }; | ||
703 | KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } }; | ||
704 | if (_K == nullptr) | ||
705 | return 0; | ||
706 | |||
707 | CancelRequest _cancel{ CancelRequest::None }; | ||
708 | KeeperCallResult _pushed{}; | ||
709 | |||
710 | STACK_CHECK_START_REL(_K, 0); | ||
711 | for (bool _try_again{ true };;) { | ||
712 | if (_lane != nullptr) { | ||
713 | _cancel = _lane->cancelRequest.load(std::memory_order_relaxed); | ||
714 | } | ||
715 | _cancel = (_cancel != CancelRequest::None) | ||
716 | ? _cancel | ||
717 | : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); | ||
718 | // if user wants to cancel, or looped because of a timeout, the call returns without sending anything | ||
719 | if (!_try_again || _cancel != CancelRequest::None) { | ||
720 | _pushed.emplace(0); | ||
721 | break; | ||
722 | } | ||
723 | |||
724 | // all arguments of receive() but the first are passed to the keeper's receive function | ||
725 | STACK_CHECK(_K, 0); | ||
726 | _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i); | ||
727 | if (!_pushed.has_value()) { | ||
728 | break; | ||
729 | } | ||
730 | if (_pushed.value() > 0) { | ||
731 | LUA_ASSERT(L_, _pushed.value() >= _expected_pushed_min && _pushed.value() <= _expected_pushed_max); | ||
732 | if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) { | ||
733 | raise_luaL_error(L_, "Key is restricted"); | ||
734 | } | ||
735 | _linda->readHappened.notify_all(); | ||
736 | break; | ||
737 | } | ||
738 | |||
739 | if (std::chrono::steady_clock::now() >= _until) { | ||
740 | break; /* instant timeout */ | ||
741 | } | ||
742 | |||
743 | // nothing received, wait until timeout or signalled that we should try again | ||
744 | { | ||
745 | Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings | ||
746 | if (_lane != nullptr) { | ||
747 | // change status of lane to "waiting" | ||
748 | _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely | ||
749 | LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case | ||
750 | LUA_ASSERT(L_, _lane->waiting_on == nullptr); | ||
751 | _lane->waiting_on = &_linda->writeHappened; | ||
752 | _lane->status.store(Lane::Waiting, std::memory_order_release); | ||
753 | } | ||
754 | // not enough data to read: wakeup when data was sent, or when timeout is reached | ||
755 | std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock }; | ||
756 | std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until) }; | ||
757 | _guard.release(); // we don't want to unlock the mutex on exit! | ||
758 | _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups | ||
759 | if (_lane != nullptr) { | ||
760 | _lane->waiting_on = nullptr; | ||
761 | _lane->status.store(_prev_status, std::memory_order_release); | ||
762 | } | ||
763 | } | ||
764 | } | ||
765 | STACK_CHECK(_K, 0); | ||
766 | |||
767 | if (!_pushed.has_value()) { | ||
768 | raise_luaL_error(L_, "tried to copy unsupported types"); | ||
769 | } | ||
770 | |||
771 | switch (_cancel) { | ||
772 | case CancelRequest::None: | ||
773 | { | ||
774 | int const _nbPushed{ _pushed.value() }; | ||
775 | if (_nbPushed == 0) { | ||
776 | // not enough data in the linda slot to fulfill the request, return nil, "timeout" | ||
777 | lua_pushnil(L_); | ||
778 | luaG_pushstring(L_, "timeout"); | ||
779 | return 2; | ||
780 | } | ||
781 | return _nbPushed; | ||
782 | } | ||
783 | |||
784 | case CancelRequest::Soft: | ||
785 | // if user wants to soft-cancel, the call returns nil, kCancelError | ||
786 | lua_pushnil(L_); | ||
787 | kCancelError.pushKey(L_); | ||
788 | return 2; | ||
789 | 799 | ||
790 | case CancelRequest::Hard: | 800 | // ################################################################################################# |
791 | // raise an error interrupting execution only in case of hard cancel | ||
792 | raise_cancel_error(L_); // raises an error and doesn't return | ||
793 | 801 | ||
794 | default: | 802 | /* |
795 | raise_luaL_error(L_, "internal error: unknown cancel request"); | 803 | * [val1, ... valCOUNT] = linda_receive_batched( linda_ud, [timeout_secs_num=-1], key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT]) |
796 | } | 804 | * Consumes between min_COUNT and max_COUNT values from the linda, from a single slot. |
797 | } | 805 | * returns the actual consumed values, or nil if there weren't enough values to consume |
798 | }; | 806 | */ |
799 | return Linda::ProtectedCall(L_, _receive); | 807 | LUAG_FUNC(linda_receive_batched) |
808 | { | ||
809 | return Linda::ProtectedCall(L_, [](lua_State* const L_) { return ReceiveInternal(L_, true); }); | ||
800 | } | 810 | } |
801 | 811 | ||
802 | // ################################################################################################# | 812 | // ################################################################################################# |
@@ -1102,6 +1112,7 @@ namespace { | |||
1102 | { "get", LG_linda_get }, | 1112 | { "get", LG_linda_get }, |
1103 | { "limit", LG_linda_limit }, | 1113 | { "limit", LG_linda_limit }, |
1104 | { "receive", LG_linda_receive }, | 1114 | { "receive", LG_linda_receive }, |
1115 | { "receive_batched", LG_linda_receive_batched }, | ||
1105 | { "restrict", LG_linda_restrict }, | 1116 | { "restrict", LG_linda_restrict }, |
1106 | { "send", LG_linda_send }, | 1117 | { "send", LG_linda_send }, |
1107 | { "set", LG_linda_set }, | 1118 | { "set", LG_linda_set }, |
diff --git a/src/linda.hpp b/src/linda.hpp index 01ca7e1..2d5c9dc 100644 --- a/src/linda.hpp +++ b/src/linda.hpp | |||
@@ -8,11 +8,6 @@ struct Keeper; | |||
8 | 8 | ||
9 | // ################################################################################################# | 9 | // ################################################################################################# |
10 | 10 | ||
11 | // xxh64 of string "kLindaBatched" generated at https://www.pelock.com/products/hash-calculator | ||
12 | static constexpr UniqueKey kLindaBatched{ 0xB8234DF772646567ull, "linda.batched" }; | ||
13 | |||
14 | // ################################################################################################# | ||
15 | |||
16 | DECLARE_UNIQUE_TYPE(LindaGroup, int); | 11 | DECLARE_UNIQUE_TYPE(LindaGroup, int); |
17 | 12 | ||
18 | class Linda final | 13 | class Linda final |
diff --git a/src/lindafactory.cpp b/src/lindafactory.cpp index 6e3f759..42d0984 100644 --- a/src/lindafactory.cpp +++ b/src/lindafactory.cpp | |||
@@ -53,10 +53,6 @@ void LindaFactory::createMetatable(lua_State* L_) const | |||
53 | // the linda functions | 53 | // the linda functions |
54 | luaG_registerlibfuncs(L_, mLindaMT); | 54 | luaG_registerlibfuncs(L_, mLindaMT); |
55 | 55 | ||
56 | // some constants | ||
57 | kLindaBatched.pushKey(L_); // L_: mt kLindaBatched | ||
58 | lua_setfield(L_, -2, "batched"); // L_: mt | ||
59 | |||
60 | kNilSentinel.pushKey(L_); // L_: mt kNilSentinel | 56 | kNilSentinel.pushKey(L_); // L_: mt kNilSentinel |
61 | lua_setfield(L_, -2, "null"); // L_: mt | 57 | lua_setfield(L_, -2, "null"); // L_: mt |
62 | 58 | ||
diff --git a/tests/basic.lua b/tests/basic.lua index 170e821..a9c85cc 100644 --- a/tests/basic.lua +++ b/tests/basic.lua | |||
@@ -209,7 +209,7 @@ PRINT "wait_receive_lane is cancelled" | |||
209 | local wait_receive_batched = function() | 209 | local wait_receive_batched = function() |
210 | local k, v1, v2 | 210 | local k, v1, v2 |
211 | set_finalizer(function() print("wait_receive_batched", k, v1, v2) end) | 211 | set_finalizer(function() print("wait_receive_batched", k, v1, v2) end) |
212 | k, v1, v2 = limited:receive(limited.batched, "dummy", 2) -- infinite timeout, returns only when lane is cancelled | 212 | k, v1, v2 = limited:receive_batched("dummy", 2) -- infinite timeout, returns only when lane is cancelled |
213 | end | 213 | end |
214 | 214 | ||
215 | local wait_receive_batched_lane = lanes.gen("*", { name = 'auto' }, wait_receive_batched)() | 215 | local wait_receive_batched_lane = lanes.gen("*", { name = 'auto' }, wait_receive_batched)() |
@@ -273,7 +273,7 @@ local b,x,y,z,w = linda:get("<->", 4) | |||
273 | assert(b == 3 and x == "x" and y == "y" and z == "z" and w == nil) | 273 | assert(b == 3 and x == "x" and y == "y" and z == "z" and w == nil) |
274 | local k, x = linda:receive("<->") | 274 | local k, x = linda:receive("<->") |
275 | assert(k == "<->" and x == "x") | 275 | assert(k == "<->" and x == "x") |
276 | local k,y,z = linda:receive(linda.batched, "<->", 2) | 276 | local k,y,z = linda:receive_batched("<->", 2) |
277 | assert(k == "<->" and y == "y" and z == "z") | 277 | assert(k == "<->" and y == "y" and z == "z") |
278 | linda:set("<->") | 278 | linda:set("<->") |
279 | local b,x,y,z,w = linda:get("<->", 4) | 279 | local b,x,y,z,w = linda:get("<->", 4) |
diff --git a/tests/errhangtest.lua b/tests/errhangtest.lua index fff0dee..5b3f0c0 100644 --- a/tests/errhangtest.lua +++ b/tests/errhangtest.lua | |||
@@ -19,7 +19,6 @@ end | |||
19 | if true then | 19 | if true then |
20 | print "\n#### reserved sentinels" | 20 | print "\n#### reserved sentinels" |
21 | print(pcall(linda.set, linda, lanes.cancel_error)) | 21 | print(pcall(linda.set, linda, lanes.cancel_error)) |
22 | print(pcall(linda.set, linda, linda.batched)) | ||
23 | local _count, _val = linda:get("test") | 22 | local _count, _val = linda:get("test") |
24 | assert(_count == 0 and _val == nil) | 23 | assert(_count == 0 and _val == nil) |
25 | print "OK" | 24 | print "OK" |
@@ -28,13 +27,13 @@ end | |||
28 | -- get/set a few values | 27 | -- get/set a few values |
29 | if true then | 28 | if true then |
30 | print "\n#### set 3 -> receive batched" | 29 | print "\n#### set 3 -> receive batched" |
31 | assert.fails(function() linda:receive(linda.batched, "some key", -1, 1) end) | 30 | assert.fails(function() linda:receive_batched("some key", -1, 1) end) |
32 | assert.fails(function() linda:receive(linda.batched, "some key", 2, 1) end) | 31 | assert.fails(function() linda:receive_batched("some key", 2, 1) end) |
33 | assert.failsnot(function() linda:receive(0, linda.batched, "some key", 1, 3) end) | 32 | assert.failsnot(function() linda:receive_batched(0, "some key", 1, 3) end) |
34 | local fun = function() print "function test ok" end | 33 | local fun = function() print "function test ok" end |
35 | print(pcall(linda.set, linda, 'test', true, nil, fun)) | 34 | print(pcall(linda.set, linda, 'test', true, nil, fun)) |
36 | -- read back the contents | 35 | -- read back the contents |
37 | local k,b,n,f = linda:receive(linda.batched, 'test', 3) | 36 | local k,b,n,f = linda:receive_batched('test', 3) |
38 | local _count, _val = linda:get("test") | 37 | local _count, _val = linda:get("test") |
39 | assert(_count == 0 and _val == nil) | 38 | assert(_count == 0 and _val == nil) |
40 | -- check they are ok | 39 | -- check they are ok |
diff --git a/tests/linda_perf.lua b/tests/linda_perf.lua index bba1408..107fd25 100644 --- a/tests/linda_perf.lua +++ b/tests/linda_perf.lua | |||
@@ -68,7 +68,7 @@ local gobbler = function( l, loop, batch) | |||
68 | l:receive( "go") | 68 | l:receive( "go") |
69 | -- eat data in batches | 69 | -- eat data in batches |
70 | for i = 1, loop/batch do | 70 | for i = 1, loop/batch do |
71 | l:receive( l.batched, "key", batch) | 71 | l:receive_batched("key", batch) |
72 | -- print("gobbler:", batch) | 72 | -- print("gobbler:", batch) |
73 | end | 73 | end |
74 | print "loop is over" | 74 | print "loop is over" |
@@ -168,9 +168,9 @@ local function ziva2( preloop, loop, batch) | |||
168 | local l = lanes.linda("ziva2("..preloop..":"..loop..":"..tostring(batch)..")", group_uid) | 168 | local l = lanes.linda("ziva2("..preloop..":"..loop..":"..tostring(batch)..")", group_uid) |
169 | group_uid = (group_uid % config.nb_user_keepers) + 1 | 169 | group_uid = (group_uid % config.nb_user_keepers) + 1 |
170 | -- prefill the linda a bit to increase fifo stress | 170 | -- prefill the linda a bit to increase fifo stress |
171 | local top, step = math.max( preloop, loop), (l.batched and batch) and batch or 1 | 171 | local top, step = math.max( preloop, loop), batch or 1 |
172 | local batch_send, batch_read | 172 | local batch_send, batch_read |
173 | if l.batched and batch then | 173 | if batch then |
174 | local batch_values = {} | 174 | local batch_values = {} |
175 | for i = 1, batch do | 175 | for i = 1, batch do |
176 | table.insert( batch_values, i) | 176 | table.insert( batch_values, i) |
@@ -180,7 +180,7 @@ local function ziva2( preloop, loop, batch) | |||
180 | l:send( "key", table_unpack( batch_values)) | 180 | l:send( "key", table_unpack( batch_values)) |
181 | end | 181 | end |
182 | batch_read = function() | 182 | batch_read = function() |
183 | l:receive( l.batched, "key", batch) | 183 | l:receive_batched("key", batch) |
184 | end | 184 | end |
185 | else -- not batched | 185 | else -- not batched |
186 | batch_send = function() | 186 | batch_send = function() |
diff --git a/tests/protect_allocator.lua b/tests/protect_allocator.lua index e13a57c..325726a 100644 --- a/tests/protect_allocator.lua +++ b/tests/protect_allocator.lua | |||
@@ -52,7 +52,7 @@ end | |||
52 | 52 | ||
53 | -- wait for completion | 53 | -- wait for completion |
54 | print "wait for completion" | 54 | print "wait for completion" |
55 | linda:receive( linda.batched, "key", COUNT) | 55 | linda:receive_batched("key", COUNT) |
56 | print "waiting a bit more ..." | 56 | print "waiting a bit more ..." |
57 | SLEEP(1) | 57 | SLEEP(1) |
58 | print "SUCCESS" | 58 | print "SUCCESS" |
diff --git a/unit_tests/linda_tests.cpp b/unit_tests/linda_tests.cpp index 2308737..f2934eb 100644 --- a/unit_tests/linda_tests.cpp +++ b/unit_tests/linda_tests.cpp | |||
@@ -106,7 +106,6 @@ TEST_CASE("linda.single Keeper") | |||
106 | S.requireFailure("lanes.linda():send(0, io.stdin, 'v')"); | 106 | S.requireFailure("lanes.linda():send(0, io.stdin, 'v')"); |
107 | S.requireFailure("lanes.linda():send(0, lanes.null, 'v')"); | 107 | S.requireFailure("lanes.linda():send(0, lanes.null, 'v')"); |
108 | S.requireFailure("lanes.linda():send(0, lanes.cancel_error, 'v')"); | 108 | S.requireFailure("lanes.linda():send(0, lanes.cancel_error, 'v')"); |
109 | S.requireFailure("local l = lanes.linda(); l:send(0, l.batched, 'v')"); | ||
110 | } | 109 | } |
111 | 110 | ||
112 | // ----------------------------------------------------------------------------------------- | 111 | // ----------------------------------------------------------------------------------------- |
diff --git a/unit_tests/scripts/lane/tasking_cancelling.lua b/unit_tests/scripts/lane/tasking_cancelling.lua index 8bee3a6..ea4516e 100644 --- a/unit_tests/scripts/lane/tasking_cancelling.lua +++ b/unit_tests/scripts/lane/tasking_cancelling.lua | |||
@@ -61,7 +61,7 @@ PRINT "wait_receive_lane is cancelled" | |||
61 | local wait_receive_batched = function() | 61 | local wait_receive_batched = function() |
62 | local k, v1, v2 | 62 | local k, v1, v2 |
63 | set_finalizer(function() print("wait_receive_batched", k, v1, v2) end) | 63 | set_finalizer(function() print("wait_receive_batched", k, v1, v2) end) |
64 | k, v1, v2 = limited:receive(limited.batched, "dummy", 2) -- infinite timeout, returns only when lane is cancelled | 64 | k, v1, v2 = limited:receive_batched("dummy", 2) -- infinite timeout, returns only when lane is cancelled |
65 | end | 65 | end |
66 | 66 | ||
67 | local wait_receive_batched_lane = lanes_gen("*", { name = 'auto' }, wait_receive_batched)() | 67 | local wait_receive_batched_lane = lanes_gen("*", { name = 'auto' }, wait_receive_batched)() |
diff --git a/unit_tests/scripts/lane/tasking_communications.lua b/unit_tests/scripts/lane/tasking_communications.lua index 1fd43b0..631d105 100644 --- a/unit_tests/scripts/lane/tasking_communications.lua +++ b/unit_tests/scripts/lane/tasking_communications.lua | |||
@@ -90,7 +90,7 @@ local b,x,y,z,w = linda:get("<->", 4) | |||
90 | assert(b == 3 and x == "x" and y == "y" and z == "z" and w == nil) | 90 | assert(b == 3 and x == "x" and y == "y" and z == "z" and w == nil) |
91 | local k, x = linda:receive("<->") | 91 | local k, x = linda:receive("<->") |
92 | assert(k == "<->" and x == "x") | 92 | assert(k == "<->" and x == "x") |
93 | local k,y,z = linda:receive(linda.batched, "<->", 2) | 93 | local k,y,z = linda:receive_batched("<->", 2) |
94 | assert(k == "<->" and y == "y" and z == "z") | 94 | assert(k == "<->" and y == "y" and z == "z") |
95 | linda:set("<->") | 95 | linda:set("<->") |
96 | local b,x,y,z,w = linda:get("<->", 4) | 96 | local b,x,y,z,w = linda:get("<->", 4) |