完整的 DIFF 协议 WebSocket 接入实现,提供可复用的 WebSocket 连接管理、业务截面管理和协议封装能力。
- ✅ WebSocket 连接管理 - 完整的连接生命周期管理
- ✅ 自动重连 - 断线自动重连,可配置重连次数和间隔
- ✅ 心跳检测 - ping/pong 心跳保活,自动检测连接断开
- ✅ peek_message 机制 - 自动发送 peek_message 获取数据更新
- ✅ 消息队列 - 离线消息自动缓存,连接成功后自动发送
- ✅ 业务截面管理 - 维护完整的业务状态镜像
- ✅ JSON Merge Patch - RFC 7386 标准的差分更新
- ✅ 事件监听 - 完善的事件系统(open/close/error/message/stateChange)
- ✅ DIFF 协议 - 完整的 DIFF 协议消息封装
- 行情订阅 -
subscribe_quote
实时行情订阅 - 交易指令 -
insert_order
下单、cancel_order
撤单 - 账户查询 - 实时账户、持仓、订单、成交数据
- 银期转账 -
req_transfer
银期转账 - 图表数据 -
set_chart
K线和Tick数据订阅 - 登录鉴权 -
req_login
用户登录
import WebSocketManager from '@/websocket'
// 或者
import { WebSocketManager, ConnectionState } from '@/websocket'
const ws = new WebSocketManager({
url: 'ws://localhost:8001/ws',
userId: 'user123',
autoConnect: true,
logLevel: 'INFO'
})
// 连接成功
ws.on('open', () => {
console.log('WebSocket 已连接')
// 订阅行情
ws.subscribeQuote(['SHFE.cu2501', 'SHFE.ag2506'])
})
// 接收消息
ws.on('message', (message) => {
// 获取完整的业务截面
const snapshot = ws.getSnapshot()
console.log('账户信息:', snapshot.accounts)
console.log('持仓信息:', snapshot.positions)
console.log('订单信息:', snapshot.orders)
console.log('行情信息:', snapshot.quotes)
})
// 连接关闭
ws.on('close', (event) => {
console.log('WebSocket 已关闭', event)
})
// 连接错误
ws.on('error', (error) => {
console.error('WebSocket 错误', error)
})
// 状态变化
ws.on('stateChange', ({ oldState, newState }) => {
console.log(`状态变化: ${oldState} -> ${newState}`)
})
// 下单
ws.insertOrder({
user_id: 'user123',
order_id: 'order001',
instrument_id: 'SHFE.cu2501',
direction: 'BUY',
offset: 'OPEN',
volume: 1,
price_type: 'LIMIT',
limit_price: 50000
})
// 撤单
ws.cancelOrder('user123', 'order001')
ws.disconnect()
websocket/
├── index.js # 模块导出
├── WebSocketManager.js # WebSocket 连接管理器
├── SnapshotManager.js # 业务截面管理器
├── DiffProtocol.js # DIFF 协议封装
└── utils/
├── jsonMergePatch.js # JSON Merge Patch 实现
└── logger.js # 日志工具
WebSocket 连接管理器,负责:
- WebSocket 连接生命周期管理
- 自动重连和心跳检测
- 消息发送和接收
- 事件监听和触发
- 业务截面同步
业务截面管理器,负责:
- 维护业务截面(账户、持仓、订单、行情等)
- 应用 JSON Merge Patch 差分更新
- 监听截面变化
- 提供便捷的数据访问方法
DIFF 协议封装,负责:
- 创建各种协议消息
- 解析服务端消息
- 提取和判断消息类型
new WebSocketManager(options)
参数:
options.url
- WebSocket 服务器地址(默认:ws://localhost:8001/ws
)options.userId
- 用户IDoptions.autoConnect
- 自动连接(默认:true
)options.autoReconnect
- 自动重连(默认:true
)options.reconnectInterval
- 重连间隔(默认:3000
ms)options.reconnectMaxAttempts
- 最大重连次数(默认:10
)options.heartbeatInterval
- 心跳间隔(默认:5000
ms)options.heartbeatTimeout
- 心跳超时(默认:10000
ms)options.peekMessageDelay
- peek_message 延迟(默认:100
ms)options.logLevel
- 日志级别(默认:INFO
,可选:DEBUG/INFO/WARN/ERROR/NONE
)
连接 WebSocket。
ws.connect('user123')
.then(() => console.log('连接成功'))
.catch((error) => console.error('连接失败', error))
参数:
userId
- 用户ID(可选,如果已在构造函数中提供)
返回: Promise
断开 WebSocket 连接。
ws.disconnect()
订阅行情。
ws.subscribeQuote(['SHFE.cu2501', 'SHFE.ag2506'])
// 或
ws.subscribeQuote('SHFE.cu2501,SHFE.ag2506')
参数:
instruments
- 合约代码列表(数组或逗号分隔的字符串)
下单。
ws.insertOrder({
user_id: 'user123',
order_id: 'order001',
instrument_id: 'SHFE.cu2501',
direction: 'BUY', // BUY/SELL
offset: 'OPEN', // OPEN/CLOSE
volume: 1,
price_type: 'LIMIT', // LIMIT/MARKET
limit_price: 50000,
volume_condition: 'ANY', // 可选
time_condition: 'GFD' // 可选
})
参数:
order.user_id
- 用户IDorder.order_id
- 订单IDorder.instrument_id
- 合约代码order.direction
- 买卖方向(BUY/SELL)order.offset
- 开平标志(OPEN/CLOSE)order.volume
- 委托量order.price_type
- 价格类型(LIMIT/MARKET)order.limit_price
- 限价(LIMIT时必需)order.volume_condition
- 成交量条件(ANY/ALL,可选)order.time_condition
- 时间条件(GFD/GTD/GTC/IOC,可选)
撤单。
ws.cancelOrder('user123', 'order001')
参数:
userId
- 用户IDorderId
- 订单ID
订阅频道(扩展功能)。
ws.subscribe(['trade', 'account', 'position', 'order'])
参数:
channels
- 频道列表
监听事件。
const unsubscribe = ws.on('message', (message) => {
console.log('收到消息:', message)
})
// 取消监听
unsubscribe()
参数:
event
- 事件名称(open/close/error/message/stateChange)callback
- 回调函数
返回: 取消监听函数
获取完整的业务截面。
const snapshot = ws.getSnapshot()
console.log(snapshot.accounts) // 账户信息
console.log(snapshot.positions) // 持仓信息
console.log(snapshot.orders) // 订单信息
console.log(snapshot.trades) // 成交记录
console.log(snapshot.quotes) // 行情信息
console.log(snapshot.notify) // 通知信息
返回: 业务截面对象
获取业务截面管理器。
const snapshotManager = ws.getSnapshotManager()
// 监听特定路径的变化
snapshotManager.onPathChange('quotes.SHFE.cu2501', (newValue, oldValue) => {
console.log('cu2501 行情变化:', newValue)
})
// 获取特定数据
const account = snapshotManager.getAccount('CNY')
const position = snapshotManager.getPosition('SHFE.cu2501')
const order = snapshotManager.getOrder('order001')
const quote = snapshotManager.getQuote('SHFE.cu2501')
返回: SnapshotManager 实例
获取当前连接状态。
const state = ws.getState()
// DISCONNECTED/CONNECTING/CONNECTED/RECONNECTING/CLOSING
返回: 连接状态字符串
判断是否已连接。
if (ws.isConnected()) {
console.log('已连接')
}
返回: boolean
销毁 WebSocket 管理器。
ws.destroy()
应用 rtn_data 数据包。
const snapshot = snapshotManager.applyRtnData([
{ balance: 10000 },
{ quotes: { "SHFE.cu2501": { last_price: 50000 } } }
])
参数:
dataArray
- rtn_data.data 数组
返回: 更新后的业务截面
获取业务截面的某个字段。
const balance = snapshotManager.get('accounts.CNY.balance')
const lastPrice = snapshotManager.get('quotes.SHFE.cu2501.last_price')
参数:
path
- 字段路径(支持点分隔)
返回: 字段值
监听业务截面变更。
const unsubscribe = snapshotManager.onChange((newSnapshot, oldSnapshot) => {
console.log('截面更新:', newSnapshot)
})
// 取消监听
unsubscribe()
参数:
callback
- 回调函数(newSnapshot, oldSnapshot) => void
返回: 取消监听函数
监听特定路径的变更。
const unsubscribe = snapshotManager.onPathChange('quotes.SHFE.cu2501', (newValue, oldValue) => {
console.log('cu2501 行情变化:', newValue)
})
// 取消监听
unsubscribe()
参数:
path
- 监听路径callback
- 回调函数(newValue, oldValue) => void
返回: 取消监听函数
获取账户信息。
const account = snapshotManager.getAccount('CNY')
参数:
currency
- 货币类型(默认:CNY
)
返回: 账户对象
获取持仓信息。
const position = snapshotManager.getPosition('SHFE.cu2501')
参数:
instrumentId
- 合约ID
返回: 持仓对象
获取订单信息。
const order = snapshotManager.getOrder('order001')
参数:
orderId
- 订单ID
返回: 订单对象
获取行情信息。
const quote = snapshotManager.getQuote('SHFE.cu2501')
参数:
instrumentId
- 合约ID
返回: 行情对象
获取所有持仓。
const positions = snapshotManager.getAllPositions()
返回: 所有持仓对象
获取所有订单。
const orders = snapshotManager.getAllOrders()
返回: 所有订单对象
获取所有行情。
const quotes = snapshotManager.getAllQuotes()
返回: 所有行情对象
创建 peek_message 消息。
const message = protocol.createPeekMessage()
// {"aid": "peek_message"}
返回: JSON 字符串
创建 subscribe_quote 消息。
const message = protocol.createSubscribeQuote(['SHFE.cu2501', 'SHFE.ag2506'])
// {"aid": "subscribe_quote", "ins_list": "SHFE.cu2501,SHFE.ag2506"}
参数:
instruments
- 合约代码列表(数组或逗号分隔的字符串)
返回: JSON 字符串
创建 insert_order 消息。
const message = protocol.createInsertOrder({
user_id: 'user123',
order_id: 'order001',
instrument_id: 'SHFE.cu2501',
direction: 'BUY',
offset: 'OPEN',
volume: 1,
price_type: 'LIMIT',
limit_price: 50000
})
参数:
order
- 订单对象
返回: JSON 字符串
创建 cancel_order 消息。
const message = protocol.createCancelOrder('user123', 'order001')
// {"aid": "cancel_order", "user_id": "user123", "order_id": "order001"}
参数:
userId
- 用户IDorderId
- 订单ID
返回: JSON 字符串
解析接收到的消息。
const parsed = protocol.parseMessage('{"aid": "rtn_data", "data": [...]}')
参数:
message
- JSON 字符串
返回: 解析后的消息对象
判断消息类型。
const type = protocol.getMessageType(message)
// "rtn_data" / "pong" / "peek_message" / ...
参数:
message
- 消息对象
返回: 消息类型字符串
判断是否为 rtn_data 消息。
if (protocol.isRtnData(message)) {
// 处理数据
}
参数:
message
- 消息对象
返回: boolean
提取 rtn_data 的 data 数组。
const dataArray = protocol.extractRtnData(message)
参数:
message
- 消息对象
返回: data 数组
const DEFAULT_OPTIONS = {
url: 'ws://localhost:8001/ws', // WebSocket 服务器地址
userId: null, // 用户ID
autoConnect: true, // 自动连接
autoReconnect: true, // 自动重连
reconnectInterval: 3000, // 重连间隔(毫秒)
reconnectMaxAttempts: 10, // 最大重连次数
heartbeatInterval: 5000, // 心跳间隔(毫秒)
heartbeatTimeout: 10000, // 心跳超时(毫秒)
peekMessageDelay: 100, // peek_message 延迟(毫秒)
logLevel: 'INFO' // 日志级别
}
const ws = new WebSocketManager({
url: 'ws://192.168.1.100:8001/ws',
userId: 'user123',
autoConnect: false, // 手动连接
reconnectInterval: 5000, // 5秒重连间隔
reconnectMaxAttempts: 20, // 最多重连20次
heartbeatInterval: 3000, // 3秒心跳
logLevel: 'DEBUG' // 开启调试日志
})
// 稍后手动连接
ws.connect()
WebSocketManager 提供完善的事件系统,支持以下事件:
连接成功时触发。
ws.on('open', (event) => {
console.log('WebSocket 已连接')
})
连接关闭时触发。
ws.on('close', (event) => {
console.log('WebSocket 已关闭', event.code, event.reason)
})
连接错误时触发。
ws.on('error', (error) => {
console.error('WebSocket 错误', error)
})
收到消息时触发。
ws.on('message', (message) => {
console.log('收到消息:', message)
const type = message.aid
if (type === 'rtn_data') {
// 处理 rtn_data
} else if (type === 'pong') {
// 处理 pong
}
})
连接状态变化时触发。
ws.on('stateChange', ({ oldState, newState }) => {
console.log(`状态变化: ${oldState} -> ${newState}`)
if (newState === 'CONNECTED') {
// 连接成功
} else if (newState === 'RECONNECTING') {
// 正在重连
}
})
所有事件监听器都会返回一个取消监听函数:
const unsubscribe = ws.on('message', (message) => {
console.log(message)
})
// 取消监听
unsubscribe()
import WebSocketManager from '@/websocket'
const ws = new WebSocketManager({
url: 'ws://localhost:8001/ws',
userId: 'user123'
})
ws.on('open', () => {
console.log('连接成功')
// 订阅行情
ws.subscribeQuote(['SHFE.cu2501', 'SHFE.ag2506'])
})
ws.on('message', (message) => {
// 获取行情数据
const snapshot = ws.getSnapshot()
const cuQuote = snapshot.quotes['SHFE.cu2501']
if (cuQuote) {
console.log('cu2501 最新价:', cuQuote.last_price)
console.log('cu2501 买一价:', cuQuote.bid_price1)
console.log('cu2501 卖一价:', cuQuote.ask_price1)
}
})
import WebSocketManager from '@/websocket'
const ws = new WebSocketManager({
url: 'ws://localhost:8001/ws',
userId: 'user123'
})
ws.on('open', () => {
console.log('连接成功')
// 订阅行情
ws.subscribeQuote(['SHFE.cu2501'])
})
ws.on('message', (message) => {
const snapshot = ws.getSnapshot()
// 查看账户信息
const account = snapshot.accounts?.CNY
if (account) {
console.log('可用资金:', account.available)
console.log('持仓保证金:', account.margin)
console.log('风险率:', account.risk_ratio)
}
// 查看持仓
const position = snapshot.positions?.['SHFE.cu2501']
if (position) {
console.log('多头持仓:', position.volume_long)
console.log('空头持仓:', position.volume_short)
console.log('浮动盈亏:', position.float_profit)
}
// 查看订单
const orders = snapshot.orders || {}
for (const orderId in orders) {
const order = orders[orderId]
console.log(`订单 ${orderId}:`, order.status, order.volume_left)
}
})
// 下单
function placeOrder() {
ws.insertOrder({
user_id: 'user123',
order_id: `order_${Date.now()}`,
instrument_id: 'SHFE.cu2501',
direction: 'BUY',
offset: 'OPEN',
volume: 1,
price_type: 'LIMIT',
limit_price: 50000
})
}
// 撤单
function cancelOrder(orderId) {
ws.cancelOrder('user123', orderId)
}
import WebSocketManager from '@/websocket'
const ws = new WebSocketManager({
url: 'ws://localhost:8001/ws',
userId: 'user123'
})
ws.on('open', () => {
const snapshotManager = ws.getSnapshotManager()
// 监听 cu2501 行情变化
snapshotManager.onPathChange('quotes.SHFE.cu2501', (newQuote, oldQuote) => {
if (newQuote && oldQuote) {
const priceDiff = newQuote.last_price - oldQuote.last_price
console.log('价格变化:', priceDiff)
}
})
// 监听账户余额变化
snapshotManager.onPathChange('accounts.CNY.balance', (newBalance, oldBalance) => {
console.log('余额变化:', newBalance - oldBalance)
})
// 监听订单状态变化
snapshotManager.onChange((newSnapshot, oldSnapshot) => {
const newOrders = newSnapshot.orders || {}
const oldOrders = oldSnapshot.orders || {}
for (const orderId in newOrders) {
const newOrder = newOrders[orderId]
const oldOrder = oldOrders[orderId]
if (!oldOrder || newOrder.status !== oldOrder.status) {
console.log(`订单 ${orderId} 状态变化:`, oldOrder?.status, '->', newOrder.status)
}
}
})
// 订阅行情
ws.subscribeQuote(['SHFE.cu2501'])
})
import WebSocketManager from '@/websocket'
const ws = new WebSocketManager({
url: 'ws://localhost:8001/ws',
userId: 'user123',
autoConnect: false, // 不自动连接
autoReconnect: false // 不自动重连
})
// 手动连接
function connect() {
ws.connect()
.then(() => {
console.log('连接成功')
ws.subscribeQuote(['SHFE.cu2501'])
})
.catch((error) => {
console.error('连接失败', error)
})
}
// 手动断开
function disconnect() {
ws.disconnect()
}
// 手动重连
function reconnect() {
ws.reconnect()
}
// 监听状态变化
ws.on('stateChange', ({ newState }) => {
if (newState === 'DISCONNECTED') {
// 连接断开,可以显示重连按钮
showReconnectButton()
} else if (newState === 'CONNECTED') {
// 连接成功,隐藏重连按钮
hideReconnectButton()
}
})
<template>
<div>
<div>连接状态: {{ state }}</div>
<div>余额: {{ account?.balance }}</div>
<div>cu2501 最新价: {{ cuQuote?.last_price }}</div>
<button @click="placeOrder">下单</button>
</div>
</template>
<script>
import WebSocketManager from '@/websocket'
export default {
data() {
return {
ws: null,
state: 'DISCONNECTED',
snapshot: {}
}
},
computed: {
account() {
return this.snapshot.accounts?.CNY
},
cuQuote() {
return this.snapshot.quotes?.['SHFE.cu2501']
}
},
mounted() {
this.ws = new WebSocketManager({
url: 'ws://localhost:8001/ws',
userId: this.$store.state.user.userId
})
this.ws.on('open', () => {
console.log('连接成功')
this.ws.subscribeQuote(['SHFE.cu2501'])
})
this.ws.on('message', () => {
// 更新 snapshot 触发 Vue 响应式更新
this.snapshot = { ...this.ws.getSnapshot() }
})
this.ws.on('stateChange', ({ newState }) => {
this.state = newState
})
},
methods: {
placeOrder() {
this.ws.insertOrder({
user_id: this.$store.state.user.userId,
order_id: `order_${Date.now()}`,
instrument_id: 'SHFE.cu2501',
direction: 'BUY',
offset: 'OPEN',
volume: 1,
price_type: 'LIMIT',
limit_price: 50000
})
}
},
beforeUnmount() {
if (this.ws) {
this.ws.destroy()
}
}
}
</script>
建议将业务截面存储在 Vuex 中,方便全局访问:
// store/websocket.js
export default {
state: {
ws: null,
snapshot: {},
connectionState: 'DISCONNECTED'
},
mutations: {
SET_WS(state, ws) {
state.ws = ws
},
SET_SNAPSHOT(state, snapshot) {
state.snapshot = snapshot
},
SET_CONNECTION_STATE(state, connectionState) {
state.connectionState = connectionState
}
},
actions: {
initWebSocket({ commit, rootState }) {
const ws = new WebSocketManager({
url: 'ws://localhost:8001/ws',
userId: rootState.user.userId
})
ws.on('message', () => {
commit('SET_SNAPSHOT', ws.getSnapshot())
})
ws.on('stateChange', ({ newState }) => {
commit('SET_CONNECTION_STATE', newState)
})
commit('SET_WS', ws)
},
subscribeQuote({ state }, instruments) {
if (state.ws) {
state.ws.subscribeQuote(instruments)
}
},
insertOrder({ state }, order) {
if (state.ws) {
state.ws.insertOrder(order)
}
},
cancelOrder({ state }, { userId, orderId }) {
if (state.ws) {
state.ws.cancelOrder(userId, orderId)
}
}
},
getters: {
account: (state) => state.snapshot.accounts?.CNY,
positions: (state) => state.snapshot.positions || {},
orders: (state) => state.snapshot.orders || {},
quotes: (state) => state.snapshot.quotes || {},
quote: (state) => (instrumentId) => {
return state.snapshot.quotes?.[instrumentId]
},
position: (state) => (instrumentId) => {
return state.snapshot.positions?.[instrumentId]
}
}
}
在组件中使用:
<template>
<div>
<div>余额: {{ account?.balance }}</div>
<div>cu2501 最新价: {{ quote('SHFE.cu2501')?.last_price }}</div>
</div>
</template>
<script>
import { mapGetters, mapActions } from 'vuex'
export default {
computed: {
...mapGetters('websocket', ['account', 'quote'])
},
methods: {
...mapActions('websocket', ['subscribeQuote', 'insertOrder'])
},
mounted() {
this.subscribeQuote(['SHFE.cu2501'])
}
}
</script>
ws.on('error', (error) => {
console.error('WebSocket 错误:', error)
// 通知用户
this.$message.error('WebSocket 连接错误')
})
ws.on('close', (event) => {
if (!event.wasClean) {
// 异常关闭
console.error('连接异常关闭:', event.code, event.reason)
this.$message.warning('连接已断开,正在尝试重连...')
}
})
避免频繁更新 Vue 响应式数据:
// ❌ 不推荐:每次消息都更新
ws.on('message', () => {
this.snapshot = { ...ws.getSnapshot() } // 触发重新渲染
})
// ✅ 推荐:使用节流
import { throttle } from 'lodash'
ws.on('message', throttle(() => {
this.snapshot = { ...ws.getSnapshot() }
}, 100)) // 100ms 更新一次
使用路径监听:
// ❌ 不推荐:监听整个 snapshot
ws.on('message', () => {
const snapshot = ws.getSnapshot()
this.cuPrice = snapshot.quotes?.['SHFE.cu2501']?.last_price
})
// ✅ 推荐:只监听需要的字段
const snapshotManager = ws.getSnapshotManager()
snapshotManager.onPathChange('quotes.SHFE.cu2501.last_price', (newPrice) => {
this.cuPrice = newPrice
})
export default {
data() {
return {
ws: null,
unsubscribers: [] // 存储取消监听函数
}
},
mounted() {
this.ws = new WebSocketManager({
url: 'ws://localhost:8001/ws',
userId: this.$store.state.user.userId
})
// 保存取消监听函数
this.unsubscribers.push(
this.ws.on('open', this.handleOpen),
this.ws.on('message', this.handleMessage),
this.ws.on('close', this.handleClose)
)
},
beforeUnmount() {
// 取消所有监听
this.unsubscribers.forEach(unsubscribe => unsubscribe())
// 销毁 WebSocket
if (this.ws) {
this.ws.destroy()
}
}
}
开发环境开启调试日志:
const ws = new WebSocketManager({
url: 'ws://localhost:8001/ws',
userId: 'user123',
logLevel: process.env.NODE_ENV === 'development' ? 'DEBUG' : 'WARN'
})
生产环境关闭日志:
const ws = new WebSocketManager({
url: 'ws://localhost:8001/ws',
userId: 'user123',
logLevel: 'NONE' // 不输出任何日志
})
完整的 Vue 组件示例请参考 examples/
目录:
examples/basic-usage.js
- 基础用法examples/vue-component.vue
- Vue 组件集成examples/trading-component.vue
- 完整交易示例
A: 检查以下几点:
- WebSocket 服务器是否启动(
cargo run --bin qaexchange-server
) - URL 是否正确(默认:
ws://localhost:8001/ws
) - 是否提供了正确的
userId
- 浏览器控制台是否有错误信息
A:
- 检查是否已订阅行情(
ws.subscribeQuote(['SHFE.cu2501'])
) - 检查
peek_message
机制是否正常(应该自动发送) - 检查日志输出(设置
logLevel: 'DEBUG'
)
A:
WebSocketManager 默认开启自动重连,无需手动处理。可以通过 stateChange
事件监听连接状态:
ws.on('stateChange', ({ newState }) => {
if (newState === 'RECONNECTING') {
console.log('正在重连...')
} else if (newState === 'CONNECTED') {
console.log('重连成功')
}
})
A: 建议将 WebSocket 实例存储在 Vuex 中,参考上面的 使用 Vuex 存储业务截面 示例。
A: 使用节流(throttle)或防抖(debounce):
import { throttle } from 'lodash'
ws.on('message', throttle(() => {
this.snapshot = { ...ws.getSnapshot() }
}, 100)) // 100ms 更新一次
MIT