diff options
Diffstat (limited to 'src/linda.cpp')
-rw-r--r-- | src/linda.cpp | 640 |
1 files changed, 322 insertions, 318 deletions
diff --git a/src/linda.cpp b/src/linda.cpp index a094a8f..1f4b19d 100644 --- a/src/linda.cpp +++ b/src/linda.cpp | |||
@@ -43,12 +43,11 @@ namespace { | |||
43 | // ############################################################################################# | 43 | // ############################################################################################# |
44 | // ############################################################################################# | 44 | // ############################################################################################# |
45 | 45 | ||
46 | |||
47 | static void CheckKeyTypes(lua_State* const L_, StackIndex const start_, StackIndex const end_) | 46 | static void CheckKeyTypes(lua_State* const L_, StackIndex const start_, StackIndex const end_) |
48 | { | 47 | { |
49 | STACK_CHECK_START_REL(L_, 0); | 48 | STACK_CHECK_START_REL(L_, 0); |
50 | for (StackIndex const _i : std::ranges::iota_view{ start_, StackIndex{ end_ + 1 } }) { | 49 | for (StackIndex const _i : std::ranges::iota_view{ start_, StackIndex{ end_ + 1 } }) { |
51 | LuaType const _t{ luaG_type(L_, _i) }; | 50 | LuaType const _t{ luaW_type(L_, _i) }; |
52 | switch (_t) { | 51 | switch (_t) { |
53 | case LuaType::BOOLEAN: | 52 | case LuaType::BOOLEAN: |
54 | case LuaType::NUMBER: | 53 | case LuaType::NUMBER: |
@@ -63,7 +62,7 @@ namespace { | |||
63 | 62 | ||
64 | case LuaType::LIGHTUSERDATA: | 63 | case LuaType::LIGHTUSERDATA: |
65 | { | 64 | { |
66 | static constexpr std::array<std::reference_wrapper<UniqueKey const>, 3> kKeysToCheck{ kLindaBatched, kCancelError, kNilSentinel }; | 65 | static constexpr std::array<std::reference_wrapper<UniqueKey const>, 2> kKeysToCheck{ kCancelError, kNilSentinel }; |
67 | for (UniqueKey const& _key : kKeysToCheck) { | 66 | for (UniqueKey const& _key : kKeysToCheck) { |
68 | if (_key.equals(L_, _i)) { | 67 | if (_key.equals(L_, _i)) { |
69 | raise_luaL_error(L_, "argument #%d: can't use %s as a slot", _i, _key.debugName.data()); | 68 | raise_luaL_error(L_, "argument #%d: can't use %s as a slot", _i, _key.debugName.data()); |
@@ -110,13 +109,13 @@ namespace { | |||
110 | { | 109 | { |
111 | Linda* const _linda{ ToLinda<OPT>(L_, idx_) }; | 110 | Linda* const _linda{ ToLinda<OPT>(L_, idx_) }; |
112 | if (_linda != nullptr) { | 111 | if (_linda != nullptr) { |
113 | luaG_pushstring(L_, "Linda: "); | 112 | luaW_pushstring(L_, "Linda: "); |
114 | std::string_view const _lindaName{ _linda->getName() }; | 113 | std::string_view const _lindaName{ _linda->getName() }; |
115 | if (!_lindaName.empty()) { | 114 | if (!_lindaName.empty()) { |
116 | luaG_pushstring(L_, _lindaName); | 115 | luaW_pushstring(L_, _lindaName); |
117 | } else { | 116 | } else { |
118 | // obfuscate the pointer so that we can't read the value with our eyes out of a script | 117 | // obfuscate the pointer so that we can't read the value with our eyes out of a script |
119 | luaG_pushstring(L_, "%p", _linda->obfuscated()); | 118 | luaW_pushstring(L_, "%p", _linda->obfuscated()); |
120 | } | 119 | } |
121 | lua_concat(L_, 2); | 120 | lua_concat(L_, 2); |
122 | return 1; | 121 | return 1; |
@@ -125,6 +124,183 @@ namespace { | |||
125 | } | 124 | } |
126 | 125 | ||
127 | // ############################################################################################# | 126 | // ############################################################################################# |
127 | |||
128 | // a helper to process the timeout argument of linda:send() and linda:receive() | ||
129 | [[nodiscard]] | ||
130 | static auto ProcessTimeoutArg(lua_State* const L_) | ||
131 | { | ||
132 | StackIndex _key_i{ 2 }; // index of first slot, if timeout not there | ||
133 | |||
134 | std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | ||
135 | if (luaW_type(L_, StackIndex{ 2 }) == LuaType::NUMBER) { // we don't want to use lua_isnumber() because of autocoercion | ||
136 | lua_Duration const _duration{ lua_tonumber(L_, 2) }; | ||
137 | if (_duration.count() >= 0.0) { | ||
138 | _until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(_duration); | ||
139 | } else { | ||
140 | raise_luaL_argerror(L_, StackIndex{ 2 }, "duration cannot be < 0"); | ||
141 | } | ||
142 | ++_key_i; | ||
143 | } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the slot | ||
144 | ++_key_i; | ||
145 | } | ||
146 | return std::make_pair(_key_i, _until); | ||
147 | } | ||
148 | |||
149 | // ############################################################################################# | ||
150 | static bool WaitInternal([[maybe_unused]] lua_State* const L_, Lane* const lane_, Linda* const linda_, Keeper* const keeper_, std::condition_variable& waitingOn_, std::chrono::time_point<std::chrono::steady_clock> until_) | ||
151 | { | ||
152 | Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings | ||
153 | if (lane_ != nullptr) { | ||
154 | // change status of lane to "waiting" | ||
155 | _prev_status = lane_->status.load(std::memory_order_acquire); // Running, most likely | ||
156 | LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case | ||
157 | LUA_ASSERT(L_, lane_->waiting_on == nullptr); | ||
158 | lane_->waiting_on = &waitingOn_; | ||
159 | lane_->status.store(Lane::Waiting, std::memory_order_release); | ||
160 | } | ||
161 | |||
162 | // wait until the final target date by small increments, interrupting regularly so that we can check for cancel requests, | ||
163 | // in case some timing issue caused a cancel request to be issued, and the condvar signalled, before we actually wait for it | ||
164 | auto const [_forceTryAgain, _until_check_cancel] = std::invoke([until_, wakePeriod = linda_->getWakePeriod()] { | ||
165 | auto _until_check_cancel{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | ||
166 | if (wakePeriod.count() > 0.0f) { | ||
167 | _until_check_cancel = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(wakePeriod); | ||
168 | } | ||
169 | bool const _forceTryAgain{ _until_check_cancel < until_ }; | ||
170 | return std::make_tuple(_forceTryAgain, _forceTryAgain ? _until_check_cancel : until_); | ||
171 | }); | ||
172 | |||
173 | // operation can't complete: wake when it is signalled to be possible, or when timeout is reached | ||
174 | std::unique_lock<std::mutex> _guard{ keeper_->mutex, std::adopt_lock }; | ||
175 | std::cv_status const _status{ waitingOn_.wait_until(_guard, _until_check_cancel) }; | ||
176 | _guard.release(); // we don't want to unlock the mutex on exit! | ||
177 | bool const _try_again{ _forceTryAgain || (_status == std::cv_status::no_timeout) }; // detect spurious wakeups | ||
178 | if (lane_ != nullptr) { | ||
179 | lane_->waiting_on = nullptr; | ||
180 | lane_->status.store(_prev_status, std::memory_order_release); | ||
181 | } | ||
182 | return _try_again; | ||
183 | } | ||
184 | |||
185 | // ############################################################################################# | ||
186 | |||
187 | // the implementation for linda:receive() and linda:receive_batched() | ||
188 | static int ReceiveInternal(lua_State* const L_, bool const batched_) | ||
189 | { | ||
190 | Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; | ||
191 | |||
192 | auto const [_key_i, _until] = ProcessTimeoutArg(L_); | ||
193 | |||
194 | keeper_api_t _selected_keeper_receive{ nullptr }; | ||
195 | int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 }; | ||
196 | // are we in batched mode? | ||
197 | if (batched_) { | ||
198 | // make sure the keys are of a valid type | ||
199 | CheckKeyTypes(L_, _key_i, _key_i); | ||
200 | // receive multiple values from a single slot | ||
201 | _selected_keeper_receive = KEEPER_API(receive_batched); | ||
202 | // we expect a user-defined amount of return value | ||
203 | _expected_pushed_min = (int) luaL_checkinteger(L_, _key_i + 1); | ||
204 | if (_expected_pushed_min < 1) { | ||
205 | raise_luaL_argerror(L_, StackIndex{ _key_i + 1 }, "bad min count"); | ||
206 | } | ||
207 | _expected_pushed_max = (int) luaL_optinteger(L_, _key_i + 2, _expected_pushed_min); | ||
208 | // don't forget to count the slot in addition to the values | ||
209 | ++_expected_pushed_min; | ||
210 | ++_expected_pushed_max; | ||
211 | if (_expected_pushed_min > _expected_pushed_max) { | ||
212 | raise_luaL_argerror(L_, StackIndex{ _key_i + 2 }, "batched min/max error"); | ||
213 | } | ||
214 | } else { | ||
215 | // make sure the keys are of a valid type | ||
216 | CheckKeyTypes(L_, _key_i, StackIndex{ lua_gettop(L_) }); | ||
217 | // receive a single value, checking multiple slots | ||
218 | _selected_keeper_receive = KEEPER_API(receive); | ||
219 | // we expect a single (value, slot) pair of returned values | ||
220 | _expected_pushed_min = _expected_pushed_max = 2; | ||
221 | } | ||
222 | |||
223 | Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) }; | ||
224 | Keeper* const _keeper{ _linda->whichKeeper() }; | ||
225 | KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } }; | ||
226 | if (_K == nullptr) | ||
227 | return 0; | ||
228 | |||
229 | CancelRequest _cancel{ CancelRequest::None }; | ||
230 | KeeperCallResult _pushed{}; | ||
231 | |||
232 | STACK_CHECK_START_REL(_K, 0); | ||
233 | for (bool _try_again{ true };;) { | ||
234 | if (_lane != nullptr) { | ||
235 | _cancel = _lane->cancelRequest.load(std::memory_order_relaxed); | ||
236 | } | ||
237 | _cancel = (_cancel != CancelRequest::None) | ||
238 | ? _cancel | ||
239 | : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); | ||
240 | |||
241 | // if user wants to cancel, or looped because of a timeout, the call returns without sending anything | ||
242 | if (!_try_again || _cancel != CancelRequest::None) { | ||
243 | _pushed.emplace(0); | ||
244 | break; | ||
245 | } | ||
246 | |||
247 | // all arguments of receive() but the first are passed to the keeper's receive function | ||
248 | STACK_CHECK(_K, 0); | ||
249 | _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i); | ||
250 | if (!_pushed.has_value()) { | ||
251 | break; | ||
252 | } | ||
253 | if (_pushed.value() > 0) { | ||
254 | LUA_ASSERT(L_, _pushed.value() >= _expected_pushed_min && _pushed.value() <= _expected_pushed_max); | ||
255 | if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) { | ||
256 | raise_luaL_error(L_, "Key is restricted"); | ||
257 | } | ||
258 | _linda->readHappened.notify_all(); | ||
259 | break; | ||
260 | } | ||
261 | |||
262 | if (std::chrono::steady_clock::now() >= _until) { | ||
263 | break; /* instant timeout */ | ||
264 | } | ||
265 | |||
266 | // nothing received, wait until timeout or signalled that we should try again | ||
267 | _try_again = WaitInternal(L_, _lane, _linda, _keeper, _linda->writeHappened, _until); | ||
268 | } | ||
269 | STACK_CHECK(_K, 0); | ||
270 | |||
271 | if (!_pushed.has_value()) { | ||
272 | raise_luaL_error(L_, "tried to copy unsupported types"); | ||
273 | } | ||
274 | |||
275 | switch (_cancel) { | ||
276 | case CancelRequest::None: | ||
277 | { | ||
278 | int const _nbPushed{ _pushed.value() }; | ||
279 | if (_nbPushed == 0) { | ||
280 | // not enough data in the linda slot to fulfill the request, return nil, "timeout" | ||
281 | lua_pushnil(L_); | ||
282 | luaW_pushstring(L_, "timeout"); | ||
283 | return 2; | ||
284 | } | ||
285 | return _nbPushed; | ||
286 | } | ||
287 | |||
288 | case CancelRequest::Soft: | ||
289 | // if user wants to soft-cancel, the call returns nil, kCancelError | ||
290 | lua_pushnil(L_); | ||
291 | kCancelError.pushKey(L_); | ||
292 | return 2; | ||
293 | |||
294 | case CancelRequest::Hard: | ||
295 | // raise an error interrupting execution only in case of hard cancel | ||
296 | raise_cancel_error(L_); // raises an error and doesn't return | ||
297 | |||
298 | default: | ||
299 | raise_luaL_error(L_, "internal error: unknown cancel request"); | ||
300 | } | ||
301 | } | ||
302 | |||
303 | // ############################################################################################# | ||
128 | // ############################################################################################# | 304 | // ############################################################################################# |
129 | } // namespace | 305 | } // namespace |
130 | // ################################################################################################# | 306 | // ################################################################################################# |
@@ -138,9 +314,10 @@ LUAG_FUNC(linda); | |||
138 | // ################################################################################################# | 314 | // ################################################################################################# |
139 | // ################################################################################################# | 315 | // ################################################################################################# |
140 | 316 | ||
141 | Linda::Linda(Universe* const U_, LindaGroup const group_, std::string_view const& name_) | 317 | Linda::Linda(Universe* const U_, std::string_view const& name_, lua_Duration const wake_period_, LindaGroup const group_) |
142 | : DeepPrelude{ LindaFactory::Instance } | 318 | : DeepPrelude{ LindaFactory::Instance } |
143 | , U{ U_ } | 319 | , U{ U_ } |
320 | , wakePeriod{ wake_period_ } | ||
144 | , keeperIndex{ group_ % U_->keepers.getNbKeepers() } | 321 | , keeperIndex{ group_ % U_->keepers.getNbKeepers() } |
145 | { | 322 | { |
146 | setName(name_); | 323 | setName(name_); |
@@ -161,6 +338,7 @@ Keeper* Linda::acquireKeeper() const | |||
161 | Keeper* const _keeper{ whichKeeper() }; | 338 | Keeper* const _keeper{ whichKeeper() }; |
162 | if (_keeper) { | 339 | if (_keeper) { |
163 | _keeper->mutex.lock(); | 340 | _keeper->mutex.lock(); |
341 | keeperOperationCount.fetch_add(1, std::memory_order_seq_cst); | ||
164 | } | 342 | } |
165 | return _keeper; | 343 | return _keeper; |
166 | } | 344 | } |
@@ -172,13 +350,17 @@ Linda* Linda::CreateTimerLinda(lua_State* const L_) | |||
172 | STACK_CHECK_START_REL(L_, 0); // L_: | 350 | STACK_CHECK_START_REL(L_, 0); // L_: |
173 | // Initialize 'timerLinda'; a common Linda object shared by all states | 351 | // Initialize 'timerLinda'; a common Linda object shared by all states |
174 | lua_pushcfunction(L_, LG_linda); // L_: lanes.linda | 352 | lua_pushcfunction(L_, LG_linda); // L_: lanes.linda |
175 | luaG_pushstring(L_, "lanes-timer"); // L_: lanes.linda "lanes-timer" | 353 | lua_createtable(L_, 0, 3); // L_: lanes.linda {} |
176 | lua_pushinteger(L_, 0); // L_: lanes.linda "lanes-timer" 0 | 354 | luaW_pushstring(L_, "lanes-timer"); // L_: lanes.linda {} "lanes-timer" |
177 | lua_call(L_, 2, 1); // L_: linda | 355 | luaW_setfield(L_, StackIndex{ -2 }, std::string_view{ "name" }); // L_: lanes.linda { .name="lanes-timer" } |
356 | lua_pushinteger(L_, 0); // L_: lanes.linda { .name="lanes-timer" } 0 | ||
357 | luaW_setfield(L_, StackIndex{ -2 }, std::string_view{ "group" }); // L_: lanes.linda { .name="lanes-timer" .group = 0 } | ||
358 | // note that wake_period is not set (will default to the value in the universe) | ||
359 | lua_call(L_, 1, 1); // L_: linda | ||
178 | STACK_CHECK(L_, 1); | 360 | STACK_CHECK(L_, 1); |
179 | 361 | ||
180 | // Proxy userdata contents is only a 'DeepPrelude*' pointer | 362 | // Proxy userdata contents is only a 'DeepPrelude*' pointer |
181 | auto const _timerLinda{ *luaG_tofulluserdata<Linda*>(L_, kIdxTop) }; | 363 | auto const _timerLinda{ *luaW_tofulluserdata<Linda*>(L_, kIdxTop) }; |
182 | // increment refcount so that this linda remains alive as long as the universe exists. | 364 | // increment refcount so that this linda remains alive as long as the universe exists. |
183 | _timerLinda->refcount.fetch_add(1, std::memory_order_relaxed); | 365 | _timerLinda->refcount.fetch_add(1, std::memory_order_relaxed); |
184 | lua_pop(L_, 1); // L_: | 366 | lua_pop(L_, 1); // L_: |
@@ -241,7 +423,6 @@ int Linda::ProtectedCall(lua_State* const L_, lua_CFunction const f_) | |||
241 | // doing LindaFactory::deleteDeepObjectInternal -> keeper_call(clear) | 423 | // doing LindaFactory::deleteDeepObjectInternal -> keeper_call(clear) |
242 | lua_gc(L_, LUA_GCSTOP, 0); | 424 | lua_gc(L_, LUA_GCSTOP, 0); |
243 | 425 | ||
244 | LUA_ASSERT_CODE(auto const _koip{ _linda->startKeeperOperation(L_) }); | ||
245 | // if we didn't do anything wrong, the keeper stack should be clean | 426 | // if we didn't do anything wrong, the keeper stack should be clean |
246 | LUA_ASSERT(L_, lua_gettop(_K) == 0); | 427 | LUA_ASSERT(L_, lua_gettop(_K) == 0); |
247 | 428 | ||
@@ -249,7 +430,7 @@ int Linda::ProtectedCall(lua_State* const L_, lua_CFunction const f_) | |||
249 | lua_pushcfunction(L_, f_); | 430 | lua_pushcfunction(L_, f_); |
250 | lua_insert(L_, 1); | 431 | lua_insert(L_, 1); |
251 | // do a protected call | 432 | // do a protected call |
252 | LuaError const _rc{ lua_pcall(L_, lua_gettop(L_) - 1, LUA_MULTRET, 0) }; | 433 | LuaError const _rc{ ToLuaError(lua_pcall(L_, lua_gettop(L_) - 1, LUA_MULTRET, 0)) }; |
253 | // whatever happens, the keeper state stack must be empty when we are done | 434 | // whatever happens, the keeper state stack must be empty when we are done |
254 | lua_settop(_K, 0); | 435 | lua_settop(_K, 0); |
255 | 436 | ||
@@ -271,7 +452,7 @@ int Linda::ProtectedCall(lua_State* const L_, lua_CFunction const f_) | |||
271 | 452 | ||
272 | void Linda::pushCancelString(lua_State* L_) const | 453 | void Linda::pushCancelString(lua_State* L_) const |
273 | { | 454 | { |
274 | luaG_pushstring(L_, cancelStatus == Status::Cancelled ? "cancelled" : "active"); | 455 | luaW_pushstring(L_, cancelStatus == Status::Cancelled ? "cancelled" : "active"); |
275 | } | 456 | } |
276 | 457 | ||
277 | // ################################################################################################# | 458 | // ################################################################################################# |
@@ -280,6 +461,7 @@ void Linda::releaseKeeper(Keeper* const keeper_) const | |||
280 | { | 461 | { |
281 | if (keeper_) { // can be nullptr if we tried to acquire during shutdown | 462 | if (keeper_) { // can be nullptr if we tried to acquire during shutdown |
282 | assert(keeper_ == whichKeeper()); | 463 | assert(keeper_ == whichKeeper()); |
464 | keeperOperationCount.fetch_sub(1, std::memory_order_seq_cst); | ||
283 | keeper_->mutex.unlock(); | 465 | keeper_->mutex.unlock(); |
284 | } | 466 | } |
285 | } | 467 | } |
@@ -323,7 +505,7 @@ void Linda::setName(std::string_view const& name_) | |||
323 | LUAG_FUNC(linda_cancel) | 505 | LUAG_FUNC(linda_cancel) |
324 | { | 506 | { |
325 | Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; | 507 | Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; |
326 | std::string_view const _who{ luaG_optstring(L_, StackIndex{ 2 }, "both") }; | 508 | std::string_view const _who{ luaW_optstring(L_, StackIndex{ 2 }, "both") }; |
327 | // make sure we got 2 arguments: the linda and the cancellation mode | 509 | // make sure we got 2 arguments: the linda and the cancellation mode |
328 | luaL_argcheck(L_, lua_gettop(L_) <= 2, 2, "wrong number of arguments"); | 510 | luaL_argcheck(L_, lua_gettop(L_) <= 2, 2, "wrong number of arguments"); |
329 | 511 | ||
@@ -411,13 +593,13 @@ static int linda_index_string(lua_State* L_) | |||
411 | Linda* const _linda{ ToLinda<false>(L_, kIdxSelf) }; | 593 | Linda* const _linda{ ToLinda<false>(L_, kIdxSelf) }; |
412 | LUA_ASSERT(L_, lua_gettop(L_) == 2); // L_: linda "key" | 594 | LUA_ASSERT(L_, lua_gettop(L_) == 2); // L_: linda "key" |
413 | 595 | ||
414 | std::string_view const _keystr{ luaG_tostring(L_, kIdxKey) }; | 596 | std::string_view const _keystr{ luaW_tostring(L_, kIdxKey) }; |
415 | lua_settop(L_, 2); // keep only our original arguments on the stack | 597 | lua_settop(L_, 2); // keep only our original arguments on the stack |
416 | 598 | ||
417 | // look in metatable first | 599 | // look in metatable first |
418 | lua_getmetatable(L_, kIdxSelf); // L_: linda "key" mt | 600 | lua_getmetatable(L_, kIdxSelf); // L_: linda "key" mt |
419 | lua_replace(L_, -3); // L_: mt "key" | 601 | lua_replace(L_, -3); // L_: mt "key" |
420 | if (luaG_rawget(L_, StackIndex{ -2 }) != LuaType::NIL) { // found something? // L_: mt value | 602 | if (luaW_rawget(L_, StackIndex{ -2 }) != LuaType::NIL) { // found something? // L_: mt value |
421 | return 1; // done | 603 | return 1; // done |
422 | } | 604 | } |
423 | 605 | ||
@@ -437,12 +619,12 @@ static LUAG_FUNC(linda_index) | |||
437 | static constexpr StackIndex kIdxKey{ 2 }; | 619 | static constexpr StackIndex kIdxKey{ 2 }; |
438 | LUA_ASSERT(L_, lua_gettop(L_) == 2); | 620 | LUA_ASSERT(L_, lua_gettop(L_) == 2); |
439 | 621 | ||
440 | switch (luaG_type(L_, kIdxKey)) { | 622 | switch (luaW_type(L_, kIdxKey)) { |
441 | case LuaType::STRING: | 623 | case LuaType::STRING: |
442 | return linda_index_string(L_); // stack modification is undefined, returned value is at the top | 624 | return linda_index_string(L_); // stack modification is undefined, returned value is at the top |
443 | 625 | ||
444 | default: // unknown key | 626 | default: // unknown key |
445 | raise_luaL_error(L_, "Unsupported linda indexing key type %s", luaG_typename(L_, kIdxKey).data()); | 627 | raise_luaL_error(L_, "Unsupported linda indexing key type %s", luaW_typename(L_, kIdxKey).data()); |
446 | } | 628 | } |
447 | } | 629 | } |
448 | 630 | ||
@@ -585,7 +767,7 @@ LUAG_FUNC(linda_limit) | |||
585 | int const _nargs{ lua_gettop(L_) }; | 767 | int const _nargs{ lua_gettop(L_) }; |
586 | luaL_argcheck(L_, _nargs == 2 || _nargs == 3, 2, "wrong number of arguments"); | 768 | luaL_argcheck(L_, _nargs == 2 || _nargs == 3, 2, "wrong number of arguments"); |
587 | // make sure we got a numeric limit, or "unlimited", (or nothing) | 769 | // make sure we got a numeric limit, or "unlimited", (or nothing) |
588 | bool const _unlimited{ luaG_tostring(L_, StackIndex{ 3 }) == "unlimited" }; | 770 | bool const _unlimited{ luaW_tostring(L_, StackIndex{ 3 }) == "unlimited" }; |
589 | LindaLimit const _val{ _unlimited ? std::numeric_limits<LindaLimit::type>::max() : static_cast<LindaLimit::type>(luaL_optinteger(L_, 3, 0)) }; | 771 | LindaLimit const _val{ _unlimited ? std::numeric_limits<LindaLimit::type>::max() : static_cast<LindaLimit::type>(luaL_optinteger(L_, 3, 0)) }; |
590 | if (_val < 0) { | 772 | if (_val < 0) { |
591 | raise_luaL_argerror(L_, StackIndex{ 3 }, "limit must be >= 0"); | 773 | raise_luaL_argerror(L_, StackIndex{ 3 }, "limit must be >= 0"); |
@@ -596,23 +778,23 @@ LUAG_FUNC(linda_limit) | |||
596 | KeeperCallResult _pushed; | 778 | KeeperCallResult _pushed; |
597 | if (_linda->cancelStatus == Linda::Active) { | 779 | if (_linda->cancelStatus == Linda::Active) { |
598 | if (_unlimited) { | 780 | if (_unlimited) { |
599 | LUA_ASSERT(L_, lua_gettop(L_) == 3 && luaG_tostring(L_, StackIndex{ 3 }) == "unlimited"); | 781 | LUA_ASSERT(L_, lua_gettop(L_) == 3 && luaW_tostring(L_, StackIndex{ 3 }) == "unlimited"); |
600 | // inside the Keeper, unlimited is signified with a -1 limit (can't use nil because of nil kNilSentinel conversions!) | 782 | // inside the Keeper, unlimited is signified with a -1 limit (can't use nil because of nil kNilSentinel conversions!) |
601 | lua_pop(L_, 1); // L_: linda slot | 783 | lua_pop(L_, 1); // L_: linda slot |
602 | lua_pushinteger(L_, -1); // L_: linda slot nil | 784 | lua_pushinteger(L_, -1); // L_: linda slot nil |
603 | } | 785 | } |
604 | Keeper* const _keeper{ _linda->whichKeeper() }; | 786 | Keeper* const _keeper{ _linda->whichKeeper() }; |
605 | _pushed = keeper_call(_keeper->K, KEEPER_API(limit), L_, _linda, StackIndex{ 2 }); | 787 | _pushed = keeper_call(_keeper->K, KEEPER_API(limit), L_, _linda, StackIndex{ 2 }); |
606 | LUA_ASSERT(L_, _pushed.has_value() && (_pushed.value() == 2) && luaG_type(L_, kIdxTop) == LuaType::STRING); | 788 | LUA_ASSERT(L_, _pushed.has_value() && (_pushed.value() == 2) && luaW_type(L_, kIdxTop) == LuaType::STRING); |
607 | if (_nargs == 3) { // 3 args: setting the limit | 789 | if (_nargs == 3) { // 3 args: setting the limit |
608 | // changing the limit: no error, boolean value saying if we should wake blocked writer threads | 790 | // changing the limit: no error, boolean value saying if we should wake blocked writer threads |
609 | LUA_ASSERT(L_, luaG_type(L_, StackIndex{ -2 }) == LuaType::BOOLEAN); // L_: bool string | 791 | LUA_ASSERT(L_, luaW_type(L_, StackIndex{ -2 }) == LuaType::BOOLEAN); // L_: bool string |
610 | if (lua_toboolean(L_, -2)) { | 792 | if (lua_toboolean(L_, -2)) { |
611 | _linda->readHappened.notify_all(); // To be done from within the 'K' locking area | 793 | _linda->readHappened.notify_all(); // To be done from within the 'K' locking area |
612 | } | 794 | } |
613 | } else { // 2 args: reading the limit | 795 | } else { // 2 args: reading the limit |
614 | // reading the limit: a number >=0 or "unlimited" | 796 | // reading the limit: a number >=0 or "unlimited" |
615 | LUA_ASSERT(L_, luaG_type(L_, StackIndex{ -2 }) == LuaType::NUMBER || luaG_tostring(L_, StackIndex{ -2 }) == "unlimited"); | 797 | LUA_ASSERT(L_, luaW_type(L_, StackIndex{ -2 }) == LuaType::NUMBER || luaW_tostring(L_, StackIndex{ -2 }) == "unlimited"); |
616 | } | 798 | } |
617 | } else { // linda is cancelled | 799 | } else { // linda is cancelled |
618 | // do nothing and return nil,lanes.cancel_error | 800 | // do nothing and return nil,lanes.cancel_error |
@@ -630,163 +812,25 @@ LUAG_FUNC(linda_limit) | |||
630 | // ################################################################################################# | 812 | // ################################################################################################# |
631 | 813 | ||
632 | /* | 814 | /* |
633 | * 2 modes of operation | 815 | * [val, slot] = linda:receive([timeout_secs_num=nil], key_num|str|bool|lightuserdata [, ...] ) |
634 | * [val, slot]= linda:receive([timeout_secs_num=nil], key_num|str|bool|lightuserdata [, ...] ) | ||
635 | * Consumes a single value from the Linda, in any slot. | 816 | * Consumes a single value from the Linda, in any slot. |
636 | * Returns: received value (which is consumed from the slot), and the slot which had it | 817 | * Returns: received value (which is consumed from the slot), and the slot which had it |
637 | |||
638 | * [val1, ... valCOUNT]= linda_receive( linda_ud, [timeout_secs_num=-1], linda.batched, key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT]) | ||
639 | * Consumes between min_COUNT and max_COUNT values from the linda, from a single slot. | ||
640 | * returns the actual consumed values, or nil if there weren't enough values to consume | ||
641 | */ | 818 | */ |
642 | LUAG_FUNC(linda_receive) | 819 | LUAG_FUNC(linda_receive) |
643 | { | 820 | { |
644 | static constexpr lua_CFunction _receive{ | 821 | return Linda::ProtectedCall(L_, [](lua_State* const L_) { return ReceiveInternal(L_, false); }); |
645 | +[](lua_State* const L_) { | 822 | } |
646 | Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; | ||
647 | StackIndex _key_i{ 2 }; // index of first slot, if timeout not there | ||
648 | |||
649 | std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | ||
650 | if (luaG_type(L_, StackIndex{ 2 }) == LuaType::NUMBER) { // we don't want to use lua_isnumber() because of autocoercion | ||
651 | lua_Duration const _duration{ lua_tonumber(L_, 2) }; | ||
652 | if (_duration.count() >= 0.0) { | ||
653 | _until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(_duration); | ||
654 | } else { | ||
655 | raise_luaL_argerror(L_, StackIndex{ 2 }, "duration cannot be < 0"); | ||
656 | } | ||
657 | ++_key_i; | ||
658 | } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the slot | ||
659 | ++_key_i; | ||
660 | } | ||
661 | |||
662 | keeper_api_t _selected_keeper_receive{ nullptr }; | ||
663 | int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 }; | ||
664 | // are we in batched mode? | ||
665 | if (kLindaBatched.equals(L_, _key_i)) { | ||
666 | // no need to pass linda.batched in the keeper state | ||
667 | ++_key_i; | ||
668 | // make sure the keys are of a valid type | ||
669 | CheckKeyTypes(L_, _key_i, _key_i); | ||
670 | // receive multiple values from a single slot | ||
671 | _selected_keeper_receive = KEEPER_API(receive_batched); | ||
672 | // we expect a user-defined amount of return value | ||
673 | _expected_pushed_min = (int) luaL_checkinteger(L_, _key_i + 1); | ||
674 | if (_expected_pushed_min < 1) { | ||
675 | raise_luaL_argerror(L_, StackIndex{ _key_i + 1 }, "bad min count"); | ||
676 | } | ||
677 | _expected_pushed_max = (int) luaL_optinteger(L_, _key_i + 2, _expected_pushed_min); | ||
678 | // don't forget to count the slot in addition to the values | ||
679 | ++_expected_pushed_min; | ||
680 | ++_expected_pushed_max; | ||
681 | if (_expected_pushed_min > _expected_pushed_max) { | ||
682 | raise_luaL_argerror(L_, StackIndex{ _key_i + 2 }, "batched min/max error"); | ||
683 | } | ||
684 | } else { | ||
685 | // make sure the keys are of a valid type | ||
686 | CheckKeyTypes(L_, _key_i, StackIndex{ lua_gettop(L_) }); | ||
687 | // receive a single value, checking multiple slots | ||
688 | _selected_keeper_receive = KEEPER_API(receive); | ||
689 | // we expect a single (value, slot) pair of returned values | ||
690 | _expected_pushed_min = _expected_pushed_max = 2; | ||
691 | } | ||
692 | |||
693 | Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) }; | ||
694 | Keeper* const _keeper{ _linda->whichKeeper() }; | ||
695 | KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } }; | ||
696 | if (_K == nullptr) | ||
697 | return 0; | ||
698 | |||
699 | CancelRequest _cancel{ CancelRequest::None }; | ||
700 | KeeperCallResult _pushed{}; | ||
701 | STACK_CHECK_START_REL(_K, 0); | ||
702 | for (bool _try_again{ true };;) { | ||
703 | if (_lane != nullptr) { | ||
704 | _cancel = _lane->cancelRequest.load(std::memory_order_relaxed); | ||
705 | } | ||
706 | _cancel = (_cancel != CancelRequest::None) | ||
707 | ? _cancel | ||
708 | : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); | ||
709 | // if user wants to cancel, or looped because of a timeout, the call returns without sending anything | ||
710 | if (!_try_again || _cancel != CancelRequest::None) { | ||
711 | _pushed.emplace(0); | ||
712 | break; | ||
713 | } | ||
714 | |||
715 | // all arguments of receive() but the first are passed to the keeper's receive function | ||
716 | _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i); | ||
717 | if (!_pushed.has_value()) { | ||
718 | break; | ||
719 | } | ||
720 | if (_pushed.value() > 0) { | ||
721 | LUA_ASSERT(L_, _pushed.value() >= _expected_pushed_min && _pushed.value() <= _expected_pushed_max); | ||
722 | if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) { | ||
723 | raise_luaL_error(L_, "Key is restricted"); | ||
724 | } | ||
725 | _linda->readHappened.notify_all(); | ||
726 | break; | ||
727 | } | ||
728 | |||
729 | if (std::chrono::steady_clock::now() >= _until) { | ||
730 | break; /* instant timeout */ | ||
731 | } | ||
732 | |||
733 | // nothing received, wait until timeout or signalled that we should try again | ||
734 | { | ||
735 | Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings | ||
736 | if (_lane != nullptr) { | ||
737 | // change status of lane to "waiting" | ||
738 | _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely | ||
739 | LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case | ||
740 | LUA_ASSERT(L_, _lane->waiting_on == nullptr); | ||
741 | _lane->waiting_on = &_linda->writeHappened; | ||
742 | _lane->status.store(Lane::Waiting, std::memory_order_release); | ||
743 | } | ||
744 | // not enough data to read: wakeup when data was sent, or when timeout is reached | ||
745 | std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock }; | ||
746 | std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until) }; | ||
747 | _guard.release(); // we don't want to unlock the mutex on exit! | ||
748 | _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups | ||
749 | if (_lane != nullptr) { | ||
750 | _lane->waiting_on = nullptr; | ||
751 | _lane->status.store(_prev_status, std::memory_order_release); | ||
752 | } | ||
753 | } | ||
754 | } | ||
755 | STACK_CHECK(_K, 0); | ||
756 | |||
757 | if (!_pushed.has_value()) { | ||
758 | raise_luaL_error(L_, "tried to copy unsupported types"); | ||
759 | } | ||
760 | |||
761 | switch (_cancel) { | ||
762 | case CancelRequest::None: | ||
763 | { | ||
764 | int const _nbPushed{ _pushed.value() }; | ||
765 | if (_nbPushed == 0) { | ||
766 | // not enough data in the linda slot to fulfill the request, return nil, "timeout" | ||
767 | lua_pushnil(L_); | ||
768 | luaG_pushstring(L_, "timeout"); | ||
769 | return 2; | ||
770 | } | ||
771 | return _nbPushed; | ||
772 | } | ||
773 | |||
774 | case CancelRequest::Soft: | ||
775 | // if user wants to soft-cancel, the call returns nil, kCancelError | ||
776 | lua_pushnil(L_); | ||
777 | kCancelError.pushKey(L_); | ||
778 | return 2; | ||
779 | 823 | ||
780 | case CancelRequest::Hard: | 824 | // ################################################################################################# |
781 | // raise an error interrupting execution only in case of hard cancel | ||
782 | raise_cancel_error(L_); // raises an error and doesn't return | ||
783 | 825 | ||
784 | default: | 826 | /* |
785 | raise_luaL_error(L_, "internal error: unknown cancel request"); | 827 | * [val1, ... valCOUNT] = linda_receive_batched( linda_ud, [timeout_secs_num=-1], key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT]) |
786 | } | 828 | * Consumes between min_COUNT and max_COUNT values from the linda, from a single slot. |
787 | } | 829 | * returns the actual consumed values, or nil if there weren't enough values to consume |
788 | }; | 830 | */ |
789 | return Linda::ProtectedCall(L_, _receive); | 831 | LUAG_FUNC(linda_receive_batched) |
832 | { | ||
833 | return Linda::ProtectedCall(L_, [](lua_State* const L_) { return ReceiveInternal(L_, true); }); | ||
790 | } | 834 | } |
791 | 835 | ||
792 | // ################################################################################################# | 836 | // ################################################################################################# |
@@ -806,7 +850,7 @@ LUAG_FUNC(linda_restrict) | |||
806 | int const _nargs{ lua_gettop(L_) }; | 850 | int const _nargs{ lua_gettop(L_) }; |
807 | luaL_argcheck(L_, _nargs == 2 || _nargs == 3, 2, "wrong number of arguments"); | 851 | luaL_argcheck(L_, _nargs == 2 || _nargs == 3, 2, "wrong number of arguments"); |
808 | // make sure we got a known restrict mode, (or nothing) | 852 | // make sure we got a known restrict mode, (or nothing) |
809 | std::string_view const _mode{ luaG_tostring(L_, StackIndex{ 3 }) }; | 853 | std::string_view const _mode{ luaW_tostring(L_, StackIndex{ 3 }) }; |
810 | if (!_mode.empty() && (_mode != "none" && _mode != "set/get" && _mode != "send/receive")) { | 854 | if (!_mode.empty() && (_mode != "none" && _mode != "set/get" && _mode != "send/receive")) { |
811 | raise_luaL_argerror(L_, StackIndex{ 3 }, "unknown restrict mode"); | 855 | raise_luaL_argerror(L_, StackIndex{ 3 }, "unknown restrict mode"); |
812 | } | 856 | } |
@@ -818,7 +862,7 @@ LUAG_FUNC(linda_restrict) | |||
818 | Keeper* const _keeper{ _linda->whichKeeper() }; | 862 | Keeper* const _keeper{ _linda->whichKeeper() }; |
819 | _pushed = keeper_call(_keeper->K, KEEPER_API(restrict), L_, _linda, StackIndex{ 2 }); | 863 | _pushed = keeper_call(_keeper->K, KEEPER_API(restrict), L_, _linda, StackIndex{ 2 }); |
820 | // we should get a single return value: the string describing the previous restrict mode | 864 | // we should get a single return value: the string describing the previous restrict mode |
821 | LUA_ASSERT(L_, _pushed.has_value() && (_pushed.value() == 1) && luaG_type(L_, kIdxTop) == LuaType::STRING); | 865 | LUA_ASSERT(L_, _pushed.has_value() && (_pushed.value() == 1) && luaW_type(L_, kIdxTop) == LuaType::STRING); |
822 | } else { // linda is cancelled | 866 | } else { // linda is cancelled |
823 | // do nothing and return nil,lanes.cancel_error | 867 | // do nothing and return nil,lanes.cancel_error |
824 | lua_pushnil(L_); | 868 | lua_pushnil(L_); |
@@ -848,20 +892,8 @@ LUAG_FUNC(linda_send) | |||
848 | static constexpr lua_CFunction _send{ | 892 | static constexpr lua_CFunction _send{ |
849 | +[](lua_State* const L_) { | 893 | +[](lua_State* const L_) { |
850 | Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; | 894 | Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; |
851 | StackIndex _key_i{ 2 }; // index of first slot, if timeout not there | ||
852 | 895 | ||
853 | std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | 896 | auto const [_key_i, _until] = ProcessTimeoutArg(L_); |
854 | if (luaG_type(L_, StackIndex{ 2 }) == LuaType::NUMBER) { // we don't want to use lua_isnumber() because of autocoercion | ||
855 | lua_Duration const _duration{ lua_tonumber(L_, 2) }; | ||
856 | if (_duration.count() >= 0.0) { | ||
857 | _until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(_duration); | ||
858 | } else { | ||
859 | raise_luaL_argerror(L_, StackIndex{ 2 }, "duration cannot be < 0"); | ||
860 | } | ||
861 | ++_key_i; | ||
862 | } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the slot | ||
863 | ++_key_i; | ||
864 | } | ||
865 | 897 | ||
866 | // make sure the slot is of a valid type | 898 | // make sure the slot is of a valid type |
867 | CheckKeyTypes(L_, _key_i, _key_i); | 899 | CheckKeyTypes(L_, _key_i, _key_i); |
@@ -873,78 +905,60 @@ LUAG_FUNC(linda_send) | |||
873 | raise_luaL_error(L_, "no data to send"); | 905 | raise_luaL_error(L_, "no data to send"); |
874 | } | 906 | } |
875 | 907 | ||
908 | Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) }; | ||
909 | Keeper* const _keeper{ _linda->whichKeeper() }; | ||
910 | KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } }; | ||
911 | if (_K == nullptr) | ||
912 | return 0; | ||
913 | |||
876 | bool _ret{ false }; | 914 | bool _ret{ false }; |
877 | CancelRequest _cancel{ CancelRequest::None }; | 915 | CancelRequest _cancel{ CancelRequest::None }; |
878 | KeeperCallResult _pushed; | 916 | KeeperCallResult _pushed{}; |
879 | { | ||
880 | Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) }; | ||
881 | Keeper* const _keeper{ _linda->whichKeeper() }; | ||
882 | KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } }; | ||
883 | if (_K == nullptr) | ||
884 | return 0; | ||
885 | |||
886 | STACK_CHECK_START_REL(_K, 0); | ||
887 | for (bool _try_again{ true };;) { | ||
888 | if (_lane != nullptr) { | ||
889 | _cancel = _lane->cancelRequest.load(std::memory_order_relaxed); | ||
890 | } | ||
891 | _cancel = (_cancel != CancelRequest::None) | ||
892 | ? _cancel | ||
893 | : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); | ||
894 | // if user wants to cancel, or looped because of a timeout, the call returns without sending anything | ||
895 | if (!_try_again || _cancel != CancelRequest::None) { | ||
896 | _pushed.emplace(0); | ||
897 | break; | ||
898 | } | ||
899 | 917 | ||
900 | STACK_CHECK(_K, 0); | 918 | STACK_CHECK_START_REL(_K, 0); |
901 | _pushed = keeper_call(_K, KEEPER_API(send), L_, _linda, _key_i); | 919 | for (bool _try_again{ true };;) { |
902 | if (!_pushed.has_value()) { | 920 | if (_lane != nullptr) { |
903 | break; | 921 | _cancel = _lane->cancelRequest.load(std::memory_order_relaxed); |
904 | } | 922 | } |
905 | LUA_ASSERT(L_, _pushed.value() == 1); | 923 | _cancel = (_cancel != CancelRequest::None) |
924 | ? _cancel | ||
925 | : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); | ||
906 | 926 | ||
907 | if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) { | 927 | // if user wants to cancel, or looped because of a timeout, the call returns without sending anything |
908 | raise_luaL_error(L_, "Key is restricted"); | 928 | if (!_try_again || _cancel != CancelRequest::None) { |
909 | } | 929 | _pushed.emplace(0); |
910 | _ret = lua_toboolean(L_, -1) ? true : false; | 930 | break; |
911 | lua_pop(L_, 1); | 931 | } |
912 | 932 | ||
913 | if (_ret) { | 933 | // all arguments of send() but the first are passed to the keeper's send function |
914 | // Wake up ALL waiting threads | 934 | STACK_CHECK(_K, 0); |
915 | _linda->writeHappened.notify_all(); | 935 | _pushed = keeper_call(_K, KEEPER_API(send), L_, _linda, _key_i); |
916 | break; | 936 | if (!_pushed.has_value()) { |
917 | } | 937 | break; |
938 | } | ||
939 | LUA_ASSERT(L_, _pushed.value() == 1); | ||
918 | 940 | ||
919 | // instant timout to bypass the wait syscall | 941 | if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) { |
920 | if (std::chrono::steady_clock::now() >= _until) { | 942 | raise_luaL_error(L_, "Key is restricted"); |
921 | break; /* no wait; instant timeout */ | 943 | } |
922 | } | 944 | _ret = lua_toboolean(L_, -1) ? true : false; |
945 | lua_pop(L_, 1); | ||
923 | 946 | ||
924 | // storage limit hit, wait until timeout or signalled that we should try again | 947 | if (_ret) { |
925 | { | 948 | // Wake up ALL waiting threads |
926 | Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings | 949 | _linda->writeHappened.notify_all(); |
927 | if (_lane != nullptr) { | 950 | break; |
928 | // change status of lane to "waiting" | ||
929 | _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely | ||
930 | LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case | ||
931 | LUA_ASSERT(L_, _lane->waiting_on == nullptr); | ||
932 | _lane->waiting_on = &_linda->readHappened; | ||
933 | _lane->status.store(Lane::Waiting, std::memory_order_release); | ||
934 | } | ||
935 | // could not send because no room: wait until some data was read before trying again, or until timeout is reached | ||
936 | std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock }; | ||
937 | std::cv_status const status{ _linda->readHappened.wait_until(_guard, _until) }; | ||
938 | _guard.release(); // we don't want to unlock the mutex on exit! | ||
939 | _try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups | ||
940 | if (_lane != nullptr) { | ||
941 | _lane->waiting_on = nullptr; | ||
942 | _lane->status.store(_prev_status, std::memory_order_release); | ||
943 | } | ||
944 | } | ||
945 | } | 951 | } |
946 | STACK_CHECK(_K, 0); | 952 | |
953 | // instant timout to bypass the wait syscall | ||
954 | if (std::chrono::steady_clock::now() >= _until) { | ||
955 | break; /* no wait; instant timeout */ | ||
956 | } | ||
957 | |||
958 | // storage limit hit, wait until timeout or signalled that we should try again | ||
959 | _try_again = WaitInternal(L_, _lane, _linda, _keeper, _linda->readHappened, _until); | ||
947 | } | 960 | } |
961 | STACK_CHECK(_K, 0); | ||
948 | 962 | ||
949 | if (!_pushed.has_value()) { | 963 | if (!_pushed.has_value()) { |
950 | raise_luaL_error(L_, "tried to copy unsupported types"); | 964 | raise_luaL_error(L_, "tried to copy unsupported types"); |
@@ -968,7 +982,7 @@ LUAG_FUNC(linda_send) | |||
968 | } else { | 982 | } else { |
969 | // not enough room in the Linda slot to fulfill the request, return nil, "timeout" | 983 | // not enough room in the Linda slot to fulfill the request, return nil, "timeout" |
970 | lua_pushnil(L_); | 984 | lua_pushnil(L_); |
971 | luaG_pushstring(L_, "timeout"); | 985 | luaW_pushstring(L_, "timeout"); |
972 | return 2; | 986 | return 2; |
973 | } | 987 | } |
974 | } | 988 | } |
@@ -1003,7 +1017,7 @@ LUAG_FUNC(linda_set) | |||
1003 | if (kRestrictedChannel.equals(L_, kIdxTop)) { | 1017 | if (kRestrictedChannel.equals(L_, kIdxTop)) { |
1004 | raise_luaL_error(L_, "Key is restricted"); | 1018 | raise_luaL_error(L_, "Key is restricted"); |
1005 | } | 1019 | } |
1006 | LUA_ASSERT(L_, _pushed.value() == 2 && luaG_type(L_, kIdxTop) == LuaType::STRING && luaG_type(L_, StackIndex{ -2 }) == LuaType::BOOLEAN); | 1020 | LUA_ASSERT(L_, _pushed.value() == 2 && luaW_type(L_, kIdxTop) == LuaType::STRING && luaW_type(L_, StackIndex{ -2 }) == LuaType::BOOLEAN); |
1007 | 1021 | ||
1008 | if (_has_data) { | 1022 | if (_has_data) { |
1009 | // we put some data in the slot, tell readers that they should wake | 1023 | // we put some data in the slot, tell readers that they should wake |
@@ -1065,7 +1079,7 @@ LUAG_FUNC(linda_towatch) | |||
1065 | LUAG_FUNC(linda_wake) | 1079 | LUAG_FUNC(linda_wake) |
1066 | { | 1080 | { |
1067 | Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; | 1081 | Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; |
1068 | std::string_view const _who{ luaG_optstring(L_, StackIndex{ 2 }, "both") }; | 1082 | std::string_view const _who{ luaW_optstring(L_, StackIndex{ 2 }, "both") }; |
1069 | // make sure we got 2 arguments: the linda and the wake targets | 1083 | // make sure we got 2 arguments: the linda and the wake targets |
1070 | luaL_argcheck(L_, lua_gettop(L_) <= 2, 2, "wrong number of arguments"); | 1084 | luaL_argcheck(L_, lua_gettop(L_) <= 2, 2, "wrong number of arguments"); |
1071 | 1085 | ||
@@ -1104,6 +1118,7 @@ namespace { | |||
1104 | { "get", LG_linda_get }, | 1118 | { "get", LG_linda_get }, |
1105 | { "limit", LG_linda_limit }, | 1119 | { "limit", LG_linda_limit }, |
1106 | { "receive", LG_linda_receive }, | 1120 | { "receive", LG_linda_receive }, |
1121 | { "receive_batched", LG_linda_receive_batched }, | ||
1107 | { "restrict", LG_linda_restrict }, | 1122 | { "restrict", LG_linda_restrict }, |
1108 | { "send", LG_linda_send }, | 1123 | { "send", LG_linda_send }, |
1109 | { "set", LG_linda_set }, | 1124 | { "set", LG_linda_set }, |
@@ -1120,88 +1135,77 @@ namespace { | |||
1120 | // ################################################################################################# | 1135 | // ################################################################################################# |
1121 | 1136 | ||
1122 | /* | 1137 | /* |
1123 | * ud = lanes.linda( [name[,group[,close_handler]]]) | 1138 | * ud = lanes.linda{.name = <string>, .group = <number>, .close_handler = <callable>, .wake_period = <number>} |
1124 | * | 1139 | * |
1125 | * returns a linda object, or raises an error if creation failed | 1140 | * returns a linda object, or raises an error if creation failed |
1126 | */ | 1141 | */ |
1127 | LUAG_FUNC(linda) | 1142 | LUAG_FUNC(linda) |
1128 | { | 1143 | { |
1129 | static constexpr StackIndex kLastArg{ LUA_VERSION_NUM >= 504 ? 3 : 2 }; | 1144 | // unpack the received table on the stack, putting name wake_period group close_handler in that order |
1130 | StackIndex const _top{ lua_gettop(L_) }; | 1145 | StackIndex const _top{ lua_gettop(L_) }; |
1131 | luaL_argcheck(L_, _top <= kLastArg, _top, "too many arguments"); | 1146 | luaL_argcheck(L_, _top <= 1, _top, "too many arguments"); |
1132 | StackIndex _closeHandlerIdx{}; | 1147 | if (_top == 0) { |
1133 | StackIndex _nameIdx{}; | 1148 | lua_settop(L_, 3); // L_: nil nil nil |
1134 | StackIndex _groupIdx{}; | 1149 | } |
1135 | for (StackIndex const _i : std::ranges::iota_view{ StackIndex{ 1 }, StackIndex{ _top + 1 }}) { | 1150 | else if (!lua_istable(L_, kIdxTop)) { |
1136 | switch (luaG_type(L_, _i)) { | 1151 | luaL_argerror(L_, 1, "expecting a table"); |
1152 | } else { | ||
1153 | auto* const _U{ Universe::Get(L_) }; | ||
1154 | lua_getfield(L_, 1, "wake_period"); // L_: {} wake_period | ||
1155 | if (lua_isnil(L_, kIdxTop)) { | ||
1156 | lua_pop(L_, 1); | ||
1157 | lua_pushnumber(L_, _U->lindaWakePeriod.count()); | ||
1158 | } else if (luaW_type(L_, kIdxTop) == LuaType::STRING) { | ||
1159 | if (luaW_tostring(L_, kIdxTop) != "never") { | ||
1160 | luaL_argerror(L_, 1, "invalid wake_period"); | ||
1161 | } else { | ||
1162 | lua_pop(L_, 1); | ||
1163 | lua_pushnumber(L_, 0); | ||
1164 | } | ||
1165 | } | ||
1166 | else { | ||
1167 | luaL_argcheck(L_, luaL_optnumber(L_, 2, 0) > 0, 1, "wake_period must be > 0"); | ||
1168 | } | ||
1169 | |||
1170 | lua_getfield(L_, 1, "group"); // L_: {} wake_period group | ||
1171 | int const _nbKeepers{ _U->keepers.getNbKeepers() }; | ||
1172 | if (lua_isnil(L_, kIdxTop)) { | ||
1173 | luaL_argcheck(L_, _nbKeepers < 2, 0, "Group is mandatory in multiple Keeper scenarios"); | ||
1174 | } else { | ||
1175 | int const _group{ static_cast<int>(lua_tointeger(L_, kIdxTop)) }; | ||
1176 | luaL_argcheck(L_, _group >= 0 && _group < _nbKeepers, 1, "group out of range"); | ||
1177 | } | ||
1178 | |||
1137 | #if LUA_VERSION_NUM >= 504 // to-be-closed support starts with Lua 5.4 | 1179 | #if LUA_VERSION_NUM >= 504 // to-be-closed support starts with Lua 5.4 |
1138 | case LuaType::FUNCTION: | 1180 | lua_getfield(L_, 1, "close_handler"); // L_: {} wake_period group close_handler |
1139 | luaL_argcheck(L_, _closeHandlerIdx == 0, _i, "More than one __close handler"); | 1181 | LuaType const _handlerType{ luaW_type(L_, kIdxTop) }; |
1140 | _closeHandlerIdx = _i; | 1182 | if (_handlerType == LuaType::NIL) { |
1141 | break; | 1183 | lua_pop(L_, 1); // L_: {} wake_period group |
1142 | 1184 | } else if (_handlerType == LuaType::USERDATA || _handlerType == LuaType::TABLE) { | |
1143 | case LuaType::USERDATA: | 1185 | luaL_argcheck(L_, luaL_getmetafield(L_, kIdxTop, "__call") != 0, 1, "__close handler is not callable"); |
1144 | case LuaType::TABLE: | ||
1145 | luaL_argcheck(L_, _closeHandlerIdx == 0, _i, "More than one __close handler"); | ||
1146 | luaL_argcheck(L_, luaL_getmetafield(L_, _i, "__call") != 0, _i, "__close handler is not callable"); | ||
1147 | lua_pop(L_, 1); // luaL_getmetafield() pushed the field, we need to pop it | 1186 | lua_pop(L_, 1); // luaL_getmetafield() pushed the field, we need to pop it |
1148 | _closeHandlerIdx = _i; | 1187 | } else { |
1149 | break; | 1188 | luaL_argcheck(L_, _handlerType == LuaType::FUNCTION, 1, "__close handler is not a function"); |
1189 | } | ||
1150 | #endif // LUA_VERSION_NUM >= 504 | 1190 | #endif // LUA_VERSION_NUM >= 504 |
1151 | 1191 | ||
1152 | case LuaType::STRING: | 1192 | auto const _nameType{ luaW_getfield(L_, StackIndex{ 1 }, "name") }; // L_: {} wake_period group [close_handler] name |
1153 | luaL_argcheck(L_, _nameIdx == 0, _i, "More than one name"); | 1193 | luaL_argcheck(L_, _nameType == LuaType::NIL || _nameType == LuaType::STRING, 1, "name is not a string"); |
1154 | _nameIdx = _i; | 1194 | lua_replace(L_, 1); // L_: name wake_period group [close_handler] |
1155 | break; | ||
1156 | |||
1157 | case LuaType::NUMBER: | ||
1158 | luaL_argcheck(L_, _groupIdx == 0, _i, "More than one group"); | ||
1159 | _groupIdx = _i; | ||
1160 | break; | ||
1161 | |||
1162 | default: | ||
1163 | luaL_argcheck(L_, false, _i, "Bad argument type (should be a string, a number, or a callable type)"); | ||
1164 | } | ||
1165 | } | ||
1166 | |||
1167 | int const _nbKeepers{ Universe::Get(L_)->keepers.getNbKeepers() }; | ||
1168 | if (!_groupIdx) { | ||
1169 | luaL_argcheck(L_, _nbKeepers < 2, 0, "Group is mandatory in multiple Keeper scenarios"); | ||
1170 | } else { | ||
1171 | int const _group{ static_cast<int>(lua_tointeger(L_, _groupIdx)) }; | ||
1172 | luaL_argcheck(L_, _group >= 0 && _group < _nbKeepers, _groupIdx, "Group out of range"); | ||
1173 | } | 1195 | } |
1174 | 1196 | ||
1175 | // done with argument checking, let's proceed | 1197 | // done with argument checking, let's proceed |
1176 | if constexpr (LUA_VERSION_NUM >= 504) { | 1198 | if (lua_gettop(L_) == 4) { |
1177 | // make sure we have kMaxArgs arguments on the stack for processing, with name, group, and handler, in that order | ||
1178 | lua_settop(L_, kLastArg); // L_: a b c | ||
1179 | // If either index is 0, lua_settop() adjusted the stack with a nil in slot kLastArg | ||
1180 | lua_pushvalue(L_, _closeHandlerIdx ? _closeHandlerIdx : kLastArg); // L_: a b c close_handler | ||
1181 | lua_pushvalue(L_, _groupIdx ? _groupIdx : kLastArg); // L_: a b c close_handler group | ||
1182 | lua_pushvalue(L_, _nameIdx ? _nameIdx : kLastArg); // L_: a b c close_handler group name | ||
1183 | lua_replace(L_, 1); // L_: name b c close_handler group | ||
1184 | lua_replace(L_, 2); // L_: name group c close_handler | ||
1185 | lua_replace(L_, 3); // L_: name group close_handler | ||
1186 | |||
1187 | // if we have a __close handler, we need a uservalue slot to store it | 1199 | // if we have a __close handler, we need a uservalue slot to store it |
1188 | UserValueCount const _nuv{ _closeHandlerIdx ? 1 : 0 }; | 1200 | LindaFactory::Instance.pushDeepUserdata(DestState{ L_ }, UserValueCount{ 1 }); // L_: name wake_period group [close_handler] linda |
1189 | LindaFactory::Instance.pushDeepUserdata(DestState{ L_ }, _nuv); // L_: name group close_handler linda | 1201 | lua_replace(L_, 3); // L_: name wake_period linda close_handler |
1190 | if (_closeHandlerIdx != 0) { | 1202 | lua_setiuservalue(L_, StackIndex{ 3 }, UserValueIndex{ 1 }); // L_: name wake_period linda |
1191 | lua_replace(L_, 2); // L_: name linda close_handler | ||
1192 | lua_setiuservalue(L_, StackIndex{ 2 }, UserValueIndex{ 1 }); // L_: name linda | ||
1193 | } | ||
1194 | // depending on whether we have a handler or not, the stack is not in the same state at this point | 1203 | // depending on whether we have a handler or not, the stack is not in the same state at this point |
1195 | // just make sure we have our Linda at the top | 1204 | // just make sure we have our Linda at the top |
1196 | LUA_ASSERT(L_, ToLinda<true>(L_, kIdxTop)); | 1205 | LUA_ASSERT(L_, ToLinda<true>(L_, kIdxTop)); |
1197 | return 1; | 1206 | return 1; |
1198 | } else { // no to-be-closed support | 1207 | } else { // no to-be-closed support |
1199 | // ensure we have name, group in that order on the stack | 1208 | LindaFactory::Instance.pushDeepUserdata(DestState{ L_ }, UserValueCount{ 0 }); // L_: name wake_period group linda |
1200 | if (_nameIdx > _groupIdx) { | ||
1201 | lua_insert(L_, 1); // L_: name group | ||
1202 | } | ||
1203 | LindaFactory::Instance.pushDeepUserdata(DestState{ L_ }, UserValueCount{ 0 }); // L_: name group linda | ||
1204 | return 1; | 1209 | return 1; |
1205 | } | 1210 | } |
1206 | |||
1207 | } | 1211 | } |