#### script example for Python for Spark
applyModel = stream.findByType("extension_apply", None)
score_script = """
import json
import spss.pyspark.runtime
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.tree import DecisionTreeModel
from pyspark.sql.types import StringType, StructField
cxt = spss.pyspark.runtime.getContext()
if cxt.isComputeDataModelOnly():
_schema = cxt.getSparkInputSchema()
_schema.fields.append(StructField("Prediction", StringType(), nullable=True))
cxt.setSparkOutputSchema(_schema)
else:
df = cxt.getSparkInputData()
_modelPath = cxt.getModelContentToPath("TreeModel")
metadata = json.loads(cxt.getModelContentToString("model.dm"))
schema = df.dtypes[:]
target = "Drug"
predictors = ["Age","BP","Sex","Cholesterol","Na","K"]
lookup = {}
for i in range(0,len(schema)):
lookup[schema[i][0]] = i
def row2LabeledPoint(dm,lookup,target,predictors,row):
target_index = lookup[target]
tval = dm[target_index].index(row[target_index])
pvals = []
for predictor in predictors:
predictor_index = lookup[predictor]
if isinstance(dm[predictor_index],list):
pval = row[predictor_index] in dm[predictor_index] and dm[predictor_index].index(row[predictor_index]) or -1
else:
pval = row[predictor_index]
pvals.append(pval)
return LabeledPoint(tval, DenseVector(pvals))
# convert dataframe to an RDD containing LabeledPoint
lps = df.rdd.map(lambda row: row2LabeledPoint(metadata,lookup,target,predictors,row))
treeModel = DecisionTreeModel.load(cxt.getSparkContext(), _modelPath);
# score the model, produces an RDD containing just double values
predictions = treeModel.predict(lps.map(lambda lp: lp.features))
def addPrediction(x,dm,lookup,target):
result = []
for _idx in range(0, len(x[0])):
result.append(x[0][_idx])
result.append(dm[lookup[target]][int(x[1])])
return result
_schema = cxt.getSparkInputSchema()
_schema.fields.append(StructField("Prediction", StringType(), nullable=True))
rdd2 = df.rdd.zip(predictions).map(lambda x:addPrediction(x, metadata, lookup, target))
outDF = cxt.getSparkSQLContext().createDataFrame(rdd2, _schema)
cxt.setSparkOutputData(outDF)
"""
applyModel.setPropertyValue("python_syntax", score_script)