Agrégats du langage Python
Cet exemple utilise le nom de fichier suivant : " aggregate.py.
Coder
Le code de cet exemple est légèrement plus long que celui de l'exemple de l'agrégat en langage C, car il traite tous les types de données possibles.
import nzae
class AggregateAe(nzae.Ae):
# The _runUda function is the exact function implemented for the AE class.
# Reproducing this function is not needed in user code to run an aggregate
# as it is inherited. The user can override this function to have
# finer control when running the aggregate AE.
def _runUda(self):
# LOOP, AGGREGATING.
while True:
# FETCH THE NEXT AGGREGATION.
aggregationType = self._getNextAggregation()
# INITIALIZE (AS APPROPRIATE).
if aggregationType == self.AGGREGATION_TYPE__INITIALIZE:
self._initializeState()
self._saveAggregateResult()
# ACCUMULATE (AS APPROPRIATE).
elif aggregationType == self.AGGREGATION_TYPE__ACCUMULATE:
self._accumulate(self.getState(), self.getInputRow())
self._saveAggregateResult()
# MERGE (AS APPROPRIATE).
elif aggregationType == self.AGGREGATION_TYPE__MERGE:
self._merge(self.getState(), self.getInputState())
self._saveAggregateResult()
# GET THE FINAL RESULT (AS APPROPRIATE).
elif aggregationType == self.AGGREGATION_TYPE__FINAL_RESULT:
result = self._finalResult(self.getState())
self._setAggregateResult(result, True)
self._saveAggregateResult()
# END AS APPROPRIATE.
elif aggregationType == self.AGGREGATION_TYPE__END:
return
# ERROR AS APPROPRIATE.
elif aggregationType == self.AGGREGATION_TYPE__ERROR:
raise Exception("Error calling nzaeAggNext(). Cause unknown.")
else:
raise Exception("Received unknown aggregation type.")
# The functions below must be overridden by the user
# while writing an aggregate AE.
def _initializeState(self):
self.setState(0, -2147483647)
def _accumulate(self, instate, row):
if isinstance(instate, (list, tuple)):
state = instate[0]
else:
state = instate
if isinstance(row, (list, tuple)):
for i in row:
if state is None:
state = i
else:
if state < i:
state = i
if state is None:
state = -2147483647
self.setState(0, state)
def _merge(self, instate, inputValues):
if isinstance(instate, (list, tuple)):
state = instate[0]
else:
state = instate
if isinstance(inputValues, (list, tuple)):
for i in inputValues:
if state is None:
state = i
else:
if state < i:
state = i
if state is None:
state = -2147483647
self.setState(0, state)
def _finalResult(self, state):
if isinstance(state, (list, tuple)):
return state[0]
else:
return -2147483647
AggregateAe.run()
Déploiement
Déployer le script :
$NZ_EXPORT_DIR/ae/utilities/bin/compile_ae --language python64 \
--template deploy ./aggregate.py --version 3
Enregistrement
Enregistrez l'exemple en tant qu'UDA en utilisant le paramètre " state
$NZ_EXPORT_DIR/ae/utilities/bin/register_ae --language python64 --version 3 \
--template uda --exe aggregate.py --sig "maxae(int)" --return int4 \
–-state "(int4)"
En cours d'exécution
Pour l'exécuter, créez d'abord une table fictive, puis exécutez l'agrégat :
CREATE TABLE grp_test (grp int4, val int4);
CREATE TABLE
INSERT INTO grp_test VALUES (1, 1);
INSERT 0 1
INSERT INTO grp_test VALUES (1, 2);
INSERT 0 1
INSERT INTO grp_test VALUES (1, 3);
INSERT 0 1
INSERT INTO grp_test VALUES (2, 4);
INSERT 0 1
SELECT maxae(val) FROM grp_test;
MAXAE
-------
4
(1 row)
SELECT grp, maxae(val) FROM grp_test GROUP BY grp;
GRP | MAXAE
-----+-------
1 | 3
2 | 4
(2 rows)
SELECT grp, maxae(val) over (partition BY grp) FROM grp_test;
GRP | MAXAE
-----+-------
1 | 3
1 | 3
1 | 3
2 | 4
(4 rows)