From 336935ed4920e1359ab30261a5a028a70d379241 Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Fri, 29 Mar 2024 16:21:56 +0100 Subject: C++ migration: parallelize lane setup and OS thread warmup --- src/lanes.cpp | 241 ++++++++++++++++++++++++++++++---------------------- src/lanes_private.h | 5 +- src/universe.cpp | 2 +- 3 files changed, 146 insertions(+), 102 deletions(-) (limited to 'src') diff --git a/src/lanes.cpp b/src/lanes.cpp index 079b880..8a76217 100644 --- a/src/lanes.cpp +++ b/src/lanes.cpp @@ -410,6 +410,8 @@ static void selfdestruct_add(Lane* lane_) lane_->U->selfdestruct_first = lane_; } +// ############################################################################################### + /* * A free-running lane has ended; remove it from selfdestruct chain */ @@ -443,10 +445,12 @@ static bool selfdestruct_remove(Lane* lane_) return found; } +// ############################################################################################### + /* * Process end; cancel any still free-running threads */ -static int selfdestruct_gc( lua_State* L) +static int universe_gc( lua_State* L) { Universe* const U{ lua_tofulluserdata(L, 1) }; @@ -595,6 +599,7 @@ static int selfdestruct_gc( lua_State* L) return 0; } +// ############################################################################################### //--- // = _single( [cores_uint=1] ) @@ -624,6 +629,7 @@ LUAG_FUNC( set_singlethreaded) #endif } +// ############################################################################################### /* * str= lane_error( error_val|str ) @@ -885,78 +891,85 @@ static void thread_cleanup_handler(void* opaque) static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs) { - Lane* const lane{ (Lane*) vs }; + Lane* lane{ (Lane*) vs }; lua_State* const L{ lane->L }; - // Called with the lane function and arguments on the stack - int const nargs{ lua_gettop(L) - 1 }; - DEBUGSPEW_CODE(Universe* U = universe_get(L)); - THREAD_MAKE_ASYNCH_CANCELLABLE(); - THREAD_CLEANUP_PUSH(thread_cleanup_handler, lane); - lane->status = RUNNING; // PENDING -> RUNNING - - // Tie "set_finalizer()" to the state - lua_pushcfunction(L, LG_set_finalizer); - populate_func_lookup_table(L, -1, "set_finalizer"); - lua_setglobal(L, "set_finalizer"); - - // Tie "set_debug_threadname()" to the state - // But don't register it in the lookup database because of the Lane pointer upvalue - lua_pushlightuserdata(L, lane); - lua_pushcclosure(L, LG_set_debug_threadname, 1); - lua_setglobal(L, "set_debug_threadname"); - - // Tie "cancel_test()" to the state - lua_pushcfunction(L, LG_cancel_test); - populate_func_lookup_table(L, -1, "cancel_test"); - lua_setglobal(L, "cancel_test"); - - // this could be done in lane_new before the lane body function is pushed on the stack to avoid unnecessary stack slot shifting around + // wait until the launching thread has finished preparing L + lane->m_ready.wait(); + int rc{ LUA_ERRRUN }; + if (lane->status == PENDING) // nothing wrong happened during preparation, we can work + { + // At this point, the lane function and arguments are on the stack + int const nargs{ lua_gettop(L) - 1 }; + DEBUGSPEW_CODE(Universe* U = universe_get(L)); + THREAD_MAKE_ASYNCH_CANCELLABLE(); + THREAD_CLEANUP_PUSH(thread_cleanup_handler, lane); + lane->status = RUNNING; // PENDING -> RUNNING + + // Tie "set_finalizer()" to the state + lua_pushcfunction(L, LG_set_finalizer); + populate_func_lookup_table(L, -1, "set_finalizer"); + lua_setglobal(L, "set_finalizer"); + + // Tie "set_debug_threadname()" to the state + // But don't register it in the lookup database because of the Lane pointer upvalue + lua_pushlightuserdata(L, lane); + lua_pushcclosure(L, LG_set_debug_threadname, 1); + lua_setglobal(L, "set_debug_threadname"); + + // Tie "cancel_test()" to the state + lua_pushcfunction(L, LG_cancel_test); + populate_func_lookup_table(L, -1, "cancel_test"); + lua_setglobal(L, "cancel_test"); + + // this could be done in lane_new before the lane body function is pushed on the stack to avoid unnecessary stack slot shifting around #if ERROR_FULL_STACK - // Tie "set_error_reporting()" to the state - lua_pushcfunction(L, LG_set_error_reporting); - populate_func_lookup_table(L, -1, "set_error_reporting"); - lua_setglobal(L, "set_error_reporting"); - - STACK_GROW(L, 1); - lua_pushcfunction(L, lane_error); // func args handler - lua_insert(L, 1); // handler func args + // Tie "set_error_reporting()" to the state + lua_pushcfunction(L, LG_set_error_reporting); + populate_func_lookup_table(L, -1, "set_error_reporting"); + lua_setglobal(L, "set_error_reporting"); + + STACK_GROW(L, 1); + lua_pushcfunction(L, lane_error); // func args handler + lua_insert(L, 1); // handler func args #endif // ERROR_FULL_STACK - int rc{ lua_pcall(L, nargs, LUA_MULTRET, ERROR_FULL_STACK) }; // retvals|err + rc = lua_pcall(L, nargs, LUA_MULTRET, ERROR_FULL_STACK); // retvals|err #if ERROR_FULL_STACK - lua_remove(L, 1); // retvals|error -# endif // ERROR_FULL_STACK + lua_remove(L, 1); // retvals|error +#endif // ERROR_FULL_STACK - // in case of error and if it exists, fetch stack trace from registry and push it - push_stack_trace(L, rc, 1); // retvals|error [trace] + // in case of error and if it exists, fetch stack trace from registry and push it + push_stack_trace(L, rc, 1); // retvals|error [trace] - DEBUGSPEW_CODE(fprintf(stderr, INDENT_BEGIN "Lane %p body: %s (%s)\n" INDENT_END, L, get_errcode_name(rc), CANCEL_ERROR.equals(L, 1) ? "cancelled" : lua_typename(L, lua_type(L, 1)))); - // STACK_DUMP(L); - // Call finalizers, if the script has set them up. - // - int rc2{ run_finalizers(L, rc) }; - DEBUGSPEW_CODE(fprintf(stderr, INDENT_BEGIN "Lane %p finalizer: %s\n" INDENT_END, L, get_errcode_name(rc2))); - if (rc2 != LUA_OK) // Error within a finalizer! - { - // the finalizer generated an error, and left its own error message [and stack trace] on the stack - rc = rc2; // we're overruling the earlier script error or normal return - } - lane->waiting_on = nullptr; // just in case - if (selfdestruct_remove(lane)) // check and remove (under lock!) - { - // We're a free-running thread and no-one's there to clean us up. + DEBUGSPEW_CODE(fprintf(stderr, INDENT_BEGIN "Lane %p body: %s (%s)\n" INDENT_END, L, get_errcode_name(rc), CANCEL_ERROR.equals(L, 1) ? "cancelled" : lua_typename(L, lua_type(L, 1)))); + // STACK_DUMP(L); + // Call finalizers, if the script has set them up. // - lua_close(lane->L); + int rc2{ run_finalizers(L, rc) }; + DEBUGSPEW_CODE(fprintf(stderr, INDENT_BEGIN "Lane %p finalizer: %s\n" INDENT_END, L, get_errcode_name(rc2))); + if (rc2 != LUA_OK) // Error within a finalizer! + { + // the finalizer generated an error, and left its own error message [and stack trace] on the stack + rc = rc2; // we're overruling the earlier script error or normal return + } + lane->waiting_on = nullptr; // just in case + if (selfdestruct_remove(lane)) // check and remove (under lock!) + { + // We're a free-running thread and no-one's there to clean us up. + // + lua_close(lane->L); - lane->U->selfdestruct_cs.lock(); - // done with lua_close(), terminal shutdown sequence may proceed - --lane->U->selfdestructing_count; - lane->U->selfdestruct_cs.unlock(); + lane->U->selfdestruct_cs.lock(); + // done with lua_close(), terminal shutdown sequence may proceed + --lane->U->selfdestructing_count; + lane->U->selfdestruct_cs.unlock(); - delete lane; + delete lane; + lane = nullptr; + } } - else + if (lane) { // leave results (1..top) or error message + stack trace (1..2) on the stack - master will copy them @@ -1044,19 +1057,19 @@ static constexpr UniqueKey GCCB_KEY{ 0xcfb1f046ef074e88ull }; // // Upvalues: metatable to use for 'lane_ud' // -LUAG_FUNC( lane_new) +LUAG_FUNC(lane_new) { - char const* libs_str = lua_tostring(L, 2); + char const* const libs_str{ lua_tostring(L, 2) }; bool const have_priority{ !lua_isnoneornil(L, 3) }; - int const priority = have_priority ? (int) lua_tointeger(L, 3) : THREAD_PRIO_DEFAULT; - int const globals_idx = lua_isnoneornil(L, 4) ? 0 : 4; - int const package_idx = lua_isnoneornil(L, 5) ? 0 : 5; - int const required_idx = lua_isnoneornil(L, 6) ? 0 : 6; - int const gc_cb_idx = lua_isnoneornil(L, 7) ? 0 : 7; - -#define FIXED_ARGS 7 - int const nargs = lua_gettop(L) - FIXED_ARGS; - Universe* const U = universe_get( L); + int const priority{ have_priority ? (int) lua_tointeger(L, 3) : THREAD_PRIO_DEFAULT }; + int const globals_idx{ lua_isnoneornil(L, 4) ? 0 : 4 }; + int const package_idx{ lua_isnoneornil(L, 5) ? 0 : 5 }; + int const required_idx{ lua_isnoneornil(L, 6) ? 0 : 6 }; + int const gc_cb_idx{ lua_isnoneornil(L, 7) ? 0 : 7 }; + + static constexpr int FIXED_ARGS{ 7 }; + int const nargs{ lua_gettop(L) - FIXED_ARGS }; + Universe* const U{ universe_get(L) }; ASSERT_L( nargs >= 0); // public Lanes API accepts a generic range -3/+3 @@ -1072,7 +1085,42 @@ LUAG_FUNC( lane_new) DEBUGSPEW_CODE( ++ U->debugspew_indent_depth); // populate with selected libraries at the same time - lua_State* const L2{ luaG_newstate(U, L, libs_str) }; // L // L2 + lua_State* const L2{ luaG_newstate(U, L, libs_str) }; // L // L2 + + // 'lane' is allocated from heap, not Lua, since its life span may surpass the handle's (if free running thread) + Lane* const lane{ new (U) Lane{ U, L2 } }; + if (lane == nullptr) + { + return luaL_error(L, "could not create lane: out of memory"); + } + + // would prefer std::experimental::scope, but it's not universally available + struct OnExit + { + Lane* m_lane{ nullptr }; + OnExit(Lane* lane_) : m_lane{ lane_ } {} + ~OnExit() + { + if (m_lane) + { + // leave a single cancel_error on the stack for the caller + lua_settop(m_lane->L, 0); + CANCEL_ERROR.pushKey(m_lane->L); +#if THREADWAIT_METHOD == THREADWAIT_CONDVAR + MUTEX_LOCK(&m_lane->done_lock); +#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR + m_lane->status = CANCELLED; +#if THREADWAIT_METHOD == THREADWAIT_CONDVAR + MUTEX_UNLOCK(&m_lane->done_lock); +#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR + // unblock the thread so that it can terminate gracefully + m_lane->m_ready.count_down(); + } + } + } onExit{ lane }; + // launch the thread early, it will sync with a std::latch to parallelize OS thread warmup and L2 preparation + DEBUGSPEW_CODE(fprintf(stderr, INDENT_BEGIN "lane_new: launching thread\n" INDENT_END)); + THREAD_CREATE(&lane->thread, lane_main, lane, priority); STACK_GROW( L2, nargs + 3); // STACK_CHECK_START_REL(L2, 0); @@ -1106,7 +1154,7 @@ LUAG_FUNC( lane_new) return luaL_error(L, "expected required module list as a table, got %s", luaL_typename(L, required_idx)); } - lua_pushnil( L); // func libs priority globals package required gc_cb [... args ...] nil + lua_pushnil(L); // func libs priority globals package required gc_cb [... args ...] nil while( lua_next(L, required_idx) != 0) // func libs priority globals package required gc_cb [... args ...] n "modname" { if (lua_type(L, -1) != LUA_TSTRING || lua_type(L, -2) != LUA_TNUMBER || lua_tonumber(L, -2) != nbRequired) @@ -1133,7 +1181,7 @@ LUAG_FUNC( lane_new) if (lua_pcall( L2, 1, 1, 0) != LUA_OK) // ret/errcode { // propagate error to main state if any - luaG_inter_move( U, L2, L, 1, eLM_LaneBody); // func libs priority globals package required gc_cb [... args ...] n "modname" error + luaG_inter_move(U, L2, L, 1, eLM_LaneBody); // func libs priority globals package required gc_cb [... args ...] n "modname" error raise_lua_error(L); } // after requiring the module, register the functions it exported in our name<->function database @@ -1143,11 +1191,11 @@ LUAG_FUNC( lane_new) } lua_pop(L, 1); // func libs priority globals package required gc_cb [... args ...] n ++ nbRequired; - } // func libs priority globals package required gc_cb [... args ...] + } // func libs priority globals package required gc_cb [... args ...] DEBUGSPEW_CODE( -- U->debugspew_indent_depth); } STACK_CHECK(L, 0); - STACK_CHECK( L2, 0); // + STACK_CHECK(L2, 0); // // Appending the specified globals to the global environment // *after* stdlibs have been loaded and modules required, in case we transfer references to native functions they exposed... @@ -1161,16 +1209,16 @@ LUAG_FUNC( lane_new) } DEBUGSPEW_CODE( ++ U->debugspew_indent_depth); - lua_pushnil( L); // func libs priority globals package required gc_cb [... args ...] nil + lua_pushnil(L); // func libs priority globals package required gc_cb [... args ...] nil // Lua 5.2 wants us to push the globals table on the stack - lua_pushglobaltable( L2); // _G + lua_pushglobaltable(L2); // _G while( lua_next(L, globals_idx)) // func libs priority globals package required gc_cb [... args ...] k v { - luaG_inter_copy( U, L, L2, 2, eLM_LaneBody); // _G k v + luaG_inter_copy(U, L, L2, 2, eLM_LaneBody); // _G k v // assign it in L2's globals table - lua_rawset( L2, -3); // _G + lua_rawset(L2, -3); // _G lua_pop(L, 1); // func libs priority globals package required gc_cb [... args ...] k - } // func libs priority globals package required gc_cb [... args ...] + } // func libs priority globals package required gc_cb [... args ...] lua_pop( L2, 1); // DEBUGSPEW_CODE( -- U->debugspew_indent_depth); @@ -1185,7 +1233,7 @@ LUAG_FUNC( lane_new) DEBUGSPEW_CODE( fprintf( stderr, INDENT_BEGIN "lane_new: transfer lane body\n" INDENT_END)); DEBUGSPEW_CODE( ++ U->debugspew_indent_depth); lua_pushvalue(L, 1); // func libs priority globals package required gc_cb [... args ...] func - res = luaG_inter_move( U, L, L2, 1, eLM_LaneBody); // func libs priority globals package required gc_cb [... args ...] // func + res = luaG_inter_move(U, L, L2, 1, eLM_LaneBody); // func libs priority globals package required gc_cb [... args ...] // func DEBUGSPEW_CODE( -- U->debugspew_indent_depth); if (res != 0) { @@ -1195,7 +1243,7 @@ LUAG_FUNC( lane_new) else if (lua_type(L, 1) == LUA_TSTRING) { // compile the string - if (luaL_loadstring( L2, lua_tostring(L, 1)) != 0) // func + if (luaL_loadstring(L2, lua_tostring(L, 1)) != 0) // func { return luaL_error(L, "error when parsing lane function code"); } @@ -1210,7 +1258,7 @@ LUAG_FUNC( lane_new) int res; DEBUGSPEW_CODE( fprintf( stderr, INDENT_BEGIN "lane_new: transfer lane arguments\n" INDENT_END)); DEBUGSPEW_CODE( ++ U->debugspew_indent_depth); - res = luaG_inter_move(U, L, L2, nargs, eLM_LaneBody); // func libs priority globals package required gc_cb // func [... args ...] + res = luaG_inter_move(U, L, L2, nargs, eLM_LaneBody); // func libs priority globals package required gc_cb // func [... args ...] DEBUGSPEW_CODE( -- U->debugspew_indent_depth); if (res != 0) { @@ -1222,12 +1270,6 @@ LUAG_FUNC( lane_new) STACK_CHECK_RESET_REL(L, 0); STACK_CHECK( L2, 1 + nargs); - // 'lane' is allocated from heap, not Lua, since its life span may surpass the handle's (if free running thread) - Lane* const lane{ new (U) Lane{ U, L2 } }; - if (lane == nullptr) - { - return luaL_error(L, "could not create lane: out of memory"); - } // a Lane full userdata needs a single uservalue Lane** const ud{ lua_newuserdatauv(L, 1) }; // func libs priority globals package required gc_cb lane *ud = lane; // don't forget to store the pointer in the userdata! @@ -1256,13 +1298,12 @@ LUAG_FUNC( lane_new) LANE_POINTER_REGKEY.setValue(L2, [lane](lua_State* L) { lua_pushlightuserdata(L, lane); }); // func [... args ...] STACK_CHECK(L, 1); - STACK_CHECK( L2, 1 + nargs); - - // TODO: launch thread earlier, sync with a std::latch to parallelize OS thread warmup and L2 preparation - DEBUGSPEW_CODE( fprintf( stderr, INDENT_BEGIN "lane_new: launching thread\n" INDENT_END)); - THREAD_CREATE(&lane->thread, lane_main, lane, priority); - + STACK_CHECK(L2, 1 + nargs); DEBUGSPEW_CODE(--U->debugspew_indent_depth); + + // all went well, the lane's thread can start working + onExit.m_lane = nullptr; + lane->m_ready.count_down(); return 1; } @@ -1280,7 +1321,7 @@ LUAG_FUNC( lane_new) // and the issue of canceling/killing threads at gc is not very nice, either // (would easily cause waits at gc cycle, which we don't want). // -LUAG_FUNC(thread_gc) +static int lane_gc(lua_State* L) { bool have_gc_cb{ false }; Lane* lane{ lua_toLane(L, 1) }; // ud @@ -1869,7 +1910,7 @@ LUAG_FUNC(configure) DEBUGSPEW_CODE( ++ U->debugspew_indent_depth); lua_newtable( L); // settings universe mt lua_getfield(L, 1, "shutdown_timeout"); // settings universe mt shutdown_timeout - lua_pushcclosure(L, selfdestruct_gc, 1); // settings universe mt selfdestruct_gc + lua_pushcclosure(L, universe_gc, 1); // settings universe mt universe_gc lua_setfield(L, -2, "__gc"); // settings universe mt lua_setmetatable(L, -2); // settings universe lua_pop(L, 1); // settings @@ -1940,7 +1981,7 @@ LUAG_FUNC(configure) // if (luaL_newmetatable(L, "Lane")) // settings M mt { - lua_pushcfunction(L, LG_thread_gc); // settings M mt LG_thread_gc + lua_pushcfunction(L, lane_gc); // settings M mt lane_gc lua_setfield(L, -2, "__gc"); // settings M mt lua_pushcfunction(L, LG_thread_index); // settings M mt LG_thread_index lua_setfield(L, -2, "__index"); // settings M mt diff --git a/src/lanes_private.h b/src/lanes_private.h index 7c52876..bcc3014 100644 --- a/src/lanes_private.h +++ b/src/lanes_private.h @@ -1,9 +1,11 @@ #pragma once -#include "uniquekey.h" #include "cancel.h" +#include "uniquekey.h" #include "universe.h" +#include + // NOTE: values to be changed by either thread, during execution, without // locking, are marked "volatile" // @@ -22,6 +24,7 @@ class Lane using enum ThreadStatus; THREAD_T thread; + std::latch m_ready{ 1 }; // // M: sub-thread OS thread // S: not used diff --git a/src/universe.cpp b/src/universe.cpp index c487ac0..66da147 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -60,7 +60,7 @@ Universe* universe_create(lua_State* L) void universe_store(lua_State* L, Universe* U) { - ASSERT_L(universe_get(L) == nullptr); + ASSERT_L(!U || universe_get(L) == nullptr); STACK_CHECK_START_REL(L, 0); UNIVERSE_LIGHT_REGKEY.setValue(L, [U](lua_State* L) { U ? lua_pushlightuserdata( L, U) : lua_pushnil( L); }); STACK_CHECK( L, 0); -- cgit v1.2.3-55-g6feb