-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
sync: Added WeakSender
to sync::broadcast::channel
#7100
base: master
Are you sure you want to change the base?
Conversation
pub fn is_closed(&self) -> bool { | ||
// Channel is closed when there are no strong senders left active | ||
self.shared.num_tx.load(Acquire) == 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Man, the orderings in this file are a mess. Mixing SeqCst and non-SeqCst orderings on the same atomic is not a good idea. The correct orderings for counters is:
- Increments happen with relaxed.
- Decrements happen with acqrel.
- Checks for zero happen with acquire.
We shouldn't need SeqCst for num_tx
ever.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sorry for that. There orderings and atomic operations are new for me so I copied it from sync/mpsc
and tried to understand why the specific orderings where used there.
Thank you for some explanation on what should be used instead! I will change the code accordingly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re-reading your comment I noticed that you said that we don't need SeqCst
for num_tx
ever. That was not part of my PR but should I adjust it now here in the PR too?
Edit.: I adjusted the orderings for num_tx
too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's not your fault. This problem is pre-existing.
* Changed memory orderings of `num_weak_tx`: Increments happen with relaxed, decrements happen with acqrel and checks for zero happen with acquire. * Add asserts for `sync::broadcast::WeakSender` to `tests/async_send_sync.rs`
//redirect.github.com/ Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some` | ||
//redirect.github.com/ if there are other `Sender` instances alive and the channel wasn't | ||
//redirect.github.com/ previously dropped, otherwise `None` is returned. | ||
pub fn upgrade(&self) -> Option<Sender<T>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be #[must_use]
too.
//redirect.github.com/ Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some` | ||
//redirect.github.com/ if there are other `Sender` instances alive and the channel wasn't | ||
//redirect.github.com/ previously dropped, otherwise `None` is returned. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Markdown docs generally start with one really short summary, followed by more text.
//redirect.github.com/ Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some` | |
//redirect.github.com/ if there are other `Sender` instances alive and the channel wasn't | |
//redirect.github.com/ previously dropped, otherwise `None` is returned. | |
//redirect.github.com/ Tries to convert a `WeakSender` into a [`Sender`]. | |
//redirect.github.com/ | |
//redirect.github.com/ This will return `Some` | |
//redirect.github.com/ if there are other `Sender` instances alive and the channel wasn't | |
//redirect.github.com/ previously dropped, otherwise `None` is returned. |
+ reflow to line length
Motivation
Closing issue #7003. Add a type
WeakSender
to thesync::broadcast::channel
similar tosync::maps::channel
.Solution
The new
WeakSender
type just stores anArc<Shared<T>>
just like the normalSender
but activeWeakSenders
will not prevent the channel from being closed if allSender
s are dropped.Closes #7003.