Skip to content

Handle "Unexpected response type for remote procedure call: Close" on query stream opening #759

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 10, 2025

Conversation

farost
Copy link
Member

@farost farost commented Jun 3, 2025

Usage and product changes

Fix a rare InternalError returned by mistake when a client sends a query request while the transaction is being closed. Now, an expected "The transaction is closed and no further operation is allowed." error is returned instead.

Additionally, wait for specific transaction responses in rollback, commit, and query to solidify the protocol and ensure that the server acts as expected.

Implementation

We faced this error when running a huge number of queries against a server that eventually went to sleep. Presumably, this state led to the GRPC stream interruption, and it "gracefully" stopped without errors. However, the driver's code did not expect that it could happen right after the stream opening request, while it should be considered a normal situation.

The source code where Close responses are generated:

async fn listen_loop(
    mut grpc_source: Streaming<transaction::Server>,
    collector: ResponseCollector,
    shutdown_sink: UnboundedSender<()>,
) {
    loop {
        match grpc_source.next().await {
            Some(Ok(message)) => collector.collect(message).await,
            Some(Err(status)) => break collector.close_with_error(status.into()).await,
            None => break collector.close().await, // <---- here, the stream has ended
        }
    }
    shutdown_sink.send(()).ok();
}

async fn close(self) {
  self.is_open.store(false);
  let mut listeners = std::mem::take(&mut *self.callbacks.write().unwrap());
  for (_, listener) in listeners.drain() {
      listener.finish(Ok(TransactionResponse::Close)); // <--- here, the only `Close` response is sent
  }
  // ....
}

Additionally, we explicitly wait for request's response type or stream closed marker with their correct processing for each other transaction request in transaction_stream. For this, the commit behavior of the server has been made more explicit: typedb/typedb#7484

Comment on lines 167 to 172
Ok(self.stream(TransactionRequest::Query(req))?.map(|response| match response {
Ok(TransactionResponse::Query(res)) => Ok(res),
Ok(TransactionResponse::Close) => Err(ConnectionError::TransactionIsClosed.into()),
Ok(other) => Err(InternalError::UnexpectedResponseType { response_type: format!("{other:?}") }.into()),
Err(err) => Err(err),
}))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the only place that needs to handle Close? What happens in any of the one-shot requests (like rollback) if the stream is closed between sending the request and receiving a response?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a fair point. It looks like we currently ignore it for single requests, and I thought that it's normal. But, while it's fine for rollbacks (the next request will fail anyway), it might not be good for commits, as there is a chance that the client will think that everything is good:

    pub(crate) fn commit(self: Pin<Box<Self>>) -> impl Promise<'static, Result> {
        let promise = self.single(TransactionRequest::Commit);
        promisify! {
            let _this = self; // move into the promise so the stream isn't dropped until the promise is resolved
            resolve!(promise).map(|_| ()) // <--- here, `TransactionResponse::Close` becomes `()` just like any other
        }
    }

    pub(crate) fn rollback(&self) -> impl Promise<'_, Result> {
        let promise = self.single(TransactionRequest::Rollback);
        promisify! { resolve!(promise).map(|_| ()) } // <--- here, `TransactionResponse::Close` becomes `()` just like any other
    }

I will add the relevant processing here, too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe it's actually fine. I will test it more tomorrow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out Commit should explicitly wait for Close without any Commit responses. Weird. Will decide if the server needs an update and then finalize this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dmitrii-ubskii It was a good flag. I additionally covered other transaction requests and fixed an incorrect server's behavior: typedb/typedb#7484

@farost farost requested a review from dmitrii-ubskii June 9, 2025 13:03
@farost farost merged commit 5c1454d into typedb:master Jun 10, 2025
8 checks passed
@farost farost deleted the transaction-close-on-stream-opening branch June 10, 2025 08:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants