feat(client): add parallel execution control for receiver callbacks #1140
base: main
Conversation
@@ -265,8 +269,16 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error {
 			}
 		}
 
-		if c.blockingCallback {
-			callback()
+		if parallel != nil {
Am I reading this right... we don't support blocking any more?
I changed the value of WithParallelGoroutines to 1, which can achieve the same effect. Moreover, it will be more versatile.
It may not matter, but you're blocking at a different place now... if I'm reading this right.
Before it was: read -> block while processing
Now it's: read -> func(process)() -> read -> wait for a free spot to open in the channel
Right? This means that we're getting the next event BEFORE we're ready, where before we could only get one AFTER we were ready. Not sure it matters, but something to think about.
Also, we need testcases.
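To make the ordering concern concrete, here is a minimal self-contained sketch (the names and the semaphore channel are illustrative stand-ins, not the SDK's actual code). Because the worker slot is acquired after the read, "read 1" is printed before "processed 0", i.e. the next message is pulled while the previous one is still being handled:

package main

import (
	"fmt"
	"time"
)

func main() {
	sem := make(chan struct{}, 1) // allow at most one callback in flight
	done := make(chan int, 3)

	for i := 0; i < 3; i++ {
		fmt.Println("read", i) // stand-in for receiver.Receive(ctx)
		sem <- struct{}{}      // blocks while a callback is still running,
		//                        but message i has already been read ("on hold")
		go func(m int) {
			time.Sleep(50 * time.Millisecond) // stand-in for the user callback
			fmt.Println("processed", m)
			<-sem
			done <- m
		}(i)
	}
	for i := 0; i < 3; i++ {
		<-done
	}
}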
Before it was: read -> block while processing
Now it's: read -> func(process)() -> read -> wait for a free spot to open in the channel
You're right, indeed. Given this situation, should we maintain the original approach or treat it as a breaking change?
I'd like to confirm our final approach so I can make some adjustments accordingly.
PS: I'll supplement the unit tests later.
I think it depends on whether calling:
- c.responder.Respond(ctx)
- c.receiver.Receive(ctx)
before we're ready is risky or not. For example, if we get the next "msg" and then pause, and this entire for-loop stops, then if another process tries to continue with the same responder/receiver, that one "msg" will be lost.
Abstractly it sounds like a bug. But in practice I don't know if the idea of continuing to get messages outside of the instance of this for-loop makes any sense. E.g. if we're pulling messages from a queue during the "receiver.Receive()" call, then I think this situation might occur.
We can make assumptions based on the logic of WithPollGoroutines. When there are multiple asynchronous fetches occurring simultaneously, it essentially means the scenario you just described is happening.
Can I understand it this way: event messages, when distributed across different processes (goroutines or others), inherently have the potential to encounter such situations? At the very least, the message currently being read would appear as "lost" or "non-existent" to other processes (goroutines or others).
Therefore, I believe this scenario might not be a cause for concern.
Of course, there is one extreme case where the user specifically desires to read messages "sequentially," meaning no other processes (goroutines or others) are allowed to handle the "next" message additionally.
However, I think this situation is similar to using blockingCallback, where the user intentionally makes such a choice. When this happens, it implies there is no scenario where another process (goroutine or other) is missing a message still waiting for the channel to become available.
Couple of things come to mind:
1 - I make a distinction between "lost a message because the processor of that message crashed", and "lost a message because it was pulled from a queue and purposely never even tried to be processed". The former is kind of expected. The latter is a bug IMO. While the latter isn't 100% "purposely dropped it"... the logic comes close since we don't take into account the situation where this entire go process just dies while that message is "on hold". The queue it's being pulled from should be where it's "on hold".
2 - if we're pulling messages from a queue then this "on hold" message could be processed before the next message if there's another processor pulling messages from that queue. Now, I'm not suggesting that it's our job to try to synchronize things in such a scenario, however, I think someone would find it really odd that this "on hold" message might be delayed significantly before it's even started to be processed while the next message might be processed immediately.
3 - "this situation is similar to using blockingCallback"... exactly. We don't support "blocking" any more. At least not with the same "when are things pulled off of the original queue" semantics.
I'm curious to know @embano1's thoughts, but I'm leaning towards an "only pull the next message when we're ready" model, otherwise it kind of feels like a potential bug, or like we're introducing another queue into the flow... granted a queue of size one, but still a queue, one that has zero persistence/resiliency to it.
I think you're right. We need to maintain predictability for end users regarding events. Hiding certain event-related operations might make users feel things are beyond their control.
Perhaps we could slightly adjust the channel position—that might preserve the original logic. Of course, this is just my initial thought—I'll look into it more carefully when I have time to confirm.
@duglin This also involves some method, function, and variable naming choices. If you have better suggestions, please pass them along and I'll adjust accordingly.
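For what it's worth, here is a rough sketch of the "adjust the channel position" idea (an assumption about the direction, not this PR's actual code): acquire a worker slot before reading the next message, so nothing is pulled from the protocol until we are ready for it. Setting the limit to 1 then also restores the old "read only after the previous callback finished" semantics:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const parallel = 2 // 1 would reproduce the old blocking behavior
	sem := make(chan struct{}, parallel)
	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		sem <- struct{}{} // wait for a free slot first...
		msg := i          // ...then read (stand-in for receiver.Receive(ctx))
		wg.Add(1)
		go func(m int) {
			defer wg.Done()
			defer func() { <-sem }()
			time.Sleep(100 * time.Millisecond) // stand-in for the user callback
			fmt.Println("processed", m)
		}(msg)
	}
	wg.Wait()
}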
Add an attribute to support customizing the number of asynchronous parallel executions.
Currently, creating goroutines without limits can significantly impact server performance. Since we currently only have the serial and unlimited options, this attribute has been added to make the number of asynchronous goroutines configurable.
Certainly, the actual total number of goroutines is determined by WithPollGoroutines and WithParallelGoroutines, and it involves a multiplication of the two. These two may seem redundant, but their purposes are actually different.
WithPollGoroutines defines the concurrency level for reading events from the protocol; WithParallelGoroutines, on the other hand, specifies the parallel processing count after the events are read.
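For illustration, a minimal usage sketch, assuming the existing client.WithPollGoroutines option and the WithParallelGoroutines option proposed in this PR (that option name and its exact placement are assumptions here):

package main

import (
	"context"
	"log"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/client"
	cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
)

func main() {
	p, err := cehttp.New()
	if err != nil {
		log.Fatal(err)
	}

	// 2 pollers reading from the protocol, each allowing up to 4 callbacks
	// in flight; per the PR description the totals multiply, so at most
	// 2 * 4 = 8 callbacks run concurrently.
	c, err := client.New(p,
		client.WithPollGoroutines(2),
		client.WithParallelGoroutines(4), // hypothetical option from this PR
	)
	if err != nil {
		log.Fatal(err)
	}

	if err := c.StartReceiver(context.Background(), func(e cloudevents.Event) {
		log.Println("received:", e.ID())
	}); err != nil {
		log.Fatal(err)
	}
}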