aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/linda.cpp174
1 files changed, 86 insertions, 88 deletions
diff --git a/src/linda.cpp b/src/linda.cpp
index a094a8f..1119d71 100644
--- a/src/linda.cpp
+++ b/src/linda.cpp
@@ -43,7 +43,6 @@ namespace {
43 // ############################################################################################# 43 // #############################################################################################
44 // ############################################################################################# 44 // #############################################################################################
45 45
46
47 static void CheckKeyTypes(lua_State* const L_, StackIndex const start_, StackIndex const end_) 46 static void CheckKeyTypes(lua_State* const L_, StackIndex const start_, StackIndex const end_)
48 { 47 {
49 STACK_CHECK_START_REL(L_, 0); 48 STACK_CHECK_START_REL(L_, 0);
@@ -124,6 +123,27 @@ namespace {
124 return 0; 123 return 0;
125 } 124 }
126 125
126 // a helper to process the timeout argument of linda:send() and linda:receive()
127 [[nodiscard]]
128 static auto ProcessTimeoutArg(lua_State* const L_)
129 {
130 StackIndex _key_i{ 2 }; // index of first slot, if timeout not there
131
132 std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() };
133 if (luaG_type(L_, StackIndex{ 2 }) == LuaType::NUMBER) { // we don't want to use lua_isnumber() because of autocoercion
134 lua_Duration const _duration{ lua_tonumber(L_, 2) };
135 if (_duration.count() >= 0.0) {
136 _until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(_duration);
137 } else {
138 raise_luaL_argerror(L_, StackIndex{ 2 }, "duration cannot be < 0");
139 }
140 ++_key_i;
141 } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the slot
142 ++_key_i;
143 }
144 return std::make_pair(_key_i, _until);
145 }
146
127 // ############################################################################################# 147 // #############################################################################################
128 // ############################################################################################# 148 // #############################################################################################
129} // namespace 149} // namespace
@@ -644,20 +664,8 @@ LUAG_FUNC(linda_receive)
644 static constexpr lua_CFunction _receive{ 664 static constexpr lua_CFunction _receive{
645 +[](lua_State* const L_) { 665 +[](lua_State* const L_) {
646 Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; 666 Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) };
647 StackIndex _key_i{ 2 }; // index of first slot, if timeout not there
648 667
649 std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; 668 auto [_key_i, _until] = ProcessTimeoutArg(L_);
650 if (luaG_type(L_, StackIndex{ 2 }) == LuaType::NUMBER) { // we don't want to use lua_isnumber() because of autocoercion
651 lua_Duration const _duration{ lua_tonumber(L_, 2) };
652 if (_duration.count() >= 0.0) {
653 _until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(_duration);
654 } else {
655 raise_luaL_argerror(L_, StackIndex{ 2 }, "duration cannot be < 0");
656 }
657 ++_key_i;
658 } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the slot
659 ++_key_i;
660 }
661 669
662 keeper_api_t _selected_keeper_receive{ nullptr }; 670 keeper_api_t _selected_keeper_receive{ nullptr };
663 int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 }; 671 int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 };
@@ -698,6 +706,7 @@ LUAG_FUNC(linda_receive)
698 706
699 CancelRequest _cancel{ CancelRequest::None }; 707 CancelRequest _cancel{ CancelRequest::None };
700 KeeperCallResult _pushed{}; 708 KeeperCallResult _pushed{};
709
701 STACK_CHECK_START_REL(_K, 0); 710 STACK_CHECK_START_REL(_K, 0);
702 for (bool _try_again{ true };;) { 711 for (bool _try_again{ true };;) {
703 if (_lane != nullptr) { 712 if (_lane != nullptr) {
@@ -713,6 +722,7 @@ LUAG_FUNC(linda_receive)
713 } 722 }
714 723
715 // all arguments of receive() but the first are passed to the keeper's receive function 724 // all arguments of receive() but the first are passed to the keeper's receive function
725 STACK_CHECK(_K, 0);
716 _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i); 726 _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i);
717 if (!_pushed.has_value()) { 727 if (!_pushed.has_value()) {
718 break; 728 break;
@@ -848,20 +858,8 @@ LUAG_FUNC(linda_send)
848 static constexpr lua_CFunction _send{ 858 static constexpr lua_CFunction _send{
849 +[](lua_State* const L_) { 859 +[](lua_State* const L_) {
850 Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; 860 Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) };
851 StackIndex _key_i{ 2 }; // index of first slot, if timeout not there
852 861
853 std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; 862 auto const [_key_i, _until] = ProcessTimeoutArg(L_);
854 if (luaG_type(L_, StackIndex{ 2 }) == LuaType::NUMBER) { // we don't want to use lua_isnumber() because of autocoercion
855 lua_Duration const _duration{ lua_tonumber(L_, 2) };
856 if (_duration.count() >= 0.0) {
857 _until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(_duration);
858 } else {
859 raise_luaL_argerror(L_, StackIndex{ 2 }, "duration cannot be < 0");
860 }
861 ++_key_i;
862 } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the slot
863 ++_key_i;
864 }
865 863
866 // make sure the slot is of a valid type 864 // make sure the slot is of a valid type
867 CheckKeyTypes(L_, _key_i, _key_i); 865 CheckKeyTypes(L_, _key_i, _key_i);
@@ -873,78 +871,78 @@ LUAG_FUNC(linda_send)
873 raise_luaL_error(L_, "no data to send"); 871 raise_luaL_error(L_, "no data to send");
874 } 872 }
875 873
874 Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) };
875 Keeper* const _keeper{ _linda->whichKeeper() };
876 KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } };
877 if (_K == nullptr)
878 return 0;
879
876 bool _ret{ false }; 880 bool _ret{ false };
877 CancelRequest _cancel{ CancelRequest::None }; 881 CancelRequest _cancel{ CancelRequest::None };
878 KeeperCallResult _pushed; 882 KeeperCallResult _pushed{};
879 {
880 Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) };
881 Keeper* const _keeper{ _linda->whichKeeper() };
882 KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } };
883 if (_K == nullptr)
884 return 0;
885 883
886 STACK_CHECK_START_REL(_K, 0); 884 STACK_CHECK_START_REL(_K, 0);
887 for (bool _try_again{ true };;) { 885 for (bool _try_again{ true };;) {
888 if (_lane != nullptr) { 886 if (_lane != nullptr) {
889 _cancel = _lane->cancelRequest.load(std::memory_order_relaxed); 887 _cancel = _lane->cancelRequest.load(std::memory_order_relaxed);
890 } 888 }
891 _cancel = (_cancel != CancelRequest::None) 889 _cancel = (_cancel != CancelRequest::None)
892 ? _cancel 890 ? _cancel
893 : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); 891 : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None);
894 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything 892 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything
895 if (!_try_again || _cancel != CancelRequest::None) { 893 if (!_try_again || _cancel != CancelRequest::None) {
896 _pushed.emplace(0); 894 _pushed.emplace(0);
897 break; 895 break;
898 } 896 }
899 897
900 STACK_CHECK(_K, 0); 898 // all arguments of send() but the first are passed to the keeper's send function
901 _pushed = keeper_call(_K, KEEPER_API(send), L_, _linda, _key_i); 899 STACK_CHECK(_K, 0);
902 if (!_pushed.has_value()) { 900 _pushed = keeper_call(_K, KEEPER_API(send), L_, _linda, _key_i);
903 break; 901 if (!_pushed.has_value()) {
904 } 902 break;
905 LUA_ASSERT(L_, _pushed.value() == 1); 903 }
904 LUA_ASSERT(L_, _pushed.value() == 1);
906 905
907 if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) { 906 if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) {
908 raise_luaL_error(L_, "Key is restricted"); 907 raise_luaL_error(L_, "Key is restricted");
909 } 908 }
910 _ret = lua_toboolean(L_, -1) ? true : false; 909 _ret = lua_toboolean(L_, -1) ? true : false;
911 lua_pop(L_, 1); 910 lua_pop(L_, 1);
912 911
913 if (_ret) { 912 if (_ret) {
914 // Wake up ALL waiting threads 913 // Wake up ALL waiting threads
915 _linda->writeHappened.notify_all(); 914 _linda->writeHappened.notify_all();
916 break; 915 break;
917 } 916 }
918 917
919 // instant timout to bypass the wait syscall 918 // instant timout to bypass the wait syscall
920 if (std::chrono::steady_clock::now() >= _until) { 919 if (std::chrono::steady_clock::now() >= _until) {
921 break; /* no wait; instant timeout */ 920 break; /* no wait; instant timeout */
922 } 921 }
923 922
924 // storage limit hit, wait until timeout or signalled that we should try again 923 // storage limit hit, wait until timeout or signalled that we should try again
925 { 924 {
926 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings 925 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
927 if (_lane != nullptr) { 926 if (_lane != nullptr) {
928 // change status of lane to "waiting" 927 // change status of lane to "waiting"
929 _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely 928 _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely
930 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case 929 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case
931 LUA_ASSERT(L_, _lane->waiting_on == nullptr); 930 LUA_ASSERT(L_, _lane->waiting_on == nullptr);
932 _lane->waiting_on = &_linda->readHappened; 931 _lane->waiting_on = &_linda->readHappened;
933 _lane->status.store(Lane::Waiting, std::memory_order_release); 932 _lane->status.store(Lane::Waiting, std::memory_order_release);
934 } 933 }
935 // could not send because no room: wait until some data was read before trying again, or until timeout is reached 934 // could not send because no room: wait until some data was read before trying again, or until timeout is reached
936 std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock }; 935 std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock };
937 std::cv_status const status{ _linda->readHappened.wait_until(_guard, _until) }; 936 std::cv_status const status{ _linda->readHappened.wait_until(_guard, _until) };
938 _guard.release(); // we don't want to unlock the mutex on exit! 937 _guard.release(); // we don't want to unlock the mutex on exit!
939 _try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups 938 _try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups
940 if (_lane != nullptr) { 939 if (_lane != nullptr) {
941 _lane->waiting_on = nullptr; 940 _lane->waiting_on = nullptr;
942 _lane->status.store(_prev_status, std::memory_order_release); 941 _lane->status.store(_prev_status, std::memory_order_release);
943 }
944 } 942 }
945 } 943 }
946 STACK_CHECK(_K, 0);
947 } 944 }
945 STACK_CHECK(_K, 0);
948 946
949 if (!_pushed.has_value()) { 947 if (!_pushed.has_value()) {
950 raise_luaL_error(L_, "tried to copy unsupported types"); 948 raise_luaL_error(L_, "tried to copy unsupported types");