Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 38 additions & 39 deletions gdax/websocket_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
import hashlib
import time
from threading import Thread
from websocket import create_connection, WebSocketConnectionClosedException
import websocket
from pymongo import MongoClient
from gdax.gdax_auth import get_auth_headers
import ssl
import traceback


class WebsocketClient(object):
Expand All @@ -36,17 +38,23 @@ def __init__(self, url="wss://ws-feed.gdax.com", products=None, message_type="su
self.mongo_collection = mongo_collection

def start(self):
def _go():
self._connect()
self._listen()
self._disconnect()

self.stop = False
self.on_open()
self.thread = Thread(target=_go)
if self.ws:
self.ws.close()

self.ws = websocket.WebSocketApp(
self.url,
on_message=self._listen,
on_error=self.on_error,
on_close=self._disconnect,
on_open=self._connect
)
self.thread = Thread(target=self._go)
self.thread.start()

def _connect(self):
def _go(self):
self.ws.run_forever(ping_interval=25, ping_timeout=10, sslopt={"cert_reqs": ssl.CERT_NONE})

def _connect(self, ws):
if self.products is None:
self.products = ["BTC-USD"]
elif not isinstance(self.products, list):
Expand All @@ -63,9 +71,8 @@ def _connect(self):
if self.auth:
timestamp = str(time.time())
message = timestamp + 'GET' + '/users/self'
sub_params.update(get_auth_headers(timestamp, message, self.api_key, self.api_secret, self.api_passphrase))
sub_params.update(get_auth_headers(timestamp, message, self.api_key, self.api_secret, self.api_passphrase))

self.ws = create_connection(self.url)
self.ws.send(json.dumps(sub_params))

if self.type == "heartbeat":
Expand All @@ -74,35 +81,23 @@ def _connect(self):
sub_params = {"type": "heartbeat", "on": False}
self.ws.send(json.dumps(sub_params))

def _listen(self):
while not self.stop:
try:
if int(time.time() % 30) == 0:
# Set a 30 second ping to keep connection alive
self.ws.ping("keepalive")
data = self.ws.recv()
msg = json.loads(data)
except ValueError as e:
self.on_error(e)
except Exception as e:
self.on_error(e)
else:
self.on_message(msg)

def _disconnect(self):
if self.type == "heartbeat":
self.ws.send(json.dumps({"type": "heartbeat", "on": False}))
self.on_open()

def _listen(self, ws, message):
try:
if self.ws:
self.ws.close()
except WebSocketConnectionClosedException as e:
pass
msg = json.loads(message)
except ValueError as e:
self.on_error(ws, e)
except Exception as e:
self.on_error(ws, e)
else:
self.on_message(msg)

def _disconnect(self, ws):
self.on_close()

def close(self):
self.stop = True
self.thread.join()
self.ws.close()

def on_open(self):
if self.should_print:
Expand All @@ -118,10 +113,14 @@ def on_message(self, msg):
if self.mongo_collection: # dump JSON to given mongo collection
self.mongo_collection.insert_one(msg)

def on_error(self, e, data=None):
def on_error(self, ws, e):
self.error = e
self.stop
print('{} - data: {}'.format(e, data))
try:
self.thread.join()
except Exception:
pass
traceback.print_tb(e.__traceback__)
print(e)


if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
bintrees==2.0.7
requests==2.13.0
six==1.10.0
websocket-client==0.40.0
pymongo==3.5.1
websocket-client==0.43.0
pymongo==3.5.1
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
'bintrees==2.0.7',
'requests==2.13.0',
'six==1.10.0',
'websocket-client==0.40.0',
'websocket-client==0.43.0',
'pymongo==3.5.1'
]

Expand Down