diff options
Diffstat (limited to '')
-rw-r--r-- | src/linda.cpp | 174 |
1 files changed, 86 insertions, 88 deletions
diff --git a/src/linda.cpp b/src/linda.cpp index a094a8f..1119d71 100644 --- a/src/linda.cpp +++ b/src/linda.cpp | |||
@@ -43,7 +43,6 @@ namespace { | |||
43 | // ############################################################################################# | 43 | // ############################################################################################# |
44 | // ############################################################################################# | 44 | // ############################################################################################# |
45 | 45 | ||
46 | |||
47 | static void CheckKeyTypes(lua_State* const L_, StackIndex const start_, StackIndex const end_) | 46 | static void CheckKeyTypes(lua_State* const L_, StackIndex const start_, StackIndex const end_) |
48 | { | 47 | { |
49 | STACK_CHECK_START_REL(L_, 0); | 48 | STACK_CHECK_START_REL(L_, 0); |
@@ -124,6 +123,27 @@ namespace { | |||
124 | return 0; | 123 | return 0; |
125 | } | 124 | } |
126 | 125 | ||
126 | // a helper to process the timeout argument of linda:send() and linda:receive() | ||
127 | [[nodiscard]] | ||
128 | static auto ProcessTimeoutArg(lua_State* const L_) | ||
129 | { | ||
130 | StackIndex _key_i{ 2 }; // index of first slot, if timeout not there | ||
131 | |||
132 | std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | ||
133 | if (luaG_type(L_, StackIndex{ 2 }) == LuaType::NUMBER) { // we don't want to use lua_isnumber() because of autocoercion | ||
134 | lua_Duration const _duration{ lua_tonumber(L_, 2) }; | ||
135 | if (_duration.count() >= 0.0) { | ||
136 | _until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(_duration); | ||
137 | } else { | ||
138 | raise_luaL_argerror(L_, StackIndex{ 2 }, "duration cannot be < 0"); | ||
139 | } | ||
140 | ++_key_i; | ||
141 | } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the slot | ||
142 | ++_key_i; | ||
143 | } | ||
144 | return std::make_pair(_key_i, _until); | ||
145 | } | ||
146 | |||
127 | // ############################################################################################# | 147 | // ############################################################################################# |
128 | // ############################################################################################# | 148 | // ############################################################################################# |
129 | } // namespace | 149 | } // namespace |
@@ -644,20 +664,8 @@ LUAG_FUNC(linda_receive) | |||
644 | static constexpr lua_CFunction _receive{ | 664 | static constexpr lua_CFunction _receive{ |
645 | +[](lua_State* const L_) { | 665 | +[](lua_State* const L_) { |
646 | Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; | 666 | Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; |
647 | StackIndex _key_i{ 2 }; // index of first slot, if timeout not there | ||
648 | 667 | ||
649 | std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | 668 | auto [_key_i, _until] = ProcessTimeoutArg(L_); |
650 | if (luaG_type(L_, StackIndex{ 2 }) == LuaType::NUMBER) { // we don't want to use lua_isnumber() because of autocoercion | ||
651 | lua_Duration const _duration{ lua_tonumber(L_, 2) }; | ||
652 | if (_duration.count() >= 0.0) { | ||
653 | _until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(_duration); | ||
654 | } else { | ||
655 | raise_luaL_argerror(L_, StackIndex{ 2 }, "duration cannot be < 0"); | ||
656 | } | ||
657 | ++_key_i; | ||
658 | } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the slot | ||
659 | ++_key_i; | ||
660 | } | ||
661 | 669 | ||
662 | keeper_api_t _selected_keeper_receive{ nullptr }; | 670 | keeper_api_t _selected_keeper_receive{ nullptr }; |
663 | int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 }; | 671 | int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 }; |
@@ -698,6 +706,7 @@ LUAG_FUNC(linda_receive) | |||
698 | 706 | ||
699 | CancelRequest _cancel{ CancelRequest::None }; | 707 | CancelRequest _cancel{ CancelRequest::None }; |
700 | KeeperCallResult _pushed{}; | 708 | KeeperCallResult _pushed{}; |
709 | |||
701 | STACK_CHECK_START_REL(_K, 0); | 710 | STACK_CHECK_START_REL(_K, 0); |
702 | for (bool _try_again{ true };;) { | 711 | for (bool _try_again{ true };;) { |
703 | if (_lane != nullptr) { | 712 | if (_lane != nullptr) { |
@@ -713,6 +722,7 @@ LUAG_FUNC(linda_receive) | |||
713 | } | 722 | } |
714 | 723 | ||
715 | // all arguments of receive() but the first are passed to the keeper's receive function | 724 | // all arguments of receive() but the first are passed to the keeper's receive function |
725 | STACK_CHECK(_K, 0); | ||
716 | _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i); | 726 | _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i); |
717 | if (!_pushed.has_value()) { | 727 | if (!_pushed.has_value()) { |
718 | break; | 728 | break; |
@@ -848,20 +858,8 @@ LUAG_FUNC(linda_send) | |||
848 | static constexpr lua_CFunction _send{ | 858 | static constexpr lua_CFunction _send{ |
849 | +[](lua_State* const L_) { | 859 | +[](lua_State* const L_) { |
850 | Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; | 860 | Linda* const _linda{ ToLinda<false>(L_, StackIndex{ 1 }) }; |
851 | StackIndex _key_i{ 2 }; // index of first slot, if timeout not there | ||
852 | 861 | ||
853 | std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | 862 | auto const [_key_i, _until] = ProcessTimeoutArg(L_); |
854 | if (luaG_type(L_, StackIndex{ 2 }) == LuaType::NUMBER) { // we don't want to use lua_isnumber() because of autocoercion | ||
855 | lua_Duration const _duration{ lua_tonumber(L_, 2) }; | ||
856 | if (_duration.count() >= 0.0) { | ||
857 | _until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(_duration); | ||
858 | } else { | ||
859 | raise_luaL_argerror(L_, StackIndex{ 2 }, "duration cannot be < 0"); | ||
860 | } | ||
861 | ++_key_i; | ||
862 | } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the slot | ||
863 | ++_key_i; | ||
864 | } | ||
865 | 863 | ||
866 | // make sure the slot is of a valid type | 864 | // make sure the slot is of a valid type |
867 | CheckKeyTypes(L_, _key_i, _key_i); | 865 | CheckKeyTypes(L_, _key_i, _key_i); |
@@ -873,78 +871,78 @@ LUAG_FUNC(linda_send) | |||
873 | raise_luaL_error(L_, "no data to send"); | 871 | raise_luaL_error(L_, "no data to send"); |
874 | } | 872 | } |
875 | 873 | ||
874 | Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) }; | ||
875 | Keeper* const _keeper{ _linda->whichKeeper() }; | ||
876 | KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } }; | ||
877 | if (_K == nullptr) | ||
878 | return 0; | ||
879 | |||
876 | bool _ret{ false }; | 880 | bool _ret{ false }; |
877 | CancelRequest _cancel{ CancelRequest::None }; | 881 | CancelRequest _cancel{ CancelRequest::None }; |
878 | KeeperCallResult _pushed; | 882 | KeeperCallResult _pushed{}; |
879 | { | ||
880 | Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue<Lane>(L_) }; | ||
881 | Keeper* const _keeper{ _linda->whichKeeper() }; | ||
882 | KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast<lua_State*>(nullptr) } }; | ||
883 | if (_K == nullptr) | ||
884 | return 0; | ||
885 | 883 | ||
886 | STACK_CHECK_START_REL(_K, 0); | 884 | STACK_CHECK_START_REL(_K, 0); |
887 | for (bool _try_again{ true };;) { | 885 | for (bool _try_again{ true };;) { |
888 | if (_lane != nullptr) { | 886 | if (_lane != nullptr) { |
889 | _cancel = _lane->cancelRequest.load(std::memory_order_relaxed); | 887 | _cancel = _lane->cancelRequest.load(std::memory_order_relaxed); |
890 | } | 888 | } |
891 | _cancel = (_cancel != CancelRequest::None) | 889 | _cancel = (_cancel != CancelRequest::None) |
892 | ? _cancel | 890 | ? _cancel |
893 | : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); | 891 | : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); |
894 | // if user wants to cancel, or looped because of a timeout, the call returns without sending anything | 892 | // if user wants to cancel, or looped because of a timeout, the call returns without sending anything |
895 | if (!_try_again || _cancel != CancelRequest::None) { | 893 | if (!_try_again || _cancel != CancelRequest::None) { |
896 | _pushed.emplace(0); | 894 | _pushed.emplace(0); |
897 | break; | 895 | break; |
898 | } | 896 | } |
899 | 897 | ||
900 | STACK_CHECK(_K, 0); | 898 | // all arguments of send() but the first are passed to the keeper's send function |
901 | _pushed = keeper_call(_K, KEEPER_API(send), L_, _linda, _key_i); | 899 | STACK_CHECK(_K, 0); |
902 | if (!_pushed.has_value()) { | 900 | _pushed = keeper_call(_K, KEEPER_API(send), L_, _linda, _key_i); |
903 | break; | 901 | if (!_pushed.has_value()) { |
904 | } | 902 | break; |
905 | LUA_ASSERT(L_, _pushed.value() == 1); | 903 | } |
904 | LUA_ASSERT(L_, _pushed.value() == 1); | ||
906 | 905 | ||
907 | if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) { | 906 | if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) { |
908 | raise_luaL_error(L_, "Key is restricted"); | 907 | raise_luaL_error(L_, "Key is restricted"); |
909 | } | 908 | } |
910 | _ret = lua_toboolean(L_, -1) ? true : false; | 909 | _ret = lua_toboolean(L_, -1) ? true : false; |
911 | lua_pop(L_, 1); | 910 | lua_pop(L_, 1); |
912 | 911 | ||
913 | if (_ret) { | 912 | if (_ret) { |
914 | // Wake up ALL waiting threads | 913 | // Wake up ALL waiting threads |
915 | _linda->writeHappened.notify_all(); | 914 | _linda->writeHappened.notify_all(); |
916 | break; | 915 | break; |
917 | } | 916 | } |
918 | 917 | ||
919 | // instant timout to bypass the wait syscall | 918 | // instant timout to bypass the wait syscall |
920 | if (std::chrono::steady_clock::now() >= _until) { | 919 | if (std::chrono::steady_clock::now() >= _until) { |
921 | break; /* no wait; instant timeout */ | 920 | break; /* no wait; instant timeout */ |
922 | } | 921 | } |
923 | 922 | ||
924 | // storage limit hit, wait until timeout or signalled that we should try again | 923 | // storage limit hit, wait until timeout or signalled that we should try again |
925 | { | 924 | { |
926 | Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings | 925 | Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings |
927 | if (_lane != nullptr) { | 926 | if (_lane != nullptr) { |
928 | // change status of lane to "waiting" | 927 | // change status of lane to "waiting" |
929 | _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely | 928 | _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely |
930 | LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case | 929 | LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case |
931 | LUA_ASSERT(L_, _lane->waiting_on == nullptr); | 930 | LUA_ASSERT(L_, _lane->waiting_on == nullptr); |
932 | _lane->waiting_on = &_linda->readHappened; | 931 | _lane->waiting_on = &_linda->readHappened; |
933 | _lane->status.store(Lane::Waiting, std::memory_order_release); | 932 | _lane->status.store(Lane::Waiting, std::memory_order_release); |
934 | } | 933 | } |
935 | // could not send because no room: wait until some data was read before trying again, or until timeout is reached | 934 | // could not send because no room: wait until some data was read before trying again, or until timeout is reached |
936 | std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock }; | 935 | std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock }; |
937 | std::cv_status const status{ _linda->readHappened.wait_until(_guard, _until) }; | 936 | std::cv_status const status{ _linda->readHappened.wait_until(_guard, _until) }; |
938 | _guard.release(); // we don't want to unlock the mutex on exit! | 937 | _guard.release(); // we don't want to unlock the mutex on exit! |
939 | _try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups | 938 | _try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups |
940 | if (_lane != nullptr) { | 939 | if (_lane != nullptr) { |
941 | _lane->waiting_on = nullptr; | 940 | _lane->waiting_on = nullptr; |
942 | _lane->status.store(_prev_status, std::memory_order_release); | 941 | _lane->status.store(_prev_status, std::memory_order_release); |
943 | } | ||
944 | } | 942 | } |
945 | } | 943 | } |
946 | STACK_CHECK(_K, 0); | ||
947 | } | 944 | } |
945 | STACK_CHECK(_K, 0); | ||
948 | 946 | ||
949 | if (!_pushed.has_value()) { | 947 | if (!_pushed.has_value()) { |
950 | raise_luaL_error(L_, "tried to copy unsupported types"); | 948 | raise_luaL_error(L_, "tried to copy unsupported types"); |