aboutsummaryrefslogtreecommitdiff
path: root/src/lanes.c
diff options
context:
space:
mode:
authorBenoit Germain <bnt.germain@gmail.com>2011-03-01 21:12:56 +0100
committerBenoit Germain <bnt.germain@gmail.com>2011-03-01 21:12:56 +0100
commite97adefde985e30fe31ffa036c74ffb0ce10ca26 (patch)
tree4465decc5f07e7399f9760d6302dbaf7163120d7 /src/lanes.c
parent974aa4343cf900938b5d357d10798d91faf60f5a (diff)
downloadlanes-e97adefde985e30fe31ffa036c74ffb0ce10ca26.tar.gz
lanes-e97adefde985e30fe31ffa036c74ffb0ce10ca26.tar.bz2
lanes-e97adefde985e30fe31ffa036c74ffb0ce10ca26.zip
* fixed potential crash at application shutdown when calling lua_close() on a killed thread's VM.
* exposed cancel_test() in the lanes to enable manual testing for cancellation requests. * removed kludgy {globals={threadName}} support, replaced with a new function set_debug_threadname().
Diffstat (limited to 'src/lanes.c')
-rw-r--r--src/lanes.c238
1 files changed, 153 insertions, 85 deletions
diff --git a/src/lanes.c b/src/lanes.c
index 8b62532..e5a2987 100644
--- a/src/lanes.c
+++ b/src/lanes.c
@@ -119,8 +119,6 @@ struct s_lane {
119 // 119 //
120 // M: sub-thread OS thread 120 // M: sub-thread OS thread
121 // S: not used 121 // S: not used
122
123 char threadName[64]; //Optional, for debugging and such. owerflowable by a strcpy.
124 122
125 lua_State *L; 123 lua_State *L;
126 // 124 //
@@ -132,6 +130,10 @@ struct s_lane {
132 // M: sets to PENDING (before launching) 130 // M: sets to PENDING (before launching)
133 // S: updates -> RUNNING/WAITING -> DONE/ERROR_ST/CANCELLED 131 // S: updates -> RUNNING/WAITING -> DONE/ERROR_ST/CANCELLED
134 132
133 volatile SIGNAL_T waiting_on;
134 //
135 // When status is WAITING, points on the linda's signal the thread waits on, else NULL
136
135 volatile bool_t cancel_request; 137 volatile bool_t cancel_request;
136 // 138 //
137 // M: sets to FALSE, flags TRUE for cancel request 139 // M: sets to FALSE, flags TRUE for cancel request
@@ -333,17 +335,21 @@ LUAG_FUNC( linda_send)
333 { 335 {
334 prev_status = s->status; 336 prev_status = s->status;
335 s->status = WAITING; 337 s->status = WAITING;
338 ASSERT_L( s->waiting_on == NULL);
339 s->waiting_on = &linda->read_happened;
336 } 340 }
337 if( !SIGNAL_WAIT( &linda->read_happened, &K->lock_, timeout)) 341 if( !SIGNAL_WAIT( &linda->read_happened, &K->lock_, timeout))
338 { 342 {
339 if( s) 343 if( s)
340 { 344 {
345 s->waiting_on = NULL;
341 s->status = prev_status; 346 s->status = prev_status;
342 } 347 }
343 break; 348 break;
344 } 349 }
345 if( s) 350 if( s)
346 { 351 {
352 s->waiting_on = NULL;
347 s->status = prev_status; 353 s->status = prev_status;
348 } 354 }
349 } 355 }
@@ -450,17 +456,21 @@ LUAG_FUNC( linda_receive)
450 { 456 {
451 prev_status = s->status; 457 prev_status = s->status;
452 s->status = WAITING; 458 s->status = WAITING;
459 ASSERT_L( s->waiting_on == NULL);
460 s->waiting_on = &linda->write_happened;
453 } 461 }
454 if( !SIGNAL_WAIT( &linda->write_happened, &K->lock_, timeout)) 462 if( !SIGNAL_WAIT( &linda->write_happened, &K->lock_, timeout))
455 { 463 {
456 if( s) 464 if( s)
457 { 465 {
466 s->waiting_on = NULL;
458 s->status = prev_status; 467 s->status = prev_status;
459 } 468 }
460 break; 469 break;
461 } 470 }
462 if( s) 471 if( s)
463 { 472 {
473 s->waiting_on = NULL;
464 s->status = prev_status; 474 s->status = prev_status;
465 } 475 }
466 } 476 }
@@ -870,28 +880,33 @@ volatile DEEP_PRELUDE *timer_deep; // = NULL
870/* 880/*
871* Process end; cancel any still free-running threads 881* Process end; cancel any still free-running threads
872*/ 882*/
873static void selfdestruct_atexit( void ) { 883static void selfdestruct_atexit( void )
874 884{
875 if (selfdestruct_first == SELFDESTRUCT_END) return; // no free-running threads 885 if (selfdestruct_first == SELFDESTRUCT_END) return; // no free-running threads
876 886
877 // Signal _all_ still running threads to exit 887 // Signal _all_ still running threads to exit (including the timer thread)
878 // 888 //
879 MUTEX_LOCK( &selfdestruct_cs ); 889 MUTEX_LOCK( &selfdestruct_cs );
880 { 890 {
881 struct s_lane *s= selfdestruct_first; 891 struct s_lane *s= selfdestruct_first;
882 while( s != SELFDESTRUCT_END ) { 892 while( s != SELFDESTRUCT_END )
883 s->cancel_request= TRUE; 893 {
884 s= s->selfdestruct_next; 894 // attempt a regular unforced cancel with a small timeout
895 bool_t cancelled = thread_cancel( s, 0.0001, FALSE);
896 // if we failed, and we know the thread is waiting on a linda
897 if( cancelled == FALSE && s->status == WAITING && s->waiting_on != NULL)
898 {
899 // signal the linda the wake up the thread so that it can react to the cancel query
900 // let us hope we never land here with a pointer on a linda that has been destroyed...
901 SIGNAL_T waiting_on = s->waiting_on;
902 s->waiting_on = NULL;
903 SIGNAL_ALL( waiting_on);
904 }
905 s = s->selfdestruct_next;
885 } 906 }
886 } 907 }
887 MUTEX_UNLOCK( &selfdestruct_cs ); 908 MUTEX_UNLOCK( &selfdestruct_cs );
888 909
889 // Tell the timer thread to check it's cancel request
890 {
891 struct s_Linda *td = timer_deep->deep;
892 SIGNAL_ALL( &td->write_happened);
893 }
894
895 // When noticing their cancel, the lanes will remove themselves from 910 // When noticing their cancel, the lanes will remove themselves from
896 // the selfdestruct chain. 911 // the selfdestruct chain.
897 912
@@ -914,15 +929,37 @@ static void selfdestruct_atexit( void ) {
914 // -- AKa 25-Oct-2008 929 // -- AKa 25-Oct-2008
915 // 930 //
916 #ifndef ATEXIT_WAIT_SECS 931 #ifndef ATEXIT_WAIT_SECS
917 # define ATEXIT_WAIT_SECS (0.1) 932 # define ATEXIT_WAIT_SECS (0.25)
918 #endif 933 #endif
919 { 934 {
920 double t_until= now_secs() + ATEXIT_WAIT_SECS; 935 double t_until= now_secs() + ATEXIT_WAIT_SECS;
921 936
922 while( selfdestruct_first != SELFDESTRUCT_END ) { 937 while( selfdestruct_first != SELFDESTRUCT_END )
938 {
923 YIELD(); // give threads time to act on their cancel 939 YIELD(); // give threads time to act on their cancel
924 940 {
925 if (now_secs() >= t_until) break; 941 // count the number of cancelled thread that didn't have the time to act yet
942 int n = 0;
943 double t_now = 0.0;
944 MUTEX_LOCK( &selfdestruct_cs );
945 {
946 struct s_lane *s = selfdestruct_first;
947 while( s != SELFDESTRUCT_END)
948 {
949 if( s->cancel_request)
950 ++ n;
951 s = s->selfdestruct_next;
952 }
953 }
954 MUTEX_UNLOCK( &selfdestruct_cs );
955 // if timeout elapsed, or we know all threads have acted, stop waiting
956 t_now = now_secs();
957 if( n == 0 || ( t_now >= t_until))
958 {
959 DEBUGEXEC(fprintf( stderr, "%d uncancelled lane(s) remain after waiting %fs at process end.\n", n, ATEXIT_WAIT_SECS - (t_until - t_now)));
960 break;
961 }
962 }
926 } 963 }
927 } 964 }
928#endif 965#endif
@@ -951,18 +988,21 @@ static void selfdestruct_atexit( void ) {
951 // 988 //
952 DEBUGEXEC(fprintf( stderr, "Left %d lane(s) with cancel request at process end.\n", n )); 989 DEBUGEXEC(fprintf( stderr, "Left %d lane(s) with cancel request at process end.\n", n ));
953#else 990#else
991 // first thing we did was to raise the linda signals the threads were waiting on (if any)
992 // therefore, any well-behaved thread should be in CANCELLED state
993 // these are not running, and the state can be closed
954 n=0; 994 n=0;
955 MUTEX_LOCK( &selfdestruct_cs ); 995 MUTEX_LOCK( &selfdestruct_cs );
956 { 996 {
957 struct s_lane *s= selfdestruct_first; 997 struct s_lane *s= selfdestruct_first;
958 while( s != SELFDESTRUCT_END ) { 998 while( s != SELFDESTRUCT_END)
999 {
959 struct s_lane *next_s= s->selfdestruct_next; 1000 struct s_lane *next_s= s->selfdestruct_next;
960 s->selfdestruct_next= NULL; // detach from selfdestruct chain 1001 s->selfdestruct_next= NULL; // detach from selfdestruct chain
961 1002 THREAD_KILL( &s->thread);
962 THREAD_KILL( &s->thread ); 1003 // NO lua_close() in this case because we don't know where execution of the state was interrupted
963 lua_close(s->L); 1004 free( s);
964 free(s); 1005 s = next_s;
965 s= next_s;
966 n++; 1006 n++;
967 } 1007 }
968 selfdestruct_first= SELFDESTRUCT_END; 1008 selfdestruct_first= SELFDESTRUCT_END;
@@ -1022,6 +1062,19 @@ static void cancel_hook( lua_State *L, lua_Debug *ar ) {
1022 1062
1023 1063
1024//--- 1064//---
1065// bool= cancel_test()
1066//
1067// Available inside the global namespace of lanes
1068// returns a boolean saying if a cancel request is pending
1069//
1070LUAG_FUNC( cancel_test)
1071{
1072 bool_t test = cancel_test( L);
1073 lua_pushboolean( L, test);
1074 return 1;
1075}
1076
1077//---
1025// = _single( [cores_uint=1] ) 1078// = _single( [cores_uint=1] )
1026// 1079//
1027// Limits the process to use only 'cores' CPU cores. To be used for performance 1080// Limits the process to use only 'cores' CPU cores. To be used for performance
@@ -1131,12 +1184,12 @@ typedef struct tagTHREADNAME_INFO
1131} THREADNAME_INFO; 1184} THREADNAME_INFO;
1132#pragma pack(pop) 1185#pragma pack(pop)
1133 1186
1134void SetThreadName( DWORD dwThreadID, char* threadName) 1187void SetThreadName( DWORD dwThreadID, char const *_threadName)
1135{ 1188{
1136 THREADNAME_INFO info; 1189 THREADNAME_INFO info;
1137 Sleep(10); 1190 Sleep(10);
1138 info.dwType = 0x1000; 1191 info.dwType = 0x1000;
1139 info.szName = threadName; 1192 info.szName = _threadName;
1140 info.dwThreadID = dwThreadID; 1193 info.dwThreadID = dwThreadID;
1141 info.dwFlags = 0; 1194 info.dwFlags = 0;
1142 1195
@@ -1150,6 +1203,22 @@ void SetThreadName( DWORD dwThreadID, char* threadName)
1150} 1203}
1151#endif 1204#endif
1152 1205
1206LUAG_FUNC( set_debug_threadname)
1207{
1208 char const *threadName;
1209 luaL_checktype( L, -1, LUA_TSTRING);
1210 threadName = lua_tostring( L, -1);
1211
1212#if defined PLATFORM_WIN32 && !defined __GNUC__
1213 // to see thead name in Visual Studio C debugger
1214 SetThreadName(-1, threadName);
1215#endif
1216
1217 // to see VM name in Decoda debugger Virtual Machine window
1218 lua_setglobal( L, "decoda_name");
1219
1220 return 0;
1221}
1153 1222
1154//--- 1223//---
1155#if (defined PLATFORM_WIN32) || (defined PLATFORM_POCKETPC) 1224#if (defined PLATFORM_WIN32) || (defined PLATFORM_POCKETPC)
@@ -1162,11 +1231,6 @@ void SetThreadName( DWORD dwThreadID, char* threadName)
1162 int rc, rc2; 1231 int rc, rc2;
1163 lua_State *L= s->L; 1232 lua_State *L= s->L;
1164 1233
1165
1166#if defined PLATFORM_WIN32 && !defined __GNUC__
1167 SetThreadName(-1, s->threadName);
1168#endif
1169
1170 s->status= RUNNING; // PENDING -> RUNNING 1234 s->status= RUNNING; // PENDING -> RUNNING
1171 1235
1172 // Tie "set_finalizer()" to the state 1236 // Tie "set_finalizer()" to the state
@@ -1174,6 +1238,16 @@ void SetThreadName( DWORD dwThreadID, char* threadName)
1174 lua_pushcfunction( L, LG_set_finalizer ); 1238 lua_pushcfunction( L, LG_set_finalizer );
1175 lua_setglobal( L, "set_finalizer" ); 1239 lua_setglobal( L, "set_finalizer" );
1176 1240
1241 // Tie "set_debug_threadname()" to the state
1242 //
1243 lua_pushcfunction( L, LG_set_debug_threadname);
1244 lua_setglobal( L, "set_debug_threadname" );
1245
1246 // Tie "cancel_test()" to the state
1247 //
1248 lua_pushcfunction( L, LG_cancel_test);
1249 lua_setglobal( L, "cancel_test" );
1250
1177#ifdef ERROR_FULL_STACK 1251#ifdef ERROR_FULL_STACK
1178 STACK_GROW( L, 1 ); 1252 STACK_GROW( L, 1 );
1179 lua_pushcfunction( L, lane_error ); 1253 lua_pushcfunction( L, lane_error );
@@ -1240,7 +1314,7 @@ void SetThreadName( DWORD dwThreadID, char* threadName)
1240 // 1314 //
1241 lua_newtable(L); 1315 lua_newtable(L);
1242 } 1316 }
1243 1317 s->waiting_on = NULL; // just in case
1244 if (s->selfdestruct_next != NULL) { 1318 if (s->selfdestruct_next != NULL) {
1245 // We're a free-running thread and no-one's there to clean us up. 1319 // We're a free-running thread and no-one's there to clean us up.
1246 // 1320 //
@@ -1276,7 +1350,6 @@ void SetThreadName( DWORD dwThreadID, char* threadName)
1276 MUTEX_UNLOCK( &s->done_lock_ ); 1350 MUTEX_UNLOCK( &s->done_lock_ );
1277 #endif 1351 #endif
1278 } 1352 }
1279
1280 return 0; // ignored 1353 return 0; // ignored
1281} 1354}
1282 1355
@@ -1295,7 +1368,6 @@ LUAG_FUNC( thread_new )
1295 lua_State *L2; 1368 lua_State *L2;
1296 struct s_lane *s; 1369 struct s_lane *s;
1297 struct s_lane **ud; 1370 struct s_lane **ud;
1298 const char *threadName = 0;
1299 1371
1300 const char *libs= lua_tostring( L, 2 ); 1372 const char *libs= lua_tostring( L, 2 );
1301 uint_t cs= luaG_optunsigned( L, 3,0); 1373 uint_t cs= luaG_optunsigned( L, 3,0);
@@ -1330,10 +1402,7 @@ LUAG_FUNC( thread_new )
1330 luaL_error( L, "Expected table, got %s", luaG_typename(L,glob) ); 1402 luaL_error( L, "Expected table, got %s", luaG_typename(L,glob) );
1331 1403
1332 lua_pushvalue( L, glob ); 1404 lua_pushvalue( L, glob );
1333 lua_pushstring( L, "threadName"); 1405
1334 lua_gettable( L, -2);
1335 threadName = lua_tostring( L, -1);
1336 lua_pop( L, 1);
1337 luaG_inter_move( L, L2, 1); // moves the table to L2 1406 luaG_inter_move( L, L2, 1); // moves the table to L2
1338 1407
1339 // L2 [-1]: table of globals 1408 // L2 [-1]: table of globals
@@ -1398,11 +1467,9 @@ LUAG_FUNC( thread_new )
1398 //memset( s, 0, sizeof(struct s_lane) ); 1467 //memset( s, 0, sizeof(struct s_lane) );
1399 s->L= L2; 1468 s->L= L2;
1400 s->status= PENDING; 1469 s->status= PENDING;
1470 s->waiting_on = NULL;
1401 s->cancel_request= FALSE; 1471 s->cancel_request= FALSE;
1402 1472
1403 threadName = threadName ? threadName : "<unnamed thread>";
1404 strcpy(s->threadName, threadName);
1405
1406#if !( (defined PLATFORM_WIN32) || (defined PLATFORM_POCKETPC) || (defined PTHREAD_TIMEDJOIN) ) 1473#if !( (defined PLATFORM_WIN32) || (defined PLATFORM_POCKETPC) || (defined PTHREAD_TIMEDJOIN) )
1407 MUTEX_INIT( &s->done_lock_ ); 1474 MUTEX_INIT( &s->done_lock_ );
1408 SIGNAL_INIT( &s->done_signal_ ); 1475 SIGNAL_INIT( &s->done_signal_ );
@@ -1484,7 +1551,7 @@ LUAG_FUNC( thread_gc )
1484#if 0 1551#if 0
1485 lua_close( s->L ); 1552 lua_close( s->L );
1486 s->L = 0; 1553 s->L = 0;
1487#else 1554#else // 0
1488 DEBUGEXEC(fprintf( stderr, "** Joining with a killed thread (needs testing) **" )); 1555 DEBUGEXEC(fprintf( stderr, "** Joining with a killed thread (needs testing) **" ));
1489#if (defined PLATFORM_WIN32) || (defined PLATFORM_POCKETPC) || (defined PTHREAD_TIMEDJOIN) 1556#if (defined PLATFORM_WIN32) || (defined PLATFORM_POCKETPC) || (defined PTHREAD_TIMEDJOIN)
1490 THREAD_WAIT( &s->thread, -1 ); 1557 THREAD_WAIT( &s->thread, -1 );
@@ -1492,7 +1559,7 @@ LUAG_FUNC( thread_gc )
1492 THREAD_WAIT( &s->thread, &s->done_signal_, &s->done_lock_, &s->status, -1 ); 1559 THREAD_WAIT( &s->thread, &s->done_signal_, &s->done_lock_, &s->status, -1 );
1493#endif 1560#endif
1494 DEBUGEXEC(fprintf( stderr, "** Joined ok **" )); 1561 DEBUGEXEC(fprintf( stderr, "** Joined ok **" ));
1495#endif 1562#endif // 0
1496 } 1563 }
1497 else if( s->L) 1564 else if( s->L)
1498 { 1565 {
@@ -1529,54 +1596,55 @@ LUAG_FUNC( thread_gc )
1529// managed to cancel it. 1596// managed to cancel it.
1530// false if the cancellation timed out, or a kill was needed. 1597// false if the cancellation timed out, or a kill was needed.
1531// 1598//
1532LUAG_FUNC( thread_cancel ) 1599static bool_t thread_cancel( struct s_lane *s, double secs, bool_t force)
1533{ 1600{
1534 struct s_lane *s= lua_toLane(L,1); 1601 bool_t done= TRUE;
1535 double secs= 0.0; 1602 // We can read 's->status' without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN)
1536 uint_t force_i=2; 1603 //
1537 bool_t force, done= TRUE; 1604 if( s->status < DONE)
1538 1605 {
1539 if (lua_isnumber(L,2)) { 1606 s->cancel_request = TRUE; // it's now signalled to stop
1540 secs= lua_tonumber(L,2); 1607 done=
1541 force_i++;
1542 } else if (lua_isnil(L,2))
1543 force_i++;
1544
1545 force= lua_toboolean(L,force_i); // FALSE if nothing there
1546
1547 // We can read 's->status' without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN)
1548 //
1549 if (s->status < DONE) {
1550 s->cancel_request= TRUE; // it's now signalled to stop
1551
1552 done= thread_cancel( s, secs, force );
1553 }
1554
1555 lua_pushboolean( L, done );
1556 return 1;
1557}
1558
1559static bool_t thread_cancel( struct s_lane *s, double secs, bool_t force )
1560{
1561 bool_t done=
1562#if (defined PLATFORM_WIN32) || (defined PLATFORM_POCKETPC) || (defined PTHREAD_TIMEDJOIN) 1608#if (defined PLATFORM_WIN32) || (defined PLATFORM_POCKETPC) || (defined PTHREAD_TIMEDJOIN)
1563 THREAD_WAIT( &s->thread, secs ); 1609 THREAD_WAIT( &s->thread, secs);
1564#else 1610#else
1565 THREAD_WAIT( &s->thread, &s->done_signal_, &s->done_lock_, &s->status, secs ); 1611 THREAD_WAIT( &s->thread, &s->done_signal_, &s->done_lock_, &s->status, secs);
1566#endif 1612#endif
1567 1613
1568 if ((!done) && force) { 1614 if ((!done) && force)
1569 // Killing is asynchronous; we _will_ wait for it to be done at 1615 {
1570 // GC, to make sure the data structure can be released (alternative 1616 // Killing is asynchronous; we _will_ wait for it to be done at
1571 // would be use of "cancellation cleanup handlers" that at least 1617 // GC, to make sure the data structure can be released (alternative
1572 // PThread seems to have). 1618 // would be use of "cancellation cleanup handlers" that at least
1573 // 1619 // PThread seems to have).
1574 THREAD_KILL( &s->thread ); 1620 //
1575 s->mstatus= KILLED; // mark 'gc' to wait for it 1621 THREAD_KILL( &s->thread);
1576 } 1622 s->mstatus= KILLED; // mark 'gc' to wait for it
1577 return done; 1623 }
1624 }
1625 return done;
1578} 1626}
1579 1627
1628LUAG_FUNC( thread_cancel)
1629{
1630 struct s_lane *s= lua_toLane(L,1);
1631 double secs= 0.0;
1632 uint_t force_i=2;
1633 bool_t force, done= TRUE;
1634
1635 if (lua_isnumber(L,2)) {
1636 secs= lua_tonumber(L,2);
1637 force_i++;
1638 } else if (lua_isnil(L,2))
1639 force_i++;
1640
1641 force= lua_toboolean(L,force_i); // FALSE if nothing there
1642
1643 done = thread_cancel( s, secs, force);
1644
1645 lua_pushboolean( L, done);
1646 return 1;
1647}
1580 1648
1581//--- 1649//---
1582// str= thread_status( lane ) 1650// str= thread_status( lane )