10 replies Latest Post - ‏2013-05-15T05:11:34Z by Frank_Blau
Frank_Blau

Problem with Import/Export

‏2013-05-13T20:27:46Z |

I searched around here and couldn't find anything that solved this problem:

 

I have an export in a composite called PiDemo1 in the PiDemo namespace:

 

() as Export_1 = Export(Filter_1_out0)
{
   param
      streamId : "tempStream" ;
}

 

and I have an Import in the Composite ImportTemp in the PiDemo namespace:

{
   graph
      (stream<int32 temp> Import_2_Out0) as Import_2 = Import()
      {
         param
            applicationName : "PiDemo::PiDemo1" ;
            streamId : "tempStream" ;
      }
}

 

(There is also a FileSink on the output of the Import.)

I get output to the Export (at least it says I do), but I never get anything on the imported stream.

 

Any ideas?

  • Frank_Blau

    Re: Problem with Import/Export

    ‏2013-05-13T22:01:10Z  in response to Frank_Blau

    Here is the full file:

     

    namespace PiDemo ;

    public composite PiDemo1
    {
        graph
            stream<rstring tcp_data> TCP_Data = TCPSource()
            {
                param
                    role : server ;
                    port : 6001u ;
                    format : line ;
                    initDelay : 7.0 ;
            }

            stream<int32 xVal, int32 yVal, int32 zVal, int32 temp> tcp_lines =
                Functor(TCP_Data)
            {
                logic
                    state : mutable list<rstring> tokens ;
                    onTuple TCP_Data :
                    {
                        tokens = tokenize(tcp_data, ",", false) ;
                    }

                output
                    tcp_lines : xVal =(int32) tokens [ 0 ], yVal =(int32) tokens [ 1 ],
                        zVal =(int32) tokens [ 2 ], temp =(int32) tokens [ 3 ] ;
            }

            () as Temp = FileSink(Filter_1_out0)
            {
                param
                    file : "line_out" ;
                    format : csv ;
            }

            (stream<tcp_lines> Filter_1_out0) as Filter_1 = Filter(tcp_lines)
            {
                param
                    filter : temp > 70 ;
            }

            () as Movement = FileSink(Filter_2_out0)
            {
                param
                    file : "movement.csv" ;
                    format : csv ;
            }

            (stream<tcp_lines> Filter_2_out0) as Filter_2 = Filter(tcp_lines)
            {
                param
                    filter :(tcp_lines.xVal + tcp_lines.yVal + tcp_lines.zVal) > 150 ;
            }

            () as FileSink_1 = FileSink(TCP_Data)
            {
                param
                    file : "allTuples.csv" ;
                    format : csv ;
            }

            () as Export_1 = Export(Filter_1_out0)
            {
                param
                    streamId : "tempStream" ;
            }
    }

    composite ImportTemp
    {
        graph
            (stream<int32 temp> Import_2_out0) as Import_2 = Import()
            {
                param
                    applicationName : "PiDemo::PiDemo1" ;
                    streamId : "tempStream" ;
            }

            () as FileSink_3 = FileSink(Import_2_out0)
            {
                param
                    file : "TempImport.csv" ;
            }
    }
     
     
     
     
    • Senthil_Nathan_NY

      Re: Problem with Import/Export

      ‏2013-05-14T23:17:21Z  in response to Frank_Blau

      Hi Frank,

      You are exporting a stream with this schema:   stream<int32 xVal, int32 yVal, int32 zVal, int32 temp> tcp_lines

      You are importing a stream with this schema:    stream<int32 temp>

      This will not work because the exported stream and the imported stream use two completely different types. For import/export to work, both ends must use exactly the same stream schema.

       

      • Frank_Blau

        Re: Problem with Import/Export

        ‏2013-05-14T23:28:30Z  in response to Senthil_Nathan_NY

        Thanks for the help.

        I made the import stream identical to the export stream and it still does not pass anything through.

        -----

         
         
        public composite PiDemo1
        {
            graph
                stream<rstring tcp_data> TCP_Data = TCPSource()
                {
                    param
                        role : server ;
                        port : 6001u ;
                        format : line ;
                        initDelay : 7.0 ;
                }

                stream<int32 xVal, int32 yVal, int32 zVal, int32 temp> tcp_lines =
                    Functor(TCP_Data)
                {
                    logic
                        state : mutable list<rstring> tokens ;
                        onTuple TCP_Data :
                        {
                            tokens = tokenize(tcp_data, ",", false) ;
                        }

                    output
                        tcp_lines : xVal =(int32) tokens [ 0 ], yVal =(int32) tokens [ 1 ],
                            zVal =(int32) tokens [ 2 ], temp =(int32) tokens [ 3 ] ;
                }

                () as Temp = FileSink(Filter_1_out0)
                {
                    param
                        file : "line_out" ;
                        format : csv ;
                }

                (stream<tcp_lines> Filter_1_out0) as Filter_1 = Filter(tcp_lines)
                {
                    param
                        filter : temp > 70 ;
                }

                () as Movement = FileSink(Filter_2_out0)
                {
                    param
                        file : "movement.csv" ;
                        format : csv ;
                }

                (stream<tcp_lines> Filter_2_out0) as Filter_2 = Filter(tcp_lines)
                {
                    param
                        filter :(tcp_lines.xVal + tcp_lines.yVal + tcp_lines.zVal) > 150 ;
                }

                () as FileSink_1 = FileSink(TCP_Data)
                {
                    param
                        file : "allTuples.csv" ;
                        format : csv ;
                }

                () as Export_1 = Export(Filter_1_out0)
                {
                    param
                        streamId : "tempStream" ;
                }
        }

        composite ImportTemp
        {
            graph
                (stream<int32 xVal, int32 yVal, int32 zVal, int32 temp> Import_2_out0) as Import_2 = Import()
                {
                    param
                        applicationName : "PiDemo1" ;
                        streamId : "tempStream" ;
                }

                () as FileSink_3 = FileSink(Import_2_out0)
                {
                    param
                        file : "TempImport.csv" ;
                }
        }
         
         
         
        • Senthil_Nathan_NY

          Re: Problem with Import/Export

          ‏2013-05-14T23:53:06Z  in response to Frank_Blau

          Hi Frank,

          Please download the SPL-Beginners-Examples.tar.gz file from this URL:  http://tinyurl.com/3apa97q

          From that collection of examples, please take the 019_import_export_at_work project and run it on your system. Then, tell me whether that example works or not in your Streams environment.

           

          • Frank_Blau

            Re: Problem with Import/Export

            ‏2013-05-15T00:02:03Z  in response to Senthil_Nathan_NY

            When I run that, all of the exports go green, but the imports stay yellow and no tuples come across.

            • Senthil_Nathan_NY

              Re: Problem with Import/Export

              ‏2013-05-15T00:19:20Z  in response to Frank_Blau

              I just tried the 019_import_export_at_work example. It works fine with multiple export and import operators.

              If you are running it on a slow machine (such as a virtual machine or Linux running on your laptop), the tuples may already have been read and passed through the system before you finish starting both jobs. Please search for initDelay in both SPL files in this project and replace the value with 25.0.

              That will give enough time for you to start both the jobs in time for the export and import operators to connect with each other and exchange tuples.

              • Frank_Blau

                Re: Problem with Import/Export

                ‏2013-05-15T00:43:32Z  in response to Senthil_Nathan_NY

                OK, putting a 25-second delay on the example worked, so I put a 25-second delay on my TCPSource and tried it, but the data still doesn't make it to the import. (The Import and Export operators both go green, though; there is just nothing on the other side of the Import.)

                I wonder if this could have something to do with the TCPSource ... does it need some punctuation or something to trigger an export?

                • Senthil_Nathan_NY

                  Re: Problem with Import/Export

                  ‏2013-05-15T01:05:10Z  in response to Frank_Blau

                  In the Streams Instance Graph, could you please hover your mouse over the two Filter operators and see whether they are getting any tuples on their respective input ports?

                  If you see tuples reaching the Filter operators, then try the following.

                  1) Comment out the TCPSource operator code.

                  2) In place of the commented-out TCPSource operator, add the following code snippet after you verify it for correctness.

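                  // Stand-in for the TCPSource: emits the same test line every 0.5 seconds.
                  stream<rstring tcp_data> TCP_Data = Beacon()
                  {
                     param
                        initDelay : 20.0 ;
                        period : 0.5 ;
                        iterations : 100000 ;

                     output
                        // The attribute name must match the schema (tcp_data).
                        TCP_Data : tcp_data = "47,69,95,98" ;
                  }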

                  Please see if this change will carry your data to the import operator in the 2nd Streams job.

                  • Frank_Blau

                    Re: Problem with Import/Export

                    ‏2013-05-15T05:02:56Z  in response to Senthil_Nathan_NY

                    Hmmm.... I think I got it working.

                    The issue seems to be that the export stream was:

                    stream<int32 xVal, int32 yVal, int32 zVal, int32 temp>

                    and the import stream was:

                    stream<int32 temp, int32 xVal, int32 yVal, int32 zVal>

                     

                    Basically the order was different. That seems to have made the difference. I'll keep testing it some more.
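
                    For reference, here is a minimal sketch of a matching Export/Import pair using the names from this thread. It assumes the exporting file still declares namespace PiDemo (so the application name is "PiDemo::PiDemo1"; without the namespace line it would be "PiDemo1"), and it relies on the behavior observed above, that the Import schema has to list the same attributes in the same order as the exported stream. Both operator invocations are fragments that belong inside the graph clause of their own composite.

                    ```spl
                    // Exporting job (main composite PiDemo1).
                    // Filter_1_out0 carries <int32 xVal, int32 yVal, int32 zVal, int32 temp>.
                    () as Export_1 = Export(Filter_1_out0)
                    {
                        param
                            streamId : "tempStream" ;
                    }

                    // Importing job (main composite ImportTemp).
                    // Same attribute names, types, and order as the exported stream.
                    (stream<int32 xVal, int32 yVal, int32 zVal, int32 temp> Import_2_out0) as Import_2 = Import()
                    {
                        param
                            applicationName : "PiDemo::PiDemo1" ; // assumes namespace PiDemo is declared
                            streamId : "tempStream" ;
                    }
                    ```

                    Writing the attributes as <int32 temp, int32 xVal, int32 yVal, int32 zVal> on the import side is the variation that produced no connection in the tests described above.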

                    • Frank_Blau

                      Re: Problem with Import/Export

                      ‏2013-05-15T05:11:34Z  in response to Frank_Blau

                      So one way you can tell this right away... is that in the Instance Graph, if the connection is valid between the export and the import operator, there will be a solid green line between them. When it wasn't working, there was no line. I didn't know that I should be expecting one between apps, but there you have it!

                      Sounds like a good Certification Test question! :)