DataStage command line integration
Leveraging one of the most used inter-process communication mechanisms in Linux and UNIX®
DataStage jobs are usually run to process data in batches, which are scheduled to run at specific intervals. When there is no specific schedule to follow, the DataStage operator can start the job manually through the DataStage and QualityStage Director client, or at the command line. If the job is run at the command line, you would most likely do it as follows.
dsjob -run -param input_file=/path/to/in_file -param output_file=/path/to/out_file dstage1 job1.
A diagram representing this command is shown in Figure 1.
Figure 1. Invoking a DataStage job
In normal circumstances, the in_file and out_file are stored in a file
system on the machine where DataStage runs. But, in Linux or UNIX, input
and output can be piped in a series of commands. For example, when a
program requires sorting, you can do the following.
command|sort |uniq >
In this case, Figure 2 shows the flow of data, where the output of one
command becomes the input of the next, and the final output is landed in
the file system.
Figure 2. UNIX typical pipe usage
Assuming the intermediate processes produce many millions of lines, you are potentially avoiding landing the intermediate files, thus saving space in the file system and the time to write those files. DataStage jobs do not take standard input through a pipe, like many programs or commands executed in UNIX. This article will describe a method and show the script to make that happen, as well as the practical uses of it.
If the job should accept standard input and produce standard output like a
regular UNIX command, then it would have to be called through a wrapper
script as follows.
Or maybe you will have to send the output to a file such as the following.
command1|piped_ds_job.sh > /path/to/out_file.
The diagram in Figure 3 shows you how the script should be structured.
Figure 3. Wrapper script for a DataStage job
The script will have to convert standard input into a named pipe, and also convert the output file of the DataStage job into standard output. In the next sections, you will learn how to accomplish this.
Developing the DataStage job
The DataStage job does not require any special treatment. For this example, you will create a job to sort a file which, if run normally, would take at least two parameters: input file and output file. However, the job could have more parameters if its function required it, but for this exercise is better to keep it simple.
The job is shown in figure 4.
Figure 4. Simple sort DataStage job
The DSX for this job is available in the downloads section of this article. The job simply takes a text file, treats the full line as a single column, sorts it, and writes to the output file.
Additionally, the job will have to allow multiple instance execution. It should take the input line with no separator and no quotes, and the output file will have the same characteristics.
Writing and using the wrapper script
The wrapper script will contain the code required to create temporary files for the named pipes, and create the command line for invoking the DataStage job (dsjob). Specifically the script will have to perform the following.
- Direct the standard input (this is the output of the command which is piping to it) to a named pipe.
- Make the output of the job to be written to another named pipe that then will be streamed to the standard output of the process so the next command can read the output in a pipe as well.
- Invoke the DataStage job specifying the input file and output file parameters using the file names of the named pipes created earlier.
- Clean up the temporary files created for the named pipes.
Now begin the writing of the wrapper script. The first group of commands will prepare the environment, sourcing the dsenv file from the installation directory and set some variables. You can use the process ID (pid) as the identifier to create a temporary file in a temporary directory, as shown in Listing 1.
Listing 1. Preparing the DataStage environment
#!/bin/bash dshome=`cat /.dshome` . $dshome/dsenv export PATH=$PATH:$DSHOME/bin pid=$$ fifodir=/data/datastage/tmp infname=$fifodir/infname.$pid outfname=$fifodir/outfname.$pid
You can proceed to do the FIFO creation and the dsjob execution. At this point, the job will wait until the pipe starts receiving input. The code warns you if the DataStage job execution has thrown an error, as shown in Listing 2.
Listing 2. Creating the named pipes and invoking the job
mkfifo $infname mkfifo $outfname dsjob -run -param inputFile=$infname \ -param outputFile=$outfname dstage1 ds_sort.$pid 2> /dev/null & if [ $? -ne 0 ]; then echo "error calling DataStage job." rm $infname rm $outfname exit 1 fi
At the end of the dsjob command, you see an ampersand, which is necessary since the job is waiting for the input named pipe to send data, but the data will be streamed a few lines ahead.
The following code prepares the output to be sent to standard output via a simple cat command. As you can see the cat command and the rm command are within parenthesis, meaning that those two commands are invoked in a sub-shell that is sent to the background (specified by the ampersand at the end of the line), as shown in Listing 3.
Listing 3. Handling the input and output named pipes
(cat $outfname;rm $outfname)& if [ -z $1 ]; then cat > $infname else cat $1 > $infname fi rm $infname
The latter is necessary so when the job is finished writing the output, the temporary named pipe file name is removed. The code that follows, tests if the script was called with a parameter as a file, or if you are receiving from the data from a pipe. After the input stream (file or pipe) is sent to the input named pipe, you finish and remove the file.
You can name the script as piped_ds_job.sh and execute it as
command1|piped_ds_job.sh > /path/to/out_file.
The fact that the script can receive the input via an anonymous pipe,
allows the uses shown in Listing 4.
Listing 4. Wrapper script uses
command1|piped_ds_job.sh|command2 zcat compressedfile.gz |piped_ds_job.sh > /path/to/out_file zcat compressedfile.gz |ssh -l firstname.lastname@example.org piped_ds_job.sh| command2
The last sample where you use SSH assumes that you are executing from another machine, and therefore the DataStage job is somehow used as a service. This also would be a representative usage of how you can bypass the file transmission (and decompression in this case).
The mechanism described in this article allows for a more flexible DataStage job invocation at the command line and in shell scripting. The explained wrapper script can easily be customized to make it more general and flexible. The technique is a simple one that can be quickly implemented for current jobs and can convert them in services through remote execution via SSH. The benefits in avoiding landing data in a regular file are most notable when file sizes are in the order of dozens of million of rows, but even if your data is not that large, the integration use case is very valuable.
- Read about Information Server and DataStage in the InfoSphere Information Server 9.1 Information Center.
- Review the UNIX IPC in the "Speaking UNIX: Interprocess communication with shared memory" developerWorks article.
- Evaluate IBM products in the way that suits you best: Download a product trial, try a product online, use a product in a cloud environment, or spend a few hours in the SOA Sandbox learning how to implement Service Oriented Architecture efficiently.