diff options
Diffstat (limited to '')
-rw-r--r-- | src/linda.cpp | 104 |
1 files changed, 40 insertions, 64 deletions
diff --git a/src/linda.cpp b/src/linda.cpp index fa28385..5fb8279 100644 --- a/src/linda.cpp +++ b/src/linda.cpp | |||
@@ -147,6 +147,42 @@ namespace { | |||
147 | } | 147 | } |
148 | 148 | ||
149 | // ############################################################################################# | 149 | // ############################################################################################# |
150 | static bool WaitInternal([[maybe_unused]] lua_State* const L_, Lane* const lane_, Linda* const linda_, Keeper* const keeper_, std::condition_variable& waitingOn_, std::chrono::time_point<std::chrono::steady_clock> until_) | ||
151 | { | ||
152 | Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings | ||
153 | if (lane_ != nullptr) { | ||
154 | // change status of lane to "waiting" | ||
155 | _prev_status = lane_->status.load(std::memory_order_acquire); // Running, most likely | ||
156 | LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case | ||
157 | LUA_ASSERT(L_, lane_->waiting_on == nullptr); | ||
158 | lane_->waiting_on = &waitingOn_; | ||
159 | lane_->status.store(Lane::Waiting, std::memory_order_release); | ||
160 | } | ||
161 | |||
162 | // wait until the final target date by small increments, interrupting regularly so that we can check for cancel requests, | ||
163 | // in case some timing issue caused a cancel request to be issued, and the condvar signalled, before we actually wait for it | ||
164 | auto const [_forceTryAgain, _until_check_cancel] = std::invoke([until_, wakePeriod = linda_->getWakePeriod()] { | ||
165 | auto _until_check_cancel{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | ||
166 | if (wakePeriod.count() > 0.0f) { | ||
167 | _until_check_cancel = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(wakePeriod); | ||
168 | } | ||
169 | bool const _forceTryAgain{ _until_check_cancel < until_ }; | ||
170 | return std::make_tuple(_forceTryAgain, _forceTryAgain ? _until_check_cancel : until_); | ||
171 | }); | ||
172 | |||
173 | // operation can't complete: wake when it is signalled to be possible, or when timeout is reached | ||
174 | std::unique_lock<std::mutex> _guard{ keeper_->mutex, std::adopt_lock }; | ||
175 | std::cv_status const _status{ waitingOn_.wait_until(_guard, _until_check_cancel) }; | ||
176 | _guard.release(); // we don't want to unlock the mutex on exit! | ||
177 | bool const _try_again{ _forceTryAgain || (_status == std::cv_status::no_timeout) }; // detect spurious wakeups | ||
178 | if (lane_ != nullptr) { | ||
179 | lane_->waiting_on = nullptr; | ||
180 | lane_->status.store(_prev_status, std::memory_order_release); | ||
181 | } | ||
182 | return _try_again; | ||
183 | } | ||
184 | |||
185 | // ############################################################################################# | ||
150 | 186 | ||
151 | // the implementation for linda:receive() and linda:receive_batched() | 187 | // the implementation for linda:receive() and linda:receive_batched() |
152 | static int ReceiveInternal(lua_State* const L_, bool const batched_) | 188 | static int ReceiveInternal(lua_State* const L_, bool const batched_) |
@@ -201,6 +237,7 @@ namespace { | |||
201 | _cancel = (_cancel != CancelRequest::None) | 237 | _cancel = (_cancel != CancelRequest::None) |
202 | ? _cancel | 238 | ? _cancel |
203 | : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); | 239 | : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); |
240 | |||
204 | // if user wants to cancel, or looped because of a timeout, the call returns without sending anything | 241 | // if user wants to cancel, or looped because of a timeout, the call returns without sending anything |
205 | if (!_try_again || _cancel != CancelRequest::None) { | 242 | if (!_try_again || _cancel != CancelRequest::None) { |
206 | _pushed.emplace(0); | 243 | _pushed.emplace(0); |
@@ -227,38 +264,7 @@ namespace { | |||
227 | } | 264 | } |
228 | 265 | ||
229 | // nothing received, wait until timeout or signalled that we should try again | 266 | // nothing received, wait until timeout or signalled that we should try again |
230 | { | 267 | _try_again = WaitInternal(L_, _lane, _linda, _keeper, _linda->writeHappened, _until); |
231 | Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings | ||
232 | if (_lane != nullptr) { | ||
233 | // change status of lane to "waiting" | ||
234 | _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely | ||
235 | LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case | ||
236 | LUA_ASSERT(L_, _lane->waiting_on == nullptr); | ||
237 | _lane->waiting_on = &_linda->writeHappened; | ||
238 | _lane->status.store(Lane::Waiting, std::memory_order_release); | ||
239 | } | ||
240 | |||
241 | // wait until the final target date by small increments, interrupting regularly so that we can check for cancel requests, | ||
242 | // in case some timing issue caused a cancel request to be issued, and the condvar signalled, before we actually wait for it | ||
243 | auto const [_forceTryAgain, _until_check_cancel] = std::invoke([_until, wakePeriod = _linda->getWakePeriod()] { | ||
244 | auto _until_check_cancel{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | ||
245 | if (wakePeriod.count() > 0.0f) { | ||
246 | _until_check_cancel = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(wakePeriod); | ||
247 | } | ||
248 | bool const _forceTryAgain{ _until_check_cancel < _until }; | ||
249 | return std::make_tuple(_forceTryAgain, _forceTryAgain ? _until_check_cancel : _until); | ||
250 | }); | ||
251 | |||
252 | // not enough data to read: wakeup when data was sent, or when timeout is reached | ||
253 | std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock }; | ||
254 | std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until_check_cancel) }; | ||
255 | _guard.release(); // we don't want to unlock the mutex on exit! | ||
256 | _try_again = _forceTryAgain || (_status == std::cv_status::no_timeout); // detect spurious wakeups | ||
257 | if (_lane != nullptr) { | ||
258 | _lane->waiting_on = nullptr; | ||
259 | _lane->status.store(_prev_status, std::memory_order_release); | ||
260 | } | ||
261 | } | ||
262 | } | 268 | } |
263 | STACK_CHECK(_K, 0); | 269 | STACK_CHECK(_K, 0); |
264 | 270 | ||
@@ -916,6 +922,7 @@ LUAG_FUNC(linda_send) | |||
916 | _cancel = (_cancel != CancelRequest::None) | 922 | _cancel = (_cancel != CancelRequest::None) |
917 | ? _cancel | 923 | ? _cancel |
918 | : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); | 924 | : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); |
925 | |||
919 | // if user wants to cancel, or looped because of a timeout, the call returns without sending anything | 926 | // if user wants to cancel, or looped because of a timeout, the call returns without sending anything |
920 | if (!_try_again || _cancel != CancelRequest::None) { | 927 | if (!_try_again || _cancel != CancelRequest::None) { |
921 | _pushed.emplace(0); | 928 | _pushed.emplace(0); |
@@ -948,38 +955,7 @@ LUAG_FUNC(linda_send) | |||
948 | } | 955 | } |
949 | 956 | ||
950 | // storage limit hit, wait until timeout or signalled that we should try again | 957 | // storage limit hit, wait until timeout or signalled that we should try again |
951 | { | 958 | _try_again = WaitInternal(L_, _lane, _linda, _keeper, _linda->readHappened, _until); |
952 | Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings | ||
953 | if (_lane != nullptr) { | ||
954 | // change status of lane to "waiting" | ||
955 | _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely | ||
956 | LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case | ||
957 | LUA_ASSERT(L_, _lane->waiting_on == nullptr); | ||
958 | _lane->waiting_on = &_linda->readHappened; | ||
959 | _lane->status.store(Lane::Waiting, std::memory_order_release); | ||
960 | } | ||
961 | |||
962 | // wait until the final target date by small increments, interrupting regularly so that we can check for cancel requests, | ||
963 | // in case some timing issue caused a cancel request to be issued, and the condvar signalled, before we actually wait for it | ||
964 | auto const [_forceTryAgain, _until_check_cancel] = std::invoke([_until, wakePeriod = _linda->getWakePeriod()] { | ||
965 | auto _until_check_cancel{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | ||
966 | if (wakePeriod.count() > 0.0f) { | ||
967 | _until_check_cancel = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(wakePeriod); | ||
968 | } | ||
969 | bool const _forceTryAgain{ _until_check_cancel < _until }; | ||
970 | return std::make_tuple(_forceTryAgain, _forceTryAgain ? _until_check_cancel : _until); | ||
971 | }); | ||
972 | |||
973 | // could not send because no room: wait until some data was read before trying again, or until timeout is reached | ||
974 | std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock }; | ||
975 | std::cv_status const status{ _linda->readHappened.wait_until(_guard, _until_check_cancel) }; | ||
976 | _guard.release(); // we don't want to unlock the mutex on exit! | ||
977 | _try_again = _forceTryAgain || (status == std::cv_status::no_timeout); // detect spurious wakeups | ||
978 | if (_lane != nullptr) { | ||
979 | _lane->waiting_on = nullptr; | ||
980 | _lane->status.store(_prev_status, std::memory_order_release); | ||
981 | } | ||
982 | } | ||
983 | } | 959 | } |
984 | STACK_CHECK(_K, 0); | 960 | STACK_CHECK(_K, 0); |
985 | 961 | ||