IBM Streams 4.3.0

VWAP サンプル

出来高加重平均価格 (VWAP) は金融取引における一般的な計算法です。SPL の機能を示すために、VWAP を使用した例を示します。

namespace vwap;
/**
 * Volume-weighted average price (VWAP) sample application.
 *
 * Data flow:
 *   1. TradeQuote   - reads trade and quote records from a gzip-compressed CSV file.
 *   2. TradeFilter  - keeps "Trade" records for the monitored tickers.
 *      QuoteFilter  - keeps "Quote" records for the monitored tickers.
 *   3. PreVwap      - per-ticker sliding aggregation over the last 4 trades.
 *   4. Vwap         - divides the summed price*volume by the summed volume.
 *   5. BargainIndex - joins each quote with the latest VWAP of the same ticker
 *                     and computes a bargain index.
 *   6. SinkOp       - writes the bargain index tuples to the file "out".
 *
 * @param monitoredTickers set of ticker symbols to process;
 *        defaults to { "IBM", "GOOG", "MSFT" }.
 */
public composite VwapMain {
param
  expression<set<rstring>> $monitoredTickers : { "IBM", "GOOG", "MSFT" };

type
  // Attributes carried by trade records.
  TradeInfoT  = decimal64 price, decimal64 volume;
  // Attributes carried by quote records.
  QuoteInfoT  = decimal64 bidprice, decimal64 askprice, decimal64 asksize;
  // Raw input record: trade attributes, quote attributes and common attributes.
  TradeQuoteT = TradeInfoT, QuoteInfoT,
               tuple<rstring ticker, rstring dayAndTime, rstring ttype>;
  // Filtered records; dayAndTime is replaced by a parsed timestamp.
  TradeFilterT= TradeInfoT, tuple<timestamp ts, rstring ticker>;
  QuoteFilterT= QuoteInfoT, tuple<timestamp ts, rstring ticker>;
  // Per-ticker aggregation result.
  VwapT       = rstring ticker, decimal64 minprice, decimal64 maxprice,
               decimal64 avgprice, decimal64 vwap;

graph
  // Read the combined trade/quote feed. The input is a ".gz" file, i.e.
  // gzip-framed data, so the compression must be declared as gzip.
  stream<TradeQuoteT> TradeQuote = FileSource() {
    param file        : "TradesAndQuotes.csv.gz";
         format       : csv;
         compression  : gzip;
  }

  // Keep only trades of monitored tickers and parse dayAndTime into ts.
  // Attributes with matching names (price, volume, ticker) are forwarded as is.
  stream<TradeFilterT> TradeFilter = Functor(TradeQuote) {
    param  filter      : ttype=="Trade" && (ticker in $monitoredTickers);
    output TradeFilter : ts = toTimestamp(Sys.YYYYMMDDhhmmss,dayAndTime);
  }

  // Keep only quotes of monitored tickers and parse dayAndTime into ts.
  stream<QuoteFilterT> QuoteFilter = Functor(TradeQuote) {
    param filter : ttype=="Quote" && (ticker in $monitoredTickers);
    output QuoteFilter : ts = toTimestamp(Sys.YYYYMMDDhhmmss, dayAndTime);
  }

  // Aggregate trades per ticker: the window holds the last 4 tuples of each
  // partition (count(4)) and an output tuple is produced for every incoming
  // tuple (count(1)). At this stage vwap holds only the numerator
  // Sum(price*volume); sumvolume carries the denominator to the next operator.
  stream<VwapT, tuple<decimal64 sumvolume>> PreVwap = Aggregate(TradeFilter) {
    window TradeFilter : sliding, count(4), count(1), partitioned;
    param  partitionBy : ticker;
    output PreVwap     : ticker   = Any(ticker), vwap = Sum(price*volume),
                        minprice = Min(price),  maxprice = Max(price),
                        avgprice = Average(price),  sumvolume = Sum(volume);
  }

  // Finish the VWAP computation and drop the helper attribute sumvolume
  // (the output schema VwapT does not contain it).
  stream<VwapT> Vwap = Functor(PreVwap) {
    output Vwap : vwap = vwap / sumvolume;
  }

  // Match every quote against the most recent VWAP tuple of the same ticker.
  // The index is asksize*exp(vwap-askprice*100d) when vwap exceeds
  // askprice*100d, and 0 otherwise.
  // NOTE(review): the factor 100d presumably converts askprice to the same
  // unit as the trade price - confirm against the input data.
  stream<timestamp ts, decimal64 index>
    BargainIndex = Join(Vwap as V; QuoteFilter as Q)
  {
    window V             : sliding, count(1), partitioned;
           Q             : sliding, count(0); //empty window (one-sided join)
    param  equalityLHS   : V.ticker; // can also be written as nested loop join:
          equalityRHS    : Q.ticker; // "condition : V.ticker == Q.ticker"
          partitionByLHS : V.ticker; // windowing confs apply to each partition
    output BargainIndex  :
      index = vwap > askprice*100d ? asksize*exp(vwap-askprice*100d) : 0d;
  }

  // Write the resulting tuples to the file "out" in txt format.
  () as SinkOp = FileSink(BargainIndex)
  {
    param
      file : "out";
      format : txt;
  }

config
  // NOTE(review): newer Streams releases appear to use the "tracing" config
  // for this setting; logLevel is kept unchanged here - confirm before changing.
  logLevel: trace;
}