diff options
Diffstat (limited to '')
-rw-r--r-- | src/linda.cpp | 92 |
1 files changed, 52 insertions, 40 deletions
diff --git a/src/linda.cpp b/src/linda.cpp index 5ee4768..ea1410e 100644 --- a/src/linda.cpp +++ b/src/linda.cpp | |||
@@ -61,8 +61,8 @@ class Linda : public DeepPrelude // Deep userdata MUST start with this header | |||
61 | 61 | ||
62 | public: | 62 | public: |
63 | 63 | ||
64 | SIGNAL_T read_happened; | 64 | std::condition_variable m_read_happened; |
65 | SIGNAL_T write_happened; | 65 | std::condition_variable m_write_happened; |
66 | Universe* const U; // the universe this linda belongs to | 66 | Universe* const U; // the universe this linda belongs to |
67 | uintptr_t const group; // a group to control keeper allocation between lindas | 67 | uintptr_t const group; // a group to control keeper allocation between lindas |
68 | CancelRequest simulate_cancel{ CancelRequest::None }; | 68 | CancelRequest simulate_cancel{ CancelRequest::None }; |
@@ -81,17 +81,11 @@ class Linda : public DeepPrelude // Deep userdata MUST start with this header | |||
81 | : U{ U_ } | 81 | : U{ U_ } |
82 | , group{ group_ << KEEPER_MAGIC_SHIFT } | 82 | , group{ group_ << KEEPER_MAGIC_SHIFT } |
83 | { | 83 | { |
84 | SIGNAL_INIT(&read_happened); | ||
85 | SIGNAL_INIT(&write_happened); | ||
86 | |||
87 | setName(name_, len_); | 84 | setName(name_, len_); |
88 | } | 85 | } |
89 | 86 | ||
90 | ~Linda() | 87 | ~Linda() |
91 | { | 88 | { |
92 | // There aren't any lanes waiting on these lindas, since all proxies have been gc'ed. Right? | ||
93 | SIGNAL_FREE(&read_happened); | ||
94 | SIGNAL_FREE(&write_happened); | ||
95 | if (std::holds_alternative<AllocatedName>(m_name)) | 89 | if (std::holds_alternative<AllocatedName>(m_name)) |
96 | { | 90 | { |
97 | AllocatedName& name = std::get<AllocatedName>(m_name); | 91 | AllocatedName& name = std::get<AllocatedName>(m_name); |
@@ -216,15 +210,19 @@ LUAG_FUNC(linda_protected_call) | |||
216 | LUAG_FUNC(linda_send) | 210 | LUAG_FUNC(linda_send) |
217 | { | 211 | { |
218 | Linda* const linda{ lua_toLinda<false>(L, 1) }; | 212 | Linda* const linda{ lua_toLinda<false>(L, 1) }; |
219 | time_d timeout{ -1.0 }; | 213 | std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; |
220 | int key_i{ 2 }; // index of first key, if timeout not there | 214 | int key_i{ 2 }; // index of first key, if timeout not there |
221 | 215 | ||
222 | if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion | 216 | if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion |
223 | { | 217 | { |
224 | timeout = SIGNAL_TIMEOUT_PREPARE(lua_tonumber(L, 2)); | 218 | lua_Duration const duration{ lua_tonumber(L, 2) }; |
219 | if (duration.count() >= 0.0) | ||
220 | { | ||
221 | until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration); | ||
222 | } | ||
225 | ++key_i; | 223 | ++key_i; |
226 | } | 224 | } |
227 | else if (lua_isnil(L, 2)) // alternate explicit "no timeout" by passing nil before the key | 225 | else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key |
228 | { | 226 | { |
229 | ++key_i; | 227 | ++key_i; |
230 | } | 228 | } |
@@ -266,6 +264,7 @@ LUAG_FUNC(linda_send) | |||
266 | lua_State* const KL{ K ? K->L : nullptr }; | 264 | lua_State* const KL{ K ? K->L : nullptr }; |
267 | if (KL == nullptr) | 265 | if (KL == nullptr) |
268 | return 0; | 266 | return 0; |
267 | |||
269 | STACK_CHECK_START_REL(KL, 0); | 268 | STACK_CHECK_START_REL(KL, 0); |
270 | for (bool try_again{ true };;) | 269 | for (bool try_again{ true };;) |
271 | { | 270 | { |
@@ -295,12 +294,12 @@ LUAG_FUNC(linda_send) | |||
295 | if (ret) | 294 | if (ret) |
296 | { | 295 | { |
297 | // Wake up ALL waiting threads | 296 | // Wake up ALL waiting threads |
298 | SIGNAL_ALL(&linda->write_happened); | 297 | linda->m_write_happened.notify_all(); |
299 | break; | 298 | break; |
300 | } | 299 | } |
301 | 300 | ||
302 | // instant timout to bypass the wait syscall | 301 | // instant timout to bypass the wait syscall |
303 | if (timeout == 0.0) | 302 | if (std::chrono::steady_clock::now() >= until) |
304 | { | 303 | { |
305 | break; /* no wait; instant timeout */ | 304 | break; /* no wait; instant timeout */ |
306 | } | 305 | } |
@@ -314,14 +313,17 @@ LUAG_FUNC(linda_send) | |||
314 | prev_status = lane->status; // RUNNING, most likely | 313 | prev_status = lane->status; // RUNNING, most likely |
315 | ASSERT_L(prev_status == RUNNING); // but check, just in case | 314 | ASSERT_L(prev_status == RUNNING); // but check, just in case |
316 | lane->status = WAITING; | 315 | lane->status = WAITING; |
317 | ASSERT_L(lane->waiting_on == nullptr); | 316 | ASSERT_L(lane->m_waiting_on == nullptr); |
318 | lane->waiting_on = &linda->read_happened; | 317 | lane->m_waiting_on = &linda->m_read_happened; |
319 | } | 318 | } |
320 | // could not send because no room: wait until some data was read before trying again, or until timeout is reached | 319 | // could not send because no room: wait until some data was read before trying again, or until timeout is reached |
321 | try_again = SIGNAL_WAIT(&linda->read_happened, &K->keeper_cs, timeout); | 320 | std::unique_lock<std::mutex> keeper_lock{ K->m_mutex, std::adopt_lock }; |
321 | std::cv_status const status{ linda->m_read_happened.wait_until(keeper_lock, until) }; | ||
322 | keeper_lock.release(); // we don't want to release the lock! | ||
323 | try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups | ||
322 | if (lane != nullptr) | 324 | if (lane != nullptr) |
323 | { | 325 | { |
324 | lane->waiting_on = nullptr; | 326 | lane->m_waiting_on = nullptr; |
325 | lane->status = prev_status; | 327 | lane->status = prev_status; |
326 | } | 328 | } |
327 | } | 329 | } |
@@ -369,21 +371,24 @@ static constexpr UniqueKey BATCH_SENTINEL{ 0x2DDFEE0968C62AA7ull }; | |||
369 | LUAG_FUNC(linda_receive) | 371 | LUAG_FUNC(linda_receive) |
370 | { | 372 | { |
371 | Linda* const linda{ lua_toLinda<false>(L, 1) }; | 373 | Linda* const linda{ lua_toLinda<false>(L, 1) }; |
372 | 374 | std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | |
373 | time_d timeout{ -1.0 }; | 375 | int key_i{ 2 }; // index of first key, if timeout not there |
374 | int key_i{ 2 }; | ||
375 | 376 | ||
376 | if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion | 377 | if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion |
377 | { | 378 | { |
378 | timeout = SIGNAL_TIMEOUT_PREPARE(lua_tonumber(L, 2)); | 379 | lua_Duration const duration{ lua_tonumber(L, 2) }; |
380 | if (duration.count() >= 0.0) | ||
381 | { | ||
382 | until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration); | ||
383 | } | ||
379 | ++key_i; | 384 | ++key_i; |
380 | } | 385 | } |
381 | else if (lua_isnil(L, 2)) // alternate explicit "no timeout" by passing nil before the key | 386 | else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key |
382 | { | 387 | { |
383 | ++key_i; | 388 | ++key_i; |
384 | } | 389 | } |
385 | 390 | ||
386 | keeper_api_t keeper_receive; | 391 | keeper_api_t selected_keeper_receive{ nullptr }; |
387 | int expected_pushed_min{ 0 }, expected_pushed_max{ 0 }; | 392 | int expected_pushed_min{ 0 }, expected_pushed_max{ 0 }; |
388 | // are we in batched mode? | 393 | // are we in batched mode? |
389 | BATCH_SENTINEL.pushKey(L); | 394 | BATCH_SENTINEL.pushKey(L); |
@@ -396,7 +401,7 @@ LUAG_FUNC(linda_receive) | |||
396 | // make sure the keys are of a valid type | 401 | // make sure the keys are of a valid type |
397 | check_key_types(L, key_i, key_i); | 402 | check_key_types(L, key_i, key_i); |
398 | // receive multiple values from a single slot | 403 | // receive multiple values from a single slot |
399 | keeper_receive = KEEPER_API(receive_batched); | 404 | selected_keeper_receive = KEEPER_API(receive_batched); |
400 | // we expect a user-defined amount of return value | 405 | // we expect a user-defined amount of return value |
401 | expected_pushed_min = (int) luaL_checkinteger(L, key_i + 1); | 406 | expected_pushed_min = (int) luaL_checkinteger(L, key_i + 1); |
402 | expected_pushed_max = (int) luaL_optinteger(L, key_i + 2, expected_pushed_min); | 407 | expected_pushed_max = (int) luaL_optinteger(L, key_i + 2, expected_pushed_min); |
@@ -413,17 +418,20 @@ LUAG_FUNC(linda_receive) | |||
413 | // make sure the keys are of a valid type | 418 | // make sure the keys are of a valid type |
414 | check_key_types(L, key_i, lua_gettop(L)); | 419 | check_key_types(L, key_i, lua_gettop(L)); |
415 | // receive a single value, checking multiple slots | 420 | // receive a single value, checking multiple slots |
416 | keeper_receive = KEEPER_API(receive); | 421 | selected_keeper_receive = KEEPER_API(receive); |
417 | // we expect a single (value, key) pair of returned values | 422 | // we expect a single (value, key) pair of returned values |
418 | expected_pushed_min = expected_pushed_max = 2; | 423 | expected_pushed_min = expected_pushed_max = 2; |
419 | } | 424 | } |
420 | 425 | ||
421 | Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue<Lane>(L) }; | 426 | Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue<Lane>(L) }; |
422 | Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; | 427 | Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; |
423 | if (K == nullptr) | 428 | lua_State* const KL{ K ? K->L : nullptr }; |
429 | if (KL == nullptr) | ||
424 | return 0; | 430 | return 0; |
431 | |||
425 | CancelRequest cancel{ CancelRequest::None }; | 432 | CancelRequest cancel{ CancelRequest::None }; |
426 | int pushed{ 0 }; | 433 | int pushed{ 0 }; |
434 | STACK_CHECK_START_REL(KL, 0); | ||
427 | for (bool try_again{ true };;) | 435 | for (bool try_again{ true };;) |
428 | { | 436 | { |
429 | if (lane != nullptr) | 437 | if (lane != nullptr) |
@@ -439,7 +447,7 @@ LUAG_FUNC(linda_receive) | |||
439 | } | 447 | } |
440 | 448 | ||
441 | // all arguments of receive() but the first are passed to the keeper's receive function | 449 | // all arguments of receive() but the first are passed to the keeper's receive function |
442 | pushed = keeper_call(linda->U, K->L, keeper_receive, L, linda, key_i); | 450 | pushed = keeper_call(linda->U, KL, selected_keeper_receive, L, linda, key_i); |
443 | if (pushed < 0) | 451 | if (pushed < 0) |
444 | { | 452 | { |
445 | break; | 453 | break; |
@@ -451,11 +459,11 @@ LUAG_FUNC(linda_receive) | |||
451 | keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed, LookupMode::FromKeeper); | 459 | keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed, LookupMode::FromKeeper); |
452 | // To be done from within the 'K' locking area | 460 | // To be done from within the 'K' locking area |
453 | // | 461 | // |
454 | SIGNAL_ALL(&linda->read_happened); | 462 | linda->m_read_happened.notify_all(); |
455 | break; | 463 | break; |
456 | } | 464 | } |
457 | 465 | ||
458 | if (timeout == 0.0) | 466 | if (std::chrono::steady_clock::now() >= until) |
459 | { | 467 | { |
460 | break; /* instant timeout */ | 468 | break; /* instant timeout */ |
461 | } | 469 | } |
@@ -469,18 +477,22 @@ LUAG_FUNC(linda_receive) | |||
469 | prev_status = lane->status; // RUNNING, most likely | 477 | prev_status = lane->status; // RUNNING, most likely |
470 | ASSERT_L(prev_status == RUNNING); // but check, just in case | 478 | ASSERT_L(prev_status == RUNNING); // but check, just in case |
471 | lane->status = WAITING; | 479 | lane->status = WAITING; |
472 | ASSERT_L(lane->waiting_on == nullptr); | 480 | ASSERT_L(lane->m_waiting_on == nullptr); |
473 | lane->waiting_on = &linda->write_happened; | 481 | lane->m_waiting_on = &linda->m_write_happened; |
474 | } | 482 | } |
475 | // not enough data to read: wakeup when data was sent, or when timeout is reached | 483 | // not enough data to read: wakeup when data was sent, or when timeout is reached |
476 | try_again = SIGNAL_WAIT(&linda->write_happened, &K->keeper_cs, timeout); | 484 | std::unique_lock<std::mutex> keeper_lock{ K->m_mutex, std::adopt_lock }; |
485 | std::cv_status const status{ linda->m_write_happened.wait_until(keeper_lock, until) }; | ||
486 | keeper_lock.release(); // we don't want to release the lock! | ||
487 | try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups | ||
477 | if (lane != nullptr) | 488 | if (lane != nullptr) |
478 | { | 489 | { |
479 | lane->waiting_on = nullptr; | 490 | lane->m_waiting_on = nullptr; |
480 | lane->status = prev_status; | 491 | lane->status = prev_status; |
481 | } | 492 | } |
482 | } | 493 | } |
483 | } | 494 | } |
495 | STACK_CHECK(KL, 0); | ||
484 | 496 | ||
485 | if (pushed < 0) | 497 | if (pushed < 0) |
486 | { | 498 | { |
@@ -537,13 +549,13 @@ LUAG_FUNC(linda_set) | |||
537 | if (has_value) | 549 | if (has_value) |
538 | { | 550 | { |
539 | // we put some data in the slot, tell readers that they should wake | 551 | // we put some data in the slot, tell readers that they should wake |
540 | SIGNAL_ALL(&linda->write_happened); // To be done from within the 'K' locking area | 552 | linda->m_write_happened.notify_all(); // To be done from within the 'K' locking area |
541 | } | 553 | } |
542 | if (pushed == 1) | 554 | if (pushed == 1) |
543 | { | 555 | { |
544 | // the key was full, but it is no longer the case, tell writers they should wake | 556 | // the key was full, but it is no longer the case, tell writers they should wake |
545 | ASSERT_L(lua_type(L, -1) == LUA_TBOOLEAN && lua_toboolean(L, -1) == 1); | 557 | ASSERT_L(lua_type(L, -1) == LUA_TBOOLEAN && lua_toboolean(L, -1) == 1); |
546 | SIGNAL_ALL(&linda->read_happened); // To be done from within the 'K' locking area | 558 | linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area |
547 | } | 559 | } |
548 | } | 560 | } |
549 | } | 561 | } |
@@ -648,7 +660,7 @@ LUAG_FUNC( linda_limit) | |||
648 | if( pushed == 1) | 660 | if( pushed == 1) |
649 | { | 661 | { |
650 | ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); | 662 | ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); |
651 | SIGNAL_ALL( &linda->read_happened); // To be done from within the 'K' locking area | 663 | linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area |
652 | } | 664 | } |
653 | } | 665 | } |
654 | else // linda is cancelled | 666 | else // linda is cancelled |
@@ -678,8 +690,8 @@ LUAG_FUNC(linda_cancel) | |||
678 | linda->simulate_cancel = CancelRequest::Soft; | 690 | linda->simulate_cancel = CancelRequest::Soft; |
679 | if (strcmp(who, "both") == 0) // tell everyone writers to wake up | 691 | if (strcmp(who, "both") == 0) // tell everyone writers to wake up |
680 | { | 692 | { |
681 | SIGNAL_ALL(&linda->write_happened); | 693 | linda->m_write_happened.notify_all(); |
682 | SIGNAL_ALL(&linda->read_happened); | 694 | linda->m_read_happened.notify_all(); |
683 | } | 695 | } |
684 | else if (strcmp(who, "none") == 0) // reset flag | 696 | else if (strcmp(who, "none") == 0) // reset flag |
685 | { | 697 | { |
@@ -687,11 +699,11 @@ LUAG_FUNC(linda_cancel) | |||
687 | } | 699 | } |
688 | else if (strcmp(who, "read") == 0) // tell blocked readers to wake up | 700 | else if (strcmp(who, "read") == 0) // tell blocked readers to wake up |
689 | { | 701 | { |
690 | SIGNAL_ALL(&linda->write_happened); | 702 | linda->m_write_happened.notify_all(); |
691 | } | 703 | } |
692 | else if (strcmp(who, "write") == 0) // tell blocked writers to wake up | 704 | else if (strcmp(who, "write") == 0) // tell blocked writers to wake up |
693 | { | 705 | { |
694 | SIGNAL_ALL(&linda->read_happened); | 706 | linda->m_read_happened.notify_all(); |
695 | } | 707 | } |
696 | else | 708 | else |
697 | { | 709 | { |