diff --git a/AlorPy.py b/AlorPy.py index f91e4b4..008995e 100644 --- a/AlorPy.py +++ b/AlorPy.py @@ -2,10 +2,9 @@ from typing import Union, Any # Объединение типов, любой тип from math import log10 # Кол-во десятичных знаков будем получать из шага цены через десятичный логарифм from datetime import datetime, UTC -from time import time_ns # Текущее время в наносекундах, прошедших с 01.01.1970 UTC +from time import time_ns, sleep # Текущее время в наносекундах, прошедших с 01.01.1970 UTC from uuid import uuid4 # Номера подписок должны быть уникальными во времени и пространстве from json import loads, JSONDecodeError, dumps # Сервер WebSockets работает с JSON сообщениями -from asyncio import get_event_loop, create_task, run, CancelledError # Работа с асинхронными функциями from threading import Thread # Подписки сервера WebSockets будем получать в отдельном потоке from pytz import timezone, utc # Работаем с временнОй зоной и UTC @@ -13,7 +12,8 @@ from requests import post, get, put, delete, Response # Запросы/ответы от сервера запросов from jwt import decode # Декодирование токена JWT для получения договоров и портфелей from urllib3.exceptions import MaxRetryError # Соединение с сервером не установлено за максимальное кол-во попыток подключения -from websockets import connect, ConnectionClosed # Работа с сервером WebSockets +from websockets.sync.client import connect +from websockets.exceptions import ConnectionClosed # Работа с сервером WebSockets from AlorPy import Config # Файл конфигурации @@ -40,8 +40,10 @@ def __init__(self, refresh_token=Config.refresh_token, demo=False): self.cws_socket = None # Подключение к серверу WebSocket self.ws_server = f'wss://api{"dev" if demo else ""}.alor.ru/ws' # Сервис подписок и событий WebSocket self.ws_socket = None # Подключение к серверу WebSocket - self.ws_task = None # Задача управления подписками WebSocket + self.ws_thread = None # Поток получения сообщений из WebSocket self.ws_ready = False # WebSocket готов принимать запросы + self.ws_running = False # Флаг управления потоком WebSocket + self.ws_reconnect_timeout = 5 # Задержка между попытками подключиться к серверу, секунды # События Alor OpenAPI V2 self.on_change_order_book = self.default_handler # Биржевой стакан @@ -58,16 +60,15 @@ def __init__(self, refresh_token=Config.refresh_token, demo=False): self.on_order = self.default_handler # Заявки self.on_symbol = self.default_handler # Информация о финансовых инструментах - # События WebSocket Thread/Task - self.on_entering = lambda: self.logger.debug(f'WebSocket Thread: Запуск') # Начало входа (Thread) + # События WebSocket Thread + self.on_entering = lambda: self.logger.debug(f'WebSocket Main: Запуск') # Начало входа (Main) self.on_enter = lambda: self.logger.debug(f'WebSocket Thread: Запущен') # Вход (Thread) - self.on_connect = lambda: self.logger.debug(f'WebSocket Task: Подключен к серверу') # Подключение к серверу (Task) - self.on_resubscribe = lambda: self.logger.debug(f'WebSocket Task: Возобновление подписок ({len(self.subscriptions)})') # Возобновление подписок (Task) - self.on_ready = lambda: self.logger.debug(f'WebSocket Task: Готов') # Готовность к работе (Task) - self.on_disconnect = lambda: self.logger.debug(f'WebSocket Task: Отключен от сервера') # Отключение от сервера (Task) - self.on_timeout = lambda: self.logger.debug(f'WebSocket Task: Таймаут') # Таймаут/максимальное кол-во попыток подключения (Task) - self.on_error = lambda response: self.logger.debug(f'WebSocket Task: {response}') # Ошибка (Task) - self.on_cancel = lambda: self.logger.debug(f'WebSocket Task: Отмена') # Отмена (Task) + self.on_connect = lambda: self.logger.debug(f'WebSocket Thread: Подключен к серверу') # Подключение к серверу (Thread) + self.on_resubscribe = lambda: self.logger.debug(f'WebSocket Thread: Возобновление подписок ({len(self.subscriptions)})') # Возобновление подписок (Thread) + self.on_ready = lambda: self.logger.debug(f'WebSocket Thread: Готов') # Готовность к работе (Thread) + self.on_disconnect = lambda: self.logger.debug(f'WebSocket Thread: Отключен от сервера') # Отключение от сервера (Thread) + self.on_timeout = lambda: self.logger.debug(f'WebSocket Thread: Таймаут') # Таймаут/максимальное кол-во попыток подключения (Thread) + self.on_error = lambda response: self.logger.debug(f'WebSocket Thread: {response}') # Ошибка (Thread) self.on_exit = lambda: self.logger.debug(f'WebSocket Thread: Завершение') # Выход (Thread) self.refresh_token = refresh_token # Токен @@ -1566,7 +1567,7 @@ def unsubscribe(self, guid): # https://alor.dev/docs/api/websocket/data-subscri :return: Уникальный идентификатор подписки """ request = {'opcode': 'unsubscribe', 'token': str(self.get_jwt_token()), 'guid': guid} # Запрос на отмену подписки - get_event_loop().run_until_complete(self.ws_socket.send(dumps(request))) # Отправляем запрос. Дожидаемся его выполнения + self.ws_socket.send(dumps(request)) # Отправляем запрос. Дожидаемся его выполнения del self.subscriptions[guid] # Удаляем подписку из справочника return self.subscribe(request) # Отправляем запрос, возвращаем уникальный идентификатор подписки @@ -1955,25 +1956,17 @@ def check_result(self, response): # Запросы WebSocket def send_websocket(self, request): - """Отправка запроса WebSocket - - :param request: Запрос JSON - :return: JSON, текст, None в случае веб ошибки - """ - response = get_event_loop().run_until_complete(self.send_websocket_async(request)) # Запускаем асинхронную фукнцию с параметрами. Дожидаемся выполнения. Получаем результат - return self.check_websocket_result(response) # Возвращаем результат после анализа - - async def send_websocket_async(self, request): - """Отправка асинхронного запроса WebSocket + """Отправка запроса через командный WebSocket :param request: Запрос JSON :return: Ответ JSON """ if not self.cws_socket: # Если не было подключения к серверу WebSocket - self.cws_socket = await connect(self.cws_server) # то пробуем к нему подключиться + self.cws_socket = connect(self.cws_server) # то пробуем к нему подключиться request['guid'] = str(uuid4()) # Получаем уникальный идентификатор запроса, ставим его в запрос - await self.cws_socket.send(dumps(request)) # Переводим JSON в строку, отправляем запрос - return await self.cws_socket.recv() # Дожидаемся ответа, возвращаем его + self.cws_socket.send(dumps(request)) # Переводим JSON в строку, отправляем запрос + response = self.cws_socket.recv() # Дожидаемся ответа, возвращаем его + return self.check_websocket_result(response) def check_websocket_result(self, response): """Анализ результата запроса WebSocket @@ -2003,29 +1996,29 @@ def subscribe(self, request) -> str: :param request request: Запрос :return: Уникальный идентификатор подписки """ - if not self.ws_ready: # Если WebSocket не готов принимать запросы - self.on_entering() # Событие начала входа (Thread) - Thread(target=run, args=(self.websocket_async(),)).start() # Создаем и запускаем поток управления подписками + if not self.ws_ready and not self.ws_running: # Если WebSocket не готов принимать запросы + self.ws_running = True # Запуск потока только один раз + self.on_entering() # Событие начала входа (Main) + self.ws_thread = Thread(target=self.websocket_loop) + self.ws_thread.start() # Создаем и запускаем поток управления подписками while not self.ws_ready: # Подключение к серверу WebSocket выполняется в отдельном потоке - pass # Подождем, пока WebSocket не будет готов принимать запросы + sleep(self.ws_reconnect_timeout/10) # Подождем, пока WebSocket не будет готов принимать запросы + guid = str(uuid4()) # Уникальный идентификатор подписки - thread = Thread(target=run, args=(self.subscribe_async(request, guid),)) # Поток подписки - thread.start() # Запускаем - thread.join() # Ожидаем завершения + self.subscribe_call(request, guid) return guid - async def websocket_async(self): + def websocket_loop(self): """Запуск и управление задачей подписок""" self.on_enter() # Событие входа (Thread) - while True: # Будем держать соединение с сервером WebSocket до отмены - self.ws_task = create_task(self.websocket_handler()) # Запускаем задачу (Task) подключения к серверу WebSocket и получения с него подписок - try: - await self.ws_task # Ожидаем отмены задачи - except CancelledError: # Если задачу отменили - break # то выходим, дальше не продолжаем + while self.ws_running: # Будем держать соединение с сервером WebSocket до отмены + self.websocket_handler() # Запускаем функцию подключения к серверу WebSocket и получения с него подписок + if self.ws_running: # Если отключение не было запрошено пользователем + self.logger.debug(f'websocket_loop: попытка переподключения к вебсокету через {self.ws_reconnect_timeout} секунд') + sleep(self.ws_reconnect_timeout) # выжидаем время до следующей попытки подключиться self.on_exit() # Событие выхода (Thread) - async def websocket_handler(self): + def websocket_handler(self): """ - Подключение к серверу WebSocket - Переподключение к серверу WebSocket. Возобновление подписок, если требуется @@ -2036,18 +2029,18 @@ async def websocket_handler(self): # Но подключение сбрасывается, если в очереди соединения находится более 5000 непрочитанных сообщений # Это может быть из-за медленного компьютера или слабого канала связи # В любом из этих случаев создание дополнительных подключений проблему не решит - self.ws_socket = await connect(self.ws_server) # Пробуем подключиться к серверу WebSocket - self.on_connect() # Событие подключения к серверу (Task) + self.ws_socket = connect(self.ws_server) # Пробуем подключиться к серверу WebSocket + self.on_connect() # Событие подключения к серверу (Thread) if len(self.subscriptions) > 0: # Если есть подписки, то будем их возобновлять - self.on_resubscribe() # Событие возобновления подписок (Task) + self.on_resubscribe() # Событие возобновления подписок (Thread) for guid, request in self.subscriptions.items(): # Пробегаемся по всем подпискам - await self.subscribe_async(request, guid) # Переподписываемся с тем же уникальным идентификатором + self.subscribe_call(request, guid) # Переподписываемся с тем же уникальным идентификатором self.ws_ready = True # Готов принимать запросы - self.on_ready() # Событие готовности к работе (Task) + self.on_ready() # Событие готовности к работе (Thread) - while True: # Получаем подписки до отмены - response_json = await self.ws_socket.recv() # Ожидаем следующую строку в виде JSON + while self.ws_running: # Получаем подписки до отмены + response_json = self.ws_socket.recv() # Ожидаем следующую строку в виде JSON try: response = loads(response_json) # Переводим JSON в словарь except JSONDecodeError: # Если вместо JSON сообщений получаем текст (проверка на всякий случай) @@ -2098,20 +2091,17 @@ async def websocket_handler(self): self.on_order(response) elif opcode == 'InstrumentsGetAndSubscribeV2': # Информация о финансовых инструментах self.on_symbol(response) - except CancelledError: # Задачу отменили - self.on_cancel() # Событие отмены и завершения (Task) - raise # Передаем исключение на родительский уровень WebSocketHandler except ConnectionClosed: # Отключились от сервера WebSockets - self.on_disconnect() # Событие отключения от сервера (Task) + self.on_disconnect() # Событие отключения от сервера (Thread) except (OSError, TimeoutError, MaxRetryError): # При системной ошибке, таймауте на websockets, достижении максимального кол-ва попыток подключения - self.on_timeout() # Событие таймаута/максимального кол-ва попыток подключения (Task) + self.on_timeout() # Событие таймаута/максимального кол-ва попыток подключения (Thread) except Exception as ex: # При других типах ошибок - self.on_error(f'Ошибка {ex}') # Событие ошибки (Task) + self.on_error(f'Ошибка {ex}') # Событие ошибки (Thread) finally: self.ws_ready = False # Не готов принимать запросы self.ws_socket = None # Сбрасываем подключение - async def subscribe_async(self, request, guid): + def subscribe_call(self, request, guid): """Отправка запроса (пере)подписки на сервер WebSocket :param request: Запрос @@ -2123,7 +2113,7 @@ async def subscribe_async(self, request, guid): self.subscriptions[guid] = request # Заносим подписку в справочник request['token'] = self.get_jwt_token() # Получаем JWT токен, ставим его в запрос request['guid'] = guid # Уникальный идентификатор подписки тоже ставим в запрос - await self.ws_socket.send(dumps(request)) # Отправляем запрос + self.ws_socket.send(dumps(request)) # Отправляем запрос # Выход и закрытие @@ -2136,8 +2126,11 @@ def __del__(self): def close_web_socket(self): """Закрытие соединения с сервером WebSocket""" + self.ws_running = False if self.ws_socket: # Если запущена задача управления подписками WebSocket - self.ws_task.cancel() # то отменяем задачу. Генерируем на ней исключение asyncio.CancelledError + self.ws_socket.close() # то отменяем закрываем соединение. + if self.cws_socket: # Если был открыт командный WebSocket + self.cws_socket.close() # Закрываем его # Функции конвертации