Topic
  • 6 replies
  • Latest Post - 2013-02-04T19:22:17Z by Jim Sharpe
Jim Sharpe
98 Posts

Pinned topic Parse operator

2013-02-04T02:11:18Z
I’m trying to use the new Parse operator to process a stream of lines to which I’ve previously prepended a delay field (which, if I can get this to work, will be calculated from timestamps included in the actual line data).

After reviewing the documentation for the Parse operator (and for FileSource's hasDelayField parameter), I still haven't quite figured out a working combination. Here is the latest version I've tried:

// convert rstring lines with the delay attribute already prepended into blobs
// so the Parse operator can use them
stream<blob lineAsBlob> LinesWDB = Functor(LinesWithDelay)
{
  output
    LinesWDB : lineAsBlob = convertToBlob(LinesWithDelay.lineWD) ;
}

stream<rstring line> ThrottledData = Parse(LinesWDB)
{
  param
    format : csv ;
    parseInput : LinesWDB.lineAsBlob ;
    hasDelayField : true ;
}

With test input that looks like this:

1,123,456
1,123,456
1,123,456
1,123,456
1,123,456
1,123,456

I get this runtime exception:

CDISR5029E: During the read of tuple number 1, the \n character was expected, but the , character was found.

Why is it looking for a newline with a csv format? Did I misunderstand how the Parse operator should be invoked?

Any help would be greatly appreciated.
  • hnasgaard
    200 Posts

    Re: Parse operator

    2013-02-04T12:30:56Z
    I think what is happening is that it is looking for a newline after the first token on the line, because your schema says there is only one attribute, to be interpreted as an rstring. Try the same code with

    stream<int32 i, int32 j, int32 k>

    and see what happens.
  • Jim Sharpe
    98 Posts

    Re: Parse operator

    2013-02-04T14:21:24Z
    Just tried it but unfortunately got the same exception.
  • hnasgaard
    200 Posts

    Re: Parse operator

    2013-02-04T15:51:14Z
    I was able to write a small sample that reproduces the problem you are seeing. First, you should have two attributes on the output stream (I mistook the first field for data rather than the delay). Second, within your blob, each logical line should end with a newline; otherwise the Parse operator doesn't know where the line ends. Here is the sample I used:
    
    composite Main
    {
      graph
        stream<blob b> Src = Beacon()
        {
          param iterations : 10 ;
          output Src : b = convertToBlob("1,123,456\n") ;
        }

        stream<int32 i, int32 j> O = Parse(Src)
        {
          param
            format : csv ;
            hasDelayField : true ;
        }

        () as P = Custom(O)
        {
          logic onTuple O : println(O) ;
        }
    }
    

    Without the '\n' in the string passed to convertToBlob I got the same error. The Parse operator logically concatenates the incoming data and, since it is parsing csv format, it expects that data to appear as it would in a csv file: lines of comma-separated values.

    Hope that helps.
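
    Applying that to the Functor in the original post, the missing newline can be appended before the blob conversion. A sketch, reusing the stream and attribute names from the original code:

    ```spl
    // append the newline the Parse operator needs in order to find
    // the end of each logical line, then convert to a blob
    stream<blob lineAsBlob> LinesWDB = Functor(LinesWithDelay)
    {
      output
        LinesWDB : lineAsBlob = convertToBlob(LinesWithDelay.lineWD + "\n") ;
    }
    ```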
  • Jim Sharpe
    98 Posts

    Re: Parse operator

    2013-02-04T18:06:47Z
    Thanks, that did help immensely. It's working fine now, even on the real data where the lines contain the timestamps, although I did have to deal with an issue of embedded commas.

    However, before I mark this question closed I'd like to get a better understanding of the limitations of the hasDelayField capability of the Parse operator. When using it with the FileSource operator it intuitively makes sense that, when the output is significantly delayed, the operator can stop reading from the file until it "catches up". With the Parse operator, however, what happens when the rate of tuples being sent to the output port is much slower than the rate at which data is made available to the input port? Will backpressure somehow trickle up through the upstream operators to an initial source operator, which will then limit its ingestion rate; will it attempt to buffer internally until it potentially runs out of memory; or does it do something else entirely?
  • hnasgaard
    200 Posts

    Re: Parse operator

    2013-02-04T18:27:01Z
    Looking at the implementation, I think the answer is that it will buffer until it runs out of memory. Basically, it maintains a buffer to which the incoming data is appended. The Parse code simply loops: extracting a tuple's worth of data, waiting for the required delay, and then doing the submit. If the incoming rate is higher than the outgoing rate, the buffer will grow without bound.
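
    Given that, if the upstream source can outrun the delayed output, one way to keep the Parse operator's buffer from growing without bound is to rate-limit the feed before it reaches Parse. A sketch using the standard toolkit's Throttle operator (the stream names and the 100.0 tuples/second rate are illustrative only):

    ```spl
    // cap the rate at which blobs reach Parse so its internal buffer
    // cannot grow without bound; rate is in tuples per second
    stream<blob lineAsBlob> PacedLines = Throttle(LinesWDB)
    {
      param
        rate : 100.0 ;
    }
    ```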
  • Jim Sharpe
    98 Posts

    Re: Parse operator

    2013-02-04T19:22:17Z
    Thanks. That's what I needed to know and will treat it accordingly.