Conversation

Contributor

@muhamadazmy muhamadazmy commented Nov 12, 2025

@muhamadazmy muhamadazmy force-pushed the pr3973 branch 2 times, most recently from c093ea2 to 2a7342c on November 12, 2025 09:57
@muhamadazmy muhamadazmy marked this pull request as ready for review November 12, 2025 10:52
@muhamadazmy muhamadazmy force-pushed the pr3973 branch 6 times, most recently from 8b8db7f to 953b39a on November 13, 2025 11:27
@muhamadazmy muhamadazmy force-pushed the pr3973 branch 5 times, most recently from 966834c to 8ae259e on November 17, 2025 11:40
@tillrohrmann tillrohrmann linked an issue Nov 17, 2025 that may be closed by this pull request
@muhamadazmy muhamadazmy force-pushed the pr3973 branch 2 times, most recently from fc8efad to 5b6ea28 on November 25, 2025 12:54
Contributor

@tillrohrmann tillrohrmann left a comment

Thanks a lot for creating this PR @muhamadazmy. The changes make a lot of sense to me. I left a few minor comments.

My main question was about the impact on the partition processor event loop if we are ingesting a lot of entries (potentially also large entries). Did you observe any negative effects?

It would be great to address the problem of deserializing the Envelope records in a follow-up PR since this is really unnecessary work that the system now needs to do.

Comment on lines +167 to +169
// sender
// .enqueue_many(records)
// .await
// .map_err(|_| Error::SelfProposer)?;
Contributor

Minor optimization: we might be able to use sender.try_enqueue_many and only fall back to enqueuing individual records if the try variant fails.

In the future we might have a LogSender::enqueue_many_optimized which checks for the available capacity and obtains as many permits as possible in a batch. But this is probably premature optimization.
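
For illustration, a rough sketch of that try-then-fallback pattern, written against a plain tokio `mpsc::Sender` as a stand-in for `LogSender` (the real `LogSender`/`try_enqueue_many` signatures are not shown in this thread, so everything below is an assumption):

```rust
use tokio::sync::mpsc::{Sender, error::TrySendError};

/// Push as many records as possible without awaiting; once the channel is
/// full, fall back to awaiting capacity for the remaining records one by one.
async fn enqueue_batch(tx: &Sender<Vec<u8>>, records: Vec<Vec<u8>>) -> Result<(), ()> {
    let mut remaining = records.into_iter();
    // Fast path: non-blocking sends while there is free capacity.
    for record in remaining.by_ref() {
        match tx.try_send(record) {
            Ok(()) => continue,
            Err(TrySendError::Full(record)) => {
                // No capacity left: await a permit for this record, then
                // switch to the slow path for the rest of the batch.
                tx.send(record).await.map_err(|_| ())?;
                break;
            }
            Err(TrySendError::Closed(_)) => return Err(()),
        }
    }
    // Slow path: await capacity for everything still pending.
    for record in remaining {
        tx.send(record).await.map_err(|_| ())?;
    }
    Ok(())
}
```

The same shape would also cover the enqueue_many_optimized idea if it reserved a whole batch of permits up front instead of one per record.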

// is a way to pass the raw encoded data directly to the appender
let envelope = StorageCodec::decode(&mut record.record)?;
Contributor

This is indeed a pity. Let's do it as a follow-up. I think there isn't a lot missing, because Record already supports carrying Bytes; we only need a way to create a Record from the (Keys, Bytes) we already have available here. Decoding the Envelope at this point is literally wasted work.
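
Purely as a sketch of that follow-up (the types and the `from_encoded` constructor below are hypothetical stand-ins, not the existing Record API):

```rust
use bytes::Bytes;

/// Hypothetical stand-ins, only to illustrate carrying the already-encoded
/// envelope bytes straight through instead of decoding and re-encoding them.
pub struct Keys(pub Vec<u64>);

pub struct Record {
    keys: Keys,
    body: Bytes,
}

impl Record {
    /// Build a record directly from the keys and the raw encoded payload,
    /// skipping the StorageCodec::decode call shown above entirely.
    pub fn from_encoded(keys: Keys, body: Bytes) -> Self {
        Self { keys, body }
    }
}
```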

Contributor Author

Indeed

Comment on lines +669 to +670
.propose_many_with_callback(records, callback)
.await;
Contributor

Can this await become a problem for the partition processor event loop if there are too many records to ingest? Should we maybe think about a throttling mechanism to not starve the other select branches?

Contributor Author

Good point! I will look into it.

Contributor Author

This is now mainly controlled by the ingestion client's "batch_size" setting (in bytes), which puts an upper limit on the size of a single ingest request. The default is 50 kB.
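
For illustration, size-based batching along those lines could look roughly like this (the function and parameter names are assumptions, not the actual ingestion-client code; 50_000 would correspond to the 50 kB default):

```rust
/// Split a stream of encoded records into batches whose total payload stays
/// under `batch_size_bytes`; each inner vector would go out as one ingest request.
fn split_into_batches(records: Vec<Vec<u8>>, batch_size_bytes: usize) -> Vec<Vec<Vec<u8>>> {
    let mut batches = Vec::new();
    let mut current: Vec<Vec<u8>> = Vec::new();
    let mut current_size = 0usize;

    for record in records {
        // Start a new batch if adding this record would blow the byte budget;
        // a single oversized record still goes out as its own batch.
        if !current.is_empty() && current_size + record.len() > batch_size_bytes {
            batches.push(std::mem::take(&mut current));
            current_size = 0;
        }
        current_size += record.len();
        current.push(record);
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}
```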

@muhamadazmy muhamadazmy force-pushed the pr3973 branch 6 times, most recently from 9dd6b0e to 2554660 on November 28, 2025 11:51
@muhamadazmy muhamadazmy changed the title from [Ingress] Handle IngestRequest message to [PP] Handle IngestRequest message on Nov 28, 2025
@muhamadazmy muhamadazmy force-pushed the pr3973 branch 5 times, most recently from e5ee450 to ac216d6 on December 1, 2025 12:50
@muhamadazmy muhamadazmy force-pushed the pr3973 branch 12 times, most recently from 0f3f506 to a16b71e on December 4, 2025 10:57

- `ingestion-client` implements the runtime layer that receives WAL envelopes, fans them out to the correct partitions, and tracks completion. It exposes:
  - `IngestionClient`, which enforces inflight budgets and resolves partition IDs before sending work downstream.
  - The session subsystem, which batches `IngestRecords`, retries connections, and reports commit status to callers.
- `ingestion-client` only ingests records and notifies the caller once a record is "committed" to Bifrost by the PP (see the sketch below). This makes it useful for implementing Kafka ingress and other external ingestion sources.

Summary:
Handle the incoming `IngestRequest` messages sent by the `ingestion-client`.
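
As a purely conceptual sketch of the commit notification described above (none of these names are the real `ingestion-client` API; a tokio oneshot channel stands in for the session's completion tracking):

```rust
use tokio::sync::oneshot;

/// Returned to the caller for an ingested batch; resolves once the partition
/// processor reports the records as committed to Bifrost.
pub struct IngestHandle {
    committed: oneshot::Receiver<Result<(), String>>,
}

impl IngestHandle {
    pub async fn wait_committed(self) -> Result<(), String> {
        // If the session is dropped before the commit status arrives, surface
        // that as an error instead of hanging forever.
        self.committed
            .await
            .unwrap_or_else(|_| Err("ingestion session closed".to_string()))
    }
}

/// The session side would resolve the handle roughly like this once Bifrost
/// reports the records as committed.
pub fn notify_committed(tx: oneshot::Sender<Result<(), String>>) {
    let _ = tx.send(Ok(()));
}
```
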
Contributor

@tillrohrmann tillrohrmann left a comment

Thanks for updating this PR @muhamadazmy. I think we shouldn't map ServiceStopped to LostLeadership as this has a slightly different semantic meaning than before. Mapping it to NotLeader instead should be fine, I believe. Apart from this, +1 for merging :-)

It's really nice that we no longer need to do the deserialization/serialization steps when handling the Envelope 👏

RpcReplyError::LoadShedding => Self::Busy,
RpcReplyError::ServiceNotReady => Self::Busy,
- RpcReplyError::ServiceStopped => Self::Stopping,
+ RpcReplyError::ServiceStopped => Self::LostLeadership,
Contributor

I think ServiceStopped should rather be NotLeader. The difference is whether the partition processor has potentially processed this message or not. With LostLeadership it is not possible to safely retry an RPC, because the processor might already have processed it (e.g. written it to Bifrost).
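
For reference, the mapping suggested here would look roughly as follows (self-contained sketch; the enums mimic the variants visible in the diff, but the real PartitionProcessorRpcError variants carry an id that is omitted here):

```rust
enum RpcReplyError {
    LoadShedding,
    ServiceNotReady,
    ServiceStopped,
}

enum PartitionProcessorRpcError {
    Busy,
    NotLeader,
    LostLeadership,
}

impl From<RpcReplyError> for PartitionProcessorRpcError {
    fn from(err: RpcReplyError) -> Self {
        match err {
            RpcReplyError::LoadShedding | RpcReplyError::ServiceNotReady => Self::Busy,
            // ServiceStopped means the RPC never reached a running partition
            // processor, so it has definitely not been processed and can be
            // retried safely: NotLeader rather than LostLeadership.
            RpcReplyError::ServiceStopped => Self::NotLeader,
        }
    }
}
```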

Comment on lines +666 to +667
PartitionProcessorRpcError::NotLeader(id)
| PartitionProcessorRpcError::LostLeadership(id) => {
Contributor

Just want to highlight that NotLeader means the message has not been processed by the PP, while LostLeadership means we don't know. If this distinction makes any difference for the IngestionClient, then we probably need two different return values. If not, then ignore my comment.

Development

Successfully merging this pull request may close these issues.

New ingress API
