C language scalar function (remote mode)

This example uses the following file name:

func.c

Code

The code in this example shows that, with minor changes, this program works in remote mode.

The change is in main(). The required remote AE connection point name, obtained from the launcher, is set, and a loop is added around the accept call so that it is called five times. The program still works as a local AE; as a remote AE, it handles five requests and then exits.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "nzaeapis.h"

static int run(NZAE_HANDLE h);
/*
 * Program entry point.
 *
 * Local mode (nzaeIsLocal() is true): initializes a single AE handle,
 * processes it with run(), and closes it.
 *
 * Remote mode: creates a connection point, names it with the value returned
 * by nzaeRemprotGetRemoteName(), creates a listener on it, and then accepts
 * and serves REMOTE_REQUEST_COUNT function requests before shutting down.
 *
 * Returns 0 on normal completion and -1 when any setup or accept step
 * fails. The result of run() is not propagated: a failed request does not
 * change the exit status and does not stop the remote loop.
 */
int main(int argc, char * argv[])
{
    /* Number of remote requests served before the process exits. */
    enum { REMOTE_REQUEST_COUNT = 5 };

    if (nzaeIsLocal())
    {
        NzaeInitialization arg;
        memset(&arg, 0, sizeof(arg));
        arg.ldkVersion = NZAE_LDK_VERSION;
        if (nzaeInitialize(&arg))
        {
            fprintf(stderr, "initialization failed\n");
            return -1;
        }
        run(arg.handle);
        nzaeClose(arg.handle);
    }
    else
    {
        NZAECONPT_HANDLE hConpt = nzaeconptCreate();
        if (!hConpt)
        {
            fprintf(stderr, "error creating connection point\n");
            fflush(stderr);
            return -1;
        }
        const char * conPtName = nzaeRemprotGetRemoteName();
        if (!conPtName)
        {
            fprintf(stderr, "error getting connection point name\n");
            fflush(stderr);
            /* Release the connection point here too, like every other
               failure path after its creation. */
            nzaeconptClose(hConpt);
            return -1;
        }
        if (nzaeconptSetName(hConpt, conPtName))
        {
            fprintf(stderr, "error setting connection point name\n");
            fflush(stderr);
            nzaeconptClose(hConpt);
            return -1;
        }
        NzaeremprotInitialization args;
        memset(&args, 0, sizeof(args));
        args.ldkVersion = NZAE_LDK_VERSION;
        args.hConpt = hConpt;
        if (nzaeRemprotCreateListener(&args))
        {
            fprintf(stderr, "unable to create listener - %s\n",
                args.errorMessage);
            fflush(stderr);
            nzaeconptClose(hConpt);
            return -1;
        }
        NZAEREMPROT_HANDLE hRemprot = args.handle;
        NzaeApi api;
        int i;
        for (i = 0; i < REMOTE_REQUEST_COUNT; i++)
        {
            if (nzaeRemprotAcceptApi(hRemprot, &api))
            {
                fprintf(stderr, "unable to accept API - %s\n",
                    nzaeRemprotGetLastErrorText(hRemprot));
                fflush(stderr);
                /* Tear down in the same order as the normal exit below:
                   listener first, then the connection point it uses. */
                nzaeRemprotClose(hRemprot);
                nzaeconptClose(hConpt);
                return -1;
            }
            if (api.apiType != NZAE_API_FUNCTION)
            {
                fprintf(stderr, "unexpected API returned\n");
                fflush(stderr);
                nzaeRemprotClose(hRemprot);
                nzaeconptClose(hConpt);
                return -1;
            }
            printf("testcapi: accepted a remote request\n");
            fflush(stdout);
            run(api.handle.function);
        }
        nzaeRemprotClose(hRemprot);
        nzaeconptClose(hConpt);
    }
    return 0;
}

static int run(NZAE_HANDLE h)
{
    NzaeMetadata metadata;
    if (nzaeGetMetadata(h, &metadata))
    {
        fprintf(stderr, "get metadata failed\n");
        return -1;
    }

    #define CHECK(value) \
    { \
        NzaeRcCode rc = value; \
        if (rc) \
        { \
            const char * format = "%s in %s at %d"; \
            fprintf(stderr, format, \
                nzaeGetLastErrorText(h), __FILE__, __LINE__); \
            nzaeUserError(h, format, \
                nzaeGetLastErrorText(h), __FILE__, __LINE__); \
            exit(-1); \
        } \
    }

    if (metadata.outputColumnCount != 1 ||
        metadata.outputTypes[0] != NZUDSUDX_DOUBLE)
    {
        nzaeUserError(h, "expecting one output column of type double");
        nzaeDone(h);
        return -1;
    }

    if (metadata.inputColumnCount < 1)
    {
        nzaeUserError(h, "expecting at least one input column");
        nzaeDone(h);
        return -1;
    }

    if (metadata.inputTypes[0] != NZUDSUDX_FIXED
        && metadata.inputTypes[0] != NZUDSUDX_VARIABLE)
    {
        nzaeUserError(h, "first input column expected to be a string type");
        nzaeDone(h);
        return -1;
    }

    for (;;)
    {
        int i;
        double result = 0;
        NzaeRcCode rc = nzaeGetNext(h);
        if (rc == NZAE_RC_END)
        {
            break;
        }

        NzudsData * input = NULL;
        CHECK(nzaeGetInputColumn(h, 0, &input));
        const char * opString;
        if (input->isNull)
        {
            nzaeUserError(h, "first input column may not be null");
            nzaeDone(h);
            return -1;
        }
        if (input->type == NZUDSUDX_FIXED)
        {
            opString = input->data.pFixedString;
        } else if (input->type == NZUDSUDX_VARIABLE)
        {
            opString = input->data.pVariableString;
        } else
        {
            nzaeUserError(h, "first input column expected to be a string type");
            nzaeDone(h);
            return -1;
        }

        enum OperatorEnum
        {
            OP_ADD = 1,
            OP_MULT = 2
        } op;

        if (strcmp(opString, "+") == 0)
        {
            op = OP_ADD;
        } else if (strcmp(opString, "*") == 0)
        {
            op = OP_MULT;
            result = 1;
        } else
        {
            nzaeUserError(h, "unexpected operator %s", opString);
            nzaeDone(h);
            return -1;
        }

        for (i = 1; i < metadata.inputColumnCount; i++)
        {
            CHECK(nzaeGetInputColumn(h, i, &input));
            if (input->isNull)
            {
                continue;
            }
            switch (op)
            {
            case OP_ADD:
                switch(input->type)
                {
                    case NZUDSUDX_INT8:
                        result += *input->data.pInt8;
                        break;
                    case NZUDSUDX_INT16:
                        result += *input->data.pInt16;
                        break;
                    case NZUDSUDX_INT32:
                        result += *input->data.pInt32;
                        break;
                    case NZUDSUDX_INT64:
                        result += *input->data.pInt64;
                        break;
                    case NZUDSUDX_FLOAT:
                        result += *input->data.pFloat;
                        break;
                    case NZUDSUDX_DOUBLE:
                        result += *input->data.pDouble;
                        break;

                    case NZUDSUDX_NUMERIC32:
                    case NZUDSUDX_NUMERIC64:
                    case NZUDSUDX_NUMERIC128:
                        // unlike Java, C has no native numeric data types
                        break;
                    default:
                        // ignore nonnumeric type
                        break;
                }
                break;
            case OP_MULT:
                switch(input->type)
                {
                    case NZUDSUDX_INT8:
                        result *= *input->data.pInt8;
                        break;
                    case NZUDSUDX_INT16:
                        result *= *input->data.pInt16;
                        break;
                    case NZUDSUDX_INT32:
                        result *= *input->data.pInt32;
                        break;
                    case NZUDSUDX_INT64:
                        result *= *input->data.pInt64;
                        break;
                    case NZUDSUDX_FLOAT:
                        result *= *input->data.pFloat;
                        break;
                    case NZUDSUDX_DOUBLE:
                        result *= *input->data.pDouble;
                        break;

                    case NZUDSUDX_NUMERIC32:
                    case NZUDSUDX_NUMERIC64:
                    case NZUDSUDX_NUMERIC128:
                        // unlike Java, C has no native numeric data types
                        break;
                    default:
                        // ignore non-numeric type
                        break;
                }
                break;
            default:
                nzaeUserError(h, "internal error, unexpected operator");
                nzaeDone(h);
                return -1;
            }
        }

        CHECK(nzaeSetOutputDouble(h, 0, result));
        CHECK(nzaeOutputResult(h));
    }
    nzaeDone(h);
    return 0;
}

Compilation

Use the standard compile, as in local mode:

$NZ_EXPORT_DIR/ae/utilities/bin/compile_ae --language system --version 3 \
     --template compile func.c --exe apply

Registration

Registration is slightly different. First, run this to register:

$NZ_EXPORT_DIR/ae/utilities/bin/register_ae --language system --version 3 \
     --template udf --exe apply --sig "rem_applyop_c(VARARGS)" \
     --return "double" --remote --rname testcapi

Then run the following command. It specifies that the code be registered as a remote AE with a remote name of testcapi, which matches the code. In addition, it registers a launcher:

$NZ_EXPORT_DIR/ae/utilities/bin/register_ae --language system --version 3 \
     --template udtf --exe apply --sig "rem_applyop_launch(int8)" \
     --return "TABLE(aeresult varchar(255))" --remote --rname testcapi --launch

With the exception of the --rname, --exe and the name portion of --sig, all launchers look like the above. In this case it is a UDTF, since that is the interface for all launchers. The value of --rname must match the name in the code.

Running

To run the AE in remote mode, the executable is run as a “server.” In this instance it handles only queries run on the host. Usually, AEs are started on the SPUs as well. Start an instance on the host:

SELECT * FROM TABLE WITH FINAL(rem_applyop_launch(0));
                                         AERESULT
---------------------------------------------------------------------------------
--
 tran: 7788 session: 16354 DATA slc: 0 hardware: 0 machine: bdrosendev process:
13117 thread: 13117
(1 row)

Notice that a different syntax is used to invoke the table function style launcher. This is the syntax used to call any UDTF-based AE. Now run the AE:

SELECT rem_applyop_c('+',1,2);
 REM_APPLYOP_C
---------------
 3
(1 row)

Finally, cause an error:

SELECT rem_applyop_c(1);
ERROR: first input COLUMN expected TO be a string type