Skip to content

Commit b2d1b88

Browse files
authored
Merge pull request #166 from mcardillo55/orderbook_rework
Orderbook rework
2 parents d8a3a1a + 789bbab commit b2d1b88

File tree

3 files changed

+93
-78
lines changed

3 files changed

+93
-78
lines changed

README.md

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -346,16 +346,29 @@ python -m pytest
346346
```
347347

348348
### Real-time OrderBook
349-
The ```OrderBook``` subscribes to a websocket and keeps a real-time record of
350-
the orderbook for the product_id input. Please provide your feedback for future
349+
The ```OrderBook``` is a convenient data structure to keep a real-time record of
350+
the orderbook for the product_id input. It processes incoming messages from an
351+
already existing WebsocketClient. Please provide your feedback for future
351352
improvements.
352353

353354
```python
354-
import cbpro, time
355-
order_book = cbpro.OrderBook(product_id='BTC-USD')
356-
order_book.start()
355+
import cbpro, time, Queue
356+
class myWebsocketClient(cbpro.WebsocketClient):
357+
def on_open(self):
358+
self.products = ['BTC-USD', 'ETH-USD']
359+
self.order_book_btc = OrderBookConsole(product_id='BTC-USD')
360+
self.order_book_eth = OrderBookConsole(product_id='ETH-USD')
361+
def on_message(self, msg):
362+
self.order_book_btc.process_message(msg)
363+
self.order_book_eth.process_message(msg)
364+
365+
wsClient = myWebsocketClient()
366+
wsClient.start()
357367
time.sleep(10)
358-
order_book.close()
368+
while True:
369+
print(wsClient.order_book_btc.get_ask())
370+
print(wsClient.order_book_eth.get_bid())
371+
time.sleep(1)
359372
```
360373

361374
### Testing

cbpro/order_book.py

Lines changed: 72 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@
1212
from cbpro.websocket_client import WebsocketClient
1313

1414

15-
class OrderBook(WebsocketClient):
15+
class OrderBook(object):
1616
def __init__(self, product_id='BTC-USD', log_to=None):
17-
super(OrderBook, self).__init__(
18-
products=product_id, channels=['full'])
1917
self._asks = SortedDict()
2018
self._bids = SortedDict()
2119
self._client = PublicClient()
@@ -24,18 +22,7 @@ def __init__(self, product_id='BTC-USD', log_to=None):
2422
if self._log_to:
2523
assert hasattr(self._log_to, 'write')
2624
self._current_ticker = None
27-
28-
@property
29-
def product_id(self):
30-
''' Currently OrderBook only supports a single product even though it is stored as a list of products. '''
31-
return self.products[0]
32-
33-
def on_open(self):
34-
self._sequence = -1
35-
print("-- Subscribed to OrderBook! --\n")
36-
37-
def on_close(self):
38-
print("\n-- OrderBook Socket Closed! --")
25+
self.product_id = product_id
3926

4027
def reset_book(self):
4128
self._asks = SortedDict()
@@ -57,39 +44,37 @@ def reset_book(self):
5744
})
5845
self._sequence = res['sequence']
5946

60-
def on_message(self, message):
61-
if self._log_to:
62-
pickle.dump(message, self._log_to)
47+
def process_message(self, message):
48+
if message.get('product_id') == self.product_id:
49+
if self._log_to:
50+
pickle.dump(message, self._log_to)
6351

64-
sequence = message.get('sequence', -1)
65-
if self._sequence == -1:
66-
self.reset_book()
67-
return
68-
if sequence <= self._sequence:
69-
# ignore older messages (e.g. before order book initialization from getProductOrderBook)
70-
return
71-
elif sequence > self._sequence + 1:
72-
self.on_sequence_gap(self._sequence, sequence)
73-
return
52+
sequence = message.get('sequence', -1)
53+
if self._sequence == -1:
54+
self.reset_book()
55+
return
56+
if sequence <= self._sequence:
57+
# ignore older messages (e.g. before order book initialization from getProductOrderBook)
58+
return
59+
elif sequence > self._sequence + 1:
60+
self.on_sequence_gap(self._sequence, sequence)
61+
return
7462

75-
msg_type = message['type']
76-
if msg_type == 'open':
77-
self.add(message)
78-
elif msg_type == 'done' and 'price' in message:
79-
self.remove(message)
80-
elif msg_type == 'match':
81-
self.match(message)
82-
self._current_ticker = message
83-
elif msg_type == 'change':
84-
self.change(message)
63+
msg_type = message['type']
64+
if msg_type == 'open':
65+
self.add(message)
66+
elif msg_type == 'done' and 'price' in message:
67+
self.remove(message)
68+
elif msg_type == 'match':
69+
self.match(message)
70+
self._current_ticker = message
71+
elif msg_type == 'change':
72+
self.change(message)
8573

86-
self._sequence = sequence
74+
self._sequence = sequence
8775

8876
def on_sequence_gap(self, gap_start, gap_end):
8977
self.reset_book()
90-
print('Error: messages missing ({} - {}). Re-initializing book at sequence.'.format(
91-
gap_start, gap_end, self._sequence))
92-
9378

9479
def add(self, order):
9580
order = {
@@ -249,7 +234,6 @@ def set_bids(self, price, bids):
249234
import time
250235
import datetime as dt
251236

252-
253237
class OrderBookConsole(OrderBook):
254238
''' Logs real-time changes to the bid-ask spread to the console '''
255239

@@ -262,38 +246,55 @@ def __init__(self, product_id=None):
262246
self._bid_depth = None
263247
self._ask_depth = None
264248

265-
def on_message(self, message):
266-
super(OrderBookConsole, self).on_message(message)
267-
268-
# Calculate newest bid-ask spread
269-
bid = self.get_bid()
270-
bids = self.get_bids(bid)
271-
bid_depth = sum([b['size'] for b in bids])
272-
ask = self.get_ask()
273-
asks = self.get_asks(ask)
274-
ask_depth = sum([a['size'] for a in asks])
275-
276-
if self._bid == bid and self._ask == ask and self._bid_depth == bid_depth and self._ask_depth == ask_depth:
277-
# If there are no changes to the bid-ask spread since the last update, no need to print
278-
pass
279-
else:
280-
# If there are differences, update the cache
281-
self._bid = bid
282-
self._ask = ask
283-
self._bid_depth = bid_depth
284-
self._ask_depth = ask_depth
285-
print('{} {} bid: {:.3f} @ {:.2f}\task: {:.3f} @ {:.2f}'.format(
286-
dt.datetime.now(), self.product_id, bid_depth, bid, ask_depth, ask))
287-
288-
order_book = OrderBookConsole()
289-
order_book.start()
249+
def process_message(self, message):
250+
if message.get('product_id') == self.product_id:
251+
super(OrderBookConsole, self).process_message(message)
252+
253+
try:
254+
# Calculate newest bid-ask spread
255+
bid = self.get_bid()
256+
bids = self.get_bids(bid)
257+
bid_depth = sum([b['size'] for b in bids])
258+
ask = self.get_ask()
259+
asks = self.get_asks(ask)
260+
ask_depth = sum([a['size'] for a in asks])
261+
262+
if self._bid == bid and self._ask == ask and self._bid_depth == bid_depth and self._ask_depth == ask_depth:
263+
# If there are no changes to the bid-ask spread since the last update, no need to print
264+
pass
265+
else:
266+
# If there are differences, update the cache
267+
self._bid = bid
268+
self._ask = ask
269+
self._bid_depth = bid_depth
270+
self._ask_depth = ask_depth
271+
print('{} {} bid: {:.3f} @ {:.2f}\task: {:.3f} @ {:.2f}'.format(
272+
dt.datetime.now(), self.product_id, bid_depth, bid, ask_depth, ask))
273+
except Exception:
274+
pass
275+
276+
class WebsocketConsole(WebsocketClient):
277+
def on_open(self):
278+
self.products = ['BTC-USD', 'ETH-USD']
279+
self.order_book_btc = OrderBookConsole(product_id='BTC-USD')
280+
self.order_book_eth = OrderBookConsole(product_id='ETH-USD')
281+
282+
def on_message(self, msg):
283+
self.order_book_btc.process_message(msg)
284+
self.order_book_eth.process_message(msg)
285+
286+
wsClient = WebsocketConsole()
287+
wsClient.start()
288+
time.sleep(10)
290289
try:
291290
while True:
292-
time.sleep(10)
291+
pass
293292
except KeyboardInterrupt:
294-
order_book.close()
293+
wsClient.close()
294+
except Exception:
295+
pass
295296

296-
if order_book.error:
297+
if wsClient.error:
297298
sys.exit(1)
298299
else:
299300
sys.exit(0)

contributors.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ Leonard Lin
33
Jeff Gibson
44
David Caseria
55
Paul Mestemaker
6-
Drew Rice
6+
Drew Rice
7+
Mike Cardillo

0 commit comments

Comments
 (0)