ws_client.dart 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780
  1. import 'dart:async';
  2. import 'dart:convert';
  3. import 'dart:math';
  4. import 'package:decimal/decimal.dart';
  5. import 'package:web_socket_channel/web_socket_channel.dart';
  6. import '../config/app_config.dart';
  /// Connection states published on [WsClient.connectionStream].
  enum WsConnectionState {
    /// A connection attempt is in progress.
    connecting,

    /// The connection is established.
    connected,

    /// No connection; this is the client's initial state value.
    disconnected,

    /// The connection was lost and a retry has been scheduled.
    reconnecting,
  }
  /// WebSocket message categories.
  enum WsChannel {
    ticker,
    orderBook,
    trade,
  }
  11. /// WebSocket 客户端
  12. /// Mock 模式:使用 Timer.periodic 模拟实时数据流
  13. /// 生产模式:对接真实 WebSocket 服务器
  14. class WsClient {
  /// Creates the client and starts connecting right away.
  ///
  /// [mockMode] selects the simulated feed instead of a real socket.
  /// [wsUrl] overrides the endpoint; when omitted,
  /// [AppConfig.effectiveWsUrl] is used. [onPersistentFailure] is stored
  /// and invoked by the reconnect logic.
  WsClient({
    required bool mockMode,
    String? wsUrl,
    this.onPersistentFailure,
  })  : _wsUrl = wsUrl ?? AppConfig.effectiveWsUrl,
        _mockMode = mockMode {
    _connect();
  }
  /// Whether the simulated feed is used instead of a real socket.
  final bool _mockMode;

  /// Endpoint used by the real connection.
  String _wsUrl;

  /// Random source for the mock data generators.
  final Random _random = Random();

  /// Persistent-failure callback: fired once when the consecutive
  /// reconnect-failure count reaches [_failoverNotifyCount].
  ///
  /// Per the original note, the outside world (ws_provider) bridges this to
  /// nodeProvider.reportFailure(immediate: true) to drive node switching.
  /// The failure count is reset by reconnectWithUrl, so it can fire again.
  final void Function()? onPersistentFailure;

  // ── Constants ─────────────────────────────────────────
  static const _heartbeatIntervalSec = 15;
  static const _heartbeatTimeoutSec = 30;
  static const _maxReconnectCount = 50;
  static const _lowFreqReconnectSec = 120;

  /// Consecutive-failure count at which the outside world is notified so
  /// it can try another node.
  static const _failoverNotifyCount = 2;

  // ── Stream controllers ────────────────────────────────
  final StreamController<WsConnectionState> _connStateCtrl =
      StreamController.broadcast();
  final StreamController<Map<String, dynamic>> _tickerCtrl =
      StreamController.broadcast();
  final StreamController<Map<String, dynamic>> _orderBookCtrl =
      StreamController.broadcast();
  final StreamController<Map<String, dynamic>> _tradeCtrl =
      StreamController.broadcast();
  final StreamController<Map<String, dynamic>> _klineCtrl =
      StreamController.broadcast();
  final StreamController<Map<String, dynamic>> _markCtrl =
      StreamController.broadcast();

  /// Pending K-line history queries: request id → completer.
  final Map<String, Completer<List<Map<String, dynamic>>>>
      _klineReqCompleters = {};

  // ── State ─────────────────────────────────────────────
  WsConnectionState _state = WsConnectionState.disconnected;

  /// Full topic strings that are currently subscribed.
  final Set<String> _subscribedTopics = {};

  /// Subscribed symbols (used by the mock generators).
  final Set<String> _subscribedSymbols = {};

  final Map<String, double> _mockPrices = {};
  final Map<String, double> _mockChanges = {};
  int _reconnectCount = 0;
  Timer? _mockDataTimer;
  Timer? _reconnectTimer;
  Timer? _simulateDisconnectTimer;
  Timer? _heartbeatTimer;
  DateTime? _lastMessageTime;

  // Production WebSocket.
  WebSocketChannel? _channel;
  StreamSubscription<dynamic>? _channelSub;
  int _reqId = 0;

  // ── Public streams ────────────────────────────────────
  /// Connection state changes.
  Stream<WsConnectionState> get connectionStream => _connStateCtrl.stream;

  /// Ticker updates.
  Stream<Map<String, dynamic>> get tickerStream => _tickerCtrl.stream;

  /// Order-book updates.
  Stream<Map<String, dynamic>> get orderBookStream => _orderBookCtrl.stream;

  /// Trade updates.
  Stream<Map<String, dynamic>> get tradeStream => _tradeCtrl.stream;

  /// K-line updates.
  Stream<Map<String, dynamic>> get klineStream => _klineCtrl.stream;

  /// Mark-price / funding-rate updates.
  Stream<Map<String, dynamic>> get markStream => _markCtrl.stream;

  /// The most recently recorded connection state.
  WsConnectionState get currentState => _state;
  70. // ── Connection Management ─────────────────────────────
  /// Records [s] as the current state and publishes it on the connection
  /// stream, unless that stream's controller is already closed.
  void _setState(WsConnectionState s) {
    _state = s;
    if (_connStateCtrl.isClosed) return;
    _connStateCtrl.add(s);
  }
  /// Starts either the real connection or the mock feed, depending on
  /// [_mockMode].
  void _connect() {
    if (!_mockMode) {
      _connectReal();
      return;
    }
    _connectMock();
  }
  82. // ══════════════════════════════════════════════════════
  83. // 生产模式:真实 WebSocket 连接
  84. // ══════════════════════════════════════════════════════
  /// The WebSocket address currently in use.
  String get currentWsUrl {
    return _wsUrl;
  }
  /// Reconnects using a new WebSocket [url] (called on node switch).
  ///
  /// Stops the heartbeat, tears down the current subscription and channel,
  /// cancels any pending retry, resets the failure counter and starts a
  /// fresh real connection.
  ///
  /// In mock mode, or after [dispose], only the stored URL is updated: no
  /// real socket is opened.
  void reconnectWithUrl(String url) {
    _wsUrl = url;
    // A real socket in mock mode would mix live frames into the simulated
    // streams; a socket opened after dispose would never be closed.
    if (_mockMode || _connStateCtrl.isClosed) return;

    _stopHeartbeat();
    _channelSub?.cancel();
    _channelSub = null;
    _channel?.sink.close();
    _channel = null;
    _reconnectTimer?.cancel();
    _reconnectCount = 0;
    _connectReal();
  }
  /// Opens the real WebSocket connection to [_wsUrl].
  ///
  /// Waits for the handshake before marking the client connected, then
  /// attaches the message listener, re-sends every topic in
  /// [_subscribedTopics] and starts the heartbeat. Any failure is routed to
  /// [_onDisconnect], which schedules the retry.
  ///
  /// The channel created by this call is pinned in a local. If another
  /// attempt replaced [_channel] while the handshake was pending (for
  /// example via [reconnectWithUrl]), this attempt is abandoned so it can
  /// neither listen on the newer channel nor tear it down.
  void _connectReal() async {
    // A disposed client must not open new sockets.
    if (_connStateCtrl.isClosed) return;
    _setState(WsConnectionState.connecting);

    WebSocketChannel? attempt;
    try {
      final channel = WebSocketChannel.connect(Uri.parse(_wsUrl));
      attempt = channel;
      _channel = channel;

      // Wait for the handshake so subscriptions are not sent too early.
      await channel.ready;

      if (_connStateCtrl.isClosed || !identical(_channel, channel)) {
        // Disposed or superseded while connecting: release this socket.
        channel.sink.close();
        return;
      }

      print('[WsClient] connected to $_wsUrl');
      _setState(WsConnectionState.connected);
      _reconnectCount = 0;
      _lastMessageTime = DateTime.now();

      // Register the listener first, then send subscriptions.
      _channelSub = channel.stream.listen(
        (raw) {
          _lastMessageTime = DateTime.now();
          _onMessage(raw);
        },
        onError: (Object error) {
          // Only the current channel may trigger a reconnect.
          if (identical(_channel, channel)) _onDisconnect();
        },
        onDone: () {
          if (identical(_channel, channel)) _onDisconnect();
        },
      );

      // Re-subscribe to the topics held before the reconnect.
      if (_subscribedTopics.isNotEmpty) {
        _sendSub(_subscribedTopics.toList());
      }

      _startHeartbeat();
    } catch (e) {
      // A failure of a superseded attempt must not disturb the newer one.
      if (attempt != null && !identical(_channel, attempt)) return;
      print('[WsClient] connect failed: $e');
      _onDisconnect();
    }
  }
  133. // ── 心跳机制 ────────────────────────────────────────────
  /// (Re)starts the periodic heartbeat.
  ///
  /// On every tick while connected: if more than [_heartbeatTimeoutSec]
  /// seconds passed since the last received message, a reconnect is
  /// triggered; otherwise a `ping` frame carrying the current epoch
  /// milliseconds is sent. A failing send also triggers a reconnect.
  void _startHeartbeat() {
    void onTick(Timer _) {
      if (_state != WsConnectionState.connected) return;

      final now = DateTime.now();
      final lastSeen = _lastMessageTime;
      final timedOut = lastSeen != null &&
          now.difference(lastSeen).inSeconds > _heartbeatTimeoutSec;
      if (timedOut) {
        print('[WsClient] heartbeat timeout, triggering reconnect');
        _onDisconnect();
        return;
      }

      try {
        final ping = jsonEncode({'ping': now.millisecondsSinceEpoch});
        _channel?.sink.add(ping);
      } catch (_) {
        _onDisconnect();
      }
    }

    _heartbeatTimer?.cancel();
    _heartbeatTimer = Timer.periodic(
      const Duration(seconds: _heartbeatIntervalSec),
      onTick,
    );
  }
  /// Cancels the heartbeat timer, if any, and forgets it.
  void _stopHeartbeat() {
    final timer = _heartbeatTimer;
    _heartbeatTimer = null;
    timer?.cancel();
  }
  /// Handles one frame pushed by the server and routes it, based on its
  /// `ch` / `rep` fields, to the matching dispatcher.
  ///
  /// Ignored frames: non-string payloads, invalid JSON, JSON that is not an
  /// object, `pong` replies, sub/unsub acknowledgements (`status`), and
  /// frames whose `ch` is not a string or whose `tick` is not an object.
  /// All shape checks use `is` tests so a malformed frame cannot throw out
  /// of the stream listener.
  void _onMessage(dynamic raw) {
    if (raw is! String) return;

    final Object? decoded;
    try {
      decoded = jsonDecode(raw);
    } on FormatException {
      return;
    }
    if (decoded is! Map<String, dynamic>) return;
    final msg = decoded;

    // pong reply: nothing to do, the caller already refreshed
    // _lastMessageTime for the heartbeat.
    if (msg.containsKey('pong')) return;

    // K-line history query response (carries a `rep` field).
    if (msg.containsKey('rep')) {
      _handleKlineHistoryResponse(msg);
      return;
    }

    // ACK for sub/unsub requests.
    if (msg.containsKey('status')) return;

    final ch = msg['ch'];
    final tick = msg['tick'];
    if (ch is! String || tick is! Map<String, dynamic>) return;

    // Route by the topic kind encoded in the channel name.
    if (ch.endsWith('.ticket')) {
      _dispatchTicker(ch, tick);
    } else if (ch.endsWith('.mark')) {
      _dispatchMark(ch, tick);
    } else if (ch.contains('.kline.')) {
      _dispatchKline(ch, tick);
    } else if (ch.endsWith('.depth')) {
      _dispatchDepth(ch, tick);
    } else if (ch.endsWith('.trade')) {
      _dispatchTrade(ch, tick);
    }
  }
  /// Parses a real-time K-line push and emits it on the K-line stream.
  ///
  /// `tick.k` is expected to carry t/o/c/h/l/v/q. The symbol is the second
  /// and the interval the fourth dot-separated segment of [ch]. Frames
  /// without an object-valued `k`, or with fewer than four segments, are
  /// dropped. The timestamp `t` is accepted as int, finite double or
  /// numeric string and falls back to 0; it is forwarded without unit
  /// conversion.
  void _dispatchKline(String ch, Map<String, dynamic> tick) {
    final k = tick['k'];
    if (k is! Map<String, dynamic>) return;

    final parts = ch.split('.');
    if (parts.length < 4) return;
    if (_klineCtrl.isClosed) return;

    final rawTime = k['t'];
    final int time;
    if (rawTime is int) {
      time = rawTime;
    } else if (rawTime is double && rawTime.isFinite) {
      time = rawTime.toInt();
    } else {
      time = int.tryParse('$rawTime') ?? 0;
    }

    _klineCtrl.add({
      'symbol': parts[1].toUpperCase(),
      'interval': parts[3],
      'time': time,
      'open': double.tryParse('${k['o']}') ?? 0,
      'close': double.tryParse('${k['c']}') ?? 0,
      'high': double.tryParse('${k['h']}') ?? 0,
      'low': double.tryParse('${k['l']}') ?? 0,
      'volume': double.tryParse('${k['v']}') ?? 0,
    });
  }
  /// Completes the pending K-line history request that [msg] answers.
  ///
  /// The request is looked up by `id`, compared as a string so a numeric
  /// id echoed by the server still matches. Responses with no matching
  /// request are ignored. The request completes with an empty list when
  /// `status` is `'error'` or `data` is not a list; otherwise with the
  /// object-valued entries of `data` (other entries are skipped).
  void _handleKlineHistoryResponse(Map<String, dynamic> msg) {
    final id = msg['id']?.toString();
    if (id == null) return;

    final completer = _klineReqCompleters.remove(id);
    if (completer == null || completer.isCompleted) return;

    final data = msg['data'];
    if (msg['status'] == 'error' || data is! List) {
      completer.complete([]);
      return;
    }
    // Materialise eagerly so type problems cannot surface later in callers.
    completer.complete(data.whereType<Map<String, dynamic>>().toList());
  }
  /// Parses an order-book push and emits it on the order-book stream.
  ///
  /// `tick.bids` / `tick.asks` are lists of `[price, qty]` levels (20 each
  /// according to the original note). The symbol is the second
  /// dot-separated segment of [ch]. A missing or non-list side yields an
  /// empty list; levels that are not lists of at least two items are
  /// skipped; unparsable numbers become 0.0.
  void _dispatchDepth(String ch, Map<String, dynamic> tick) {
    final parts = ch.split('.');
    if (parts.length < 3) return;
    if (_orderBookCtrl.isClosed) return;

    List<Map<String, double>> parseEntries(Object? raw) {
      if (raw is! List) return [];
      return [
        for (final level in raw)
          // Guard the length so a short level cannot raise a RangeError.
          if (level is List && level.length >= 2)
            {
              'price': double.tryParse('${level[0]}') ?? 0.0,
              'quantity': double.tryParse('${level[1]}') ?? 0.0,
            },
      ];
    }

    _orderBookCtrl.add({
      'symbol': parts[1].toUpperCase(),
      'bids': parseEntries(tick['bids']),
      'asks': parseEntries(tick['asks']),
    });
  }
  /// Parses a trade push and emits it on the trade stream.
  ///
  /// Fields read from [tick]: `p` price, `q` quantity, `m` flag (per the
  /// original note: whether the trade was an aggressive sell), `T` time and
  /// `a` trade id. The symbol is the second dot-separated segment of [ch].
  /// `T` is accepted as int, finite double or numeric string and falls
  /// back to 0.
  void _dispatchTrade(String ch, Map<String, dynamic> tick) {
    final parts = ch.split('.');
    if (parts.length < 3) return;
    if (_tradeCtrl.isClosed) return;

    final rawTime = tick['T'];
    final int time;
    if (rawTime is int) {
      time = rawTime;
    } else if (rawTime is double && rawTime.isFinite) {
      time = rawTime.toInt();
    } else {
      time = int.tryParse('$rawTime') ?? 0;
    }

    _tradeCtrl.add({
      'symbol': parts[1].toUpperCase(),
      'price': double.tryParse('${tick['p']}') ?? 0,
      'quantity': double.tryParse('${tick['q']}') ?? 0,
      'isBuyerMaker': tick['m'] == true,
      'time': time,
      'tradeId': tick['a']?.toString() ?? '',
    });
  }
  /// Parses a ticker push, reading the numeric fields through `Decimal`.
  ///
  /// Fields read from [tick]: `c` latest price, `o` open, `h` high,
  /// `l` low, `v` volume, `q` turnover. The 24h change percentage is
  /// computed with `Decimal` and converted to double for the UI.
  void _dispatchTicker(String ch, Map<String, dynamic> tick) {
    // Symbol comes from the channel: "market.btcusdt.ticket" → "BTCUSDT".
    final segments = ch.split('.');
    if (segments.length < 3) return;
    final symbol = segments[1].toUpperCase();

    // Decimal parsing keeps the original precision; bad values become zero.
    Decimal field(String key) =>
        Decimal.tryParse('${tick[key]}') ?? Decimal.zero;

    final close = field('c');
    final open = field('o');
    final high = field('h');
    final low = field('l');
    final volume = field('v');
    final turnover = field('q');

    // 24h change = (latest - open) / open * 100, or 0 when open <= 0.
    var change24h = 0.0;
    if (open > Decimal.zero) {
      final ratio =
          ((close - open) / open).toDecimal(scaleOnInfinitePrecision: 6);
      change24h = ratio.toDouble() * 100;
    }

    if (_tickerCtrl.isClosed) return;
    _tickerCtrl.add({
      'symbol': symbol,
      'price': close.toDouble(), // double is enough for display
      'priceStr': '${tick['c']}', // raw price text, precision preserved
      'high24h': high.toDouble(),
      'low24h': low.toDouble(),
      'volume24h': turnover.toDouble(), // tick.q
      'volumeCoin': volume.toDouble(), // tick.v
      'change24h': change24h,
    });
  }
  /// Parses a mark-price / funding-rate push (`market.{symbol}.mark`).
  ///
  /// Fields read from [tick]: `p` mark price, `r` funding rate, `T` next
  /// settlement time (seconds or milliseconds).
  void _dispatchMark(String ch, Map<String, dynamic> tick) {
    final segments = ch.split('.');
    if (segments.length < 3) return;
    final symbol = segments[1].toUpperCase();

    final markPrice = double.tryParse('${tick['p']}') ?? 0;
    final fundingRate = double.tryParse('${tick['r']}') ?? 0;

    // T may arrive as int, double or String; normalise to int.
    final tRaw = tick['T'];
    final int rawT;
    if (tRaw is int) {
      rawT = tRaw;
    } else if (tRaw is double) {
      rawT = tRaw.toInt();
    } else {
      final text = '${tRaw ?? ''}';
      rawT = int.tryParse(text) ?? (double.tryParse(text) ?? 0.0).toInt();
    }

    // Normalise to milliseconds: positive values below 10^10 are treated
    // as seconds.
    final looksLikeSeconds = rawT > 0 && rawT < 10000000000;
    final nextFundingTimeMs = looksLikeSeconds ? rawT * 1000 : rawT;

    if (_markCtrl.isClosed) return;
    _markCtrl.add({
      'symbol': symbol,
      'markPrice': markPrice,
      'fundingRate': fundingRate,
      'nextFundingTime': nextFundingTimeMs,
    });
  }
  /// Handles loss of the real connection and schedules the next retry.
  ///
  /// Stops the heartbeat, cancels the subscription and closes the channel
  /// before dropping it, so a half-dead socket (for example after a
  /// heartbeat timeout) is not left open. Does nothing further once the
  /// client has been disposed.
  ///
  /// When the consecutive failure count equals [_failoverNotifyCount],
  /// [onPersistentFailure] is invoked once. Retry delay is exponential
  /// (2s, 4s, 8s, 16s, capped at 30s); beyond [_maxReconnectCount] attempts
  /// it switches to a fixed [_lowFreqReconnectSec] delay.
  void _onDisconnect() {
    _stopHeartbeat();
    _channelSub?.cancel();
    _channelSub = null;

    // Close the socket before forgetting it; dropping the reference alone
    // does not close the connection.
    final stale = _channel;
    _channel = null;
    try {
      stale?.sink.close();
    } catch (e) {
      print('[WsClient] closing stale channel failed: $e');
    }

    if (_connStateCtrl.isClosed) return;
    _setState(WsConnectionState.reconnecting);
    _reconnectCount++;

    // Notify only on the attempt that hits the threshold, to avoid a
    // notification storm; the counter reset in reconnectWithUrl re-arms it.
    if (_reconnectCount == _failoverNotifyCount) {
      try {
        onPersistentFailure?.call();
      } catch (e) {
        // Best effort: a failing callback must not stop the retry loop.
        print('[WsClient] onPersistentFailure threw: $e');
      }
    }

    final int delay;
    if (_reconnectCount > _maxReconnectCount) {
      delay = _lowFreqReconnectSec;
      print(
          '[WsClient] max reconnect reached ($_reconnectCount), low-freq mode: ${delay}s');
    } else {
      delay = min(pow(2, _reconnectCount).toInt(), 30);
    }
    print('[WsClient] reconnecting in ${delay}s (attempt $_reconnectCount)');

    _reconnectTimer?.cancel();
    _reconnectTimer = Timer(Duration(seconds: delay), _connectReal);
  }
  362. // ── Subscription(统一接口,mock/real 都走这里)────────
  /// Subscribes to the ticker topic `market.{symbol_lowercase}.ticket`.
  ///
  /// The topic and symbol are always recorded. In mock mode a seed price
  /// and change are created for a new symbol; otherwise the subscription
  /// is sent right away if the client is connected.
  void subscribeTicker(String symbol) {
    final topic = 'market.${symbol.toLowerCase()}.ticket';
    _subscribedTopics.add(topic);
    _subscribedSymbols.add(symbol);

    if (!_mockMode) {
      if (_state == WsConnectionState.connected) _sendSub([topic]);
      return;
    }

    // Mock mode: seed a price the first time a symbol is seen.
    if (_mockPrices.containsKey(symbol)) return;
    _mockPrices[symbol] = _seedPrice(symbol);
    _mockChanges[symbol] = (_random.nextDouble() - 0.3) * 8;
  }
  /// Subscribes to the ticker topics of several [symbols] in one request.
  ///
  /// Every topic and symbol is recorded; in mock mode new symbols get a
  /// seed price and change. When connected in real mode a single `sub`
  /// frame with all topics is sent. An empty [symbols] list is a no-op, so
  /// no empty subscription frame goes out.
  void subscribeTickerBatch(List<String> symbols) {
    if (symbols.isEmpty) return;

    final topics = <String>[];
    for (final symbol in symbols) {
      final topic = 'market.${symbol.toLowerCase()}.ticket';
      _subscribedTopics.add(topic);
      _subscribedSymbols.add(symbol);
      topics.add(topic);
      if (_mockMode && !_mockPrices.containsKey(symbol)) {
        _mockPrices[symbol] = _seedPrice(symbol);
        _mockChanges[symbol] = (_random.nextDouble() - 0.3) * 8;
      }
    }

    if (!_mockMode && _state == WsConnectionState.connected) {
      _sendSub(topics);
    }
  }
  /// Differential ticker subscription: unsubscribes tickers that are not
  /// in [newSymbols] and subscribes the ones that are new.
  ///
  /// Only `.ticket` topics are considered. For each dropped topic the
  /// matching symbol and its mock price/change are removed; matching
  /// ignores letter case because topics are stored lower-cased while
  /// symbols are stored exactly as callers passed them. Frames are sent
  /// only in real mode while connected.
  void resubscribeTickerBatch(List<String> newSymbols) {
    final newTopics =
        newSymbols.map((s) => 'market.${s.toLowerCase()}.ticket').toSet();
    final oldTopics =
        _subscribedTopics.where((t) => t.endsWith('.ticket')).toSet();
    final isLive = !_mockMode && _state == WsConnectionState.connected;

    // Drop the topics that are no longer wanted.
    final toUnsub = oldTopics.difference(newTopics);
    if (toUnsub.isNotEmpty) {
      for (final topic in toUnsub) {
        _subscribedTopics.remove(topic);
        final parts = topic.split('.');
        if (parts.length < 2) continue;
        final key = parts[1].toLowerCase();
        bool sameSymbol(String s) => s.toLowerCase() == key;
        _subscribedSymbols.removeWhere(sameSymbol);
        _mockPrices.removeWhere((s, _) => sameSymbol(s));
        _mockChanges.removeWhere((s, _) => sameSymbol(s));
      }
      if (isLive) _sendUnsub(toUnsub.toList());
    }

    // Add the topics that are new.
    final toSub = newTopics.difference(oldTopics);
    if (toSub.isNotEmpty) {
      _subscribedTopics.addAll(toSub);
      for (final symbol in newSymbols) {
        _subscribedSymbols.add(symbol);
        if (_mockMode && !_mockPrices.containsKey(symbol)) {
          _mockPrices[symbol] = _seedPrice(symbol);
          _mockChanges[symbol] = (_random.nextDouble() - 0.3) * 8;
        }
      }
      if (isLive) _sendSub(toSub.toList());
    }
  }
  /// Unsubscribes from the ticker topic of [symbol].
  ///
  /// The topic and symbol are always forgotten; the `unsub` frame is sent
  /// only in real mode while connected.
  void unsubscribeTicker(String symbol) {
    final topic = 'market.${symbol.toLowerCase()}.ticket';
    _subscribedTopics.remove(topic);
    _subscribedSymbols.remove(symbol);

    if (_mockMode || _state != WsConnectionState.connected) return;
    _sendUnsub([topic]);
  }
  443. // ── 标记价格/资金费率订阅 ──────────────────────────────
  /// Subscribes to mark price + funding rate: `market.{symbol}.mark`.
  ///
  /// The topic is always recorded; the `sub` frame is sent only in real
  /// mode while connected.
  void subscribeMark(String symbol) {
    final topic = 'market.${symbol.toLowerCase()}.mark';
    _subscribedTopics.add(topic);

    final isLive = !_mockMode && _state == WsConnectionState.connected;
    if (isLive) _sendSub([topic]);
  }
  /// Unsubscribes from `market.{symbol}.mark`.
  ///
  /// The topic is always forgotten; the `unsub` frame is sent only in real
  /// mode while connected.
  void unsubscribeMark(String symbol) {
    final topic = 'market.${symbol.toLowerCase()}.mark';
    _subscribedTopics.remove(topic);

    if (_mockMode || _state != WsConnectionState.connected) return;
    _sendUnsub([topic]);
  }
  459. // ── 盘口订阅 ──────────────────────────────────────────
  /// Subscribes to the order book: `market.{symbol}.depth`.
  ///
  /// The topic and symbol are always recorded; the `sub` frame is sent
  /// only in real mode while connected.
  void subscribeDepth(String symbol) {
    final topic = 'market.${symbol.toLowerCase()}.depth';
    _subscribedTopics.add(topic);
    _subscribedSymbols.add(symbol);

    if (_mockMode || _state != WsConnectionState.connected) return;
    _sendSub([topic]);
  }
  /// Unsubscribes from `market.{symbol}.depth`.
  ///
  /// Only the topic is forgotten; the `unsub` frame is sent only in real
  /// mode while connected.
  void unsubscribeDepth(String symbol) {
    final topic = 'market.${symbol.toLowerCase()}.depth';
    _subscribedTopics.remove(topic);

    final isLive = !_mockMode && _state == WsConnectionState.connected;
    if (isLive) _sendUnsub([topic]);
  }
  476. // ── 成交订阅 ──────────────────────────────────────────
  /// Subscribes to real-time trades: `market.{symbol}.trade`.
  ///
  /// The topic and symbol are always recorded; the `sub` frame is sent
  /// only in real mode while connected.
  void subscribeTrade(String symbol) {
    final topic = 'market.${symbol.toLowerCase()}.trade';
    _subscribedTopics.add(topic);
    _subscribedSymbols.add(symbol);

    final isLive = !_mockMode && _state == WsConnectionState.connected;
    if (isLive) _sendSub([topic]);
  }
  /// Unsubscribes from `market.{symbol}.trade`.
  ///
  /// Only the topic is forgotten; the `unsub` frame is sent only in real
  /// mode while connected.
  void unsubscribeTrade(String symbol) {
    final topic = 'market.${symbol.toLowerCase()}.trade';
    _subscribedTopics.remove(topic);

    if (_mockMode || _state != WsConnectionState.connected) return;
    _sendUnsub([topic]);
  }
  /// Legacy-compatible alias for [subscribeTicker].
  void subscribe(String symbol) {
    subscribeTicker(symbol);
  }
  /// Legacy-compatible alias for [unsubscribeTicker].
  void unsubscribe(String symbol) {
    unsubscribeTicker(symbol);
  }
  496. // ── K线订阅 ────────────────────────────────────────────
  /// Subscribes to real-time K-line pushes:
  /// `market.{symbol}.kline.{interval}`.
  ///
  /// The topic is always recorded; the `sub` frame is sent only in real
  /// mode while connected.
  void subscribeKline(String symbol, String interval) {
    final topic = 'market.${symbol.toLowerCase()}.kline.$interval';
    _subscribedTopics.add(topic);

    if (_mockMode || _state != WsConnectionState.connected) return;
    _sendSub([topic]);
  }
  /// Cancels the K-line subscription for [symbol] and [interval].
  ///
  /// The topic is always forgotten; the `unsub` frame is sent only in real
  /// mode while connected.
  void unsubscribeKline(String symbol, String interval) {
    final topic = 'market.${symbol.toLowerCase()}.kline.$interval';
    _subscribedTopics.remove(topic);

    final isLive = !_mockMode && _state == WsConnectionState.connected;
    if (isLive) _sendUnsub([topic]);
  }
  /// Requests historical K-line data over the socket.
  ///
  /// Returns an empty list immediately in mock mode, when the client is
  /// not connected, when there is no channel, or when the request cannot
  /// be written to the socket. Otherwise sends a `req` frame with a fresh
  /// id and waits for the matching response; after 10 seconds without one
  /// it resolves to an empty list. The pending entry is always removed on
  /// these failure paths.
  ///
  /// Per the original note, each returned item carries fields such as
  /// open/close/high/low/volume/beginTime; the exact schema is defined by
  /// the server.
  Future<List<Map<String, dynamic>>> requestKlineHistory({
    required String symbol,
    required String interval,
    required int from,
    required int to,
  }) async {
    final channel = _channel;
    if (_mockMode ||
        _state != WsConnectionState.connected ||
        channel == null) {
      return [];
    }

    _reqId++;
    final id = '$_reqId';
    final completer = Completer<List<Map<String, dynamic>>>();
    _klineReqCompleters[id] = completer;

    final msg = jsonEncode({
      'req': 'market.${symbol.toLowerCase()}.kline.$interval',
      'id': id,
      'from': from,
      'to': to,
    });

    try {
      channel.sink.add(msg);
    } catch (e) {
      // The request never left; do not keep a completer nobody answers.
      _klineReqCompleters.remove(id);
      print('[WsClient] kline history request failed: $e');
      return [];
    }

    // Timeout guard so the completer cannot hang forever.
    return completer.future.timeout(
      const Duration(seconds: 10),
      onTimeout: () {
        _klineReqCompleters.remove(id);
        return [];
      },
    );
  }
  544. // ── WS 消息发送 ────────────────────────────────────────
  /// Sends a `sub` frame for [topics] with a fresh request id, if a
  /// channel exists.
  void _sendSub(List<String> topics) {
    final id = '${++_reqId}';
    final frame = jsonEncode({'sub': topics, 'id': id});
    _channel?.sink.add(frame);
  }
  /// Sends an `unsub` frame for [topics] with a fresh request id, if a
  /// channel exists.
  void _sendUnsub(List<String> topics) {
    final id = '${++_reqId}';
    final frame = jsonEncode({'unsub': topics, 'id': id});
    _channel?.sink.add(frame);
  }
  555. // ══════════════════════════════════════════════════════
  556. // Mock 模式
  557. // ══════════════════════════════════════════════════════
  /// Simulates connecting: reports `connecting`, then after 700 ms reports
  /// `connected`, resets the failure counter and starts the mock feed.
  ///
  /// When the failure counter was zero at call time, a one-off simulated
  /// disconnect is scheduled 45 seconds after the connection comes up.
  void _connectMock() {
    _setState(WsConnectionState.connecting);
    final scheduleDrop = _reconnectCount == 0;

    Future.delayed(const Duration(milliseconds: 700), () {
      if (_connStateCtrl.isClosed) return;

      _setState(WsConnectionState.connected);
      _reconnectCount = 0;
      _startMockData();

      if (!scheduleDrop) return;
      _simulateDisconnectTimer = Timer(
        const Duration(seconds: 45),
        _triggerMockDisconnect,
      );
    });
  }
  /// Simulates a dropped connection while connected: stops the mock feed,
  /// reports `reconnecting` and schedules [_connectMock] after an
  /// exponential delay capped at 30 seconds.
  void _triggerMockDisconnect() {
    if (_state != WsConnectionState.connected) return;

    _mockDataTimer?.cancel();
    _setState(WsConnectionState.reconnecting);
    _reconnectCount++;

    final backoffSec = min(pow(2, _reconnectCount).toInt(), 30);
    _reconnectTimer = Timer(Duration(seconds: backoffSec), _connectMock);
  }
  581. // ── Mock Data Generation ──────────────────────────────
  /// (Re)starts the mock feed: every 1800 ms, while connected, emits
  /// tickers, order books and trades in that order.
  void _startMockData() {
    void onTick(Timer _) {
      if (_state != WsConnectionState.connected) return;
      _emitTickers();
      _emitOrderBooks();
      _emitTrades();
    }

    _mockDataTimer?.cancel();
    _mockDataTimer = Timer.periodic(
      const Duration(milliseconds: 1800),
      onTick,
    );
  }
  /// Returns a randomised starting price for [symbol] in mock mode.
  ///
  /// The first matching symbol prefix selects a base price and a random
  /// spread; unknown symbols get a value between 1.0 and 1.5.
  double _seedPrice(String symbol) {
    // prefix → [base price, random spread], checked in this order.
    const seeds = <String, List<double>>{
      'BTC': [67200.0, 800.0],
      'ETH': [3480.0, 80.0],
      'BNB': [558.0, 15.0],
      'SOL': [142.0, 6.0],
      'XRP': [0.61, 0.04],
      'ADA': [0.45, 0.02],
      'DOGE': [0.12, 0.01],
      'AVAX': [38.0, 2.0],
      'LINK': [14.0, 0.5],
      'DOT': [7.5, 0.3],
    };

    for (final seed in seeds.entries) {
      if (symbol.startsWith(seed.key)) {
        return seed.value[0] + _random.nextDouble() * seed.value[1];
      }
    }
    return 1.0 + _random.nextDouble() * 0.5;
  }
  /// Emits one mock ticker per subscribed symbol.
  ///
  /// Each price moves by a random step within ±0.1% and each 24h change
  /// drifts by a random step within ±0.025; both are stored for the next
  /// round.
  void _emitTickers() {
    for (final symbol in _subscribedSymbols) {
      final previous = _mockPrices[symbol] ?? _seedPrice(symbol);
      final step = (_random.nextDouble() - 0.5) * 0.002;
      final nextPrice = previous * (1 + step);
      _mockPrices[symbol] = nextPrice;

      final drift = (_random.nextDouble() - 0.5) * 0.05;
      final nextChange = (_mockChanges[symbol] ?? 0.0) + drift;
      _mockChanges[symbol] = nextChange;

      if (_tickerCtrl.isClosed) continue;
      _tickerCtrl.add({
        'symbol': symbol,
        'price': nextPrice,
        'change24h': nextChange,
        'volume24h': 100000 + _random.nextDouble() * 900000,
      });
    }
  }
  /// Emits one mock order book (10 bids, 10 asks) per subscribed symbol,
  /// built around the symbol's current mock price (100.0 if none is set).
  void _emitOrderBooks() {
    for (final symbol in _subscribedSymbols) {
      final mid = _mockPrices[symbol] ?? 100.0;

      // Bids step down from the mid price, asks step up.
      final bids = <Map<String, double>>[];
      for (var i = 0; i < 10; i++) {
        final px =
            mid * (1 - 0.0008 * (i + 1) - _random.nextDouble() * 0.0002);
        final qty = 0.05 + _random.nextDouble() * 3.0;
        bids.add({'price': px, 'quantity': qty});
      }

      final asks = <Map<String, double>>[];
      for (var i = 0; i < 10; i++) {
        final px =
            mid * (1 + 0.0008 * (i + 1) + _random.nextDouble() * 0.0002);
        final qty = 0.05 + _random.nextDouble() * 3.0;
        asks.add({'price': px, 'quantity': qty});
      }

      if (_orderBookCtrl.isClosed) continue;
      _orderBookCtrl.add({
        'symbol': symbol,
        'bids': bids,
        'asks': asks,
      });
    }
  }
  /// Emits a mock trade for each subscribed symbol with a 60% chance,
  /// priced within ±0.05% of the symbol's current mock price (100.0 if
  /// none is set).
  void _emitTrades() {
    for (final symbol in _subscribedSymbols) {
      // Skip this symbol roughly 40% of the time.
      if (_random.nextDouble() >= 0.6) continue;

      final basePrice = _mockPrices[symbol] ?? 100.0;
      final jitter = (_random.nextDouble() - 0.5) * 0.001;

      if (_tradeCtrl.isClosed) continue;
      _tradeCtrl.add({
        'symbol': symbol,
        'price': basePrice * (1 + jitter),
        'quantity': 0.001 + _random.nextDouble() * 0.5,
        'isBuyerMaker': _random.nextBool(),
        'tradeId': 'mock_${DateTime.now().millisecondsSinceEpoch}',
        'time': DateTime.now().toIso8601String(),
      });
    }
  }
  663. // ── Dispose ───────────────────────────────────────────
  /// Releases every resource held by the client.
  ///
  /// Cancels all timers and the channel subscription, closes the socket
  /// and all stream controllers (including the mark-price controller), and
  /// completes any pending K-line history request with an empty list.
  void dispose() {
    _stopHeartbeat();
    _mockDataTimer?.cancel();
    _reconnectTimer?.cancel();
    _simulateDisconnectTimer?.cancel();
    _channelSub?.cancel();
    _channel?.sink.close();

    _connStateCtrl.close();
    _tickerCtrl.close();
    _orderBookCtrl.close();
    _tradeCtrl.close();
    _klineCtrl.close();
    _markCtrl.close();

    // Resolve outstanding K-line history requests so callers do not hang.
    for (final pending in _klineReqCompleters.values) {
      if (!pending.isCompleted) pending.complete([]);
    }
    _klineReqCompleters.clear();
  }
  682. }