Developing cognitive IoT solutions for anomaly detection by using deep learning, Part 4

Using Apache SystemML for anomaly detection

Create a deep learning neural network on Apache Spark with Apache SystemML

This content is Part 4 of the series Developing cognitive IoT solutions for anomaly detection by using deep learning.

At this point in the series, I've introduced you to deep learning and long short-term memory (LSTM) networks, shown you how to generate data for anomaly detection, and taught you how to use the Deeplearning4j toolkit. In this fourth article, I talk about Apache SystemML.

As a reminder, our task is to detect anomalies in vibration (accelerometer) sensor data in a bearing as shown in Figure 1.

Figure 1. Accelerometer sensor on a bearing records vibrations on each of the three geometrical axes x, y, and z

When people talk about deep learning, they often mention libraries such as Caffe, TensorFlow, and PyTorch. Those are great tools, but if you are familiar with Apache Spark – and I love how efficiently Apache Spark scales out on large clusters – you have to ask yourself whether there is a better way to do deep learning on Apache Spark. I have already introduced you to the Deeplearning4j toolkit with its integrated Spark runtime module and the ND4J library; now I will talk about Apache SystemML.

What you’ll need to build your app

  • An IBM Cloud account. (Sign up for an IBM Cloud Lite account, a free account that never expires.)
  • An IBM Watson Studio account. You can use the IBM ID you created when registering for your IBM Cloud account.

    To get started using IBM Watson Studio, including signing up for an account, watch the videos in this video collection on developerWorks TV.

Setting up your development environment

Before we talk about the deep learning use case, spend some time setting up your development environment. We use Jupyter Notebooks running inside IBM Watson Studio. We need to work with two notebooks:

  • The first notebook is used to explain how a feed-forward neural network works with basic Python (that is, not running on Apache Spark).
  • The second notebook uses Apache SystemML and is integrated with the Watson IoT platform.

Instead of accessing data from the object store, we'll access it directly over MQTT by using the Watson IoT Platform in real time.

  1. Log in to datascience.ibm.com.
  2. To understand how a feed-forward neural network works with basic Python, import a notebook.
    1. Click Tools > Notebook.
    2. Click From URL. In the Name field, add a name, and in the Notebook URL field, paste the following URL: https://raw.githubusercontent.com/romeokienzler/developerWorks/master/systemml/PythonFeedforward.ipynb
    3. Click Create Notebook.

      Note: Do not run this notebook now. We'll refer to it later in the article when we practice training a small neural network using Python.

  3. To work with Apache SystemML and the Watson IoT platform, import a notebook.
    1. Click Tools > Notebook.
    2. Click From URL. In the Name field, add a name, and in the Notebook URL field, paste the following URL: https://raw.githubusercontent.com/romeokienzler/developerWorks/master/systemml/WatsonIoTPlatformSystemMLLSTM.ipynb
    3. Click Create Notebook.

Note: Do not run this notebook now. We'll refer to it later in the article when we practice training a small neural network in Python and when we implement a gradient descent to minimize functions.

What is Apache SystemML?

Apache SystemML began in 2007 as a research project in the IBM Almaden Research lab in California; it is older than Apache Spark. The project's intent was to improve the workflow of data scientists, especially those who want to improve and add functionality to existing machine learning algorithms. SystemML is divided into three core components:

  • A runtime component that transparently distributes work on Apache Spark.
  • A declarative machine learning (DML) language – with an R-like syntax – that can express the linear algebra and control structures used in advanced machine learning and deep learning.
  • A cost-based optimizer that decides whether a linear algebra operation should run multithreaded (using SIMD instructions) on CPUs or GPUs, scaled out on a cluster, or as a combination of both.
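
To get a feel for DML, here is a minimal sketch of a few lines of it executed through the SystemML Python API on Apache Spark. It assumes an existing SparkSession named spark and the systemml Python package, which the notebooks later in this article use as well:

from systemml import MLContext, dml

ml = MLContext(spark)

# R-like DML: linear algebra plus control structures, distributed transparently by SystemML
script = """
X = rand(rows=10000, cols=100)
w = rand(rows=100, cols=1)
for (i in 1:5) {
    w = w / sum(w)       # a control structure wrapped around linear algebra
}
s = sum(X %*% w)
"""
prog = dml(script).output("s")
print(ml.execute(prog).get("s"))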

Transparent, distributed runtime streamlines processing

Running Apache SystemML on Apache Spark solves the data-parallelization challenge in data processing and machine learning. Unlike other frameworks, SystemML runs transparently on top of Spark: other deep learning frameworks, like TensorFlow, bring their own distributed computing environment with them, which takes additional effort to operate.

Declarative machine learning language implements linear algebra

Since its inception, Apache SystemML has gone through multiple refactorings and is now one of the fastest machine learning libraries on the planet. Recently, deep learning support was added. This was quite easy to do because all the linear algebra needed to make it possible is implemented in the SystemML DML language.

Ten years ago, researchers realized that out-of-the-box machine learning algorithms perform very poorly on large data sets. Figure 2 illustrates the data analysis pipeline that had to be tuned after a small-scale version had been prototyped in MATLAB (matrix laboratory), R, or Python.

Figure 2. Data analysis pipeline

A cost-based optimizer improves processing of machine learning algorithms

The key component of Apache SystemML is the cost-based optimizer. SystemML supports parallel neural network training at the lowest level – in linear algebra. This optimizer turns a high-level description of an algorithm in a domain-specific language (DSL) into a highly optimized physical execution on Apache Spark.

What exactly is going on in the SystemML optimizer? The first thing the engine does is a compile step on the DSL: syntax checking, live variable analysis to determine which intermediate results are still needed, and a semantic check.

After the compile step has passed, an execution plan using high-level operators (HOPs) is generated. HOPs are constructed from the abstract syntax tree (AST) of the DSL. The following important optimization steps take place during that phase:

  • Static rewrites
    The DSL offers a rich set of syntactic and semantic features that make an implementation easy to understand, but that might result in a non-optimal execution. SystemML detects these non-optimal expressions in the AST and rewrites them into better-performing expressions (while, of course, maintaining semantic equivalence).
  • Dynamic rewrites
    Dynamic rewrites are like static rewrites, but consider the data sizes used in the expressions.
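
If you are curious which rewrites and operators SystemML actually picks for a given script, you can ask the engine to print its compiled plan. Continuing the sketch shown earlier (ml is a systemml MLContext and prog is any dml(...) script object), something like the following should do it:

ml.setExplain(True)        # print the compiled HOP/LOP execution plan
ml.setStatistics(True)     # print runtime statistics after execution
result = ml.execute(prog)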

Figure 3 illustrates a rewrite where an entire expression in a HOP directed acyclic graph (DAG) is actually rewritten to use a fused operator called wdivmm (weighted divide matrix multiplication). The wdivmm operator is a way to avoid very large, dense intermediates and save compute by exploiting sparsity of the weight matrix W.

Figure 3. Static rewrite of the weighted divide matrix multiplication HOP (high-level operators)


Let's look at how low-level operators (LOPs) are selected and optimized. We'll stick to the weighted divide matrix multiplication example shown in Figure 3. Remember, this HOP was selected during the HOP optimization process. Now the question is: does it make sense to use a parallel version of the corresponding LOP on the Apache Spark worker nodes, or is local execution preferable? Figure 4 shows that Apache SystemML determines that the entire computation, including input, intermediates, and output, fits into the main memory of the driver node, and it therefore chooses the local operator wdivmm over the distributed operator mapwdivmm.

Figure 4. Low-level operator execution type based on estimated input, intermediate, and output matrix sizes


Is all this cost-based optimization effort worth it? Let's look at some performance comparisons in Figure 5 between a local R script, MLLib, and SystemML.

Figure 5. Runtime performance comparisons - This image is licensed under the Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License. To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-nd/4.0/.

The ALS algorithm (alternating least squares, a matrix-factorization algorithm often used in recommender systems) was run on data sets of 1.2, 12, and 120 GB using R, MLLib, and SystemML. We can clearly see that even on the smallest data set, R is not a feasible solution because it took more than 24 hours, and we are not sure whether it would ever have completed. On the 12 GB data set, we noticed that SystemML runs significantly faster than MLLib. And finally, on the 120 GB data set, the ALS implementation of MLLib didn't finish within one day and we gave up. The clear winner is SystemML, with runtime performance that is suitable for production use.

1. Practice training a small neural network in Python

Now that you see what SystemML can do, I want to show you how to create a neural network. Before we start with our IoT time-series data, let's begin with the Python example in the PythonFeedforward notebook that you imported when you set up your development environment.

  1. Log in to datascience.ibm.com.
  2. Click Default Project.
  3. To edit the notebook, click the pencil symbol near the notebook.
  4. To run the notebook, click the run symbol.

First, we implement a single, hidden-layer, feed-forward neural network. Next, we enhance it with a gradient descent (an algorithm that minimizes functions). We significantly improve performance by using SystemML to run all linear algebra operations in parallel. Finally, we leverage the deep neural network library in Apache SystemML to simplify algebra operations.

1a. Implement a single, hidden-layer, feed-forward neural network

Consider the following Python class. This code implements a single, hidden-layer, feed-forward neural network. It implements not only the forward pass, but also the back-propagation pass. The backward pass computes the first derivative of the cost function, which gives us the gradients – the direction in which we need to adjust the weight matrices W1 and W2 to fit the network to our data.

import numpy as np

class Neural_Network(object):
    def __init__(self):        
        #Define Hyperparameters
        self.inputLayerSize = 2
        self.outputLayerSize = 1
        self.hiddenLayerSize = 3
        
        #Weights (parameters)
        self.W1 = np.random.randn(self.inputLayerSize, self.hiddenLayerSize)
        self.W2 = np.random.randn(self.hiddenLayerSize, self.outputLayerSize)
        
    def forward(self, X):
        #Propagate inputs though network
        self.z2 = np.dot(X, self.W1)
        self.a2 = self.sigmoid(self.z2)
        self.z3 = np.dot(self.a2, self.W2)
        yHat = self.sigmoid(self.z3) 
        return yHat
        
    def sigmoid(self, z):
        #Apply sigmoid activation function to scalar, vector, or matrix
        return 1/(1+np.exp(-z))
    
    def costFunction(self, X, y):
        #Compute cost for given X,y, use weights already stored in class.
        self.yHat = self.forward(X)
        #print y
        #print self.yHat
        J = 0.5*sum((y-self.yHat)**2)
        return J
    
    def sigmoidPrime(self,z):
        #Gradient of sigmoid
        return np.exp(-z)/((1+np.exp(-z))**2)
    
    def tanh(self,x):
        return np.tanh(x)

    def tanh_deriv(self,x):
        return 1.0 - np.tanh(x)**2

    def logistic(self, x):
        return 1/(1 + np.exp(-x))

    def logistic_derivative(self, x):
        return self.logistic(x)*(1-self.logistic(x))

    def costFunctionPrime(self, X, y):
        #Compute derivative with respect to W and W2 for a given X and y:
        self.yHat = self.forward(X)
        
        delta3 = np.multiply(-(y-self.yHat), self.sigmoidPrime(self.z3))
        #delta3 = np.multiply(-(y-self.yHat), self.z3)
        dJdW2 = np.dot(self.a2.T, delta3)
        
        delta2 = np.dot(delta3, self.W2.T)*self.sigmoidPrime(self.z2)
        dJdW1 = np.dot(X.T, delta2)  
        
        return dJdW1, dJdW2

1b. Implement a gradient descent to minimize functions

Next, we implement gradient descent to minimize the cost function. We calculate the gradients dJdW1 and dJdW2, and by applying the update rule W = W - learningRate * dJdW we gradually push the weight matrices W1 and W2 in the correct direction. However, note that this code is not running in parallel at all.

NN = Neural_Network()
max_iterations = 10000
iter = 0
learningRate = 0.01
while iter < max_iterations:
    # compute the gradients for the current weights
    dJdW1, dJdW2 = NN.costFunctionPrime(X, y)

    # update the weight matrices
    NN.W1 = NN.W1 - learningRate * dJdW1
    NN.W2 = NN.W2 - learningRate * dJdW2

    # print the loss every 1,000 iterations
    if iter % 1000 == 0:
        print(NN.costFunction(X, y))

    iter = iter + 1

Apache SystemML is able to parallelize and distribute these linear algebra and deep learning operations. How can we parallelize this code easily on Apache Spark? By using SystemML.

Let's switch from this slow, single-threaded implementation to SystemML. You can follow along with this code by opening the WatsonIoTPlatformSystemMLLSTM notebook in Watson Studio that you imported when you set up your development environment. This code is very similar to the previous Python code, but it is implemented in the SystemML DML language.

sigmoid = function(matrix[double] z) return (matrix[double] z) {
    z = 1/(1+exp(-z))
}


sigmoidPrime = function(matrix[double] z) return (matrix[double] z) {
        #Gradient of sigmoid
        z = exp(-z)/((1+exp(-z))^2)
}

inputLayerSize = 2
outputLayerSize = 1
hiddenLayerSize = 3

W1 = rand(rows=inputLayerSize,cols=hiddenLayerSize)
W2 = rand(rows=hiddenLayerSize,cols=outputLayerSize)
feedForward = function (matrix[double] X,
                        matrix[double] W1,
                        matrix[double] W2) return (matrix[double] z2,matrix[double] a2,matrix[double] z3,matrix[double] Y) {
    z2 =  X %*% W1
    a2 =  sigmoid(z2)
    z3 = (a2 %*% W2)
    Y = sigmoid(z3)
}

gradient = function(matrix[double] X,
                        matrix[double] W1,
                        matrix[double] W2,
                        matrix[double] Y) return (matrix[double] dJdW1,matrix[double] dJdW2) {
    #Compute derivative with respect to W and W2 for a given X and y:
    [z2,a2,z3,Yhat] = feedForward(X,W1,W2)
    
    smpz3 =  sigmoidPrime(z3)
    delta3 = -(Y-Yhat) * smpz3
    dJdW2 = t(a2) %*% delta3
        
    smpz2 = sigmoidPrime(z2)
    delta2 = (delta3 %*% t(W2))*smpz2
    dJdW1 = t(X) %*% delta2  
}

upd_W1 = matrix(0, rows=nrow(W1), cols=ncol(W1))
upd_W2 = matrix(0, rows=nrow(W2), cols=ncol(W2))
mu = 0.9

max_iterations = 10000
iter = 0
learningRate = 0.001

[z2,a2,z3,YhatBefore] = feedForward(X,W1,W2)

while( iter < max_iterations ){
      [dJdW1, dJdW2] = gradient(X,W1,W2,y)

      #update
      lrdJdW1 = learningRate * dJdW1
      lrdJdW2 = learningRate * dJdW2
      W1 = W1 - lrdJdW1
      W2 = W2 - lrdJdW2
    
     iter = iter + 1
}

[z2,a2,z3,YhatAfter] = feedForward(X,W1,W2)
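
In the notebook, this DML script is wired up through the SystemML Python API. The exact cell may differ slightly, but conceptually the call looks something like the following sketch (assuming the DML above is stored in a Python string named script, ml is an initialized MLContext, and X and y are small NumPy arrays):

import numpy as np
from systemml import dml

# Toy training data (hypothetical values; the notebook defines its own X and y)
X = np.array([[3.0, 5.0], [5.0, 1.0], [10.0, 2.0]])
y = np.array([[0.75], [0.82], [0.93]])

# Bind the inputs, declare the outputs we want back, and run on Apache Spark
prog = dml(script).input(X=X, y=y).output("W1", "W2")
result = ml.execute(prog)
W1, W2 = result.get("W1", "W2")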

1c. Simplify the linear algebra with the SystemML deep neural network library

Writing all this linear algebra code can be laborious. Therefore, SystemML provides a deep neural network library. Consider the following code which re-implements our example using the library.

It looks much simpler, doesn't it? We are not using primitive linear algebra operations; instead, we are calling the forward and backward functions of SystemML's deep neural network library.

source("nn/layers/affine.dml") as affine
source("nn/layers/conv2d_builtin.dml") as conv2d
source("nn/layers/cross_entropy_loss.dml") as cross_entropy_loss
source("nn/layers/dropout.dml") as dropout
source("nn/layers/l2_reg.dml") as l2_reg
source("nn/layers/max_pool2d_builtin.dml") as max_pool2d
source("nn/layers/relu.dml") as relu
source("nn/layers/softmax.dml") as softmax
source("nn/layers/sigmoid.dml") as sigmoid
source("nn/optim/sgd_nesterov.dml") as sgd_nesterov
#
inputLayerSize = 2
outputLayerSize = 1
hiddenLayerSize = 3

[W1, b1] = affine::init(inputLayerSize, hiddenLayerSize)
[W2, b2] = affine::init(hiddenLayerSize, outputLayerSize) 


sigmoidPrime = function(matrix[double] z) return (matrix[double] z) {
        #Gradient of sigmoid
        z = exp(-z)/((1+exp(-z))^2)
}



#W1 = rand(rows=inputLayerSize,cols=hiddenLayerSize)
#W2 = rand(rows=hiddenLayerSize,cols=outputLayerSize)

feedForward = function (matrix[double] X,
                        matrix[double] b1,
                        matrix[double] b2,
                        matrix[double] W1,
                        matrix[double] W2) return (matrix[double] z2,matrix[double] a2,matrix[double] z3,matrix[double] Y) {
    z2 = affine::forward(X, W1, b1)
    a2 = sigmoid::forward(z2)
    z3 = affine::forward(a2, W2, b2)
    Y = sigmoid::forward(z3)
}
gradient = function(matrix[double] X,
                        matrix[double] b1,
                        matrix[double] b2,
                        matrix[double] W1,
                        matrix[double] W2,
                        matrix[double] Y) return (matrix[double] dJdW1,matrix[double] dJdW2) {
    #Compute derivative with respect to W and W2 for a given X and y:
    [z2,a2,z3,Yhat] = feedForward(X,b1,b2,W1,W2)
    loss = cross_entropy_loss::backward(Yhat, Y)
    
    smpz3 =  sigmoid::backward(loss,z3) 
    [delta2,dJdW2,db2] = affine::backward(smpz3,a2,W2,b2)
                
    smpz2 =  sigmoid::backward(delta2,z2)
    [delta2,dJdW1,db1] = affine::backward(smpz2,X,W1,b1)
}

upd_W1 = matrix(0, rows=nrow(W1), cols=ncol(W1))
upd_W2 = matrix(0, rows=nrow(W2), cols=ncol(W2))
mu = 0.9

max_iterations = 10000
iter = 0
learningRate = 0.001

[z2,a2,z3,YhatBefore] = feedForward(X,b1,b2,W1,W2)

while( iter < max_iterations ){
      smy = sigmoid::forward(y)
      [dJdW1, dJdW2] = gradient(X,b1,b2,W1,W2,smy)

      #update
      lrdJdW1 = learningRate * dJdW1
      lrdJdW2 = learningRate * dJdW2
      W1 = W1 - lrdJdW1
      W2 = W2 - lrdJdW2
    
     iter = iter + 1
}

[z2,a2,z3,YhatAfter] = feedForward(X,b1,b2,W1,W2)

2. Create a SystemML neural network for anomaly detection

Learning how to train a neural network in Python was educational, but now we need to build something useful on Apache Spark by using Apache SystemML and its deep learning library with a generated data set. (Remember, we used a Lorenz Attractor model to get simulated real-time vibration sensor data from a bearing, and we need to get that data to the IBM Cloud platform; see the tutorial on how to generate data for anomaly detection.) When we set up our development environment, we imported the WatsonIoTPlatformSystemMLLSTM notebook; we'll take a closer look at it now.

2a. Train a feed-forward neural network for unsupervised machine learning

To feed three-dimensional time series data to a feed-forward neural network it makes sense to add a so-called "feature engineering" step. We'll use the DFT (discrete Fourier transform) to transform from the time to the frequency domain. Look at Figure 6 to get an idea of how the data looks in the time domain for the first 3,000 samples in a healthy state.

Figure 6. Data in the time domain in a healthy state

Notice that while this system oscillates between two semi-stable states, it is hard to identify any regular patterns.

Look at the same chart in Figure 7 after we've switched the test data generator to a broken state.

Figure 7. Data in the time domain in a broken state

The obvious result is that we see much more energy in the system. The peaks exceed 200, in contrast to the healthy state, which never went over 50. Also, in my opinion, the frequency content of the second signal is higher.

Let's confirm the frequency of the second signal is higher by transforming the signal from the time to the frequency domain. The chart in Figure 8 contains the frequencies of the healthy signal.

Figure 8. Data in the frequency domain in a healthy state

And now let's contrast this with the broken signal in Figure 9.

Figure 9. Data in the frequency domain in a broken state

As expected, there are a lot more frequencies present in the broken signal.

We have enough evidence to construct an anomaly detector based on supervised machine learning (with a state-of-the-art model like a gradient boosted tree). But we want unsupervised machine learning because we have no idea which parts of the signal are normal and which are not.

A simple approach to unsupervised machine learning is to feed those 3,000 frequency bands per axis directly into a neural network. (Remember, the DFT returns as many frequency bands as we have samples in the signal, and because we are sampling at 100 Hz for 30 seconds from the physical model, that number is 3,000.)

With this approach we have transformed our three-dimensional input data (the three accelerometer axes we are measuring) into a 9,000-dimensional data set (3,000 frequency bands per accelerometer axis). This is our new 9,000-dimensional input feature space, which we can use to train a feed-forward neural network. The hidden layer of the feed-forward neural network has only 100 neurons (instead of the 9,000 we have in the input and output layers). This is called a bottleneck and turns our neural network into an autoencoder. We train the neural network by assigning the inputs to both the input and output layers: the neural network learns to reconstruct the input on the output, but it has to do so through the 100-neuron hidden-layer bottleneck. This way we prevent the neural network from learning any noise or irrelevant data. The Apache SystemML code we imported into our notebook defines such a neural network.
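
Before we look at that code, here is a minimal NumPy sketch of the feature-engineering step that produces the FFT features the script consumes (data_healthy is assumed to be a 3,000 x 3 array of accelerometer samples; the imported notebook may compute data_healthy_fft slightly differently):

import numpy as np

# Placeholder for the generated accelerometer data (3,000 samples for x, y, and z)
data_healthy = np.random.randn(3000, 3)

# One DFT per accelerometer axis: every column becomes 3,000 frequency bands
data_healthy_fft = np.abs(np.fft.fft(data_healthy, axis=0))

print(data_healthy_fft.shape)   # (3000, 3): 3,000 frequency bands for each of the three axes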

script = """
source("nn/layers/affine.dml") as affine
source("nn/layers/conv2d_builtin.dml") as conv2d
source("nn/layers/cross_entropy_loss.dml") as cross_entropy_loss
source("nn/layers/l2_loss.dml") as l2_loss
source("nn/layers/dropout.dml") as dropout
source("nn/layers/l2_reg.dml") as l2_reg
source("nn/layers/max_pool2d_builtin.dml") as max_pool2d
source("nn/layers/relu.dml") as relu
source("nn/layers/softmax.dml") as softmax
source("nn/layers/sigmoid.dml") as sigmoid
source("nn/optim/sgd_nesterov.dml") as sgd_nesterov
source("nn/optim/rmsprop.dml") as rmsprop
#
inputLayerSize = 3000
outputLayerSize = 3000
hiddenLayer1Size = 100


Xt = t(X)

n = nrow(Xt)
means = colSums(Xt)/n
stds = sqrt((colSums(Xt^2)/n - means*means)*n/(n-1)) + 1e-17
Xt = (Xt - means)/stds

Xt = 1/(1+exp(-Xt))

y = Xt

[W1, b1] = affine::init(inputLayerSize, hiddenLayer1Size)
[W2, b2] = affine::init(hiddenLayer1Size, outputLayerSize) 
if (pushWeights) {
    W1 = W1push
    W2 = W2push
}  


feedForward = function (matrix[double] X,
                        matrix[double] b1,
                        matrix[double] b2,
                        #matrix[double] b3,
                        matrix[double] W1,
                        matrix[double] W2
                        #matrix[double] W3
                        ) 
                        return (matrix[double] z2,
                        matrix[double] a2,
                        matrix[double] z3,
                        #matrix[double] a3,
                        #matrix[double] z4,
                        matrix[double] Y) {
    z2 = affine::forward(X, W1, b1)
    a2 = sigmoid::forward(z2)
    z3 = affine::forward(a2, W2, b2)
    Y = sigmoid::forward(z3)
}

gradient = function(matrix[double] X,
                        matrix[double] b1,
                        matrix[double] b2,
                        #matrix[double] b3,
                        matrix[double] W1,
                        matrix[double] W2,
                        #matrix[double] W3,
                        matrix[double] Y) #{
                        return (matrix[double] dJdW1,
                        matrix[double] dJdW2
                        #matrix[double] dJdW3
                        ) {
    #Compute derivative with respect to W and W2 for a given X and y:
    [z2,a2,z3,Yhat] = feedForward(X,b1,b2,W1,W2)
    
    loss = l2_loss::backward(Yhat, Y)
    
    smpz3 =  sigmoid::backward(loss,z3) 
    [delta2,dJdW2,db2] = affine::backward(smpz3,a2,W2,b2)
                
    smpz2 =  sigmoid::backward(delta2,z2)
    [delta2,dJdW1,db1] = affine::backward(smpz2,X,W1,b1)
}
upd_W1 = matrix(0, rows=nrow(W1), cols=ncol(W1))
upd_W2 = matrix(0, rows=nrow(W2), cols=ncol(W2))

max_iterations = 10000
iter = 0
learningRate = 0.001

while( iter < max_iterations ){
      [dJdW1, dJdW2] = gradient(Xt,b1,b2,W1,W2,y)

      #update
      lrdJdW1 = learningRate * dJdW1
      lrdJdW2 = learningRate * dJdW2
      W1 = W1 - lrdJdW1
      W2 = W2 - lrdJdW2
      
      iter = iter + 1
      if (iter %% 100 == 0) {
          #[z2,a2,z3,a3,z4,Yhat] = feedForward(Xt,b1,b2,b3,W1,W2,W3)
          [z2,a2,z3,Yhat] = feedForward(Xt,b1,b2,W1,W2)
          sse = sqrt(sum((Xt-Yhat)^2))
          print(sse)
      }
}
"""

Let's walk through that code a bit. Initially, we define the number of neurons per layer. The input layer needs 3,000 neurons because the frequency domain of our 3,000 input samples has 3,000 dimensions after we've applied the FFT (fast Fourier transform). Then, because we are using an autoencoder, the output layer needs the same number of neurons. (Remember, we are trying to reconstruct the input signal on the output.) Finally, the hidden layer defines our neural bottleneck of 100 neurons. Therefore, irrelevant and noisy data can't be learned: the autoencoder only learns what's absolutely necessary to reconstruct the signal.

inputLayerSize = 3000
outputLayerSize = 3000
hiddenLayer1Size = 100

Normalizing data is always a good practice, so we subtract the mean and divide by the standard deviation.

means = colSums(Xt)/n
stds = sqrt((colSums(Xt^2)/n - means*means)*n/(n-1)) + 1e-17
Xt = (Xt - means)/stds

Next, we initialize the weight matrices for each layer.

[W1, b1] = affine::init(inputLayerSize, hiddenLayer1Size)
[W2, b2] = affine::init(hiddenLayer1Size, outputLayerSize)

This code computes the forward pass of the neural network by sequentially computing the layers and activation functions.

    z2 = affine::forward(X, W1, b1)
    a2 = sigmoid::forward(z2)
    z3 = affine::forward(a2, W2, b2)
    Y = sigmoid::forward(z3)

Of course, we need to compute the back-propagation pass.

Note: The first derivatives needed for the back-propagation pass are already implemented inside the SystemML deep learning library, so don't get confused.

    [z2,a2,z3,Yhat] = feedForward(X,b1,b2,W1,W2)
    
    loss = l2_loss::backward(Yhat, Y)
    
    smpz3 =  sigmoid::backward(loss,z3) 
    [delta2,dJdW2,db2] = affine::backward(smpz3,a2,W2,b2)
                
    smpz2 =  sigmoid::backward(delta2,z2)
    [delta2,dJdW1,db1] = affine::backward(smpz2,X,W1,b1)

We conclude with some gradient descent where we print the loss every 100 iterations.

max_iterations = 10000
iter = 0
learningRate = 0.001

while( iter < max_iterations ){
      [dJdW1, dJdW2] = gradient(Xt,b1,b2,W1,W2,y)

      #update
      lrdJdW1 = learningRate * dJdW1
      lrdJdW2 = learningRate * dJdW2
      W1 = W1 - lrdJdW1
      W2 = W2 - lrdJdW2
      
      iter = iter + 1
      if (iter %% 100 == 0) {
          [z2,a2,z3,Yhat] = feedForward(Xt,b1,b2,W1,W2)
          sse = sqrt(sum((Xt-Yhat)^2))
          print(sse)
      }
}

2b. Train the neural network using healthy and broken data

We now train this neural network three times using Python code: twice with healthy data and once with broken data.

dummyMatrix = np.array([[1],[1]])
with jvm_stdout(True):
    prog = dml(script).input(X=data_healthy_fft).input(pushWeights=False).input(W1push=dummyMatrix).input(W2push=dummyMatrix)
    result = ml.execute(prog)
    [W1,W2] = result.get("W1","W2")
    prog = dml(script).input(X=data_healthy_fft).input(pushWeights=True).input(W1push=W1).input(W2push=W2)
    result = ml.execute(prog)
    [W1,W2] = result.get("W1","W2")
    prog = dml(script).input(X=data_broken_fft).input(pushWeights=True).input(W1push=W1).input(W2push=W2)
    result = ml.execute(prog)

If we plot the loss over the training iterations, we get Figure 10.

Figure 10. Training iterations with healthy and broken data

We can see that training with healthy data rapidly pushes the loss down to around 14. If we train for a longer period, the loss doesn't drop any lower. But we can also see that as soon as we show the neural network broken data, there is a clear spike in the loss.

Note: This is our simple anomaly detector. It is unsupervised machine learning. There is no need to know in advance if the neural network is trained on healthy or broken data!
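
Because the detector is unsupervised, flagging an anomaly boils down to watching the reconstruction loss. The following is a hypothetical sketch of such a check; the threshold is an assumption and would have to be tuned against the loss you observe on healthy data (around 14 in Figure 10):

# Hypothetical anomaly check based on the reconstruction loss (sse) printed by the DML script
ANOMALY_THRESHOLD = 100.0   # assumed value; tune against the loss observed on healthy windows

def is_anomaly(sse, threshold=ANOMALY_THRESHOLD):
    return sse > threshold

print(is_anomaly(14.2))    # False: typical healthy window
print(is_anomaly(300.0))   # True: a loss spike like the one we see on broken data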

2c. Improve anomaly detection by adding LSTM layers

We can outperform state-of-the-art time-series anomaly detection algorithms and feed-forward neural networks by using long short-term memory (LSTM) networks.

Consider the information in Table 1 taken from the 2012 Stanford publication titled Deep Learning for Time Series Modeling by Enzo Busseti, Ian Osband, and Scott Wong.

Table 1. Results for different learning models
Learning Method         RMSE    % RMSE
Kernelized Regression   1,540   8.3%
Frequency NN            1,251   6.7%
Deep Feedforward NN     1,103   5.9%
Deep Recurrent NN         530   2.8%

We can see that the combination of FFT and a feed-forward neural network (the second row in Table 1) outperforms state-of-the-art kernelized regression (6.7% versus 8.3% RMSE, or root-mean-square error, a measure of prediction performance). We will skip experimenting with deep feed-forward neural networks and jump directly to a deep recurrent neural network, which uses LSTM layers. LSTM layers are a way to introduce memory into neural networks, which makes them ideal for analyzing time-series and sequence data.

In the following code, we migrate the feed-forward neural network implemented in SystemML to an LSTM network.

source("nn/layers/cross_entropy_loss.dml") as cross_entropy_loss
source("nn/layers/l2_loss.dml") as l2_loss

source("nn/layers/lstm.dml") as lstm
source("nn/layers/sigmoid.dml") as sigmoid

source("nn/optim/sgd_nesterov.dml") as sgd_nesterov
source("nn/optim/rmsprop.dml") as rmsprop


in_TS = 70    # number of input time-steps
out_TS = 30   # number of output (predicted) time-steps
N = nrow(X) / (in_TS + out_TS)
M = out_TS

idx_mat = outer(seq(0,N-1,1), t(seq(0,in_TS+out_TS-1,1)), "+") + 1

idx_col = matrix(idx_mat, rows=nrow(idx_mat)*ncol(idx_mat), cols=1)
rordrd_X = table(seq(1, nrow(idx_col), 1), idx_col, nrow(idx_col), nrow(idx_col)) %*% X

X = matrix(rordrd_X, rows=nrow(idx_mat), cols=ncol(idx_mat))
#print(toString(X))

Y = X[,in_TS+1:in_TS+out_TS]
X = X[,1:in_TS]

max_iterations = 1000
iter = 0
learningRate = 0.01
decayRate = 0.95

[W, b, out0, c0] = lstm::init(N,1,M)

if (pushWeights) {
    W = Wpush
} 


rmspropCache = rmsprop::init(W)

while( iter < max_iterations ){
    [a1, c, c_out, c_c, c_ifog] = lstm::forward(X, W, b, in_TS, 1, FALSE, out0, c0)
    loss = l2_loss::forward(a1, Y)

    if(iter %% 100 == 0) print("iter=" + iter + " loss=" + loss)
    
    loss_grad = l2_loss::backward(a1, Y)
    [dX, dW, db, dout0, dc0] = lstm::backward(loss_grad, c0, X, W, b, in_TS, 1, FALSE, out0, c0, c_out, c_c, c_ifog)

    [W, rmspropCache] = rmsprop::update(W, dW, learningRate, decayRate, 1e-6, rmspropCache)
    
    iter = iter + 1
}

Let's walk through that code a bit. The first thing we notice is that we import another layer type from the library called LSTM:

source("nn/layers/lstm.dml") as lstm

Note: The deep learning library in SystemML is completely implemented in SystemML's linear algebra DSL. Therefore, it can make use of the performance optimization stages of this amazing engine.

As we are now predicting future time-steps from past ones, we specify the number of time-steps used as input and as output.

in_TS = 70    # number of input time-steps
out_TS = 30   # number of output (predicted) time-steps

Our time-series length exceeds the total number of time-steps considered by the neural network (the sum of in_TS and out_TS). For example, if you want to predict a 10 Hz sine wave and you train the neural network with one second's worth of data sampled at 44.1 kHz, you'll end up with 44,100 time-steps containing 10 periods of the oscillation. What you want to do is slice the signal into chunks of in_TS + out_TS time-steps.

idx_mat = outer(seq(0,N-1,1), t(seq(0,in_TS+out_TS-1,1)), "+") + 1

idx_col = matrix(idx_mat, rows=nrow(idx_mat)*ncol(idx_mat), cols=1)
rordrd_X = table(seq(1, nrow(idx_col), 1), idx_col, nrow(idx_col), nrow(idx_col)) %*% X

X = matrix(rordrd_X, rows=nrow(idx_mat), cols=ncol(idx_mat))
Y = X[,in_TS+1:in_TS+out_TS]
X = X[,1:in_TS]

To clarify what happened, consider that

  • Initially X was a vector of 44,100 measurements.
  • Then X got transformed into a matrix containing 441 rows of 100 time-steps.
  • Finally the matrix split into X containing 441 rows of 70 time-steps as input for training and Y containing 441 rows of 30 time-steps as target matrix, which the LSTM network has to reconstruct.
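
To make this reshaping concrete, here is a minimal NumPy sketch using the 10 Hz sine wave example (this sketch slices the signal into non-overlapping chunks as described above; the DML builds its windows through an index matrix and a matrix multiplication, so the windowing details may differ):

import numpy as np

in_TS, out_TS = 70, 30
signal = np.sin(2 * np.pi * 10 * np.linspace(0, 1, 44100))   # 1 second of a 10 Hz sine at 44.1 kHz

N = signal.size // (in_TS + out_TS)                          # 441 windows of 100 time-steps
windows = signal[:N * (in_TS + out_TS)].reshape(N, in_TS + out_TS)

X = windows[:, :in_TS]      # 441 x 70 input time-steps
Y = windows[:, in_TS:]      # 441 x 30 target time-steps
print(X.shape, Y.shape)     # (441, 70) (441, 30)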

Following the usual steps in neural network training, we first initialize the parameters needed for gradient descent.

max_iterations = 1000
iter = 0
learningRate = 0.01
decayRate = 0.95

[W, b, out0, c0] = lstm::init(N,1,M)

Then, we introduce a way to preserve the trained weights between subsequent invocations of the SystemML engine.

if (pushWeights) {
    W = Wpush
}

LSTM networks are challenging to train (meaning we have to find a hyper-parameter set for which the loss converges to a local minimum). Therefore, one change we had to make was to use a different parameter update function for gradient descent, called RMSProp; it contains a state that we have to initialize.

rmspropCache = rmsprop::init(W)
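
For intuition, this is what an RMSProp update does conceptually. The following is a NumPy sketch, not the SystemML implementation; the decayRate and the 1e-6 epsilon mirror the values passed to rmsprop::update in the loop below:

import numpy as np

def rmsprop_update(W, dW, cache, learningRate=0.01, decayRate=0.95, eps=1e-6):
    # Keep a running average of the squared gradients ...
    cache = decayRate * cache + (1 - decayRate) * dW ** 2
    # ... and scale each parameter's step size by it
    W = W - learningRate * dW / (np.sqrt(cache) + eps)
    return W, cache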

We end up with the usual gradient descent loop.

while( iter < max_iterations ){
    [a1, c, c_out, c_c, c_ifog] = lstm::forward(X, W, b, in_TS, 1, FALSE, out0, c0)
    loss = l2_loss::forward(a1, Y)

    if(iter %% 100 == 0) print("iter=" + iter + " loss=" + loss)
    loss_grad = l2_loss::backward(a1, Y)
    [dX, dW, db, dout0, dc0] = lstm::backward(loss_grad, c0, X, W, b, in_TS, 1, FALSE, out0, c0, c_out, c_c, c_ifog)

    [W, rmspropCache] = rmsprop::update(W, dW, learningRate, decayRate, 1e-6, rmspropCache)
    
    iter = iter + 1
}

Note: We are again making use of the SystemML deep learning library, written in the SystemML DSL, for the RMSProp update.

We update the parameter matrix W and the internal state representation rmspropCache of the RMSProp implementation.

 [W, rmspropCache] = rmsprop::update(W, dW, learningRate, decayRate, 1e-6, rmspropCache)

2d. Re-run the network in Python

Switch back to Python to run this network three times: twice with healthy data and once with broken data (the same pattern we used for the simple feed-forward approach). Open the WatsonIoTPlatformSystemMLLSTM notebook again in Watson Studio, find the cell with the content below, and run it (but make sure you've run all previous cells first; otherwise, the necessary context variables have not been initialized).

import numpy as np
dummyW = np.array([[1],[1]])
with jvm_stdout(True):
    prog = dml(script).input(X=np.transpose(np.array([data_healthy[:,1]]))).input(pushWeights=False).input(Wpush=dummyW)
    result = ml.execute(prog)
    W = result.get("W")
    prog = dml(script).input(X=np.transpose(np.array([data_healthy[:,1]]))).input(pushWeights=True).input(Wpush=W)
    result = ml.execute(prog)
    W = result.get("W")
    prog = dml(script).input(X=np.transpose(np.array([data_broken[:,1]]))).input(pushWeights=True).input(Wpush=W)
    result = ml.execute(prog)

Look at Figure 11 for the loss over training time.

Figure 11. Loss over training time

While Figure 11 does not show a lot of detail, you can see the exact values when running the notebook. Two things are important here:

  • Initially, at iteration zero, we see a loss of 695. This is due to the random initialization of the parameter matrix W. But the loss rapidly converges to 530 and stays there, independent of how long we train.
  • After we switch to broken data, our loss jumps to 23,901, which is roughly 45 times higher! More interestingly, it rapidly converges to 23,427 and never changes again.

2e. Analyze the data in real time with the IBM Watson IoT Platform using MQTT

The last step is to hook this anomaly detector up to the IBM Watson IoT Platform using MQTT so that it analyzes data in real time. Hooking our neural network up to the platform is straightforward. Figure 12 highlights the org, apiKey, and apiToken values you need. These credentials were generated when you created an IBM Cloud app using the Internet of Things Platform Starter.

Note: Refer to the Generating data for anomaly detection article for details on this process.

Figure 12. IBM Cloud app credentials
import ibmiotf.application
options = {"org": "rwyrty", "id": "anything", "auth-method": "apikey", "auth-key": "a-rwyrty-f95d3ji16n", "auth-token": "ZHd1&O)_J1&TI4XP3z"}
client = ibmiotf.application.Client(options)
client.connect()

from queue import Queue  # Python 3; on Python 2 this module is called Queue
q = Queue(7000)

def myEventCallback(event):
    q.put(event.data)

client.deviceEventCallback = myEventCallback
client.subscribeToDeviceEvents("0.16.2", "lorenz", "osc")

2f. Perform streaming analysis by creating a count-based tumbling window

Now we need to pass a count-based tumbling window of sensor data to a callback function. The core of the analysis is where data gets fetched in windows from the IoT platform and then passed to the neural network in batches.

import numpy as np
global firstCall
firstCall = True
global W
W = np.array([[1],[1]])
def doNN(data):
    with jvm_stdout(True):
        global firstCall
        global W
        notFirstCall = not firstCall
        prog = dml(script).input(X=np.transpose(np.array([data[:,1]]))).input(pushWeights=notFirstCall).input(Wpush=W)
        result = ml.execute(prog)
        W = result.get("W")
        firstCall = False

Note: We read the weight matrix W back from the Apache SystemML engine in each iteration to preserve the "learning"; on each subsequent call we are not learning from scratch, but start where we ended in the previous iteration.

2g. Create a continuous application loop for training

Finally, we create an endless loop over the incoming data: whenever our tumbling window is filled, we pass it to the neural network. This loop is essential: data is grabbed from the platform; we wait until the window is full; the window is passed to the neural network for training; and the process starts over again.

import numpy as np
while True:
    while not q.empty():
        sample = q.get()
        point = [sample["x"], sample["y"],sample["z"]]
        try:
            data
        except NameError:
            data = np.array(point)
        else:
            data = np.append(data,point)
        if data.size>=9000:
            data = np.reshape(data,(3000,3))
            print(data)
            doNN(data)
            del data

Figure 13 shows the loss over time. As we can see for the first two batches (windows) of healthy data, the LSTM network clearly learns the inherent patterns really well. And as soon as we feed it broken data, we again see a clear spike in the loss.

Figure 13. The loss over time

Conclusion

This completes our second deep learning tutorial for IoT time-series data. We've learned how Apache SystemML facilitates linear algebra operations by optimizing executions on the fly and by making use of Apache Spark as a runtime engine.

We've learned about the benefits of the deep learning library completely written in the Apache SystemML DSL, where all operations executed during neural network training and scoring are optimized and executed by Apache Spark.

Finally, we've shown that even a very simple single-layer LSTM network can outperform state-of-the-art anomaly detection algorithms on time-series sensor data – or any type of sequence data in general.

In the previous article we worked with the same generated test data, but with a different deep learning framework: Deeplearning4j. In the next article, we'll work with TensorFlow (TensorSpark).

Acknowledgements

This tutorial would not have been possible without the continuous support of my colleagues working at IBM Research Almaden and the IBM Spark Technology Center:

Berthold Reinwald, Technical Lead - Large-scale Analytics, IBM Research Almaden.
Watch an interview with Berthold.

Prithviraj Sen, Machine Learning Researcher, IBM Research Almaden.
Watch an interview with Prithvi.

Mike Dusenberry, Machine Learning and Deep Learning Engineer, IBM Spark Technology Center, San Francisco.

