6 replies · Latest Post: 2013-02-04T19:22:17Z by Jim Sharpe
Jim Sharpe
98 Posts

Pinned topic Parse operator

2013-02-04T02:11:18Z
I’m trying to use the new Parse operator to process a stream of lines to which I’ve previously prepended a delay field (which, if I can get this to work, will be calculated from timestamps included in the actual line data).

After reviewing the documentation for the Parse operator (and FileSource for the hasDelayField parameter) I still haven’t quite figured out a working combination. Here is the latest version I've tried:

// convert rstring lines with the delay attribute already prepended into blobs so the Parse operator can use them
stream<blob lineAsBlob> LinesWDB = Functor(LinesWithDelay)
{
    output
        LinesWDB : lineAsBlob = convertToBlob(LinesWithDelay.lineWD) ;
}

stream<rstring line> ThrottledData = Parse(LinesWDB)
{
    param
        format : csv ;
        parseInput : LinesWDB.lineAsBlob ;
        hasDelayField : true ;
}

With test input that looks like this:

1,123,456
1,123,456
1,123,456
1,123,456
1,123,456
1,123,456

I get this runtime exception:

CDISR5029E: During the read of tuple number 1, the \n character was expected, but the , character was found.

Why is it looking for a newline with a csv format? Did I misunderstand how the Parse operator should be invoked?

Any help would be greatly appreciated.
  • hnasgaard
    200 Posts

    Re: Parse operator

    2013-02-04T12:30:56Z in response to Jim Sharpe
    I think what is happening is that it is looking for a newline after the first token on the line, because your schema says there is only one attribute, to be interpreted as an rstring. Try the same code with
    
    stream<int32 i, int32 j, int32 k>
    
    and see what happens.
    • Jim Sharpe
      98 Posts

      Re: Parse operator

      2013-02-04T14:21:24Z in response to hnasgaard
      Just tried it but unfortunately got the same exception.
      • hnasgaard
        200 Posts

        Re: Parse operator

        2013-02-04T15:51:14Z in response to Jim Sharpe
        I was able to write a small sample that reproduced the problem you are seeing. First, you should have two attributes on the output stream (I mistook the first for data rather than the delay). Second, within your blob, each logical line should have a newline at the end, otherwise the parse doesn't know where it ends. Here is the sample I used:
        
        composite Main
        {
            graph
                stream<blob b> Src = Beacon()
                {
                    param
                        iterations : 10 ;
                    output
                        Src : b = convertToBlob("1,123,456\n") ;
                }

                stream<int32 i, int32 j> O = Parse(Src)
                {
                    param
                        format : csv ;
                        hasDelayField : true ;
                }

                () as P = Custom(O)
                {
                    logic
                        onTuple O : println(O) ;
                }
        }
        

        Without the '\n' in the string in convertToBlob I got the same error. The Parse operator logically concatenates the incoming data and, since it is parsing a csv format, it expects that data to appear as it would in a csv file, with lines containing comma-separated values.
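
        Applying those same two fixes to the Functor/Parse pair from the original post might look like this (a sketch; the attribute names field1/field2 for the two remaining CSV fields are made up):

        // append a newline so Parse can find the end of each logical line
        stream<blob lineAsBlob> LinesWDB = Functor(LinesWithDelay)
        {
            output
                LinesWDB : lineAsBlob = convertToBlob(LinesWithDelay.lineWD + "\n") ;
        }

        // two output attributes; the leading delay field is consumed because hasDelayField is true
        stream<int32 field1, int32 field2> ThrottledData = Parse(LinesWDB)
        {
            param
                format : csv ;
                parseInput : LinesWDB.lineAsBlob ;
                hasDelayField : true ;
        }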

        Hope that helps.
        • Jim Sharpe
          98 Posts

          Re: Parse operator

          2013-02-04T18:06:47Z in response to hnasgaard
          Thanks, that did help immensely. It's working fine now, even on the real data where the lines contain the timestamps, although I did have to deal with an issue of embedded commas.

          However, before I mark this question closed I'd like to get a better understanding of the limitations of the hasDelayField capability of the Parse operator. When using it with the FileSource operator it intuitively makes sense that, when the output is significantly delayed, it can stop reading from the file until it "catches up". With the Parse operator, however, what happens when tuples are sent to the output port much more slowly than data is made available to the input port? Will backpressure somehow trickle up through the upstream operators to an initial source operator, which will then limit its ingestion rate? Will it attempt to buffer internally until it potentially runs out of memory? Or does it do something else entirely?
          • hnasgaard
            200 Posts

            Re: Parse operator

            2013-02-04T18:27:01Z in response to Jim Sharpe
            Looking at the implementation, I think the answer is that it will buffer until it runs out of memory. Basically it maintains a buffer to which the incoming data is appended. The Parse code simply loops: it extracts a tuple's worth of data, waits for the required delay, and then does the submit. If the incoming rate is higher than the outgoing rate, the buffer will grow without bound.
            • Jim Sharpe
              98 Posts

              Re: Parse operator

              2013-02-04T19:22:17Z in response to hnasgaard
              Thanks. That's what I needed to know and will treat it accordingly.