aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/linda.cpp104
1 files changed, 40 insertions, 64 deletions
diff --git a/src/linda.cpp b/src/linda.cpp
index fa28385..5fb8279 100644
--- a/src/linda.cpp
+++ b/src/linda.cpp
@@ -147,6 +147,42 @@ namespace {
147 } 147 }
148 148
149 // ############################################################################################# 149 // #############################################################################################
150 static bool WaitInternal([[maybe_unused]] lua_State* const L_, Lane* const lane_, Linda* const linda_, Keeper* const keeper_, std::condition_variable& waitingOn_, std::chrono::time_point<std::chrono::steady_clock> until_)
151 {
152 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
153 if (lane_ != nullptr) {
154 // change status of lane to "waiting"
155 _prev_status = lane_->status.load(std::memory_order_acquire); // Running, most likely
156 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case
157 LUA_ASSERT(L_, lane_->waiting_on == nullptr);
158 lane_->waiting_on = &waitingOn_;
159 lane_->status.store(Lane::Waiting, std::memory_order_release);
160 }
161
162 // wait until the final target date by small increments, interrupting regularly so that we can check for cancel requests,
163 // in case some timing issue caused a cancel request to be issued, and the condvar signalled, before we actually wait for it
164 auto const [_forceTryAgain, _until_check_cancel] = std::invoke([until_, wakePeriod = linda_->getWakePeriod()] {
165 auto _until_check_cancel{ std::chrono::time_point<std::chrono::steady_clock>::max() };
166 if (wakePeriod.count() > 0.0f) {
167 _until_check_cancel = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(wakePeriod);
168 }
169 bool const _forceTryAgain{ _until_check_cancel < until_ };
170 return std::make_tuple(_forceTryAgain, _forceTryAgain ? _until_check_cancel : until_);
171 });
172
173 // operation can't complete: wake when it is signalled to be possible, or when timeout is reached
174 std::unique_lock<std::mutex> _guard{ keeper_->mutex, std::adopt_lock };
175 std::cv_status const _status{ waitingOn_.wait_until(_guard, _until_check_cancel) };
176 _guard.release(); // we don't want to unlock the mutex on exit!
177 bool const _try_again{ _forceTryAgain || (_status == std::cv_status::no_timeout) }; // detect spurious wakeups
178 if (lane_ != nullptr) {
179 lane_->waiting_on = nullptr;
180 lane_->status.store(_prev_status, std::memory_order_release);
181 }
182 return _try_again;
183 }
184
185 // #############################################################################################
150 186
151 // the implementation for linda:receive() and linda:receive_batched() 187 // the implementation for linda:receive() and linda:receive_batched()
152 static int ReceiveInternal(lua_State* const L_, bool const batched_) 188 static int ReceiveInternal(lua_State* const L_, bool const batched_)
@@ -201,6 +237,7 @@ namespace {
201 _cancel = (_cancel != CancelRequest::None) 237 _cancel = (_cancel != CancelRequest::None)
202 ? _cancel 238 ? _cancel
203 : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); 239 : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None);
240
204 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything 241 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything
205 if (!_try_again || _cancel != CancelRequest::None) { 242 if (!_try_again || _cancel != CancelRequest::None) {
206 _pushed.emplace(0); 243 _pushed.emplace(0);
@@ -227,38 +264,7 @@ namespace {
227 } 264 }
228 265
229 // nothing received, wait until timeout or signalled that we should try again 266 // nothing received, wait until timeout or signalled that we should try again
230 { 267 _try_again = WaitInternal(L_, _lane, _linda, _keeper, _linda->writeHappened, _until);
231 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
232 if (_lane != nullptr) {
233 // change status of lane to "waiting"
234 _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely
235 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case
236 LUA_ASSERT(L_, _lane->waiting_on == nullptr);
237 _lane->waiting_on = &_linda->writeHappened;
238 _lane->status.store(Lane::Waiting, std::memory_order_release);
239 }
240
241 // wait until the final target date by small increments, interrupting regularly so that we can check for cancel requests,
242 // in case some timing issue caused a cancel request to be issued, and the condvar signalled, before we actually wait for it
243 auto const [_forceTryAgain, _until_check_cancel] = std::invoke([_until, wakePeriod = _linda->getWakePeriod()] {
244 auto _until_check_cancel{ std::chrono::time_point<std::chrono::steady_clock>::max() };
245 if (wakePeriod.count() > 0.0f) {
246 _until_check_cancel = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(wakePeriod);
247 }
248 bool const _forceTryAgain{ _until_check_cancel < _until };
249 return std::make_tuple(_forceTryAgain, _forceTryAgain ? _until_check_cancel : _until);
250 });
251
252 // not enough data to read: wakeup when data was sent, or when timeout is reached
253 std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock };
254 std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until_check_cancel) };
255 _guard.release(); // we don't want to unlock the mutex on exit!
256 _try_again = _forceTryAgain || (_status == std::cv_status::no_timeout); // detect spurious wakeups
257 if (_lane != nullptr) {
258 _lane->waiting_on = nullptr;
259 _lane->status.store(_prev_status, std::memory_order_release);
260 }
261 }
262 } 268 }
263 STACK_CHECK(_K, 0); 269 STACK_CHECK(_K, 0);
264 270
@@ -916,6 +922,7 @@ LUAG_FUNC(linda_send)
916 _cancel = (_cancel != CancelRequest::None) 922 _cancel = (_cancel != CancelRequest::None)
917 ? _cancel 923 ? _cancel
918 : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); 924 : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None);
925
919 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything 926 // if user wants to cancel, or looped because of a timeout, the call returns without sending anything
920 if (!_try_again || _cancel != CancelRequest::None) { 927 if (!_try_again || _cancel != CancelRequest::None) {
921 _pushed.emplace(0); 928 _pushed.emplace(0);
@@ -948,38 +955,7 @@ LUAG_FUNC(linda_send)
948 } 955 }
949 956
950 // storage limit hit, wait until timeout or signalled that we should try again 957 // storage limit hit, wait until timeout or signalled that we should try again
951 { 958 _try_again = WaitInternal(L_, _lane, _linda, _keeper, _linda->readHappened, _until);
952 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
953 if (_lane != nullptr) {
954 // change status of lane to "waiting"
955 _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely
956 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case
957 LUA_ASSERT(L_, _lane->waiting_on == nullptr);
958 _lane->waiting_on = &_linda->readHappened;
959 _lane->status.store(Lane::Waiting, std::memory_order_release);
960 }
961
962 // wait until the final target date by small increments, interrupting regularly so that we can check for cancel requests,
963 // in case some timing issue caused a cancel request to be issued, and the condvar signalled, before we actually wait for it
964 auto const [_forceTryAgain, _until_check_cancel] = std::invoke([_until, wakePeriod = _linda->getWakePeriod()] {
965 auto _until_check_cancel{ std::chrono::time_point<std::chrono::steady_clock>::max() };
966 if (wakePeriod.count() > 0.0f) {
967 _until_check_cancel = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(wakePeriod);
968 }
969 bool const _forceTryAgain{ _until_check_cancel < _until };
970 return std::make_tuple(_forceTryAgain, _forceTryAgain ? _until_check_cancel : _until);
971 });
972
973 // could not send because no room: wait until some data was read before trying again, or until timeout is reached
974 std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock };
975 std::cv_status const status{ _linda->readHappened.wait_until(_guard, _until_check_cancel) };
976 _guard.release(); // we don't want to unlock the mutex on exit!
977 _try_again = _forceTryAgain || (status == std::cv_status::no_timeout); // detect spurious wakeups
978 if (_lane != nullptr) {
979 _lane->waiting_on = nullptr;
980 _lane->status.store(_prev_status, std::memory_order_release);
981 }
982 }
983 } 959 }
984 STACK_CHECK(_K, 0); 960 STACK_CHECK(_K, 0);
985 961