Apache Camel with Spring Boot: development notes (draft) - camel-quickfix-starter
1. Create a FIX client that communicates with a FIX server using camel-quickfix-starter
pom.xml:
application.yml:
src/main/resources/okcoin/inprocess.cfg:
QFixConfig.java:
FixApi.java:
QuickfixjEventJsonTransformer.java:
Last:
Thanks!
pom.xml:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-ahc-ws-starter</artifactId> <version>2.19.2</version> </dependency>
application.yml:
my:
  okcoin:
    wsuri: wss://real.okcoin.cn:10440/websocket/okcoinapi
    cfg: okcoin
    sessionID: ?sessionID=FIX.4.4:f75dcc4d-13fa-4118-a908-050b530f6913->OKSERVER
    userName: f75dcc4d-13fa-4118-a908-050b530f6913
    password: MY_OKCOIN_PASSWORD
src/main/resources/okcoin/inprocess.cfg:
[session]
BeginString=FIX.4.4
#FileStorePath=data/okclient
#FileLogPath=data/okclientlog
ConnectionType=initiator
TargetCompID=OKSERVER
StartTime=00:00:00
EndTime=00:00:00
HeartBtInt=30
ReconnectInterval=5
UseDataDictionary=Y
DataDictionary=okcoin/FIX44.xml
ResetOnLogon=Y
ResetOnLogout=Y
FileStoreMaxCachedMsgs=10
ResetOnDisconnect=Y
ResetOnError=Y
SocketUseSSL=Y
ValidateUserDefinedFields=N
SenderCompID=f75dcc4d-13fa-4118-a908-050b530f6913
# China site
SocketConnectHost=fix.okcoin.cn
SocketConnectPort=9880
# International site
# SocketConnectHost=api.okcoin.cn
# SocketConnectPort=9880
QFixConfig.java:
package com.hx98.server.config;
import java.io.Serializable;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties(prefix="my")
/**
 * Configuration holder exposing a single {@link Service} settings group
 * through the {@code okcoin} property.
 */
public class QFixConfig {

    /** Settings group for the OKCoin connection; starts out with default values. */
    private Service okcoin = new Service();

    /**
     * @return the current OKCoin settings group
     */
    public Service getOkcoin() {
        return this.okcoin;
    }

    /**
     * @param okcoin the settings group to use from now on
     */
    public void setOkcoin(Service okcoin) {
        this.okcoin = okcoin;
    }

    /**
     * Mutable bean carrying the connection settings of one service:
     * the request-id range, the websocket URI, the config directory name,
     * the session-id URI suffix and the logon credentials.
     */
    public static final class Service implements Serializable {

        private static final long serialVersionUID = 1L;

        // Request-id range; both bounds have built-in defaults.
        private Integer minStartRqtId = 111111111;
        private Integer maxStartRqtId = 999999999;

        // Connection settings; all default to null until set.
        private String wsuri;
        private String cfg;
        private String sessionID;

        // Logon credentials; default to null until set.
        private String userName;
        private String password;

        /** @return lower bound of the request-id range */
        public Integer getMinStartRqtId() {
            return this.minStartRqtId;
        }

        /** @param minStartRqtId lower bound of the request-id range */
        public void setMinStartRqtId(Integer minStartRqtId) {
            this.minStartRqtId = minStartRqtId;
        }

        /** @return upper bound of the request-id range */
        public Integer getMaxStartRqtId() {
            return this.maxStartRqtId;
        }

        /** @param maxStartRqtId upper bound of the request-id range */
        public void setMaxStartRqtId(Integer maxStartRqtId) {
            this.maxStartRqtId = maxStartRqtId;
        }

        /** @return the websocket URI */
        public String getWsuri() {
            return this.wsuri;
        }

        /** @param wsuri the websocket URI */
        public void setWsuri(String wsuri) {
            this.wsuri = wsuri;
        }

        /** @return the configuration name */
        public String getCfg() {
            return this.cfg;
        }

        /** @param cfg the configuration name */
        public void setCfg(String cfg) {
            this.cfg = cfg;
        }

        /** @return the session-id string */
        public String getSessionID() {
            return this.sessionID;
        }

        /** @param sessionID the session-id string */
        public void setSessionID(String sessionID) {
            this.sessionID = sessionID;
        }

        /** @return the logon user name */
        public String getUserName() {
            return this.userName;
        }

        /** @param userName the logon user name */
        public void setUserName(String userName) {
            this.userName = userName;
        }

        /** @return the logon password */
        public String getPassword() {
            return this.password;
        }

        /** @param password the logon password */
        public void setPassword(String password) {
            this.password = password;
        }
    }
}
FixApi.java:
package com.hx98.server.api.okcoin;
import java.util.concurrent.CountDownLatch;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Handler;
import org.apache.camel.Producer;
import org.apache.camel.builder.PredicateBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.quickfixj.QuickfixjEndpoint;
import org.apache.camel.component.quickfixj.QuickfixjEventCategory;
import org.apache.log4j.Logger;
import com.hx98.server.api.FinanceFetchBaseApi;
import com.hx98.server.config.QFixConfig;
import com.hx98.server.transform.QuickfixjEventJsonTransformer;
import com.hx98.server.utils.CountDownLatchDecrementer;
import quickfix.ConfigError;
import quickfix.FieldNotFound;
import quickfix.Message;
import quickfix.RejectLogon;
import quickfix.field.MsgType;
import quickfix.field.RawData;
import quickfix.fix44.Logon;
import quickfix.fix44.MarketDataRequest;
public class FixApi extends RouteBuilder {
private static final Logger log = Logger.getLogger(FixApi.class);
final CountDownLatch logoutLatch = new CountDownLatch(1);
private CamelContext camelContext;
// @Bean
// String myBean() {
// return "I'm Spring bean!";
// }
@Override
public void configure() {
final String tradeName = getOkcoinConfig().getCfg();
camelContext = getContext();
// from("timer:trigger")
// .transform().simple("ref:myBean")
// .to("log:out");
try {
from("quickfix:" + tradeName + "/inprocess.cfg")
.filter(PredicateBuilder.and(
header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
.isEqualTo(QuickfixjEventCategory.AdminMessageSent),
header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.LOGON)))
.bean(new CredentialInjector());
from("quickfix:" + tradeName + "/inprocess.cfg")
.filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
.isEqualTo(QuickfixjEventCategory.SessionLogoff))
.bean(new CountDownLatchDecrementer("logout", logoutLatch));
from("quickfix:" + tradeName + "/inprocess.cfg")
.filter(PredicateBuilder.or(
header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
.isEqualTo(QuickfixjEventCategory.AdminMessageSent),
header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
.isEqualTo(QuickfixjEventCategory.AppMessageSent),
header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
.isEqualTo(QuickfixjEventCategory.AdminMessageReceived),
header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
.isEqualTo(QuickfixjEventCategory.AppMessageReceived)))
.bean(new QuickfixjEventJsonTransformer()).to("log:routing");
from("quickfix:" + tradeName + "/inprocess.cfg")
.filter(PredicateBuilder.and(
header(QuickfixjEndpoint.EVENT_CATEGORY_KEY)
.isEqualTo(QuickfixjEventCategory.AdminMessageReceived),
header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.LOGON)))
.bean(new LogonAuthenticator());
from("quickfix:" + tradeName + "/inprocess.cfg").filter(routeBuilder
.header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon))
.bean(new SessionLogon());
from("quickfix:" + tradeName + "/inprocess.cfg")
.filter(header(QuickfixjEndpoint.MESSAGE_TYPE_KEY)
.isEqualTo(MsgType.MARKET_DATA_SNAPSHOT_FULL_REFRESH))
.bean(new QuickfixjEventJsonTransformer())
.to("rabbitmq:rmq-srv/finance.quote")
//.to("websocket://0.0.0.0:8082/jfix")
;
} catch (ConfigError e) {
e.printStackTrace();
}
}
private static Integer curRqtId = null;
private String generateRequestId() {
QFixConfig.Service okcoinConfig = getOkcoinConfig();
if (curRqtId == null) {
curRqtId = okcoinConfig.getMinStartRqtId();
}
Integer ret = curRqtId++;
if (curRqtId > okcoinConfig.getMaxStartRqtId())
curRqtId = okcoinConfig.getMinStartRqtId();
return ret.toString();
}
public static class LogonAuthenticator {
@Handler
public void authenticate(Exchange exchange) throws RejectLogon, CamelExchangeException, FieldNotFound {
log.info("LogonAuthenticator Acceptor is logon for "
+ exchange.getIn().getHeader(QuickfixjEndpoint.SESSION_ID_KEY));
Message message = exchange.getIn().getMandatoryBody(Message.class);
if (message.isSetField(RawData.FIELD)) {
log.info("LogonAuthenticator body: " + message.getString(RawData.FIELD));
}
}
}
public class SessionLogon {
@Handler
public void logon(Exchange exchange) throws RejectLogon, CamelExchangeException, FieldNotFound {
log.info("logon is received!");
MarketDataRequest marketDataRequest = new MarketDataRequest(new quickfix.field.MDReqID(generateRequestId()),
new quickfix.field.SubscriptionRequestType(
quickfix.field.SubscriptionRequestType.SNAPSHOT_PLUS_UPDATES),
new quickfix.field.MarketDepth(1));
final char[] fields = new char[] { quickfix.field.MDEntryType.OPENING_PRICE,
quickfix.field.MDEntryType.CLOSING_PRICE, quickfix.field.MDEntryType.TRADING_SESSION_HIGH_PRICE,
quickfix.field.MDEntryType.TRADING_SESSION_LOW_PRICE,
quickfix.field.MDEntryType.TRADING_SESSION_VWAP_PRICE, quickfix.field.MDEntryType.TRADE_VOLUME };
MarketDataRequest.NoMDEntryTypes noMDEntryTypes = new MarketDataRequest.NoMDEntryTypes();
for (char f : fields) {
noMDEntryTypes.set(new quickfix.field.MDEntryType(f));
marketDataRequest.addGroup(noMDEntryTypes);
}
final String[] symbols = new String[] { "BTC/CNY" }; // , "LTC/CNY"
// only 1
MarketDataRequest.NoRelatedSym noRelatedSym = new MarketDataRequest.NoRelatedSym();
for (String symbol : symbols) {
noRelatedSym.setField(new quickfix.field.Symbol(symbol));
marketDataRequest.addGroup(noRelatedSym);
}
marketDataRequest.setField(new quickfix.field.MDUpdateType(quickfix.field.MDUpdateType.FULL_REFRESH));
// exchange.getOut().setBody(marketDataRequest);
// above row can not worked.
// below is create one new exchange messge and send messge to
// server.
send(marketDataRequest);
}
}
private Endpoint requestEndpoint;
private Producer requestProducer;
public void send(Message message) {
try {
QFixConfig.Service okcoinConfig = getOkcoinConfig();
if (requestEndpoint == null) {
String marketUri = "quickfix:" + okcoinConfig.getCfg() + "/inprocess.cfg" + okcoinConfig.getSessionID();
requestEndpoint = camelContext.getEndpoint(marketUri);
}
if (requestProducer == null) {
requestProducer = requestEndpoint.createProducer();
}
Exchange requestExchange = requestEndpoint.createExchange(ExchangePattern.InOnly);
requestExchange.getIn().setBody(message);
requestProducer.process(requestExchange);
} catch (Exception e) {
e.printStackTrace();
}
}
public class CredentialInjector {
@Handler
public void inject(Exchange exchange) throws CamelExchangeException {
QFixConfig.Service okcoinConfig = getOkcoinConfig();
log.info("Injecting password into outgoing logon message");
Message message = exchange.getIn().getMandatoryBody(Message.class);
Logon logon = (Logon) message;
String s;
s = okcoinConfig.getUserName();
if ((s != null) && (!s.isEmpty()))
logon.setField(new quickfix.field.Username(s));
s = okcoinConfig.getPassword();
if ((s != null) && (!s.isEmpty()))
logon.setField(new quickfix.field.Password(s));
}
}
}
QuickfixjEventJsonTransformer.java:
package com.hx98.server.transform;
import org.apache.camel.Exchange;
import org.apache.camel.Handler;
import org.apache.camel.component.quickfixj.QuickfixjEndpoint;
import quickfix.ConfigError;
import quickfix.DataDictionary;
import quickfix.Message;
import quickfix.Session;
import quickfix.SessionID;
/**
 * Renders a QuickFIX/J event exchange (its headers plus its FIX message body)
 * as a JSON-like text fragment.
 */
public class QuickfixjEventJsonTransformer {
    /** Renders the FIX message body; created once per transformer. */
    private final QuickfixjMessageJsonTransformer renderer;

    /**
     * @throws ConfigError declared for the underlying message renderer's construction
     */
    public QuickfixjEventJsonTransformer() throws ConfigError {
        renderer = new QuickfixjMessageJsonTransformer();
    }

    /**
     * Builds an {@code "event": { ... }} fragment containing one line per
     * exchange header followed by the rendered message body. Header values
     * are appended in their string form without quoting or escaping.
     *
     * @param exchange exchange carrying the session-id header and the FIX message body
     * @return the rendered text
     * @throws IllegalStateException if the session-id header is missing, no
     *         session is registered for it, or the session has no data dictionary
     */
    @Handler
    public String transform(Exchange exchange) {
        org.apache.camel.Message in = exchange.getIn();
        SessionID sessionID = in.getHeader(QuickfixjEndpoint.SESSION_ID_KEY, SessionID.class);
        // Fail with a descriptive error instead of a NullPointerException when
        // the header is absent or the session cannot be found.
        if (sessionID == null) {
            throw new IllegalStateException("No session ID header. Exchange must reference an existing session");
        }
        Session session = Session.lookupSession(sessionID);
        if (session == null) {
            throw new IllegalStateException(
                    "No session found for " + sessionID + ". Exchange must reference an existing session");
        }
        DataDictionary dataDictionary = session.getDataDictionary();
        if (dataDictionary == null) {
            throw new IllegalStateException("No Data Dictionary. Exchange must reference an existing session");
        }

        StringBuilder sb = new StringBuilder();
        sb.append("\"event\": {\n");
        for (String key : in.getHeaders().keySet()) {
            sb.append(" \"").append(key).append("\": ").append(in.getHeader(key)).append(",\n");
        }
        sb.append(renderer.transform(in.getBody(Message.class), " ", dataDictionary)).append("\n");
        sb.append("}\n");
        return sb.toString();
    }
}
Last:
Thanks!
Comments