diff options
Diffstat (limited to '')
-rw-r--r-- | src/linda.cpp | 293 |
1 files changed, 152 insertions, 141 deletions
diff --git a/src/linda.cpp b/src/linda.cpp index 1119d71..0cdacfa 100644 --- a/src/linda.cpp +++ b/src/linda.cpp | |||
@@ -62,7 +62,7 @@ namespace { | |||
62 | 62 | ||
63 | case LuaType::LIGHTUSERDATA: | 63 | case LuaType::LIGHTUSERDATA: |
64 | { | 64 | { |
65 | static constexpr std::array<std::reference_wrapper<UniqueKey const>, 3> kKeysToCheck{ kLindaBatched, kCancelError, kNilSentinel }; | 65 | static constexpr std::array<std::reference_wrapper<UniqueKey const>, 2> kKeysToCheck{ kCancelError, kNilSentinel }; |
66 | for (UniqueKey const& _key : kKeysToCheck) { | 66 | for (UniqueKey const& _key : kKeysToCheck) { |
67 | if (_key.equals(L_, _i)) { | 67 | if (_key.equals(L_, _i)) { |
68 | raise_luaL_error(L_, "argument #%d: can't use %s as a slot", _i, _key.debugName.data()); | 68 | raise_luaL_error(L_, "argument #%d: can't use %s as a slot", _i, _key.debugName.data()); |
@@ -123,6 +123,8 @@ namespace { | |||
123 | return 0; | 123 | return 0; |
124 | } | 124 | } |
125 | 125 | ||
126 | // ############################################################################################# | ||
127 | |||
126 | // a helper to process the timeout argument of linda:send() and linda:receive() | 128 | // a helper to process the timeout argument of linda:send() and linda:receive() |
127 | [[nodiscard]] | 129 | [[nodiscard]] |
128 | static auto ProcessTimeoutArg(lua_State* const L_) | 130 | static auto ProcessTimeoutArg(lua_State* const L_) |
@@ -145,6 +147,142 @@ namespace { | |||
145 | } | 147 | } |
146 | 148 | ||
147 | // ############################################################################################# | 149 | // ############################################################################################# |
150 | |||
151 | // the implementation for linda:receive() and linda:receive_batched() | ||
152 | static int ReceiveInternal(lua_State* const L_, bool const batched_) | ||
153 | { | ||
154 | Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; | ||
155 | |||
156 | auto const [_key_i, _until] = ProcessTimeoutArg(L_); | ||
157 | |||
158 | keeper_api_t _selected_keeper_receive{ nullptr }; | ||
159 | int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 }; | ||
160 | // are we in batched mode? | ||
161 | if (batched_) { | ||
162 | // make sure the keys are of a valid type | ||
163 | CheckKeyTypes(L_, _key_i, _key_i); | ||
164 | // receive multiple values from a single slot | ||
165 | _selected_keeper_receive = KEEPER_API(receive_batched); | ||
166 | // we expect a user-defined amount of return value | ||
167 | _expected_pushed_min = (int) luaL_checkinteger(L_, _key_i + 1); | ||
168 | if (_expected_pushed_min < 1) { | ||
169 | raise_luaL_argerror(L_, StackIndex{ _key_i + 1 }, "bad min count"); | ||
170 | } | ||
171 | _expected_pushed_max = (int) luaL_optinteger(L_, _key_i + 2, _expected_pushed_min); | ||
172 | // don't forget to count the slot in addition to the values | ||
173 | ++_expected_pushed_min; | ||
174 | ++_expected_pushed_max; | ||
175 | if (_expected_pushed_min > _expected_pushed_max) { | ||
176 | raise_luaL_argerror(L_, StackIndex{ _key_i + 2 }, "batched min/max error"); | ||
177 | } | ||
178 | } else { | ||
179 | // make sure the keys are of a valid type | ||
180 | CheckKeyTypes(L_, _key_i, StackIndex{ lua_gettop(L_) }); | ||
181 | // receive a single value, checking multiple slots | ||
182 | _selected_keeper_receive = KEEPER_API(receive); | ||
183 | // we expect a single (value, slot) pair of returned values | ||
184 | _expected_pushed_min = _expected_pushed_max = 2; | ||
185 | } | ||
186 | |||
187 | Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) }; | ||
188 | Keeper* const _keeper{ _linda->whichKeeper() }; | ||
189 | KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } }; | ||
190 | if (_K == nullptr) | ||
191 | return 0; | ||
192 | |||
193 | CancelRequest _cancel{ CancelRequest::None }; | ||
194 | KeeperCallResult _pushed{}; | ||
195 | |||
196 | STACK_CHECK_START_REL(_K, 0); | ||
197 | for (bool _try_again{ true };;) { | ||
198 | if (_lane != nullptr) { | ||
199 | _cancel = _lane->cancelRequest.load(std::memory_order_relaxed); | ||
200 | } | ||
201 | _cancel = (_cancel != CancelRequest::None) | ||
202 | ? _cancel | ||
203 | : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); | ||
204 | // if user wants to cancel, or looped because of a timeout, the call returns without sending anything | ||
205 | if (!_try_again || _cancel != CancelRequest::None) { | ||
206 | _pushed.emplace(0); | ||
207 | break; | ||
208 | } | ||
209 | |||
210 | // all arguments of receive() but the first are passed to the keeper's receive function | ||
211 | STACK_CHECK(_K, 0); | ||
212 | _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i); | ||
213 | if (!_pushed.has_value()) { | ||
214 | break; | ||
215 | } | ||
216 | if (_pushed.value() > 0) { | ||
217 | LUA_ASSERT(L_, _pushed.value() >= _expected_pushed_min && _pushed.value() <= _expected_pushed_max); | ||
218 | if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) { | ||
219 | raise_luaL_error(L_, "Key is restricted"); | ||
220 | } | ||
221 | _linda->readHappened.notify_all(); | ||
222 | break; | ||
223 | } | ||
224 | |||
225 | if (std::chrono::steady_clock::now() >= _until) { | ||
226 | break; /* instant timeout */ | ||
227 | } | ||
228 | |||
229 | // nothing received, wait until timeout or signalled that we should try again | ||
230 | { | ||
231 | Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings | ||
232 | if (_lane != nullptr) { | ||
233 | // change status of lane to "waiting" | ||
234 | _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely | ||
235 | LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case | ||
236 | LUA_ASSERT(L_, _lane->waiting_on == nullptr); | ||
237 | _lane->waiting_on = &_linda->writeHappened; | ||
238 | _lane->status.store(Lane::Waiting, std::memory_order_release); | ||
239 | } | ||
240 | // not enough data to read: wakeup when data was sent, or when timeout is reached | ||
241 | std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock }; | ||
242 | std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until) }; | ||
243 | _guard.release(); // we don't want to unlock the mutex on exit! | ||
244 | _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups | ||
245 | if (_lane != nullptr) { | ||
246 | _lane->waiting_on = nullptr; | ||
247 | _lane->status.store(_prev_status, std::memory_order_release); | ||
248 | } | ||
249 | } | ||
250 | } | ||
251 | STACK_CHECK(_K, 0); | ||
252 | |||
253 | if (!_pushed.has_value()) { | ||
254 | raise_luaL_error(L_, "tried to copy unsupported types"); | ||
255 | } | ||
256 | |||
257 | switch (_cancel) { | ||
258 | case CancelRequest::None: | ||
259 | { | ||
260 | int const _nbPushed{ _pushed.value() }; | ||
261 | if (_nbPushed == 0) { | ||
262 | // not enough data in the linda slot to fulfill the request, return nil, "timeout" | ||
263 | lua_pushnil(L_); | ||
264 | luaG_pushstring(L_, "timeout"); | ||
265 | return 2; | ||
266 | } | ||
267 | return _nbPushed; | ||
268 | } | ||
269 | |||
270 | case CancelRequest::Soft: | ||
271 | // if user wants to soft-cancel, the call returns nil, kCancelError | ||
272 | lua_pushnil(L_); | ||
273 | kCancelError.pushKey(L_); | ||
274 | return 2; | ||
275 | |||
276 | case CancelRequest::Hard: | ||
277 | // raise an error interrupting execution only in case of hard cancel | ||
278 | raise_cancel_error(L_); // raises an error and doesn't return | ||
279 | |||
280 | default: | ||
281 | raise_luaL_error(L_, "internal error: unknown cancel request"); | ||
282 | } | ||
283 | } | ||
284 | |||
285 | // ############################################################################################# | ||
148 | // ############################################################################################# | 286 | // ############################################################################################# |
149 | } // namespace | 287 | } // namespace |
150 | // ################################################################################################# | 288 | // ################################################################################################# |
@@ -650,153 +788,25 @@ LUAG_FUNC(linda_limit) | |||
650 | // ################################################################################################# | 788 | // ################################################################################################# |
651 | 789 | ||
652 | /* | 790 | /* |
653 | * 2 modes of operation | 791 | * [val, slot] = linda:receive([timeout_secs_num=nil], key_num|str|bool|lightuserdata [, ...] ) |
654 | * [val, slot]= linda:receive([timeout_secs_num=nil], key_num|str|bool|lightuserdata [, ...] ) | ||
655 | * Consumes a single value from the Linda, in any slot. | 792 | * Consumes a single value from the Linda, in any slot. |
656 | * Returns: received value (which is consumed from the slot), and the slot which had it | 793 | * Returns: received value (which is consumed from the slot), and the slot which had it |
657 | |||
658 | * [val1, ... valCOUNT]= linda_receive( linda_ud, [timeout_secs_num=-1], linda.batched, key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT]) | ||
659 | * Consumes between min_COUNT and max_COUNT values from the linda, from a single slot. | ||
660 | * returns the actual consumed values, or nil if there weren't enough values to consume | ||
661 | */ | 794 | */ |
662 | LUAG_FUNC(linda_receive) | 795 | LUAG_FUNC(linda_receive) |
663 | { | 796 | { |
664 | static constexpr lua_CFunction _receive{ | 797 | return Linda::ProtectedCall(L_, [](lua_State* const L_) { return ReceiveInternal(L_, false); }); |
665 | +[](lua_State* const L_) { | 798 | } |
666 | Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; | ||
667 | |||
668 | auto [_key_i, _until] = ProcessTimeoutArg(L_); | ||
669 | |||
670 | keeper_api_t _selected_keeper_receive{ nullptr }; | ||
671 | int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 }; | ||
672 | // are we in batched mode? | ||
673 | if (kLindaBatched.equals(L_, _key_i)) { | ||
674 | // no need to pass linda.batched in the keeper state | ||
675 | ++_key_i; | ||
676 | // make sure the keys are of a valid type | ||
677 | CheckKeyTypes(L_, _key_i, _key_i); | ||
678 | // receive multiple values from a single slot | ||
679 | _selected_keeper_receive = KEEPER_API(receive_batched); | ||
680 | // we expect a user-defined amount of return value | ||
681 | _expected_pushed_min = (int) luaL_checkinteger(L_, _key_i + 1); | ||
682 | if (_expected_pushed_min < 1) { | ||
683 | raise_luaL_argerror(L_, StackIndex{ _key_i + 1 }, "bad min count"); | ||
684 | } | ||
685 | _expected_pushed_max = (int) luaL_optinteger(L_, _key_i + 2, _expected_pushed_min); | ||
686 | // don't forget to count the slot in addition to the values | ||
687 | ++_expected_pushed_min; | ||
688 | ++_expected_pushed_max; | ||
689 | if (_expected_pushed_min > _expected_pushed_max) { | ||
690 | raise_luaL_argerror(L_, StackIndex{ _key_i + 2 }, "batched min/max error"); | ||
691 | } | ||
692 | } else { | ||
693 | // make sure the keys are of a valid type | ||
694 | CheckKeyTypes(L_, _key_i, StackIndex{ lua_gettop(L_) }); | ||
695 | // receive a single value, checking multiple slots | ||
696 | _selected_keeper_receive = KEEPER_API(receive); | ||
697 | // we expect a single (value, slot) pair of returned values | ||
698 | _expected_pushed_min = _expected_pushed_max = 2; | ||
699 | } | ||
700 | |||
701 | Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) }; | ||
702 | Keeper* const _keeper{ _linda->whichKeeper() }; | ||
703 | KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } }; | ||
704 | if (_K == nullptr) | ||
705 | return 0; | ||
706 | |||
707 | CancelRequest _cancel{ CancelRequest::None }; | ||
708 | KeeperCallResult _pushed{}; | ||
709 | |||
710 | STACK_CHECK_START_REL(_K, 0); | ||
711 | for (bool _try_again{ true };;) { | ||
712 | if (_lane != nullptr) { | ||
713 | _cancel = _lane->cancelRequest.load(std::memory_order_relaxed); | ||
714 | } | ||
715 | _cancel = (_cancel != CancelRequest::None) | ||
716 | ? _cancel | ||
717 | : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); | ||
718 | // if user wants to cancel, or looped because of a timeout, the call returns without sending anything | ||
719 | if (!_try_again || _cancel != CancelRequest::None) { | ||
720 | _pushed.emplace(0); | ||
721 | break; | ||
722 | } | ||
723 | |||
724 | // all arguments of receive() but the first are passed to the keeper's receive function | ||
725 | STACK_CHECK(_K, 0); | ||
726 | _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i); | ||
727 | if (!_pushed.has_value()) { | ||
728 | break; | ||
729 | } | ||
730 | if (_pushed.value() > 0) { | ||
731 | LUA_ASSERT(L_, _pushed.value() >= _expected_pushed_min && _pushed.value() <= _expected_pushed_max); | ||
732 | if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) { | ||
733 | raise_luaL_error(L_, "Key is restricted"); | ||
734 | } | ||
735 | _linda->readHappened.notify_all(); | ||
736 | break; | ||
737 | } | ||
738 | |||
739 | if (std::chrono::steady_clock::now() >= _until) { | ||
740 | break; /* instant timeout */ | ||
741 | } | ||
742 | |||
743 | // nothing received, wait until timeout or signalled that we should try again | ||
744 | { | ||
745 | Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings | ||
746 | if (_lane != nullptr) { | ||
747 | // change status of lane to "waiting" | ||
748 | _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely | ||
749 | LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case | ||
750 | LUA_ASSERT(L_, _lane->waiting_on == nullptr); | ||
751 | _lane->waiting_on = &_linda->writeHappened; | ||
752 | _lane->status.store(Lane::Waiting, std::memory_order_release); | ||
753 | } | ||
754 | // not enough data to read: wakeup when data was sent, or when timeout is reached | ||
755 | std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock }; | ||
756 | std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until) }; | ||
757 | _guard.release(); // we don't want to unlock the mutex on exit! | ||
758 | _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups | ||
759 | if (_lane != nullptr) { | ||
760 | _lane->waiting_on = nullptr; | ||
761 | _lane->status.store(_prev_status, std::memory_order_release); | ||
762 | } | ||
763 | } | ||
764 | } | ||
765 | STACK_CHECK(_K, 0); | ||
766 | |||
767 | if (!_pushed.has_value()) { | ||
768 | raise_luaL_error(L_, "tried to copy unsupported types"); | ||
769 | } | ||
770 | |||
771 | switch (_cancel) { | ||
772 | case CancelRequest::None: | ||
773 | { | ||
774 | int const _nbPushed{ _pushed.value() }; | ||
775 | if (_nbPushed == 0) { | ||
776 | // not enough data in the linda slot to fulfill the request, return nil, "timeout" | ||
777 | lua_pushnil(L_); | ||
778 | luaG_pushstring(L_, "timeout"); | ||
779 | return 2; | ||
780 | } | ||
781 | return _nbPushed; | ||
782 | } | ||
783 | |||
784 | case CancelRequest::Soft: | ||
785 | // if user wants to soft-cancel, the call returns nil, kCancelError | ||
786 | lua_pushnil(L_); | ||
787 | kCancelError.pushKey(L_); | ||
788 | return 2; | ||
789 | 799 | ||
790 | case CancelRequest::Hard: | 800 | // ################################################################################################# |
791 | // raise an error interrupting execution only in case of hard cancel | ||
792 | raise_cancel_error(L_); // raises an error and doesn't return | ||
793 | 801 | ||
794 | default: | 802 | /* |
795 | raise_luaL_error(L_, "internal error: unknown cancel request"); | 803 | * [val1, ... valCOUNT] = linda_receive_batched( linda_ud, [timeout_secs_num=-1], key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT]) |
796 | } | 804 | * Consumes between min_COUNT and max_COUNT values from the linda, from a single slot. |
797 | } | 805 | * returns the actual consumed values, or nil if there weren't enough values to consume |
798 | }; | 806 | */ |
799 | return Linda::ProtectedCall(L_, _receive); | 807 | LUAG_FUNC(linda_receive_batched) |
808 | { | ||
809 | return Linda::ProtectedCall(L_, [](lua_State* const L_) { return ReceiveInternal(L_, true); }); | ||
800 | } | 810 | } |
801 | 811 | ||
802 | // ################################################################################################# | 812 | // ################################################################################################# |
@@ -1102,6 +1112,7 @@ namespace { | |||
1102 | { "get", LG_linda_get }, | 1112 | { "get", LG_linda_get }, |
1103 | { "limit", LG_linda_limit }, | 1113 | { "limit", LG_linda_limit }, |
1104 | { "receive", LG_linda_receive }, | 1114 | { "receive", LG_linda_receive }, |
1115 | { "receive_batched", LG_linda_receive_batched }, | ||
1105 | { "restrict", LG_linda_restrict }, | 1116 | { "restrict", LG_linda_restrict }, |
1106 | { "send", LG_linda_send }, | 1117 | { "send", LG_linda_send }, |
1107 | { "set", LG_linda_set }, | 1118 | { "set", LG_linda_set }, |