Overview
IBM® Automation Flink, which is based on Apache Flink®, provides event-processing capabilities.
The Apache Flink operator uses the FlinkDeployment and FlinkSessionJob custom resources to support Application and Session deployments. You need to specify the job parameter for the Application deployment.
Event processing involves listening for events from one or more sources, performing meaningful operations such as aggregation and pattern detection, and then sending the processed events to one or more consumers or data stores.
A FlinkDeployment is an event processor, and a Flink job is an event processing task.
- Prerequisites
- Storage
- Example of FlinkDeployment custom resource
- Example of FlinkSessionJob custom resource
Prerequisites
Create the flink-operator-cert secret.
Storage
A shared ReadWriteMany (RWX) PersistentVolumeClaim (PVC) for the Flink JobManagers and TaskManagers provides stateful storage for the checkpoints and savepoints of Flink jobs. When you have more than one Flink replica, the RWX PVC is mandatory.
For a single replica, you can mount the volume as read/write on a single node.
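For the single-replica case, a claim along the following lines could be used. This is a minimal sketch, not a tested configuration: the claim name flink-single-replica-pvc is hypothetical, and storageClassName is omitted on the assumption that the cluster's default storage class supports ReadWriteOnce.

```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-single-replica-pvc   # hypothetical name, not from the product documentation
spec:
  accessModes:
    - ReadWriteOnce                # read/write mount on a single node
  resources:
    requests:
      storage: 20Gi                # same size as the RWX claim on this page
  # storageClassName is omitted so that the cluster default is used (assumption)
```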
Note: Make sure that you provision an RWX storage class in your cluster.
In the FlinkDeployment custom resource, provide the following PVC as the volume jobs-storage:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: icp4adeploy-bai-pvc
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 20Gi
  storageClassName: ocs-storagecluster-cephfs
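To illustrate how this claim might be referenced as the jobs-storage volume, the following fragment sketches the relevant part of a FlinkDeployment pod template. It is not a complete resource. The mount path /opt/flink/volume is an assumption, borrowed from the FlinkDeployment example on this page; adjust it to match the checkpoint and savepoint directories in your Flink configuration.

```yaml
# Fragment of a FlinkDeployment spec; other fields are left out for brevity.
spec:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: jobs-storage
              mountPath: /opt/flink/volume     # assumed mount path
      volumes:
        - name: jobs-storage
          persistentVolumeClaim:
            claimName: icp4adeploy-bai-pvc     # the PVC defined above
```

The volume name under volumes must match the name under volumeMounts, and claimName must match metadata.name of the PVC.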
Example of FlinkDeployment custom resource
You can define the Flink Application and Session cluster deployments with the FlinkDeployment custom resource.
The following example shows a FlinkDeployment custom resource for a Session cluster:
apiVersion: flink.ibm.com/v1beta1
kind: FlinkDeployment
metadata:
  name: session-cluster-minimal-prod
spec:
  flinkConfiguration:
    license.accept: 'false'
    high-availability.type: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: 'file:///opt/flink/volume/flink-ha'
    restart-strategy: failure-rate
    restart-strategy.failure-rate.max-failures-per-interval: '10'
    restart-strategy.failure-rate.failure-rate-interval: '10 min'
    restart-strategy.failure-rate.delay: '30 s'
    execution.checkpointing.interval: '5000'
    execution.checkpointing.unaligned: 'false'
    state.backend.type: rocksdb
    state.backend.incremental: 'true'
    state.backend.rocksdb.use-bloom-filter: 'true'
    state.checkpoints.dir: 'file:///opt/flink/volume/flink-cp'
    state.checkpoints.num-retained: '3'
    state.savepoints.dir: 'file:///opt/flink/volume/flink-sp'
    taskmanager.numberOfTaskSlots: '10'
    table.exec.source.idle-timeout: '30 s'
  serviceAccount: flink
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: flink-logs
              mountPath: /opt/flink/log
            - name: flink-volume
              mountPath: /opt/flink/volume
      volumes:
        - name: flink-logs
          emptyDir: {}
        - name: flink-volume
          persistentVolumeClaim:
            claimName: ibm-flink-pvc
  jobManager:
    replicas: 1
    resource:
      memory: '2048m'
      cpu: 0.25
  taskManager:
    resource:
      memory: '2048m'
      cpu: 1
  mode: native
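The preceding resource has no job parameter, so it describes a Session cluster. For comparison, an Application deployment adds a job section to the FlinkDeployment spec. The following is a minimal sketch under stated assumptions: the resource name is hypothetical, the jarURI assumes that the example JAR is present in the container image at that path, and the remaining values are copied from the examples on this page rather than from a tested configuration.

```yaml
apiVersion: flink.ibm.com/v1beta1
kind: FlinkDeployment
metadata:
  name: application-cluster-example    # hypothetical name
spec:
  flinkConfiguration:
    license.accept: 'false'            # copied from the example above
    taskmanager.numberOfTaskSlots: '2'
  serviceAccount: flink
  jobManager:
    resource:
      memory: '2048m'
      cpu: 1
  taskManager:
    resource:
      memory: '2048m'
      cpu: 1
  job:                                 # the job section makes this an Application deployment
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar   # assumed path inside the image
    parallelism: 2
    upgradeMode: stateless
```

With upgradeMode set to stateless, the job restarts from an empty state on upgrade. Jobs that must keep their state would need the checkpoint and savepoint storage described in the Storage section.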
Example of FlinkSessionJob custom resource
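You can define a session job in the Session cluster with the FlinkSessionJob custom resource. You can configure multiple FlinkSessionJob resources for each Session cluster. The following example defines a Session cluster and two session jobs that reference it through deploymentName: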
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-session-deployment-example
spec:
  image: flink:1.18.1
  flinkVersion: v1_18
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  serviceAccount: flink
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-example
spec:
  deploymentName: basic-session-deployment-example
  job:
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
    parallelism: 4
    upgradeMode: stateless
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-example2
spec:
  deploymentName: basic-session-deployment-example
  job:
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1.jar
    parallelism: 2
    upgradeMode: stateless
    entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample