C++ 示例(多路同时连接)

在 C++ 中,有两种方法可以独立处理多个同时连接。 第一种方法是使用 pthreads 为每个线程创建一个线程。 第二种方法是分出一个新进程来处理连接,也就是本例中演示的方法。 此示例基于整形器和定型器示例,但对主程序进行了修改。

在本例中,getApi的调用值为 TRUE,表示叉模式,得到的是指针而不是引用。 在远程模式下,有两个进程。 父进程返回一个 NULL,子进程计数器递增,然后什么也不做,这样做两次。 在分叉后的子进程中,getApi会返回用于处理整形器或定型器的 API,然后退出。

本示例使用以下文件名:
fork.cpp

代码

#include <nzaefactory.hpp>

using namespace nz::ae;

static int run(nz::ae::NzaeFunction *aeFunc);
static int doShaper(nz::ae::NzaeShaper *aeShaper);

int main(int argc, char * argv[])
{
    NzaeApiGenerator helper;
    // The following line is only needed if a launcher is not used
    helper.setName("testcapi");
    int i = 0;
    int j=0;
    while (i < 1) {
        nz::ae::NzaeApi *api = helper.getApi(nz::ae::NzaeApi::ANY, true);
        if (helper.isRemote() && !api) {
            // fork mode (parent)
            j++;
            if (j == 2)
                break;
            continue;
        }
        if (api->apiType == nz::ae::NzaeApi::FUNCTION) {
            run(api->aeFunction);
            i++;
        }
        else {
            doShaper(api->aeShaper);
        }
        if (!helper.isRemote())
            break;
        if (api)
            break;
    }

    return 0;
}

class MyHandler : public nz::ae::NzaeFunctionMessageHandler
{
public:
    void evaluate(NzaeFunction& api, NzaeRecord &input, NzaeRecord &result) {
        const NzaeMetadata& m = api.getMetadata();
        nz::ae::NzaeField &field = input.get(0);
        if (!field.isNull() ) {
            nz::ae::NzaeField &f = result.get(0);
            if (field.type() == NzaeDataTypes::NZUDSUDX_NUMERIC32 ||
                field.type() == NzaeDataTypes::NZUDSUDX_NUMERIC64 ||
                field.type() == NzaeDataTypes::NZUDSUDX_NUMERIC128)
            {
                NzaeNumericField &nf = (NzaeNumericField&)field;
                if (m.getInputSize(0) != nf.precision() ||
                    m.getInputScale(0) != nf.scale()) {
                    NzaeNumeric128Field* np = nf.toNumeric128(38, 5);
                    f.assign(*np);
                    delete np;
                }
                else
                    f.assign(field);
            }
            else {
                f.assign(field);
            }
        }
    } 
};

class MyHandler2 : public nz::ae::NzaeShaperMessageHandler
{
public:
    void shaper(NzaeShaper& api){
        const NzaeMetadata& m = api.getMetadata();
        char name[2];
        if (api.catalogIsUpper())
            name[0] = 'I';
        else
            name[0] = 'i';
        name[1] = 0;

        if (m.getInputType(0) == NzaeDataTypes::NZUDSUDX_FIXED ||
            m.getInputType(0) == NzaeDataTypes::NZUDSUDX_VARIABLE ||
            m.getInputType(0) == NzaeDataTypes::NZUDSUDX_NATIONAL_FIXED ||
            m.getInputType(0) == NzaeDataTypes::NZUDSUDX_NATIONAL_VARIABLE) {
            api.addOutputColumnString(m.getInputType(0), name,
m.getInputSize(0));
        }
        else if (m.getInputType(0) == NzaeDataTypes::NZUDSUDX_NUMERIC128 ||
                m.getInputType(0) == NzaeDataTypes::NZUDSUDX_NUMERIC64 ||
                m.getInputType(0) == NzaeDataTypes::NZUDSUDX_NUMERIC32) {
            api.addOutputColumnNumeric(m.getInputType(0), name,
m.getInputSize(0),
                                             m.getInputScale(0));
        }
        else {
            api.addOutputColumn(m.getInputType(0), name);
        }
    }
};

static int doShaper(nz::ae::NzaeShaper *aeShaper)
{
    aeShaper->run(new MyHandler2());
    return 0;
}

static int run(nz::ae::NzaeFunction *aeFunc)
{
    aeFunc->run(new MyHandler());
    return 0;
}

编译

编译如下

$NZ_EXPORT_DIR/ae/utilities/bin/compile_ae --language cpp --template compile \
     --exe forkae --compargs "-g -Wall" --linkargs "-g" fork.cpp --version 3

注册

注册时,使用远程变体来演示只在远程模式下工作的叉模式:

$NZ_EXPORT_DIR/ae/utilities/bin/register_ae --sig "rem_forkae(VARARGS)" \
     --return "table(ANY)" --language cpp --template udtf --exe forkae \
     --version 3 --remote --rname testcapi

$NZ_EXPORT_DIR/ae/utilities/bin/register_ae --sig "rem_fork_launch(int8)" \
     --return "TABLE(aeresult varchar(255))" --language cpp --template udtf \
     --exe forkae --version 3 --remote --launch --rname testcapi

正在运行

要运行

SELECT * FROM TABLE WITH FINAL(rem_fork_launch(0));
                                 AERESULT
------------------------------------------------------------------------
 tran: 7522 DATA slc: 0 hardware: 0 machine: spubox1 process: 3800 thread: 3800
(1 row)
SELECT * FROM TABLE WITH FINAL(rem_fork_launch(0));
                              AERESULT
------------------------------------------------------------------------
tran: 7606 DATA slc: 0 hardware: 0 machine: spubox1 process: 6560 thread: 6560
(1 row)

SELECT * FROM TABLE WITH FINAL(rem_forkae('test'));
 I
------
 test
(1 row)