aboutsummaryrefslogtreecommitdiff
path: root/src/lanes.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lanes.c')
-rw-r--r--src/lanes.c1014
1 files changed, 3 insertions, 1011 deletions
diff --git a/src/lanes.c b/src/lanes.c
index f0a5697..f2e3065 100644
--- a/src/lanes.c
+++ b/src/lanes.c
@@ -52,7 +52,7 @@
52 * ... 52 * ...
53 */ 53 */
54 54
55char const* VERSION = "3.13"; 55char const* VERSION = "3.13.0";
56 56
57/* 57/*
58=============================================================================== 58===============================================================================
@@ -93,7 +93,7 @@ THE SOFTWARE.
93#include "tools.h" 93#include "tools.h"
94#include "universe.h" 94#include "universe.h"
95#include "keeper.h" 95#include "keeper.h"
96#include "uniquekey.h" 96#include "lanes_private.h"
97 97
98#if !(defined( PLATFORM_XBOX) || defined( PLATFORM_WIN32) || defined( PLATFORM_POCKETPC)) 98#if !(defined( PLATFORM_XBOX) || defined( PLATFORM_WIN32) || defined( PLATFORM_POCKETPC))
99# include <sys/time.h> 99# include <sys/time.h>
@@ -111,105 +111,6 @@ THE SOFTWARE.
111*/ 111*/
112#define ERROR_FULL_STACK 1 // must be either 0 or 1 as we do some index arithmetics with it! 112#define ERROR_FULL_STACK 1 // must be either 0 or 1 as we do some index arithmetics with it!
113 113
114/*
115 * Lane cancellation request modes
116 */
117enum e_cancel_request
118{
119 CANCEL_NONE, // no pending cancel request
120 CANCEL_SOFT, // user wants the lane to cancel itself manually on cancel_test()
121 CANCEL_HARD // user wants the lane to be interrupted (meaning code won't return from those functions) from inside linda:send/receive calls
122};
123
124// NOTE: values to be changed by either thread, during execution, without
125// locking, are marked "volatile"
126//
127struct s_Lane
128{
129 THREAD_T thread;
130 //
131 // M: sub-thread OS thread
132 // S: not used
133
134 char const* debug_name;
135
136 lua_State* L;
137 Universe* U;
138 //
139 // M: prepares the state, and reads results
140 // S: while S is running, M must keep out of modifying the state
141
142 volatile enum e_status status;
143 //
144 // M: sets to PENDING (before launching)
145 // S: updates -> RUNNING/WAITING -> DONE/ERROR_ST/CANCELLED
146
147 SIGNAL_T* volatile waiting_on;
148 //
149 // When status is WAITING, points on the linda's signal the thread waits on, else NULL
150
151 volatile enum e_cancel_request cancel_request;
152 //
153 // M: sets to FALSE, flags TRUE for cancel request
154 // S: reads to see if cancel is requested
155
156#if THREADWAIT_METHOD == THREADWAIT_CONDVAR
157 SIGNAL_T done_signal;
158 //
159 // M: Waited upon at lane ending (if Posix with no PTHREAD_TIMEDJOIN)
160 // S: sets the signal once cancellation is noticed (avoids a kill)
161
162 MUTEX_T done_lock;
163 //
164 // Lock required by 'done_signal' condition variable, protecting
165 // lane status changes to DONE/ERROR_ST/CANCELLED.
166#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
167
168 volatile enum
169 {
170 NORMAL, // normal master side state
171 KILLED // issued an OS kill
172 } mstatus;
173 //
174 // M: sets to NORMAL, if issued a kill changes to KILLED
175 // S: not used
176
177 struct s_Lane* volatile selfdestruct_next;
178 //
179 // M: sets to non-NULL if facing lane handle '__gc' cycle but the lane
180 // is still running
181 // S: cleans up after itself if non-NULL at lane exit
182
183#if HAVE_LANE_TRACKING
184 struct s_Lane* volatile tracking_next;
185#endif // HAVE_LANE_TRACKING
186 //
187 // For tracking only
188};
189typedef struct s_Lane Lane;
190
191// To allow free-running threads (longer lifespan than the handle's)
192// 'Lane' are malloc/free'd and the handle only carries a pointer.
193// This is not deep userdata since the handle's not portable among lanes.
194//
195#define lua_toLane( L, i) (*((Lane**) luaL_checkudata( L, i, "Lane")))
196
197// crc64/we of string "CANCEL_TEST_KEY" generated at http://www.nitrxgen.net/hashgen/
198static DECLARE_CONST_UNIQUE_KEY( CANCEL_TEST_KEY, 0xe66f5960c57d133a); // used as registry key
199
200static inline Lane* get_lane_from_registry( lua_State* L)
201{
202 Lane* s;
203 STACK_GROW( L, 1);
204 STACK_CHECK( L);
205 push_unique_key( L, CANCEL_TEST_KEY);
206 lua_rawget( L, LUA_REGISTRYINDEX);
207 s = lua_touserdata( L, -1); // lightuserdata (true 's_lane' pointer) / nil
208 lua_pop( L, 1);
209 STACK_END( L, 0);
210 return s;
211}
212
213// intern the debug name in the specified lua state so that the pointer remains valid when the lane's state is closed 114// intern the debug name in the specified lua state so that the pointer remains valid when the lane's state is closed
214static void securize_debug_threadname( lua_State* L, Lane* s) 115static void securize_debug_threadname( lua_State* L, Lane* s)
215{ 116{
@@ -241,16 +142,6 @@ static inline enum e_cancel_request cancel_test( lua_State* L)
241 return s ? s->cancel_request : CANCEL_NONE; 142 return s ? s->cancel_request : CANCEL_NONE;
242} 143}
243 144
244// crc64/we of string "CANCEL_ERROR" generated at http://www.nitrxgen.net/hashgen/
245static DECLARE_CONST_UNIQUE_KEY( CANCEL_ERROR, 0xe97d41626cc97577); // 'cancel_error' sentinel
246
247static int cancel_error( lua_State* L)
248{
249 STACK_GROW( L, 1);
250 push_unique_key( L, CANCEL_ERROR); // special error value
251 return lua_error( L); // doesn't return
252}
253
254static void cancel_hook( lua_State* L, lua_Debug* ar) 145static void cancel_hook( lua_State* L, lua_Debug* ar)
255{ 146{
256 (void)ar; 147 (void)ar;
@@ -411,905 +302,6 @@ static void lane_cleanup( Lane* s)
411 302
412/* 303/*
413 * ############################################################################################### 304 * ###############################################################################################
414 * ############################################ Linda ############################################
415 * ###############################################################################################
416 */
417
418/*
419* Actual data is kept within a keeper state, which is hashed by the 's_Linda'
420* pointer (which is same to all userdatas pointing to it).
421*/
422struct s_Linda
423{
424 SIGNAL_T read_happened;
425 SIGNAL_T write_happened;
426 Universe* U; // the universe this linda belongs to
427 ptrdiff_t group; // a group to control keeper allocation between lindas
428 enum e_cancel_request simulate_cancel;
429 char name[1];
430};
431#define LINDA_KEEPER_HASHSEED( linda) (linda->group ? linda->group : (ptrdiff_t)linda)
432
433static void* linda_id( lua_State*, DeepOp);
434
435static inline struct s_Linda* lua_toLinda( lua_State* L, int idx_)
436{
437 struct s_Linda* linda = (struct s_Linda*) luaG_todeep( L, linda_id, idx_);
438 luaL_argcheck( L, linda != NULL, idx_, "expecting a linda object");
439 return linda;
440}
441
442static void check_key_types( lua_State* L, int start_, int end_)
443{
444 int i;
445 for( i = start_; i <= end_; ++ i)
446 {
447 int t = lua_type( L, i);
448 if( t == LUA_TBOOLEAN || t == LUA_TNUMBER || t == LUA_TSTRING || t == LUA_TLIGHTUSERDATA)
449 {
450 continue;
451 }
452 (void) luaL_error( L, "argument #%d: invalid key type (not a boolean, string, number or light userdata)", i);
453 }
454}
455
456LUAG_FUNC( linda_protected_call)
457{
458 int rc = LUA_OK;
459 struct s_Linda* linda = lua_toLinda( L, 1);
460
461 // acquire the keeper
462 Keeper* K = keeper_acquire( linda->U->keepers, LINDA_KEEPER_HASHSEED(linda));
463 lua_State* KL = K ? K->L : NULL; // need to do this for 'STACK_CHECK'
464 if( KL == NULL) return 0;
465
466 // retrieve the actual function to be called and move it before the arguments
467 lua_pushvalue( L, lua_upvalueindex( 1));
468 lua_insert( L, 1);
469 // do a protected call
470 rc = lua_pcall( L, lua_gettop( L) - 1, LUA_MULTRET, 0);
471
472 // release the keeper
473 keeper_release( K);
474
475 // if there was an error, forward it
476 if( rc != LUA_OK)
477 {
478 return lua_error( L);
479 }
480 // return whatever the actual operation provided
481 return lua_gettop( L);
482}
483
484/*
485* bool= linda_send( linda_ud, [timeout_secs=-1,] [linda.null,] key_num|str|bool|lightuserdata, ... )
486*
487* Send one or more values to a Linda. If there is a limit, all values must fit.
488*
489* Returns: 'true' if the value was queued
490* 'false' for timeout (only happens when the queue size is limited)
491* nil, CANCEL_ERROR if cancelled
492*/
493LUAG_FUNC( linda_send)
494{
495 struct s_Linda* linda = lua_toLinda( L, 1);
496 bool_t ret = FALSE;
497 enum e_cancel_request cancel = CANCEL_NONE;
498 int pushed;
499 time_d timeout = -1.0;
500 uint_t key_i = 2; // index of first key, if timeout not there
501 bool_t as_nil_sentinel; // if not NULL, send() will silently send a single nil if nothing is provided
502
503 if( lua_type( L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion
504 {
505 timeout = SIGNAL_TIMEOUT_PREPARE( lua_tonumber( L, 2));
506 ++ key_i;
507 }
508 else if( lua_isnil( L, 2)) // alternate explicit "no timeout" by passing nil before the key
509 {
510 ++ key_i;
511 }
512
513 as_nil_sentinel = equal_unique_key( L, key_i, NIL_SENTINEL);
514 if( as_nil_sentinel)
515 {
516 // the real key to send data to is after the NIL_SENTINEL marker
517 ++ key_i;
518 }
519
520 // make sure the key is of a valid type
521 check_key_types( L, key_i, key_i);
522
523 STACK_GROW( L, 1);
524
525 // make sure there is something to send
526 if( (uint_t)lua_gettop( L) == key_i)
527 {
528 if( as_nil_sentinel)
529 {
530 // send a single nil if nothing is provided
531 push_unique_key( L, NIL_SENTINEL);
532 }
533 else
534 {
535 return luaL_error( L, "no data to send");
536 }
537 }
538
539 // convert nils to some special non-nil sentinel in sent values
540 keeper_toggle_nil_sentinels( L, key_i + 1, eLM_ToKeeper);
541
542 {
543 bool_t try_again = TRUE;
544 Lane* const s = get_lane_from_registry( L);
545 Keeper* K = which_keeper( linda->U->keepers, LINDA_KEEPER_HASHSEED( linda));
546 lua_State* KL = K ? K->L : NULL; // need to do this for 'STACK_CHECK'
547 if( KL == NULL) return 0;
548 STACK_CHECK( KL);
549 for( ;;)
550 {
551 if( s != NULL)
552 {
553 cancel = s->cancel_request;
554 }
555 cancel = (cancel != CANCEL_NONE) ? cancel : linda->simulate_cancel;
556 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything
557 if( !try_again || cancel != CANCEL_NONE)
558 {
559 pushed = 0;
560 break;
561 }
562
563 STACK_MID( KL, 0);
564 pushed = keeper_call( linda->U, KL, KEEPER_API( send), L, linda, key_i);
565 if( pushed < 0)
566 {
567 break;
568 }
569 ASSERT_L( pushed == 1);
570
571 ret = lua_toboolean( L, -1);
572 lua_pop( L, 1);
573
574 if( ret)
575 {
576 // Wake up ALL waiting threads
577 SIGNAL_ALL( &linda->write_happened);
578 break;
579 }
580
581 // instant timout to bypass the wait syscall
582 if( timeout == 0.0)
583 {
584 break; /* no wait; instant timeout */
585 }
586
587 // storage limit hit, wait until timeout or signalled that we should try again
588 {
589 enum e_status prev_status = ERROR_ST; // prevent 'might be used uninitialized' warnings
590 if( s != NULL)
591 {
592 // change status of lane to "waiting"
593 prev_status = s->status; // RUNNING, most likely
594 ASSERT_L( prev_status == RUNNING); // but check, just in case
595 s->status = WAITING;
596 ASSERT_L( s->waiting_on == NULL);
597 s->waiting_on = &linda->read_happened;
598 }
599 // could not send because no room: wait until some data was read before trying again, or until timeout is reached
600 try_again = SIGNAL_WAIT( &linda->read_happened, &K->keeper_cs, timeout);
601 if( s != NULL)
602 {
603 s->waiting_on = NULL;
604 s->status = prev_status;
605 }
606 }
607 }
608 STACK_END( KL, 0);
609 }
610
611 if( pushed < 0)
612 {
613 return luaL_error( L, "tried to copy unsupported types");
614 }
615
616 switch( cancel)
617 {
618 case CANCEL_SOFT:
619 // if user wants to soft-cancel, the call returns lanes.cancel_error
620 push_unique_key( L, CANCEL_ERROR);
621 return 1;
622
623 case CANCEL_HARD:
624 // raise an error interrupting execution only in case of hard cancel
625 return cancel_error( L); // raises an error and doesn't return
626
627 default:
628 lua_pushboolean( L, ret); // true (success) or false (timeout)
629 return 1;
630 }
631}
632
633
634/*
635 * 2 modes of operation
636 * [val, key]= linda_receive( linda_ud, [timeout_secs_num=-1], key_num|str|bool|lightuserdata [, ...] )
637 * Consumes a single value from the Linda, in any key.
638 * Returns: received value (which is consumed from the slot), and the key which had it
639
640 * [val1, ... valCOUNT]= linda_receive( linda_ud, [timeout_secs_num=-1], linda.batched, key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT])
641 * Consumes between min_COUNT and max_COUNT values from the linda, from a single key.
642 * returns the actual consumed values, or nil if there weren't enough values to consume
643 *
644 */
645#define BATCH_SENTINEL "270e6c9d-280f-4983-8fee-a7ecdda01475"
646LUAG_FUNC( linda_receive)
647{
648 struct s_Linda* linda = lua_toLinda( L, 1);
649 int pushed, expected_pushed_min, expected_pushed_max;
650 enum e_cancel_request cancel = CANCEL_NONE;
651 keeper_api_t keeper_receive;
652
653 time_d timeout = -1.0;
654 uint_t key_i = 2;
655
656 if( lua_type( L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion
657 {
658 timeout = SIGNAL_TIMEOUT_PREPARE( lua_tonumber( L, 2));
659 ++ key_i;
660 }
661 else if( lua_isnil( L, 2)) // alternate explicit "no timeout" by passing nil before the key
662 {
663 ++ key_i;
664 }
665
666 // are we in batched mode?
667 {
668 int is_batched;
669 lua_pushliteral( L, BATCH_SENTINEL);
670 is_batched = lua501_equal( L, key_i, -1);
671 lua_pop( L, 1);
672 if( is_batched)
673 {
674 // no need to pass linda.batched in the keeper state
675 ++ key_i;
676 // make sure the keys are of a valid type
677 check_key_types( L, key_i, key_i);
678 // receive multiple values from a single slot
679 keeper_receive = KEEPER_API( receive_batched);
680 // we expect a user-defined amount of return value
681 expected_pushed_min = (int)luaL_checkinteger( L, key_i + 1);
682 expected_pushed_max = (int)luaL_optinteger( L, key_i + 2, expected_pushed_min);
683 // don't forget to count the key in addition to the values
684 ++ expected_pushed_min;
685 ++ expected_pushed_max;
686 if( expected_pushed_min > expected_pushed_max)
687 {
688 return luaL_error( L, "batched min/max error");
689 }
690 }
691 else
692 {
693 // make sure the keys are of a valid type
694 check_key_types( L, key_i, lua_gettop( L));
695 // receive a single value, checking multiple slots
696 keeper_receive = KEEPER_API( receive);
697 // we expect a single (value, key) pair of returned values
698 expected_pushed_min = expected_pushed_max = 2;
699 }
700 }
701
702 {
703 bool_t try_again = TRUE;
704 Lane* const s = get_lane_from_registry( L);
705 Keeper* K = which_keeper( linda->U->keepers, LINDA_KEEPER_HASHSEED( linda));
706 if( K == NULL) return 0;
707 for( ;;)
708 {
709 if( s != NULL)
710 {
711 cancel = s->cancel_request;
712 }
713 cancel = (cancel != CANCEL_NONE) ? cancel : linda->simulate_cancel;
714 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything
715 if( !try_again || cancel != CANCEL_NONE)
716 {
717 pushed = 0;
718 break;
719 }
720
721 // all arguments of receive() but the first are passed to the keeper's receive function
722 pushed = keeper_call( linda->U, K->L, keeper_receive, L, linda, key_i);
723 if( pushed < 0)
724 {
725 break;
726 }
727 if( pushed > 0)
728 {
729 ASSERT_L( pushed >= expected_pushed_min && pushed <= expected_pushed_max);
730 // replace sentinels with real nils
731 keeper_toggle_nil_sentinels( L, lua_gettop( L) - pushed, eLM_FromKeeper);
732 // To be done from within the 'K' locking area
733 //
734 SIGNAL_ALL( &linda->read_happened);
735 break;
736 }
737
738 if( timeout == 0.0)
739 {
740 break; /* instant timeout */
741 }
742
743 // nothing received, wait until timeout or signalled that we should try again
744 {
745 enum e_status prev_status = ERROR_ST; // prevent 'might be used uninitialized' warnings
746 if( s != NULL)
747 {
748 // change status of lane to "waiting"
749 prev_status = s->status; // RUNNING, most likely
750 ASSERT_L( prev_status == RUNNING); // but check, just in case
751 s->status = WAITING;
752 ASSERT_L( s->waiting_on == NULL);
753 s->waiting_on = &linda->write_happened;
754 }
755 // not enough data to read: wakeup when data was sent, or when timeout is reached
756 try_again = SIGNAL_WAIT( &linda->write_happened, &K->keeper_cs, timeout);
757 if( s != NULL)
758 {
759 s->waiting_on = NULL;
760 s->status = prev_status;
761 }
762 }
763 }
764 }
765
766 if( pushed < 0)
767 {
768 return luaL_error( L, "tried to copy unsupported types");
769 }
770
771 switch( cancel)
772 {
773 case CANCEL_SOFT:
774 // if user wants to soft-cancel, the call returns CANCEL_ERROR
775 push_unique_key( L, CANCEL_ERROR);
776 return 1;
777
778 case CANCEL_HARD:
779 // raise an error interrupting execution only in case of hard cancel
780 return cancel_error( L); // raises an error and doesn't return
781
782 default:
783 return pushed;
784 }
785}
786
787
788/*
789* [true|lanes.cancel_error] = linda_set( linda_ud, key_num|str|bool|lightuserdata [, value [, ...]])
790*
791* Set one or more value to Linda.
792* TODO: what do we do if we set to non-nil and limit is 0?
793*
794* Existing slot value is replaced, and possible queued entries removed.
795*/
796LUAG_FUNC( linda_set)
797{
798 struct s_Linda* const linda = lua_toLinda( L, 1);
799 int pushed;
800 bool_t has_value = lua_gettop( L) > 2;
801
802 // make sure the key is of a valid type (throws an error if not the case)
803 check_key_types( L, 2, 2);
804
805 {
806 Keeper* K = which_keeper( linda->U->keepers, LINDA_KEEPER_HASHSEED( linda));
807
808 if( linda->simulate_cancel == CANCEL_NONE)
809 {
810 if( has_value)
811 {
812 // convert nils to some special non-nil sentinel in sent values
813 keeper_toggle_nil_sentinels( L, 3, eLM_ToKeeper);
814 }
815 pushed = keeper_call( linda->U, K->L, KEEPER_API( set), L, linda, 2);
816 if( pushed >= 0) // no error?
817 {
818 ASSERT_L( pushed == 0 || pushed == 1);
819
820 if( has_value)
821 {
822 // we put some data in the slot, tell readers that they should wake
823 SIGNAL_ALL( &linda->write_happened); // To be done from within the 'K' locking area
824 }
825 if( pushed == 1)
826 {
827 // the key was full, but it is no longer the case, tell writers they should wake
828 ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1);
829 SIGNAL_ALL( &linda->read_happened); // To be done from within the 'K' locking area
830 }
831 }
832 }
833 else // linda is cancelled
834 {
835 // do nothing and return lanes.cancel_error
836 push_unique_key( L, CANCEL_ERROR);
837 pushed = 1;
838 }
839 }
840
841 // must trigger any error after keeper state has been released
842 return (pushed < 0) ? luaL_error( L, "tried to copy unsupported types") : pushed;
843}
844
845
846/*
847 * [val] = linda_count( linda_ud, [key [, ...]])
848 *
849 * Get a count of the pending elements in the specified keys
850 */
851LUAG_FUNC( linda_count)
852{
853 struct s_Linda* linda = lua_toLinda( L, 1);
854 int pushed;
855
856 // make sure the keys are of a valid type
857 check_key_types( L, 2, lua_gettop( L));
858
859 {
860 Keeper* K = which_keeper( linda->U->keepers, LINDA_KEEPER_HASHSEED( linda));
861 pushed = keeper_call( linda->U, K->L, KEEPER_API( count), L, linda, 2);
862 if( pushed < 0)
863 {
864 return luaL_error( L, "tried to count an invalid key");
865 }
866 }
867 return pushed;
868}
869
870
871/*
872* [val [, ...]] = linda_get( linda_ud, key_num|str|bool|lightuserdata [, count = 1])
873*
874* Get one or more values from Linda.
875*/
876LUAG_FUNC( linda_get)
877{
878 struct s_Linda* const linda = lua_toLinda( L, 1);
879 int pushed;
880 lua_Integer count = luaL_optinteger( L, 3, 1);
881 luaL_argcheck( L, count >= 1, 3, "count should be >= 1");
882 luaL_argcheck( L, lua_gettop( L) <= 3, 4, "too many arguments");
883
884 // make sure the key is of a valid type (throws an error if not the case)
885 check_key_types( L, 2, 2);
886 {
887 Keeper* K = which_keeper( linda->U->keepers, LINDA_KEEPER_HASHSEED( linda));
888
889 if( linda->simulate_cancel == CANCEL_NONE)
890 {
891 pushed = keeper_call( linda->U, K->L, KEEPER_API( get), L, linda, 2);
892 if( pushed > 0)
893 {
894 keeper_toggle_nil_sentinels( L, lua_gettop( L) - pushed, eLM_FromKeeper);
895 }
896 }
897 else // linda is cancelled
898 {
899 // do nothing and return lanes.cancel_error
900 push_unique_key( L, CANCEL_ERROR);
901 pushed = 1;
902 }
903 // an error can be raised if we attempt to read an unregistered function
904 if( pushed < 0)
905 {
906 return luaL_error( L, "tried to copy unsupported types");
907 }
908 }
909
910 return pushed;
911}
912
913
914/*
915* [true] = linda_limit( linda_ud, key_num|str|bool|lightuserdata, int)
916*
917* Set limit to 1 Linda keys.
918* Optionally wake threads waiting to write on the linda, in case the limit enables them to do so
919*/
920LUAG_FUNC( linda_limit)
921{
922 struct s_Linda* linda = lua_toLinda( L, 1);
923 int pushed;
924
925 // make sure we got 3 arguments: the linda, a key and a limit
926 luaL_argcheck( L, lua_gettop( L) == 3, 2, "wrong number of arguments");
927 // make sure we got a numeric limit
928 luaL_checknumber( L, 3);
929 // make sure the key is of a valid type
930 check_key_types( L, 2, 2);
931
932 {
933 Keeper* K = which_keeper( linda->U->keepers, LINDA_KEEPER_HASHSEED( linda));
934
935 if( linda->simulate_cancel == CANCEL_NONE)
936 {
937 pushed = keeper_call( linda->U, K->L, KEEPER_API( limit), L, linda, 2);
938 ASSERT_L( pushed == 0 || pushed == 1); // no error, optional boolean value saying if we should wake blocked writer threads
939 if( pushed == 1)
940 {
941 ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1);
942 SIGNAL_ALL( &linda->read_happened); // To be done from within the 'K' locking area
943 }
944 }
945 else // linda is cancelled
946 {
947 // do nothing and return lanes.cancel_error
948 push_unique_key( L, CANCEL_ERROR);
949 pushed = 1;
950 }
951 }
952 // propagate pushed boolean if any
953 return pushed;
954}
955
956
957/*
958* (void) = linda_cancel( linda_ud, "read"|"write"|"both"|"none")
959*
960* Signal linda so that waiting threads wake up as if their own lane was cancelled
961*/
962LUAG_FUNC( linda_cancel)
963{
964 struct s_Linda* linda = lua_toLinda( L, 1);
965 char const* who = luaL_optstring( L, 2, "both");
966
967 // make sure we got 3 arguments: the linda, a key and a limit
968 luaL_argcheck( L, lua_gettop( L) <= 2, 2, "wrong number of arguments");
969
970 linda->simulate_cancel = CANCEL_SOFT;
971 if( strcmp( who, "both") == 0) // tell everyone writers to wake up
972 {
973 SIGNAL_ALL( &linda->write_happened);
974 SIGNAL_ALL( &linda->read_happened);
975 }
976 else if( strcmp( who, "none") == 0) // reset flag
977 {
978 linda->simulate_cancel = CANCEL_NONE;
979 }
980 else if( strcmp( who, "read") == 0) // tell blocked readers to wake up
981 {
982 SIGNAL_ALL( &linda->write_happened);
983 }
984 else if( strcmp( who, "write") == 0) // tell blocked writers to wake up
985 {
986 SIGNAL_ALL( &linda->read_happened);
987 }
988 else
989 {
990 return luaL_error( L, "unknown wake hint '%s'", who);
991 }
992 return 0;
993}
994
995
996/*
997* lightuserdata= linda_deep( linda_ud )
998*
999* Return the 'deep' userdata pointer, identifying the Linda.
1000*
1001* This is needed for using Lindas as key indices (timer system needs it);
1002* separately created proxies of the same underlying deep object will have
1003* different userdata and won't be known to be essentially the same deep one
1004* without this.
1005*/
1006LUAG_FUNC( linda_deep)
1007{
1008 struct s_Linda* linda= lua_toLinda( L, 1);
1009 lua_pushlightuserdata( L, linda); // just the address
1010 return 1;
1011}
1012
1013
1014/*
1015* string = linda:__tostring( linda_ud)
1016*
1017* Return the stringification of a linda
1018*
1019* Useful for concatenation or debugging purposes
1020*/
1021
1022static int linda_tostring( lua_State* L, int idx_, bool_t opt_)
1023{
1024 struct s_Linda* linda = (struct s_Linda*) luaG_todeep( L, linda_id, idx_);
1025 if( !opt_)
1026 {
1027 luaL_argcheck( L, linda, idx_, "expecting a linda object");
1028 }
1029 if( linda != NULL)
1030 {
1031 char text[128];
1032 int len;
1033 if( linda->name[0])
1034 len = sprintf( text, "Linda: %.*s", (int)sizeof(text) - 8, linda->name);
1035 else
1036 len = sprintf( text, "Linda: %p", linda);
1037 lua_pushlstring( L, text, len);
1038 return 1;
1039 }
1040 return 0;
1041}
1042
1043LUAG_FUNC( linda_tostring)
1044{
1045 return linda_tostring( L, 1, FALSE);
1046}
1047
1048
1049/*
1050* string = linda:__concat( a, b)
1051*
1052* Return the concatenation of a pair of items, one of them being a linda
1053*
1054* Useful for concatenation or debugging purposes
1055*/
1056LUAG_FUNC( linda_concat)
1057{ // linda1? linda2?
1058 bool_t atLeastOneLinda = FALSE;
1059 // Lua semantics enforce that one of the 2 arguments is a Linda, but not necessarily both.
1060 if( linda_tostring( L, 1, TRUE))
1061 {
1062 atLeastOneLinda = TRUE;
1063 lua_replace( L, 1);
1064 }
1065 if( linda_tostring( L, 2, TRUE))
1066 {
1067 atLeastOneLinda = TRUE;
1068 lua_replace( L, 2);
1069 }
1070 if( !atLeastOneLinda) // should not be possible
1071 {
1072 return luaL_error( L, "internal error: linda_concat called on non-Linda");
1073 }
1074 lua_concat( L, 2);
1075 return 1;
1076}
1077
1078/*
1079 * table = linda:dump()
1080 * return a table listing all pending data inside the linda
1081 */
1082LUAG_FUNC( linda_dump)
1083{
1084 struct s_Linda* linda = lua_toLinda( L, 1);
1085 ASSERT_L( linda->U == universe_get( L));
1086 return keeper_push_linda_storage( linda->U, L, linda, LINDA_KEEPER_HASHSEED( linda));
1087}
1088
1089/*
1090 * table = linda:dump()
1091 * return a table listing all pending data inside the linda
1092 */
1093LUAG_FUNC( linda_towatch)
1094{
1095 struct s_Linda* linda = lua_toLinda( L, 1);
1096 int pushed;
1097 ASSERT_L( linda->U == universe_get( L));
1098 pushed = keeper_push_linda_storage( linda->U, L, linda, LINDA_KEEPER_HASHSEED( linda));
1099 if( pushed == 0)
1100 {
1101 // if the linda is empty, don't return nil
1102 pushed = linda_tostring( L, 1, FALSE);
1103 }
1104 return pushed;
1105}
1106
1107/*
1108* Identity function of a shared userdata object.
1109*
1110* lightuserdata= linda_id( "new" [, ...] )
1111* = linda_id( "delete", lightuserdata )
1112*
1113* Creation and cleanup of actual 'deep' objects. 'luaG_...' will wrap them into
1114* regular userdata proxies, per each state using the deep data.
1115*
1116* tbl= linda_id( "metatable" )
1117*
1118* Returns a metatable for the proxy objects ('__gc' method not needed; will
1119* be added by 'luaG_...')
1120*
1121* string= linda_id( "module")
1122*
1123* Returns the name of the module that a state should require
1124* in order to keep a handle on the shared library that exported the idfunc
1125*
1126* = linda_id( str, ... )
1127*
1128* For any other strings, the ID function must not react at all. This allows
1129* future extensions of the system.
1130*/
1131static void* linda_id( lua_State* L, DeepOp op_)
1132{
1133 switch( op_)
1134 {
1135 case eDO_new:
1136 {
1137 struct s_Linda* s;
1138 size_t name_len = 0;
1139 char const* linda_name = NULL;
1140 unsigned long linda_group = 0;
1141 // should have a string and/or a number of the stack as parameters (name and group)
1142 switch( lua_gettop( L))
1143 {
1144 default: // 0
1145 break;
1146
1147 case 1: // 1 parameter, either a name or a group
1148 if( lua_type( L, -1) == LUA_TSTRING)
1149 {
1150 linda_name = lua_tolstring( L, -1, &name_len);
1151 }
1152 else
1153 {
1154 linda_group = (unsigned long) lua_tointeger( L, -1);
1155 }
1156 break;
1157
1158 case 2: // 2 parameters, a name and group, in that order
1159 linda_name = lua_tolstring( L, -2, &name_len);
1160 linda_group = (unsigned long) lua_tointeger( L, -1);
1161 break;
1162 }
1163
1164 /* The deep data is allocated separately of Lua stack; we might no
1165 * longer be around when last reference to it is being released.
1166 * One can use any memory allocation scheme.
1167 * just don't use L's allocF because we don't know which state will get the honor of GCing the linda
1168 */
1169 s = (struct s_Linda*) malloc( sizeof(struct s_Linda) + name_len); // terminating 0 is already included
1170 if( s)
1171 {
1172 SIGNAL_INIT( &s->read_happened);
1173 SIGNAL_INIT( &s->write_happened);
1174 s->U = universe_get( L);
1175 s->simulate_cancel = CANCEL_NONE;
1176 s->group = linda_group << KEEPER_MAGIC_SHIFT;
1177 s->name[0] = 0;
1178 memcpy( s->name, linda_name, name_len ? name_len + 1 : 0);
1179 }
1180 return s;
1181 }
1182
1183 case eDO_delete:
1184 {
1185 Keeper* K;
1186 struct s_Linda* linda = lua_touserdata( L, 1);
1187 ASSERT_L( linda);
1188
1189 // Clean associated structures in the keeper state.
1190 K = keeper_acquire( linda->U->keepers, LINDA_KEEPER_HASHSEED( linda));
1191 if( K && K->L) // can be NULL if this happens during main state shutdown (lanes is GC'ed -> no keepers -> no need to cleanup)
1192 {
1193 // hopefully this won't ever raise an error as we would jump to the closest pcall site while forgetting to release the keeper mutex...
1194 keeper_call( linda->U, K->L, KEEPER_API( clear), L, linda, 0);
1195 }
1196 keeper_release( K);
1197
1198 // There aren't any lanes waiting on these lindas, since all proxies have been gc'ed. Right?
1199 SIGNAL_FREE( &linda->read_happened);
1200 SIGNAL_FREE( &linda->write_happened);
1201 free( linda);
1202 return NULL;
1203 }
1204
1205 case eDO_metatable:
1206 {
1207
1208 STACK_CHECK( L);
1209 lua_newtable( L);
1210 // metatable is its own index
1211 lua_pushvalue( L, -1);
1212 lua_setfield( L, -2, "__index");
1213
1214 // protect metatable from external access
1215 lua_pushliteral( L, "Linda");
1216 lua_setfield( L, -2, "__metatable");
1217
1218 lua_pushcfunction( L, LG_linda_tostring);
1219 lua_setfield( L, -2, "__tostring");
1220
1221 // Decoda __towatch support
1222 lua_pushcfunction( L, LG_linda_towatch);
1223 lua_setfield( L, -2, "__towatch");
1224
1225 lua_pushcfunction( L, LG_linda_concat);
1226 lua_setfield( L, -2, "__concat");
1227
1228 // protected calls, to ensure associated keeper is always released even in case of error
1229 // all function are the protected call wrapper, where the actual operation is provided as upvalue
1230 // note that this kind of thing can break function lookup as we use the function pointer here and there
1231
1232 lua_pushcfunction( L, LG_linda_send);
1233 lua_pushcclosure( L, LG_linda_protected_call, 1);
1234 lua_setfield( L, -2, "send");
1235
1236 lua_pushcfunction( L, LG_linda_receive);
1237 lua_pushcclosure( L, LG_linda_protected_call, 1);
1238 lua_setfield( L, -2, "receive");
1239
1240 lua_pushcfunction( L, LG_linda_limit);
1241 lua_pushcclosure( L, LG_linda_protected_call, 1);
1242 lua_setfield( L, -2, "limit");
1243
1244 lua_pushcfunction( L, LG_linda_set);
1245 lua_pushcclosure( L, LG_linda_protected_call, 1);
1246 lua_setfield( L, -2, "set");
1247
1248 lua_pushcfunction( L, LG_linda_count);
1249 lua_pushcclosure( L, LG_linda_protected_call, 1);
1250 lua_setfield( L, -2, "count");
1251
1252 lua_pushcfunction( L, LG_linda_get);
1253 lua_pushcclosure( L, LG_linda_protected_call, 1);
1254 lua_setfield( L, -2, "get");
1255
1256 lua_pushcfunction( L, LG_linda_cancel);
1257 lua_setfield( L, -2, "cancel");
1258
1259 lua_pushcfunction( L, LG_linda_deep);
1260 lua_setfield( L, -2, "deep");
1261
1262 lua_pushcfunction( L, LG_linda_dump);
1263 lua_pushcclosure( L, LG_linda_protected_call, 1);
1264 lua_setfield( L, -2, "dump");
1265
1266 // some constants
1267 lua_pushliteral( L, BATCH_SENTINEL);
1268 lua_setfield(L, -2, "batched");
1269
1270 push_unique_key( L, NIL_SENTINEL);
1271 lua_setfield(L, -2, "null");
1272
1273 luaG_pushdeepversion( L);
1274 STACK_END( L, 2);
1275 return NULL;
1276 }
1277
1278 case eDO_module:
1279 // linda is a special case because we know lanes must be loaded from the main lua state
1280 // to be able to ever get here, so we know it will remain loaded as long a the main state is around
1281 // in other words, forever.
1282 default:
1283 {
1284 return NULL;
1285 }
1286 }
1287}
1288
1289/*
1290 * ud = lanes.linda( [name[,group]])
1291 *
1292 * returns a linda object, or raises an error if creation failed
1293 */
1294LUAG_FUNC( linda)
1295{
1296 int const top = lua_gettop( L);
1297 luaL_argcheck( L, top <= 2, top, "too many arguments");
1298 if( top == 1)
1299 {
1300 int const t = lua_type( L, 1);
1301 luaL_argcheck( L, t == LUA_TSTRING || t == LUA_TNUMBER, 1, "wrong parameter (should be a string or a number)");
1302 }
1303 else if( top == 2)
1304 {
1305 luaL_checktype( L, 1, LUA_TSTRING);
1306 luaL_checktype( L, 2, LUA_TNUMBER);
1307 }
1308 return luaG_newdeepuserdata( L, linda_id);
1309}
1310
1311/*
1312 * ###############################################################################################
1313 * ########################################## Finalizer ########################################## 305 * ########################################## Finalizer ##########################################
1314 * ############################################################################################### 306 * ###############################################################################################
1315 */ 307 */
@@ -3000,6 +1992,7 @@ LUAG_FUNC( wakeup_conv )
3000 * ############################################################################################### 1992 * ###############################################################################################
3001 */ 1993 */
3002 1994
1995extern int LG_linda( lua_State* L);
3003static const struct luaL_Reg lanes_functions [] = { 1996static const struct luaL_Reg lanes_functions [] = {
3004 {"linda", LG_linda}, 1997 {"linda", LG_linda},
3005 {"now_secs", LG_now_secs}, 1998 {"now_secs", LG_now_secs},
@@ -3012,7 +2005,6 @@ static const struct luaL_Reg lanes_functions [] = {
3012 {NULL, NULL} 2005 {NULL, NULL}
3013}; 2006};
3014 2007
3015
3016/* 2008/*
3017 * One-time initializations 2009 * One-time initializations
3018 * settings table it at position 1 on the stack 2010 * settings table it at position 1 on the stack