Apache Camel with Spring Boot: a draft FIX client built on camel-quickfix-starter

1. Create a FIX client that communicates with a FIX server using camel-quickfix-starter
pom.xml:
  
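  <dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-ahc-ws-starter</artifactId>
   <version>2.19.2</version>
  </dependency>
  <!-- required for the "quickfix:" endpoints used in FixApi.java below -->
  <dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-quickfix-starter</artifactId>
   <version>2.19.2</version>
  </dependency>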
  

application.yml:
my:
  okcoin:
    wsuri: wss://real.okcoin.cn:10440/websocket/okcoinapi
    cfg: okcoin
    sessionID: ?sessionID=FIX.4.4:f75dcc4d-13fa-4118-a908-050b530f6913->OKSERVER
    userName: f75dcc4d-13fa-4118-a908-050b530f6913
    password: MY_OKCOIN_PASSWORD

src/main/resources/okcoin/inprocess.cfg:
[session]
BeginString=FIX.4.4
#FileStorePath=data/okclient
#FileLogPath=data/okclientlog
ConnectionType=initiator
TargetCompID=OKSERVER
StartTime=00:00:00
EndTime=00:00:00
HeartBtInt=30
ReconnectInterval=5
UseDataDictionary=Y
DataDictionary=okcoin/FIX44.xml
ResetOnLogon=Y
ResetOnLogout=Y
FileStoreMaxCachedMsgs=10
ResetOnDisconnect=Y
ResetOnError=Y
SocketUseSSL=Y
ValidateUserDefinedFields=N
SenderCompID=f75dcc4d-13fa-4118-a908-050b530f6913

# China site
SocketConnectHost=fix.okcoin.cn
SocketConnectPort=9880

# International site
# SocketConnectHost=api.okcoin.cn
# SocketConnectPort=9880

QFixConfig.java:
package com.hx98.server.config;

import java.io.Serializable;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Spring Boot configuration holder bound to properties under the {@code my} prefix.
 *
 * <p>Currently it exposes a single nested group, {@code my.okcoin}, described by
 * {@link Service}.
 */
@Component
@ConfigurationProperties(prefix = "my")
public class QFixConfig {

    /** Settings bound from {@code my.okcoin.*}; starts out as an empty {@link Service}. */
    private Service okcoin = new Service();

    public Service getOkcoin() {
        return okcoin;
    }

    public void setOkcoin(Service okcoin) {
        this.okcoin = okcoin;
    }

    /**
     * Connection settings for one remote service.
     *
     * <p>Every property is a plain read/write bean property; only the two request-id
     * bounds carry default values.
     */
    public static final class Service implements Serializable {
        private static final long serialVersionUID = 1L;

        /** Lower bound for generated request ids (default 111111111). */
        private Integer minStartRqtId = 111111111;
        /** Upper bound for generated request ids (default 999999999). */
        private Integer maxStartRqtId = 999999999;

        /** WebSocket URI of the service. */
        private String wsuri;
        /** Name of the resource directory that holds the QuickFIX/J settings file. */
        private String cfg;
        /** Session selector; in the sample configuration it is a {@code ?sessionID=...} query string. */
        private String sessionID;
        /** User name for the logon. */
        private String userName;
        /** Password for the logon. */
        private String password;

        // --- request id bounds ---

        public Integer getMinStartRqtId() {
            return minStartRqtId;
        }

        public void setMinStartRqtId(Integer minStartRqtId) {
            this.minStartRqtId = minStartRqtId;
        }

        public Integer getMaxStartRqtId() {
            return maxStartRqtId;
        }

        public void setMaxStartRqtId(Integer maxStartRqtId) {
            this.maxStartRqtId = maxStartRqtId;
        }

        // --- endpoint location ---

        public String getWsuri() {
            return wsuri;
        }

        public void setWsuri(String wsuri) {
            this.wsuri = wsuri;
        }

        public String getCfg() {
            return cfg;
        }

        public void setCfg(String cfg) {
            this.cfg = cfg;
        }

        public String getSessionID() {
            return sessionID;
        }

        public void setSessionID(String sessionID) {
            this.sessionID = sessionID;
        }

        // --- credentials ---

        public String getUserName() {
            return userName;
        }

        public void setUserName(String userName) {
            this.userName = userName;
        }

        public String getPassword() {
            return password;
        }

        public void setPassword(String password) {
            this.password = password;
        }
    }
}

FixApi.java:
package com.hx98.server.api.okcoin;

import java.util.concurrent.CountDownLatch;

import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Handler;
import org.apache.camel.Producer;
import org.apache.camel.builder.PredicateBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.quickfixj.QuickfixjEndpoint;
import org.apache.camel.component.quickfixj.QuickfixjEventCategory;
import org.apache.log4j.Logger;

import com.hx98.server.api.FinanceFetchBaseApi;
import com.hx98.server.config.QFixConfig;
import com.hx98.server.transform.QuickfixjEventJsonTransformer;
import com.hx98.server.utils.CountDownLatchDecrementer;

import quickfix.ConfigError;
import quickfix.FieldNotFound;
import quickfix.Message;
import quickfix.RejectLogon;
import quickfix.field.MsgType;
import quickfix.field.RawData;
import quickfix.fix44.Logon;
import quickfix.fix44.MarketDataRequest;

/**
 * Camel route definitions for a QuickFIX/J initiator session.
 *
 * <p>All routes consume from the same {@code quickfix:<cfg>/inprocess.cfg} endpoint and
 * differ only in the event filter they apply:
 * <ul>
 *   <li>outgoing Logon messages get user name and password injected,</li>
 *   <li>session logoff counts down {@link #logoutLatch},</li>
 *   <li>sent/received admin and application messages are rendered and logged,</li>
 *   <li>incoming Logon messages are logged,</li>
 *   <li>session logon triggers a market data subscription,</li>
 *   <li>market data snapshots are rendered and forwarded to RabbitMQ.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code getOkcoinConfig()} is called throughout but is not declared in
 * this class; it has to be provided by a superclass or added here (presumably returning
 * {@code QFixConfig#getOkcoin()}) — confirm before use.
 */
public class FixApi extends RouteBuilder {

    private static final Logger log = Logger.getLogger(FixApi.class);

    /** Counted down once when a session logoff event is observed. */
    final CountDownLatch logoutLatch = new CountDownLatch(1);

    /** Captured in {@link #configure()} so that {@link #send(Message)} can resolve endpoints. */
    private CamelContext camelContext;

    /** Next request id to hand out; shared by all instances and accessed without synchronization. */
    private static Integer curRqtId = null;

    /** Lazily resolved endpoint used by {@link #send(Message)}. */
    private Endpoint requestEndpoint;
    /** Lazily created producer for {@link #requestEndpoint}. */
    private Producer requestProducer;

    /**
     * Registers all routes on the shared QuickFIX/J endpoint.
     *
     * <p>A {@link ConfigError} raised while building the routes is logged and not rethrown,
     * so routes defined before the failure stay registered.
     */
    @Override
    public void configure() {
        // Every route listens on the same endpoint; build the URI once.
        final String endpointUri = "quickfix:" + getOkcoinConfig().getCfg() + "/inprocess.cfg";

        camelContext = getContext();

        try {
            // Outgoing Logon: add credentials before the message leaves.
            from(endpointUri)
                .filter(PredicateBuilder.and(
                    header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
                        .isEqualTo(QuickfixjEventCategory.AdminMessageSent),
                    header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.LOGON)))
                .bean(new CredentialInjector());

            // Session logoff: release anyone waiting on the logout latch.
            from(endpointUri)
                .filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
                    .isEqualTo(QuickfixjEventCategory.SessionLogoff))
                .bean(new CountDownLatchDecrementer("logout", logoutLatch));

            // Any sent or received message: render and log it.
            from(endpointUri)
                .filter(PredicateBuilder.or(
                    header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
                        .isEqualTo(QuickfixjEventCategory.AdminMessageSent),
                    header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
                        .isEqualTo(QuickfixjEventCategory.AppMessageSent),
                    header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
                        .isEqualTo(QuickfixjEventCategory.AdminMessageReceived),
                    header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
                        .isEqualTo(QuickfixjEventCategory.AppMessageReceived)))
                .bean(new QuickfixjEventJsonTransformer())
                .to("log:routing");

            // Incoming Logon: log it.
            from(endpointUri)
                .filter(PredicateBuilder.and(
                    header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
                        .isEqualTo(QuickfixjEventCategory.AdminMessageReceived),
                    header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.LOGON)))
                .bean(new LogonAuthenticator());

            // Session established: subscribe to market data.
            // (The original referenced an undeclared "routeBuilder" here; the inherited
            // header(...) is what the other routes use.)
            from(endpointUri)
                .filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
                    .isEqualTo(QuickfixjEventCategory.SessionLogon))
                .bean(new SessionLogon());

            // Market data snapshots: render and publish.
            from(endpointUri)
                .filter(header(QuickfixjEndpoint.MESSAGE_TYPE_KEY)
                    .isEqualTo(MsgType.MARKET_DATA_SNAPSHOT_FULL_REFRESH))
                .bean(new QuickfixjEventJsonTransformer())
                .to("rabbitmq:rmq-srv/finance.quote")
                //.to("websocket://0.0.0.0:8082/jfix")
                ;

        } catch (ConfigError e) {
            // QuickfixjEventJsonTransformer's constructor declares ConfigError.
            log.error("Failed to configure QuickFIX/J routes for " + endpointUri, e);
        }
    }

    /**
     * Returns the next request id as a string and advances the shared counter.
     *
     * <p>The counter starts at the configured minimum and wraps back to it once it
     * exceeds the configured maximum.
     */
    private String generateRequestId() {
        QFixConfig.Service okcoinConfig = getOkcoinConfig();

        if (curRqtId == null) {
            curRqtId = okcoinConfig.getMinStartRqtId();
        }
        Integer ret = curRqtId++;
        if (curRqtId > okcoinConfig.getMaxStartRqtId()) {
            curRqtId = okcoinConfig.getMinStartRqtId();
        }
        return ret.toString();
    }

    /** Logs incoming Logon messages, including their RawData field when present. */
    public static class LogonAuthenticator {
        /**
         * @param exchange exchange whose body must be a QuickFIX/J {@link Message}
         * @throws CamelExchangeException if the body is missing or not a {@link Message}
         * @throws FieldNotFound declared by the RawData lookup
         * @throws RejectLogon declared for callers; not thrown by the visible code
         */
        @Handler
        public void authenticate(Exchange exchange) throws RejectLogon, CamelExchangeException, FieldNotFound {
            log.info("LogonAuthenticator Acceptor is logon for "
                + exchange.getIn().getHeader(QuickfixjEndpoint.SESSION_ID_KEY));
            Message message = exchange.getIn().getMandatoryBody(Message.class);
            if (message.isSetField(RawData.FIELD)) {
                log.info("LogonAuthenticator body: " + message.getString(RawData.FIELD));
            }
        }
    }

    /** Sends a market data subscription as soon as the session reports logon. */
    public class SessionLogon {
        /**
         * Builds a snapshot-plus-updates {@link MarketDataRequest} with depth 1 for
         * {@code BTC/CNY} and sends it through {@link FixApi#send(Message)}.
         *
         * @param exchange the logon event exchange (not modified)
         */
        @Handler
        public void logon(Exchange exchange) throws RejectLogon, CamelExchangeException, FieldNotFound {

            log.info("logon is received!");

            MarketDataRequest marketDataRequest = new MarketDataRequest(
                new quickfix.field.MDReqID(generateRequestId()),
                new quickfix.field.SubscriptionRequestType(
                    quickfix.field.SubscriptionRequestType.SNAPSHOT_PLUS_UPDATES),
                new quickfix.field.MarketDepth(1));

            final char[] entryTypes = new char[] {
                quickfix.field.MDEntryType.OPENING_PRICE,
                quickfix.field.MDEntryType.CLOSING_PRICE,
                quickfix.field.MDEntryType.TRADING_SESSION_HIGH_PRICE,
                quickfix.field.MDEntryType.TRADING_SESSION_LOW_PRICE,
                quickfix.field.MDEntryType.TRADING_SESSION_VWAP_PRICE,
                quickfix.field.MDEntryType.TRADE_VOLUME };

            // One repeating-group entry per requested entry type.
            MarketDataRequest.NoMDEntryTypes noMDEntryTypes = new MarketDataRequest.NoMDEntryTypes();
            for (char entryType : entryTypes) {
                noMDEntryTypes.set(new quickfix.field.MDEntryType(entryType));
                marketDataRequest.addGroup(noMDEntryTypes);
            }

            // Only one symbol is requested; "LTC/CNY" is left out (original note: "only 1").
            final String[] symbols = new String[] { "BTC/CNY" };

            MarketDataRequest.NoRelatedSym noRelatedSym = new MarketDataRequest.NoRelatedSym();
            for (String symbol : symbols) {
                noRelatedSym.setField(new quickfix.field.Symbol(symbol));
                marketDataRequest.addGroup(noRelatedSym);
            }

            marketDataRequest.setField(
                new quickfix.field.MDUpdateType(quickfix.field.MDUpdateType.FULL_REFRESH));

            // Per the original author, exchange.getOut().setBody(marketDataRequest) did not
            // work here, so the request goes out on a fresh exchange instead.
            send(marketDataRequest);
        }
    }

    /**
     * Sends a FIX message to the configured session on a new in-only exchange.
     *
     * <p>The endpoint and producer are created on first use and reused afterwards.
     * Failures are logged and not propagated.
     *
     * @param message the FIX message to send
     */
    public void send(Message message) {
        try {
            QFixConfig.Service okcoinConfig = getOkcoinConfig();

            if (requestEndpoint == null) {
                String marketUri = "quickfix:" + okcoinConfig.getCfg() + "/inprocess.cfg"
                    + okcoinConfig.getSessionID();
                requestEndpoint = camelContext.getEndpoint(marketUri);
            }
            if (requestProducer == null) {
                requestProducer = requestEndpoint.createProducer();
            }
            Exchange requestExchange = requestEndpoint.createExchange(ExchangePattern.InOnly);
            requestExchange.getIn().setBody(message);
            requestProducer.process(requestExchange);
        } catch (Exception e) {
            log.error("Failed to send FIX message", e);
        }
    }

    /** Adds the configured user name and password to outgoing Logon messages. */
    public class CredentialInjector {
        /**
         * Sets the Username and Password fields on the Logon message when the
         * corresponding configuration value is non-empty.
         *
         * @param exchange exchange whose body is the outgoing {@link Logon}
         * @throws CamelExchangeException if the body is missing or not a {@link Message}
         */
        @Handler
        public void inject(Exchange exchange) throws CamelExchangeException {
            QFixConfig.Service okcoinConfig = getOkcoinConfig();

            log.info("Injecting password into outgoing logon message");
            Message message = exchange.getIn().getMandatoryBody(Message.class);
            Logon logon = (Logon) message;

            String userName = okcoinConfig.getUserName();
            if (userName != null && !userName.isEmpty()) {
                logon.setField(new quickfix.field.Username(userName));
            }

            String password = okcoinConfig.getPassword();
            if (password != null && !password.isEmpty()) {
                logon.setField(new quickfix.field.Password(password));
            }
        }
    }
}


QuickfixjEventJsonTransformer.java:
package com.hx98.server.transform;

import org.apache.camel.Exchange;
import org.apache.camel.Handler;
import org.apache.camel.component.quickfixj.QuickfixjEndpoint;

import quickfix.ConfigError;
import quickfix.DataDictionary;
import quickfix.Message;
import quickfix.Session;
import quickfix.SessionID;

/**
 * Renders a QuickFIX/J event exchange as a JSON-like text block: one line per
 * exchange header followed by the rendered FIX message body.
 */
public class QuickfixjEventJsonTransformer {
    private final QuickfixjMessageJsonTransformer renderer;

    /**
     * @throws ConfigError declared for the message renderer's construction
     */
    public QuickfixjEventJsonTransformer() throws ConfigError {
        renderer = new QuickfixjMessageJsonTransformer();
    }

    /**
     * Builds the textual representation of the event carried by {@code exchange}.
     *
     * @param exchange exchange carrying the session id header and a FIX message body
     * @return the rendered event text
     * @throws IllegalStateException if the exchange has no session id header, the
     *         session cannot be found, or the session has no data dictionary
     */
    @Handler
    public String transform(Exchange exchange) {
        org.apache.camel.Message in = exchange.getIn();

        SessionID sessionID = in.getHeader(QuickfixjEndpoint.SESSION_ID_KEY, SessionID.class);
        if (sessionID == null) {
            // Fail with a clear message instead of a NullPointerException further down.
            throw new IllegalStateException("No session ID header. Exchange must reference an existing session");
        }

        Session session = Session.lookupSession(sessionID);
        if (session == null) {
            throw new IllegalStateException(
                    "No session found for " + sessionID + ". Exchange must reference an existing session");
        }

        DataDictionary dataDictionary = session.getDataDictionary();
        if (dataDictionary == null) {
            throw new IllegalStateException("No Data Dictionary. Exchange must reference an existing session");
        }

        StringBuilder sb = new StringBuilder();
        sb.append("\"event\": {\n");

        // Header values are appended as-is (no quoting), matching the existing output format.
        for (String key : in.getHeaders().keySet()) {
            sb.append("  \"").append(key).append("\": ").append(in.getHeader(key)).append(",\n");
        }

        sb.append(renderer.transform(in.getBody(Message.class), "  ", dataDictionary)).append("\n");
        sb.append("}\n");
        return sb.toString();
    }
}

Last:
Thanks!

Comments

Ajay Patil said…
Hi, is this code working? Can you please let me know where I can find it?
Ajay Patil said…
It would be helpful if you added the working source code or a GitHub link. Thanks in advance.

Popular posts from this blog

#1064 - You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near '' at line 3

2011-11

php openid test log