Spark SQL Expression
The Spark SQL Expression processor performs record-level Spark SQL calculations and writes the results to new or existing fields.
The Spark SQL Expression processor performs calculations on a record-by-record basis. To transform batches of data using a Spark SQL query, use the Spark SQL Query processor.
When you configure an expression, you define the name of the field to receive the results of the calculation, then specify the Spark SQL expression to use. You can configure multiple expressions in a single processor.
If you specify an output field that does not exist, the Spark SQL Expression processor creates the field for the results of the calculation. If you specify a field that exists, the processor replaces the existing value with the results of the calculation.
Expressions
You can define one or more expressions in the Spark SQL Expression processor. The processor performs the calculations on a record-by-record basis.
You can use an expression as simple as `current_timestamp()` to add the time of processing to each record, or you can create as complex an expression as needed.
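Outside of the processor, the same per-record, create-or-replace behavior can be sketched with Spark's DataFrame API. This is a minimal Scala illustration, not the processor's implementation; the field names are hypothetical:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder.appName("expression-sketch").getOrCreate()
import spark.implicits._

// Hypothetical records with an existing 'message' field.
val records = Seq("  hello ", "world").toDF("message")

// A new output field name creates the field for the results...
val withTime = records.withColumn("processed_at", expr("current_timestamp()"))

// ...while an existing field name replaces the current value.
val cleaned = withTime.withColumn("message", expr("trim(message)"))
```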
- To base a calculation on field values, you need to reference the field. For tips on referencing fields, see Referencing Fields in Spark SQL Expressions. For example, the following expression converts the data in the `message` field to uppercase: `upper(message)`
- When you define multiple expressions, the processor evaluates them in the specified order. You can, therefore, use the results of one expression in subsequent expressions in the list (see the sketch after this list). For example, say the following expression adjusts the value of sales data in the `total` field and writes the results to a field named `total_adjusted`: `total - (total * .03)`
  Then, you can define additional expressions that use the new `total_adjusted` field, as long as they are listed below the expression that creates the field.
- You can use any Spark SQL syntax, including functions such as `isnull` or `trim` and operators such as `=` or `<=`. For example, the following expression returns the hour of the current timestamp and places it in the configured output field: `hour(current_timestamp())`
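To make the evaluation order concrete, the following Scala sketch chains the examples above using Spark's DataFrame API. Each `withColumn` call stands in for one expression in the processor's list; the data and field names are hypothetical:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder.appName("ordered-expressions").getOrCreate()
import spark.implicits._

// Hypothetical sales records with 'message' and 'total' fields.
val sales = Seq(("gadget", 100.0), ("widget", 250.0)).toDF("message", "total")

// Expressions are evaluated in order, so 'total_adjusted' is available
// to any expression that appears after the one that creates it.
val result = sales
  .withColumn("message", expr("upper(message)"))               // reference a field
  .withColumn("total_adjusted", expr("total - (total * .03)")) // create a new field
  .withColumn("hour_processed", expr("hour(current_timestamp())"))
```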
You can also use user-defined functions (UDFs), but you must define them in the pipeline. Use a pipeline preprocessing script to define UDFs.
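As a sketch, a preprocessing script might register a UDF like the hypothetical `mask_last_four` below, which expressions in the processor could then call by name (this assumes a SparkSession named `spark` is in scope):

```scala
// Hypothetical UDF registered in a pipeline preprocessing script;
// 'spark' is assumed to be the pipeline's SparkSession.
spark.udf.register("mask_last_four", (value: String) =>
  if (value != null && value.length > 4) value.dropRight(4) + "****"
  else value
)

// An expression in the processor could then call it by name:
//   mask_last_four(card_number)
```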
For more information about Spark SQL functions, see the Apache Spark SQL Functions documentation.