aboutsummaryrefslogtreecommitdiff
path: root/src/linda.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/linda.cpp')
-rw-r--r--src/linda.cpp734
1 files changed, 369 insertions, 365 deletions
diff --git a/src/linda.cpp b/src/linda.cpp
index 103f4ed..8005211 100644
--- a/src/linda.cpp
+++ b/src/linda.cpp
@@ -57,6 +57,8 @@ class LindaFactory : public DeepFactory
57// I'm not totally happy with having a global variable. But since it's stateless, it will do for the time being. 57// I'm not totally happy with having a global variable. But since it's stateless, it will do for the time being.
58static LindaFactory g_LindaFactory; 58static LindaFactory g_LindaFactory;
59 59
60// #################################################################################################
61
60/* 62/*
61* Actual data is kept within a keeper state, which is hashed by the 'Linda' 63* Actual data is kept within a keeper state, which is hashed by the 'Linda'
62* pointer (which is same to all userdatas pointing to it). 64* pointer (which is same to all userdatas pointing to it).
@@ -110,7 +112,9 @@ class Linda : public DeepPrelude // Deep userdata MUST start with this header
110 } 112 }
111 } 113 }
112 114
113 private: 115 static int ProtectedCall(lua_State* L, lua_CFunction f_);
116
117 private :
114 118
115 void setName(char const* name_, size_t len_) 119 void setName(char const* name_, size_t len_)
116 { 120 {
@@ -203,7 +207,8 @@ static void check_key_types(lua_State* L, int start_, int end_)
203 207
204// ################################################################################################# 208// #################################################################################################
205 209
206LUAG_FUNC(linda_protected_call) 210// used to perform all linda operations that access keepers
211int Linda::ProtectedCall(lua_State* L, lua_CFunction f_)
207{ 212{
208 Linda* const linda{ ToLinda<false>(L, 1) }; 213 Linda* const linda{ ToLinda<false>(L, 1) };
209 214
@@ -215,8 +220,8 @@ LUAG_FUNC(linda_protected_call)
215 // if we didn't do anything wrong, the keeper stack should be clean 220 // if we didn't do anything wrong, the keeper stack should be clean
216 ASSERT_L(lua_gettop(KL) == 0); 221 ASSERT_L(lua_gettop(KL) == 0);
217 222
218 // retrieve the actual function to be called and move it before the arguments 223 // push the function to be called and move it before the arguments
219 lua_pushvalue(L, lua_upvalueindex(1)); 224 lua_pushcfunction(L, f_);
220 lua_insert(L, 1); 225 lua_insert(L, 1);
221 // do a protected call 226 // do a protected call
222 int const rc{ lua_pcall(L, lua_gettop(L) - 1, LUA_MULTRET, 0) }; 227 int const rc{ lua_pcall(L, lua_gettop(L) - 1, LUA_MULTRET, 0) };
@@ -248,62 +253,232 @@ LUAG_FUNC(linda_protected_call)
248*/ 253*/
249LUAG_FUNC(linda_send) 254LUAG_FUNC(linda_send)
250{ 255{
251 Linda* const linda{ ToLinda<false>(L, 1) }; 256 auto send = [](lua_State* L)
252 std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() };
253 int key_i{ 2 }; // index of first key, if timeout not there
254
255 if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion
256 { 257 {
257 lua_Duration const duration{ lua_tonumber(L, 2) }; 258 Linda* const linda{ ToLinda<false>(L, 1) };
258 if (duration.count() >= 0.0) 259 std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() };
260 int key_i{ 2 }; // index of first key, if timeout not there
261
262 if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion
263 {
264 lua_Duration const duration{ lua_tonumber(L, 2) };
265 if (duration.count() >= 0.0)
266 {
267 until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration);
268 }
269 ++key_i;
270 }
271 else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key
259 { 272 {
260 until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration); 273 ++key_i;
261 } 274 }
262 ++key_i;
263 }
264 else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key
265 {
266 ++key_i;
267 }
268 275
269 bool const as_nil_sentinel{ NIL_SENTINEL.equals(L, key_i) }; // if not nullptr, send() will silently send a single nil if nothing is provided 276 bool const as_nil_sentinel{ NIL_SENTINEL.equals(L, key_i) }; // if not nullptr, send() will silently send a single nil if nothing is provided
270 if (as_nil_sentinel) 277 if (as_nil_sentinel)
271 { 278 {
272 // the real key to send data to is after the NIL_SENTINEL marker 279 // the real key to send data to is after the NIL_SENTINEL marker
273 ++key_i; 280 ++key_i;
274 } 281 }
282
283 // make sure the key is of a valid type
284 check_key_types(L, key_i, key_i);
285
286 STACK_GROW(L, 1);
287
288 // make sure there is something to send
289 if (lua_gettop(L) == key_i)
290 {
291 if (as_nil_sentinel)
292 {
293 // send a single nil if nothing is provided
294 NIL_SENTINEL.pushKey(L);
295 }
296 else
297 {
298 return luaL_error(L, "no data to send");
299 }
300 }
301
302 // convert nils to some special non-nil sentinel in sent values
303 keeper_toggle_nil_sentinels(L, key_i + 1, LookupMode::ToKeeper);
304 bool ret{ false };
305 CancelRequest cancel{ CancelRequest::None };
306 KeeperCallResult pushed;
307 {
308 Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue<Lane>(L) };
309 Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) };
310 lua_State* const KL{ K ? K->L : nullptr };
311 if (KL == nullptr)
312 return 0;
313
314 STACK_CHECK_START_REL(KL, 0);
315 for (bool try_again{ true };;)
316 {
317 if (lane != nullptr)
318 {
319 cancel = lane->cancel_request;
320 }
321 cancel = (cancel != CancelRequest::None) ? cancel : linda->simulate_cancel;
322 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything
323 if (!try_again || cancel != CancelRequest::None)
324 {
325 pushed.emplace(0);
326 break;
327 }
328
329 STACK_CHECK(KL, 0);
330 pushed = keeper_call(linda->U, KL, KEEPER_API(send), L, linda, key_i);
331 if (!pushed.has_value())
332 {
333 break;
334 }
335 ASSERT_L(pushed.value() == 1);
336
337 ret = lua_toboolean(L, -1) ? true : false;
338 lua_pop(L, 1);
275 339
276 // make sure the key is of a valid type 340 if (ret)
277 check_key_types(L, key_i, key_i); 341 {
342 // Wake up ALL waiting threads
343 linda->m_write_happened.notify_all();
344 break;
345 }
278 346
279 STACK_GROW(L, 1); 347 // instant timout to bypass the wait syscall
348 if (std::chrono::steady_clock::now() >= until)
349 {
350 break; /* no wait; instant timeout */
351 }
280 352
281 // make sure there is something to send 353 // storage limit hit, wait until timeout or signalled that we should try again
282 if (lua_gettop(L) == key_i) 354 {
355 Lane::Status prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
356 if (lane != nullptr)
357 {
358 // change status of lane to "waiting"
359 prev_status = lane->m_status; // Running, most likely
360 ASSERT_L(prev_status == Lane::Running); // but check, just in case
361 lane->m_status = Lane::Waiting;
362 ASSERT_L(lane->m_waiting_on == nullptr);
363 lane->m_waiting_on = &linda->m_read_happened;
364 }
365 // could not send because no room: wait until some data was read before trying again, or until timeout is reached
366 std::unique_lock<std::mutex> keeper_lock{ K->m_mutex, std::adopt_lock };
367 std::cv_status const status{ linda->m_read_happened.wait_until(keeper_lock, until) };
368 keeper_lock.release(); // we don't want to release the lock!
369 try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups
370 if (lane != nullptr)
371 {
372 lane->m_waiting_on = nullptr;
373 lane->m_status = prev_status;
374 }
375 }
376 }
377 STACK_CHECK(KL, 0);
378 }
379
380 if (!pushed.has_value())
381 {
382 luaL_error(L, "tried to copy unsupported types"); // doesn't return
383 }
384
385 switch (cancel)
386 {
387 case CancelRequest::Soft:
388 // if user wants to soft-cancel, the call returns lanes.cancel_error
389 CANCEL_ERROR.pushKey(L);
390 return 1;
391
392 case CancelRequest::Hard:
393 // raise an error interrupting execution only in case of hard cancel
394 raise_cancel_error(L); // raises an error and doesn't return
395
396 default:
397 lua_pushboolean(L, ret); // true (success) or false (timeout)
398 return 1;
399 }
400 };
401 return Linda::ProtectedCall(L, send);
402}
403
404// #################################################################################################
405
406/*
407 * 2 modes of operation
408 * [val, key]= linda_receive( linda_ud, [timeout_secs_num=-1], key_num|str|bool|lightuserdata [, ...] )
409 * Consumes a single value from the Linda, in any key.
410 * Returns: received value (which is consumed from the slot), and the key which had it
411
412 * [val1, ... valCOUNT]= linda_receive( linda_ud, [timeout_secs_num=-1], linda.batched, key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT])
413 * Consumes between min_COUNT and max_COUNT values from the linda, from a single key.
414 * returns the actual consumed values, or nil if there weren't enough values to consume
415 *
416 */
417LUAG_FUNC(linda_receive)
418{
419 auto receive = [](lua_State* L)
283 { 420 {
284 if (as_nil_sentinel) 421 Linda* const linda{ ToLinda<false>(L, 1) };
422 std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() };
423 int key_i{ 2 }; // index of first key, if timeout not there
424
425 if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion
426 {
427 lua_Duration const duration{ lua_tonumber(L, 2) };
428 if (duration.count() >= 0.0)
429 {
430 until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration);
431 }
432 ++key_i;
433 }
434 else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key
285 { 435 {
286 // send a single nil if nothing is provided 436 ++key_i;
287 NIL_SENTINEL.pushKey(L); 437 }
438
439 keeper_api_t selected_keeper_receive{ nullptr };
440 int expected_pushed_min{ 0 }, expected_pushed_max{ 0 };
441 // are we in batched mode?
442 BATCH_SENTINEL.pushKey(L);
443 int const is_batched{ lua501_equal(L, key_i, -1) };
444 lua_pop(L, 1);
445 if (is_batched)
446 {
447 // no need to pass linda.batched in the keeper state
448 ++key_i;
449 // make sure the keys are of a valid type
450 check_key_types(L, key_i, key_i);
451 // receive multiple values from a single slot
452 selected_keeper_receive = KEEPER_API(receive_batched);
453 // we expect a user-defined amount of return value
454 expected_pushed_min = (int) luaL_checkinteger(L, key_i + 1);
455 expected_pushed_max = (int) luaL_optinteger(L, key_i + 2, expected_pushed_min);
456 // don't forget to count the key in addition to the values
457 ++expected_pushed_min;
458 ++expected_pushed_max;
459 if (expected_pushed_min > expected_pushed_max)
460 {
461 return luaL_error(L, "batched min/max error");
462 }
288 } 463 }
289 else 464 else
290 { 465 {
291 return luaL_error(L, "no data to send"); 466 // make sure the keys are of a valid type
467 check_key_types(L, key_i, lua_gettop(L));
468 // receive a single value, checking multiple slots
469 selected_keeper_receive = KEEPER_API(receive);
470 // we expect a single (value, key) pair of returned values
471 expected_pushed_min = expected_pushed_max = 2;
292 } 472 }
293 }
294 473
295 // convert nils to some special non-nil sentinel in sent values
296 keeper_toggle_nil_sentinels(L, key_i + 1, LookupMode::ToKeeper);
297 bool ret{ false };
298 CancelRequest cancel{ CancelRequest::None };
299 KeeperCallResult pushed;
300 {
301 Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue<Lane>(L) }; 474 Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue<Lane>(L) };
302 Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; 475 Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) };
303 lua_State* const KL{ K ? K->L : nullptr }; 476 lua_State* const KL{ K ? K->L : nullptr };
304 if (KL == nullptr) 477 if (KL == nullptr)
305 return 0; 478 return 0;
306 479
480 CancelRequest cancel{ CancelRequest::None };
481 KeeperCallResult pushed;
307 STACK_CHECK_START_REL(KL, 0); 482 STACK_CHECK_START_REL(KL, 0);
308 for (bool try_again{ true };;) 483 for (bool try_again{ true };;)
309 { 484 {
@@ -319,31 +494,29 @@ LUAG_FUNC(linda_send)
319 break; 494 break;
320 } 495 }
321 496
322 STACK_CHECK(KL, 0); 497 // all arguments of receive() but the first are passed to the keeper's receive function
323 pushed = keeper_call(linda->U, KL, KEEPER_API(send), L, linda, key_i); 498 pushed = keeper_call(linda->U, KL, selected_keeper_receive, L, linda, key_i);
324 if (!pushed.has_value()) 499 if (!pushed.has_value())
325 { 500 {
326 break; 501 break;
327 } 502 }
328 ASSERT_L(pushed.value() == 1); 503 if (pushed.value() > 0)
329
330 ret = lua_toboolean(L, -1) ? true : false;
331 lua_pop(L, 1);
332
333 if (ret)
334 { 504 {
335 // Wake up ALL waiting threads 505 ASSERT_L(pushed.value() >= expected_pushed_min && pushed.value() <= expected_pushed_max);
336 linda->m_write_happened.notify_all(); 506 // replace sentinels with real nils
507 keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed.value(), LookupMode::FromKeeper);
508 // To be done from within the 'K' locking area
509 //
510 linda->m_read_happened.notify_all();
337 break; 511 break;
338 } 512 }
339 513
340 // instant timout to bypass the wait syscall
341 if (std::chrono::steady_clock::now() >= until) 514 if (std::chrono::steady_clock::now() >= until)
342 { 515 {
343 break; /* no wait; instant timeout */ 516 break; /* instant timeout */
344 } 517 }
345 518
346 // storage limit hit, wait until timeout or signalled that we should try again 519 // nothing received, wait until timeout or signalled that we should try again
347 { 520 {
348 Lane::Status prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings 521 Lane::Status prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
349 if (lane != nullptr) 522 if (lane != nullptr)
@@ -353,11 +526,11 @@ LUAG_FUNC(linda_send)
353 ASSERT_L(prev_status == Lane::Running); // but check, just in case 526 ASSERT_L(prev_status == Lane::Running); // but check, just in case
354 lane->m_status = Lane::Waiting; 527 lane->m_status = Lane::Waiting;
355 ASSERT_L(lane->m_waiting_on == nullptr); 528 ASSERT_L(lane->m_waiting_on == nullptr);
356 lane->m_waiting_on = &linda->m_read_happened; 529 lane->m_waiting_on = &linda->m_write_happened;
357 } 530 }
358 // could not send because no room: wait until some data was read before trying again, or until timeout is reached 531 // not enough data to read: wakeup when data was sent, or when timeout is reached
359 std::unique_lock<std::mutex> keeper_lock{ K->m_mutex, std::adopt_lock }; 532 std::unique_lock<std::mutex> keeper_lock{ K->m_mutex, std::adopt_lock };
360 std::cv_status const status{ linda->m_read_happened.wait_until(keeper_lock, until) }; 533 std::cv_status const status{ linda->m_write_happened.wait_until(keeper_lock, until) };
361 keeper_lock.release(); // we don't want to release the lock! 534 keeper_lock.release(); // we don't want to release the lock!
362 try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups 535 try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups
363 if (lane != nullptr) 536 if (lane != nullptr)
@@ -368,188 +541,28 @@ LUAG_FUNC(linda_send)
368 } 541 }
369 } 542 }
370 STACK_CHECK(KL, 0); 543 STACK_CHECK(KL, 0);
371 }
372
373 if (!pushed.has_value())
374 {
375 luaL_error(L, "tried to copy unsupported types"); // doesn't return
376 }
377
378 switch (cancel)
379 {
380 case CancelRequest::Soft:
381 // if user wants to soft-cancel, the call returns lanes.cancel_error
382 CANCEL_ERROR.pushKey(L);
383 return 1;
384
385 case CancelRequest::Hard:
386 // raise an error interrupting execution only in case of hard cancel
387 raise_cancel_error(L); // raises an error and doesn't return
388
389 default:
390 lua_pushboolean(L, ret); // true (success) or false (timeout)
391 return 1;
392 }
393}
394
395// #################################################################################################
396
397/*
398 * 2 modes of operation
399 * [val, key]= linda_receive( linda_ud, [timeout_secs_num=-1], key_num|str|bool|lightuserdata [, ...] )
400 * Consumes a single value from the Linda, in any key.
401 * Returns: received value (which is consumed from the slot), and the key which had it
402
403 * [val1, ... valCOUNT]= linda_receive( linda_ud, [timeout_secs_num=-1], linda.batched, key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT])
404 * Consumes between min_COUNT and max_COUNT values from the linda, from a single key.
405 * returns the actual consumed values, or nil if there weren't enough values to consume
406 *
407 */
408LUAG_FUNC(linda_receive)
409{
410 Linda* const linda{ ToLinda<false>(L, 1) };
411 std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() };
412 int key_i{ 2 }; // index of first key, if timeout not there
413
414 if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion
415 {
416 lua_Duration const duration{ lua_tonumber(L, 2) };
417 if (duration.count() >= 0.0)
418 {
419 until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration);
420 }
421 ++key_i;
422 }
423 else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key
424 {
425 ++key_i;
426 }
427
428 keeper_api_t selected_keeper_receive{ nullptr };
429 int expected_pushed_min{ 0 }, expected_pushed_max{ 0 };
430 // are we in batched mode?
431 BATCH_SENTINEL.pushKey(L);
432 int const is_batched{ lua501_equal(L, key_i, -1) };
433 lua_pop(L, 1);
434 if (is_batched)
435 {
436 // no need to pass linda.batched in the keeper state
437 ++key_i;
438 // make sure the keys are of a valid type
439 check_key_types(L, key_i, key_i);
440 // receive multiple values from a single slot
441 selected_keeper_receive = KEEPER_API(receive_batched);
442 // we expect a user-defined amount of return value
443 expected_pushed_min = (int) luaL_checkinteger(L, key_i + 1);
444 expected_pushed_max = (int) luaL_optinteger(L, key_i + 2, expected_pushed_min);
445 // don't forget to count the key in addition to the values
446 ++expected_pushed_min;
447 ++expected_pushed_max;
448 if (expected_pushed_min > expected_pushed_max)
449 {
450 return luaL_error(L, "batched min/max error");
451 }
452 }
453 else
454 {
455 // make sure the keys are of a valid type
456 check_key_types(L, key_i, lua_gettop(L));
457 // receive a single value, checking multiple slots
458 selected_keeper_receive = KEEPER_API(receive);
459 // we expect a single (value, key) pair of returned values
460 expected_pushed_min = expected_pushed_max = 2;
461 }
462
463 Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue<Lane>(L) };
464 Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) };
465 lua_State* const KL{ K ? K->L : nullptr };
466 if (KL == nullptr)
467 return 0;
468
469 CancelRequest cancel{ CancelRequest::None };
470 KeeperCallResult pushed;
471 STACK_CHECK_START_REL(KL, 0);
472 for (bool try_again{ true };;)
473 {
474 if (lane != nullptr)
475 {
476 cancel = lane->cancel_request;
477 }
478 cancel = (cancel != CancelRequest::None) ? cancel : linda->simulate_cancel;
479 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything
480 if (!try_again || cancel != CancelRequest::None)
481 {
482 pushed.emplace(0);
483 break;
484 }
485 544
486 // all arguments of receive() but the first are passed to the keeper's receive function
487 pushed = keeper_call(linda->U, KL, selected_keeper_receive, L, linda, key_i);
488 if (!pushed.has_value()) 545 if (!pushed.has_value())
489 { 546 {
490 break; 547 return luaL_error(L, "tried to copy unsupported types");
491 }
492 if (pushed.value() > 0)
493 {
494 ASSERT_L(pushed.value() >= expected_pushed_min && pushed.value() <= expected_pushed_max);
495 // replace sentinels with real nils
496 keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed.value(), LookupMode::FromKeeper);
497 // To be done from within the 'K' locking area
498 //
499 linda->m_read_happened.notify_all();
500 break;
501 } 548 }
502 549
503 if (std::chrono::steady_clock::now() >= until) 550 switch (cancel)
504 { 551 {
505 break; /* instant timeout */ 552 case CancelRequest::Soft:
506 } 553 // if user wants to soft-cancel, the call returns CANCEL_ERROR
554 CANCEL_ERROR.pushKey(L);
555 return 1;
507 556
508 // nothing received, wait until timeout or signalled that we should try again 557 case CancelRequest::Hard:
509 { 558 // raise an error interrupting execution only in case of hard cancel
510 Lane::Status prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings 559 raise_cancel_error(L); // raises an error and doesn't return
511 if (lane != nullptr)
512 {
513 // change status of lane to "waiting"
514 prev_status = lane->m_status; // Running, most likely
515 ASSERT_L(prev_status == Lane::Running); // but check, just in case
516 lane->m_status = Lane::Waiting;
517 ASSERT_L(lane->m_waiting_on == nullptr);
518 lane->m_waiting_on = &linda->m_write_happened;
519 }
520 // not enough data to read: wakeup when data was sent, or when timeout is reached
521 std::unique_lock<std::mutex> keeper_lock{ K->m_mutex, std::adopt_lock };
522 std::cv_status const status{ linda->m_write_happened.wait_until(keeper_lock, until) };
523 keeper_lock.release(); // we don't want to release the lock!
524 try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups
525 if (lane != nullptr)
526 {
527 lane->m_waiting_on = nullptr;
528 lane->m_status = prev_status;
529 }
530 }
531 }
532 STACK_CHECK(KL, 0);
533 560
534 if (!pushed.has_value()) 561 default:
535 { 562 return pushed.value();
536 return luaL_error(L, "tried to copy unsupported types"); 563 }
537 } 564 };
538 565 return Linda::ProtectedCall(L, receive);
539 switch (cancel)
540 {
541 case CancelRequest::Soft:
542 // if user wants to soft-cancel, the call returns CANCEL_ERROR
543 CANCEL_ERROR.pushKey(L);
544 return 1;
545
546 case CancelRequest::Hard:
547 // raise an error interrupting execution only in case of hard cancel
548 raise_cancel_error(L); // raises an error and doesn't return
549
550 default:
551 return pushed.value();
552 }
553} 566}
554 567
555// ################################################################################################# 568// #################################################################################################
@@ -564,47 +577,51 @@ LUAG_FUNC(linda_receive)
564*/ 577*/
565LUAG_FUNC(linda_set) 578LUAG_FUNC(linda_set)
566{ 579{
567 Linda* const linda{ ToLinda<false>(L, 1) }; 580 auto set = [](lua_State* L)
568 bool const has_value{ lua_gettop(L) > 2 };
569 // make sure the key is of a valid type (throws an error if not the case)
570 check_key_types(L, 2, 2);
571
572 Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) };
573 KeeperCallResult pushed;
574 if (linda->simulate_cancel == CancelRequest::None)
575 { 581 {
576 if (has_value) 582 Linda* const linda{ ToLinda<false>(L, 1) };
577 { 583 bool const has_value{ lua_gettop(L) > 2 };
578 // convert nils to some special non-nil sentinel in sent values 584 // make sure the key is of a valid type (throws an error if not the case)
579 keeper_toggle_nil_sentinels(L, 3, LookupMode::ToKeeper); 585 check_key_types(L, 2, 2);
580 }
581 pushed = keeper_call(linda->U, K->L, KEEPER_API(set), L, linda, 2);
582 if (pushed.has_value()) // no error?
583 {
584 ASSERT_L(pushed.value() == 0 || pushed.value() == 1);
585 586
587 Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) };
588 KeeperCallResult pushed;
589 if (linda->simulate_cancel == CancelRequest::None)
590 {
586 if (has_value) 591 if (has_value)
587 { 592 {
588 // we put some data in the slot, tell readers that they should wake 593 // convert nils to some special non-nil sentinel in sent values
589 linda->m_write_happened.notify_all(); // To be done from within the 'K' locking area 594 keeper_toggle_nil_sentinels(L, 3, LookupMode::ToKeeper);
590 } 595 }
591 if (pushed.value() == 1) 596 pushed = keeper_call(linda->U, K->L, KEEPER_API(set), L, linda, 2);
597 if (pushed.has_value()) // no error?
592 { 598 {
593 // the key was full, but it is no longer the case, tell writers they should wake 599 ASSERT_L(pushed.value() == 0 || pushed.value() == 1);
594 ASSERT_L(lua_type(L, -1) == LUA_TBOOLEAN && lua_toboolean(L, -1) == 1); 600
595 linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area 601 if (has_value)
602 {
603 // we put some data in the slot, tell readers that they should wake
604 linda->m_write_happened.notify_all(); // To be done from within the 'K' locking area
605 }
606 if (pushed.value() == 1)
607 {
608 // the key was full, but it is no longer the case, tell writers they should wake
609 ASSERT_L(lua_type(L, -1) == LUA_TBOOLEAN && lua_toboolean(L, -1) == 1);
610 linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area
611 }
596 } 612 }
597 } 613 }
598 } 614 else // linda is cancelled
599 else // linda is cancelled 615 {
600 { 616 // do nothing and return lanes.cancel_error
601 // do nothing and return lanes.cancel_error 617 CANCEL_ERROR.pushKey(L);
602 CANCEL_ERROR.pushKey(L); 618 pushed.emplace(1);
603 pushed.emplace(1); 619 }
604 }
605 620
606 // must trigger any error after keeper state has been released 621 // must trigger any error after keeper state has been released
607 return OptionalValue(pushed, L, "tried to copy unsupported types"); 622 return OptionalValue(pushed, L, "tried to copy unsupported types");
623 };
624 return Linda::ProtectedCall(L, set);
608} 625}
609 626
610// ################################################################################################# 627// #################################################################################################
@@ -616,13 +633,17 @@ LUAG_FUNC(linda_set)
616 */ 633 */
617LUAG_FUNC(linda_count) 634LUAG_FUNC(linda_count)
618{ 635{
619 Linda* const linda{ ToLinda<false>(L, 1) }; 636 auto count = [](lua_State* L)
620 // make sure the keys are of a valid type 637 {
621 check_key_types(L, 2, lua_gettop(L)); 638 Linda* const linda{ ToLinda<false>(L, 1) };
639 // make sure the keys are of a valid type
640 check_key_types(L, 2, lua_gettop(L));
622 641
623 Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; 642 Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) };
624 KeeperCallResult const pushed{ keeper_call(linda->U, K->L, KEEPER_API(count), L, linda, 2) }; 643 KeeperCallResult const pushed{ keeper_call(linda->U, K->L, KEEPER_API(count), L, linda, 2) };
625 return OptionalValue(pushed, L, "tried to count an invalid key"); 644 return OptionalValue(pushed, L, "tried to count an invalid key");
645 };
646 return Linda::ProtectedCall(L, count);
626} 647}
627 648
628// ################################################################################################# 649// #################################################################################################
@@ -634,31 +655,35 @@ LUAG_FUNC(linda_count)
634*/ 655*/
635LUAG_FUNC(linda_get) 656LUAG_FUNC(linda_get)
636{ 657{
637 Linda* const linda{ ToLinda<false>(L, 1) }; 658 auto get = [](lua_State* L)
638 lua_Integer const count{ luaL_optinteger(L, 3, 1) };
639 luaL_argcheck(L, count >= 1, 3, "count should be >= 1");
640 luaL_argcheck(L, lua_gettop(L) <= 3, 4, "too many arguments");
641 // make sure the key is of a valid type (throws an error if not the case)
642 check_key_types(L, 2, 2);
643
644 KeeperCallResult pushed;
645 if (linda->simulate_cancel == CancelRequest::None)
646 { 659 {
647 Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; 660 Linda* const linda{ ToLinda<false>(L, 1) };
648 pushed = keeper_call(linda->U, K->L, KEEPER_API(get), L, linda, 2); 661 lua_Integer const count{ luaL_optinteger(L, 3, 1) };
649 if (pushed.value_or(0) > 0) 662 luaL_argcheck(L, count >= 1, 3, "count should be >= 1");
663 luaL_argcheck(L, lua_gettop(L) <= 3, 4, "too many arguments");
664 // make sure the key is of a valid type (throws an error if not the case)
665 check_key_types(L, 2, 2);
666
667 KeeperCallResult pushed;
668 if (linda->simulate_cancel == CancelRequest::None)
650 { 669 {
651 keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed.value(), LookupMode::FromKeeper); 670 Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) };
671 pushed = keeper_call(linda->U, K->L, KEEPER_API(get), L, linda, 2);
672 if (pushed.value_or(0) > 0)
673 {
674 keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed.value(), LookupMode::FromKeeper);
675 }
652 } 676 }
653 } 677 else // linda is cancelled
654 else // linda is cancelled 678 {
655 { 679 // do nothing and return lanes.cancel_error
656 // do nothing and return lanes.cancel_error 680 CANCEL_ERROR.pushKey(L);
657 CANCEL_ERROR.pushKey(L); 681 pushed.emplace(1);
658 pushed.emplace(1); 682 }
659 } 683 // an error can be raised if we attempt to read an unregistered function
660 // an error can be raised if we attempt to read an unregistered function 684 return OptionalValue(pushed, L, "tried to copy unsupported types");
661 return OptionalValue(pushed, L, "tried to copy unsupported types"); 685 };
686 return Linda::ProtectedCall(L, get);
662} 687}
663 688
664// ################################################################################################# 689// #################################################################################################
@@ -671,34 +696,38 @@ LUAG_FUNC(linda_get)
671*/ 696*/
672LUAG_FUNC(linda_limit) 697LUAG_FUNC(linda_limit)
673{ 698{
674 Linda* const linda{ ToLinda<false>(L, 1) }; 699 auto limit = [](lua_State* L)
675 // make sure we got 3 arguments: the linda, a key and a limit
676 luaL_argcheck( L, lua_gettop( L) == 3, 2, "wrong number of arguments");
677 // make sure we got a numeric limit
678 luaL_checknumber( L, 3);
679 // make sure the key is of a valid type
680 check_key_types( L, 2, 2);
681
682 KeeperCallResult pushed;
683 if (linda->simulate_cancel == CancelRequest::None)
684 { 700 {
685 Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; 701 Linda* const linda{ ToLinda<false>(L, 1) };
686 pushed = keeper_call(linda->U, K->L, KEEPER_API(limit), L, linda, 2); 702 // make sure we got 3 arguments: the linda, a key and a limit
687 ASSERT_L( pushed.has_value() && (pushed.value() == 0 || pushed.value() == 1)); // no error, optional boolean value saying if we should wake blocked writer threads 703 luaL_argcheck( L, lua_gettop( L) == 3, 2, "wrong number of arguments");
688 if (pushed.value() == 1) 704 // make sure we got a numeric limit
705 luaL_checknumber( L, 3);
706 // make sure the key is of a valid type
707 check_key_types( L, 2, 2);
708
709 KeeperCallResult pushed;
710 if (linda->simulate_cancel == CancelRequest::None)
689 { 711 {
690 ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); 712 Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) };
691 linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area 713 pushed = keeper_call(linda->U, K->L, KEEPER_API(limit), L, linda, 2);
714 ASSERT_L( pushed.has_value() && (pushed.value() == 0 || pushed.value() == 1)); // no error, optional boolean value saying if we should wake blocked writer threads
715 if (pushed.value() == 1)
716 {
717 ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1);
718 linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area
719 }
692 } 720 }
693 } 721 else // linda is cancelled
694 else // linda is cancelled 722 {
695 { 723 // do nothing and return lanes.cancel_error
696 // do nothing and return lanes.cancel_error 724 CANCEL_ERROR.pushKey(L);
697 CANCEL_ERROR.pushKey(L); 725 pushed.emplace(1);
698 pushed.emplace(1); 726 }
699 } 727 // propagate pushed boolean if any
700 // propagate pushed boolean if any 728 return pushed.value();
701 return pushed.value(); 729 };
730 return Linda::ProtectedCall(L, limit);
702} 731}
703 732
704// ################################################################################################# 733// #################################################################################################
@@ -831,8 +860,12 @@ LUAG_FUNC(linda_concat)
831 */ 860 */
832LUAG_FUNC(linda_dump) 861LUAG_FUNC(linda_dump)
833{ 862{
834 Linda* const linda{ ToLinda<false>(L, 1) }; 863 auto dump = [](lua_State* L)
835 return keeper_push_linda_storage(linda->U, Dest{ L }, linda, linda->hashSeed()); 864 {
865 Linda* const linda{ ToLinda<false>(L, 1) };
866 return keeper_push_linda_storage(linda->U, Dest{ L }, linda, linda->hashSeed());
867 };
868 return Linda::ProtectedCall(L, dump);
836} 869}
837 870
838// ################################################################################################# 871// #################################################################################################
@@ -922,6 +955,23 @@ void LindaFactory::deleteDeepObjectInternal(lua_State* L, DeepPrelude* o_) const
922 955
923// ################################################################################################# 956// #################################################################################################
924 957
958static luaL_Reg const s_LindaMT[] =
959{
960 { "__concat", LG_linda_concat },
961 { "__tostring", LG_linda_tostring },
962 { "__towatch", LG_linda_towatch }, // Decoda __towatch support
963 { "cancel", LG_linda_cancel },
964 { "count", LG_linda_count },
965 { "deep", LG_linda_deep },
966 { "dump", LG_linda_dump },
967 { "get", LG_linda_get },
968 { "limit", LG_linda_limit },
969 { "receive", LG_linda_receive },
970 { "send", LG_linda_send },
971 { "set", LG_linda_set },
972 { nullptr, nullptr }
973};
974
925void LindaFactory::createMetatable(lua_State* L) const 975void LindaFactory::createMetatable(lua_State* L) const
926{ 976{
927 STACK_CHECK_START_REL(L, 0); 977 STACK_CHECK_START_REL(L, 0);
@@ -934,54 +984,8 @@ void LindaFactory::createMetatable(lua_State* L) const
934 lua_pushliteral(L, "Linda"); 984 lua_pushliteral(L, "Linda");
935 lua_setfield(L, -2, "__metatable"); 985 lua_setfield(L, -2, "__metatable");
936 986
937 lua_pushcfunction(L, LG_linda_tostring); 987 // the linda functions
938 lua_setfield(L, -2, "__tostring"); 988 luaL_setfuncs(L, s_LindaMT, 0);
939
940 // Decoda __towatch support
941 lua_pushcfunction(L, LG_linda_towatch);
942 lua_setfield(L, -2, "__towatch");
943
944 lua_pushcfunction(L, LG_linda_concat);
945 lua_setfield(L, -2, "__concat");
946
947 // protected calls, to ensure associated keeper is always released even in case of error
948 // all function are the protected call wrapper, where the actual operation is provided as upvalue
949 // note that this kind of thing can break function lookup as we use the function pointer here and there
950 // TODO: change that and use different functions!
951
952 lua_pushcfunction(L, LG_linda_send);
953 lua_pushcclosure(L, LG_linda_protected_call, 1);
954 lua_setfield(L, -2, "send");
955
956 lua_pushcfunction(L, LG_linda_receive);
957 lua_pushcclosure(L, LG_linda_protected_call, 1);
958 lua_setfield(L, -2, "receive");
959
960 lua_pushcfunction(L, LG_linda_limit);
961 lua_pushcclosure(L, LG_linda_protected_call, 1);
962 lua_setfield(L, -2, "limit");
963
964 lua_pushcfunction(L, LG_linda_set);
965 lua_pushcclosure(L, LG_linda_protected_call, 1);
966 lua_setfield(L, -2, "set");
967
968 lua_pushcfunction(L, LG_linda_count);
969 lua_pushcclosure(L, LG_linda_protected_call, 1);
970 lua_setfield(L, -2, "count");
971
972 lua_pushcfunction(L, LG_linda_get);
973 lua_pushcclosure(L, LG_linda_protected_call, 1);
974 lua_setfield(L, -2, "get");
975
976 lua_pushcfunction(L, LG_linda_cancel);
977 lua_setfield(L, -2, "cancel");
978
979 lua_pushcfunction(L, LG_linda_deep);
980 lua_setfield(L, -2, "deep");
981
982 lua_pushcfunction(L, LG_linda_dump);
983 lua_pushcclosure(L, LG_linda_protected_call, 1);
984 lua_setfield(L, -2, "dump");
985 989
986 // some constants 990 // some constants
987 BATCH_SENTINEL.pushKey(L); 991 BATCH_SENTINEL.pushKey(L);