Ejemplos

Esta sección contiene ejemplos de scripts de Python para Spark.

Ejemplo de scripts básicos para procesar datos

import spss.pyspark.runtime
from pyspark.sql.types import *

cxt = spss.pyspark.runtime.getContext() 

if  cxt.isComputeDataModelOnly():   
        _schema = cxt.getSparkInputSchema()   
        cxt.setSparkOutputSchema(_schema)
else:   
        _structType = cxt.getSparkInputSchema()
        df = cxt.getSparkInputData()   
        _newDF = df.sample(False, 0.01, 1)
        cxt.setSparkOutputData(_newDF)

Ejemplo del script de generación del modelo, utilizando el algoritmo LinearRegressionWithSGD

from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.mllib.regression import
LabeledPoint,LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.linalg import DenseVector
import numpy
import json 

import spss.pyspark.runtime
from spss.pyspark.exceptions import
ASContextException

ascontext = spss.pyspark.runtime.getContext()
sc = ascontext.getSparkContext()
df = ascontext.getSparkInputData()

# valores de campo y parámetros de algoritmo

target = '%%target_field%%'
predictors = [%%predictor_fields%%]
num_iterations=%%num_iterations%%
prediction_field = "$LR-" + target

# guardar modelo de regresión lineal en una vía de acceso del
sistema de archivos

def save(model, sc, path):
        data =
sc.parallelize([json.dumps({"intercept":model.intercept,"weights":model.weights.tolist()})])   
        data.saveAsTextFile(path)

# imprimir detalles del modelo en salida estándar

def dump(model,predictors):   
        print(prediction_field+" = " + str(model.intercept))   
        weights = model.weights.tolist()
        for i in range(0,len(predictors)):        
                print("\t+ "+predictors[i]+"*"+ str(weights[i]))

# comprobar que los campos necesarios existen en los datos de
entrada

input_field_names = [ty[0] for ty in df.dtypes[:]]
if target not in input_field_names:
        raise ASContextException("target field "+target+" not found") for predictor in predictors:
        if predictor not in input_field_names:        
                raise ASContextException("predictor field "+predictor+" not found")

# definir función de correlación para convertir de los objetos Row
del marco de datos a mllib LabeledPoint 

def row2LabeledPoint(target,predictors,row):
        pvals = []
        for predictor in predictors:        
                pval = getattr(row,predictor)        
                pvals.append(float(pval))
        tval = getattr(row,target)   
        return LabeledPoint(float(tval),DenseVector(pvals))

# convertir marco de datos a un RDD que contiene LabeledPoint

training_points = df.rdd.map(lambda row:
row2LabeledPoint(target,predictors,row))

# generar el modelo 

model = LinearRegressionWithSGD.train(training_points,num_iterations,intercept=True) 

# escribir una descripción de texto del modelo en la salida estándar

dump(model,predictors)

# guardar el modelo en el sistema de archivos y almacenar en el
contenido del modelo de salida

modelpath = ascontext.createTemporaryFolder()
save(model,sc,modelpath)
ascontext.setModelContentFromPath("model",modelpath)

Ejemplo de script de puntuación, utilizando el algoritmo LinearRegressionWithSGD

import json
import spss.pyspark.runtime
from pyspark.sql import Row
from pyspark.mllib.regression import
LabeledPoint,LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.linalg import DenseVector
from pyspark.sql.context import SQLContext
import numpy
from pyspark.sql.types import DoubleType, StructField 

ascontext = spss.pyspark.runtime.getContext()
sc = ascontext.getSparkContext()

prediction_field = "$LR-" + '%%target_field%%'
predictors = [%%predictor_fields%%]

# calcular el esquema de salida añadiendo el campo de predicción
outputSchema = ascontext.getSparkInputSchema()
outputSchema.fields.append(StructField(prediction_field, 
DoubleType(), nullable=True))

# realizar una predicción basándose en un modelo de regresión y un
objeto Row de marco de datos
# devolver una lista que contiene los valore de fila de entrada y el
valor predicho
def predict(row,model,predictors,infields,prediction_field_name):
        pvals = []
        rdict = row.asDict()
        for predictor in predictors:        
                pvals.append(float(rdict[predictor]))   
        estimate = float(model.predict(pvals))   
        result = []
        for field in infields:        
                result.append(rdict[field])   
        result.append(estimate)   
        return result

# cargar un modelo serializado desde el sistema de archivos

def load(sc, path):
        js = sc.textFile(path).take(1)[0]
        obj = json.loads(js)   
        weights = numpy.array(obj["weights"])   
        intercept = obj["intercept"]   
        return LinearRegressionModel(weights,intercept)

ascontext.setSparkOutputSchema(outputSchema)

if not ascontext.isComputeDataModelOnly():
        # puntuar los datos en el marco de datos de entrada
        indf = ascontext.getSparkInputData()

        model_path = ascontext.getModelContentToPath("model")
        model = load(sc,model_path)

        # calcular las puntuaciones   
        infield_names = [ty[0] for ty in indf.dtypes[:]]   
        scores_rdd = indf.rdd.map(lambda row:predict(row,model,predictors,infield_names,prediction_field))   
        
        # crear un DataFrame de salida que contenga las puntuaciones   
        sqlCtx = SQLContext(sc)
        outdf = sqlCtx.createDataFrame(scores_rdd,schema=outputSchema)

        # devolver al DataFrame de salida como el resultado
        ascontext.setSparkOutputData(outdf)