C example (multiple simultaneous connections)
There are two ways in C to achieve multiple simultaneous connections. The first is by creating a thread for each one using pthreads. The second is by forking off a new process to handle the connection, which is the method demonstrated in this example. This example is based on the shaper and sizer example, but with changes to the main.
In this example, nzaeRemprotAcceptEnvironment is called and forks twice to disassociate. SIGCHLD is ignored before the fork, and the parent waits for the first child to exit before unblocking, avoiding zombie processes. Only the parent can close the connection point and remote protocol handles. The child processes a single request.
The example uses the following file name:
sstring.c
Code
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <signal.h>
#include "nzaeapis.h"
static int run(NZAE_HANDLE h);
static int runShaper(NZAESHP_HANDLE h);
int main(int argc, char * argv[])
{
if (nzaeIsLocal())
{
NzaeApi result;
char errorMessage[1050];
if (nzaeLocprotGetApi(&result, NZAE_LDK_VERSION,
errorMessage, sizeof(errorMessage)))
{
fprintf(stderr, "%s\n", errorMessage);
return -1;
}
if (result.apiType == NZAE_API_FUNCTION) {
run(result.handle.function);
nzaeClose(result.handle.function);
}
else {
runShaper(result.handle.shaper);
nzaeShpClose(result.handle.shaper);
}
}
else {
NZAECONPT_HANDLE hConpt = nzaeconptCreate();
if (!hConpt)
{
fprintf(stderr, "error creating connection point\n");
fflush(stderr);
return -1;
}
const char * conPtName = nzaeRemprotGetRemoteName();
if (!conPtName)
{
fprintf(stderr, "error getting connection point name\n");
fflush(stderr);
exit(-1);
}
if (nzaeconptSetName(hConpt, conPtName))
{
fprintf(stderr, "error setting connection point name\n");
fflush(stderr);
nzaeconptClose(hConpt);
return -1;
}
NzaeremprotInitialization args;
memset(&args, 0, sizeof(args));
args.ldkVersion = NZAE_LDK_VERSION;
args.hConpt = hConpt;
if (nzaeRemprotCreateListener(&args))
{
fprintf(stderr, "unable to create listener - %s\n",
args.errorMessage);
fflush(stderr);
nzaeconptClose(hConpt);
return -1;
}
NZAEENV_HANDLE hEnv;
NZAEREMPROT_HANDLE hRemprot = args.handle;
NzaeApi api;
int i;
for (i = 0; i < 5; i++)
{
if (nzaeRemprotAcceptEnvironment(hRemprot, &hEnv))
{
fprintf(stderr, "unable to accept API - %s\n", \
nzaeRemprotGetLastErrorText(hRemprot));
fflush(stderr);
nzaeconptClose(hConpt);
nzaeRemprotClose(hRemprot);
return -1;
}
struct sigaction newaction;
struct sigaction m_save;
memset(&newaction, 0, sizeof(newaction));
newaction.sa_handler = SIG_IGN;
sigaction(SIGCHLD, &newaction, &m_save);
int rcFork = fork();
if (rcFork == -1) {
fprintf(stderr, "fork failed\n");
fflush(stderr);
nzaeconptClose(hConpt);
nzaeRemprotClose(hRemprot);
}
else if (rcFork > 0) {
int status;
int ret = waitpid(rcFork, &status, 0);
sigaction(SIGCHLD, &m_save, NULL);
//parent
nzaeenvClose(hEnv);
continue;
}
// fork again to disassociate
rcFork = fork();
if (rcFork == -1) {
exit(1);
}
else if (rcFork > 0) {
nzaeenvClose(hEnv);
exit(1);
}
int j;
// fix all signal handlers.
// This will remove any parent signal handlers
for (j=0;j < NSIG; j++) {
signal(j, SIG_DFL);
}
setpgid(getpid(), 0);
setsid();
sigaction(SIGCHLD, &m_save, NULL);
printf("testcapi: accepted a remote request\n");
fflush(stdout);
NzaeApiTypes apiType = nzaeRemprotGetEnvironmentApiType(hEnv);
if (apiType == NZAE_API_FUNCTION) {
NzaeInitialization arg;
memset(&arg, 0, sizeof(arg));
arg.hEnv = hEnv;
arg.ldkVersion = NZAE_LDK_VERSION;
if (nzaeInitialize(&arg))
{
fprintf(stderr, "initialization failed\n");
return -1;
}
run(arg.handle);
nzaeClose(arg.handle);
}
else {
NzaeShpInitialization arg;
memset(&arg, 0, sizeof(arg));
arg.hEnv = hEnv;
arg.ldkVersion = NZAE_LDK_VERSION;
if (nzaeShpInitialize(&arg))
{
fprintf(stderr, "initialization failed\n");
return -1;
}
runShaper(arg.handle);
nzaeShpClose(arg.handle);
}
// Since you are forked, only run once
exit(0);
}
nzaeRemprotClose(hRemprot);
nzaeconptClose(hConpt);
}
return 0;
}
static int runShaper(NZAESHP_HANDLE h)
{
char name[2];
bool upper = true;
NzaeShpMetadata meta;
#define CHECK2(value) \
{ \
NzaeRcCode rc = value; \
if (rc) \
{ \
const char * format = "%s in %s at %d"; \
fprintf(stderr, format, \
nzaeShpGetLastErrorText(h), __FILE__, __LINE__); \
nzaeShpUserError(h, format, \
nzaeShpGetLastErrorText(h), __FILE__, __LINE__); \
exit(-1); \
} \
}
CHECK2(nzaeShpGetMetadata(h, &meta));
if (!meta.oneOutputRowRestriction)
CHECK2(nzaeShpSystemCatalogIsUpper(h, &upper));
if (upper)
name[0] = 'I';
else
name[0] = 'i';
name[1] = 0;
if (meta.inputTypes[0] == NZUDSUDX_FIXED ||
meta.inputTypes[0] == NZUDSUDX_VARIABLE ||
meta.inputTypes[0] == NZUDSUDX_NATIONAL_FIXED ||
meta.inputTypes[0] == NZUDSUDX_NATIONAL_VARIABLE) {
CHECK2(nzaeShpAddOutputColumnString(h, meta.inputTypes[0], name, \
meta.inputSizes[0]));
}
else if (meta.inputTypes[0] == NZUDSUDX_NUMERIC128 ||
meta.inputTypes[0] == NZUDSUDX_NUMERIC64 ||
meta.inputTypes[0] == NZUDSUDX_NUMERIC32) {
CHECK2(nzaeShpAddOutputColumnNumeric(h, meta.inputTypes[0], name, \
meta.inputSizes[0], meta.inputScales[0]));
}
else {
CHECK2(nzaeShpAddOutputColumn(h, meta.inputTypes[0], name));
}
CHECK2(nzaeShpUpdate(h));
}
static int run(NZAE_HANDLE h)
{
NzaeMetadata metadata;
if (nzaeGetMetadata(h, &metadata))
{
fprintf(stderr, "get metadata failed\n");
return -1;
}
#define CHECK(value) \
{ \
NzaeRcCode rc = value; \
if (rc) \
{ \
const char * format = "%s in %s at %d"; \
fprintf(stderr, format, \
nzaeGetLastErrorText(h), __FILE__, __LINE__); \
nzaeUserError(h, format, \
nzaeGetLastErrorText(h), __FILE__, __LINE__); \
exit(-1); \
} \
}
for (;;)
{
int i;
double result = 0;
NzaeRcCode rc = nzaeGetNext(h);
if (rc == NZAE_RC_END)
{
break;
}
NzudsData * input = NULL;
CHECK(nzaeGetInputColumn(h, 0, &input));
const char * opString;
if (input->isNull)
{
nzaeSetOutputNull(h,0);
}
else {
nzaeSetOutputColumn(h,0, input);
}
CHECK(nzaeOutputResult(h));
}
nzaeDone(h);
return 0;
}
Compilation
Compile as follows:
$NZ_EXPORT_DIR/ae/utilities/bin/compile_ae --language system --version 3 \
--template compile sstring.c --exe sstring
Registration
For registration, use a remote variation to demonstrate the fork mode that only works in remote mode:
$NZ_EXPORT_DIR/ae/utilities/bin/register_ae --language system --version 3 \
--template udtf --exe sstring --sig "rem_sstring_c(VARCHAR(ANY))" \
--return "TABLE (val varchar(2000))" --remote --rname testcapi
$NZ_EXPORT_DIR/ae/utilities/bin/register_ae --language system --version 3 \
--template udtf --exe sstring --sig "rem_sstring_launch(int8)" \
--return "TABLE(aeresult varchar(255))" --remote --rname testcapi \
--launch
Running
To run:
SELECT * FROM TABLE WITH FINAL(rem_sstring_launch(0));
AERESULT
---------------------------------------------------------------------------------
-------------------
tran: 8278 session: 16019 DATA slc: 0 hardware: 0 machine: bdrosendev process:
20470 thread: 20470
(1 row)
SELECT * FROM TABLE WITH FINAL(rem_sstring_c('test'));
VAL
------
test
(1 row)