• 1 reply
  • Latest Post - ‏2014-02-01T22:02:58Z by SnehalAntani
1 Post

Pinned topic Has any one written Custom BDS Stream by implementing BatchDataStream?

‏2013-04-22T23:43:06Z |

Hello Everyone

I would like to know if anyone has implemented, and is using, a custom BDS stream by implementing BatchDataStream.

I have created one, but I want to override the void initialize(Properties props) method like we do in any other BDS pattern. This is because I want to read some values from the xJCL BDS properties.

Please let me know if any of you have implemented a custom BDS. I need this because I need to read from and write to a service.

  • SnehalAntani
    1 Post

    Re: Has any one written Custom BDS Stream by implementing BatchDataStream?


    Yup... I've written plenty... it's pretty straightforward. I would encourage you to look at leveraging some of the framework services in the batch datastream framework as an accelerator. Here's an example for writing stock data to MongoDB:






    <prop name="IMPLCLASS" value="com.ssa.stocks.writers.StockWriter"/>

            <prop name="file.encoding" value="${fileEncoding}"/>

            <prop name="FILENAME" value="${outputDataStream}" />

            <prop name="dbname" value="${dbname}"/>

            <prop name="hostname" value="${hostname}"/>

            <prop name="port" value="${port}"/>

            <prop name="debug" value="${debugEnabled}"/>

            <prop name="EnablePerformanceMeasurement" value="${perfEnabled}"/>






    package com.ssa.stocks.writers;
    import java.net.UnknownHostException;
    import java.util.ArrayList;
    import java.util.Properties;

    import com.mongodb.BasicDBObject;
    import com.mongodb.DB;
    import com.mongodb.DBCollection;
    import com.mongodb.DBObject;
    import com.mongodb.MongoClient;
    import com.ssa.stocks.domobjs.Stock;
    public class BDSMongoDBWriter extends AbstractBatchDataStream implements
    AbstractBatchDataOutputStream {
    public static final String DBNAME = "dbname";
    public static final String HOSTNAME = "hostname";
    public static final String PORT = "port";
    public static final String COLLECTION = "stockCollection";
    protected Properties props;
    protected String dbname;
    protected String hostname;
    protected int port; 
    protected MongoClient mongoClient;
    protected DB mongodb;
    protected DBCollection collection;
    protected ArrayList<DBObject> list;
    protected String lastDocInserted;
    protected void validateProperties() throws IllegalArgumentException
    // Add security parameters later. 
    dbname = props.getProperty(BDSMongoDBWriter.DBNAME);
    if (dbname == null)
    throw new IllegalArgumentException("MongoBDS: required property dbname is not specified");
    hostname = props.getProperty(BDSMongoDBWriter.HOSTNAME);
    if (hostname == null)
    throw new IllegalArgumentException("MongoBDS: required property hostname is not specified");
    String tempPort = props.getProperty(BDSMongoDBWriter.PORT);
    if (tempPort == null)
    throw new IllegalArgumentException("MongoBDS: required property port is not specified");
    port = Integer.valueOf(tempPort).intValue();
    catch (Exception e)
    throw new IllegalArgumentException("MongoBDS: required property port is not a valid integer");
    protected void initialize(Properties props) {
    this.props = props;
    list = new ArrayList<DBObject>();
    public void open() throws BatchContainerDataStreamException {
    mongoClient = new MongoClient(hostname, port);
    mongodb = mongoClient.getDB(dbname);
    collection = mongodb.getCollection(BDSMongoDBWriter.COLLECTION);
    catch (UnknownHostException e) 
    throw new BatchContainerDataStreamException(e);
    protected void bulkInsert()
    // MongoDB doesn't support bulk updates :(
    list = new ArrayList<DBObject>();
    // SSA: i need to rethink the Stock domain object. i'm running into casting issues (stock, DBObject) as 
    // i switch from textfilewriter to mongodbWriter
    public void write(Object record) throws Exception {
    MongoDBObject data = (MongoDBObject) record;
    Stock stock = (Stock) record;
    BasicDBObject query = new BasicDBObject();
    query.append(Stock.SYMBOL, data.getKey());
    DBObject doc = new BasicDBObject("$set",data.getDBObject());
    collection.update(query, doc, true, false);
    BasicDBObject update = new BasicDBObject("$push", stock.getScore());
    collection.update(query,  update);
    //list.add(doc); mongodb doesn't support bulk updates.
    lastDocInserted = doc.toString();
    public String externalizeCheckpointInformation() {
    //MongoDB doesn't support bulk updates, only bulk inserts :(
    return lastDocInserted;
    public void close() throws BatchContainerDataStreamException {
    public void flush() throws Exception {
    public void intermediateCheckpoint() {
    // No Op
    public void internalizeCheckpointInformation(String arg0) {
    // No Op
    public void positionAtCurrentCheckpoint()
    throws BatchContainerDataStreamException {
    // No Op
    public void positionAtInitialCheckpoint()
    throws BatchContainerDataStreamException {
    // No Op
    public void writeHeader(Object record) throws Exception {
    // No Op