|
@@ -0,0 +1,262 @@
|
|
|
|
|
+package com.copy.trade.consumer;
|
|
|
|
|
+
|
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
|
|
+import com.copy.trade.constant.KafkaMessageType;
|
|
|
|
|
+import com.copy.trade.constant.KafkaTopic;
|
|
|
|
|
+import com.copy.trade.constant.RedisKey;
|
|
|
|
|
+import com.copy.trade.entity.dto.*;
|
|
|
|
|
+import com.copy.trade.service.IClosePositionService;
|
|
|
|
|
+import com.copy.trade.service.ICopiedTradeService;
|
|
|
|
|
+import com.copy.trade.service.IPositionService;
|
|
|
|
|
+import com.copy.trade.util.BeanCopior;
|
|
|
|
|
+
|
|
|
|
|
+import java.math.BigDecimal;
|
|
|
|
|
+import java.util.List;
|
|
|
|
|
+import java.util.Map;
|
|
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
|
|
+import java.util.concurrent.Executors;
|
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
+
|
|
|
|
|
+import org.springframework.data.redis.core.StringRedisTemplate;
|
|
|
|
|
+
|
|
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
+import org.springframework.kafka.annotation.KafkaListener;
|
|
|
|
|
+import org.springframework.kafka.core.KafkaTemplate;
|
|
|
|
|
+import org.springframework.kafka.support.Acknowledgment;
|
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
|
+
|
|
|
|
|
+@Component
|
|
|
|
|
+public class KafkaConsumer {
|
|
|
|
|
+ private final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
|
|
|
|
|
+
|
|
|
|
|
+ private final ExecutorService entrustCallbackExecutor = Executors.newFixedThreadPool(8);
|
|
|
|
|
+
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private ICopiedTradeService copiedTradeServiceImpl;
|
|
|
|
|
+
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private IPositionService positionServiceImpl;
|
|
|
|
|
+
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private IClosePositionService closePositionServiceImpl;
|
|
|
|
|
+
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private StringRedisTemplate stringRedisTemplate;
|
|
|
|
|
+
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private KafkaTemplate<String, String> kafkaTemplate;
|
|
|
|
|
+
|
|
|
|
|
+ //开仓成功消息
|
|
|
|
|
+ @KafkaListener(topics = {"swap-order-open-done"}, containerFactory = "ackContainerFactory")
|
|
|
|
|
+ public void onSwapOrderOpen(ConsumerRecord<String, String> record, Acknowledgment ack) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ ContractOrderEntrust contractOrderEntrust = JSONObject.parseObject(record.value(), ContractOrderEntrust.class);
|
|
|
|
|
+ if (contractOrderEntrust.getFollowId() != null) {
|
|
|
|
|
+ final ContractOrderEntrust finalOrder = contractOrderEntrust;
|
|
|
|
|
+ entrustCallbackExecutor.submit(() -> {
|
|
|
|
|
+ try {
|
|
|
|
|
+ copiedTradeServiceImpl.entrustCallback(finalOrder);
|
|
|
|
|
+ } catch (org.springframework.dao.DataIntegrityViolationException ex) {
|
|
|
|
|
+ // 唯一索引冲突:说明另一线程已写入,忽略
|
|
|
|
|
+ log.warn("entrustCallback 重复写入已忽略, followSystemId={}", finalOrder.getFollowSystemId());
|
|
|
|
|
+ } catch (Exception ex) {
|
|
|
|
|
+ log.error("entrustCallback 异步执行异常, memberId={}", finalOrder.getMemberId(), ex);
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ } else {
|
|
|
|
|
+ Boolean entrust = copiedTradeServiceImpl.copyTrade(contractOrderEntrust);
|
|
|
|
|
+ if (entrust) {
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("开仓成功发生异常:", e);
|
|
|
|
|
+ log.info("order:" + record.value());
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ //平仓成功消息
|
|
|
|
|
+ @KafkaListener(topics = {"swap-order-close-done"}, containerFactory = "ackContainerFactory")
|
|
|
|
|
+ public void onSwapOrderClose(ConsumerRecord<String, String> record, Acknowledgment ack) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ ContractOrderEntrust contractOrderEntrust = JSONObject.parseObject(record.value(), ContractOrderEntrust.class);
|
|
|
|
|
+ Integer entrust = closePositionServiceImpl.traderClosePositionSuccess(contractOrderEntrust);
|
|
|
|
|
+ if (entrust==0) {
|
|
|
|
|
+ stringRedisTemplate.delete("CLOSE_RETRY_COUNT:" + contractOrderEntrust.getPositionId());
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ } else if (entrust==2) {
|
|
|
|
|
+ log.error("平仓处理,开仓数据丢失,重建开仓平仓组合记录, positionId:{}", contractOrderEntrust.getPositionId());
|
|
|
|
|
+ closePositionServiceImpl.reconstructOpenAndClose(contractOrderEntrust);
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ // 通知 doge 刷新交易账户与跟单账户内存缓存
|
|
|
|
|
+ kafkaTemplate.send("trading-account-change", contractOrderEntrust.getMemberId() + "");
|
|
|
|
|
+ kafkaTemplate.send("follow-account-change", contractOrderEntrust.getMemberId() + "");
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("平仓成功发生异常, order:{}", record.value(), e);
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ //跟单开仓失败消息
|
|
|
|
|
+ @KafkaListener(topics = {"swap-follow-order-open-fail"}, containerFactory = "ackContainerFactory")
|
|
|
|
|
+ public void onSwapFollowOrderOpenFail(ConsumerRecord<String, String> record, Acknowledgment ack) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ ContractOrderEntrust contractOrderEntrust = JSONObject.parseObject(record.value(), ContractOrderEntrust.class);
|
|
|
|
|
+ if (contractOrderEntrust.getFollowId() != null) {
|
|
|
|
|
+ Boolean entrust = copiedTradeServiceImpl.entrustFollowOpenfail(contractOrderEntrust);
|
|
|
|
|
+ if (entrust) {
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("开仓失败发生异常:", e);
|
|
|
|
|
+ log.info("order:" + record.value());
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ //跟单平仓失败消息
|
|
|
|
|
+ @KafkaListener(topics = {"swap-follow-order-close-fail"}, containerFactory = "ackContainerFactory")
|
|
|
|
|
+ public void onSwapFollowOrderCloseFail(ConsumerRecord<String, String> record, Acknowledgment ack) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ ContractOrderEntrust contractOrderEntrust = JSONObject.parseObject(record.value(), ContractOrderEntrust.class);
|
|
|
|
|
+ if (contractOrderEntrust.getFollowId() != null) {
|
|
|
|
|
+ Boolean callback = closePositionServiceImpl.followClosePositionCallback(contractOrderEntrust);
|
|
|
|
|
+ if (callback) {
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("平仓失败发生异常:", e);
|
|
|
|
|
+ log.info("order:" + record.value());
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 委托/撤单消费者
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param record
|
|
|
|
|
+ * @param ack
|
|
|
|
|
+ */
|
|
|
|
|
+ @KafkaListener(topics = {KafkaTopic.COPY_TRADE}, containerFactory = "ackContainerFactory")
|
|
|
|
|
+ public void traderOrder(ConsumerRecord<String, String> record, Acknowledgment ack) {
|
|
|
|
|
+ Boolean valid = valid(KafkaTopic.COPY_TRADE, record);
|
|
|
|
|
+ if (!valid) {
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ Message msg = JSONObject.parseObject(record.value(), Message.class);
|
|
|
|
|
+ Boolean entrust = false;
|
|
|
|
|
+ //校验业务类型
|
|
|
|
|
+ switch (msg.getType()) {
|
|
|
|
|
+ case KafkaMessageType.CONTRACT_ENTRUST:
|
|
|
|
|
+ ContractOrderEntrust contractOrderEntrust = JSONObject.parseObject(msg.getValue(), ContractOrderEntrust.class);
|
|
|
|
|
+ entrust = copiedTradeServiceImpl.copyTrade(contractOrderEntrust);
|
|
|
|
|
+ break;
|
|
|
|
|
+ case KafkaMessageType.ENTRUST_CANCEL:
|
|
|
|
|
+ EntrustCancel entrustCancel = JSONObject.parseObject(msg.getValue(), EntrustCancel.class);
|
|
|
|
|
+ entrust = copiedTradeServiceImpl.entrustCancel(entrustCancel);
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+ if (entrust) {
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 委托回调
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param record
|
|
|
|
|
+ * @param ack
|
|
|
|
|
+ */
|
|
|
|
|
+ @KafkaListener(topics = {KafkaTopic.ENTRUST_CALLBACK}, containerFactory = "ackContainerFactory")
|
|
|
|
|
+ public void entrustCallback(ConsumerRecord<String, String> record, Acknowledgment ack) {
|
|
|
|
|
+ Boolean valid = valid(KafkaTopic.ENTRUST_CALLBACK, record);
|
|
|
|
|
+ if (!valid) {
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ //EntrustCallback entrustCallback = JSONObject.parseObject(record.value(), EntrustCallback.class);
|
|
|
|
|
+ //Boolean callback = copiedTradeServiceImpl.entrustCallback(entrustCallback);
|
|
|
|
|
+ //if(callback){
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ //}
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 成交持仓
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param record
|
|
|
|
|
+ * @param ack
|
|
|
|
|
+ */
|
|
|
|
|
+ @KafkaListener(topics = {KafkaTopic.FOLLOW_POSITION}, containerFactory = "ackContainerFactory")
|
|
|
|
|
+ public void entrustDeal(ConsumerRecord<String, String> record, Acknowledgment ack) {
|
|
|
|
|
+ Boolean valid = valid(KafkaTopic.FOLLOW_POSITION, record);
|
|
|
|
|
+ if (!valid) {
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ Message msg = JSONObject.parseObject(record.value(), Message.class);
|
|
|
|
|
+ Boolean deal = false;
|
|
|
|
|
+ switch (msg.getType()) {
|
|
|
|
|
+ case KafkaMessageType.ENTRUST_DEAL:
|
|
|
|
|
+ EntrustDeal entrustDeal = JSONObject.parseObject(msg.getValue(), EntrustDeal.class);
|
|
|
|
|
+ deal = positionServiceImpl.entrustDeal(entrustDeal);
|
|
|
|
|
+ break;
|
|
|
|
|
+ case KafkaMessageType.CLOSE_POSITION:
|
|
|
|
|
+ TraderClosePosition traderClosePosition = JSONObject.parseObject(msg.getValue(), TraderClosePosition.class);
|
|
|
|
|
+ deal = closePositionServiceImpl.traderClosePosition(traderClosePosition);
|
|
|
|
|
+ break;
|
|
|
|
|
+ case KafkaMessageType.CLOSE_POSITION_CANCEL:
|
|
|
|
|
+ String contractClosePositionEntrustNo = msg.getValue();
|
|
|
|
|
+ deal = closePositionServiceImpl.cancel(contractClosePositionEntrustNo);
|
|
|
|
|
+ break;
|
|
|
|
|
+ case KafkaMessageType.CLOSE_POSITION_DEAL:
|
|
|
|
|
+ ClosePositionDeal closePositionDeal = JSONObject.parseObject(msg.getValue(), ClosePositionDeal.class);
|
|
|
|
|
+ deal = closePositionServiceImpl.closePositionDeal(closePositionDeal);
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (deal) {
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ @KafkaListener(topics = {KafkaTopic.CLOSE_POSITION_CALLBACK}, containerFactory = "ackContainerFactory")
|
|
|
|
|
+ public void followClosePositionCallback(ConsumerRecord<String, String> record, Acknowledgment ack) {
|
|
|
|
|
+ Boolean valid = valid(KafkaTopic.CLOSE_POSITION_CALLBACK, record);
|
|
|
|
|
+ if (!valid) {
|
|
|
|
|
+ ack.acknowledge();
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ EntrustCallback entrustCallback = JSONObject.parseObject(record.value(), EntrustCallback.class);
|
|
|
|
|
+ /*
|
|
|
|
|
+ * Boolean callback =
|
|
|
|
|
+ * closePositionServiceImpl.followClosePositionCallback(entrustCallback);
|
|
|
|
|
+ * if(callback){ ack.acknowledge(); return ; }
|
|
|
|
|
+ */
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private Boolean valid(String topic, ConsumerRecord<String, String> record) {
|
|
|
|
|
+ String messsage = record.value();
|
|
|
|
|
+ log.info("{}队列收到消息offset == {},key == {},value == {}", topic, record.offset(), record.key(), messsage);
|
|
|
|
|
+ if (!messsage.startsWith("{") || !messsage.endsWith("}")) {
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
|
|
+ return true;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+}
|