In UNIX and Linux environments, you can create named pipes using the mkfifo command.
You can then use the pipe
with the DB2 LOAD command.
However, the Windows operating system does not allow you to create a named pipe by
using an external command similar to mkfifo.
With Windows, you need to create the named pipe through the Windows API, and
the scope of the pipe is only within the session in which it is created.
Also, a named pipe created with Java
cannot interact with the Windows shell, and
therefore cannot be used by the DB2 LOAD command.
The pipeload.zip file that is linked to in the Download section of this article
contains sample code to help solve this problem.
The sample code lets you create a Windows pipe that can be used by the DB2 LOAD command.
You do this by creating Java Native Interface (JNI) wrappers that use calls to the Windows API through a C program
to create and use named pipes .
You can then use a Java program to send the data to the Windows pipe.
At a high level, the sample code uses a Java driver program (TestPipe.java or TestUnixPipe.java) to send data to a named pipe. Then, db2load.sql consumes the data through a pipe in a separate command shell. The use of this concept is demonstrated in the IBM Data Movement Tool (see the Resources section for a link to a developerWorks article that describes the IBM Data Movement Tool).
Following are descriptions of the main files contained in the pipeload.zip download file:
- Pipe.c — Contains C code that uses the Windows APIs to create, use, and close Windows named pipes.
- Pipes.java — Contains Java Native Interface (JNI) that declares wrappers for all Windows native methods used in Pipe.c.
- TestPipe.java — Contains JNI calls to create, use, and close Windows named pipes.
- TestUnixPipe.java — Contains a system call to the
mkfifocommand to create, use, and drop a named pipe in UNIX. - db2load.sql — Contains a DB2
LOADcommand using a pipe.
The named pipes you create in Windows are not permanent like they are with UNIX and Linux,
and you cannot create them like a special file.
You can access a Windows named pipe much like a file, but the pipe will not be
visible through Windows Explorer.
Windows pipes are mounted on a special file system.
You can use the Windows SDK functions,
such as CreateFile, ReadFile,
WriteFile, and CloseHandle, to open,
read from, write to, and close a pipe.
With UNIX or Linux, you can create a named pipe with the mkfifo or
mknod command.
Two separate processes can access the created pipe.
The mkfifo command creates a First In First Out (FIFO) file.
You can then use this FIFO file to move information from one application to another
without storing the data in an intermediate file.
Therefore, you save the space you would otherwise use to store the temporary file.
The mknod command also creates a named pipe if
you specify the p option for the type.
Following are samples of the commands to perform the two steps for creating a named pipe and loading DB2 data within a Unix or Linux environment.
Step 1 — create a named pipe and send uncompressed data to the pipe:
$ mkfifo db2pipe $ gunzip tabledata.gz > db2pipe |
Step 2 — use the pipe to load the uncompressed data into DB2:
$ db2 connect to sample $ db2 "LOAD FROM db2pipe OF DEL INSERT INTO mytable" $ db2 terminate |
JNI to access Windows native methods
The code in the Pipes.java program declares Java methods that point back to the Windows
native API calls implemented through the Pipe.c program.
You need to run Pipes.java through the javah program
to create a header file
that has specifications for the C program to match the signature of the C methods.
Note that it is beyond the scope of this article to provide a complete description of JNI.
Listing 1. JNI methods declarations in Pipes.java file
public class Pipes
{
static
{
System.loadLibrary("Pipe");
}
public static final native int CreateNamedPipe(String pipeName,
int ppenMode, int pipeMode, int maxInstances,
int outBufferSize, int inBufferSize, int defaultTimeOut,
int securityAttributes);
public static final native boolean ConnectNamedPipe(int namedPipeHandle,
int overlapped);
public static final native int GetLastError();
public static final native boolean CloseHandle(int bbject);
public static final native byte[] ReadFile(int file, int numberOfBytesToRead);
public static final native int WriteFile(int file, byte[] buffer,
int numberOfBytesToWrite);
public static final native boolean FlushFileBuffers(int file);
public static final native boolean DisconnectNamedPipe(int namedPipeHandle);
public static final native int CreateFile(String fileName,
int desiredAccess, int shareMode, int securityAttributes,
int creationDisposition, int flagsAndAttributes,
int templateFile);
public static final native boolean WaitNamedPipe(String namedPipeName, int timeOut);
public static final native String FormatMessage(int errorCode);
public static final native void Print(String message);
}
|
Use the following code to compile and create a JNI header file for Pipes.java:
javac ibm/Pipes.java javah -jni ibm.Pipes |
Copy the ibm_Pipes.h file from the sample code to your C program folder.
C code to wrap Windows methods
The sample C code in Listing 2 is from the Pipe.c file in the sample download.
The code creates C functions that match the declarations in the
header file you created
using the javah program in the previous step.
Listing 2. Sample code listing for JNI code
#include <windows.h>
#include <strsafe.h>
#include <jni.h>
#include "ibm_Pipes.h"
#define DEBUG 0
JNIEXPORT jint JNICALL Java_ibm_Pipes_CreateNamedPipe
(
JNIEnv *env,
jclass className,
jstring sPipeName,
jint dwOpenMode,
jint dwPipeMode,
jint nMaxInstances,
jint nOutBufferSize,
jint nInBufferSize,
jint nDefaultTimeOut,
jint lpSecurityAttributes
)
{
HANDLE pipeHandler;
LPCSTR pipeName;
pipeName = (*env)->GetStringUTFChars(env, sPipeName, NULL);
if (pipeName == NULL)
return -1;
if (DEBUG)
{
printf("Native: Pipe Name %s\n", pipeName);
printf("Native: dwOpenMode %d\n", dwOpenMode);
printf("Native: dwPipeMode %d\n", dwPipeMode);
printf("Native: nMaxInstances %d\n", nMaxInstances);
printf("Native: nOutBufferSize %d\n", nOutBufferSize);
printf("Native: nInBufferSize %d\n", nInBufferSize);
printf("Native: nDefaultTimeOut %d\n", nDefaultTimeOut);
}
pipeHandler = CreateNamedPipe((LPCSTR)pipeName, dwOpenMode,
dwPipeMode, nMaxInstances, nOutBufferSize,
nInBufferSize, nDefaultTimeOut,
(LPSECURITY_ATTRIBUTES) lpSecurityAttributes);
(*env)->ReleaseStringUTFChars(env, sPipeName, pipeName);
return (jint) pipeHandler;
}
JNIEXPORT jboolean JNICALL Java_ibm_Pipes_ConnectNamedPipe
(
JNIEnv *env,
jclass className,
jint hNamedPipe,
jint lpOverlapped
)
{
BOOL fConnected;
HANDLE pipeHandler = (HANDLE) hNamedPipe;
fConnected = ConnectNamedPipe(pipeHandler,
(LPOVERLAPPED) lpOverlapped);
return fConnected;
}
JNIEXPORT jint JNICALL Java_ibm_Pipes_GetLastError
(
JNIEnv *env,
jclass className
)
{
DWORD errorNumber = GetLastError();
return (jint) errorNumber;
}
JNIEXPORT jboolean JNICALL Java_ibm_Pipes_CloseHandle
(
JNIEnv *env,
jclass className,
jint hNamedPipe
)
{
BOOL result;
HANDLE pipeHandler = (HANDLE) hNamedPipe;
result = CloseHandle(pipeHandler);
return result;
}
JNIEXPORT jbyteArray JNICALL Java_ibm_Pipes_ReadFile
(
JNIEnv *env,
jclass className,
jint hNamedPipe,
jint nNumberOfBytesToRead
)
{
int bytesRead = 0;
BOOL result;
HANDLE pipeHandler = (HANDLE) hNamedPipe;
LPVOID buffer;
jbyteArray lpBuffer;
buffer = (LPVOID)LocalAlloc(LMEM_ZEROINIT, nNumberOfBytesToRead);
if (DEBUG)
{
printf("Native: Before ReadFile pipeHandler %d
nNumberOfBytesToRead %d\n", pipeHandler, nNumberOfBytesToRead);
}
result = ReadFile(pipeHandler, (LPVOID) buffer,
(DWORD) nNumberOfBytesToRead,
&bytesRead, (LPOVERLAPPED) 0);
if (result)
{
lpBuffer = (*env)->NewByteArray(env, (jsize) bytesRead);
(*env)->SetByteArrayRegion(env, lpBuffer, 0,
(jsize) bytesRead, (jbyte *) buffer);
} else
bytesRead = 0;
LocalFree(buffer);
if (DEBUG)
{
printf("Native: After ReadFile BytesRead %d\n", bytesRead);
}
return lpBuffer;
}
JNIEXPORT jint JNICALL Java_ibm_Pipes_WriteFile
(
JNIEnv *env,
jclass className,
jint hNamedPipe,
jbyteArray lpBuffer,
jint nNumberOfBytesToWrite
)
{
int bytesWritten = 0;
BOOL result;
HANDLE pipeHandler = (HANDLE) hNamedPipe;
LPVOID buffer;
buffer = (LPVOID)LocalAlloc(LMEM_ZEROINIT, nNumberOfBytesToWrite);
(*env)->GetByteArrayRegion(env, lpBuffer, 0,
nNumberOfBytesToWrite, buffer);
result = WriteFile(pipeHandler, buffer,
(DWORD) nNumberOfBytesToWrite,
(LPDWORD) &bytesWritten, (LPOVERLAPPED) 0);
LocalFree(buffer);
if (DEBUG)
{
printf("Native: After WriteFile BytesReadWritten %d\n",
bytesWritten);
}
if (!result)
{
if (GetLastError() != ERROR_IO_PENDING)
result = 0;
else
result = 1;
}
if (!result)
{
bytesWritten = -1;
}
return bytesWritten;
}
JNIEXPORT jboolean JNICALL Java_ibm_Pipes_FlushFileBuffers
(
JNIEnv *env,
jclass className,
jint hNamedPipe
)
{
BOOL result;
HANDLE pipeHandler = (HANDLE) hNamedPipe;
result = FlushFileBuffers(pipeHandler);
return result;
}
JNIEXPORT jboolean JNICALL Java_ibm_Pipes_DisconnectNamedPipe
(
JNIEnv *env,
jclass className,
jint hNamedPipe
)
{
BOOL result;
HANDLE pipeHandler = (HANDLE) hNamedPipe;
result = DisconnectNamedPipe(pipeHandler);
return result;
}
JNIEXPORT jint JNICALL Java_ibm_Pipes_CreateFile
(
JNIEnv *env,
jclass className,
jstring lpFileName,
jint dwDesiredAccess,
jint dwShareMode,
jint lpSecurityAttributes,
jint dwCreationDisposition,
jint dwFlagsAndAttributes,
jint hTemplateFile
)
{
HANDLE pipeHandler;
const jbyte *fileName;
fileName = (*env)->GetStringUTFChars(env, lpFileName, NULL);
if (fileName == NULL)
return -1;
pipeHandler = CreateFile((LPCSTR) fileName,
(DWORD) dwDesiredAccess, (DWORD) dwShareMode,
(LPSECURITY_ATTRIBUTES) lpSecurityAttributes,
(DWORD) dwCreationDisposition,
(DWORD) dwFlagsAndAttributes,
(HANDLE) hTemplateFile);
return (jint) pipeHandler;
}
JNIEXPORT jboolean JNICALL Java_ibm_Pipes_WaitNamedPipe
(
JNIEnv *env,
jclass className,
jstring lpNamedPipeName,
jint nTimeOut
)
{
BOOL result;
const jbyte *pipeName;
pipeName = (*env)->GetStringUTFChars(env, lpNamedPipeName, NULL);
if (pipeName == NULL)
return 0;
result = WaitNamedPipe((LPCSTR) pipeName, (DWORD) nTimeOut);
return result;
}
JNIEXPORT jstring JNICALL Java_ibm_Pipes_FormatMessage
(
JNIEnv *env,
jclass className,
jint errorCode
)
{
LPVOID lpMsgBuf;
LPVOID lpDisplayBuf;
DWORD dw = (DWORD) errorCode;
FormatMessage(
FORMAT_MESSAGE_ALLOCATE_BUFFER |
FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
NULL,
dw,
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
(LPTSTR) &lpMsgBuf,
0, NULL );
lpDisplayBuf = (LPVOID)LocalAlloc(LMEM_ZEROINIT,
(lstrlen((LPCTSTR)lpMsgBuf) + 40) * sizeof(TCHAR));
StringCchPrintf((LPTSTR)lpDisplayBuf,
LocalSize(lpDisplayBuf) / sizeof(TCHAR),
TEXT("Failed with error %d: %s"), dw, lpMsgBuf);
return (jstring) (*env)->NewStringUTF(env, lpDisplayBuf);
}
JNIEXPORT void JNICALL Java_ibm_Pipes_Print(JNIEnv *env,
jclass className,
jstring lpMsgBuf)
{
const jbyte *str;
str = (*env)->GetStringUTFChars(env, lpMsgBuf, NULL);
if (str == NULL)
return;
printf("Native: %s\n", str);
(*env)->ReleaseStringUTFChars(env, lpMsgBuf, str);
return;
}
|
Compile the C program in Windows
To compile and create a dynamic link library (DLL),
you need the Windows cl.exe compiler.
If you do not already have cl.exe, you can get it
by downloading Microsoft Visual Studio Express Edition.
It is beyond the scope of this article to provide instructions on how to use Visual Studio to compile a C program.
However, you can simply use cl.exe
from the command line to create the DLL as follows:
cl -I"C:\Program Files\IBM\Java50\include" -I"C:\Program Files\IBM\Java50\include\win32"
-LD Pipe.c -FePipe.dll
|
When using cl.exe to create a DLL,
remember to copy the ibm_Pipes.h file to the current directory and replace
references to the Java include directory with the actual location
of the directory on your system.
Sample Java program to use Windows pipes
Use the code in Listing 3 to create a named pipe on Windows using the native methods declared in the Pipes.h header file and implemented in the Pipe.c code.
Listing 3. Sample TestPipe.java program to use a Windows pipe
package ibm;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
public class TestPipe
{
static final int ERROR_PIPE_CONNECTED = 535;
static final int ERROR_BROKEN_PIPE = 109;
private int namedPipeHandle;
private String pipeName, srcFile;
private int pipeBuffer = 131072, fileBuffer = 8192;
public TestPipe(String pipeName, String srcFile)
{
this.pipeName = pipeName;
this.srcFile = srcFile;
}
private void log(String message)
{
System.out.println(message);
}
private boolean createPipe()
{
boolean ok = false;
namedPipeHandle = Pipes.CreateNamedPipe(pipeName,
0x00000003, 0x00000000, 2, pipeBuffer,
pipeBuffer, 0xffffffff, 0);
if (namedPipeHandle == -1)
{
log("CreateNamedPipe failed for " + pipeName +
" for error " + " Message " +
Pipes.FormatMessage(Pipes.GetLastError()));
ok = false;
} else
{
log("Named Pipe " + pipeName +
" created successfully Handle=" + namedPipeHandle);
ok = true;
}
return ok;
}
private boolean connectToPipe()
{
log("Waiting for a client to connect to pipe " + pipeName);
boolean connected = Pipes.ConnectNamedPipe(namedPipeHandle, 0);
if (!connected)
{
int lastError = Pipes.GetLastError();
if (lastError == ERROR_PIPE_CONNECTED)
connected = true;
}
if (connected)
{
log("Connected to the pipe " + pipeName);
} else
{
log("Falied to connect to the pipe " + pipeName);
}
return connected;
}
public void runPipe()
{
if (createPipe())
{
if (!connectToPipe())
{
log("Connect ConnectNamedPipe failed : " +
Pipes.FormatMessage(Pipes.GetLastError()));
return;
} else
{
log("Client connected.");
}
try
{
File f1 = new File(this.srcFile);
InputStream in = new FileInputStream(f1);
log("Sending data to the pipe");
byte[] buf = new byte[fileBuffer];
int len, bytesWritten;
while ((len = in.read(buf)) > 0)
{
bytesWritten = Pipes.WriteFile(namedPipeHandle, buf, len);
log("Sent " + len + "/" + bytesWritten +
" bytes to the pipe");
if (bytesWritten == -1)
{
int errorNumber = Pipes.GetLastError();
log("Error Writing to pipe " +
Pipes.FormatMessage(errorNumber));
}
}
in.close();
Pipes.FlushFileBuffers(namedPipeHandle);
Pipes.CloseHandle(namedPipeHandle);
Pipes.DisconnectNamedPipe(namedPipeHandle);
log("Writing to the pipe completed.");
} catch (Exception e)
{
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException,
InterruptedException
{
String pipeName = "\\\\.\\pipe\\mynamedpipe";
String fileName = "C:\\db2tabledata.txt";;
TestPipe testPipe = new TestPipe(pipeName, fileName);
testPipe.runPipe();
}
}
|
Sample Java program to use UNIX pipes
With UNIX, you do not need the Java code to invoke a C program and create a named pipe.
This is because UNIX can create a named pipe directly using
either the
mkfifo or mknod command.
The code in Listing 4 shows an example of using a named pipe in UNIX.
Listing 4. Sample TestUnixPipe.java program to use a named UNIX pipe
package ibm;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class TestUnixPipe
{
FileChannel fc;
int multiTables[] = new int[1];
String filesep = System.getProperty("file.separator");
String fileName, OUTPUT_DIR = ".", pipeName;
int pipeBuffer = 131072, fileBuffer = 8192;
public TestUnixPipe(String fileName, String output)
{
this.fileName = fileName;
this.OUTPUT_DIR = output;
multiTables[0] = 0;
}
private void log(String message)
{
System.out.println(message);
}
public void runPipe()
{
int bytesReturn;
pipeName = OUTPUT_DIR + "data" + filesep + pipeName + ".pipe";
File pipeFile = new File(pipeName);
pipeFile.deleteOnExit();
if (!pipeFile.exists())
{
try
{
Runtime.getRuntime().exec("mkfifo " +
pipeFile.getAbsolutePath());
} catch (Exception e)
{
e.printStackTrace();
}
}
FileOutputStream fos = null;
try
{
if (multiTables[0] == 0)
{
fos = new FileOutputStream(pipeFile);
fc = fos.getChannel();
} else
{
fc = fc;
}
} catch (Exception e)
{
e.printStackTrace();
}
try
{
File f1 = new File(this.fileName);
InputStream in = new FileInputStream(f1);
log("Sending data to the pipe");
byte[] buf = new byte[fileBuffer];
int len;
while ((len = in.read(buf)) > 0)
{
bytesReturn = fc.write(ByteBuffer.wrap(buf));
log("Sent " + len + "/" + bytesReturn +
" bytes to the pipe");
if (bytesReturn == -1)
{
log("Error Writing to pipe " + pipeName);
}
}
in.close();
log("Writing to the pipe completed.");
} catch (Exception e)
{
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException,
InterruptedException
{
String output = ".";
String fileName = "/home/db2inst1/db2tabledata.txt";;
TestUnixPipe testPipe = new TestUnixPipe(fileName, output);
testPipe.runPipe();
}
}
|
Run the TestPipe.java program to use a Windows pipe
Use the following commands to compile and run the sample TestPipe.java program (Listing 3).
javac TestPipe.java java -Djava.library.path=".." ibm.TestPipe |
In the above example, you pass the java.library.path system property to the JVM to indicate the location where the JVM can look for Pipe.dll to invoke Windows native methods. In the example, the Pipe.dll is located one level up from the current directory.
When you run TestPipe, it creates a named pipe called \\.\pipe\mynamedpipe and
connects to the pipe.
It then waits for another program (in this case the DB2 LOAD command) to connect to it before
it starts to write data to the pipe.
Once DB2 LOAD connects to the pipe, the Java
program starts sending the contents of the db2tabledata.txt file to the pipe
and the DB2 LOAD consumes that data.
Figure 1. Running the Java sample code from a Windows command prompt
DB2 Load script to use the pipe for a table
As described in the previous section, after the TestPipe program connects to the
pipe, it waits for another program to connect to it.
To make this other connection, run the sample db2load.sql DB2
script (Listing 5).
This creates a table and starts the LOAD from mynamedpipe.
Listing 5. Sample DB2 script to create a table and start the LOAD
CONNECT TO SAMPLE; CREATE TABLE PIPE_TABLE ( "ID" NUMBER(10) NOT NULL , "NAME" VARCHAR2(35) , "LOC_ID" NUMBER(4) ) ; LOAD FROM "\\.\pipe\mynamedpipe" OF DEL MODIFIED BY CODEPAGE=1208 COLDEL~ ANYORDER USEDEFAULTS CHARDEL"" DELPRIORITYCHAR NOROWWARNINGS METHOD P (1,2,3) MESSAGES "db2tabledata.txt" REPLACE INTO PIPE_TABLE ( "ID", "NAME", "LOC_ID" ) NONRECOVERABLE INDEXING MODE AUTOSELECT ; TERMINATE; |
To invoke the db2load.sql script, open a DB2 command line processor (CLP) window and run the following command:
db2 -tf db2load.sql
|
Figure 2, represents the simultaneous processing that occurs between the
db2load.sql script and the TestPipe.java program.
Notice that once DB2 LOAD command connects to the Windows named pipe,
the Java program starts writing data to the pipe.
Simultaneously, the DB2 LOAD consumes that
data and loads it into DB2.
Figure 2. DB2 CLP window running db2load.sql and Windows command prompt running TestPipe.java running TestPipe
Run the TestUnixPipe.java program to use a UNIX pipe
You can run the TestUnixPipe.java program on UNIX systems and see the same type of behavior as described above for the Windows environment. The exception, of course, is that you would use the UNIX capability to directly create the pipe.
Using pipes with the IBM Data Movement Tool
You can customize and use the sample code from this article when you have your own requirements to process data through Java or C/C++ programs, and your main motivation is to avoid having to use intermediate data files. However, if your motivation is to just unload data from a source database and load it into DB2 using pipes, you can simply use the IBM Data Movement Tool with the Use Pipe option, as shown in Figure 3.
With the IBM Data Movement Tool, you first extract and create table definitions from a source database to DB2 and then use the pipe option to load the data. You can do this in parallel to unload. You can load and unload data from a single or multiple tables in parallel. For more information about the tool and how to use it, refer to the IBM Data Movement article link in the Resources section.
Figure 3. Screenshot of IBM Data Movement Tool
The DB2 LOAD utility is very powerful and allows you to exploit server hardware to the full extent to
load data.
However, in a very large database environment, the space to hold the intermediate data
can become an issue.
You can overcome this issue by using the pipe capability.
The use of pipes across platforms is the same as using a normal file, except for that
within a Windows environment there are extra steps to create the pipes,
and the Java program must use JNI to access the Windows API.
This article's instructions along with the sample code included in the download file
should take some of the complexity out of this process and
make it easier for you to accomplish this type of task.
| Description | Name | Size | Download method |
|---|---|---|---|
| Sample Java and C code for this article | pipeload.zip | 28KB | HTTP |
Information about download methods
Learn
-
"IBM Data Movement Tool"
(developerWorks, Jun 2009) implements pipe to move data from source database to DB2.
-
"Microsoft Visual Studio"
Express Edition to compile Pipe.c program into Pipe.dll
Get products and technologies
- Download
Download a free trial version of DB2 9.7 for Linux, UNIX, and Windows
- DB2 Express-C 9.7
Download DB2 Express-C 9.7, a no-charge version of DB2 Express database server for the community.
- Build your next development project with
IBM Trial software, available for download directly from developerWorks.
Discuss
Vikram S Khatri works for IBM in the Sales and Distribution Division and is a member of the DB2 Migration team. Vikram has 23 years of IT experience and specializes in enabling non-DB2 applications to DB2. Vikram supports the DB2 technical sales organization by assisting with complex database migration projects as well as with database performance benchmark testing.




