Topic
  • 10 replies
  • Latest Post - ‏2013-05-15T05:11:34Z by Frank_Blau
Frank_Blau
Frank_Blau
28 Posts

Pinned topic Problem with Import/Export

‏2013-05-13T20:27:46Z |

I searched around here and couldn't find anything that got me past this not working:

 

I have an export in a composite called PiDemo1 in the PiDemo namespace:

 

() as Export_1 = Export(Filter_1_out0)

{

   param

     streamId : "tempStream"

}

 

and I have an Import in the Composite ImportTemp in the PiDemo namespace:

{

   graph

      (stream<int32 temp> Import_2_Out0) as Import_2 = Import()

      {

         param

           applicationName : "PiDemo::PiDemo1" ;

            streamId : "tempStream" ;

      }

}

 

(there is also a fileSink off the output of the Import)

I get output to the Export (at least it says I do), but I never get anything on the Input stream.

 

Any ideas?

  • Frank_Blau
    Frank_Blau
    28 Posts

    Re: Problem with Import/Export

    ‏2013-05-13T22:01:10Z  

    Here is the full file:

     

    namespace PiDemo ;
     
    public composite PiDemo1
    {
    graph
    stream<rstring tcp_data> TCP_Data = TCPSource()
    {
    param
    role : server ;
    port : 6001u ;
    format : line ;
    initDelay : 7.0 ;
    }
     
    stream<int32 xVal, int32 yVal, int32 zVal, int32 temp> tcp_lines =
    Functor(TCP_Data)
    {
    logic
    state : mutable list<rstring> tokens ;
    onTuple TCP_Data :
    {
    tokens = tokenize(tcp_data, ",", false) ;
    }
     
    output
    tcp_lines : xVal =(int32) tokens [ 0 ], yVal =(int32) tokens [ 1 ],
    zVal =(int32) tokens [ 2 ], temp =(int32) tokens [ 3 ] ;
    }
     
    () as Temp = FileSink(Filter_1_out0)
    {
    param
    file : "line_out" ;
    format : csv ;
    }
     
    (stream<tcp_lines> Filter_1_out0) as Filter_1 = Filter(tcp_lines)
    {
    param
    filter : temp > 70 ;
    }
     
    () as Movement = FileSink(Filter_2_out0)
    {
    param
    file : "movement.csv" ;
    format : csv ;
    }
     
    (stream<tcp_lines> Filter_2_out0) as Filter_2 = Filter(tcp_lines)
    {
    param
    filter :(tcp_lines.xVal + tcp_lines.yVal + tcp_lines.zVal) > 150 ;
    }
     
    () as FileSink_1 = FileSink(TCP_Data)
    {
    param
    file : "allTuples.csv" ;
    format : csv ;
    }
     
    () as Export_1 = Export(Filter_1_out0)
    {
    param
    streamId : "tempStream" ;
    }
     
    }
     
     
     
    composite ImportTemp
    {
    graph
    (stream<int32 temp> Import_2_out0) as Import_2 = Import()
    {
    param
    applicationName : "PiDemo::PiDemo1" ;
    streamId : "tempStream" ;
    }
     
    () as FileSink_3 = FileSink(Import_2_out0)
    {
    param
    file : "TempImport.csv" ;
    }
     
    }
     
     
     
     
  • Senthil_Nathan_NY
    Senthil_Nathan_NY
    5 Posts

    Re: Problem with Import/Export

    ‏2013-05-14T23:17:21Z  

    Here is the full file:

     

    namespace PiDemo ;
     
    public composite PiDemo1
    {
    graph
    stream<rstring tcp_data> TCP_Data = TCPSource()
    {
    param
    role : server ;
    port : 6001u ;
    format : line ;
    initDelay : 7.0 ;
    }
     
    stream<int32 xVal, int32 yVal, int32 zVal, int32 temp> tcp_lines =
    Functor(TCP_Data)
    {
    logic
    state : mutable list<rstring> tokens ;
    onTuple TCP_Data :
    {
    tokens = tokenize(tcp_data, ",", false) ;
    }
     
    output
    tcp_lines : xVal =(int32) tokens [ 0 ], yVal =(int32) tokens [ 1 ],
    zVal =(int32) tokens [ 2 ], temp =(int32) tokens [ 3 ] ;
    }
     
    () as Temp = FileSink(Filter_1_out0)
    {
    param
    file : "line_out" ;
    format : csv ;
    }
     
    (stream<tcp_lines> Filter_1_out0) as Filter_1 = Filter(tcp_lines)
    {
    param
    filter : temp > 70 ;
    }
     
    () as Movement = FileSink(Filter_2_out0)
    {
    param
    file : "movement.csv" ;
    format : csv ;
    }
     
    (stream<tcp_lines> Filter_2_out0) as Filter_2 = Filter(tcp_lines)
    {
    param
    filter :(tcp_lines.xVal + tcp_lines.yVal + tcp_lines.zVal) > 150 ;
    }
     
    () as FileSink_1 = FileSink(TCP_Data)
    {
    param
    file : "allTuples.csv" ;
    format : csv ;
    }
     
    () as Export_1 = Export(Filter_1_out0)
    {
    param
    streamId : "tempStream" ;
    }
     
    }
     
     
     
    composite ImportTemp
    {
    graph
    (stream<int32 temp> Import_2_out0) as Import_2 = Import()
    {
    param
    applicationName : "PiDemo::PiDemo1" ;
    streamId : "tempStream" ;
    }
     
    () as FileSink_3 = FileSink(Import_2_out0)
    {
    param
    file : "TempImport.csv" ;
    }
     
    }
     
     
     
     

    Hi Frank,

    Your are exporting a stream with this schema:   stream<int32 xVal, int32 yVal, int32 zVal, int32 temp> tcp_lines 

    You are importing a stream with this schema:    stream<int32 temp>

     

    This will not work since your exported stream and imported stream use completely two different types. In order for the import/export to work, it is essential that you use exactly the same stream schema on both ends.

     

  • Frank_Blau
    Frank_Blau
    28 Posts

    Re: Problem with Import/Export

    ‏2013-05-14T23:28:30Z  

    Hi Frank,

    Your are exporting a stream with this schema:   stream<int32 xVal, int32 yVal, int32 zVal, int32 temp> tcp_lines 

    You are importing a stream with this schema:    stream<int32 temp>

     

    This will not work since your exported stream and imported stream use completely two different types. In order for the import/export to work, it is essential that you use exactly the same stream schema on both ends.

     

    Thanks for the help.

    I made the import stream identical to the export stream and it still does not pass anything through.

    -----

     
     
    public composite PiDemo1
    {
    graph
    stream<rstring tcp_data> TCP_Data = TCPSource()
    {
    param
    role : server ;
    port : 6001u ;
    format : line ;
    initDelay : 7.0 ;
    }
     
    stream<int32 xVal, int32 yVal, int32 zVal, int32 temp> tcp_lines =
    Functor(TCP_Data)
    {
    logic
    state : mutable list<rstring> tokens ;
    onTuple TCP_Data :
    {
    tokens = tokenize(tcp_data, ",", false) ;
    }
     
    output
    tcp_lines : xVal =(int32) tokens [ 0 ], yVal =(int32) tokens [ 1 ],
    zVal =(int32) tokens [ 2 ], temp =(int32) tokens [ 3 ] ;
    }
     
    () as Temp = FileSink(Filter_1_out0)
    {
    param
    file : "line_out" ;
    format : csv ;
    }
     
    (stream<tcp_lines> Filter_1_out0) as Filter_1 = Filter(tcp_lines)
    {
    param
    filter : temp > 70 ;
    }
     
    () as Movement = FileSink(Filter_2_out0)
    {
    param
    file : "movement.csv" ;
    format : csv ;
    }
     
    (stream<tcp_lines> Filter_2_out0) as Filter_2 = Filter(tcp_lines)
    {
    param
    filter :(tcp_lines.xVal + tcp_lines.yVal + tcp_lines.zVal) > 150 ;
    }
     
    () as FileSink_1 = FileSink(TCP_Data)
    {
    param
    file : "allTuples.csv" ;
    format : csv ;
    }
     
    () as Export_1 = Export(Filter_1_out0)
    {
    param
    streamId : "tempStream" ;
    }
     
    }
     
    composite ImportTemp
    {
    graph
    (stream<int32 xVal, int32 yVal, int32 zVal, int32 temp> Import_2_out0) as Import_2 = Import()
    {
    param
    applicationName : "PiDemo1" ;
    streamId : "tempStream" ;
    }
     
    () as FileSink_3 = FileSink(Import_2_out0)
    {
    param
    file : "TempImport.csv" ;
    }
     
    }
     
     
     
  • Senthil_Nathan_NY
    Senthil_Nathan_NY
    5 Posts

    Re: Problem with Import/Export

    ‏2013-05-14T23:53:06Z  

    Thanks for the help.

    I made the import stream identical to the export stream and it still does not pass anything through.

    -----

     
     
    public composite PiDemo1
    {
    graph
    stream<rstring tcp_data> TCP_Data = TCPSource()
    {
    param
    role : server ;
    port : 6001u ;
    format : line ;
    initDelay : 7.0 ;
    }
     
    stream<int32 xVal, int32 yVal, int32 zVal, int32 temp> tcp_lines =
    Functor(TCP_Data)
    {
    logic
    state : mutable list<rstring> tokens ;
    onTuple TCP_Data :
    {
    tokens = tokenize(tcp_data, ",", false) ;
    }
     
    output
    tcp_lines : xVal =(int32) tokens [ 0 ], yVal =(int32) tokens [ 1 ],
    zVal =(int32) tokens [ 2 ], temp =(int32) tokens [ 3 ] ;
    }
     
    () as Temp = FileSink(Filter_1_out0)
    {
    param
    file : "line_out" ;
    format : csv ;
    }
     
    (stream<tcp_lines> Filter_1_out0) as Filter_1 = Filter(tcp_lines)
    {
    param
    filter : temp > 70 ;
    }
     
    () as Movement = FileSink(Filter_2_out0)
    {
    param
    file : "movement.csv" ;
    format : csv ;
    }
     
    (stream<tcp_lines> Filter_2_out0) as Filter_2 = Filter(tcp_lines)
    {
    param
    filter :(tcp_lines.xVal + tcp_lines.yVal + tcp_lines.zVal) > 150 ;
    }
     
    () as FileSink_1 = FileSink(TCP_Data)
    {
    param
    file : "allTuples.csv" ;
    format : csv ;
    }
     
    () as Export_1 = Export(Filter_1_out0)
    {
    param
    streamId : "tempStream" ;
    }
     
    }
     
    composite ImportTemp
    {
    graph
    (stream<int32 xVal, int32 yVal, int32 zVal, int32 temp> Import_2_out0) as Import_2 = Import()
    {
    param
    applicationName : "PiDemo1" ;
    streamId : "tempStream" ;
    }
     
    () as FileSink_3 = FileSink(Import_2_out0)
    {
    param
    file : "TempImport.csv" ;
    }
     
    }
     
     
     

    Hi Frank,

    Please download the SPL-Beginners-Examples.tar.gz file from this URL:  http://tinyurl.com/3apa97q

    From that collection of examples, please take the 019_import_export_at_work project and run it on your system. Then, tell me whether that example works or not in your Streams environment.

     

  • Frank_Blau
    Frank_Blau
    28 Posts

    Re: Problem with Import/Export

    ‏2013-05-15T00:02:03Z  

    Hi Frank,

    Please download the SPL-Beginners-Examples.tar.gz file from this URL:  http://tinyurl.com/3apa97q

    From that collection of examples, please take the 019_import_export_at_work project and run it on your system. Then, tell me whether that example works or not in your Streams environment.

     

    When I run that all of the exports go green, but the imports stay yellow and no tuples come across.

  • Senthil_Nathan_NY
    Senthil_Nathan_NY
    5 Posts

    Re: Problem with Import/Export

    ‏2013-05-15T00:19:20Z  

    When I run that all of the exports go green, but the imports stay yellow and no tuples come across.

    I just tried the 019_import_export_at_work example. It works fine with multiple export and import operators.

    If you are running it on a slow machine (such as a virtual machine or Linux running on your laptop), before you finish starting both the jobs, tuples would have been read and passed through the system. Please search for initDelay in both the SPL files in this project and replace the value with 25.0.

    That will give enough time for you to start both the jobs in time for the export and import operators to connect with each other and exchange tuples.

  • Frank_Blau
    Frank_Blau
    28 Posts

    Re: Problem with Import/Export

    ‏2013-05-15T00:43:32Z  

    I just tried the 019_import_export_at_work example. It works fine with multiple export and import operators.

    If you are running it on a slow machine (such as a virtual machine or Linux running on your laptop), before you finish starting both the jobs, tuples would have been read and passed through the system. Please search for initDelay in both the SPL files in this project and replace the value with 25.0.

    That will give enough time for you to start both the jobs in time for the export and import operators to connect with each other and exchange tuples.

    OK, putting a 25 delay on the example one worked, so I put a 25 sec delay on my TCPSource and tried it and it still doesn't make it to the import. (the import and exports both go green though, just nothing on the other side of the import).\

    I wonder if this could have something to do with the TCPSource ... does it need some punctuation or something to trigger an export?

  • Senthil_Nathan_NY
    Senthil_Nathan_NY
    5 Posts

    Re: Problem with Import/Export

    ‏2013-05-15T01:05:10Z  

    OK, putting a 25 delay on the example one worked, so I put a 25 sec delay on my TCPSource and tried it and it still doesn't make it to the import. (the import and exports both go green though, just nothing on the other side of the import).\

    I wonder if this could have something to do with the TCPSource ... does it need some punctuation or something to trigger an export?

    In the Streams Instance graph, could you please hover your mouse over the two Filter operators and see if they are getting any tuples on their respective input port.

    If you see tuples reaching the Filter operators, then try the following.

    1) Comment out the TCPSource operator code.

    2) in place of that commented out TCPSource operator, add the following code snippet after you verify it for correctness.

    stream<rstring tcp_data> TCP_Data = Beacon()
    {
       param
          initDelay : 20.0 ;

          period: 0.5;

          iterations: 100000;

     

       output

          TCP_Data: tcp_date = "47,69,95,98";

    }

    Please see if this change will carry your data to the import operator in the 2nd Streams job.

  • Frank_Blau
    Frank_Blau
    28 Posts

    Re: Problem with Import/Export

    ‏2013-05-15T05:02:56Z  

    In the Streams Instance graph, could you please hover your mouse over the two Filter operators and see if they are getting any tuples on their respective input port.

    If you see tuples reaching the Filter operators, then try the following.

    1) Comment out the TCPSource operator code.

    2) in place of that commented out TCPSource operator, add the following code snippet after you verify it for correctness.

    stream<rstring tcp_data> TCP_Data = Beacon()
    {
       param
          initDelay : 20.0 ;

          period: 0.5;

          iterations: 100000;

     

       output

          TCP_Data: tcp_date = "47,69,95,98";

    }

    Please see if this change will carry your data to the import operator in the 2nd Streams job.

    Hmmm.... I think I got it working.

    The issue seems to be that the export stream was:

    stream<int32 xVal, int32 yVal, int32 zVal, int32 temp>

    and the import stream was:

    stream<int32 temp, int32 xVal, int32 yVal, int32 zVal>

     

    Basically the order was different. That seems to have made the difference. I'll keep testing it some more.

  • Frank_Blau
    Frank_Blau
    28 Posts

    Re: Problem with Import/Export

    ‏2013-05-15T05:11:34Z  

    Hmmm.... I think I got it working.

    The issue seems to be that the export stream was:

    stream<int32 xVal, int32 yVal, int32 zVal, int32 temp>

    and the import stream was:

    stream<int32 temp, int32 xVal, int32 yVal, int32 zVal>

     

    Basically the order was different. That seems to have made the difference. I'll keep testing it some more.

    So one way you can tell this right away... is that in the Instance Graph, if the connection is valid between the export and the import operator, there will be a solid green line between them. When it wasn't working, there was no line. I didn't know that I should be expecting one between apps, but there you have it!

    Sounds like a good Certification Test question! :)