|
8 | 8 |
|
9 | 9 | import asyncssh |
10 | 10 | from asyncssh.sftp import SFTPOpUnsupported |
11 | | -from fsspec.asyn import AsyncFileSystem, async_methods, sync, sync_wrapper |
| 11 | +from fsspec.asyn import ( |
| 12 | + AsyncFileSystem, |
| 13 | + FSTimeoutError, |
| 14 | + async_methods, |
| 15 | + sync, |
| 16 | + sync_wrapper, |
| 17 | +) |
12 | 18 | from fsspec.utils import infer_storage_options |
13 | 19 |
|
14 | 20 | from sshfs.file import SSHFile |
@@ -71,7 +77,7 @@ def __init__( |
71 | 77 | **_client_args, |
72 | 78 | ) |
73 | 79 | weakref.finalize( |
74 | | - self, sync, self.loop, self._finalize, self._pool, self._stack |
| 80 | + self, self._finalize, self.loop, self._pool, self._stack |
75 | 81 | ) |
76 | 82 |
|
77 | 83 | @classmethod |
@@ -101,15 +107,29 @@ async def _connect( |
101 | 107 | connect = sync_wrapper(_connect) |
102 | 108 |
|
103 | 109 | @staticmethod |
104 | | - async def _finalize(pool, stack): |
105 | | - await pool.close() |
106 | | - |
107 | | - # If an error occurs while the SSHFile is trying to |
108 | | - # open the native file, then the client might get broken |
109 | | - # due to partial initialization. We are just going to ignore |
110 | | - # the errors that arises on the finalization layer |
111 | | - with suppress(BrokenPipeError): |
112 | | - await stack.aclose() |
| 110 | + def _finalize(loop, pool, stack): |
| 111 | + async def close(): |
| 112 | + await pool.close() |
| 113 | + # If an error occurs while the SSHFile is trying to |
| 114 | + # open the native file, then the client might get broken |
| 115 | + # due to partial initialization. We are just going to ignore |
| 116 | + # the errors that arises on the finalization layer |
| 117 | + with suppress(BrokenPipeError): |
| 118 | + await stack.aclose() |
| 119 | + |
| 120 | + if loop is not None and loop.is_running(): |
| 121 | + try: |
| 122 | + loop = asyncio.get_running_loop() |
| 123 | + loop.create_task(close()) |
| 124 | + return |
| 125 | + except RuntimeError: |
| 126 | + pass |
| 127 | + |
| 128 | + try: |
| 129 | + sync(loop, close, timeout=0.1) |
| 130 | + return |
| 131 | + except FSTimeoutError: |
| 132 | + pass |
113 | 133 |
|
114 | 134 | @property |
115 | 135 | def client(self): |
|
0 commit comments