Topic
1 reply Latest Post - ‏2014-02-01T22:02:58Z by SnehalAntani
sree666
sree666
1 Post
ACCEPTED ANSWER

Pinned topic Has any one written Custom BDS Stream by implementing BatchDataStream?

‏2013-04-22T23:43:06Z |

Hello Everyone

I would like to know if anyone has implemented and is using a custom BDS stream by implementing BatchDataStream.

I have created one, but I want to override the void initialize(Properties props) method as we do in any other BDS pattern, because I want to read some values from the xJCL BDS properties.

Please let me know if any of you have implemented a custom BDS. I need this because I need to read from and write to a service.

  • SnehalAntani
    SnehalAntani
    1 Post
    ACCEPTED ANSWER

    Re: Has any one written Custom BDS Stream by implementing BatchDataStream?

    ‏2014-02-01T22:02:58Z  in response to sree666

    Yup... I've written plenty... it's pretty straightforward. I would encourage you to leverage some of the framework services in the batch datastream framework as an accelerator. Here's an example for writing stock data to MongoDB:

     

    xJCL

    <bds>

    <logical-name>outputStream</logical-name>

    <props>

    <prop name="IMPLCLASS" value="com.ssa.stocks.writers.StockWriter"/>

            <prop name="file.encoding" value="${fileEncoding}"/>

            <prop name="FILENAME" value="${outputDataStream}" />

            <prop name="dbname" value="${dbname}"/>

            <prop name="hostname" value="${hostname}"/>

            <prop name="port" value="${port}"/>

            <prop name="debug" value="${debugEnabled}"/>

            <prop name="EnablePerformanceMeasurement" value="${perfEnabled}"/>

    </props>

    <impl-class>${supportclassOut}</impl-class>

    </bds>

     

    code

    package com.ssa.stocks.writers;
     
    import java.net.UnknownHostException;
    import java.util.ArrayList;
    import java.util.Properties;
     
    import com.ibm.websphere.batch.BatchContainerDataStreamException;
    import com.ibm.websphere.batch.devframework.datastreams.bdsadapter.AbstractBatchDataOutputStream;
    import com.ibm.websphere.batch.devframework.datastreams.bdsadapter.AbstractBatchDataStream;
    import com.mongodb.BasicDBObject;
    import com.mongodb.DB;
    import com.mongodb.DBCollection;
    import com.mongodb.DBObject;
    import com.mongodb.MongoClient;
    import com.ssa.stocks.domobjs.Stock;
     
    public class BDSMongoDBWriter extends AbstractBatchDataStream implements
    AbstractBatchDataOutputStream {
     
    public static final String DBNAME = "dbname";
    public static final String HOSTNAME = "hostname";
    public static final String PORT = "port";
    public static final String COLLECTION = "stockCollection";
     
    protected Properties props;
    protected String dbname;
    protected String hostname;
    protected int port; 
    protected MongoClient mongoClient;
    protected DB mongodb;
    protected DBCollection collection;
    protected ArrayList<DBObject> list;
    protected String lastDocInserted;
     
     
    protected void validateProperties() throws IllegalArgumentException
    {
    // Add security parameters later. 
     
    dbname = props.getProperty(BDSMongoDBWriter.DBNAME);
    if (dbname == null)
    throw new IllegalArgumentException("MongoBDS: required property dbname is not specified");
     
    hostname = props.getProperty(BDSMongoDBWriter.HOSTNAME);
    if (hostname == null)
    throw new IllegalArgumentException("MongoBDS: required property hostname is not specified");
     
    String tempPort = props.getProperty(BDSMongoDBWriter.PORT);
    if (tempPort == null)
    throw new IllegalArgumentException("MongoBDS: required property port is not specified");
     
    try
    {
    port = Integer.valueOf(tempPort).intValue();
    }
    catch (Exception e)
    {
    throw new IllegalArgumentException("MongoBDS: required property port is not a valid integer");
    }
     
    }
     
    protected void initialize(Properties props) {
    this.props = props;
    validateProperties();
    list = new ArrayList<DBObject>();
    }
     
    public void open() throws BatchContainerDataStreamException {
    try 
    {
    mongoClient = new MongoClient(hostname, port);
    mongodb = mongoClient.getDB(dbname);
    collection = mongodb.getCollection(BDSMongoDBWriter.COLLECTION);
    catch (UnknownHostException e) 
    {
    throw new BatchContainerDataStreamException(e);
    }
    }
     
    protected void bulkInsert()
    {
    collection.insert(list);
    // MongoDB doesn't support bulk updates :(
    list = new ArrayList<DBObject>();
    }
     
    // SSA: i need to rethink the Stock domain object. i'm running into casting issues (stock, DBObject) as 
    // i switch from textfilewriter to mongodbWriter
     
    public void write(Object record) throws Exception {
    MongoDBObject data = (MongoDBObject) record;
    Stock stock = (Stock) record;
     
     
    BasicDBObject query = new BasicDBObject();
    query.append(Stock.SYMBOL, data.getKey());
     
    DBObject doc = new BasicDBObject("$set",data.getDBObject());
    collection.update(query, doc, true, false);
     
    BasicDBObject update = new BasicDBObject("$push", stock.getScore());
    collection.update(query,  update);
     
     
    //list.add(doc); mongodb doesn't support bulk updates.
     
    lastDocInserted = doc.toString();
    }
     
    public String externalizeCheckpointInformation() {
    //MongoDB doesn't support bulk updates, only bulk inserts :(
    //bulkInsert(); 
    return lastDocInserted;
    }
     
    public void close() throws BatchContainerDataStreamException {
    mongoClient.close();
    }
     
     
    public void flush() throws Exception {
    bulkInsert();
    }
     
    public void intermediateCheckpoint() {
    // No Op
    }
     
    public void internalizeCheckpointInformation(String arg0) {
    // No Op
    }
     
    public void positionAtCurrentCheckpoint()
    throws BatchContainerDataStreamException {
    // No Op
    }
     
    public void positionAtInitialCheckpoint()
    throws BatchContainerDataStreamException {
    // No Op
    }
     
    public void writeHeader(Object record) throws Exception {
    // No Op
    }
    }