Overview

IBM® Automation Flink, which is based on Apache Flink®, provides event-processing capabilities.

The Apache Flink operator uses the FlinkDeployment and FlinkSessionJob custom resources to support Application and Session deployments. For an Application deployment, you must specify the job parameter in the FlinkDeployment custom resource.
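
As a minimal sketch of what specifying the job parameter can look like, the following FlinkDeployment carries a job section, which is what makes it an Application deployment. The resource name, JAR path, slot count, and parallelism are illustrative assumptions. The field names follow the Apache Flink Kubernetes Operator job specification, so check them against the CRD version that is installed in your cluster.

```yaml
# Hypothetical Application deployment: the job section distinguishes it
# from a Session deployment, which has no job section.
apiVersion: flink.ibm.com/v1beta1
kind: FlinkDeployment
metadata:
  name: application-deployment-example   # illustrative name
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: '2'
  serviceAccount: flink
  jobManager:
    resource:
      memory: '2048m'
      cpu: 1
  taskManager:
    resource:
      memory: '2048m'
      cpu: 1
  job:
    # Assumed location of a JAR that is packaged in the container image
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
```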

Event processing involves listening for events from one or more sources, performing meaningful operations such as aggregation and pattern detection, and then sending the processed events to one or more consumers or data stores.

A FlinkDeployment is an event processor, and a Flink job is an event-processing task.

Prerequisites

Create the flink-operator-cert secret.

Storage

A shared ReadWriteMany (RWX) PersistentVolumeClaim (PVC) for the Flink JobManagers and TaskManagers stores the checkpoints and savepoints of stateful Flink jobs. When you run more than one Flink replica, the RWX PVC is mandatory.

For a single replica, a ReadWriteOnce (RWO) volume, which is mounted as read/write on a single node, is sufficient.
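
For the single-replica case, a claim might look like the following sketch. The claim name and storage class name are placeholders, not values from the product, and the requested size simply mirrors the RWX claim used elsewhere in this topic.

```yaml
# Hypothetical single-replica claim: ReadWriteOnce instead of ReadWriteMany
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-single-replica-pvc          # illustrative name
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 20Gi
  storageClassName: my-rwo-storage-class  # placeholder, use a class from your cluster
```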

Note: Make sure that your cluster provides a storage class that supports RWX access.

In the FlinkDeployment custom resource, provide the following PVC as the volume jobs-storage:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: icp4adeploy-bai-pvc
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 20Gi
  storageClassName: ocs-storagecluster-cephfs
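
One way to reference this claim from the FlinkDeployment custom resource is through the pod template, as in the following fragment. This is a sketch rather than a complete resource. The mount path is an assumption that is chosen to match the file:///opt/flink/volume/... directories used in the flinkConfiguration example in this topic.

```yaml
# Fragment of a FlinkDeployment spec (not a complete resource)
spec:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: jobs-storage
              mountPath: /opt/flink/volume    # assumed mount path
      volumes:
        - name: jobs-storage
          persistentVolumeClaim:
            claimName: icp4adeploy-bai-pvc    # the PVC defined in this section
```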

Example of FlinkDeployment custom resource

You can define the Flink Application and Session cluster deployments with the FlinkDeployment custom resource.

A FlinkDeployment custom resource for a Session cluster is shown in the following example:

apiVersion: flink.ibm.com/v1beta1
kind: FlinkDeployment
metadata:
  name: session-cluster-minimal-prod
spec:
  flinkConfiguration:
    license.accept: 'false'
    high-availability.type: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: 'file:///opt/flink/volume/flink-ha'
    restart-strategy: failure-rate
    restart-strategy.failure-rate.max-failures-per-interval: '10'
    restart-strategy.failure-rate.failure-rate-interval: '10 min'
    restart-strategy.failure-rate.delay: '30 s'
    execution.checkpointing.interval: '5000'
    execution.checkpointing.unaligned: 'false'
    state.backend.type: rocksdb
    state.backend.incremental: 'true'
    state.backend.rocksdb.use-bloom-filter: 'true'
    state.checkpoints.dir: 'file:///opt/flink/volume/flink-cp'
    state.checkpoints.num-retained: '3'
    state.savepoints.dir: 'file:///opt/flink/volume/flink-sp'
    taskmanager.numberOfTaskSlots: '10'
    table.exec.source.idle-timeout: '30 s'
  serviceAccount: flink
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: flink-logs
              mountPath: /opt/flink/log
            - name: flink-volume
              mountPath: /opt/flink/volume
      volumes:
        - name: flink-logs
          emptyDir: {}
        - name: flink-volume
          persistentVolumeClaim:
            claimName: ibm-flink-pvc
  jobManager:
    replicas: 1
    resource:
      memory: '2048m'
      cpu: 0.25
  taskManager:
    resource:
      memory: '2048m'
      cpu: 1
  mode: native

Example of FlinkSessionJob custom resource

You can define a session job in a Session cluster with the FlinkSessionJob custom resource. You can configure multiple FlinkSessionJob resources for each Session cluster. The following example defines one Session cluster and two session jobs that run in it:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-session-deployment-example
spec:
  image: flink:1.18.1
  flinkVersion: v1_18
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  serviceAccount: flink
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-example
spec:
  deploymentName: basic-session-deployment-example
  job:
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
    parallelism: 4
    upgradeMode: stateless

---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-example2
spec:
  deploymentName: basic-session-deployment-example
  job:
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1.jar
    parallelism: 2
    upgradeMode: stateless
    entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample

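
The session jobs in this example are stateless. For a stateful job, a session job can instead be upgraded through savepoints, as in the following sketch. It assumes that the target Session cluster sets state.savepoints.dir to a directory on the shared volume, which basic-session-deployment-example as written does not do. The job name is illustrative, and upgradeMode: savepoint is taken from the Apache Flink Kubernetes Operator job specification.

```yaml
# Hypothetical stateful session job that is upgraded through savepoints
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: savepoint-session-job-example   # illustrative name
spec:
  deploymentName: basic-session-deployment-example
  job:
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1.jar
    parallelism: 2
    # Requires a savepoint directory to be configured on the Session cluster
    upgradeMode: savepoint
    entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
```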