Skip to content

bug: Stream termination is not handled, a raw grpc exception is raised from SDK method #696

@LuckySting

Description

@LuckySting

Bug Report

YDB Python SDK version:

3.21.7

Environment

Python3.8, Python3.12. MacOS and Linux.

Current behavior:

If the response stream is closed due to cluster restart, the SDK raises and unhandled grpc exception. Example of exception:

grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "connections to all backends failing; last error: UNKNOWN: ipv6:<...>:2135: Failed to connect to remote host: Timeout occurred: FD Shutdown"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_status:14, grpc_message:"connections to all backends failing; last error: UNKNOWN: ipv6:<...>:2135: Failed to connect to remote host: Timeout occurred: FD Shutdown"}"

Expected behavior:

SDK should raise ydb.Unavailable error, which should be retried in retry helpers.

Steps to reproduce:
We just need to patch grpc method, which handles UnaryStreamCall:

import asyncio
from typing import Any
from unittest import mock

import grpc
import ydb.aio
from grpc._cython import cygrpc


async def fetch_stream_termination(self) -> Any:
    message = await self._read()  # Read the first message
    while message is not cygrpc.EOF:  # While the message is not empty, continue reading the stream
        yield message
        message = await self._read()

    # Emulate stream termination
    raise grpc.aio.AioRpcError(
        code=grpc.StatusCode.UNAVAILABLE,
        initial_metadata=await self.initial_metadata(),
        trailing_metadata=await self.trailing_metadata(),
    )


@mock.patch.object(grpc.aio._call._StreamResponseMixin, "_fetch_stream_responses", fetch_stream_termination)
async def main() -> None:
    async with ydb.aio.Driver(connection_string="grpc://localhost:2135?database=/local") as driver:
        await driver.wait(fail_fast=True)

        session = ydb.aio.query.QuerySession(driver)
        await session.create()
        async with await session.execute("select 1") as results:
            print("Successfuly opened the stream")
            try:
                async for _ in results:
                    pass
            except grpc.aio.AioRpcError as e:
                print(f"Got raw grpc error!:\n{e}")


if __name__ == "__main__":
    asyncio.run(main())

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions