aboutsummaryrefslogtreecommitdiff
path: root/src/linda.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/linda.cpp293
1 files changed, 152 insertions, 141 deletions
diff --git a/src/linda.cpp b/src/linda.cpp
index 1119d71..0cdacfa 100644
--- a/src/linda.cpp
+++ b/src/linda.cpp
@@ -62,7 +62,7 @@ namespace {
62 62
63 case LuaType::LIGHTUSERDATA: 63 case LuaType::LIGHTUSERDATA:
64 { 64 {
65 static constexpr std::array<std::reference_wrapper<UniqueKey const>, 3> kKeysToCheck{ kLindaBatched, kCancelError, kNilSentinel }; 65 static constexpr std::array<std::reference_wrapper<UniqueKey const>, 2> kKeysToCheck{ kCancelError, kNilSentinel };
66 for (UniqueKey const& _key : kKeysToCheck) { 66 for (UniqueKey const& _key : kKeysToCheck) {
67 if (_key.equals(L_, _i)) { 67 if (_key.equals(L_, _i)) {
68 raise_luaL_error(L_, "argument #%d: can't use %s as a slot", _i, _key.debugName.data()); 68 raise_luaL_error(L_, "argument #%d: can't use %s as a slot", _i, _key.debugName.data());
@@ -123,6 +123,8 @@ namespace {
123 return 0; 123 return 0;
124 } 124 }
125 125
126 // #############################################################################################
127
126 // a helper to process the timeout argument of linda:send() and linda:receive() 128 // a helper to process the timeout argument of linda:send() and linda:receive()
127 [[nodiscard]] 129 [[nodiscard]]
128 static auto ProcessTimeoutArg(lua_State* const L_) 130 static auto ProcessTimeoutArg(lua_State* const L_)
@@ -145,6 +147,142 @@ namespace {
145 } 147 }
146 148
147 // ############################################################################################# 149 // #############################################################################################
150
151 // the implementation for linda:receive() and linda:receive_batched()
152 static int ReceiveInternal(lua_State* const L_, bool const batched_)
153 {
154 Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) };
155
156 auto const [_key_i, _until] = ProcessTimeoutArg(L_);
157
158 keeper_api_t _selected_keeper_receive{ nullptr };
159 int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 };
160 // are we in batched mode?
161 if (batched_) {
162 // make sure the keys are of a valid type
163 CheckKeyTypes(L_, _key_i, _key_i);
164 // receive multiple values from a single slot
165 _selected_keeper_receive = KEEPER_API(receive_batched);
166 // we expect a user-defined amount of return value
167 _expected_pushed_min = (int) luaL_checkinteger(L_, _key_i + 1);
168 if (_expected_pushed_min < 1) {
169 raise_luaL_argerror(L_, StackIndex{ _key_i + 1 }, "bad min count");
170 }
171 _expected_pushed_max = (int) luaL_optinteger(L_, _key_i + 2, _expected_pushed_min);
172 // don't forget to count the slot in addition to the values
173 ++_expected_pushed_min;
174 ++_expected_pushed_max;
175 if (_expected_pushed_min > _expected_pushed_max) {
176 raise_luaL_argerror(L_, StackIndex{ _key_i + 2 }, "batched min/max error");
177 }
178 } else {
179 // make sure the keys are of a valid type
180 CheckKeyTypes(L_, _key_i, StackIndex{ lua_gettop(L_) });
181 // receive a single value, checking multiple slots
182 _selected_keeper_receive = KEEPER_API(receive);
183 // we expect a single (value, slot) pair of returned values
184 _expected_pushed_min = _expected_pushed_max = 2;
185 }
186
187 Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) };
188 Keeper* const _keeper{ _linda->whichKeeper() };
189 KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } };
190 if (_K == nullptr)
191 return 0;
192
193 CancelRequest _cancel{ CancelRequest::None };
194 KeeperCallResult _pushed{};
195
196 STACK_CHECK_START_REL(_K, 0);
197 for (bool _try_again{ true };;) {
198 if (_lane != nullptr) {
199 _cancel = _lane->cancelRequest.load(std::memory_order_relaxed);
200 }
201 _cancel = (_cancel != CancelRequest::None)
202 ? _cancel
203 : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None);
204 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything
205 if (!_try_again || _cancel != CancelRequest::None) {
206 _pushed.emplace(0);
207 break;
208 }
209
210 // all arguments of receive() but the first are passed to the keeper's receive function
211 STACK_CHECK(_K, 0);
212 _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i);
213 if (!_pushed.has_value()) {
214 break;
215 }
216 if (_pushed.value() > 0) {
217 LUA_ASSERT(L_, _pushed.value() >= _expected_pushed_min && _pushed.value() <= _expected_pushed_max);
218 if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) {
219 raise_luaL_error(L_, "Key is restricted");
220 }
221 _linda->readHappened.notify_all();
222 break;
223 }
224
225 if (std::chrono::steady_clock::now() >= _until) {
226 break; /* instant timeout */
227 }
228
229 // nothing received, wait until timeout or signalled that we should try again
230 {
231 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
232 if (_lane != nullptr) {
233 // change status of lane to "waiting"
234 _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely
235 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case
236 LUA_ASSERT(L_, _lane->waiting_on == nullptr);
237 _lane->waiting_on = &_linda->writeHappened;
238 _lane->status.store(Lane::Waiting, std::memory_order_release);
239 }
240 // not enough data to read: wakeup when data was sent, or when timeout is reached
241 std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock };
242 std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until) };
243 _guard.release(); // we don't want to unlock the mutex on exit!
244 _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups
245 if (_lane != nullptr) {
246 _lane->waiting_on = nullptr;
247 _lane->status.store(_prev_status, std::memory_order_release);
248 }
249 }
250 }
251 STACK_CHECK(_K, 0);
252
253 if (!_pushed.has_value()) {
254 raise_luaL_error(L_, "tried to copy unsupported types");
255 }
256
257 switch (_cancel) {
258 case CancelRequest::None:
259 {
260 int const _nbPushed{ _pushed.value() };
261 if (_nbPushed == 0) {
262 // not enough data in the linda slot to fulfill the request, return nil, "timeout"
263 lua_pushnil(L_);
264 luaG_pushstring(L_, "timeout");
265 return 2;
266 }
267 return _nbPushed;
268 }
269
270 case CancelRequest::Soft:
271 // if user wants to soft-cancel, the call returns nil, kCancelError
272 lua_pushnil(L_);
273 kCancelError.pushKey(L_);
274 return 2;
275
276 case CancelRequest::Hard:
277 // raise an error interrupting execution only in case of hard cancel
278 raise_cancel_error(L_); // raises an error and doesn't return
279
280 default:
281 raise_luaL_error(L_, "internal error: unknown cancel request");
282 }
283 }
284
285 // #############################################################################################
148 // ############################################################################################# 286 // #############################################################################################
149} // namespace 287} // namespace
150// ################################################################################################# 288// #################################################################################################
@@ -650,153 +788,25 @@ LUAG_FUNC(linda_limit)
650// ################################################################################################# 788// #################################################################################################
651 789
652/* 790/*
653 * 2 modes of operation 791 * [val, slot] = linda:receive([timeout_secs_num=nil], key_num|str|bool|lightuserdata [, ...] )
654 * [val, slot]= linda:receive([timeout_secs_num=nil], key_num|str|bool|lightuserdata [, ...] )
655 * Consumes a single value from the Linda, in any slot. 792 * Consumes a single value from the Linda, in any slot.
656 * Returns: received value (which is consumed from the slot), and the slot which had it 793 * Returns: received value (which is consumed from the slot), and the slot which had it
657
658 * [val1, ... valCOUNT]= linda_receive( linda_ud, [timeout_secs_num=-1], linda.batched, key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT])
659 * Consumes between min_COUNT and max_COUNT values from the linda, from a single slot.
660 * returns the actual consumed values, or nil if there weren't enough values to consume
661 */ 794 */
662LUAG_FUNC(linda_receive) 795LUAG_FUNC(linda_receive)
663{ 796{
664 static constexpr lua_CFunction _receive{ 797 return Linda::ProtectedCall(L_, [](lua_State* const L_) { return ReceiveInternal(L_, false); });
665 +[](lua_State* const L_) { 798}
666 Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) };
667
668 auto [_key_i, _until] = ProcessTimeoutArg(L_);
669
670 keeper_api_t _selected_keeper_receive{ nullptr };
671 int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 };
672 // are we in batched mode?
673 if (kLindaBatched.equals(L_, _key_i)) {
674 // no need to pass linda.batched in the keeper state
675 ++_key_i;
676 // make sure the keys are of a valid type
677 CheckKeyTypes(L_, _key_i, _key_i);
678 // receive multiple values from a single slot
679 _selected_keeper_receive = KEEPER_API(receive_batched);
680 // we expect a user-defined amount of return value
681 _expected_pushed_min = (int) luaL_checkinteger(L_, _key_i + 1);
682 if (_expected_pushed_min < 1) {
683 raise_luaL_argerror(L_, StackIndex{ _key_i + 1 }, "bad min count");
684 }
685 _expected_pushed_max = (int) luaL_optinteger(L_, _key_i + 2, _expected_pushed_min);
686 // don't forget to count the slot in addition to the values
687 ++_expected_pushed_min;
688 ++_expected_pushed_max;
689 if (_expected_pushed_min > _expected_pushed_max) {
690 raise_luaL_argerror(L_, StackIndex{ _key_i + 2 }, "batched min/max error");
691 }
692 } else {
693 // make sure the keys are of a valid type
694 CheckKeyTypes(L_, _key_i, StackIndex{ lua_gettop(L_) });
695 // receive a single value, checking multiple slots
696 _selected_keeper_receive = KEEPER_API(receive);
697 // we expect a single (value, slot) pair of returned values
698 _expected_pushed_min = _expected_pushed_max = 2;
699 }
700
701 Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) };
702 Keeper* const _keeper{ _linda->whichKeeper() };
703 KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } };
704 if (_K == nullptr)
705 return 0;
706
707 CancelRequest _cancel{ CancelRequest::None };
708 KeeperCallResult _pushed{};
709
710 STACK_CHECK_START_REL(_K, 0);
711 for (bool _try_again{ true };;) {
712 if (_lane != nullptr) {
713 _cancel = _lane->cancelRequest.load(std::memory_order_relaxed);
714 }
715 _cancel = (_cancel != CancelRequest::None)
716 ? _cancel
717 : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None);
718 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything
719 if (!_try_again || _cancel != CancelRequest::None) {
720 _pushed.emplace(0);
721 break;
722 }
723
724 // all arguments of receive() but the first are passed to the keeper's receive function
725 STACK_CHECK(_K, 0);
726 _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i);
727 if (!_pushed.has_value()) {
728 break;
729 }
730 if (_pushed.value() > 0) {
731 LUA_ASSERT(L_, _pushed.value() >= _expected_pushed_min && _pushed.value() <= _expected_pushed_max);
732 if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) {
733 raise_luaL_error(L_, "Key is restricted");
734 }
735 _linda->readHappened.notify_all();
736 break;
737 }
738
739 if (std::chrono::steady_clock::now() >= _until) {
740 break; /* instant timeout */
741 }
742
743 // nothing received, wait until timeout or signalled that we should try again
744 {
745 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
746 if (_lane != nullptr) {
747 // change status of lane to "waiting"
748 _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely
749 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case
750 LUA_ASSERT(L_, _lane->waiting_on == nullptr);
751 _lane->waiting_on = &_linda->writeHappened;
752 _lane->status.store(Lane::Waiting, std::memory_order_release);
753 }
754 // not enough data to read: wakeup when data was sent, or when timeout is reached
755 std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock };
756 std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until) };
757 _guard.release(); // we don't want to unlock the mutex on exit!
758 _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups
759 if (_lane != nullptr) {
760 _lane->waiting_on = nullptr;
761 _lane->status.store(_prev_status, std::memory_order_release);
762 }
763 }
764 }
765 STACK_CHECK(_K, 0);
766
767 if (!_pushed.has_value()) {
768 raise_luaL_error(L_, "tried to copy unsupported types");
769 }
770
771 switch (_cancel) {
772 case CancelRequest::None:
773 {
774 int const _nbPushed{ _pushed.value() };
775 if (_nbPushed == 0) {
776 // not enough data in the linda slot to fulfill the request, return nil, "timeout"
777 lua_pushnil(L_);
778 luaG_pushstring(L_, "timeout");
779 return 2;
780 }
781 return _nbPushed;
782 }
783
784 case CancelRequest::Soft:
785 // if user wants to soft-cancel, the call returns nil, kCancelError
786 lua_pushnil(L_);
787 kCancelError.pushKey(L_);
788 return 2;
789 799
790 case CancelRequest::Hard: 800// #################################################################################################
791 // raise an error interrupting execution only in case of hard cancel
792 raise_cancel_error(L_); // raises an error and doesn't return
793 801
794 default: 802/*
795 raise_luaL_error(L_, "internal error: unknown cancel request"); 803 * [val1, ... valCOUNT] = linda_receive_batched( linda_ud, [timeout_secs_num=-1], key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT])
796 } 804 * Consumes between min_COUNT and max_COUNT values from the linda, from a single slot.
797 } 805 * returns the actual consumed values, or nil if there weren't enough values to consume
798 }; 806 */
799 return Linda::ProtectedCall(L_, _receive); 807LUAG_FUNC(linda_receive_batched)
808{
809 return Linda::ProtectedCall(L_, [](lua_State* const L_) { return ReceiveInternal(L_, true); });
800} 810}
801 811
802// ################################################################################################# 812// #################################################################################################
@@ -1102,6 +1112,7 @@ namespace {
1102 { "get", LG_linda_get }, 1112 { "get", LG_linda_get },
1103 { "limit", LG_linda_limit }, 1113 { "limit", LG_linda_limit },
1104 { "receive", LG_linda_receive }, 1114 { "receive", LG_linda_receive },
1115 { "receive_batched", LG_linda_receive_batched },
1105 { "restrict", LG_linda_restrict }, 1116 { "restrict", LG_linda_restrict },
1106 { "send", LG_linda_send }, 1117 { "send", LG_linda_send },
1107 { "set", LG_linda_set }, 1118 { "set", LG_linda_set },