Sunday, August 27, 2017

apache camel with spring boot develop some draft - camel-quickfix-starter

1. create fix client communicate with fix server used camel-quickfix-starter
pom.xml:
  
  <dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-ahc-ws-starter</artifactId>
   <version>2.19.2</version>
  </dependency>
  

application.xml:
my:
  okcoin:
    wsuri: wss://real.okcoin.cn:10440/websocket/okcoinapi
    cfg: okcoin
    sessionID: ?sessionID=FIX.4.4:f75dcc4d-13fa-4118-a908-050b530f6913->OKSERVER
    userName: f75dcc4d-13fa-4118-a908-050b530f6913
    password: MY_OKCOIN_PASSWORD

src/main/resources/okcoin/inprocess.cfg:
[session]
BeginString=FIX.4.4
#FileStorePath=data/okclient
#FileLogPath=data/okclientlog
ConnectionType=initiator
TargetCompID=OKSERVER
StartTime=00:00:00
EndTime=00:00:00
HeartBtInt=30
ReconnectInterval=5
UseDataDictionary=Y
DataDictionary=okcoin/FIX44.xml
ResetOnLogon=Y
ResetOnLogout=Y
FileStoreMaxCachedMsgs=10
ResetOnDisconnect=Y
ResetOnError=Y
SocketUseSSL=Y
ValidateUserDefinedFields=N
SenderCompID=f75dcc4d-13fa-4118-a908-050b530f6913

# 中国站
SocketConnectHost=fix.okcoin.cn
SocketConnectPort=9880

#国际站
# SocketConnectHost=api.okcoin.cn
# SocketConnectPort=9880

QFixConfig.java:
package com.hx98.server.config;

import java.io.Serializable;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component  
@ConfigurationProperties(prefix="my") 
public class QFixConfig {
 
 public static final class Service implements Serializable{
  private static final long serialVersionUID = 1L;

  private Integer minStartRqtId = 111111111;
  private Integer maxStartRqtId = 999999999;
  
  private String wsuri;
  private String cfg;
  private String sessionID;
  private String userName;
  private String password;
  
  public String getCfg() {
   return cfg;
  }
  public void setCfg(String cfg) {
   this.cfg = cfg;
  }
  public String getUserName() {
   return userName;
  }
  public void setUserName(String userName) {
   this.userName = userName;
  }
  public String getPassword() {
   return password;
  }
  public void setPassword(String password) {
   this.password = password;
  }
  public Integer getMinStartRqtId() {
   return minStartRqtId;
  }
  public void setMinStartRqtId(Integer minStartRqtId) {
   this.minStartRqtId = minStartRqtId;
  }
  public Integer getMaxStartRqtId() {
   return maxStartRqtId;
  }
  public void setMaxStartRqtId(Integer maxStartRqtId) {
   this.maxStartRqtId = maxStartRqtId;
  }
  public String getSessionID() {
   return sessionID;
  }
  public void setSessionID(String sessionID) {
   this.sessionID = sessionID;
  }
  public String getWsuri() {
   return wsuri;
  }
  public void setWsuri(String wsuri) {
   this.wsuri = wsuri;
  }
 }
 
 private Service okcoin = new Service();

 public Service getOkcoin() {
  return okcoin;
 }

 public void setOkcoin(Service okcoin) {
  this.okcoin = okcoin;
 }
}

FixApi.java:
package com.hx98.server.api.okcoin;

import java.util.concurrent.CountDownLatch;

import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Handler;
import org.apache.camel.Producer;
import org.apache.camel.builder.PredicateBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.quickfixj.QuickfixjEndpoint;
import org.apache.camel.component.quickfixj.QuickfixjEventCategory;
import org.apache.log4j.Logger;

import com.hx98.server.api.FinanceFetchBaseApi;
import com.hx98.server.config.QFixConfig;
import com.hx98.server.transform.QuickfixjEventJsonTransformer;
import com.hx98.server.utils.CountDownLatchDecrementer;

import quickfix.ConfigError;
import quickfix.FieldNotFound;
import quickfix.Message;
import quickfix.RejectLogon;
import quickfix.field.MsgType;
import quickfix.field.RawData;
import quickfix.fix44.Logon;
import quickfix.fix44.MarketDataRequest;

public class FixApi extends RouteBuilder {

 private static final Logger log = Logger.getLogger(FixApi.class);

 final CountDownLatch logoutLatch = new CountDownLatch(1);

 private CamelContext camelContext;

//  @Bean
//  String myBean() {
//      return "I'm Spring bean!";
//  }
 
 @Override
 public void configure() { 
  final String tradeName = getOkcoinConfig().getCfg();

  camelContext = getContext();

  // from("timer:trigger")
  // .transform().simple("ref:myBean")
  // .to("log:out");

  try {
   from("quickfix:" + tradeName + "/inprocess.cfg")
     .filter(PredicateBuilder.and(
       header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
         .isEqualTo(QuickfixjEventCategory.AdminMessageSent),
       header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.LOGON)))
     .bean(new CredentialInjector());

   from("quickfix:" + tradeName + "/inprocess.cfg")
     .filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
       .isEqualTo(QuickfixjEventCategory.SessionLogoff))
     .bean(new CountDownLatchDecrementer("logout", logoutLatch));

   from("quickfix:" + tradeName + "/inprocess.cfg")
     .filter(PredicateBuilder.or(
       header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
         .isEqualTo(QuickfixjEventCategory.AdminMessageSent),
       header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
         .isEqualTo(QuickfixjEventCategory.AppMessageSent),
       header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
         .isEqualTo(QuickfixjEventCategory.AdminMessageReceived),
       header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
         .isEqualTo(QuickfixjEventCategory.AppMessageReceived)))
     .bean(new QuickfixjEventJsonTransformer()).to("log:routing");

   from("quickfix:" + tradeName + "/inprocess.cfg")
     .filter(PredicateBuilder.and(
       header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
         .isEqualTo(QuickfixjEventCategory.AdminMessageReceived),
       header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.LOGON)))
     .bean(new LogonAuthenticator());

   from("quickfix:" + tradeName + "/inprocess.cfg").filter(routeBuilder
     .header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon))
     .bean(new SessionLogon());

   from("quickfix:" + tradeName + "/inprocess.cfg")
     .filter(header(QuickfixjEndpoint.MESSAGE_TYPE_KEY)
       .isEqualTo(MsgType.MARKET_DATA_SNAPSHOT_FULL_REFRESH))
     .bean(new QuickfixjEventJsonTransformer())
     .to("rabbitmq:rmq-srv/finance.quote")
     //.to("websocket://0.0.0.0:8082/jfix")
     ;

  } catch (ConfigError e) {
   e.printStackTrace();
  }
 }

 private static Integer curRqtId = null;

 private String generateRequestId() {
  QFixConfig.Service okcoinConfig = getOkcoinConfig();

  if (curRqtId == null) {
   curRqtId = okcoinConfig.getMinStartRqtId();
  }
  Integer ret = curRqtId++;
  if (curRqtId > okcoinConfig.getMaxStartRqtId())
   curRqtId = okcoinConfig.getMinStartRqtId();
  return ret.toString();
 }

 public static class LogonAuthenticator {
  @Handler
  public void authenticate(Exchange exchange) throws RejectLogon, CamelExchangeException, FieldNotFound {
   log.info("LogonAuthenticator Acceptor is logon for "
     + exchange.getIn().getHeader(QuickfixjEndpoint.SESSION_ID_KEY));
   Message message = exchange.getIn().getMandatoryBody(Message.class);
   if (message.isSetField(RawData.FIELD)) {
    log.info("LogonAuthenticator body: " + message.getString(RawData.FIELD));
   }
  }
 }

 public class SessionLogon {
  @Handler
  public void logon(Exchange exchange) throws RejectLogon, CamelExchangeException, FieldNotFound {

   log.info("logon is received!");

   MarketDataRequest marketDataRequest = new MarketDataRequest(new quickfix.field.MDReqID(generateRequestId()),
     new quickfix.field.SubscriptionRequestType(
       quickfix.field.SubscriptionRequestType.SNAPSHOT_PLUS_UPDATES),
     new quickfix.field.MarketDepth(1));

   final char[] fields = new char[] { quickfix.field.MDEntryType.OPENING_PRICE,
     quickfix.field.MDEntryType.CLOSING_PRICE, quickfix.field.MDEntryType.TRADING_SESSION_HIGH_PRICE,
     quickfix.field.MDEntryType.TRADING_SESSION_LOW_PRICE,
     quickfix.field.MDEntryType.TRADING_SESSION_VWAP_PRICE, quickfix.field.MDEntryType.TRADE_VOLUME };

   MarketDataRequest.NoMDEntryTypes noMDEntryTypes = new MarketDataRequest.NoMDEntryTypes();
   for (char f : fields) {
    noMDEntryTypes.set(new quickfix.field.MDEntryType(f));
    marketDataRequest.addGroup(noMDEntryTypes);
   }

   final String[] symbols = new String[] { "BTC/CNY" }; // , "LTC/CNY"
                 // only 1

   MarketDataRequest.NoRelatedSym noRelatedSym = new MarketDataRequest.NoRelatedSym();
   for (String symbol : symbols) {
    noRelatedSym.setField(new quickfix.field.Symbol(symbol));
    marketDataRequest.addGroup(noRelatedSym);
   }

   marketDataRequest.setField(new quickfix.field.MDUpdateType(quickfix.field.MDUpdateType.FULL_REFRESH));

   // exchange.getOut().setBody(marketDataRequest);

   // above row can not worked.
   // below is create one new exchange messge and send messge to
   // server.
   send(marketDataRequest);
  }

 }

 private Endpoint requestEndpoint;
 private Producer requestProducer;

 public void send(Message message) {
  try {
   QFixConfig.Service okcoinConfig = getOkcoinConfig();

   if (requestEndpoint == null) {
    String marketUri = "quickfix:" + okcoinConfig.getCfg() + "/inprocess.cfg" + okcoinConfig.getSessionID();
    requestEndpoint = camelContext.getEndpoint(marketUri);
   }
   if (requestProducer == null) {
    requestProducer = requestEndpoint.createProducer();
   }
   Exchange requestExchange = requestEndpoint.createExchange(ExchangePattern.InOnly);
   requestExchange.getIn().setBody(message);
   requestProducer.process(requestExchange);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 public class CredentialInjector {
  @Handler
  public void inject(Exchange exchange) throws CamelExchangeException {
   QFixConfig.Service okcoinConfig = getOkcoinConfig();

   log.info("Injecting password into outgoing logon message");
   Message message = exchange.getIn().getMandatoryBody(Message.class);
   Logon logon = (Logon) message;
   String s;

   s = okcoinConfig.getUserName();
   if ((s != null) && (!s.isEmpty()))
    logon.setField(new quickfix.field.Username(s));

   s = okcoinConfig.getPassword();
   if ((s != null) && (!s.isEmpty()))
    logon.setField(new quickfix.field.Password(s));
  }
 }
}


QuickfixjEventJsonTransformer.java:
package com.hx98.server.transform;

import org.apache.camel.Exchange;
import org.apache.camel.Handler;
import org.apache.camel.component.quickfixj.QuickfixjEndpoint;

import quickfix.ConfigError;
import quickfix.DataDictionary;
import quickfix.Message;
import quickfix.Session;
import quickfix.SessionID;

public class QuickfixjEventJsonTransformer {
    private final QuickfixjMessageJsonTransformer renderer;
    
    public QuickfixjEventJsonTransformer() throws ConfigError {
        renderer = new QuickfixjMessageJsonTransformer();
    }
    
    @Handler
    public String transform(Exchange exchange) {
        SessionID sessionID = exchange.getIn().getHeader(QuickfixjEndpoint.SESSION_ID_KEY, SessionID.class);
        Session session = Session.lookupSession(sessionID);
        DataDictionary dataDictionary = session.getDataDictionary();
        
        if (dataDictionary == null) {
            throw new IllegalStateException("No Data Dictionary. Exchange must reference an existing session");
        }
        
        StringBuilder sb = new StringBuilder();
        sb.append("\"event\": {\n");
        
        org.apache.camel.Message in = exchange.getIn();
        for (String key : in.getHeaders().keySet()) {
            sb.append("  \"").append(key).append("\": ").append(in.getHeader(key)).append(",\n");                
        }
        
        sb.append(renderer.transform(in.getBody(Message.class), "  ", dataDictionary)).append("\n");
        sb.append("}\n");
        return sb.toString();
    }
}

Last:
Thanks!