1. create fix client communicate with fix server used camel-quickfix-starter
pom.xml:
application.xml:
src/main/resources/okcoin/inprocess.cfg:
QFixConfig.java:
FixApi.java:
QuickfixjEventJsonTransformer.java:
Last:
Thanks!
pom.xml:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-ahc-ws-starter</artifactId> <version>2.19.2</version> </dependency>
application.xml:
my: okcoin: wsuri: wss://real.okcoin.cn:10440/websocket/okcoinapi cfg: okcoin sessionID: ?sessionID=FIX.4.4:f75dcc4d-13fa-4118-a908-050b530f6913->OKSERVER userName: f75dcc4d-13fa-4118-a908-050b530f6913 password: MY_OKCOIN_PASSWORD
src/main/resources/okcoin/inprocess.cfg:
[session] BeginString=FIX.4.4 #FileStorePath=data/okclient #FileLogPath=data/okclientlog ConnectionType=initiator TargetCompID=OKSERVER StartTime=00:00:00 EndTime=00:00:00 HeartBtInt=30 ReconnectInterval=5 UseDataDictionary=Y DataDictionary=okcoin/FIX44.xml ResetOnLogon=Y ResetOnLogout=Y FileStoreMaxCachedMsgs=10 ResetOnDisconnect=Y ResetOnError=Y SocketUseSSL=Y ValidateUserDefinedFields=N SenderCompID=f75dcc4d-13fa-4118-a908-050b530f6913 # 中国站 SocketConnectHost=fix.okcoin.cn SocketConnectPort=9880 #国际站 # SocketConnectHost=api.okcoin.cn # SocketConnectPort=9880
QFixConfig.java:
package com.hx98.server.config; import java.io.Serializable; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @Component @ConfigurationProperties(prefix="my") public class QFixConfig { public static final class Service implements Serializable{ private static final long serialVersionUID = 1L; private Integer minStartRqtId = 111111111; private Integer maxStartRqtId = 999999999; private String wsuri; private String cfg; private String sessionID; private String userName; private String password; public String getCfg() { return cfg; } public void setCfg(String cfg) { this.cfg = cfg; } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public Integer getMinStartRqtId() { return minStartRqtId; } public void setMinStartRqtId(Integer minStartRqtId) { this.minStartRqtId = minStartRqtId; } public Integer getMaxStartRqtId() { return maxStartRqtId; } public void setMaxStartRqtId(Integer maxStartRqtId) { this.maxStartRqtId = maxStartRqtId; } public String getSessionID() { return sessionID; } public void setSessionID(String sessionID) { this.sessionID = sessionID; } public String getWsuri() { return wsuri; } public void setWsuri(String wsuri) { this.wsuri = wsuri; } } private Service okcoin = new Service(); public Service getOkcoin() { return okcoin; } public void setOkcoin(Service okcoin) { this.okcoin = okcoin; } }
FixApi.java:
package com.hx98.server.api.okcoin; import java.util.concurrent.CountDownLatch; import org.apache.camel.CamelContext; import org.apache.camel.CamelExchangeException; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Handler; import org.apache.camel.Producer; import org.apache.camel.builder.PredicateBuilder; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.quickfixj.QuickfixjEndpoint; import org.apache.camel.component.quickfixj.QuickfixjEventCategory; import org.apache.log4j.Logger; import com.hx98.server.api.FinanceFetchBaseApi; import com.hx98.server.config.QFixConfig; import com.hx98.server.transform.QuickfixjEventJsonTransformer; import com.hx98.server.utils.CountDownLatchDecrementer; import quickfix.ConfigError; import quickfix.FieldNotFound; import quickfix.Message; import quickfix.RejectLogon; import quickfix.field.MsgType; import quickfix.field.RawData; import quickfix.fix44.Logon; import quickfix.fix44.MarketDataRequest; public class FixApi extends RouteBuilder { private static final Logger log = Logger.getLogger(FixApi.class); final CountDownLatch logoutLatch = new CountDownLatch(1); private CamelContext camelContext; // @Bean // String myBean() { // return "I'm Spring bean!"; // } @Override public void configure() { final String tradeName = getOkcoinConfig().getCfg(); camelContext = getContext(); // from("timer:trigger") // .transform().simple("ref:myBean") // .to("log:out"); try { from("quickfix:" + tradeName + "/inprocess.cfg") .filter(PredicateBuilder.and( header(QuickfixjEndpoint.EVENT_CATEGORY_KEY) .isEqualTo(QuickfixjEventCategory.AdminMessageSent), header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.LOGON))) .bean(new CredentialInjector()); from("quickfix:" + tradeName + "/inprocess.cfg") .filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY) .isEqualTo(QuickfixjEventCategory.SessionLogoff)) .bean(new CountDownLatchDecrementer("logout", logoutLatch)); from("quickfix:" + tradeName + "/inprocess.cfg") .filter(PredicateBuilder.or( header(QuickfixjEndpoint.EVENT_CATEGORY_KEY) .isEqualTo(QuickfixjEventCategory.AdminMessageSent), header(QuickfixjEndpoint.EVENT_CATEGORY_KEY) .isEqualTo(QuickfixjEventCategory.AppMessageSent), header(QuickfixjEndpoint.EVENT_CATEGORY_KEY) .isEqualTo(QuickfixjEventCategory.AdminMessageReceived), header(QuickfixjEndpoint.EVENT_CATEGORY_KEY) .isEqualTo(QuickfixjEventCategory.AppMessageReceived))) .bean(new QuickfixjEventJsonTransformer()).to("log:routing"); from("quickfix:" + tradeName + "/inprocess.cfg") .filter(PredicateBuilder.and( header(QuickfixjEndpoint.EVENT_CATEGORY_KEY) .isEqualTo(QuickfixjEventCategory.AdminMessageReceived), header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.LOGON))) .bean(new LogonAuthenticator()); from("quickfix:" + tradeName + "/inprocess.cfg").filter(routeBuilder .header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)) .bean(new SessionLogon()); from("quickfix:" + tradeName + "/inprocess.cfg") .filter(header(QuickfixjEndpoint.MESSAGE_TYPE_KEY) .isEqualTo(MsgType.MARKET_DATA_SNAPSHOT_FULL_REFRESH)) .bean(new QuickfixjEventJsonTransformer()) .to("rabbitmq:rmq-srv/finance.quote") //.to("websocket://0.0.0.0:8082/jfix") ; } catch (ConfigError e) { e.printStackTrace(); } } private static Integer curRqtId = null; private String generateRequestId() { QFixConfig.Service okcoinConfig = getOkcoinConfig(); if (curRqtId == null) { curRqtId = okcoinConfig.getMinStartRqtId(); } Integer ret = curRqtId++; if (curRqtId > okcoinConfig.getMaxStartRqtId()) curRqtId = okcoinConfig.getMinStartRqtId(); return ret.toString(); } public static class LogonAuthenticator { @Handler public void authenticate(Exchange exchange) throws RejectLogon, CamelExchangeException, FieldNotFound { log.info("LogonAuthenticator Acceptor is logon for " + exchange.getIn().getHeader(QuickfixjEndpoint.SESSION_ID_KEY)); Message message = exchange.getIn().getMandatoryBody(Message.class); if (message.isSetField(RawData.FIELD)) { log.info("LogonAuthenticator body: " + message.getString(RawData.FIELD)); } } } public class SessionLogon { @Handler public void logon(Exchange exchange) throws RejectLogon, CamelExchangeException, FieldNotFound { log.info("logon is received!"); MarketDataRequest marketDataRequest = new MarketDataRequest(new quickfix.field.MDReqID(generateRequestId()), new quickfix.field.SubscriptionRequestType( quickfix.field.SubscriptionRequestType.SNAPSHOT_PLUS_UPDATES), new quickfix.field.MarketDepth(1)); final char[] fields = new char[] { quickfix.field.MDEntryType.OPENING_PRICE, quickfix.field.MDEntryType.CLOSING_PRICE, quickfix.field.MDEntryType.TRADING_SESSION_HIGH_PRICE, quickfix.field.MDEntryType.TRADING_SESSION_LOW_PRICE, quickfix.field.MDEntryType.TRADING_SESSION_VWAP_PRICE, quickfix.field.MDEntryType.TRADE_VOLUME }; MarketDataRequest.NoMDEntryTypes noMDEntryTypes = new MarketDataRequest.NoMDEntryTypes(); for (char f : fields) { noMDEntryTypes.set(new quickfix.field.MDEntryType(f)); marketDataRequest.addGroup(noMDEntryTypes); } final String[] symbols = new String[] { "BTC/CNY" }; // , "LTC/CNY" // only 1 MarketDataRequest.NoRelatedSym noRelatedSym = new MarketDataRequest.NoRelatedSym(); for (String symbol : symbols) { noRelatedSym.setField(new quickfix.field.Symbol(symbol)); marketDataRequest.addGroup(noRelatedSym); } marketDataRequest.setField(new quickfix.field.MDUpdateType(quickfix.field.MDUpdateType.FULL_REFRESH)); // exchange.getOut().setBody(marketDataRequest); // above row can not worked. // below is create one new exchange messge and send messge to // server. send(marketDataRequest); } } private Endpoint requestEndpoint; private Producer requestProducer; public void send(Message message) { try { QFixConfig.Service okcoinConfig = getOkcoinConfig(); if (requestEndpoint == null) { String marketUri = "quickfix:" + okcoinConfig.getCfg() + "/inprocess.cfg" + okcoinConfig.getSessionID(); requestEndpoint = camelContext.getEndpoint(marketUri); } if (requestProducer == null) { requestProducer = requestEndpoint.createProducer(); } Exchange requestExchange = requestEndpoint.createExchange(ExchangePattern.InOnly); requestExchange.getIn().setBody(message); requestProducer.process(requestExchange); } catch (Exception e) { e.printStackTrace(); } } public class CredentialInjector { @Handler public void inject(Exchange exchange) throws CamelExchangeException { QFixConfig.Service okcoinConfig = getOkcoinConfig(); log.info("Injecting password into outgoing logon message"); Message message = exchange.getIn().getMandatoryBody(Message.class); Logon logon = (Logon) message; String s; s = okcoinConfig.getUserName(); if ((s != null) && (!s.isEmpty())) logon.setField(new quickfix.field.Username(s)); s = okcoinConfig.getPassword(); if ((s != null) && (!s.isEmpty())) logon.setField(new quickfix.field.Password(s)); } } }
QuickfixjEventJsonTransformer.java:
package com.hx98.server.transform; import org.apache.camel.Exchange; import org.apache.camel.Handler; import org.apache.camel.component.quickfixj.QuickfixjEndpoint; import quickfix.ConfigError; import quickfix.DataDictionary; import quickfix.Message; import quickfix.Session; import quickfix.SessionID; public class QuickfixjEventJsonTransformer { private final QuickfixjMessageJsonTransformer renderer; public QuickfixjEventJsonTransformer() throws ConfigError { renderer = new QuickfixjMessageJsonTransformer(); } @Handler public String transform(Exchange exchange) { SessionID sessionID = exchange.getIn().getHeader(QuickfixjEndpoint.SESSION_ID_KEY, SessionID.class); Session session = Session.lookupSession(sessionID); DataDictionary dataDictionary = session.getDataDictionary(); if (dataDictionary == null) { throw new IllegalStateException("No Data Dictionary. Exchange must reference an existing session"); } StringBuilder sb = new StringBuilder(); sb.append("\"event\": {\n"); org.apache.camel.Message in = exchange.getIn(); for (String key : in.getHeaders().keySet()) { sb.append(" \"").append(key).append("\": ").append(in.getHeader(key)).append(",\n"); } sb.append(renderer.transform(in.getBody(Message.class), " ", dataDictionary)).append("\n"); sb.append("}\n"); return sb.toString(); } }
Last:
Thanks!