Python language aggregates
This example uses the following file name: aggregate.py.
Code
The code in this example is slightly longer than the C language aggregate example, since the code handles all possible data types.
import nzae
class AggregateAe(nzae.Ae):
    """User-defined aggregate AE that returns the maximum of its input values.

    The example is registered as ``maxae(int)`` with a single ``int4`` state
    slot (slot 0), which holds the running maximum.
    """

    # Value stored in state slot 0 before any non-NULL input has been seen,
    # and returned when no usable state is available.
    _NO_VALUE = -2147483647

    # The _runUda function is the exact function implemented for the AE class.
    # Reproducing this function is not needed in user code to run an aggregate
    # as it is inherited. The user can override this function to have
    # finer control when running the aggregate AE.
    def _runUda(self):
        """Process aggregation messages until the END message arrives.

        Raises:
            Exception: if the framework reports an error or an unknown
                aggregation type.
        """
        # LOOP, AGGREGATING.
        while True:
            # FETCH THE NEXT AGGREGATION.
            aggregationType = self._getNextAggregation()

            # INITIALIZE (AS APPROPRIATE).
            if aggregationType == self.AGGREGATION_TYPE__INITIALIZE:
                self._initializeState()
                self._saveAggregateResult()

            # ACCUMULATE (AS APPROPRIATE).
            elif aggregationType == self.AGGREGATION_TYPE__ACCUMULATE:
                self._accumulate(self.getState(), self.getInputRow())
                self._saveAggregateResult()

            # MERGE (AS APPROPRIATE).
            elif aggregationType == self.AGGREGATION_TYPE__MERGE:
                self._merge(self.getState(), self.getInputState())
                self._saveAggregateResult()

            # GET THE FINAL RESULT (AS APPROPRIATE).
            elif aggregationType == self.AGGREGATION_TYPE__FINAL_RESULT:
                result = self._finalResult(self.getState())
                self._setAggregateResult(result, True)
                self._saveAggregateResult()

            # END AS APPROPRIATE.
            elif aggregationType == self.AGGREGATION_TYPE__END:
                return

            # ERROR AS APPROPRIATE.
            elif aggregationType == self.AGGREGATION_TYPE__ERROR:
                raise Exception("Error calling nzaeAggNext(). Cause unknown.")

            else:
                raise Exception("Received unknown aggregation type.")

    def _maxWithState(self, instate, values):
        """Return the larger of the current state and the entries of values.

        Args:
            instate: the current state; either a list/tuple whose first
                element is the running maximum, or the running maximum itself.
            values: candidate values. Only a list or tuple is inspected;
                anything else is ignored.

        Returns:
            The new running maximum, or ``_NO_VALUE`` when neither the state
            nor ``values`` supplies a non-None value.
        """
        if isinstance(instate, (list, tuple)):
            best = instate[0]
        else:
            best = instate

        if isinstance(values, (list, tuple)):
            for candidate in values:
                # Skip None (SQL NULL): ordering None against a number raises
                # TypeError on Python 3, and a NULL must never win the maximum.
                if candidate is None:
                    continue
                if best is None or best < candidate:
                    best = candidate

        if best is None:
            best = self._NO_VALUE
        return best

    # The functions below must be overridden by the user
    # while writing an aggregate AE.
    def _initializeState(self):
        """Seed state slot 0 with the "no value yet" sentinel."""
        self.setState(0, self._NO_VALUE)

    def _accumulate(self, instate, row):
        """Fold one input row into the running maximum in state slot 0."""
        self.setState(0, self._maxWithState(instate, row))

    def _merge(self, instate, inputValues):
        """Fold another partial state into the running maximum in slot 0."""
        self.setState(0, self._maxWithState(instate, inputValues))

    def _finalResult(self, state):
        """Return the aggregate result extracted from the final state.

        Args:
            state: the final state; a list/tuple whose first element is the
                maximum, or the maximum itself.

        Returns:
            The maximum, or ``_NO_VALUE`` when the state is None.
        """
        if isinstance(state, (list, tuple)):
            return state[0]
        # _accumulate and _merge accept a scalar state as the running maximum,
        # so a scalar is honored here as well instead of being discarded.
        if state is None:
            return self._NO_VALUE
        return state
# Script entry point: call run() on the AE class. run() is not defined in
# this file, so it comes from the nzae.Ae base class.
# NOTE(review): presumably this starts the AE and dispatches to _runUda --
# confirm against the nzae documentation.
AggregateAe.run()
Deployment
Deploy the script:
$NZ_EXPORT_DIR/ae/utilities/bin/compile_ae --language python64 \
--template deploy ./aggregate.py --version 3
Registration
Register the example as a UDA by using the state
parameter:
$NZ_EXPORT_DIR/ae/utilities/bin/register_ae --language python64 --version 3 \
--template uda --exe aggregate.py --sig "maxae(int)" --return int4 \
--state "(int4)"
Running
To run, first create a dummy table and then run the
aggregate:
CREATE TABLE grp_test (grp int4, val int4);
CREATE TABLE
INSERT INTO grp_test VALUES (1, 1);
INSERT 0 1
INSERT INTO grp_test VALUES (1, 2);
INSERT 0 1
INSERT INTO grp_test VALUES (1, 3);
INSERT 0 1
INSERT INTO grp_test VALUES (2, 4);
INSERT 0 1
SELECT maxae(val) FROM grp_test;
MAXAE
-------
4
(1 row)
SELECT grp, maxae(val) FROM grp_test GROUP BY grp;
GRP | MAXAE
-----+-------
1 | 3
2 | 4
(2 rows)
SELECT grp, maxae(val) over (partition BY grp) FROM grp_test;
GRP | MAXAE
-----+-------
1 | 3
1 | 3
1 | 3
2 | 4
(4 rows)