diff options
Diffstat (limited to 'src/linda.cpp')
-rw-r--r-- | src/linda.cpp | 734 |
1 files changed, 369 insertions, 365 deletions
diff --git a/src/linda.cpp b/src/linda.cpp index 103f4ed..8005211 100644 --- a/src/linda.cpp +++ b/src/linda.cpp | |||
@@ -57,6 +57,8 @@ class LindaFactory : public DeepFactory | |||
57 | // I'm not totally happy with having a global variable. But since it's stateless, it will do for the time being. | 57 | // I'm not totally happy with having a global variable. But since it's stateless, it will do for the time being. |
58 | static LindaFactory g_LindaFactory; | 58 | static LindaFactory g_LindaFactory; |
59 | 59 | ||
60 | // ################################################################################################# | ||
61 | |||
60 | /* | 62 | /* |
61 | * Actual data is kept within a keeper state, which is hashed by the 'Linda' | 63 | * Actual data is kept within a keeper state, which is hashed by the 'Linda' |
62 | * pointer (which is same to all userdatas pointing to it). | 64 | * pointer (which is same to all userdatas pointing to it). |
@@ -110,7 +112,9 @@ class Linda : public DeepPrelude // Deep userdata MUST start with this header | |||
110 | } | 112 | } |
111 | } | 113 | } |
112 | 114 | ||
113 | private: | 115 | static int ProtectedCall(lua_State* L, lua_CFunction f_); |
116 | |||
117 | private : | ||
114 | 118 | ||
115 | void setName(char const* name_, size_t len_) | 119 | void setName(char const* name_, size_t len_) |
116 | { | 120 | { |
@@ -203,7 +207,8 @@ static void check_key_types(lua_State* L, int start_, int end_) | |||
203 | 207 | ||
204 | // ################################################################################################# | 208 | // ################################################################################################# |
205 | 209 | ||
206 | LUAG_FUNC(linda_protected_call) | 210 | // used to perform all linda operations that access keepers |
211 | int Linda::ProtectedCall(lua_State* L, lua_CFunction f_) | ||
207 | { | 212 | { |
208 | Linda* const linda{ ToLinda<false>(L, 1) }; | 213 | Linda* const linda{ ToLinda<false>(L, 1) }; |
209 | 214 | ||
@@ -215,8 +220,8 @@ LUAG_FUNC(linda_protected_call) | |||
215 | // if we didn't do anything wrong, the keeper stack should be clean | 220 | // if we didn't do anything wrong, the keeper stack should be clean |
216 | ASSERT_L(lua_gettop(KL) == 0); | 221 | ASSERT_L(lua_gettop(KL) == 0); |
217 | 222 | ||
218 | // retrieve the actual function to be called and move it before the arguments | 223 | // push the function to be called and move it before the arguments |
219 | lua_pushvalue(L, lua_upvalueindex(1)); | 224 | lua_pushcfunction(L, f_); |
220 | lua_insert(L, 1); | 225 | lua_insert(L, 1); |
221 | // do a protected call | 226 | // do a protected call |
222 | int const rc{ lua_pcall(L, lua_gettop(L) - 1, LUA_MULTRET, 0) }; | 227 | int const rc{ lua_pcall(L, lua_gettop(L) - 1, LUA_MULTRET, 0) }; |
@@ -248,62 +253,232 @@ LUAG_FUNC(linda_protected_call) | |||
248 | */ | 253 | */ |
249 | LUAG_FUNC(linda_send) | 254 | LUAG_FUNC(linda_send) |
250 | { | 255 | { |
251 | Linda* const linda{ ToLinda<false>(L, 1) }; | 256 | auto send = [](lua_State* L) |
252 | std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | ||
253 | int key_i{ 2 }; // index of first key, if timeout not there | ||
254 | |||
255 | if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion | ||
256 | { | 257 | { |
257 | lua_Duration const duration{ lua_tonumber(L, 2) }; | 258 | Linda* const linda{ ToLinda<false>(L, 1) }; |
258 | if (duration.count() >= 0.0) | 259 | std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; |
260 | int key_i{ 2 }; // index of first key, if timeout not there | ||
261 | |||
262 | if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion | ||
263 | { | ||
264 | lua_Duration const duration{ lua_tonumber(L, 2) }; | ||
265 | if (duration.count() >= 0.0) | ||
266 | { | ||
267 | until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration); | ||
268 | } | ||
269 | ++key_i; | ||
270 | } | ||
271 | else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key | ||
259 | { | 272 | { |
260 | until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration); | 273 | ++key_i; |
261 | } | 274 | } |
262 | ++key_i; | ||
263 | } | ||
264 | else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key | ||
265 | { | ||
266 | ++key_i; | ||
267 | } | ||
268 | 275 | ||
269 | bool const as_nil_sentinel{ NIL_SENTINEL.equals(L, key_i) }; // if not nullptr, send() will silently send a single nil if nothing is provided | 276 | bool const as_nil_sentinel{ NIL_SENTINEL.equals(L, key_i) }; // if not nullptr, send() will silently send a single nil if nothing is provided |
270 | if (as_nil_sentinel) | 277 | if (as_nil_sentinel) |
271 | { | 278 | { |
272 | // the real key to send data to is after the NIL_SENTINEL marker | 279 | // the real key to send data to is after the NIL_SENTINEL marker |
273 | ++key_i; | 280 | ++key_i; |
274 | } | 281 | } |
282 | |||
283 | // make sure the key is of a valid type | ||
284 | check_key_types(L, key_i, key_i); | ||
285 | |||
286 | STACK_GROW(L, 1); | ||
287 | |||
288 | // make sure there is something to send | ||
289 | if (lua_gettop(L) == key_i) | ||
290 | { | ||
291 | if (as_nil_sentinel) | ||
292 | { | ||
293 | // send a single nil if nothing is provided | ||
294 | NIL_SENTINEL.pushKey(L); | ||
295 | } | ||
296 | else | ||
297 | { | ||
298 | return luaL_error(L, "no data to send"); | ||
299 | } | ||
300 | } | ||
301 | |||
302 | // convert nils to some special non-nil sentinel in sent values | ||
303 | keeper_toggle_nil_sentinels(L, key_i + 1, LookupMode::ToKeeper); | ||
304 | bool ret{ false }; | ||
305 | CancelRequest cancel{ CancelRequest::None }; | ||
306 | KeeperCallResult pushed; | ||
307 | { | ||
308 | Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue<Lane>(L) }; | ||
309 | Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; | ||
310 | lua_State* const KL{ K ? K->L : nullptr }; | ||
311 | if (KL == nullptr) | ||
312 | return 0; | ||
313 | |||
314 | STACK_CHECK_START_REL(KL, 0); | ||
315 | for (bool try_again{ true };;) | ||
316 | { | ||
317 | if (lane != nullptr) | ||
318 | { | ||
319 | cancel = lane->cancel_request; | ||
320 | } | ||
321 | cancel = (cancel != CancelRequest::None) ? cancel : linda->simulate_cancel; | ||
322 | // if user wants to cancel, or looped because of a timeout, the call returns without sending anything | ||
323 | if (!try_again || cancel != CancelRequest::None) | ||
324 | { | ||
325 | pushed.emplace(0); | ||
326 | break; | ||
327 | } | ||
328 | |||
329 | STACK_CHECK(KL, 0); | ||
330 | pushed = keeper_call(linda->U, KL, KEEPER_API(send), L, linda, key_i); | ||
331 | if (!pushed.has_value()) | ||
332 | { | ||
333 | break; | ||
334 | } | ||
335 | ASSERT_L(pushed.value() == 1); | ||
336 | |||
337 | ret = lua_toboolean(L, -1) ? true : false; | ||
338 | lua_pop(L, 1); | ||
275 | 339 | ||
276 | // make sure the key is of a valid type | 340 | if (ret) |
277 | check_key_types(L, key_i, key_i); | 341 | { |
342 | // Wake up ALL waiting threads | ||
343 | linda->m_write_happened.notify_all(); | ||
344 | break; | ||
345 | } | ||
278 | 346 | ||
279 | STACK_GROW(L, 1); | 347 | // instant timout to bypass the wait syscall |
348 | if (std::chrono::steady_clock::now() >= until) | ||
349 | { | ||
350 | break; /* no wait; instant timeout */ | ||
351 | } | ||
280 | 352 | ||
281 | // make sure there is something to send | 353 | // storage limit hit, wait until timeout or signalled that we should try again |
282 | if (lua_gettop(L) == key_i) | 354 | { |
355 | Lane::Status prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings | ||
356 | if (lane != nullptr) | ||
357 | { | ||
358 | // change status of lane to "waiting" | ||
359 | prev_status = lane->m_status; // Running, most likely | ||
360 | ASSERT_L(prev_status == Lane::Running); // but check, just in case | ||
361 | lane->m_status = Lane::Waiting; | ||
362 | ASSERT_L(lane->m_waiting_on == nullptr); | ||
363 | lane->m_waiting_on = &linda->m_read_happened; | ||
364 | } | ||
365 | // could not send because no room: wait until some data was read before trying again, or until timeout is reached | ||
366 | std::unique_lock<std::mutex> keeper_lock{ K->m_mutex, std::adopt_lock }; | ||
367 | std::cv_status const status{ linda->m_read_happened.wait_until(keeper_lock, until) }; | ||
368 | keeper_lock.release(); // we don't want to release the lock! | ||
369 | try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups | ||
370 | if (lane != nullptr) | ||
371 | { | ||
372 | lane->m_waiting_on = nullptr; | ||
373 | lane->m_status = prev_status; | ||
374 | } | ||
375 | } | ||
376 | } | ||
377 | STACK_CHECK(KL, 0); | ||
378 | } | ||
379 | |||
380 | if (!pushed.has_value()) | ||
381 | { | ||
382 | luaL_error(L, "tried to copy unsupported types"); // doesn't return | ||
383 | } | ||
384 | |||
385 | switch (cancel) | ||
386 | { | ||
387 | case CancelRequest::Soft: | ||
388 | // if user wants to soft-cancel, the call returns lanes.cancel_error | ||
389 | CANCEL_ERROR.pushKey(L); | ||
390 | return 1; | ||
391 | |||
392 | case CancelRequest::Hard: | ||
393 | // raise an error interrupting execution only in case of hard cancel | ||
394 | raise_cancel_error(L); // raises an error and doesn't return | ||
395 | |||
396 | default: | ||
397 | lua_pushboolean(L, ret); // true (success) or false (timeout) | ||
398 | return 1; | ||
399 | } | ||
400 | }; | ||
401 | return Linda::ProtectedCall(L, send); | ||
402 | } | ||
403 | |||
404 | // ################################################################################################# | ||
405 | |||
406 | /* | ||
407 | * 2 modes of operation | ||
408 | * [val, key]= linda_receive( linda_ud, [timeout_secs_num=-1], key_num|str|bool|lightuserdata [, ...] ) | ||
409 | * Consumes a single value from the Linda, in any key. | ||
410 | * Returns: received value (which is consumed from the slot), and the key which had it | ||
411 | |||
412 | * [val1, ... valCOUNT]= linda_receive( linda_ud, [timeout_secs_num=-1], linda.batched, key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT]) | ||
413 | * Consumes between min_COUNT and max_COUNT values from the linda, from a single key. | ||
414 | * returns the actual consumed values, or nil if there weren't enough values to consume | ||
415 | * | ||
416 | */ | ||
417 | LUAG_FUNC(linda_receive) | ||
418 | { | ||
419 | auto receive = [](lua_State* L) | ||
283 | { | 420 | { |
284 | if (as_nil_sentinel) | 421 | Linda* const linda{ ToLinda<false>(L, 1) }; |
422 | std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | ||
423 | int key_i{ 2 }; // index of first key, if timeout not there | ||
424 | |||
425 | if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion | ||
426 | { | ||
427 | lua_Duration const duration{ lua_tonumber(L, 2) }; | ||
428 | if (duration.count() >= 0.0) | ||
429 | { | ||
430 | until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration); | ||
431 | } | ||
432 | ++key_i; | ||
433 | } | ||
434 | else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key | ||
285 | { | 435 | { |
286 | // send a single nil if nothing is provided | 436 | ++key_i; |
287 | NIL_SENTINEL.pushKey(L); | 437 | } |
438 | |||
439 | keeper_api_t selected_keeper_receive{ nullptr }; | ||
440 | int expected_pushed_min{ 0 }, expected_pushed_max{ 0 }; | ||
441 | // are we in batched mode? | ||
442 | BATCH_SENTINEL.pushKey(L); | ||
443 | int const is_batched{ lua501_equal(L, key_i, -1) }; | ||
444 | lua_pop(L, 1); | ||
445 | if (is_batched) | ||
446 | { | ||
447 | // no need to pass linda.batched in the keeper state | ||
448 | ++key_i; | ||
449 | // make sure the keys are of a valid type | ||
450 | check_key_types(L, key_i, key_i); | ||
451 | // receive multiple values from a single slot | ||
452 | selected_keeper_receive = KEEPER_API(receive_batched); | ||
453 | // we expect a user-defined amount of return value | ||
454 | expected_pushed_min = (int) luaL_checkinteger(L, key_i + 1); | ||
455 | expected_pushed_max = (int) luaL_optinteger(L, key_i + 2, expected_pushed_min); | ||
456 | // don't forget to count the key in addition to the values | ||
457 | ++expected_pushed_min; | ||
458 | ++expected_pushed_max; | ||
459 | if (expected_pushed_min > expected_pushed_max) | ||
460 | { | ||
461 | return luaL_error(L, "batched min/max error"); | ||
462 | } | ||
288 | } | 463 | } |
289 | else | 464 | else |
290 | { | 465 | { |
291 | return luaL_error(L, "no data to send"); | 466 | // make sure the keys are of a valid type |
467 | check_key_types(L, key_i, lua_gettop(L)); | ||
468 | // receive a single value, checking multiple slots | ||
469 | selected_keeper_receive = KEEPER_API(receive); | ||
470 | // we expect a single (value, key) pair of returned values | ||
471 | expected_pushed_min = expected_pushed_max = 2; | ||
292 | } | 472 | } |
293 | } | ||
294 | 473 | ||
295 | // convert nils to some special non-nil sentinel in sent values | ||
296 | keeper_toggle_nil_sentinels(L, key_i + 1, LookupMode::ToKeeper); | ||
297 | bool ret{ false }; | ||
298 | CancelRequest cancel{ CancelRequest::None }; | ||
299 | KeeperCallResult pushed; | ||
300 | { | ||
301 | Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue<Lane>(L) }; | 474 | Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue<Lane>(L) }; |
302 | Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; | 475 | Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; |
303 | lua_State* const KL{ K ? K->L : nullptr }; | 476 | lua_State* const KL{ K ? K->L : nullptr }; |
304 | if (KL == nullptr) | 477 | if (KL == nullptr) |
305 | return 0; | 478 | return 0; |
306 | 479 | ||
480 | CancelRequest cancel{ CancelRequest::None }; | ||
481 | KeeperCallResult pushed; | ||
307 | STACK_CHECK_START_REL(KL, 0); | 482 | STACK_CHECK_START_REL(KL, 0); |
308 | for (bool try_again{ true };;) | 483 | for (bool try_again{ true };;) |
309 | { | 484 | { |
@@ -319,31 +494,29 @@ LUAG_FUNC(linda_send) | |||
319 | break; | 494 | break; |
320 | } | 495 | } |
321 | 496 | ||
322 | STACK_CHECK(KL, 0); | 497 | // all arguments of receive() but the first are passed to the keeper's receive function |
323 | pushed = keeper_call(linda->U, KL, KEEPER_API(send), L, linda, key_i); | 498 | pushed = keeper_call(linda->U, KL, selected_keeper_receive, L, linda, key_i); |
324 | if (!pushed.has_value()) | 499 | if (!pushed.has_value()) |
325 | { | 500 | { |
326 | break; | 501 | break; |
327 | } | 502 | } |
328 | ASSERT_L(pushed.value() == 1); | 503 | if (pushed.value() > 0) |
329 | |||
330 | ret = lua_toboolean(L, -1) ? true : false; | ||
331 | lua_pop(L, 1); | ||
332 | |||
333 | if (ret) | ||
334 | { | 504 | { |
335 | // Wake up ALL waiting threads | 505 | ASSERT_L(pushed.value() >= expected_pushed_min && pushed.value() <= expected_pushed_max); |
336 | linda->m_write_happened.notify_all(); | 506 | // replace sentinels with real nils |
507 | keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed.value(), LookupMode::FromKeeper); | ||
508 | // To be done from within the 'K' locking area | ||
509 | // | ||
510 | linda->m_read_happened.notify_all(); | ||
337 | break; | 511 | break; |
338 | } | 512 | } |
339 | 513 | ||
340 | // instant timout to bypass the wait syscall | ||
341 | if (std::chrono::steady_clock::now() >= until) | 514 | if (std::chrono::steady_clock::now() >= until) |
342 | { | 515 | { |
343 | break; /* no wait; instant timeout */ | 516 | break; /* instant timeout */ |
344 | } | 517 | } |
345 | 518 | ||
346 | // storage limit hit, wait until timeout or signalled that we should try again | 519 | // nothing received, wait until timeout or signalled that we should try again |
347 | { | 520 | { |
348 | Lane::Status prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings | 521 | Lane::Status prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings |
349 | if (lane != nullptr) | 522 | if (lane != nullptr) |
@@ -353,11 +526,11 @@ LUAG_FUNC(linda_send) | |||
353 | ASSERT_L(prev_status == Lane::Running); // but check, just in case | 526 | ASSERT_L(prev_status == Lane::Running); // but check, just in case |
354 | lane->m_status = Lane::Waiting; | 527 | lane->m_status = Lane::Waiting; |
355 | ASSERT_L(lane->m_waiting_on == nullptr); | 528 | ASSERT_L(lane->m_waiting_on == nullptr); |
356 | lane->m_waiting_on = &linda->m_read_happened; | 529 | lane->m_waiting_on = &linda->m_write_happened; |
357 | } | 530 | } |
358 | // could not send because no room: wait until some data was read before trying again, or until timeout is reached | 531 | // not enough data to read: wakeup when data was sent, or when timeout is reached |
359 | std::unique_lock<std::mutex> keeper_lock{ K->m_mutex, std::adopt_lock }; | 532 | std::unique_lock<std::mutex> keeper_lock{ K->m_mutex, std::adopt_lock }; |
360 | std::cv_status const status{ linda->m_read_happened.wait_until(keeper_lock, until) }; | 533 | std::cv_status const status{ linda->m_write_happened.wait_until(keeper_lock, until) }; |
361 | keeper_lock.release(); // we don't want to release the lock! | 534 | keeper_lock.release(); // we don't want to release the lock! |
362 | try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups | 535 | try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups |
363 | if (lane != nullptr) | 536 | if (lane != nullptr) |
@@ -368,188 +541,28 @@ LUAG_FUNC(linda_send) | |||
368 | } | 541 | } |
369 | } | 542 | } |
370 | STACK_CHECK(KL, 0); | 543 | STACK_CHECK(KL, 0); |
371 | } | ||
372 | |||
373 | if (!pushed.has_value()) | ||
374 | { | ||
375 | luaL_error(L, "tried to copy unsupported types"); // doesn't return | ||
376 | } | ||
377 | |||
378 | switch (cancel) | ||
379 | { | ||
380 | case CancelRequest::Soft: | ||
381 | // if user wants to soft-cancel, the call returns lanes.cancel_error | ||
382 | CANCEL_ERROR.pushKey(L); | ||
383 | return 1; | ||
384 | |||
385 | case CancelRequest::Hard: | ||
386 | // raise an error interrupting execution only in case of hard cancel | ||
387 | raise_cancel_error(L); // raises an error and doesn't return | ||
388 | |||
389 | default: | ||
390 | lua_pushboolean(L, ret); // true (success) or false (timeout) | ||
391 | return 1; | ||
392 | } | ||
393 | } | ||
394 | |||
395 | // ################################################################################################# | ||
396 | |||
397 | /* | ||
398 | * 2 modes of operation | ||
399 | * [val, key]= linda_receive( linda_ud, [timeout_secs_num=-1], key_num|str|bool|lightuserdata [, ...] ) | ||
400 | * Consumes a single value from the Linda, in any key. | ||
401 | * Returns: received value (which is consumed from the slot), and the key which had it | ||
402 | |||
403 | * [val1, ... valCOUNT]= linda_receive( linda_ud, [timeout_secs_num=-1], linda.batched, key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT]) | ||
404 | * Consumes between min_COUNT and max_COUNT values from the linda, from a single key. | ||
405 | * returns the actual consumed values, or nil if there weren't enough values to consume | ||
406 | * | ||
407 | */ | ||
408 | LUAG_FUNC(linda_receive) | ||
409 | { | ||
410 | Linda* const linda{ ToLinda<false>(L, 1) }; | ||
411 | std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | ||
412 | int key_i{ 2 }; // index of first key, if timeout not there | ||
413 | |||
414 | if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion | ||
415 | { | ||
416 | lua_Duration const duration{ lua_tonumber(L, 2) }; | ||
417 | if (duration.count() >= 0.0) | ||
418 | { | ||
419 | until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration); | ||
420 | } | ||
421 | ++key_i; | ||
422 | } | ||
423 | else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key | ||
424 | { | ||
425 | ++key_i; | ||
426 | } | ||
427 | |||
428 | keeper_api_t selected_keeper_receive{ nullptr }; | ||
429 | int expected_pushed_min{ 0 }, expected_pushed_max{ 0 }; | ||
430 | // are we in batched mode? | ||
431 | BATCH_SENTINEL.pushKey(L); | ||
432 | int const is_batched{ lua501_equal(L, key_i, -1) }; | ||
433 | lua_pop(L, 1); | ||
434 | if (is_batched) | ||
435 | { | ||
436 | // no need to pass linda.batched in the keeper state | ||
437 | ++key_i; | ||
438 | // make sure the keys are of a valid type | ||
439 | check_key_types(L, key_i, key_i); | ||
440 | // receive multiple values from a single slot | ||
441 | selected_keeper_receive = KEEPER_API(receive_batched); | ||
442 | // we expect a user-defined amount of return value | ||
443 | expected_pushed_min = (int) luaL_checkinteger(L, key_i + 1); | ||
444 | expected_pushed_max = (int) luaL_optinteger(L, key_i + 2, expected_pushed_min); | ||
445 | // don't forget to count the key in addition to the values | ||
446 | ++expected_pushed_min; | ||
447 | ++expected_pushed_max; | ||
448 | if (expected_pushed_min > expected_pushed_max) | ||
449 | { | ||
450 | return luaL_error(L, "batched min/max error"); | ||
451 | } | ||
452 | } | ||
453 | else | ||
454 | { | ||
455 | // make sure the keys are of a valid type | ||
456 | check_key_types(L, key_i, lua_gettop(L)); | ||
457 | // receive a single value, checking multiple slots | ||
458 | selected_keeper_receive = KEEPER_API(receive); | ||
459 | // we expect a single (value, key) pair of returned values | ||
460 | expected_pushed_min = expected_pushed_max = 2; | ||
461 | } | ||
462 | |||
463 | Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue<Lane>(L) }; | ||
464 | Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; | ||
465 | lua_State* const KL{ K ? K->L : nullptr }; | ||
466 | if (KL == nullptr) | ||
467 | return 0; | ||
468 | |||
469 | CancelRequest cancel{ CancelRequest::None }; | ||
470 | KeeperCallResult pushed; | ||
471 | STACK_CHECK_START_REL(KL, 0); | ||
472 | for (bool try_again{ true };;) | ||
473 | { | ||
474 | if (lane != nullptr) | ||
475 | { | ||
476 | cancel = lane->cancel_request; | ||
477 | } | ||
478 | cancel = (cancel != CancelRequest::None) ? cancel : linda->simulate_cancel; | ||
479 | // if user wants to cancel, or looped because of a timeout, the call returns without sending anything | ||
480 | if (!try_again || cancel != CancelRequest::None) | ||
481 | { | ||
482 | pushed.emplace(0); | ||
483 | break; | ||
484 | } | ||
485 | 544 | ||
486 | // all arguments of receive() but the first are passed to the keeper's receive function | ||
487 | pushed = keeper_call(linda->U, KL, selected_keeper_receive, L, linda, key_i); | ||
488 | if (!pushed.has_value()) | 545 | if (!pushed.has_value()) |
489 | { | 546 | { |
490 | break; | 547 | return luaL_error(L, "tried to copy unsupported types"); |
491 | } | ||
492 | if (pushed.value() > 0) | ||
493 | { | ||
494 | ASSERT_L(pushed.value() >= expected_pushed_min && pushed.value() <= expected_pushed_max); | ||
495 | // replace sentinels with real nils | ||
496 | keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed.value(), LookupMode::FromKeeper); | ||
497 | // To be done from within the 'K' locking area | ||
498 | // | ||
499 | linda->m_read_happened.notify_all(); | ||
500 | break; | ||
501 | } | 548 | } |
502 | 549 | ||
503 | if (std::chrono::steady_clock::now() >= until) | 550 | switch (cancel) |
504 | { | 551 | { |
505 | break; /* instant timeout */ | 552 | case CancelRequest::Soft: |
506 | } | 553 | // if user wants to soft-cancel, the call returns CANCEL_ERROR |
554 | CANCEL_ERROR.pushKey(L); | ||
555 | return 1; | ||
507 | 556 | ||
508 | // nothing received, wait until timeout or signalled that we should try again | 557 | case CancelRequest::Hard: |
509 | { | 558 | // raise an error interrupting execution only in case of hard cancel |
510 | Lane::Status prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings | 559 | raise_cancel_error(L); // raises an error and doesn't return |
511 | if (lane != nullptr) | ||
512 | { | ||
513 | // change status of lane to "waiting" | ||
514 | prev_status = lane->m_status; // Running, most likely | ||
515 | ASSERT_L(prev_status == Lane::Running); // but check, just in case | ||
516 | lane->m_status = Lane::Waiting; | ||
517 | ASSERT_L(lane->m_waiting_on == nullptr); | ||
518 | lane->m_waiting_on = &linda->m_write_happened; | ||
519 | } | ||
520 | // not enough data to read: wakeup when data was sent, or when timeout is reached | ||
521 | std::unique_lock<std::mutex> keeper_lock{ K->m_mutex, std::adopt_lock }; | ||
522 | std::cv_status const status{ linda->m_write_happened.wait_until(keeper_lock, until) }; | ||
523 | keeper_lock.release(); // we don't want to release the lock! | ||
524 | try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups | ||
525 | if (lane != nullptr) | ||
526 | { | ||
527 | lane->m_waiting_on = nullptr; | ||
528 | lane->m_status = prev_status; | ||
529 | } | ||
530 | } | ||
531 | } | ||
532 | STACK_CHECK(KL, 0); | ||
533 | 560 | ||
534 | if (!pushed.has_value()) | 561 | default: |
535 | { | 562 | return pushed.value(); |
536 | return luaL_error(L, "tried to copy unsupported types"); | 563 | } |
537 | } | 564 | }; |
538 | 565 | return Linda::ProtectedCall(L, receive); | |
539 | switch (cancel) | ||
540 | { | ||
541 | case CancelRequest::Soft: | ||
542 | // if user wants to soft-cancel, the call returns CANCEL_ERROR | ||
543 | CANCEL_ERROR.pushKey(L); | ||
544 | return 1; | ||
545 | |||
546 | case CancelRequest::Hard: | ||
547 | // raise an error interrupting execution only in case of hard cancel | ||
548 | raise_cancel_error(L); // raises an error and doesn't return | ||
549 | |||
550 | default: | ||
551 | return pushed.value(); | ||
552 | } | ||
553 | } | 566 | } |
554 | 567 | ||
555 | // ################################################################################################# | 568 | // ################################################################################################# |
@@ -564,47 +577,51 @@ LUAG_FUNC(linda_receive) | |||
564 | */ | 577 | */ |
565 | LUAG_FUNC(linda_set) | 578 | LUAG_FUNC(linda_set) |
566 | { | 579 | { |
567 | Linda* const linda{ ToLinda<false>(L, 1) }; | 580 | auto set = [](lua_State* L) |
568 | bool const has_value{ lua_gettop(L) > 2 }; | ||
569 | // make sure the key is of a valid type (throws an error if not the case) | ||
570 | check_key_types(L, 2, 2); | ||
571 | |||
572 | Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; | ||
573 | KeeperCallResult pushed; | ||
574 | if (linda->simulate_cancel == CancelRequest::None) | ||
575 | { | 581 | { |
576 | if (has_value) | 582 | Linda* const linda{ ToLinda<false>(L, 1) }; |
577 | { | 583 | bool const has_value{ lua_gettop(L) > 2 }; |
578 | // convert nils to some special non-nil sentinel in sent values | 584 | // make sure the key is of a valid type (throws an error if not the case) |
579 | keeper_toggle_nil_sentinels(L, 3, LookupMode::ToKeeper); | 585 | check_key_types(L, 2, 2); |
580 | } | ||
581 | pushed = keeper_call(linda->U, K->L, KEEPER_API(set), L, linda, 2); | ||
582 | if (pushed.has_value()) // no error? | ||
583 | { | ||
584 | ASSERT_L(pushed.value() == 0 || pushed.value() == 1); | ||
585 | 586 | ||
587 | Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; | ||
588 | KeeperCallResult pushed; | ||
589 | if (linda->simulate_cancel == CancelRequest::None) | ||
590 | { | ||
586 | if (has_value) | 591 | if (has_value) |
587 | { | 592 | { |
588 | // we put some data in the slot, tell readers that they should wake | 593 | // convert nils to some special non-nil sentinel in sent values |
589 | linda->m_write_happened.notify_all(); // To be done from within the 'K' locking area | 594 | keeper_toggle_nil_sentinels(L, 3, LookupMode::ToKeeper); |
590 | } | 595 | } |
591 | if (pushed.value() == 1) | 596 | pushed = keeper_call(linda->U, K->L, KEEPER_API(set), L, linda, 2); |
597 | if (pushed.has_value()) // no error? | ||
592 | { | 598 | { |
593 | // the key was full, but it is no longer the case, tell writers they should wake | 599 | ASSERT_L(pushed.value() == 0 || pushed.value() == 1); |
594 | ASSERT_L(lua_type(L, -1) == LUA_TBOOLEAN && lua_toboolean(L, -1) == 1); | 600 | |
595 | linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area | 601 | if (has_value) |
602 | { | ||
603 | // we put some data in the slot, tell readers that they should wake | ||
604 | linda->m_write_happened.notify_all(); // To be done from within the 'K' locking area | ||
605 | } | ||
606 | if (pushed.value() == 1) | ||
607 | { | ||
608 | // the key was full, but it is no longer the case, tell writers they should wake | ||
609 | ASSERT_L(lua_type(L, -1) == LUA_TBOOLEAN && lua_toboolean(L, -1) == 1); | ||
610 | linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area | ||
611 | } | ||
596 | } | 612 | } |
597 | } | 613 | } |
598 | } | 614 | else // linda is cancelled |
599 | else // linda is cancelled | 615 | { |
600 | { | 616 | // do nothing and return lanes.cancel_error |
601 | // do nothing and return lanes.cancel_error | 617 | CANCEL_ERROR.pushKey(L); |
602 | CANCEL_ERROR.pushKey(L); | 618 | pushed.emplace(1); |
603 | pushed.emplace(1); | 619 | } |
604 | } | ||
605 | 620 | ||
606 | // must trigger any error after keeper state has been released | 621 | // must trigger any error after keeper state has been released |
607 | return OptionalValue(pushed, L, "tried to copy unsupported types"); | 622 | return OptionalValue(pushed, L, "tried to copy unsupported types"); |
623 | }; | ||
624 | return Linda::ProtectedCall(L, set); | ||
608 | } | 625 | } |
609 | 626 | ||
610 | // ################################################################################################# | 627 | // ################################################################################################# |
@@ -616,13 +633,17 @@ LUAG_FUNC(linda_set) | |||
616 | */ | 633 | */ |
617 | LUAG_FUNC(linda_count) | 634 | LUAG_FUNC(linda_count) |
618 | { | 635 | { |
619 | Linda* const linda{ ToLinda<false>(L, 1) }; | 636 | auto count = [](lua_State* L) |
620 | // make sure the keys are of a valid type | 637 | { |
621 | check_key_types(L, 2, lua_gettop(L)); | 638 | Linda* const linda{ ToLinda<false>(L, 1) }; |
639 | // make sure the keys are of a valid type | ||
640 | check_key_types(L, 2, lua_gettop(L)); | ||
622 | 641 | ||
623 | Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; | 642 | Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; |
624 | KeeperCallResult const pushed{ keeper_call(linda->U, K->L, KEEPER_API(count), L, linda, 2) }; | 643 | KeeperCallResult const pushed{ keeper_call(linda->U, K->L, KEEPER_API(count), L, linda, 2) }; |
625 | return OptionalValue(pushed, L, "tried to count an invalid key"); | 644 | return OptionalValue(pushed, L, "tried to count an invalid key"); |
645 | }; | ||
646 | return Linda::ProtectedCall(L, count); | ||
626 | } | 647 | } |
627 | 648 | ||
628 | // ################################################################################################# | 649 | // ################################################################################################# |
@@ -634,31 +655,35 @@ LUAG_FUNC(linda_count) | |||
634 | */ | 655 | */ |
635 | LUAG_FUNC(linda_get) | 656 | LUAG_FUNC(linda_get) |
636 | { | 657 | { |
637 | Linda* const linda{ ToLinda<false>(L, 1) }; | 658 | auto get = [](lua_State* L) |
638 | lua_Integer const count{ luaL_optinteger(L, 3, 1) }; | ||
639 | luaL_argcheck(L, count >= 1, 3, "count should be >= 1"); | ||
640 | luaL_argcheck(L, lua_gettop(L) <= 3, 4, "too many arguments"); | ||
641 | // make sure the key is of a valid type (throws an error if not the case) | ||
642 | check_key_types(L, 2, 2); | ||
643 | |||
644 | KeeperCallResult pushed; | ||
645 | if (linda->simulate_cancel == CancelRequest::None) | ||
646 | { | 659 | { |
647 | Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; | 660 | Linda* const linda{ ToLinda<false>(L, 1) }; |
648 | pushed = keeper_call(linda->U, K->L, KEEPER_API(get), L, linda, 2); | 661 | lua_Integer const count{ luaL_optinteger(L, 3, 1) }; |
649 | if (pushed.value_or(0) > 0) | 662 | luaL_argcheck(L, count >= 1, 3, "count should be >= 1"); |
663 | luaL_argcheck(L, lua_gettop(L) <= 3, 4, "too many arguments"); | ||
664 | // make sure the key is of a valid type (throws an error if not the case) | ||
665 | check_key_types(L, 2, 2); | ||
666 | |||
667 | KeeperCallResult pushed; | ||
668 | if (linda->simulate_cancel == CancelRequest::None) | ||
650 | { | 669 | { |
651 | keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed.value(), LookupMode::FromKeeper); | 670 | Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; |
671 | pushed = keeper_call(linda->U, K->L, KEEPER_API(get), L, linda, 2); | ||
672 | if (pushed.value_or(0) > 0) | ||
673 | { | ||
674 | keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed.value(), LookupMode::FromKeeper); | ||
675 | } | ||
652 | } | 676 | } |
653 | } | 677 | else // linda is cancelled |
654 | else // linda is cancelled | 678 | { |
655 | { | 679 | // do nothing and return lanes.cancel_error |
656 | // do nothing and return lanes.cancel_error | 680 | CANCEL_ERROR.pushKey(L); |
657 | CANCEL_ERROR.pushKey(L); | 681 | pushed.emplace(1); |
658 | pushed.emplace(1); | 682 | } |
659 | } | 683 | // an error can be raised if we attempt to read an unregistered function |
660 | // an error can be raised if we attempt to read an unregistered function | 684 | return OptionalValue(pushed, L, "tried to copy unsupported types"); |
661 | return OptionalValue(pushed, L, "tried to copy unsupported types"); | 685 | }; |
686 | return Linda::ProtectedCall(L, get); | ||
662 | } | 687 | } |
663 | 688 | ||
664 | // ################################################################################################# | 689 | // ################################################################################################# |
@@ -671,34 +696,38 @@ LUAG_FUNC(linda_get) | |||
671 | */ | 696 | */ |
672 | LUAG_FUNC(linda_limit) | 697 | LUAG_FUNC(linda_limit) |
673 | { | 698 | { |
674 | Linda* const linda{ ToLinda<false>(L, 1) }; | 699 | auto limit = [](lua_State* L) |
675 | // make sure we got 3 arguments: the linda, a key and a limit | ||
676 | luaL_argcheck( L, lua_gettop( L) == 3, 2, "wrong number of arguments"); | ||
677 | // make sure we got a numeric limit | ||
678 | luaL_checknumber( L, 3); | ||
679 | // make sure the key is of a valid type | ||
680 | check_key_types( L, 2, 2); | ||
681 | |||
682 | KeeperCallResult pushed; | ||
683 | if (linda->simulate_cancel == CancelRequest::None) | ||
684 | { | 700 | { |
685 | Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; | 701 | Linda* const linda{ ToLinda<false>(L, 1) }; |
686 | pushed = keeper_call(linda->U, K->L, KEEPER_API(limit), L, linda, 2); | 702 | // make sure we got 3 arguments: the linda, a key and a limit |
687 | ASSERT_L( pushed.has_value() && (pushed.value() == 0 || pushed.value() == 1)); // no error, optional boolean value saying if we should wake blocked writer threads | 703 | luaL_argcheck( L, lua_gettop( L) == 3, 2, "wrong number of arguments"); |
688 | if (pushed.value() == 1) | 704 | // make sure we got a numeric limit |
705 | luaL_checknumber( L, 3); | ||
706 | // make sure the key is of a valid type | ||
707 | check_key_types( L, 2, 2); | ||
708 | |||
709 | KeeperCallResult pushed; | ||
710 | if (linda->simulate_cancel == CancelRequest::None) | ||
689 | { | 711 | { |
690 | ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); | 712 | Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; |
691 | linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area | 713 | pushed = keeper_call(linda->U, K->L, KEEPER_API(limit), L, linda, 2); |
714 | ASSERT_L( pushed.has_value() && (pushed.value() == 0 || pushed.value() == 1)); // no error, optional boolean value saying if we should wake blocked writer threads | ||
715 | if (pushed.value() == 1) | ||
716 | { | ||
717 | ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); | ||
718 | linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area | ||
719 | } | ||
692 | } | 720 | } |
693 | } | 721 | else // linda is cancelled |
694 | else // linda is cancelled | 722 | { |
695 | { | 723 | // do nothing and return lanes.cancel_error |
696 | // do nothing and return lanes.cancel_error | 724 | CANCEL_ERROR.pushKey(L); |
697 | CANCEL_ERROR.pushKey(L); | 725 | pushed.emplace(1); |
698 | pushed.emplace(1); | 726 | } |
699 | } | 727 | // propagate pushed boolean if any |
700 | // propagate pushed boolean if any | 728 | return pushed.value(); |
701 | return pushed.value(); | 729 | }; |
730 | return Linda::ProtectedCall(L, limit); | ||
702 | } | 731 | } |
703 | 732 | ||
704 | // ################################################################################################# | 733 | // ################################################################################################# |
@@ -831,8 +860,12 @@ LUAG_FUNC(linda_concat) | |||
831 | */ | 860 | */ |
832 | LUAG_FUNC(linda_dump) | 861 | LUAG_FUNC(linda_dump) |
833 | { | 862 | { |
834 | Linda* const linda{ ToLinda<false>(L, 1) }; | 863 | auto dump = [](lua_State* L) |
835 | return keeper_push_linda_storage(linda->U, Dest{ L }, linda, linda->hashSeed()); | 864 | { |
865 | Linda* const linda{ ToLinda<false>(L, 1) }; | ||
866 | return keeper_push_linda_storage(linda->U, Dest{ L }, linda, linda->hashSeed()); | ||
867 | }; | ||
868 | return Linda::ProtectedCall(L, dump); | ||
836 | } | 869 | } |
837 | 870 | ||
838 | // ################################################################################################# | 871 | // ################################################################################################# |
@@ -922,6 +955,23 @@ void LindaFactory::deleteDeepObjectInternal(lua_State* L, DeepPrelude* o_) const | |||
922 | 955 | ||
923 | // ################################################################################################# | 956 | // ################################################################################################# |
924 | 957 | ||
958 | static luaL_Reg const s_LindaMT[] = | ||
959 | { | ||
960 | { "__concat", LG_linda_concat }, | ||
961 | { "__tostring", LG_linda_tostring }, | ||
962 | { "__towatch", LG_linda_towatch }, // Decoda __towatch support | ||
963 | { "cancel", LG_linda_cancel }, | ||
964 | { "count", LG_linda_count }, | ||
965 | { "deep", LG_linda_deep }, | ||
966 | { "dump", LG_linda_dump }, | ||
967 | { "get", LG_linda_get }, | ||
968 | { "limit", LG_linda_limit }, | ||
969 | { "receive", LG_linda_receive }, | ||
970 | { "send", LG_linda_send }, | ||
971 | { "set", LG_linda_set }, | ||
972 | { nullptr, nullptr } | ||
973 | }; | ||
974 | |||
925 | void LindaFactory::createMetatable(lua_State* L) const | 975 | void LindaFactory::createMetatable(lua_State* L) const |
926 | { | 976 | { |
927 | STACK_CHECK_START_REL(L, 0); | 977 | STACK_CHECK_START_REL(L, 0); |
@@ -934,54 +984,8 @@ void LindaFactory::createMetatable(lua_State* L) const | |||
934 | lua_pushliteral(L, "Linda"); | 984 | lua_pushliteral(L, "Linda"); |
935 | lua_setfield(L, -2, "__metatable"); | 985 | lua_setfield(L, -2, "__metatable"); |
936 | 986 | ||
937 | lua_pushcfunction(L, LG_linda_tostring); | 987 | // the linda functions |
938 | lua_setfield(L, -2, "__tostring"); | 988 | luaL_setfuncs(L, s_LindaMT, 0); |
939 | |||
940 | // Decoda __towatch support | ||
941 | lua_pushcfunction(L, LG_linda_towatch); | ||
942 | lua_setfield(L, -2, "__towatch"); | ||
943 | |||
944 | lua_pushcfunction(L, LG_linda_concat); | ||
945 | lua_setfield(L, -2, "__concat"); | ||
946 | |||
947 | // protected calls, to ensure associated keeper is always released even in case of error | ||
948 | // all function are the protected call wrapper, where the actual operation is provided as upvalue | ||
949 | // note that this kind of thing can break function lookup as we use the function pointer here and there | ||
950 | // TODO: change that and use different functions! | ||
951 | |||
952 | lua_pushcfunction(L, LG_linda_send); | ||
953 | lua_pushcclosure(L, LG_linda_protected_call, 1); | ||
954 | lua_setfield(L, -2, "send"); | ||
955 | |||
956 | lua_pushcfunction(L, LG_linda_receive); | ||
957 | lua_pushcclosure(L, LG_linda_protected_call, 1); | ||
958 | lua_setfield(L, -2, "receive"); | ||
959 | |||
960 | lua_pushcfunction(L, LG_linda_limit); | ||
961 | lua_pushcclosure(L, LG_linda_protected_call, 1); | ||
962 | lua_setfield(L, -2, "limit"); | ||
963 | |||
964 | lua_pushcfunction(L, LG_linda_set); | ||
965 | lua_pushcclosure(L, LG_linda_protected_call, 1); | ||
966 | lua_setfield(L, -2, "set"); | ||
967 | |||
968 | lua_pushcfunction(L, LG_linda_count); | ||
969 | lua_pushcclosure(L, LG_linda_protected_call, 1); | ||
970 | lua_setfield(L, -2, "count"); | ||
971 | |||
972 | lua_pushcfunction(L, LG_linda_get); | ||
973 | lua_pushcclosure(L, LG_linda_protected_call, 1); | ||
974 | lua_setfield(L, -2, "get"); | ||
975 | |||
976 | lua_pushcfunction(L, LG_linda_cancel); | ||
977 | lua_setfield(L, -2, "cancel"); | ||
978 | |||
979 | lua_pushcfunction(L, LG_linda_deep); | ||
980 | lua_setfield(L, -2, "deep"); | ||
981 | |||
982 | lua_pushcfunction(L, LG_linda_dump); | ||
983 | lua_pushcclosure(L, LG_linda_protected_call, 1); | ||
984 | lua_setfield(L, -2, "dump"); | ||
985 | 989 | ||
986 | // some constants | 990 | // some constants |
987 | BATCH_SENTINEL.pushKey(L); | 991 | BATCH_SENTINEL.pushKey(L); |