Create the C++ file for the UDA
To begin, use any text editor to create your C++ file. Your C++ file must include the udxinc.h header file, which contains the required declarations for user-defined aggregates and processing on the SPUs.
#include "udxinc.h"
In addition, make sure that you include any of the standard C++ library header files that your aggregate might require. If your UDA requires any user-defined shared libraries, note the names of the libraries, because you need them when you register the UDA in the database.
User-defined shared libraries must exist in the database before you can register the UDA and specify those libraries as dependencies. You can register the UDA without specifying any library dependencies, and after the libraries are added, use the ALTER AGGREGATE command to update the UDA definition with the correct dependencies. For more information about user-defined shared libraries, see Create a user-defined shared library.
The UDX classes for API version 2 are defined in the nz::udx_ver2 namespace. (The API version 1 UDXs use the nz::udx namespace.) Your C++ program must reference the correct namespace. For example:
#include "udxinc.h"
using namespace nz::udx_ver2;
This section uses udx_ver2 as the default namespace for the examples that follow. The sections note the differences with UDX version 1, and Sample user-defined functions and aggregates reference contains examples of version 1 and version 2 definitions. You can continue to create UDX version 1 UDAs as well as new version 2 UDAs; both versions operate on release 6.0.x or later systems. However, the version 1 UDAs work on Netezza Performance Server release 5.0.x and later systems and thus might be more portable across your Netezza Performance Server systems.
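A UDA is implemented as a class that derives from the Uda base class. For example:
#include "udxinc.h"
using namespace nz::udx_ver2;
class CPenMax: public nz::udx_ver2::Uda
{
public:
};
The class declares the following methods:
class CPenMax : public nz::udx_ver2::Uda
{
public:
    CPenMax(UdxInit *pInit) : Uda(pInit) { }  // constructor used by instantiate()
    static nz::udx_ver2::Uda* instantiate(UdxInit *pInit);
    virtual void initializeState();
    virtual void accumulate();
    virtual void merge();
    virtual ReturnValue finalResult();
};
nz::udx_ver2::Uda* CPenMax::instantiate(UdxInit *pInit)
{
    return new CPenMax(pInit);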
}
- The instantiate() method is called by the runtime
engine to create the object dynamically. The static implementation
must be outside of the class definition. In UDX version 2, the instantiate
method takes one argument (UdxInit *pInit), which enables access to
the memory specification, the log setting, and the UDX environment
in the constructor (see UDX environment). It creates an object of the derived class
type by using the new operator and returns it (as base class type
Uda) to the runtime engine. The runtime engine deletes the object
when it is no longer needed. An example follows:
class CPenMax : public nz::udx_ver2::Uda
{
public:
    CPenMax(UdxInit *pInit) : Uda(pInit) { }
    static nz::udx_ver2::Uda* instantiate(UdxInit *pInit);
    virtual void initializeState();
    virtual void accumulate();
    virtual void merge();
    virtual ReturnValue finalResult();
};
nz::udx_ver2::Uda* CPenMax::instantiate(UdxInit *pInit)
{
    return new CPenMax(pInit);
}
- The initializeState() method is called to allow
the implementer to initialize the necessary state that is used in
the UDA. The state of a UDA is one or more values that must be valid Netezza Performance Server data
types. The state is automatically preserved by the runtime engine
between snippets, if necessary. To calculate the penultimate maximum,
the function must track the largest two numbers in state variables.
The initializeState() method sets both the variables
to NULL. The states are declared in the CREATE AGGREGATE command,
which is described later. An example follows:
void CPenMax::initializeState()
{
    setStateNull(0, true); // set current max to null
    setStateNull(1, true); // set current penmax to null
}
- The accumulate() method is called once per
row and adds the contribution of its arguments to the accumulator
state. It updates the states to keep the highest two values in the
correct states. In addition to getting the argument through int32Arg(0),
the method retrieves the two state variables by using the int32State(int) and isStateNull(int) functions, and then updates the states as required. An example follows:
void CPenMax::accumulate()
{
    int *pCurMax = int32State(0);
    bool curMaxNull = isStateNull(0);
    int *pCurPenMax = int32State(1);
    bool curPenMaxNull = isStateNull(1);
    int curVal = int32Arg(0);
    bool curValNull = isArgNull(0);
    if ( !curValNull ) { // do nothing if argument is null - can't
                         // affect max or penmax
        if ( curMaxNull ) { // if current max is null, this arg
                            // becomes current max
            setStateNull(0, false); // current max no longer null
            *pCurMax = curVal;
        } else {
            if ( curVal > *pCurMax ) { // if arg is new max
                setStateNull(1, false); // then prior current max
                                        // becomes current penmax
                *pCurPenMax = *pCurMax;
                *pCurMax = curVal; // and current max gets arg
            } else if ( curPenMaxNull || curVal > *pCurPenMax ) {
                // arg might be greater than current penmax
                setStateNull(1, false); // it is
                *pCurPenMax = curVal;
            }
        }
    }
}
- The merge() method is called with arguments
of a second set of state variables and merges this second state into
its own state variables. This method is necessary because the Netezza Performance Server system
is a parallel-processing architecture, and the aggregate states from
all SPUs are sent to the host, where they are consolidated into a
single merged aggregation state. The merge() method
merges two states, handling all null state values correctly.
One of the states is passed in normally, as in accumulate().
The second state is passed in as arguments, so you retrieve it with argument
retrieval functions such as int32Arg(int) and isArgNull(int).
An example follows:
void CPenMax::merge()
{
    int *pCurMax = int32State(0);
    bool curMaxNull = isStateNull(0);
    int *pCurPenMax = int32State(1);
    bool curPenMaxNull = isStateNull(1);
    int nextMax = int32Arg(0);
    bool nextMaxNull = isArgNull(0);
    int nextPenMax = int32Arg(1);
    bool nextPenMaxNull = isArgNull(1);
    if ( !nextMaxNull ) { // if next max is null, then so is
                          // next penmax and we do nothing
        if ( curMaxNull ) {
            setStateNull(0, false); // current max was null,
                                    // so save next max
            *pCurMax = nextMax;
        } else {
            if ( nextMax > *pCurMax ) {
                setStateNull(1, false); // next max is greater than current, so save next
                *pCurPenMax = *pCurMax; // and make current penmax prior current max
                *pCurMax = nextMax;
            } else if ( curPenMaxNull || nextMax > *pCurPenMax ) {
                // next max may be greater than current penmax
                setStateNull(1, false); // it is
                *pCurPenMax = nextMax;
            }
        }
        if ( !nextPenMaxNull ) {
            if ( isStateNull(1) ) {
                // can't rely on curPenMaxNull here, might have
                // changed state var null flag above
                setStateNull(1, false); // first non-null penmax, save it
                *pCurPenMax = nextPenMax;
            } else {
                if ( nextPenMax > *pCurPenMax ) {
                    *pCurPenMax = nextPenMax; // next penmax greater than current, save it
                }
            }
        }
    }
}
- The finalResult() method returns the final
aggregation value from the accumulated state. An example might be
a UDA implementation of an average aggregation, where the finalResult() method
divides the sum by the count to produce an average. In this example,
the finalResult() method gathers one of the states
and returns it by using the NZ_UDX_RETURN_INT32 macro in a similar
fashion to evaluate() in the UDF case.
ReturnValue CPenMax::finalResult()
{
    // in finalResult(), the states are read as arguments
    int curPenMax = int32Arg(1);
    bool curPenMaxNull = isArgNull(1);
    if ( curPenMaxNull )
        NZ_UDX_RETURN_NULL();
    setReturnNull(false);
    NZ_UDX_RETURN_INT32(curPenMax);
}
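The average aggregation mentioned in the finalResult() description can be sketched in plain C++ to show how the work might be divided between the methods. This is an illustration only: it does not use udxinc.h, and the AvgState struct, the free functions, and the use of std::optional to stand in for SQL NULL are all assumptions made for this sketch, not part of the UDX API.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical stand-in for a two-variable UDA state: running sum and row count.
struct AvgState {
    std::int64_t sum = 0;
    std::int64_t count = 0;
};

// Called once per row: a null argument contributes nothing to the state.
void accumulate(AvgState &s, std::optional<int> arg) {
    if (!arg) return;
    s.sum += *arg;
    ++s.count;
}

// Combine a second partial state into this one.
void merge(AvgState &s, const AvgState &other) {
    s.sum += other.sum;
    s.count += other.count;
}

// Produce the final value: null when no non-null rows were seen,
// otherwise the sum divided by the count.
std::optional<double> finalResult(const AvgState &s) {
    if (s.count == 0) return std::nullopt;
    return static_cast<double>(s.sum) / static_cast<double>(s.count);
}
```

The division happens only in finalResult(); accumulate() and merge() carry the sum and count separately so that partial states can be combined without losing precision.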
The NZ_UDX_RETURN_INT32 macro helps to confirm that the return value is of the expected type. For a list of the available return macros, see UDX return value macros. The finalResult() method can access all of the data type helper API calls and access a list of state arguments that are listed in UDA state arguments.
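To check the penultimate-maximum logic outside the database, the accumulate and merge steps can be modeled in standalone C++. The following is a minimal sketch under stated assumptions: PenMaxState, offer(), and the free functions are hypothetical names that do not exist in udxinc.h, and std::optional stands in for the null flags that the real UDA reads with isStateNull() and isArgNull().

```cpp
#include <optional>
#include <vector>

// Hypothetical stand-in for the two nullable int32 state variables.
struct PenMaxState {
    std::optional<int> max;     // state 0: current maximum
    std::optional<int> penMax;  // state 1: current penultimate maximum
};

// Fold one candidate value into the state; the same rule serves
// both the per-row case and the merge case.
void offer(PenMaxState &s, std::optional<int> v) {
    if (!v) return;                        // null cannot affect max or penmax
    if (!s.max) {
        s.max = v;                         // first non-null value becomes max
    } else if (*v > *s.max) {
        s.penMax = s.max;                  // prior max becomes penmax
        s.max = v;
    } else if (!s.penMax || *v > *s.penMax) {
        s.penMax = v;                      // value is the new penmax
    }
}

// Per-row step, mirroring CPenMax::accumulate().
void accumulate(PenMaxState &s, std::optional<int> arg) { offer(s, arg); }

// Combine a second partial state, mirroring CPenMax::merge().
void merge(PenMaxState &s, const PenMaxState &other) {
    offer(s, other.max);
    offer(s, other.penMax);
}

// Run accumulate over a whole partition of rows.
PenMaxState aggregate(const std::vector<std::optional<int>> &rows) {
    PenMaxState s;                         // both states start as null
    for (const auto &r : rows) accumulate(s, r);
    return s;
}
```

Aggregating two partitions separately and then merging them should give the same states as a single pass over all rows. For example, partitions {3, 9, null, 4} and {7, 8} merge to a maximum of 9 and a penultimate maximum of 8.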