Examples

This section contains Python for Spark scripting examples.

Basic scripting example for processing data

import spss.pyspark.runtime
from pyspark.sql.types import *

cxt = spss.pyspark.runtime.getContext()

if cxt.isComputeDataModelOnly():
    # schema-only pass: return the input schema unchanged
    _schema = cxt.getSparkInputSchema()
    cxt.setSparkOutputSchema(_schema)
else:
    # data pass: output an approximately 1% sample of the input rows,
    # drawn without replacement using a fixed seed of 1
    _structType = cxt.getSparkInputSchema()
    df = cxt.getSparkInputData()
    _newDF = df.sample(False, 0.01, 1)
    cxt.setSparkOutputData(_newDF)
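
The spss.pyspark.runtime module is available only when the script runs inside the analytic environment, so the example above cannot be pasted directly into a plain PySpark shell. As a minimal standalone sketch of the data-pass branch, assuming a plain Spark 1.x/2.x installation and hypothetical column names, the same sampling call can be tried as follows:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)

# hypothetical data standing in for cxt.getSparkInputData()
df = sqlCtx.createDataFrame([(i, float(i)) for i in range(1000)], ["id", "value"])

# the same call as in the example: roughly a 1% sample, without
# replacement, drawn with seed 1
print(df.sample(False, 0.01, 1).count())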

Example model building script, using the LinearRegressionWithSGD algorithm

from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.linalg import DenseVector
import numpy
import json 

import spss.pyspark.runtime
from spss.pyspark.exceptions import ASContextException

ascontext = spss.pyspark.runtime.getContext()
sc = ascontext.getSparkContext()
df = ascontext.getSparkInputData()

# field settings and algorithm parameters

target = '%%target_field%%'
predictors = [%%predictor_fields%%]
num_iterations = %%num_iterations%%
prediction_field = "$LR-" + target
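# The %%...%% markers are placeholders that are substituted with the actual
# field settings before the script runs; after substitution these lines
# might read, for example (hypothetical names and value):
#   target = 'sales'
#   predictors = ["price", "advertising"]
#   num_iterations = 100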

# save linear regression model to a filesystem path

def save(model, sc, path):
    data = sc.parallelize([json.dumps(
        {"intercept": model.intercept, "weights": model.weights.tolist()})])
    data.saveAsTextFile(path)
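# save() writes the model as a single JSON line; for a model with two
# coefficients the stored text would look like (illustrative values only):
#   {"intercept": 1.25, "weights": [0.5, -0.3]}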

# print model details to stdout

def dump(model, predictors):
    print(prediction_field + " = " + str(model.intercept))
    weights = model.weights.tolist()
    for i in range(len(predictors)):
        print("\t+ " + predictors[i] + "*" + str(weights[i]))

# check that required fields exist in the input data

input_field_names = [ty[0] for ty in df.dtypes]
if target not in input_field_names:
    raise ASContextException("target field " + target + " not found")
for predictor in predictors:
    if predictor not in input_field_names:
        raise ASContextException("predictor field " + predictor + " not found")

# define a map function to convert DataFrame Row objects to MLlib LabeledPoints

def row2LabeledPoint(target, predictors, row):
    pvals = []
    for predictor in predictors:
        pval = getattr(row, predictor)
        pvals.append(float(pval))
    tval = getattr(row, target)
    return LabeledPoint(float(tval), DenseVector(pvals))
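# for example, with target 'sales' and predictors ['price', 'advertising']
# (hypothetical names), Row(sales=10.0, price=2.0, advertising=5.0) becomes
# LabeledPoint(10.0, DenseVector([2.0, 5.0]))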

# convert the DataFrame to an RDD of LabeledPoint objects

training_points = df.rdd.map(lambda row: row2LabeledPoint(target, predictors, row))

# build the model 

model = LinearRegressionWithSGD.train(training_points, num_iterations, intercept=True)
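# train() also accepts optional SGD settings (for example step, regParam
# and regType), which are left at their MLlib defaults here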

# write a text description of the model to stdout

dump(model, predictors)

# save the model to the filesystem and store it into the output model content

modelpath = ascontext.createTemporaryFolder()
save(model, sc, modelpath)
ascontext.setModelContentFromPath("model", modelpath)
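
For experimenting outside the analytic environment, the core training step can be reproduced with plain MLlib. The sketch below is a minimal standalone version that uses hand-made LabeledPoint data in place of the converted input DataFrame; all values are hypothetical. Note that LinearRegressionWithSGD is deprecated as of Spark 2.0, so this applies to Spark 1.x and 2.x.

from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

sc = SparkContext.getOrCreate()

# hypothetical training data: a label followed by two predictor values
points = sc.parallelize([
    LabeledPoint(10.0, [2.0, 5.0]),
    LabeledPoint(20.0, [4.0, 9.0]),
    LabeledPoint(15.0, [3.0, 7.0]),
])

# a small step size keeps SGD stable on this unscaled toy data
model = LinearRegressionWithSGD.train(points, 100, step=0.01, intercept=True)
print(model.intercept, model.weights)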

Example model scoring script, using the LinearRegressionWithSGD algorithm

import json
import spss.pyspark.runtime
from pyspark.sql import Row
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.linalg import DenseVector
from pyspark.sql.context import SQLContext
import numpy
from pyspark.sql.types import DoubleType, StructField 

ascontext = spss.pyspark.runtime.getContext()
sc = ascontext.getSparkContext()

prediction_field = "$LR-" + '%%target_field%%'
predictors = [%%predictor_fields%%]

# compute the output schema by adding the prediction field
outputSchema = ascontext.getSparkInputSchema()
outputSchema.fields.append(StructField(prediction_field, DoubleType(), nullable=True))
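# the prediction field is appended as the last column so that it lines up
# with the estimate value appended by predict() below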

# make a prediction based on a regression model and a DataFrame Row object;
# return a list containing the input row values and the predicted value
def predict(row, model, predictors, infields, prediction_field_name):
    pvals = []
    rdict = row.asDict()
    for predictor in predictors:
        pvals.append(float(rdict[predictor]))
    estimate = float(model.predict(pvals))
    result = []
    for field in infields:
        result.append(rdict[field])
    result.append(estimate)
    return result
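# for a hypothetical row with price=2.0, advertising=5.0 and sales=10.0,
# the result is [2.0, 5.0, 10.0, <estimate>]: the input values in input
# field order, with the model estimate appended last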

# load a serialized model from the filesystem

def load(sc, path):
    js = sc.textFile(path).take(1)[0]
    obj = json.loads(js)
    weights = numpy.array(obj["weights"])
    intercept = obj["intercept"]
    return LinearRegressionModel(weights, intercept)
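# the JSON layout read here matches the one written by save() in the
# model building script: {"intercept": ..., "weights": [...]}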

ascontext.setSparkOutputSchema(outputSchema)

if not ascontext.isComputeDataModelOnly():
    # score the data in the input DataFrame
    indf = ascontext.getSparkInputData()

    # load the model saved by the model building script
    model_path = ascontext.getModelContentToPath("model")
    model = load(sc, model_path)

    # compute the scores
    infield_names = [ty[0] for ty in indf.dtypes]
    scores_rdd = indf.rdd.map(
        lambda row: predict(row, model, predictors, infield_names, prediction_field))

    # create an output DataFrame containing the scores
    sqlCtx = SQLContext(sc)
    outdf = sqlCtx.createDataFrame(scores_rdd, schema=outputSchema)

    # return the output DataFrame as the result
    ascontext.setSparkOutputData(outdf)
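
As a quick standalone check of the predict() function defined above, a model can be built directly from hand-chosen coefficients; the field names and values below are illustrative only:

from pyspark.sql import Row
from pyspark.mllib.regression import LinearRegressionModel

# hypothetical model: weights [0.5, -0.3], intercept 1.25
model = LinearRegressionModel([0.5, -0.3], 1.25)
row = Row(price=2.0, advertising=5.0, sales=10.0)
print(predict(row, model, ["price", "advertising"],
              ["price", "advertising", "sales"], "$LR-sales"))
# expected: [2.0, 5.0, 10.0, 0.75]  (0.75 = 1.25 + 0.5*2.0 - 0.3*5.0)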