| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780 |
- import 'dart:async';
- import 'dart:convert';
- import 'dart:math';
- import 'package:decimal/decimal.dart';
- import 'package:web_socket_channel/web_socket_channel.dart';
- import '../config/app_config.dart';
/// Connection states reported by [WsClient].
enum WsConnectionState {
  connecting,
  connected,
  disconnected,
  reconnecting,
}
/// WebSocket message types.
enum WsChannel {
  ticker,
  orderBook,
  trade,
}
- /// WebSocket 客户端
- /// Mock 模式:使用 Timer.periodic 模拟实时数据流
- /// 生产模式:对接真实 WebSocket 服务器
- class WsClient {
/// Creates a client and starts connecting right away.
///
/// [mockMode] selects the simulated data source instead of a real socket.
/// [wsUrl] overrides the endpoint; when omitted, `AppConfig.effectiveWsUrl`
/// is used. [onPersistentFailure] is an optional hook for repeated
/// reconnect failures.
WsClient({
  required bool mockMode,
  String? wsUrl,
  this.onPersistentFailure,
})  : _wsUrl = wsUrl ?? AppConfig.effectiveWsUrl,
      _mockMode = mockMode {
  _connect();
}
/// Whether data is simulated instead of read from a real socket.
final bool _mockMode;

/// Endpoint used for the real connection; replaced by [reconnectWithUrl].
String _wsUrl;

/// Randomness source for the mock data generators.
final Random _random = Random();

/// Callback for persistent reconnect failure.
///
/// Fired once when the number of consecutive failed reconnects reaches
/// [_failoverNotifyCount]. It is meant to be bridged externally
/// (ws_provider) to nodeProvider.reportFailure(immediate: true) so that a
/// node switch can follow. [reconnectWithUrl] resets the failure counter,
/// after which the callback can fire again.
final void Function()? onPersistentFailure;
// ── Constants ───────────────────────────────────────────

/// Seconds between two heartbeat ticks.
static const int _heartbeatIntervalSec = 15;

/// Seconds without any inbound message after which the heartbeat gives up.
static const int _heartbeatTimeoutSec = 30;

/// Reconnect attempts made with exponential backoff before the client
/// switches to low-frequency retries.
static const int _maxReconnectCount = 50;

/// Retry delay, in seconds, used in low-frequency mode.
static const int _lowFreqReconnectSec = 120;

/// Consecutive failure count at which the outside is asked to try another
/// node.
static const int _failoverNotifyCount = 2;
// ── Stream controllers ────────────────────────────────

/// Broadcasts connection state changes.
final StreamController<WsConnectionState> _connStateCtrl =
    StreamController.broadcast();

/// Broadcasts ticker events.
final StreamController<Map<String, dynamic>> _tickerCtrl =
    StreamController.broadcast();

/// Broadcasts order book events.
final StreamController<Map<String, dynamic>> _orderBookCtrl =
    StreamController.broadcast();

/// Broadcasts trade events.
final StreamController<Map<String, dynamic>> _tradeCtrl =
    StreamController.broadcast();

/// Broadcasts real-time K-line events.
final StreamController<Map<String, dynamic>> _klineCtrl =
    StreamController.broadcast();

/// Broadcasts mark price / funding rate events.
final StreamController<Map<String, dynamic>> _markCtrl =
    StreamController.broadcast();

/// Pending K-line history requests: request id → completer.
final Map<String, Completer<List<Map<String, dynamic>>>> _klineReqCompleters =
    {};
// ── State ─────────────────────────────────────────────

/// Last state passed to [_setState].
WsConnectionState _state = WsConnectionState.disconnected;

/// Full topic strings currently subscribed; replayed after a reconnect.
final Set<String> _subscribedTopics = {};

/// Subscribed symbols (used by the mock generators).
final Set<String> _subscribedSymbols = {};

/// Current simulated price per symbol.
final Map<String, double> _mockPrices = {};

/// Current simulated 24h change per symbol.
final Map<String, double> _mockChanges = {};

/// Consecutive reconnect attempts since the last successful connection.
int _reconnectCount = 0;

Timer? _mockDataTimer;
Timer? _reconnectTimer;
Timer? _simulateDisconnectTimer;
Timer? _heartbeatTimer;

/// Time the most recent inbound message was received.
DateTime? _lastMessageTime;

// Production WebSocket.
WebSocketChannel? _channel;
StreamSubscription? _channelSub;

/// Counter used to build the `id` of outgoing sub / unsub / req messages.
int _reqId = 0;
// ── Public streams ────────────────────────────────────

/// Broadcast stream of connection state changes.
Stream<WsConnectionState> get connectionStream => _connStateCtrl.stream;

/// Broadcast stream of ticker events.
Stream<Map<String, dynamic>> get tickerStream => _tickerCtrl.stream;

/// Broadcast stream of order book events.
Stream<Map<String, dynamic>> get orderBookStream => _orderBookCtrl.stream;

/// Broadcast stream of trade events.
Stream<Map<String, dynamic>> get tradeStream => _tradeCtrl.stream;

/// Broadcast stream of real-time K-line events.
Stream<Map<String, dynamic>> get klineStream => _klineCtrl.stream;

/// Broadcast stream of mark price / funding rate events.
Stream<Map<String, dynamic>> get markStream => _markCtrl.stream;

/// The most recently recorded connection state.
WsConnectionState get currentState => _state;
// ── Connection management ─────────────────────────────

/// Records [s] as the current state and publishes it on the connection
/// state stream, unless that stream has already been closed.
void _setState(WsConnectionState s) {
  _state = s;
  if (_connStateCtrl.isClosed) return;
  _connStateCtrl.add(s);
}
/// Starts the mock or the real connection, depending on [_mockMode].
void _connect() {
  if (!_mockMode) {
    _connectReal();
    return;
  }
  _connectMock();
}
// ══════════════════════════════════════════════════════
// Production mode: real WebSocket connection
// ══════════════════════════════════════════════════════

/// The WebSocket address currently in use.
String get currentWsUrl {
  return _wsUrl;
}
/// Reconnects using the new [url] (called when the node is switched).
///
/// The URL is always stored. In mock mode nothing else happens, so a mock
/// client never opens a real socket. Otherwise the current connection is
/// torn down, the pending retry is cancelled, the failure counter is reset
/// and a fresh real connection attempt starts immediately.
void reconnectWithUrl(String url) {
  _wsUrl = url;
  // A mock client has no real connection to replace.
  if (_mockMode) return;
  _stopHeartbeat();
  _channelSub?.cancel();
  _channelSub = null;
  _channel?.sink.close();
  // Drop the closed channel so nothing can write to it before the new
  // attempt installs its own.
  _channel = null;
  _reconnectTimer?.cancel();
  _reconnectCount = 0;
  _connectReal();
}
/// Opens the real WebSocket connection to [_wsUrl].
///
/// Moves to `connecting`, waits for the handshake, then marks the client
/// `connected`, starts listening, re-sends every topic in
/// [_subscribedTopics] and starts the heartbeat. Failures are routed to
/// [_onDisconnect], which schedules the retry.
///
/// If another attempt replaced [_channel] while this one was waiting for
/// the handshake (for example via [reconnectWithUrl]), this attempt is
/// abandoned without touching the shared state, so it can neither listen
/// on the newer channel nor report a spurious disconnect.
void _connectReal() async {
  _setState(WsConnectionState.connecting);
  // Local handle used to detect that a newer attempt took over.
  WebSocketChannel? channel;
  try {
    channel = WebSocketChannel.connect(Uri.parse(_wsUrl));
    _channel = channel;
    // Wait for the handshake so subscriptions are not sent too early.
    await channel.ready;
    // Disposed while connecting.
    if (_connStateCtrl.isClosed) return;
    // Superseded while connecting: the newer attempt owns the state.
    if (!identical(_channel, channel)) return;
    print('[WsClient] connected to $_wsUrl');
    _setState(WsConnectionState.connected);
    _reconnectCount = 0;
    _lastMessageTime = DateTime.now();
    // Register the listener first, then send subscriptions.
    _channelSub = channel.stream.listen(
      (raw) {
        _lastMessageTime = DateTime.now();
        _onMessage(raw);
      },
      onError: (error) {
        _onDisconnect();
      },
      onDone: () {
        _onDisconnect();
      },
    );
    // Re-subscribe to the topics held before the reconnect.
    if (_subscribedTopics.isNotEmpty) {
      _sendSub(_subscribedTopics.toList());
    }
    _startHeartbeat();
  } catch (e) {
    // A failure of a superseded attempt must not disturb the current one.
    if (channel != null && !identical(_channel, channel)) return;
    _onDisconnect();
  }
}
// ── Heartbeat ───────────────────────────────────────────

/// (Re)starts the periodic heartbeat timer.
///
/// Each tick does nothing unless the client is connected. It treats the
/// connection as lost when no message has arrived for more than
/// [_heartbeatTimeoutSec] seconds; otherwise it sends a `ping` carrying the
/// current epoch milliseconds. A failing send is also treated as a
/// disconnect.
void _startHeartbeat() {
  void beat(Timer _) {
    if (_state != WsConnectionState.connected) return;

    final now = DateTime.now();
    final lastSeen = _lastMessageTime;
    final silentTooLong = lastSeen != null &&
        now.difference(lastSeen).inSeconds > _heartbeatTimeoutSec;
    if (silentTooLong) {
      print('[WsClient] heartbeat timeout, triggering reconnect');
      _onDisconnect();
      return;
    }

    try {
      _channel?.sink.add(jsonEncode({'ping': now.millisecondsSinceEpoch}));
    } catch (_) {
      _onDisconnect();
    }
  }

  _heartbeatTimer?.cancel();
  _heartbeatTimer =
      Timer.periodic(const Duration(seconds: _heartbeatIntervalSec), beat);
}
/// Cancels the heartbeat timer, if any, and forgets it.
void _stopHeartbeat() {
  final timer = _heartbeatTimer;
  _heartbeatTimer = null;
  timer?.cancel();
}
/// Handles one server message and routes it by its `ch` / `rep` fields to
/// the matching dispatcher.
///
/// Non-string frames, malformed JSON, non-object payloads, `pong` replies,
/// `status` acknowledgements and messages without a string `ch` or an
/// object `tick` are ignored. Shapes are checked with `is` rather than
/// cast, so an unexpected payload cannot throw inside the stream listener.
void _onMessage(dynamic raw) {
  if (raw is! String) return;
  final Object? decoded;
  try {
    decoded = jsonDecode(raw);
  } on FormatException {
    return;
  }
  if (decoded is! Map<String, dynamic>) return;
  final msg = decoded;

  // pong reply: nothing to do, the caller already refreshed the
  // last-message timestamp.
  if (msg.containsKey('pong')) return;

  // K-line history response (`rep` field).
  if (msg.containsKey('rep')) {
    _handleKlineHistoryResponse(msg);
    return;
  }

  // ACK for sub / unsub: ignored.
  if (msg.containsKey('status')) return;

  final ch = msg['ch'];
  final tick = msg['tick'];
  if (ch is! String || tick is! Map<String, dynamic>) return;

  // Route by the topic type encoded in `ch`.
  if (ch.endsWith('.ticket')) {
    _dispatchTicker(ch, tick);
  } else if (ch.endsWith('.mark')) {
    _dispatchMark(ch, tick);
  } else if (ch.contains('.kline.')) {
    _dispatchKline(ch, tick);
  } else if (ch.endsWith('.depth')) {
    _dispatchDepth(ch, tick);
  } else if (ch.endsWith('.trade')) {
    _dispatchTrade(ch, tick);
  }
}
/// Parses a real-time K-line push and emits it on the K-line stream.
///
/// `tick.k` carries t/o/c/h/l/v/q. The symbol (upper-cased) and interval
/// are taken from the second and fourth dot-separated parts of [ch].
/// Pushes without an object `k`, or with fewer than four parts in [ch],
/// are dropped. The timestamp `t` is accepted as a number or a numeric
/// string and falls back to 0, so an unexpected type cannot throw.
void _dispatchKline(String ch, Map<String, dynamic> tick) {
  final k = tick['k'];
  if (k is! Map<String, dynamic>) return;
  final parts = ch.split('.');
  if (parts.length < 4) return;
  if (_klineCtrl.isClosed) return;

  final rawTime = k['t'];
  final time =
      rawTime is num ? rawTime.toInt() : int.tryParse('$rawTime') ?? 0;

  _klineCtrl.add({
    'symbol': parts[1].toUpperCase(),
    'interval': parts[3],
    'time': time,
    'open': double.tryParse('${k['o']}') ?? 0,
    'close': double.tryParse('${k['c']}') ?? 0,
    'high': double.tryParse('${k['h']}') ?? 0,
    'low': double.tryParse('${k['l']}') ?? 0,
    'volume': double.tryParse('${k['v']}') ?? 0,
  });
}
/// Completes the pending K-line history request that matches `msg['id']`.
///
/// The id is compared as a string, so a numeric id echoed by the server
/// still matches. Responses with no id or no pending request are ignored.
/// The request completes with an empty list when `status` is `'error'` or
/// `data` is not a list; otherwise with the object entries of `data`
/// (non-object entries are skipped instead of failing later on access).
void _handleKlineHistoryResponse(Map<String, dynamic> msg) {
  final String? id = msg['id']?.toString();
  if (id == null) return;
  final completer = _klineReqCompleters.remove(id);
  if (completer == null || completer.isCompleted) return;

  final data = msg['data'];
  if (msg['status'] == 'error' || data is! List) {
    completer.complete([]);
    return;
  }
  completer.complete(data.whereType<Map<String, dynamic>>().toList());
}
/// Parses an order book push and emits it on the order book stream.
///
/// `tick.bids` / `tick.asks` hold levels shaped as `[price, qty]`. The
/// symbol is the upper-cased second part of [ch]; pushes with fewer than
/// three parts are dropped. A missing or non-list side yields an empty
/// list, and entries that are not lists with at least two elements are
/// skipped, so a malformed level cannot throw.
void _dispatchDepth(String ch, Map<String, dynamic> tick) {
  final parts = ch.split('.');
  if (parts.length < 3) return;
  if (_orderBookCtrl.isClosed) return;

  List<Map<String, double>> parseEntries(Object? raw) {
    if (raw is! List) return [];
    return [
      for (final e in raw)
        if (e is List && e.length >= 2)
          {
            'price': double.tryParse('${e[0]}') ?? 0.0,
            'quantity': double.tryParse('${e[1]}') ?? 0.0,
          },
    ];
  }

  _orderBookCtrl.add({
    'symbol': parts[1].toUpperCase(),
    'bids': parseEntries(tick['bids']),
    'asks': parseEntries(tick['asks']),
  });
}
/// Parses a trade push and emits it on the trade stream.
///
/// tick.p = price, tick.q = quantity, tick.m = whether the trade was an
/// active sell. The symbol is the upper-cased second part of [ch]; pushes
/// with fewer than three parts are dropped. The timestamp `T` is accepted
/// as a number or a numeric string and falls back to 0, so an unexpected
/// type cannot throw.
void _dispatchTrade(String ch, Map<String, dynamic> tick) {
  final parts = ch.split('.');
  if (parts.length < 3) return;
  if (_tradeCtrl.isClosed) return;

  final rawTime = tick['T'];
  final time =
      rawTime is num ? rawTime.toInt() : int.tryParse('$rawTime') ?? 0;

  _tradeCtrl.add({
    'symbol': parts[1].toUpperCase(),
    'price': double.tryParse('${tick['p']}') ?? 0,
    'quantity': double.tryParse('${tick['q']}') ?? 0,
    'isBuyerMaker': tick['m'] == true,
    'time': time,
    'tradeId': tick['a']?.toString() ?? '',
  });
}
/// Parses a ticker push, reading the financial values through `Decimal`.
///
/// The API sends tick.c = last price (string), tick.o = 24h open price and
/// tick.q = turnover. The 24h change is computed in `Decimal` and only then
/// converted to `double` for the UI.
void _dispatchTicker(String ch, Map<String, dynamic> tick) {
  // Symbol comes from ch: "market.btcusdt.ticket" → "BTCUSDT".
  final parts = ch.split('.');
  if (parts.length < 3) return;
  final symbol = parts[1].toUpperCase();

  // Decimal keeps the original precision and avoids floating point error.
  Decimal field(String key) =>
      Decimal.tryParse('${tick[key]}') ?? Decimal.zero;

  final close = field('c');
  final open = field('o');
  final high = field('h');
  final low = field('l');
  final volume = field('v');
  final turnover = field('q');

  // 24h change = (last price - open price) / open price * 100.
  var change24h = 0.0;
  if (open > Decimal.zero) {
    final ratio =
        ((close - open) / open).toDecimal(scaleOnInfinitePrecision: 6);
    change24h = ratio.toDouble() * 100;
  }

  if (_tickerCtrl.isClosed) return;
  _tickerCtrl.add({
    'symbol': symbol,
    'price': close.toDouble(), // double is enough for UI display
    'priceStr': '${tick['c']}', // raw WS price string, keeps precision
    'high24h': high.toDouble(), // 24h high
    'low24h': low.toDouble(), // 24h low
    'volume24h': turnover.toDouble(), // tick.q turnover (USDT)
    'volumeCoin': volume.toDouble(), // tick.v volume (contracts / coins)
    'change24h': change24h,
  });
}
/// Parses a mark price / funding rate push: market.{symbol}.mark.
///
/// tick.p = mark price, tick.r = funding rate, tick.T = next settlement
/// time (seconds or milliseconds).
void _dispatchMark(String ch, Map<String, dynamic> tick) {
  final parts = ch.split('.');
  if (parts.length < 3) return;
  final symbol = parts[1].toUpperCase();

  final markPrice = double.tryParse('${tick['p']}') ?? 0;
  final fundingRate = double.tryParse('${tick['r']}') ?? 0;

  // T may be an int, a double or a String; normalise it to int.
  int asInt(Object? value) {
    if (value is int) return value;
    if (value is double) return value.toInt();
    final text = '${value ?? ''}';
    return int.tryParse(text) ?? (double.tryParse(text) ?? 0.0).toInt();
  }

  final rawT = asInt(tick['T']);

  // Normalise to milliseconds: positive values below 10000000000 are
  // treated as seconds (same rule as the K-line timestamp handling).
  final looksLikeSeconds = rawT > 0 && rawT < 10000000000;
  final nextFundingTimeMs = looksLikeSeconds ? rawT * 1000 : rawT;

  if (_markCtrl.isClosed) return;
  _markCtrl.add({
    'symbol': symbol,
    'markPrice': markPrice,
    'fundingRate': fundingRate,
    'nextFundingTime': nextFundingTimeMs,
  });
}
/// Tears down the current real connection and schedules a reconnect.
///
/// Stops the heartbeat, cancels the stream subscription and closes the
/// channel's sink before dropping the reference, so a connection declared
/// dead by the heartbeat does not leave its socket open. Unless the client
/// has been disposed, it then moves to `reconnecting`, bumps the failure
/// counter, notifies [onPersistentFailure] when the counter equals
/// [_failoverNotifyCount], and arms the retry timer.
void _onDisconnect() {
  _stopHeartbeat();
  _channelSub?.cancel();
  _channelSub = null;

  final dead = _channel;
  _channel = null;
  try {
    // Best-effort close; the connection is already considered lost.
    dead?.sink.close();
  } catch (e) {
    print('[WsClient] error while closing socket: $e');
  }

  if (_connStateCtrl.isClosed) return;
  _setState(WsConnectionState.reconnecting);
  _reconnectCount++;

  // Ask the outside to try another node once the failure count reaches the
  // threshold. It fires only on the attempt equal to the threshold, which
  // avoids a notification storm; it can fire again after reconnectWithUrl
  // resets _reconnectCount.
  if (_reconnectCount == _failoverNotifyCount) {
    try {
      onPersistentFailure?.call();
    } catch (e) {
      // A failing callback must not stop the reconnect loop.
      print('[WsClient] onPersistentFailure callback failed: $e');
    }
  }

  // Exponential backoff: 2s, 4s, 8s, 16s... capped at 30s; past the attempt
  // limit the client falls back to low-frequency retries.
  int delay;
  if (_reconnectCount > _maxReconnectCount) {
    delay = _lowFreqReconnectSec;
    print(
        '[WsClient] max reconnect reached ($_reconnectCount), low-freq mode: ${delay}s');
  } else {
    delay = min(pow(2, _reconnectCount).toInt(), 30);
  }
  print('[WsClient] reconnecting in ${delay}s (attempt $_reconnectCount)');
  _reconnectTimer?.cancel();
  _reconnectTimer = Timer(Duration(seconds: delay), _connectReal);
}
// ── Subscription (single interface shared by mock and real) ──

/// Subscribes to the ticker topic, format: market.{symbol_lowercase}.ticket.
///
/// The topic and symbol are always recorded. In mock mode a seed price and
/// a starting change are created for a symbol seen for the first time; in
/// real mode the subscription is sent only while connected.
void subscribeTicker(String symbol) {
  final topic = 'market.${symbol.toLowerCase()}.ticket';
  _subscribedTopics.add(topic);
  _subscribedSymbols.add(symbol);

  if (!_mockMode) {
    if (_state == WsConnectionState.connected) _sendSub([topic]);
    return;
  }

  // Mock mode: seed the price once per symbol.
  if (_mockPrices.containsKey(symbol)) return;
  _mockPrices[symbol] = _seedPrice(symbol);
  _mockChanges[symbol] = (_random.nextDouble() - 0.3) * 8;
}
/// Subscribes to the ticker topics of several symbols at once.
///
/// All topics and symbols are recorded. In mock mode, symbols without a
/// price get seeded; in real mode a single subscription message for all
/// topics is sent while connected.
void subscribeTickerBatch(List<String> symbols) {
  final topics = [
    for (final symbol in symbols) 'market.${symbol.toLowerCase()}.ticket',
  ];
  _subscribedTopics.addAll(topics);
  _subscribedSymbols.addAll(symbols);

  if (_mockMode) {
    for (final symbol in symbols) {
      if (_mockPrices.containsKey(symbol)) continue;
      _mockPrices[symbol] = _seedPrice(symbol);
      _mockChanges[symbol] = (_random.nextDouble() - 0.3) * 8;
    }
    return;
  }

  if (_state == WsConnectionState.connected) _sendSub(topics);
}
/// Differential resubscription: unsubscribes the tickers missing from
/// [newSymbols] and subscribes the ones that were added.
void resubscribeTickerBatch(List<String> newSymbols) {
  final wanted = {
    for (final s in newSymbols) 'market.${s.toLowerCase()}.ticket',
  };
  final current = {
    for (final t in _subscribedTopics)
      if (t.endsWith('.ticket')) t,
  };
  final online = !_mockMode && _state == WsConnectionState.connected;

  // Drop the tickers that are no longer wanted.
  final toUnsub = current.difference(wanted);
  if (toUnsub.isNotEmpty) {
    for (final topic in toUnsub) {
      _subscribedTopics.remove(topic);
      final parts = topic.split('.');
      if (parts.length < 2) continue;
      final sym = parts[1].toUpperCase();
      _subscribedSymbols.remove(sym);
      _mockPrices.remove(sym);
      _mockChanges.remove(sym);
    }
    if (online) _sendUnsub(toUnsub.toList());
  }

  // Add the new tickers.
  final toSub = wanted.difference(current);
  if (toSub.isEmpty) return;
  _subscribedTopics.addAll(toSub);
  for (final s in newSymbols) {
    _subscribedSymbols.add(s);
    if (!_mockMode || _mockPrices.containsKey(s)) continue;
    _mockPrices[s] = _seedPrice(s);
    _mockChanges[s] = (_random.nextDouble() - 0.3) * 8;
  }
  if (online) _sendSub(toSub.toList());
}
/// Unsubscribes from the ticker topic of [symbol].
///
/// The topic and symbol are forgotten locally; the unsubscribe message is
/// sent only in real mode while connected.
void unsubscribeTicker(String symbol) {
  final topic = 'market.${symbol.toLowerCase()}.ticket';
  _subscribedTopics.remove(topic);
  _subscribedSymbols.remove(symbol);
  if (_mockMode || _state != WsConnectionState.connected) return;
  _sendUnsub([topic]);
}
// ── Mark price / funding rate subscription ──────────────

/// Subscribes to mark price + funding rate: market.{symbol}.mark.
///
/// The topic is recorded; the message is sent only in real mode while
/// connected.
void subscribeMark(String symbol) {
  final topic = 'market.${symbol.toLowerCase()}.mark';
  _subscribedTopics.add(topic);
  if (_mockMode || _state != WsConnectionState.connected) return;
  _sendSub([topic]);
}
/// Unsubscribes from market.{symbol}.mark.
///
/// The topic is forgotten locally; the message is sent only in real mode
/// while connected.
void unsubscribeMark(String symbol) {
  final topic = 'market.${symbol.toLowerCase()}.mark';
  _subscribedTopics.remove(topic);
  if (_mockMode || _state != WsConnectionState.connected) return;
  _sendUnsub([topic]);
}
// ── Order book subscription ─────────────────────────────

/// Subscribes to the order book: market.{symbol}.depth.
///
/// The topic and symbol are recorded; the message is sent only in real
/// mode while connected.
void subscribeDepth(String symbol) {
  final topic = 'market.${symbol.toLowerCase()}.depth';
  _subscribedTopics.add(topic);
  _subscribedSymbols.add(symbol);
  if (_mockMode || _state != WsConnectionState.connected) return;
  _sendSub([topic]);
}
/// Unsubscribes from market.{symbol}.depth.
///
/// Only the topic is forgotten locally; the message is sent only in real
/// mode while connected.
void unsubscribeDepth(String symbol) {
  final topic = 'market.${symbol.toLowerCase()}.depth';
  _subscribedTopics.remove(topic);
  if (_mockMode || _state != WsConnectionState.connected) return;
  _sendUnsub([topic]);
}
// ── Trade subscription ──────────────────────────────────

/// Subscribes to real-time trades: market.{symbol}.trade.
///
/// The topic and symbol are recorded; the message is sent only in real
/// mode while connected.
void subscribeTrade(String symbol) {
  final topic = 'market.${symbol.toLowerCase()}.trade';
  _subscribedTopics.add(topic);
  _subscribedSymbols.add(symbol);
  if (_mockMode || _state != WsConnectionState.connected) return;
  _sendSub([topic]);
}
/// Unsubscribes from market.{symbol}.trade.
///
/// Only the topic is forgotten locally; the message is sent only in real
/// mode while connected.
void unsubscribeTrade(String symbol) {
  final topic = 'market.${symbol.toLowerCase()}.trade';
  _subscribedTopics.remove(topic);
  if (_mockMode || _state != WsConnectionState.connected) return;
  _sendUnsub([topic]);
}
/// Legacy alias kept for compatibility; forwards to [subscribeTicker].
void subscribe(String symbol) {
  subscribeTicker(symbol);
}

/// Legacy alias kept for compatibility; forwards to [unsubscribeTicker].
void unsubscribe(String symbol) {
  unsubscribeTicker(symbol);
}
// ── K-line subscription ─────────────────────────────────

/// Subscribes to real-time K-line pushes:
/// market.{symbol}.kline.{interval}.
///
/// The topic is recorded; the message is sent only in real mode while
/// connected.
void subscribeKline(String symbol, String interval) {
  final topic = 'market.${symbol.toLowerCase()}.kline.$interval';
  _subscribedTopics.add(topic);
  if (_mockMode || _state != WsConnectionState.connected) return;
  _sendSub([topic]);
}
/// Cancels the K-line subscription for [symbol] and [interval].
///
/// The topic is forgotten locally; the message is sent only in real mode
/// while connected.
void unsubscribeKline(String symbol, String interval) {
  final topic = 'market.${symbol.toLowerCase()}.kline.$interval';
  _subscribedTopics.remove(topic);
  if (_mockMode || _state != WsConnectionState.connected) return;
  _sendUnsub([topic]);
}
/// Requests K-line history (production mode only; mock mode returns an
/// empty list).
///
/// Each returned item is expected to carry open/close/high/low/volume/
/// beginTime and similar fields, as delivered by the server.
///
/// Returns an empty list right away when in mock mode, when not connected,
/// when there is no channel, or when sending the request fails. In the
/// failure case the pending entry is removed so it cannot linger. It also
/// returns an empty list if no response arrives within 10 seconds.
Future<List<Map<String, dynamic>>> requestKlineHistory({
  required String symbol,
  required String interval,
  required int from,
  required int to,
}) async {
  final channel = _channel;
  if (_mockMode ||
      _state != WsConnectionState.connected ||
      channel == null) {
    return [];
  }

  _reqId++;
  final id = '$_reqId';
  final completer = Completer<List<Map<String, dynamic>>>();
  _klineReqCompleters[id] = completer;

  try {
    channel.sink.add(jsonEncode({
      'req': 'market.${symbol.toLowerCase()}.kline.$interval',
      'id': id,
      'from': from,
      'to': to,
    }));
  } catch (e) {
    // The request never left: forget it instead of leaking the completer.
    _klineReqCompleters.remove(id);
    print('[WsClient] kline history request failed to send: $e');
    return [];
  }

  // Timeout guard so the completer cannot hang forever.
  return completer.future.timeout(
    const Duration(seconds: 10),
    onTimeout: () {
      _klineReqCompleters.remove(id);
      return [];
    },
  );
}
// ── Sending WS messages ─────────────────────────────────

/// Sends a `sub` message for [topics] with a fresh request id.
///
/// The id counter advances even when there is no channel to send on.
void _sendSub(List<String> topics) {
  final id = '${++_reqId}';
  final payload = jsonEncode({'sub': topics, 'id': id});
  _channel?.sink.add(payload);
}
/// Sends an `unsub` message for [topics] with a fresh request id.
///
/// The id counter advances even when there is no channel to send on.
void _sendUnsub(List<String> topics) {
  final id = '${++_reqId}';
  final payload = jsonEncode({'unsub': topics, 'id': id});
  _channel?.sink.add(payload);
}
// ══════════════════════════════════════════════════════
// Mock mode
// ══════════════════════════════════════════════════════

/// Simulates connecting: reports `connecting`, then after 700 ms reports
/// `connected`, resets the reconnect counter and starts the mock data.
///
/// When the reconnect counter was zero at call time, a simulated
/// disconnect is scheduled 45 seconds after the connection comes up.
void _connectMock() {
  _setState(WsConnectionState.connecting);
  final firstAttempt = _reconnectCount == 0;

  void goOnline() {
    if (_connStateCtrl.isClosed) return;
    _setState(WsConnectionState.connected);
    _reconnectCount = 0;
    _startMockData();
    if (!firstAttempt) return;
    _simulateDisconnectTimer =
        Timer(const Duration(seconds: 45), _triggerMockDisconnect);
  }

  Future.delayed(const Duration(milliseconds: 700), goOnline);
}
/// Simulates a dropped connection while connected.
///
/// Stops the mock data, reports `reconnecting`, bumps the reconnect counter
/// and schedules [_connectMock] after an exponential delay capped at 30s.
void _triggerMockDisconnect() {
  if (_state != WsConnectionState.connected) return;
  _mockDataTimer?.cancel();
  _setState(WsConnectionState.reconnecting);
  _reconnectCount++;
  final backoffSec = min(pow(2, _reconnectCount).toInt(), 30);
  _reconnectTimer = Timer(Duration(seconds: backoffSec), _connectMock);
}
// ── Mock data generation ──────────────────────────────

/// (Re)starts the timer that emits mock tickers, order books and trades
/// every 1800 ms while the client is connected.
void _startMockData() {
  void emitAll(Timer _) {
    if (_state != WsConnectionState.connected) return;
    _emitTickers();
    _emitOrderBooks();
    _emitTrades();
  }

  _mockDataTimer?.cancel();
  _mockDataTimer =
      Timer.periodic(const Duration(milliseconds: 1800), emitAll);
}
/// Returns a random starting price for [symbol].
///
/// The result is `base + random * spread`, with base and spread chosen by
/// the symbol's prefix; unknown prefixes use 1.0 and 0.5.
double _seedPrice(String symbol) {
  // prefix → [base, spread]
  const seeds = <String, List<double>>{
    'BTC': [67200, 800],
    'ETH': [3480, 80],
    'BNB': [558, 15],
    'SOL': [142, 6],
    'XRP': [0.61, 0.04],
    'ADA': [0.45, 0.02],
    'DOGE': [0.12, 0.01],
    'AVAX': [38, 2],
    'LINK': [14, 0.5],
    'DOT': [7.5, 0.3],
  };
  for (final seed in seeds.entries) {
    if (symbol.startsWith(seed.key)) {
      return seed.value[0] + _random.nextDouble() * seed.value[1];
    }
  }
  return 1.0 + _random.nextDouble() * 0.5;
}
/// Advances the simulated price and 24h change of every subscribed symbol
/// by a small random step and emits one ticker event per symbol.
void _emitTickers() {
  for (final symbol in _subscribedSymbols) {
    // Random walk of the price: at most ±0.1% per step.
    final previous = _mockPrices[symbol] ?? _seedPrice(symbol);
    final step = (_random.nextDouble() - 0.5) * 0.002;
    final newPrice = previous * (1 + step);
    _mockPrices[symbol] = newPrice;

    // Random walk of the 24h change.
    final previousChange = _mockChanges[symbol] ?? 0.0;
    final drift = (_random.nextDouble() - 0.5) * 0.05;
    _mockChanges[symbol] = previousChange + drift;

    if (_tickerCtrl.isClosed) continue;
    _tickerCtrl.add({
      'symbol': symbol,
      'price': newPrice,
      'change24h': _mockChanges[symbol],
      'volume24h': 100000 + _random.nextDouble() * 900000,
    });
  }
}
/// Emits a simulated 10-level order book around the current mock price of
/// every subscribed symbol (100.0 when no price is known).
void _emitOrderBooks() {
  for (final symbol in _subscribedSymbols) {
    final mid = _mockPrices[symbol] ?? 100.0;

    // Bids step down from the mid price, asks step up.
    final bids = [
      for (var i = 0; i < 10; i++)
        {
          'price':
              mid * (1 - 0.0008 * (i + 1) - _random.nextDouble() * 0.0002),
          'quantity': 0.05 + _random.nextDouble() * 3.0,
        },
    ];
    final asks = [
      for (var i = 0; i < 10; i++)
        {
          'price':
              mid * (1 + 0.0008 * (i + 1) + _random.nextDouble() * 0.0002),
          'quantity': 0.05 + _random.nextDouble() * 3.0,
        },
    ];

    if (_orderBookCtrl.isClosed) continue;
    _orderBookCtrl.add({
      'symbol': symbol,
      'bids': bids,
      'asks': asks,
    });
  }
}
/// Emits a simulated trade for each subscribed symbol with a probability
/// of 0.6, priced close to the current mock price (100.0 when unknown).
void _emitTrades() {
  for (final symbol in _subscribedSymbols) {
    if (_random.nextDouble() >= 0.6) continue;

    final price = _mockPrices[symbol] ?? 100.0;
    final noise = (_random.nextDouble() - 0.5) * 0.001;
    if (_tradeCtrl.isClosed) continue;

    // NOTE(review): 'time' is an ISO-8601 string here, while _dispatchTrade
    // emits an int for the same key — confirm consumers accept both.
    _tradeCtrl.add({
      'symbol': symbol,
      'price': price * (1 + noise),
      'quantity': 0.001 + _random.nextDouble() * 0.5,
      'isBuyerMaker': _random.nextBool(),
      'tradeId': 'mock_${DateTime.now().millisecondsSinceEpoch}',
      'time': DateTime.now().toIso8601String(),
    });
  }
}
// ── Dispose ───────────────────────────────────────────

/// Releases everything the client holds.
///
/// Cancels all timers and the channel subscription, closes the socket sink
/// and every stream controller (including the mark price one), and
/// completes all pending K-line history requests with an empty list.
void dispose() {
  _stopHeartbeat();
  _mockDataTimer?.cancel();
  _reconnectTimer?.cancel();
  _simulateDisconnectTimer?.cancel();
  _channelSub?.cancel();
  _channel?.sink.close();
  _connStateCtrl.close();
  _tickerCtrl.close();
  _orderBookCtrl.close();
  _tradeCtrl.close();
  _klineCtrl.close();
  _markCtrl.close();
  // Resolve every K-line history request that is still waiting.
  for (final c in _klineReqCompleters.values) {
    if (!c.isCompleted) c.complete([]);
  }
  _klineReqCompleters.clear();
}
- }
|