Developing cognitive IoT solutions for anomaly detection by using deep learning, Part 4

# Using Apache SystemML for anomaly detection

Create a deep learning neural network on Apache Spark with Apache SystemML

### Content series:

## This content is part # of 5 in the series: Developing cognitive IoT solutions for anomaly detection by using deep learning, Part 4

## This content is part of the series:Developing cognitive IoT solutions for anomaly detection by using deep learning, Part 4

Stay tuned for additional content in this series.

At this point in the series of articles I've introduced you to deep learning and long-short term memory (LSTM) networks, shown you how to generate data for anomaly detection, and taught you how to use the Deeplearning4j toolkit. In this fourth article, I talk about Apache SystemML.

As a reminder, our task is to detect anomalies in vibration (accelerometer) sensor data in a bearing as shown in Figure 1.

##### Figure 1. Accelerometer sensor on a bearing records vibrations on each of the three geometrical axes x, y, and z

When talking about deep learning a lot of people talk about libraries such as Caffe, TensorFlow, and PyTorch. Those are great tools, but when you are familiar with Apache Spark – and I love how efficient Apache Spark scales out on large clusters – you have to ask yourself if there is a better tool to do deep learning on Apache Spark. I introduced you to the DeepLearning4J toolkit with its integrated runtime module and ND4J library already, but now I will talk about Apache SystemML.

## What you’ll need to build your app

- An IBM Cloud account. (Sign up for an IBM Cloud Lite account, a free account that never expires.)
- An IBM Watson Studio account. You can use the IBM ID you've created
while registering for the IBM Cloud Account.
To get started using IBM Watson Studio, including signing up for an account, watch the videos in this video collection on developerWorks TV.

## Setting up your development environment

Before we talk about the deep learning use case, spend some time setting up your development environment. We use Jupyter Notebooks running inside IBM Watson Studio. We need to work with these different notebooks:

- The first notebook is used to explain how a feed-forward neural network works with basic Python (that is, not running on Apache Spark).
- The second notebook uses Apache SystemML and is integrated with the Watson IoT platform.

Instead of accessing data from the object store, we'll be accessing it directly from MQTT by using the Watson IoT platform in realtime.

- Log in to datascience.ibm.com.
- To understand how a feed-forward neural network works with basic
Python, import a notebook.
- Click
**Tools > Notebook**. - Click
**From URL**, in the**Name**field add a name, and in the**Notebook URL**field, paste the following URL: https://raw.githubusercontent.com/romeokienzler/developerWorks/master/systemml/PythonFeedforward.ipynb - Click
**Create Notebook**.**Note:**Do not run this notebook now. We'll refer to it later in the article when we practice training a small neural network using Python.

- Click
- To work with Apache SystemML and the Watson IoT platform, import a
notebook.
- Click
**Tools > Notebook**. - Click
**From URL**, in the**Name**field add a name, and in the**Notebook URL**field, paste the following URL: https://raw.githubusercontent.com/romeokienzler/developerWorks/master/systemml/WatsonIoTPlatformSystemMLLSTM.ipynb - Click
**Create Notebook**.

- Click

**Note:** Do not run this notebook now. We'll refer to it
later in the article when we practice training a small neural network in
Python and when we implement a gradient descent to minimize functions.

## What is Apache SystemML?

Apache SystemML began in 2007 as a research project in the IBM Almaden Research lab in California; it is older than Apache Spark. The project's intent was to improve the workflow of data scientists, especially those who want to improve and add functionality to existing machine learning algorithms. SystemML is divided into three core components:

- A
**runtime**component that transparently distributes work on Apache Spark. - A
**declarative machine learning (DML)****language**– an R-like syntax - that can express linear algebra and control structures used in advanced machine learning and deep learning. - A
**cost-based optimizer**that decides which linear algebra operations should run multithreaded as SIMD instructions on CPUs or GPUs, scaled-out using a cluster, or a combination.

### Transparent, distributed runtime streamlines processing

Running Apache SystemML on Apache Spark solves the data parallelization challenge in data processing and machine learning. Unlike other frameworks, SystemML runs transparently. Other deep learning frameworks like TensorFlow bring their own distributed computing environment with them and that needs additional effort to operate.

### Declarative machine learning language implements linear algebra

Since its inception, Apache SystemML has gone through multiple re-factorings and is now one of the fastest machine learning libraries on the planet. Recently, deep learning support was added. This was quite easy to do because all the linear algebra to make this possible is implemented in the SystemML DML language.

Ten years ago, researchers realized out-of-the box machine learning algorithms perform very poorly on large data sets. Figure 2 illustrates the data analysis pipeline that had to be tuned after a small-scale version had been prototyped in MATLAB (matrix laboratory), R, or Python.

##### Figure 2. Data analysis pipeline

### A cost-based optimizer improves processing of machine learning algorithms

The key component on Apache SystemML is the cost-based optimizer. SystemML supports parallel neural network training at the lowest level – in linear algebra. This optimizer turns a high-level description of an algorithm in a domain-specific language (DSL) into a highly optimized physical execution on Apache Spark.

What exactly is going on in the SystemML optimizer? The first thing the engine does is a compile step on the DSL: syntax checking, live variable analysis to determine which intermediate results still are needed, and a semantic check.

After the compile step is passed, an execution plan using high-level operators (HOPs) is generated. HOPs are constructed from the abstract syntax tree (AST) of the DSL. The following important optimization steps are taking place during that phase:

**Static rewrites**

The DSL offers a rich set of syntactic and semantic features that make an implementation easy to understand, but might result in a non-optimal execution. SystemML detects the non-optimal expressions in the AST and rewrites them to a better performing expression (of course maintaining the semantical equivalency).**Dynamic rewrites**

Dynamic rewrites are like static rewrites, but consider the data sizes used in the expressions.

Figure 3 illustrates a rewrite where an entire
expression in a HOP directed acyclic graph (DAG) is actually rewritten to
use a fused operator called **wdivmm **(weighted divide
matrix multiplication). The **wdivmm** operator is a way to
avoid very large, dense intermediates and save compute by exploiting
sparsity of the weight matrix `W`

.

##### Figure 3. Static rewrite of the weighted divide matrix multiplication HOP (high-level operators)

Let's look at how low-level operators (LOPs) are selected and
optimized. We'll stick to the weighted divide matrix multiplication
example shown in Figure 3. Remember this HOP
was selected during the HOP optimization process. Now the question is:
does it make sense to use a parallel version of the corresponding LOP on
the Apache Spark worker nodes or is a local execution preferable? Figure 4 shows that Apache SystemML
determines that the entire computation including input, intermediates, and
output fit into the main memory of the driver node and therefore it
chooses the local operator **wdivmm** over the distributed
operator **mapwdivmm**.

##### Figure 4. Low-level operator execution type based on estimated input, intermediate, and output matrix sizes

Is all this cost-based optimization effort worth it? Let's look at
some performance comparisons in Figure 5
between a local R script, MLLib, and SystemML.

##### Figure 5. Runtime performance comparisons - This image is licensed under the Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License. To view a copy of this license, visithttp://creativecommons.org/licenses/by-nc-nd/4.0/.

The ALS algorithm (or alternating least squares, a matrix-factorization algorithm often used in recommender systems) was run on different data sets with 1.2, 12,and 120 GB of data using R, MLLib, and SystemML. We can clearly see that even on the smallest data set R is not a feasible solution because it took more than 24 hours, and we are not sure if it would ever have completed. On the 12 GB data set we've noticed that SystemML runs significantly faster than MLLib. And finally, on the 120 GB data set, the ALS implementation of MLLib didn't finish within one day and we gave up. The clear winner is SystemML with a runtime performance usable for production use.

## Practice training a small neural network in Python

Now that you see what SystemML can do, I want to show you how to create a neural network. Before we start with our IoT time-series data, start with this Python example we imported in the PythonFeedforward Notebook. You can follow along using the notebook you imported when you set up your development environment.

- Log in to datascience.ibm.com.
- Click
**Default Project**. - To edit the notebook, click the pencil symbol near the notebook.
- To run the notebook, click the run symbol.

First, we implement a single, hidden-layer, feed-forward neural network. Next, we enhance it with a gradient descent (an algorithm that minimizes functions). We significantly improve performance by using SystemML to run all linear algebra operations in parallel. Finally, we leverage the deep neural network library in Apache SystemML to simplify algebra operations.

### Implement a single, hidden-layer, feed-forward neural network

Consider the following Python class. This code implements a single,
hidden-layer, feed-forward neural network. It not only implements the
forward pass, but also the back-propagation pass. This pass is implemented
as the first derivative of the actual back-propagation function, which
obtains the gradients of the direction in which we need to adjust the
weight matrices `W1`

and `W2`

to fit the network to
our data.

import numpy as np class Neural_Network(object): def __init__(self): #Define Hyperparameters self.inputLayerSize = 2 self.outputLayerSize = 1 self.hiddenLayerSize = 3 #Weights (parameters) self.W1 = np.random.randn(self.inputLayerSize, self.hiddenLayerSize) self.W2 = np.random.randn(self.hiddenLayerSize, self.outputLayerSize) def forward(self, X): #Propagate inputs though network self.z2 = np.dot(X, self.W1) self.a2 = self.sigmoid(self.z2) self.z3 = np.dot(self.a2, self.W2) yHat = self.sigmoid(self.z3) return yHat def sigmoid(self, z): #Apply sigmoid activation function to scalar, vector, or matrix return 1/(1+np.exp(-z)) def costFunction(self, X, y): #Compute cost for given X,y, use weights already stored in class. self.yHat = self.forward(X) #print y #print self.yHat J = 0.5*sum((y-self.yHat)**2) return J def sigmoidPrime(self,z): #Gradient of sigmoid return np.exp(-z)/((1+np.exp(-z))**2) def tanh(self,x): return np.tanh(x) def tanh_deriv(self,x): return 1.0 - np.tanh(x)**2 def logistic(x): return 1/(1 + np.exp(-x)) def logistic_derivative(x): return logistic(x)*(1-logistic(x)) def costFunctionPrime(self, X, y): #Compute derivative with respect to W and W2 for a given X and y: self.yHat = self.forward(X) delta3 = np.multiply(-(y-self.yHat), self.sigmoidPrime(self.z3)) #delta3 = np.multiply(-(y-self.yHat), self.z3) dJdW2 = np.dot(self.a2.T, delta3) delta2 = np.dot(delta3, self.W2.T)*self.sigmoidPrime(self.z2) dJdW1 = np.dot(X.T, delta2) return dJdW1, dJdW2

### Implement a gradient descent to minimize functions

Next, we will implement a gradient descent to minimize functions. We are
calculating the gradients `dJdW1`

and `dJdW2`

. By
using the following updatefunction `NN.W - learningRate * dJdW`

we gradually push the weight matrices `W1`

and `W2`

in the correct direction. However, note that this code is not running in
parallel at all.

NN = Neural_Network() max_iterations = 10000 iter = 0 learningRate = 0.01 while iter < max_iterations: dJdW1, dJdW2 = NN.costFunctionPrime(X,y) #update NN.W1 = NN.W1 - learningRate * dJdW1 NN.W2 = NN.W2 - learningRate * dJdW2 if iter % 1000 == 0: print NN.costFunction(X,y) iter = iter + 1

Apache SystemML is able to parallelize and distribute these linear algebra and deep learning operations. How can we parallelize this code easily on Apache Spark? By using SystemML.

Let's switch from this slow, single-threaded implementation to SystemML. You can follow along with this code by opening and using the WatsonIoTPlatformSystemMLLSTM Notebook in Watson Studio you imported when you set up your development environment. This code is very similar to the Python code before, but is implemented in SystemML DML language.

sigmoid = function(matrix[double] z) return (matrix[double] z) { z = 1/(1+exp(-z)) } sigmoidPrime = function(matrix[double] z) return (matrix[double] z) { #Gradient of sigmoid z = exp(-z)/(1+exp(-z)) } inputLayerSize = 2 outputLayerSize = 1 hiddenLayerSize = 3 W1 = rand(rows=inputLayerSize,cols=hiddenLayerSize) W2 = rand(rows=hiddenLayerSize,cols=outputLayerSize) feedForward = function (matrix[double] X, matrix[double] W1, matrix[double] W2) return (matrix[double] z2,matrix[double] a2,matrix[double] z3,matrix[double] Y) { z2 = X %*% W1 a2 = sigmoid(z2) z3 = (a2 %*% W2) Y = sigmoid(z3) } gradient = function(matrix[double] X, matrix[double] W1, matrix[double] W2, matrix[double] Y) return (matrix[double] dJdW1,matrix[double] dJdW2) { #Compute derivative with respect to W and W2 for a given X and y: [z2,a2,z3,Yhat] = feedForward(X,W1,W2) smpz3 = sigmoidPrime(z3) delta3 = -(Y-Yhat) * smpz3 dJdW2 = t(a2) %*% delta3 smpz2 = sigmoidPrime(z2) delta2 = (delta3 %*% t(W2))*smpz2 dJdW1 = t(X) %*% delta2 } upd_W1 = matrix(0, rows=nrow(W1), cols=ncol(W1)) upd_W2 = matrix(0, rows=nrow(W2), cols=ncol(W2)) mu = 0.9 max_iterations = 10000 iter = 0 learningRate = 0.001 [z2,a2,z3,YhatBefore] = feedForward(X,W1,W2) while( iter < max_iterations ){ [dJdW1, dJdW2] = gradient(X,W1,W2,y) #update lrdJdW1 = learningRate * dJdW1 lrdJdW2 = learningRate * dJdW2 W1 = W1 - lrdJdW1 W2 = W2 - lrdJdW2 iter = iter + 1 } [z2,a2,z3,YhatAfter] = feedForward(X,W1,W2)

### Simplify the linear algebra with the SystemML deep neural network library

Writing all this linear algebra code can be laborious. Therefore, SystemML provides a deep neural network library. Consider the following code which re-implements our example using the library.

It looks much simpler, doesn't it? We are not using primitive linear algebra operations, instead we are calling forward and backward functions of the deep neural network library of SystemML.

source("nn/layers/affine.dml") as affine source("nn/layers/conv2d_builtin.dml") as conv2d source("nn/layers/cross_entropy_loss.dml") as cross_entropy_loss source("nn/layers/dropout.dml") as dropout source("nn/layers/l2_reg.dml") as l2_reg source("nn/layers/max_pool2d_builtin.dml") as max_pool2d source("nn/layers/relu.dml") as relu source("nn/layers/softmax.dml") as softmax source("nn/layers/sigmoid.dml") as sigmoid source("nn/optim/sgd_nesterov.dml") as sgd_nesterov # inputLayerSize = 2 outputLayerSize = 1 hiddenLayerSize = 3 [W1, b1] = affine::init(inputLayerSize, hiddenLayerSize) [W2, b2] = affine::init(hiddenLayerSize, outputLayerSize) sigmoidPrime = function(matrix[double] z) return (matrix[double] z) { #Gradient of sigmoid z = exp(-z)/(1+exp(-z)) } #W1 = rand(rows=inputLayerSize,cols=hiddenLayerSize) #W2 = rand(rows=hiddenLayerSize,cols=outputLayerSize) feedForward = function (matrix[double] X, matrix[double] b1, matrix[double] b2, matrix[double] W1, matrix[double] W2) return (matrix[double] z2,matrix[double] a2,matrix[double] z3,matrix[double] Y) { z2 = affine::forward(X, W1, b1) a2 = sigmoid::forward(z2) z3 = affine::forward(a2, W2, b2) Y = sigmoid::forward(z3) } gradient = function(matrix[double] X, matrix[double] b1, matrix[double] b2, matrix[double] W1, matrix[double] W2, matrix[double] Y) return (matrix[double] dJdW1,matrix[double] dJdW2) { #Compute derivative with respect to W and W2 for a given X and y: [z2,a2,z3,Yhat] = feedForward(X,b1,b2,W1,W2) loss = cross_entropy_loss::backward(Yhat, Y) smpz3 = sigmoid::backward(loss,z3) [delta2,dJdW2,db2] = affine::backward(smpz3,a2,W2,b2) smpz2 = sigmoid::backward(delta2,z2) [delta2,dJdW1,db1] = affine::backward(smpz2,X,W1,b1) } upd_W1 = matrix(0, rows=nrow(W1), cols=ncol(W1)) upd_W2 = matrix(0, rows=nrow(W2), cols=ncol(W2)) mu = 0.9 max_iterations = 10000 iter = 0 learningRate = 0.001 [z2,a2,z3,YhatBefore] = feedForward(X,b1,b2,W1,W2) while( iter < max_iterations ){ smy = sigmoid::forward(y) [dJdW1, dJdW2] = gradient(X,b1,b2,W1,W2,smy) #update lrdJdW1 = learningRate * dJdW1 lrdJdW2 = learningRate * dJdW2 W1 = W1 - lrdJdW1 W2 = W2 - lrdJdW2 iter = iter + 1 } [z2,a2,z3,YhatAfter] = feedForward(X,b1,b2,W1,W2)

## Create a SystemML neural network for anomaly detection

Learning how to train a neural network in Python was educational, but now we need to build something useful on Apache Spark using Apache SystemML and its deep learning library with a generated data set. (Remember, we used a Lorenz Attractor model to get simulated real-time vibration sensor data in a bearing. And we need to get that data to the IBM Cloud platform. See the tutorial on how to generate data for anomaly detection.) When we set up our development environment we imported the WatsonIoTPlatformSystemMLLSTM Notebook and we'll take a look at it now.

### Train a feed-forward neural network for unsupervised machine learning

To feed three-dimensional time series data to a feed-forward neural network it makes sense to add a so-called "feature engineering" step. We'll use the DFT (discrete Fourier transform) to transform from the time to the frequency domain. Look at Figure 6 to get an idea of how the data looks in the time domain for the first 3,000 samples in a healthy state.

##### Figure 6. Data in the time domain in a healthy state

Notice that while this system oscillates between two semi-stable states, it is hard to identify any regular patterns.

Look at the same chart in Figure 7 after we've switched the test data generator to a broken state.

##### Figure 7. Data in the time domain in a broken state

The obvious result is that we see much more energy in the system. The peaks are exceeding 200 in contrast to the healthy state which never went over 50. Also, in my opinion, the frequency content of the second signal is higher.

Let's confirm the frequency of the second signal is higher by transforming the signal from the time to the frequency domain. The chart in Figure 8 contains the frequencies of the healthy signal.

##### Figure 8. Data in the frequency domain in a healthy state

And now let's contrast this with the broken signal in Figure 9.

##### Figure 9. Data in the frequency domain in a broken state

As expected, there are a lot more frequencies present in the broken signal.

We have enough evidence to construct an anomaly detector based on
*supervised* machine learning (with a state-of-the-art model
like a gradient boosted tree). But we want *unsupervised* machine
learning because we have no idea which parts of the signal are normal and
which are not.

A simple approach to unsupervised machine learning is to feed those 3,000 frequency bands (remember, DFT returns as many frequency bands as we have samples in the signal, and because we are sampling with 100 Hz for 30 seconds from the physical model this is also the number of frequency bands).

With this approach we have transformed our three-dimensional input data
(the three accelerometer axes we are measuring) into a 9,000 dimensional
data set (the 3,000 frequency bands per accelerometer axis). This is our
new 9,000 dimensional input feature space. We can use the 9,000
dimensional input space to train a feed-forward neural network. Our hidden
layer in the feed-forward neural network has only 100 neurons (instead of
the 9,000 we have in the input and output layer). This is called a
*bottleneck* and turns our neural network into an autoencoder. We
train the neural network by assigning the inputs on the input and output
layers. The neural network will learn to reconstruct the input on the
output. But the neural network has to learn the reconstruction going
through the 100 neuron hidden-layer bottleneck. This way we prevent the
neural network from learning about any noise or irrelevant data. The
Apache SystemML code we imported into our Notebook defines such a neural
network.

script = """ source("nn/layers/affine.dml") as affine source("nn/layers/conv2d_builtin.dml") as conv2d source("nn/layers/cross_entropy_loss.dml") as cross_entropy_loss source("nn/layers/l2_loss.dml") as l2_loss source("nn/layers/dropout.dml") as dropout source("nn/layers/l2_reg.dml") as l2_reg source("nn/layers/max_pool2d_builtin.dml") as max_pool2d source("nn/layers/relu.dml") as relu source("nn/layers/softmax.dml") as softmax source("nn/layers/sigmoid.dml") as sigmoid source("nn/optim/sgd_nesterov.dml") as sgd_nesterov source("nn/optim/rmsprop.dml") as rmsprop # inputLayerSize = 3000 outputLayerSize = 3000 hiddenLayer1Size = 100 Xt = t(X) n = nrow(Xt) means = colSums(Xt)/n stds = sqrt((colSums(Xt^2)/n - means*means)*n/(n-1)) + 1e-17 Xt = (Xt - means)/stds Xt = 1/(1+exp(-Xt)) y = Xt [W1, b1] = affine::init(inputLayerSize, hiddenLayer1Size) [W2, b2] = affine::init(hiddenLayer1Size, outputLayerSize) if (pushWeights) { W1 = W1push W2 = W2push } feedForward = function (matrix[double] X, matrix[double] b1, matrix[double] b2, #matrix[double] b3, matrix[double] W1, matrix[double] W2 #matrix[double] W3 ) return (matrix[double] z2, matrix[double] a2, matrix[double] z3, #matrix[double] a3, #matrix[double] z4, matrix[double] Y) { z2 = affine::forward(X, W1, b1) a2 = sigmoid::forward(z2) z3 = affine::forward(a2, W2, b2) Y = sigmoid::forward(z3) } gradient = function(matrix[double] X, matrix[double] b1, matrix[double] b2, #matrix[double] b3, matrix[double] W1, matrix[double] W2, #matrix[double] W3, matrix[double] Y) #{ return (matrix[double] dJdW1, matrix[double] dJdW2 #matrix[double] dJdW3 ) { #Compute derivative with respect to W and W2 for a given X and y: [z2,a2,z3,Yhat] = feedForward(X,b1,b2,W1,W2) loss = l2_loss::backward(Yhat, Y) smpz3 = sigmoid::backward(loss,z3) [delta2,dJdW2,db2] = affine::backward(smpz3,a2,W2,b2) smpz2 = sigmoid::backward(delta2,z2) [delta2,dJdW1,db1] = affine::backward(smpz2,X,W1,b1) } upd_W1 = matrix(0, rows=nrow(W1), cols=ncol(W1)) upd_W2 = matrix(0, rows=nrow(W2), cols=ncol(W2)) max_iterations = 10000 iter = 0 learningRate = 0.001 while( iter < max_iterations ){ [dJdW1, dJdW2] = gradient(Xt,b1,b2,W1,W2,y) #update lrdJdW1 = learningRate * dJdW1 lrdJdW2 = learningRate * dJdW2 W1 = W1 - lrdJdW1 W2 = W2 - lrdJdW2 iter = iter + 1 if (iter %% 100 == 0) { #[z2,a2,z3,a3,z4,Yhat] = feedForward(Xt,b1,b2,b3,W1,W2,W3) [z2,a2,z3,Yhat] = feedForward(Xt,b1,b2,W1,W2) sse = sqrt(sum((Xt-Yhat)^2)) print(sse) } } """

Let's walk through that code a bit. Initially, we define the number of neurons per layer. The input layer needs 3,000 neurons because the frequency domain of our 3,000 input samples has 3,000 dimensions after we've applied FFT (fast Fournier transform). Then, as we are using an autoencoder, the output needs the same number of neurons. (Remember, we are trying to reconstruct the input signal on the output.) Finally, the hidden layer defines our neural bottleneck to 100 neurons. Therefore, irrelevant and noisy data can't be learned. The autoencoder only earns what's absolutely necessary to reconstruct the signal.

inputLayerSize = 3000 outputLayerSize = 3000 hiddenLayer1Size = 100

Normalizing data is always a good practice, so we subtract the mean and divide by the standard deviation.

means = colSums(Xt)/n stds = sqrt((colSums(Xt^2)/n - means*means)*n/(n-1)) + 1e-17 Xt = (Xt - means)/stds

Next, we initialize the weights matrices for each layer.

[W1, b1] = affine::init(inputLayerSize, hiddenLayer1Size) [W2, b2] = affine::init(hiddenLayer1Size, outputLayerSize)

This code computes the forward pass of the neural network by sequentially computing the layers and activation functions.

z2 = affine::forward(X, W1, b1) a2 = sigmoid::forward(z2) z3 = affine::forward(a2, W2, b2) Y = sigmoid::forward(z3)

Of course, we need to compute the back-propagation pass.

**Note:** We've already computed the first derivative of the
back-propagation pass inside the SystemML deep learning library, so don't
get confused.

[z2,a2,z3,Yhat] = feedForward(X,b1,b2,W1,W2) loss = l2_loss::backward(Yhat, Y) smpz3 = sigmoid::backward(loss,z3) [delta2,dJdW2,db2] = affine::backward(smpz3,a2,W2,b2) smpz2 = sigmoid::backward(delta2,z2) [delta2,dJdW1,db1] = affine::backward(smpz2,X,W1,b1)

We conclude with some gradient descent where we print the loss every 100 iterations.

max_iterations = 10000 iter = 0 learningRate = 0.001 while( iter < max_iterations ){ [dJdW1, dJdW2] = gradient(Xt,b1,b2,W1,W2,y) #update lrdJdW1 = learningRate * dJdW1 lrdJdW2 = learningRate * dJdW2 W1 = W1 - lrdJdW1 W2 = W2 - lrdJdW2 iter = iter + 1 if (iter %% 100 == 0) { [z2,a2,z3,Yhat] = feedForward(Xt,b1,b2,W1,W2) sse = sqrt(sum((Xt-Yhat)^2)) print(sse) } }

### Train the neural network using healthy and broken data

We now train this neural network twice with healthy data and once with broken data using Python code.

dummyMatrix = np.array([[1],[1]]) with jvm_stdout(True): prog = dml(script).input(X=data_healthy_fft).input(pushWeights=False).input(W1push=dummyMatrix).input(W2push=dummyMatrix) result = ml.execute(prog) [W1,W2] = result.get("W1","W2") prog = dml(script).input(X=data_healthy_fft).input(pushWeights=True).input(W1push=W1).input(W2push=W2) result = ml.execute(prog) [W1,W2] = result.get("W1","W2") prog = dml(script).input(X=data_broken_fft).input(pushWeights=True).input(W1push=W1).input(W2push=W2) result = ml.execute(prog)

If we were to plot the loss over the training iterations we get Figure 10.

##### Figure 10. Training iterations with healthy and broken data

We can see that training with healthy data rapidly pushes down the loss to around 14. If we train for a longer period, we don't get any lower. But we can also see that after we show the neural network broken data we see a spike in loss.

**Note:** This is our simple anomaly detector. It is
unsupervised machine learning. There is no need to know in advance if the
neural network is trained on healthy or broken data!

### Improve anomaly detection by adding LSTM layers

We can outperform state-of-the-art time series anomaly detection algorithms and feed-forward neural networks by using long-short term memory (LSTM) networks.

Consider the information in Table 1 taken from
the 2012 Stanford publication titled *Deep Learning for Time Series Modeling* by Enzo
Busseti, Ian Osband, and Scott Wong.

##### Table 1. Results for different learning models

Learning Method | RMSE | % RMSE |
---|---|---|

Kernelized Regression | 1,540 | 8.3% |

Frequency NN | 1,251 | 6.7% |

Deep Feedforward NN | 1,103 | 5.9% |

Deep Recurrent NN | 530 | 2.8% |

We can see that the combination of FFT and feed-forward neural network (the second row in Table 1) outperforms the state-of-the-art kernelized regression (6.7% versus 8.3% RMSE or root-mean-square-error, a measure on prediction performance). We will skip experimenting with deep feed-forward neural networks and directly jump to experimenting with a deep, recurrent neural network because it uses LSTM layers. Using LSTM layers is a way to introduce memory to neural networks that makes them ideal for analyzing time-series and sequence data.

In the code migrate the feed-forward neural network implemented in SystemML to a LSTM network.

source("nn/layers/cross_entropy_loss.dml") as cross_entropy_loss source("nn/layers/l2_loss.dml") as l2_loss source("nn/layers/lstm.dml") as lstm source("nn/layers/sigmoid.dml") as sigmoid source("nn/optim/sgd_nesterov.dml") as sgd_nesterov source("nn/optim/rmsprop.dml") as rmsprop in_TS = 70 #in_TS out_TS = 30 #out_TS N = nrow(X) / (in_TS + out_TS) M = out_TS idx_mat = outer(seq(0,N-1,1), t(seq(0,in_TS+out_TS-1,1)), "+") + 1 idx_col = matrix(idx_mat, rows=nrow(idx_mat)*ncol(idx_mat), cols=1) rordrd_X = table(seq(1, nrow(idx_col), 1), idx_col, nrow(idx_col), nrow(idx_col)) %*% X X = matrix(rordrd_X, rows=nrow(idx_mat), cols=ncol(idx_mat)) #print(toString(X)) Y = X[,in_TS+1:in_TS+out_TS] X = X[,1:in_TS] max_iterations = 1000 iter = 0 learningRate = 0.01 decayRate = 0.95 [W, b, out0, c0] = lstm::init(N,1,M) if (pushWeights) { W = Wpush } rmspropCache = rmsprop::init(W) while( iter < max_iterations ){ [a1, c, c_out, c_c, c_ifog] = lstm::forward(X, W, b, in_TS, 1, FALSE, out0, c0) loss = l2_loss::forward(a1, Y) if(iter %% 100 == 0) print("iter=" + iter + " loss=" + loss) loss_grad = l2_loss::backward(a1, Y) [dX, dW, db, dout0, dc0] = lstm::backward(loss_grad, c0, X, W, b, in_TS, 1, FALSE, out0, c0, c_out, c_c, c_ifog) [W, rmspropCache] = rmsprop::update(W, dW, learningRate, decayRate, 1e-6, rmspropCache) iter = iter + 1 }

Let's walk through that code a bit. The first thing we notice is that we
import another layer type from the library called `LSTM`

:

source("nn/layers/lstm.dml") as lstm

**Note:** The deep learning framework on SystemML is
completely implemented in the linear algebra DSL of SystemML. Therefore,
it can make use of the performance optimizations stages of this amazing
engine.

As we are now predicting future time-steps from past ones, we specify the number of time-steps used as input and as output.

in_TS = 70 #in_TS out_TS = 30 #out_TS

Our time-series length exceeds the number of total time-steps considered by
the neural network (the sum of `in_TS`

and
`out_TS`

`)`

. For example, if you want to predict a
10 Hz sine wave and you train the neural network with one second worth of
data sampled 44.1 kHz you'll end up with 44,100 time-steps containing 10
periods of the oscillation. What you want to do is slice the signal into
chunks of `in_TS + out_TS`

time-steps.

idx_mat = outer(seq(0,N-1,1), t(seq(0,in_TS+out_TS-1,1)), "+") + 1 idx_col = matrix(idx_mat, rows=nrow(idx_mat)*ncol(idx_mat), cols=1) rordrd_X = table(seq(1, nrow(idx_col), 1), idx_col, nrow(idx_col), nrow(idx_col)) %*% X X = matrix(rordrd_X, rows=nrow(idx_mat), cols=ncol(idx_mat)) Y = X[,in_TS+1:in_TS+out_TS] X = X[,1:in_TS]

To clarify what happened, consider that

- Initially
`X`

was a vector of 44,100 measurements. - Then
`X`

got transformed into a matrix containing 441 rows of 100 time-steps. - Finally the matrix split into
`X`

containing 441 rows of 70 time-steps as input for training and`Y`

containing 441 rows of 30 time-steps as target matrix, which the LSTM network has to reconstruct.

If we follow the usual steps in neural network training, first we initialize the parameters needed for gradient descent.

max_iterations = 1000 iter = 0 learningRate = 0.01 decayRate = 0.95 [W, b, out0, c0] = lstm::init(N,1,M)

Then, we introduced a way to preserve the trained weights between intermediate starts of the SystemML engine.

if (pushWeights) { W = Wpush }

LSTM networks are challenging, so training is a bit more complicated (meaning we have to find a hyper-parameter set where the loss converges to a local minima). Therefore, one change we had to make was using another parameter update function for gradient descend which is called RMSPROP; it contains a state which we have to initialize.

rmspropCache = rmsprop::init(W)

We end up with the usual gradient descent loop.

while( iter < max_iterations ){ [a1, c, c_out, c_c, c_ifog] = lstm::forward(X, W, b, in_TS, 1, FALSE, out0, c0) loss = l2_loss::forward(a1, Y) if(iter %% 100 == 0) print("iter=" + iter + " loss=" + loss) loss_grad = l2_loss::backward(a1, Y) [dX, dW, db, dout0, dc0] = lstm::backward(loss_grad, c0, X, W, b, in_TS, 1, FALSE, out0, c0, c_out, c_c, c_ifog) [W, rmspropCache] = rmsprop::update(W, dW, learningRate, decayRate, 1e-6, rmspropCache) iter = iter + 1 }

**Note:** We are again making use of the SystemML deep
learning library written in the SystemML DSL for the RMSPROP update.

We update the parameter matrix `W`

and the internal state
representation `rmspropCache`

of the RMSPROP
implementation.

[W, rmspropCache] = rmsprop::update(W, dW, learningRate, decayRate, 1e-6, rmspropCache)

### Re-run the network in Python

Switch back again to Python to run this network three times: twice with healthy data and once with broken data (the same pattern we've used for the simple feed-forward approach). Open the WatsonIoTPlatformSystemMLLSTM Notebook again in Watson Studio and find the cell with the content below and run it (but make sure you've run all previous cells, otherwise the necessary context variables have not been initialized).

import numpy as np dummyW = np.array([[1],[1]]) with jvm_stdout(True): prog = dml(script).input(X=np.transpose(np.array([data_healthy[:,1]]))).input(pushWeights=False).input(Wpush=dummyW) result = ml.execute(prog) W = result.get("W") prog = dml(script).input(X=np.transpose(np.array([data_healthy[:,1]]))).input(pushWeights=True).input(Wpush=W) result = ml.execute(prog) W = result.get("W") prog = dml(script).input(X=np.transpose(np.array([data_broken[:,1]]))).input(pushWeights=True).input(Wpush=W) result = ml.execute(prog)

Look at Figure 11 for the loss over training time.

##### Figure 11. Loss over training time

While Figure 11 does not show a clear level of granularity, you can see the exact values when running the notebook. Two things are important here:

- Initially, at time step zero, we see a value of 695 as loss. This is
due to the random initialization of the parameter matrix
`W`

. But we rapidly converge to 530, independent of how long we are training. - After we switch to broken data our loss jumps up to 23,901 which is nearly two orders of magnitude higher! And more interestingly it rapidly converges to 23,427 and never changes again.

### Analyze the data in real-time with the IBM Watson IoT Platform using MQTT

The last step is to hook this anomaly detector up to the IBM Watson IoT
Platform using MQTT to analyze data in real-time. To hook-up our neural
network to the platform is straightforward. Figure 12 highlights the `org`

, `apiKey`

,
and `apiToken`

values you need. These credentials were
generated when you created an IBM Cloud app using the Internet of Things Platform Starter.

**Note:** Refer to the Generating data for anomaly detection article for details on this
process.

##### Figure 12. IBM Cloud app credentials

import ibmiotf.application options = {"org": "rwyrty", "id": "anything", "auth-method": "apikey", "auth-key": "a-rwyrty-f95d3ji16n", "auth-token": "ZHd1&O)_J1&TI4XP3z"} client = ibmiotf.application.Client(options) client.connect() from Queue import Queue q = Queue(7000) def myEventCallback(event): q.put(event.data) client.deviceEventCallback = myEventCallback client.subscribeToDeviceEvents("0.16.2", "lorenz", "osc")

### Perform streaming analysis by creating a count-based tumbling window

Now we need to pass a tumbling-count-based window of sensor data to a callback function. The central core of the analysis is where data gets fetched in windows from the IoT platform and then passed to the neural network in batches.

import numpy as np global firstCall firstCall = True global W W = np.array([[1],[1]]) def doNN(data): with jvm_stdout(True): global firstCall global W notFirstCall = not firstCall prog = dml(script).input(X=np.transpose(np.array([data[:,1]]))).input(pushWeights=notFirstCall).input(Wpush=W) result = ml.execute(prog) W = result.get("W") firstCall = False

**Note:** We are reading the weight matrix `W`

from
the Apache SystemML engine in each iteration to preserve the "learning"
and on each subsequent call we are not learning from scratch, but start
where we ended in the last iteration.

### Create a continuous application loop for training

Finally, we create an endless loop over the incoming data and whenever our tumbling window is filled we pass it to the neural network. This loop is essential: data is grabbed from the platform; the platform waits until the window is full; the window is passed to the neural network for training; it starts over and over again.

import numpy as np while True: while not q.empty(): sample = q.get() point = [sample["x"], sample["y"],sample["z"]] try: data except NameError: data = np.array(point) else: data = np.append(data,point) if data.size>=9000: data = np.reshape(data,(3000,3)) print data doNN(data) del data

Figure 13 shows the loss over time. As we can see for the first two batches (windows) of healthy data, the LSTM network clearly learns the inherent patterns really well. And after we see broken data we again see a clear spike of one fold difference.

##### Figure 13. The loss over time

## Conclusion

This completes our second deep learning tutorial for IoT time-series data. We've learned how Apache SystemML facilitates linear algebra operations by optimizing executions on the fly and by making use of Apache Spark as a runtime engine.

We've learned about the benefits of the deep learning library completely written in the Apache SystemML DSL, where all operations executed during neural network training and scoring are optimized and executed by Apache Spark.

Finally, we've shown that even a very simple single-layer LSTM network can outperform state-of-the-art anomaly detection algorithms on time-series sensor data – or any type of sequence data in general.

In the previous article we worked with the same generated test data, but with a different deep learning framework: Deeplearning4j. In the next article, we'll work with TensorFlow (TensorSpark).

### Acknowledgements

This tutorial would not be possible without the continuous support of my colleagues working at IBM Almaden Research and the IBM Spark Technology center:

Berthold Reinwald, Technical Lead - Large-scale Analytics, IBM Research
Almaden.

Watch an interview with Berthold.

Prithviraj Sen, Machine Learning Researcher, IBM Research Almaden.

Watch an interview with Prithvi.

Mike Dusenberry, Machine Learning and DeepLearning Engineer, IBM Spark Technology Center San Fransisco.

#### Downloadable resources

#### Related topics

- Apache SystemML site
- Deep learning architectures
- Recurrent neural networks deep dive
- Code Pattern: Perform a machine learning exercise