IBM Streams 4.3.0
VWAP サンプル
出来高加重平均価格 (VWAP) は金融取引における一般的な計算法です。SPL の機能を示すために、VWAP を使用した例を示します。
namespace vwap;
/**
 * VWAP sample application.
 *
 * Reads trade and quote records from a compressed CSV file, keeps only the
 * monitored tickers, computes a volume-weighted average price (VWAP) per
 * ticker over the most recent trades, and joins every incoming quote with
 * the latest VWAP of the same ticker to produce a "bargain index" that is
 * written to a file.
 *
 * @param monitoredTickers set of ticker symbols to process; records for
 *                         any other ticker are dropped by the filters.
 */
public composite VwapMain {
  param
    expression<set<rstring>> $monitoredTickers : { "IBM", "GOOG", "MSFT" };

  type
    // Attributes that are only meaningful on trade records.
    TradeInfoT   = decimal64 price, decimal64 volume;
    // Attributes that are only meaningful on quote records.
    QuoteInfoT   = decimal64 bidprice, decimal64 askprice, decimal64 asksize;
    // Layout of one input record: trade fields, quote fields, then the
    // ticker, the textual timestamp and the record type ("Trade"/"Quote").
    TradeQuoteT  = TradeInfoT, QuoteInfoT,
                   tuple<rstring ticker, rstring dayAndTime, rstring ttype>;
    // Filtered trades / quotes carry a parsed timestamp instead of the text.
    TradeFilterT = TradeInfoT, tuple<timestamp ts, rstring ticker>;
    QuoteFilterT = QuoteInfoT, tuple<timestamp ts, rstring ticker>;
    // Per-ticker aggregation result.
    VwapT        = rstring ticker, decimal64 minprice, decimal64 maxprice,
                   decimal64 avgprice, decimal64 vwap;

  graph
    // Source: one tuple per CSV line of the input file.
    stream<TradeQuoteT> TradeQuote = FileSource() {
      param
        file        : "TradesAndQuotes.csv.gz";
        format      : csv;
        // The input is a .gz file, i.e. a gzip container, so it must be
        // decoded with the gzip setting; zlib expects a bare zlib stream.
        compression : gzip;
    }

    // Keep trade records of monitored tickers and parse their timestamp.
    stream<TradeFilterT> TradeFilter = Functor(TradeQuote) {
      param
        filter : ttype == "Trade" && (ticker in $monitoredTickers);
      output
        TradeFilter : ts = toTimestamp(Sys.YYYYMMDDhhmmss, dayAndTime);
    }

    // Keep quote records of monitored tickers and parse their timestamp.
    stream<QuoteFilterT> QuoteFilter = Functor(TradeQuote) {
      param
        filter : ttype == "Quote" && (ticker in $monitoredTickers);
      output
        QuoteFilter : ts = toTimestamp(Sys.YYYYMMDDhhmmss, dayAndTime);
    }

    // Per-ticker aggregation over a sliding window of the last 4 trades,
    // triggered on every new trade. At this stage "vwap" only holds the
    // numerator Sum(price*volume); the denominator travels in "sumvolume".
    stream<VwapT, tuple<decimal64 sumvolume>> PreVwap = Aggregate(TradeFilter) {
      window
        TradeFilter : sliding, count(4), count(1), partitioned;
      param
        partitionBy : ticker;
      output
        PreVwap : ticker    = Any(ticker),
                  vwap      = Sum(price * volume),
                  minprice  = Min(price),
                  maxprice  = Max(price),
                  avgprice  = Average(price),
                  sumvolume = Sum(volume);
    }

    // Finish the VWAP: divide the weighted sum by the total volume and drop
    // the helper attribute "sumvolume" (it is not part of VwapT).
    stream<VwapT> Vwap = Functor(PreVwap) {
      output
        Vwap : vwap = vwap / sumvolume;
    }

    // Match each quote against the most recent VWAP of the same ticker.
    stream<timestamp ts, decimal64 index>
      BargainIndex = Join(Vwap as V; QuoteFilter as Q)
    {
      window
        V : sliding, count(1), partitioned; // latest VWAP per ticker
        Q : sliding, count(0);              // empty window (one-sided join)
      param
        // The equality pair below can also be written as a nested loop
        // join: "condition : V.ticker == Q.ticker".
        equalityLHS    : V.ticker;
        equalityRHS    : Q.ticker;
        partitionByLHS : V.ticker; // windowing confs apply to each partition
      output
        // Positive only when the VWAP exceeds the scaled ask price;
        // otherwise the index is 0.
        // NOTE(review): the factor 100 presumably converts units between
        // askprice and vwap -- confirm against the input data.
        BargainIndex :
          index = vwap > askprice*100d ? asksize*exp(vwap-askprice*100d) : 0d;
    }

    // Sink: write every BargainIndex tuple to the file "out" in txt format.
    () as SinkOp = FileSink(BargainIndex)
    {
      param
        file   : "out";
        format : txt;
    }

  config
    logLevel: trace;
}