import 'dart:async'; import 'dart:convert'; import 'dart:math'; import 'package:decimal/decimal.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; import '../config/app_config.dart'; /// WebSocket 连接状态 enum WsConnectionState { connecting, connected, disconnected, reconnecting } /// WebSocket 消息类型 enum WsChannel { ticker, orderBook, trade } /// WebSocket 客户端 /// Mock 模式:使用 Timer.periodic 模拟实时数据流 /// 生产模式:对接真实 WebSocket 服务器 class WsClient { WsClient({ required bool mockMode, String? wsUrl, this.onPersistentFailure, }) : _mockMode = mockMode, _wsUrl = wsUrl ?? AppConfig.effectiveWsUrl { _connect(); } final bool _mockMode; String _wsUrl; final _random = Random(); /// 持续重连失败回调:连续 [_failoverNotifyCount] 次重连失败后触发一次, /// 由外部(ws_provider)桥接到 nodeProvider.reportFailure(immediate: true), /// 用于联动节点切换。reconnectWithUrl 后失败计数清零,可再次触发。 final void Function()? onPersistentFailure; // ── 常量 ──────────────────────────────────────────────── static const int _heartbeatIntervalSec = 15; static const int _heartbeatTimeoutSec = 30; static const int _maxReconnectCount = 50; static const int _lowFreqReconnectSec = 120; /// 连续失败到第几次时通知外部尝试切节点 static const int _failoverNotifyCount = 2; // ── Stream Controllers ──────────────────────────────── final _connStateCtrl = StreamController.broadcast(); final _tickerCtrl = StreamController>.broadcast(); final _orderBookCtrl = StreamController>.broadcast(); final _tradeCtrl = StreamController>.broadcast(); final _klineCtrl = StreamController>.broadcast(); final _markCtrl = StreamController>.broadcast(); // K线历史查询回调:reqId → Completer final _klineReqCompleters = >>>{}; // ── State ───────────────────────────────────────────── WsConnectionState _state = WsConnectionState.disconnected; final _subscribedTopics = {}; // 已订阅的完整 topic final _subscribedSymbols = {}; // 已订阅的 symbol(mock 用) final _mockPrices = {}; final _mockChanges = {}; int _reconnectCount = 0; Timer? _mockDataTimer; Timer? _reconnectTimer; Timer? _simulateDisconnectTimer; Timer? _heartbeatTimer; DateTime? _lastMessageTime; // 生产 WS WebSocketChannel? _channel; StreamSubscription? _channelSub; int _reqId = 0; // ── Public Streams ──────────────────────────────────── Stream get connectionStream => _connStateCtrl.stream; Stream> get tickerStream => _tickerCtrl.stream; Stream> get orderBookStream => _orderBookCtrl.stream; Stream> get tradeStream => _tradeCtrl.stream; Stream> get klineStream => _klineCtrl.stream; Stream> get markStream => _markCtrl.stream; WsConnectionState get currentState => _state; // ── Connection Management ───────────────────────────── void _setState(WsConnectionState s) { _state = s; if (!_connStateCtrl.isClosed) _connStateCtrl.add(s); } void _connect() { if (_mockMode) { _connectMock(); } else { _connectReal(); } } // ══════════════════════════════════════════════════════ // 生产模式:真实 WebSocket 连接 // ══════════════════════════════════════════════════════ /// 当前 WebSocket 地址 String get currentWsUrl => _wsUrl; /// 使用新的 wsUrl 重新连接(节点切换时调用) void reconnectWithUrl(String url) { _wsUrl = url; _stopHeartbeat(); _channelSub?.cancel(); _channel?.sink.close(); _reconnectTimer?.cancel(); _reconnectCount = 0; _connectReal(); } void _connectReal() async { _setState(WsConnectionState.connecting); try { final uri = Uri.parse(_wsUrl); _channel = WebSocketChannel.connect(uri); // 等待 WebSocket 握手完成,避免握手未完成就发送订阅消息 await _channel!.ready; // 握手完成后再标记为 connected if (_connStateCtrl.isClosed) return; print('[WsClient] connected to $_wsUrl'); _setState(WsConnectionState.connected); _reconnectCount = 0; _lastMessageTime = DateTime.now(); // 先注册监听,再发送订阅 _channelSub = _channel!.stream.listen( (raw) { _lastMessageTime = DateTime.now(); _onMessage(raw); }, onError: (error) { _onDisconnect(); }, onDone: () { _onDisconnect(); }, ); // 重连后重新订阅之前的 topic if (_subscribedTopics.isNotEmpty) { _sendSub(_subscribedTopics.toList()); } // 启动心跳检测 _startHeartbeat(); } catch (e) { _onDisconnect(); } } // ── 心跳机制 ──────────────────────────────────────────── void _startHeartbeat() { _heartbeatTimer?.cancel(); _heartbeatTimer = Timer.periodic( const Duration(seconds: _heartbeatIntervalSec), (_) { if (_state != WsConnectionState.connected) return; // 超时检测:超过 _heartbeatTimeoutSec 秒未收到任何消息 final now = DateTime.now(); if (_lastMessageTime != null && now.difference(_lastMessageTime!).inSeconds > _heartbeatTimeoutSec) { print('[WsClient] heartbeat timeout, triggering reconnect'); _onDisconnect(); return; } // 发送 ping try { _channel?.sink.add( jsonEncode({'ping': now.millisecondsSinceEpoch}), ); } catch (_) { _onDisconnect(); } }, ); } void _stopHeartbeat() { _heartbeatTimer?.cancel(); _heartbeatTimer = null; } /// 处理服务器推送的消息,按 ch/rep 字段分发到对应 StreamController void _onMessage(dynamic raw) { if (raw is! String) return; final Map msg; try { msg = jsonDecode(raw) as Map; } catch (_) { return; } // pong 响应,忽略(心跳已通过 _lastMessageTime 更新) if (msg.containsKey('pong')) return; // K线历史查询响应(rep 字段) if (msg.containsKey('rep')) { _handleKlineHistoryResponse(msg); return; } // ACK 响应(sub/unsub 确认),忽略 if (msg.containsKey('status')) return; final ch = msg['ch'] as String?; if (ch == null) return; final tick = msg['tick'] as Map?; if (tick == null) return; // 根据 ch 中的主题类型分发 if (ch.endsWith('.ticket')) { _dispatchTicker(ch, tick); } else if (ch.endsWith('.mark')) { _dispatchMark(ch, tick); } else if (ch.contains('.kline.')) { _dispatchKline(ch, tick); } else if (ch.endsWith('.depth')) { _dispatchDepth(ch, tick); } else if (ch.endsWith('.trade')) { _dispatchTrade(ch, tick); } } /// 解析 K线实时推送: tick.k 包含 t/o/c/h/l/v/q void _dispatchKline(String ch, Map tick) { final k = tick['k'] as Map?; if (k == null) return; final parts = ch.split('.'); if (parts.length < 4) return; final symbol = parts[1].toUpperCase(); final interval = parts[3]; if (!_klineCtrl.isClosed) { _klineCtrl.add({ 'symbol': symbol, 'interval': interval, 'time': k['t'] as int? ?? 0, 'open': double.tryParse('${k['o']}') ?? 0, 'close': double.tryParse('${k['c']}') ?? 0, 'high': double.tryParse('${k['h']}') ?? 0, 'low': double.tryParse('${k['l']}') ?? 0, 'volume': double.tryParse('${k['v']}') ?? 0, }); } } /// 处理 K线历史查询响应,完成对应的 Completer void _handleKlineHistoryResponse(Map msg) { final id = msg['id'] as String?; final completer = id != null ? _klineReqCompleters.remove(id) : null; if (completer == null) return; if (msg['status'] == 'error') { completer.complete([]); return; } final data = msg['data'] as List? ?? []; completer.complete(data.cast>()); } /// 解析盘口推送: tick.bids/asks 各 20 档,每档 [price, qty] void _dispatchDepth(String ch, Map tick) { final parts = ch.split('.'); if (parts.length < 3) return; final symbol = parts[1].toUpperCase(); final rawBids = tick['bids'] as List? ?? []; final rawAsks = tick['asks'] as List? ?? []; List> parseEntries(List raw) { return raw .whereType>() .map((e) => { 'price': double.tryParse('${e[0]}') ?? 0.0, 'quantity': double.tryParse('${e[1]}') ?? 0.0, }) .toList(); } if (!_orderBookCtrl.isClosed) { _orderBookCtrl.add({ 'symbol': symbol, 'bids': parseEntries(rawBids), 'asks': parseEntries(rawAsks), }); } } /// 解析成交推送: tick.p=价格, tick.q=数量, tick.m=是否主动卖出 void _dispatchTrade(String ch, Map tick) { final parts = ch.split('.'); if (parts.length < 3) return; final symbol = parts[1].toUpperCase(); if (!_tradeCtrl.isClosed) { _tradeCtrl.add({ 'symbol': symbol, 'price': double.tryParse('${tick['p']}') ?? 0, 'quantity': double.tryParse('${tick['q']}') ?? 0, 'isBuyerMaker': tick['m'] == true, 'time': tick['T'] as int? ?? 0, 'tradeId': tick['a']?.toString() ?? '', }); } } /// 解析 ticker 推送,使用 Decimal 精确解析金融数值。 /// API 返回: tick.c=最新价(string), tick.o=24h开盘价, tick.q=成交额 /// 涨跌幅用 Decimal 计算后再转 double 传给 UI。 void _dispatchTicker(String ch, Map tick) { // 从 ch 提取 symbol: "market.btcusdt.ticket" → "BTCUSDT" final parts = ch.split('.'); if (parts.length < 3) return; final symbol = parts[1].toUpperCase(); // 用 Decimal.parse 保留原始精度,避免浮点误差 final close = Decimal.tryParse('${tick['c']}') ?? Decimal.zero; final open = Decimal.tryParse('${tick['o']}') ?? Decimal.zero; final high = Decimal.tryParse('${tick['h']}') ?? Decimal.zero; final low = Decimal.tryParse('${tick['l']}') ?? Decimal.zero; final volume = Decimal.tryParse('${tick['v']}') ?? Decimal.zero; final turnover = Decimal.tryParse('${tick['q']}') ?? Decimal.zero; // 24h 涨跌幅 = (最新价 - 开盘价) / 开盘价 * 100 final change24h = open > Decimal.zero ? ((close - open) / open) .toDecimal(scaleOnInfinitePrecision: 6) .toDouble() * 100 : 0.0; if (!_tickerCtrl.isClosed) { _tickerCtrl.add({ 'symbol': symbol, 'price': close.toDouble(), // UI 展示用 double 足够 'priceStr': '${tick['c']}', // WS 原始价格字符串,保留精度 'high24h': high.toDouble(), // 24h 最高价 'low24h': low.toDouble(), // 24h 最低价 'volume24h': turnover.toDouble(), // tick.q 成交额(USDT) 'volumeCoin': volume.toDouble(), // tick.v 成交量(张/币) 'change24h': change24h, }); } } /// 解析标记价格/资金费率推送: market.{symbol}.mark /// tick.p=标记价格, tick.r=资金费率, tick.T=下次结算时间(秒或毫秒) void _dispatchMark(String ch, Map tick) { final parts = ch.split('.'); if (parts.length < 3) return; final symbol = parts[1].toUpperCase(); final markPrice = double.tryParse('${tick['p']}') ?? 0; final fundingRate = double.tryParse('${tick['r']}') ?? 0; // T 可能是 int / double / String,统一转 int final tRaw = tick['T']; final rawT = tRaw is int ? tRaw : tRaw is double ? tRaw.toInt() : int.tryParse('${tRaw ?? ''}') ?? (double.tryParse('${tRaw ?? ''}') ?? 0.0).toInt(); // 统一转毫秒:10 位及以下视为秒制(与 K 线时间戳处理一致) final nextFundingTimeMs = rawT > 0 && rawT < 10000000000 ? rawT * 1000 : rawT; if (!_markCtrl.isClosed) { _markCtrl.add({ 'symbol': symbol, 'markPrice': markPrice, 'fundingRate': fundingRate, 'nextFundingTime': nextFundingTimeMs, }); } } void _onDisconnect() { _stopHeartbeat(); _channelSub?.cancel(); _channel = null; if (_connStateCtrl.isClosed) return; _setState(WsConnectionState.reconnecting); _reconnectCount++; // 连续失败到阈值时,通知外部尝试切节点(只在等于阈值的那次触发, // reconnectWithUrl 重置 _reconnectCount 后可再次触发,避免重复风暴) if (_reconnectCount == _failoverNotifyCount) { try { onPersistentFailure?.call(); } catch (_) {} } // 指数退避重连:2s, 4s, 8s, 16s... 最大 30s;超过上限后进入低频模式 int delay; if (_reconnectCount > _maxReconnectCount) { delay = _lowFreqReconnectSec; print( '[WsClient] max reconnect reached ($_reconnectCount), low-freq mode: ${delay}s'); } else { delay = min(pow(2, _reconnectCount).toInt(), 30); } print('[WsClient] reconnecting in ${delay}s (attempt $_reconnectCount)'); _reconnectTimer?.cancel(); _reconnectTimer = Timer(Duration(seconds: delay), _connectReal); } // ── Subscription(统一接口,mock/real 都走这里)──────── /// 订阅 ticker 主题,格式: market.{symbol_lowercase}.ticket void subscribeTicker(String symbol) { final topic = 'market.${symbol.toLowerCase()}.ticket'; _subscribedTopics.add(topic); _subscribedSymbols.add(symbol); if (_mockMode) { // Mock 模式种子价格 if (!_mockPrices.containsKey(symbol)) { _mockPrices[symbol] = _seedPrice(symbol); _mockChanges[symbol] = (_random.nextDouble() - 0.3) * 8; } } else if (_state == WsConnectionState.connected) { _sendSub([topic]); } } /// 批量订阅多个 symbol 的 ticker void subscribeTickerBatch(List symbols) { final topics = []; for (final symbol in symbols) { final topic = 'market.${symbol.toLowerCase()}.ticket'; _subscribedTopics.add(topic); _subscribedSymbols.add(symbol); topics.add(topic); if (_mockMode && !_mockPrices.containsKey(symbol)) { _mockPrices[symbol] = _seedPrice(symbol); _mockChanges[symbol] = (_random.nextDouble() - 0.3) * 8; } } if (!_mockMode && _state == WsConnectionState.connected) { _sendSub(topics); } } /// 差量订阅:退订不在新列表中的 ticker,订阅新增的 ticker void resubscribeTickerBatch(List newSymbols) { final newTopics = newSymbols.map((s) => 'market.${s.toLowerCase()}.ticket').toSet(); final oldTopics = _subscribedTopics.where((t) => t.endsWith('.ticket')).toSet(); // 退订旧的 final toUnsub = oldTopics.difference(newTopics); if (toUnsub.isNotEmpty) { for (final t in toUnsub) { _subscribedTopics.remove(t); final parts = t.split('.'); if (parts.length >= 2) { final sym = parts[1].toUpperCase(); _subscribedSymbols.remove(sym); _mockPrices.remove(sym); _mockChanges.remove(sym); } } if (!_mockMode && _state == WsConnectionState.connected) { _sendUnsub(toUnsub.toList()); } } // 订阅新的 final toSub = newTopics.difference(oldTopics); if (toSub.isNotEmpty) { _subscribedTopics.addAll(toSub); for (final s in newSymbols) { _subscribedSymbols.add(s); if (_mockMode && !_mockPrices.containsKey(s)) { _mockPrices[s] = _seedPrice(s); _mockChanges[s] = (_random.nextDouble() - 0.3) * 8; } } if (!_mockMode && _state == WsConnectionState.connected) { _sendSub(toSub.toList()); } } } /// 取消订阅 void unsubscribeTicker(String symbol) { final topic = 'market.${symbol.toLowerCase()}.ticket'; _subscribedTopics.remove(topic); _subscribedSymbols.remove(symbol); if (!_mockMode && _state == WsConnectionState.connected) { _sendUnsub([topic]); } } // ── 标记价格/资金费率订阅 ────────────────────────────── /// 订阅标记价格+资金费率: market.{symbol}.mark void subscribeMark(String symbol) { final topic = 'market.${symbol.toLowerCase()}.mark'; _subscribedTopics.add(topic); if (!_mockMode && _state == WsConnectionState.connected) { _sendSub([topic]); } } void unsubscribeMark(String symbol) { final topic = 'market.${symbol.toLowerCase()}.mark'; _subscribedTopics.remove(topic); if (!_mockMode && _state == WsConnectionState.connected) { _sendUnsub([topic]); } } // ── 盘口订阅 ────────────────────────────────────────── /// 订阅盘口: market.{symbol}.depth void subscribeDepth(String symbol) { final topic = 'market.${symbol.toLowerCase()}.depth'; _subscribedTopics.add(topic); _subscribedSymbols.add(symbol); if (!_mockMode && _state == WsConnectionState.connected) { _sendSub([topic]); } } void unsubscribeDepth(String symbol) { final topic = 'market.${symbol.toLowerCase()}.depth'; _subscribedTopics.remove(topic); if (!_mockMode && _state == WsConnectionState.connected) { _sendUnsub([topic]); } } // ── 成交订阅 ────────────────────────────────────────── /// 订阅实时成交: market.{symbol}.trade void subscribeTrade(String symbol) { final topic = 'market.${symbol.toLowerCase()}.trade'; _subscribedTopics.add(topic); _subscribedSymbols.add(symbol); if (!_mockMode && _state == WsConnectionState.connected) { _sendSub([topic]); } } void unsubscribeTrade(String symbol) { final topic = 'market.${symbol.toLowerCase()}.trade'; _subscribedTopics.remove(topic); if (!_mockMode && _state == WsConnectionState.connected) { _sendUnsub([topic]); } } /// 兼容旧接口 void subscribe(String symbol) => subscribeTicker(symbol); void unsubscribe(String symbol) => unsubscribeTicker(symbol); // ── K线订阅 ──────────────────────────────────────────── /// 订阅 K线实时推送: market.{symbol}.kline.{interval} void subscribeKline(String symbol, String interval) { final topic = 'market.${symbol.toLowerCase()}.kline.$interval'; _subscribedTopics.add(topic); if (!_mockMode && _state == WsConnectionState.connected) { _sendSub([topic]); } } /// 取消 K线订阅 void unsubscribeKline(String symbol, String interval) { final topic = 'market.${symbol.toLowerCase()}.kline.$interval'; _subscribedTopics.remove(topic); if (!_mockMode && _state == WsConnectionState.connected) { _sendUnsub([topic]); } } /// 请求 K线历史数据(仅生产模式,mock 模式返回空列表) /// 返回的 List 中每项包含 open/close/high/low/volume/beginTime 等字段 Future>> requestKlineHistory({ required String symbol, required String interval, required int from, required int to, }) async { if (_mockMode || _state != WsConnectionState.connected) { return []; } _reqId++; final id = '$_reqId'; final completer = Completer>>(); _klineReqCompleters[id] = completer; final msg = jsonEncode({ 'req': 'market.${symbol.toLowerCase()}.kline.$interval', 'id': id, 'from': from, 'to': to, }); _channel?.sink.add(msg); // 超时保护,避免 completer 永远挂起 return completer.future.timeout( const Duration(seconds: 10), onTimeout: () { _klineReqCompleters.remove(id); return []; }, ); } // ── WS 消息发送 ──────────────────────────────────────── void _sendSub(List topics) { _reqId++; final msg = jsonEncode({'sub': topics, 'id': '$_reqId'}); _channel?.sink.add(msg); } void _sendUnsub(List topics) { _reqId++; final msg = jsonEncode({'unsub': topics, 'id': '$_reqId'}); _channel?.sink.add(msg); } // ══════════════════════════════════════════════════════ // Mock 模式 // ══════════════════════════════════════════════════════ void _connectMock() { _setState(WsConnectionState.connecting); final isFirstConnect = _reconnectCount == 0; Future.delayed(const Duration(milliseconds: 700), () { if (_connStateCtrl.isClosed) return; _setState(WsConnectionState.connected); _reconnectCount = 0; _startMockData(); if (isFirstConnect) { _simulateDisconnectTimer = Timer(const Duration(seconds: 45), _triggerMockDisconnect); } }); } void _triggerMockDisconnect() { if (_state != WsConnectionState.connected) return; _mockDataTimer?.cancel(); _setState(WsConnectionState.reconnecting); _reconnectCount++; _reconnectTimer = Timer( Duration(seconds: min(pow(2, _reconnectCount).toInt(), 30)), _connectMock); } // ── Mock Data Generation ────────────────────────────── void _startMockData() { _mockDataTimer?.cancel(); _mockDataTimer = Timer.periodic(const Duration(milliseconds: 1800), (_) { if (_state != WsConnectionState.connected) return; _emitTickers(); _emitOrderBooks(); _emitTrades(); }); } double _seedPrice(String symbol) { if (symbol.startsWith('BTC')) return 67200 + _random.nextDouble() * 800; if (symbol.startsWith('ETH')) return 3480 + _random.nextDouble() * 80; if (symbol.startsWith('BNB')) return 558 + _random.nextDouble() * 15; if (symbol.startsWith('SOL')) return 142 + _random.nextDouble() * 6; if (symbol.startsWith('XRP')) return 0.61 + _random.nextDouble() * 0.04; if (symbol.startsWith('ADA')) return 0.45 + _random.nextDouble() * 0.02; if (symbol.startsWith('DOGE')) return 0.12 + _random.nextDouble() * 0.01; if (symbol.startsWith('AVAX')) return 38 + _random.nextDouble() * 2; if (symbol.startsWith('LINK')) return 14 + _random.nextDouble() * 0.5; if (symbol.startsWith('DOT')) return 7.5 + _random.nextDouble() * 0.3; return 1.0 + _random.nextDouble() * 0.5; } void _emitTickers() { for (final symbol in _subscribedSymbols) { final current = _mockPrices[symbol] ?? _seedPrice(symbol); final pct = (_random.nextDouble() - 0.5) * 0.002; final newPrice = (current * (1 + pct)); _mockPrices[symbol] = newPrice; final currentChange = _mockChanges[symbol] ?? 0.0; final changeShift = (_random.nextDouble() - 0.5) * 0.05; _mockChanges[symbol] = currentChange + changeShift; if (!_tickerCtrl.isClosed) { _tickerCtrl.add({ 'symbol': symbol, 'price': newPrice, 'change24h': _mockChanges[symbol], 'volume24h': 100000 + _random.nextDouble() * 900000, }); } } } void _emitOrderBooks() { for (final symbol in _subscribedSymbols) { final mid = _mockPrices[symbol] ?? 100.0; final bids = List.generate(10, (i) { final px = mid * (1 - 0.0008 * (i + 1) - _random.nextDouble() * 0.0002); final qty = 0.05 + _random.nextDouble() * 3.0; return {'price': px, 'quantity': qty}; }); final asks = List.generate(10, (i) { final px = mid * (1 + 0.0008 * (i + 1) + _random.nextDouble() * 0.0002); final qty = 0.05 + _random.nextDouble() * 3.0; return {'price': px, 'quantity': qty}; }); if (!_orderBookCtrl.isClosed) { _orderBookCtrl.add({ 'symbol': symbol, 'bids': bids, 'asks': asks, }); } } } void _emitTrades() { for (final symbol in _subscribedSymbols) { if (_random.nextDouble() < 0.6) { final price = _mockPrices[symbol] ?? 100.0; final noise = (_random.nextDouble() - 0.5) * 0.001; if (!_tradeCtrl.isClosed) { _tradeCtrl.add({ 'symbol': symbol, 'price': price * (1 + noise), 'quantity': 0.001 + _random.nextDouble() * 0.5, 'isBuyerMaker': _random.nextBool(), 'tradeId': 'mock_${DateTime.now().millisecondsSinceEpoch}', 'time': DateTime.now().toIso8601String(), }); } } } } // ── Dispose ─────────────────────────────────────────── void dispose() { _stopHeartbeat(); _mockDataTimer?.cancel(); _reconnectTimer?.cancel(); _simulateDisconnectTimer?.cancel(); _channelSub?.cancel(); _channel?.sink.close(); _connStateCtrl.close(); _tickerCtrl.close(); _orderBookCtrl.close(); _tradeCtrl.close(); _klineCtrl.close(); // 取消所有未完成的 K线历史请求 for (final c in _klineReqCompleters.values) { if (!c.isCompleted) c.complete([]); } _klineReqCompleters.clear(); } }