import 'dart:async'; import 'dart:convert'; import 'dart:developer' as developer; import 'dart:math'; import 'package:flutter/foundation.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; /// 现货 WS:`event=sub/unsub/req`;行情 `market_{symbol}_*`;用户 `spot_asset` / `spot_order`(需 uid)。 class SpotWsClient { SpotWsClient({ required String wsSpotUrl, String? uid, this.onPersistentFailure, }) : _wsUrl = wsSpotUrl, _uid = uid { _connect(); } String _wsUrl; final String? _uid; final void Function()? onPersistentFailure; static const int _maxReconnectCount = 50; static const int _lowFreqReconnectSec = 120; static const int _failoverNotifyCount = 2; final _connStateCtrl = StreamController.broadcast(); final _tickerCtrl = StreamController>.broadcast(); final _depthCtrl = StreamController>.broadcast(); final _tradeCtrl = StreamController>.broadcast(); final _klineCtrl = StreamController>.broadcast(); final _assetCtrl = StreamController>.broadcast(); final _orderCtrl = StreamController>.broadcast(); final _klineReqCompleters = >>>{}; SpotWsState _state = SpotWsState.disconnected; final _subscribedChannels = {}; int _spotAssetChannelHolds = 0; // spot_asset 页面 retain 计数 int _spotOrderChannelHolds = 0; // spot_order int _reconnectCount = 0; int _spotWsRawLogCount = 0; int _spotWsDispatchTickerLogCount = 0; int _spotWsRawDepthLogCount = 0; int _spotWsParsedDepthLogCount = 0; WebSocketChannel? _channel; StreamSubscription? _channelSub; Timer? _reconnectTimer; Stream get connectionStream => _connStateCtrl.stream; Stream> get tickerStream => _tickerCtrl.stream; Stream> get depthStream => _depthCtrl.stream; Stream> get tradeStream => _tradeCtrl.stream; Stream> get klineStream => _klineCtrl.stream; Stream> get assetStream => _assetCtrl.stream; Stream> get orderStream => _orderCtrl.stream; SpotWsState get currentState => _state; void _connect() { _setState(SpotWsState.connecting); _connectReal(); } void reconnectWithUrl(String url, {String? uid}) { _wsUrl = url; _channelSub?.cancel(); _channel?.sink.close(); _reconnectTimer?.cancel(); _reconnectCount = 0; _connectReal(); } void _connectReal() async { _setState(SpotWsState.connecting); try { final uid = _uid; final query = [ 'clientType=flutter', if (uid != null && uid.isNotEmpty) 'uid=$uid', ].join('&'); final uri = Uri.parse('$_wsUrl?$query'); _channel = WebSocketChannel.connect(uri); await _channel!.ready; if (_connStateCtrl.isClosed) return; _setState(SpotWsState.connected); _reconnectCount = 0; if (!kReleaseMode) { debugPrint('[SpotWs] connected uri=$uri'); developer.log('connected SpotWs uri=$uri', name: 'SpotWs'); } _channelSub = _channel!.stream.listen( _onMessage, onError: (_) => _onDisconnect(), onDone: () => _onDisconnect(), ); if (_subscribedChannels.isNotEmpty) { _sendSub(_subscribedChannels.toList()); } } catch (_) { _onDisconnect(); } } void _onDisconnect() { _channelSub?.cancel(); _channel = null; if (_connStateCtrl.isClosed) return; if (!kReleaseMode) { developer.log('disconnect reconnectCount=$_reconnectCount', name: 'SpotWs'); } _setState(SpotWsState.reconnecting); _reconnectCount++; if (_reconnectCount == _failoverNotifyCount) { try { onPersistentFailure?.call(); } catch (_) {} } final delay = _reconnectCount > _maxReconnectCount ? _lowFreqReconnectSec : min(pow(2, _reconnectCount).toInt(), 30); _reconnectTimer?.cancel(); _reconnectTimer = Timer(Duration(seconds: delay), _connectReal); } void _onMessage(dynamic raw) { if (raw is! String) return; final Map msg; try { msg = jsonDecode(raw) as Map; } catch (_) { return; } if (msg.containsKey('ping')) { try { _channel?.sink.add(jsonEncode({'pong': msg['ping']})); } catch (_) {} return; } // 用户资产/订单:body 无 channel,仅有 type=asset|order final pushType = msg['type']?.toString(); if (pushType == 'asset') { if (!_assetCtrl.isClosed) { _assetCtrl.add(Map.from(msg)); } return; } if (pushType == 'order') { if (!_orderCtrl.isClosed) { _orderCtrl.add(Map.from(msg)); } return; } final channel = msg['channel'] as String?; if (channel == null) return; final tick = msg['tick']; if (!kReleaseMode && channel.endsWith('_ticker') && _spotWsRawLogCount < 6) { _spotWsRawLogCount++; final s = raw.length > 420 ? '${raw.substring(0, 420)}…' : raw; developer.log('raw ticker msg: $s', name: 'SpotWs'); } // 必须先判断 trade_ticker:channel 为 market_{sym}_trade_ticker,同样以 `_ticker` 结尾, // 若先匹配 `_ticker` 会误走行情 ticker,导致最新成交永远进不了 _dispatchTrade。 if (channel.endsWith('_trade_ticker')) { _dispatchTrade(channel, tick); } else if (channel.endsWith('_ticker')) { _dispatchTicker(channel, tick); } else if (channel.endsWith('_depth')) { if (!kReleaseMode && _spotWsRawDepthLogCount < 3) { _spotWsRawDepthLogCount++; final s = raw.length > 800 ? '${raw.substring(0, 800)}…' : raw; developer.log('[SPOT_DEPTH_DEBUG] raw[$_spotWsRawDepthLogCount]: $s', name: 'SpotWs'); } _dispatchDepth(channel, tick); } else if (channel.contains('_kline_')) { _dispatchKline(channel, tick); } // kline req 响应 if (msg.containsKey('event_rep') && msg['event_rep'] == 'req') { _handleKlineReqResponse(msg); } } static String? _symbolFromChannel(String channel) { if (channel.startsWith('market_')) { final parts = channel.split('_'); if (parts.length >= 3) return parts[1].toUpperCase(); } return null; } /// 涨跌幅:解析 rose / 或用 open、close 推算(不盲目 ×100)。 static double? _parseSpotRose(dynamic rose, double open, double close) { if (rose != null && '$rose'.trim().isNotEmpty) { final raw = rose.toString().trim().replaceAll('%', '').replaceAll(',', ''); final n = double.tryParse(raw); if (n != null && !n.isNaN) return n; } if (open > 0 && close > 0) return (close - open) / open * 100; return null; } /// ticker channel:标准字段 + 兼容 o/h/l/c、v/q、b/a、P。 void _dispatchTicker(String channel, dynamic tick) { if (tick is! Map) { if (!kReleaseMode) { developer.log( '_dispatchTicker: tick not map channel=$channel tickType=${tick.runtimeType}', name: 'SpotWs', ); } return; } final t = Map.from(tick); final symbol = _symbolFromChannel(channel); if (symbol == null) return; final rawClose = t['close'] ?? t['last'] ?? t['price'] ?? t['c'] ?? t['latest'] ?? t['es_price']; final close = _toDouble(rawClose); final open = _toDouble(t['open'] ?? t['o']); if (!kReleaseMode && close <= 0 && _spotWsDispatchTickerLogCount < 4) { _spotWsDispatchTickerLogCount++; developer.log( 'ticker close<=0 ch=$channel tickKeys=${t.keys.toList()}', name: 'SpotWs', ); } final change24h = _parseSpotRose( t['rose'] ?? t['P'] ?? t['priceChangePercent'], open, close, ); if (!_tickerCtrl.isClosed) { _tickerCtrl.add({ 'symbol': symbol, 'price': close, 'priceStr': rawClose?.toString() ?? '', // WS 原始价格字符串,保留精度 'open': open, 'high': _toDouble(t['high'] ?? t['h']), 'low': _toDouble(t['low'] ?? t['l']), 'volume': _toDouble( t['vol'] ?? t['volume'] ?? t['v'] ?? t['baseVolume'], ), 'turnover': _toDouble( t['amount'] ?? t['quoteVolume'] ?? t['q'] ?? t['turnover'], ), 'change24h': change24h, 'buyOne': _toDouble(t['buyOne'] ?? t['b']), 'sellOne': _toDouble(t['sellOne'] ?? t['a']), }); } } void _dispatchDepth(String channel, dynamic tick) { if (tick is! Map) return; var t = Map.from(tick); final symbol = _symbolFromChannel(channel); if (symbol == null) return; if (t['bids'] == null && t['buys'] == null && t['asks'] == null && t['sells'] == null && t['data'] is Map) { t = Map.from(t['data'] as Map); } List> parseLevels(dynamic raw) { if (raw is! List) return []; final out = >[]; for (final e in raw) { if (e is List && e.isNotEmpty) { final price = _toDouble(e[0]); final qty = _toDouble(e.length > 1 ? e[1] : 0); if (price > 0 && qty > 0) { out.add({'price': price, 'quantity': qty}); } } else if (e is Map) { final m = Map.from(e); final price = _toDouble( m['price'] ?? m['limitPrice'] ?? m['entrustPrice'] ?? m['tradePrice'] ?? m['p'] ?? m['P'], ); final qty = _toDouble( m['quantity'] ?? m['amount'] ?? m['qty'] ?? m['q'] ?? m['a'], ); if (price > 0 && qty > 0) { out.add({'price': price, 'quantity': qty}); } } } return out; } final payload = {'symbol': symbol}; payload['asks'] = parseLevels(t['asks'] ?? t['sells']); payload['bids'] = parseLevels(t['bids'] ?? t['buys']); if ((payload['asks'] as List).isEmpty && (payload['bids'] as List).isEmpty) return; if (!kReleaseMode && _spotWsParsedDepthLogCount < 3) { _spotWsParsedDepthLogCount++; final asks = payload['asks'] as List; final bids = payload['bids'] as List; final ask0 = asks.isNotEmpty ? asks.first : null; final bid0 = bids.isNotEmpty ? bids.first : null; developer.log( '[SPOT_DEPTH_DEBUG] parsed[$_spotWsParsedDepthLogCount] symbol=$symbol ' 'asks=${asks.length} bids=${bids.length} ' 'ask0=$ask0 bid0=$bid0', name: 'SpotWs', ); } if (!_depthCtrl.isClosed) { _depthCtrl.add(payload); } } void _dispatchTrade(String channel, dynamic tick) { if (tick is! Map) return; final m = Map.from(tick); final symbol = _symbolFromChannel(channel); if (symbol == null) return; void emitOne(Map one) { final ts = one['time'] ?? one['ts']; int timeMs = 0; if (ts is int) { timeMs = ts; } else if (ts is num) { timeMs = ts.toInt(); } if (timeMs > 0 && timeMs < 10000000000) timeMs *= 1000; if (!_tradeCtrl.isClosed) { _tradeCtrl.add({ 'symbol': symbol, 'id': one['id'], 'price': _toDouble(one['price']), 'quantity': _toDouble(one['vol'] ?? one['quantity']), 'amount': _toDouble(one['amount']), 'side': '${one['side'] ?? ''}', 'time': timeMs, }); } } final data = m['data']; if (data is List) { for (final e in data) { if (e is Map) emitOne(Map.from(e)); } return; } emitOne(m); } void _dispatchKline(String channel, dynamic tick) { if (tick is! Map) return; final t = Map.from(tick); final symbol = _symbolFromChannel(channel); if (symbol == null) return; if (!channel.contains('_kline_')) return; final parts = channel.split('_'); if (parts.length < 4) return; final interval = parts.sublist(3).join('_'); final rawTime = t['time']; int timeMs = 0; if (rawTime is int) { timeMs = rawTime; } else if (rawTime is num) { timeMs = rawTime.toInt(); } if (timeMs > 0 && timeMs < 10000000000) timeMs *= 1000; final rawId = t['id']; int id = 0; if (rawId is int) { id = rawId; } else if (rawId is num) { id = rawId.toInt(); } if (!_klineCtrl.isClosed) { _klineCtrl.add({ 'symbol': symbol, 'interval': interval, 'time': timeMs, 'open': _toDouble(t['open']), 'close': _toDouble(t['close']), 'high': _toDouble(t['high']), 'low': _toDouble(t['low']), 'volume': _toDouble(t['vol']), 'amount': _toDouble(t['amount']), 'ratio': _toDouble(t['ratio']), 'id': id, }); } } void _handleKlineReqResponse(Map msg) { final channel = msg['channel'] as String?; if (channel == null) return; final completer = _klineReqCompleters.remove(channel); if (completer == null || completer.isCompleted) return; final data = msg['data']; if (data is List) { completer.complete(data.whereType>().toList()); } else { completer.complete([]); } } void subscribeTicker(String symbol) { _sub('market_${symbol.toLowerCase()}_ticker'); } void unsubscribeTicker(String symbol) { _unsub('market_${symbol.toLowerCase()}_ticker'); } void subscribeTickerBatch(List symbols) { for (final s in symbols.map((e) => e.toLowerCase())) { _sub('market_${s}_ticker'); } } void resubscribeTickerBatch(List newSymbols) { final newChannels = {}; for (final s in newSymbols.map((e) => e.toLowerCase())) { newChannels.add('market_${s}_ticker'); } final oldChannels = _subscribedChannels.where((c) => c.endsWith('_ticker')).toSet(); final toUnsub = oldChannels.difference(newChannels); if (toUnsub.isNotEmpty) { for (final c in toUnsub) { _subscribedChannels.remove(c); } if (_state == SpotWsState.connected) _sendUnsub(toUnsub.toList()); } final toSub = newChannels.difference(oldChannels); for (final c in toSub) { _sub(c); } } void subscribeDepth(String symbol) { _sub('market_${symbol.toLowerCase()}_depth'); } void unsubscribeDepth(String symbol) { _unsub('market_${symbol.toLowerCase()}_depth'); } void subscribeTrade(String symbol) { _sub('market_${symbol.toLowerCase()}_trade_ticker'); } void unsubscribeTrade(String symbol) { _unsub('market_${symbol.toLowerCase()}_trade_ticker'); } void subscribeKline(String symbol, String interval) { _sub('market_${symbol.toLowerCase()}_kline_$interval'); } void unsubscribeKline(String symbol, String interval) { _unsub('market_${symbol.toLowerCase()}_kline_$interval'); } void retainSpotAssetChannel() { _spotAssetChannelHolds++; if (_spotAssetChannelHolds == 1) { _sub('spot_asset'); } } void releaseSpotAssetChannel() { if (_spotAssetChannelHolds <= 0) return; _spotAssetChannelHolds--; if (_spotAssetChannelHolds == 0) { _unsub('spot_asset'); } } void retainSpotOrderChannel() { _spotOrderChannelHolds++; if (_spotOrderChannelHolds == 1) { _sub('spot_order'); } } void releaseSpotOrderChannel() { if (_spotOrderChannelHolds <= 0) return; _spotOrderChannelHolds--; if (_spotOrderChannelHolds == 0) { _unsub('spot_order'); } } /// K 线历史 `event=req`;[endIdx] 秒,0=最新。 Future>> requestKlineHistory({ required String symbol, required String interval, int pageSize = 150, int endIdx = 0, }) { if (_state != SpotWsState.connected) return Future.value([]); final channel = 'market_${symbol.toLowerCase()}_kline_$interval'; final completer = Completer>>(); _klineReqCompleters[channel] = completer; final msg = jsonEncode({ 'event': 'req', 'params': { 'channel': channel, 'cb_id': symbol.toLowerCase(), 'pageSize': pageSize, 'endIdx': endIdx, }, }); try { _channel?.sink.add(msg); } catch (_) { _klineReqCompleters.remove(channel); return Future.value([]); } return completer.future.timeout( const Duration(seconds: 10), onTimeout: () { _klineReqCompleters.remove(channel); return []; }, ); } void _sub(String channel) { _subscribedChannels.add(channel); if (_state == SpotWsState.connected) _sendSub([channel]); } void _unsub(String channel) { _subscribedChannels.remove(channel); if (_state == SpotWsState.connected) _sendUnsub([channel]); } void _sendSub(List channels) { if (!kReleaseMode && channels.isNotEmpty) { debugPrint('[SpotWs] sub count=${channels.length} $channels'); developer.log( 'send sub count=${channels.length} first=${channels.take(4)}', name: 'SpotWs', ); } for (final ch in channels) { try { _channel?.sink.add(jsonEncode({ 'event': 'sub', 'params': {'channel': ch}, })); } catch (_) {} } } void _sendUnsub(List channels) { for (final ch in channels) { try { _channel?.sink.add(jsonEncode({ 'event': 'unsub', 'params': {'channel': ch}, })); } catch (_) {} } } void _setState(SpotWsState s) { _state = s; if (!_connStateCtrl.isClosed) _connStateCtrl.add(s); } static double _toDouble(dynamic v) { if (v == null) return 0.0; if (v is num) return v.toDouble(); return double.tryParse(v.toString()) ?? 0.0; } void dispose() { _spotAssetChannelHolds = 0; _spotOrderChannelHolds = 0; _channelSub?.cancel(); _reconnectTimer?.cancel(); _channel?.sink.close(); _connStateCtrl.close(); _tickerCtrl.close(); _depthCtrl.close(); _tradeCtrl.close(); _klineCtrl.close(); _assetCtrl.close(); _orderCtrl.close(); for (final c in _klineReqCompleters.values) { if (!c.isCompleted) c.complete([]); } _klineReqCompleters.clear(); } } enum SpotWsState { connecting, connected, disconnected, reconnecting }