C ejemplo (múltiples conexiones simultáneas)
En C hay dos formas de conseguir varias conexiones simultáneas. La primera es creando un hilo para cada uno utilizando pthreads. La segunda es mediante la bifurcación de un nuevo proceso para manejar la conexión, que es el método demostrado en este ejemplo. Este ejemplo se basa en el ejemplo de la formadora y la clasificadora, pero con cambios en la principal.
En este ejemplo, se llama a nzaeRemprotAcceptEnvironment y se bifurca dos veces para disociarse. SIGCHLD se ignora antes de la bifurcación, y el padre espera a que el primer hijo salga antes de desbloquearse, evitando procesos zombis. Sólo el padre puede cerrar el punto de conexión y los manejadores de protocolo remotos. El niño procesa una única solicitud.
El ejemplo utiliza el siguiente nombre de archivo:
sstring.c
Código
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <signal.h>
#include "nzaeapis.h"
static int run(NZAE_HANDLE h);
static int runShaper(NZAESHP_HANDLE h);
int main(int argc, char * argv[])
{
if (nzaeIsLocal())
{
NzaeApi result;
char errorMessage[1050];
if (nzaeLocprotGetApi(&result, NZAE_LDK_VERSION,
errorMessage, sizeof(errorMessage)))
{
fprintf(stderr, "%s\n", errorMessage);
return -1;
}
if (result.apiType == NZAE_API_FUNCTION) {
run(result.handle.function);
nzaeClose(result.handle.function);
}
else {
runShaper(result.handle.shaper);
nzaeShpClose(result.handle.shaper);
}
}
else {
NZAECONPT_HANDLE hConpt = nzaeconptCreate();
if (!hConpt)
{
fprintf(stderr, "error creating connection point\n");
fflush(stderr);
return -1;
}
const char * conPtName = nzaeRemprotGetRemoteName();
if (!conPtName)
{
fprintf(stderr, "error getting connection point name\n");
fflush(stderr);
exit(-1);
}
if (nzaeconptSetName(hConpt, conPtName))
{
fprintf(stderr, "error setting connection point name\n");
fflush(stderr);
nzaeconptClose(hConpt);
return -1;
}
NzaeremprotInitialization args;
memset(&args, 0, sizeof(args));
args.ldkVersion = NZAE_LDK_VERSION;
args.hConpt = hConpt;
if (nzaeRemprotCreateListener(&args))
{
fprintf(stderr, "unable to create listener - %s\n",
args.errorMessage);
fflush(stderr);
nzaeconptClose(hConpt);
return -1;
}
NZAEENV_HANDLE hEnv;
NZAEREMPROT_HANDLE hRemprot = args.handle;
NzaeApi api;
int i;
for (i = 0; i < 5; i++)
{
if (nzaeRemprotAcceptEnvironment(hRemprot, &hEnv))
{
fprintf(stderr, "unable to accept API - %s\n", \
nzaeRemprotGetLastErrorText(hRemprot));
fflush(stderr);
nzaeconptClose(hConpt);
nzaeRemprotClose(hRemprot);
return -1;
}
struct sigaction newaction;
struct sigaction m_save;
memset(&newaction, 0, sizeof(newaction));
newaction.sa_handler = SIG_IGN;
sigaction(SIGCHLD, &newaction, &m_save);
int rcFork = fork();
if (rcFork == -1) {
fprintf(stderr, "fork failed\n");
fflush(stderr);
nzaeconptClose(hConpt);
nzaeRemprotClose(hRemprot);
}
else if (rcFork > 0) {
int status;
int ret = waitpid(rcFork, &status, 0);
sigaction(SIGCHLD, &m_save, NULL);
//parent
nzaeenvClose(hEnv);
continue;
}
// fork again to disassociate
rcFork = fork();
if (rcFork == -1) {
exit(1);
}
else if (rcFork > 0) {
nzaeenvClose(hEnv);
exit(1);
}
int j;
// fix all signal handlers.
// This will remove any parent signal handlers
for (j=0;j < NSIG; j++) {
signal(j, SIG_DFL);
}
setpgid(getpid(), 0);
setsid();
sigaction(SIGCHLD, &m_save, NULL);
printf("testcapi: accepted a remote request\n");
fflush(stdout);
NzaeApiTypes apiType = nzaeRemprotGetEnvironmentApiType(hEnv);
if (apiType == NZAE_API_FUNCTION) {
NzaeInitialization arg;
memset(&arg, 0, sizeof(arg));
arg.hEnv = hEnv;
arg.ldkVersion = NZAE_LDK_VERSION;
if (nzaeInitialize(&arg))
{
fprintf(stderr, "initialization failed\n");
return -1;
}
run(arg.handle);
nzaeClose(arg.handle);
}
else {
NzaeShpInitialization arg;
memset(&arg, 0, sizeof(arg));
arg.hEnv = hEnv;
arg.ldkVersion = NZAE_LDK_VERSION;
if (nzaeShpInitialize(&arg))
{
fprintf(stderr, "initialization failed\n");
return -1;
}
runShaper(arg.handle);
nzaeShpClose(arg.handle);
}
// Since you are forked, only run once
exit(0);
}
nzaeRemprotClose(hRemprot);
nzaeconptClose(hConpt);
}
return 0;
}
static int runShaper(NZAESHP_HANDLE h)
{
char name[2];
bool upper = true;
NzaeShpMetadata meta;
#define CHECK2(value) \
{ \
NzaeRcCode rc = value; \
if (rc) \
{ \
const char * format = "%s in %s at %d"; \
fprintf(stderr, format, \
nzaeShpGetLastErrorText(h), __FILE__, __LINE__); \
nzaeShpUserError(h, format, \
nzaeShpGetLastErrorText(h), __FILE__, __LINE__); \
exit(-1); \
} \
}
CHECK2(nzaeShpGetMetadata(h, &meta));
if (!meta.oneOutputRowRestriction)
CHECK2(nzaeShpSystemCatalogIsUpper(h, &upper));
if (upper)
name[0] = 'I';
else
name[0] = 'i';
name[1] = 0;
if (meta.inputTypes[0] == NZUDSUDX_FIXED ||
meta.inputTypes[0] == NZUDSUDX_VARIABLE ||
meta.inputTypes[0] == NZUDSUDX_NATIONAL_FIXED ||
meta.inputTypes[0] == NZUDSUDX_NATIONAL_VARIABLE) {
CHECK2(nzaeShpAddOutputColumnString(h, meta.inputTypes[0], name, \
meta.inputSizes[0]));
}
else if (meta.inputTypes[0] == NZUDSUDX_NUMERIC128 ||
meta.inputTypes[0] == NZUDSUDX_NUMERIC64 ||
meta.inputTypes[0] == NZUDSUDX_NUMERIC32) {
CHECK2(nzaeShpAddOutputColumnNumeric(h, meta.inputTypes[0], name, \
meta.inputSizes[0], meta.inputScales[0]));
}
else {
CHECK2(nzaeShpAddOutputColumn(h, meta.inputTypes[0], name));
}
CHECK2(nzaeShpUpdate(h));
}
static int run(NZAE_HANDLE h)
{
NzaeMetadata metadata;
if (nzaeGetMetadata(h, &metadata))
{
fprintf(stderr, "get metadata failed\n");
return -1;
}
#define CHECK(value) \
{ \
NzaeRcCode rc = value; \
if (rc) \
{ \
const char * format = "%s in %s at %d"; \
fprintf(stderr, format, \
nzaeGetLastErrorText(h), __FILE__, __LINE__); \
nzaeUserError(h, format, \
nzaeGetLastErrorText(h), __FILE__, __LINE__); \
exit(-1); \
} \
}
for (;;)
{
int i;
double result = 0;
NzaeRcCode rc = nzaeGetNext(h);
if (rc == NZAE_RC_END)
{
break;
}
NzudsData * input = NULL;
CHECK(nzaeGetInputColumn(h, 0, &input));
const char * opString;
if (input->isNull)
{
nzaeSetOutputNull(h,0);
}
else {
nzaeSetOutputColumn(h,0, input);
}
CHECK(nzaeOutputResult(h));
}
nzaeDone(h);
return 0;
}
Una compilación
Compila como sigue:
$NZ_EXPORT_DIR/ae/utilities/bin/compile_ae --language system --version 3 \
--template compile sstring.c --exe sstring
Registro
Para el registro, utilice una variación remota para demostrar el modo de horquilla que sólo funciona en modo remoto:
$NZ_EXPORT_DIR/ae/utilities/bin/register_ae --language system --version 3 \
--template udtf --exe sstring --sig "rem_sstring_c(VARCHAR(ANY))" \
--return "TABLE (val varchar(2000))" --remote --rname testcapi
$NZ_EXPORT_DIR/ae/utilities/bin/register_ae --language system --version 3 \
--template udtf --exe sstring --sig "rem_sstring_launch(int8)" \
--return "TABLE(aeresult varchar(255))" --remote --rname testcapi \
--launch
En ejecución
A correr:
SELECT * FROM TABLE WITH FINAL(rem_sstring_launch(0));
AERESULT
---------------------------------------------------------------------------------
-------------------
tran: 8278 session: 16019 DATA slc: 0 hardware: 0 machine: bdrosendev process:
20470 thread: 20470
(1 row)
SELECT * FROM TABLE WITH FINAL(rem_sstring_c('test'));
VAL
------
test
(1 row)