Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 51 additions & 58 deletions AlorPy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@
from typing import Union, Any # Объединение типов, любой тип
from math import log10 # Кол-во десятичных знаков будем получать из шага цены через десятичный логарифм
from datetime import datetime, UTC
from time import time_ns # Текущее время в наносекундах, прошедших с 01.01.1970 UTC
from time import time_ns, sleep # Текущее время в наносекундах, прошедших с 01.01.1970 UTC
from uuid import uuid4 # Номера подписок должны быть уникальными во времени и пространстве
from json import loads, JSONDecodeError, dumps # Сервер WebSockets работает с JSON сообщениями
from asyncio import get_event_loop, create_task, run, CancelledError # Работа с асинхронными функциями
from threading import Thread # Подписки сервера WebSockets будем получать в отдельном потоке

from pytz import timezone, utc # Работаем с временнОй зоной и UTC
import requests.adapters # Настройки запросов/ответов
from requests import post, get, put, delete, Response # Запросы/ответы от сервера запросов
from jwt import decode # Декодирование токена JWT для получения договоров и портфелей
from urllib3.exceptions import MaxRetryError # Соединение с сервером не установлено за максимальное кол-во попыток подключения
from websockets import connect, ConnectionClosed # Работа с сервером WebSockets
from websockets.sync.client import connect
from websockets.exceptions import ConnectionClosed # Работа с сервером WebSockets

from AlorPy import Config # Файл конфигурации

Expand All @@ -40,8 +40,10 @@ def __init__(self, refresh_token=Config.refresh_token, demo=False):
self.cws_socket = None # Подключение к серверу WebSocket
self.ws_server = f'wss://api{"dev" if demo else ""}.alor.ru/ws' # Сервис подписок и событий WebSocket
self.ws_socket = None # Подключение к серверу WebSocket
self.ws_task = None # Задача управления подписками WebSocket
self.ws_thread = None # Поток получения сообщений из WebSocket
self.ws_ready = False # WebSocket готов принимать запросы
self.ws_running = False # Флаг управления потоком WebSocket
self.ws_reconnect_timeout = 5 # Задержка между попытками подключиться к серверу, секунды

# События Alor OpenAPI V2
self.on_change_order_book = self.default_handler # Биржевой стакан
Expand All @@ -58,16 +60,15 @@ def __init__(self, refresh_token=Config.refresh_token, demo=False):
self.on_order = self.default_handler # Заявки
self.on_symbol = self.default_handler # Информация о финансовых инструментах

# События WebSocket Thread/Task
self.on_entering = lambda: self.logger.debug(f'WebSocket Thread: Запуск') # Начало входа (Thread)
# События WebSocket Thread
self.on_entering = lambda: self.logger.debug(f'WebSocket Main: Запуск') # Начало входа (Main)
self.on_enter = lambda: self.logger.debug(f'WebSocket Thread: Запущен') # Вход (Thread)
self.on_connect = lambda: self.logger.debug(f'WebSocket Task: Подключен к серверу') # Подключение к серверу (Task)
self.on_resubscribe = lambda: self.logger.debug(f'WebSocket Task: Возобновление подписок ({len(self.subscriptions)})') # Возобновление подписок (Task)
self.on_ready = lambda: self.logger.debug(f'WebSocket Task: Готов') # Готовность к работе (Task)
self.on_disconnect = lambda: self.logger.debug(f'WebSocket Task: Отключен от сервера') # Отключение от сервера (Task)
self.on_timeout = lambda: self.logger.debug(f'WebSocket Task: Таймаут') # Таймаут/максимальное кол-во попыток подключения (Task)
self.on_error = lambda response: self.logger.debug(f'WebSocket Task: {response}') # Ошибка (Task)
self.on_cancel = lambda: self.logger.debug(f'WebSocket Task: Отмена') # Отмена (Task)
self.on_connect = lambda: self.logger.debug(f'WebSocket Thread: Подключен к серверу') # Подключение к серверу (Thread)
self.on_resubscribe = lambda: self.logger.debug(f'WebSocket Thread: Возобновление подписок ({len(self.subscriptions)})') # Возобновление подписок (Thread)
self.on_ready = lambda: self.logger.debug(f'WebSocket Thread: Готов') # Готовность к работе (Thread)
self.on_disconnect = lambda: self.logger.debug(f'WebSocket Thread: Отключен от сервера') # Отключение от сервера (Thread)
self.on_timeout = lambda: self.logger.debug(f'WebSocket Thread: Таймаут') # Таймаут/максимальное кол-во попыток подключения (Thread)
self.on_error = lambda response: self.logger.debug(f'WebSocket Thread: {response}') # Ошибка (Thread)
self.on_exit = lambda: self.logger.debug(f'WebSocket Thread: Завершение') # Выход (Thread)

self.refresh_token = refresh_token # Токен
Expand Down Expand Up @@ -1566,7 +1567,7 @@ def unsubscribe(self, guid): # https://alor.dev/docs/api/websocket/data-subscri
:return: Уникальный идентификатор подписки
"""
request = {'opcode': 'unsubscribe', 'token': str(self.get_jwt_token()), 'guid': guid} # Запрос на отмену подписки
get_event_loop().run_until_complete(self.ws_socket.send(dumps(request))) # Отправляем запрос. Дожидаемся его выполнения
self.ws_socket.send(dumps(request)) # Отправляем запрос. Дожидаемся его выполнения
del self.subscriptions[guid] # Удаляем подписку из справочника
return self.subscribe(request) # Отправляем запрос, возвращаем уникальный идентификатор подписки

Expand Down Expand Up @@ -1955,25 +1956,17 @@ def check_result(self, response):
# Запросы WebSocket

def send_websocket(self, request):
"""Отправка запроса WebSocket

:param request: Запрос JSON
:return: JSON, текст, None в случае веб ошибки
"""
response = get_event_loop().run_until_complete(self.send_websocket_async(request)) # Запускаем асинхронную фукнцию с параметрами. Дожидаемся выполнения. Получаем результат
return self.check_websocket_result(response) # Возвращаем результат после анализа

async def send_websocket_async(self, request):
"""Отправка асинхронного запроса WebSocket
"""Отправка запроса через командный WebSocket

:param request: Запрос JSON
:return: Ответ JSON
"""
if not self.cws_socket: # Если не было подключения к серверу WebSocket
self.cws_socket = await connect(self.cws_server) # то пробуем к нему подключиться
self.cws_socket = connect(self.cws_server) # то пробуем к нему подключиться
request['guid'] = str(uuid4()) # Получаем уникальный идентификатор запроса, ставим его в запрос
await self.cws_socket.send(dumps(request)) # Переводим JSON в строку, отправляем запрос
return await self.cws_socket.recv() # Дожидаемся ответа, возвращаем его
self.cws_socket.send(dumps(request)) # Переводим JSON в строку, отправляем запрос
response = self.cws_socket.recv() # Дожидаемся ответа, возвращаем его
return self.check_websocket_result(response)

def check_websocket_result(self, response):
"""Анализ результата запроса WebSocket
Expand Down Expand Up @@ -2003,29 +1996,29 @@ def subscribe(self, request) -> str:
:param request request: Запрос
:return: Уникальный идентификатор подписки
"""
if not self.ws_ready: # Если WebSocket не готов принимать запросы
self.on_entering() # Событие начала входа (Thread)
Thread(target=run, args=(self.websocket_async(),)).start() # Создаем и запускаем поток управления подписками
if not self.ws_ready and not self.ws_running: # Если WebSocket не готов принимать запросы
self.ws_running = True # Запуск потока только один раз
self.on_entering() # Событие начала входа (Main)
self.ws_thread = Thread(target=self.websocket_loop)
self.ws_thread.start() # Создаем и запускаем поток управления подписками
while not self.ws_ready: # Подключение к серверу WebSocket выполняется в отдельном потоке
pass # Подождем, пока WebSocket не будет готов принимать запросы
sleep(self.ws_reconnect_timeout/10) # Подождем, пока WebSocket не будет готов принимать запросы

guid = str(uuid4()) # Уникальный идентификатор подписки
thread = Thread(target=run, args=(self.subscribe_async(request, guid),)) # Поток подписки
thread.start() # Запускаем
thread.join() # Ожидаем завершения
self.subscribe_call(request, guid)
return guid

async def websocket_async(self):
def websocket_loop(self):
"""Запуск и управление задачей подписок"""
self.on_enter() # Событие входа (Thread)
while True: # Будем держать соединение с сервером WebSocket до отмены
self.ws_task = create_task(self.websocket_handler()) # Запускаем задачу (Task) подключения к серверу WebSocket и получения с него подписок
try:
await self.ws_task # Ожидаем отмены задачи
except CancelledError: # Если задачу отменили
break # то выходим, дальше не продолжаем
while self.ws_running: # Будем держать соединение с сервером WebSocket до отмены
self.websocket_handler() # Запускаем функцию подключения к серверу WebSocket и получения с него подписок
if self.ws_running: # Если отключение не было запрошено пользователем
self.logger.debug(f'websocket_loop: попытка переподключения к вебсокету через {self.ws_reconnect_timeout} секунд')
sleep(self.ws_reconnect_timeout) # выжидаем время до следующей попытки подключиться
self.on_exit() # Событие выхода (Thread)

async def websocket_handler(self):
def websocket_handler(self):
"""
- Подключение к серверу WebSocket
- Переподключение к серверу WebSocket. Возобновление подписок, если требуется
Expand All @@ -2036,18 +2029,18 @@ async def websocket_handler(self):
# Но подключение сбрасывается, если в очереди соединения находится более 5000 непрочитанных сообщений
# Это может быть из-за медленного компьютера или слабого канала связи
# В любом из этих случаев создание дополнительных подключений проблему не решит
self.ws_socket = await connect(self.ws_server) # Пробуем подключиться к серверу WebSocket
self.on_connect() # Событие подключения к серверу (Task)
self.ws_socket = connect(self.ws_server) # Пробуем подключиться к серверу WebSocket
self.on_connect() # Событие подключения к серверу (Thread)

if len(self.subscriptions) > 0: # Если есть подписки, то будем их возобновлять
self.on_resubscribe() # Событие возобновления подписок (Task)
self.on_resubscribe() # Событие возобновления подписок (Thread)
for guid, request in self.subscriptions.items(): # Пробегаемся по всем подпискам
await self.subscribe_async(request, guid) # Переподписываемся с тем же уникальным идентификатором
self.subscribe_call(request, guid) # Переподписываемся с тем же уникальным идентификатором
self.ws_ready = True # Готов принимать запросы
self.on_ready() # Событие готовности к работе (Task)
self.on_ready() # Событие готовности к работе (Thread)

while True: # Получаем подписки до отмены
response_json = await self.ws_socket.recv() # Ожидаем следующую строку в виде JSON
while self.ws_running: # Получаем подписки до отмены
response_json = self.ws_socket.recv() # Ожидаем следующую строку в виде JSON
try:
response = loads(response_json) # Переводим JSON в словарь
except JSONDecodeError: # Если вместо JSON сообщений получаем текст (проверка на всякий случай)
Expand Down Expand Up @@ -2098,20 +2091,17 @@ async def websocket_handler(self):
self.on_order(response)
elif opcode == 'InstrumentsGetAndSubscribeV2': # Информация о финансовых инструментах
self.on_symbol(response)
except CancelledError: # Задачу отменили
self.on_cancel() # Событие отмены и завершения (Task)
raise # Передаем исключение на родительский уровень WebSocketHandler
except ConnectionClosed: # Отключились от сервера WebSockets
self.on_disconnect() # Событие отключения от сервера (Task)
self.on_disconnect() # Событие отключения от сервера (Thread)
except (OSError, TimeoutError, MaxRetryError): # При системной ошибке, таймауте на websockets, достижении максимального кол-ва попыток подключения
self.on_timeout() # Событие таймаута/максимального кол-ва попыток подключения (Task)
self.on_timeout() # Событие таймаута/максимального кол-ва попыток подключения (Thread)
except Exception as ex: # При других типах ошибок
self.on_error(f'Ошибка {ex}') # Событие ошибки (Task)
self.on_error(f'Ошибка {ex}') # Событие ошибки (Thread)
finally:
self.ws_ready = False # Не готов принимать запросы
self.ws_socket = None # Сбрасываем подключение

async def subscribe_async(self, request, guid):
def subscribe_call(self, request, guid):
"""Отправка запроса (пере)подписки на сервер WebSocket

:param request: Запрос
Expand All @@ -2123,7 +2113,7 @@ async def subscribe_async(self, request, guid):
self.subscriptions[guid] = request # Заносим подписку в справочник
request['token'] = self.get_jwt_token() # Получаем JWT токен, ставим его в запрос
request['guid'] = guid # Уникальный идентификатор подписки тоже ставим в запрос
await self.ws_socket.send(dumps(request)) # Отправляем запрос
self.ws_socket.send(dumps(request)) # Отправляем запрос

# Выход и закрытие

Expand All @@ -2136,8 +2126,11 @@ def __del__(self):

def close_web_socket(self):
"""Закрытие соединения с сервером WebSocket"""
self.ws_running = False
if self.ws_socket: # Если запущена задача управления подписками WebSocket
self.ws_task.cancel() # то отменяем задачу. Генерируем на ней исключение asyncio.CancelledError
self.ws_socket.close() # то отменяем закрываем соединение.
if self.cws_socket: # Если был открыт командный WebSocket
self.cws_socket.close() # Закрываем его

# Функции конвертации

Expand Down