Troubleshooting Apache Flink jobs

If you observe that no events are flowing to HDFS or to Elasticsearch, and that Flink job logs report errors, explore possible diagnoses and solutions.

Tip:

Make sure that the jq command-line JSON processor is installed. Some of these troubleshooting procedures require this tool. The jq tool is available from this page: https://stedolan.github.io/jq/.

In code lines, the NAMESPACE or bai_namespace placeholder is the namespace where Business Automation Insights is deployed. The CR_NAME or custom_resource_name placeholder is the name of the custom resource that was used to deploy Business Automation Insights.
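
To confirm that jq is available on the machine where you run the oc commands, you can print its version.

jq --version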

All task slots seem to be busy. However, not enough task slots are assigned to a job.

Problem
The job manager log reports errors such as the following one.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate all requires slots within timeout of 300000 ms. 
Slots required: 8, slots allocated: 0

The Flink web interface is accessible and in the overview page, you see 0 (zero) as the number of available task slots.

Cause
  • If this issue happens after initial configuration, it means that you did not configure enough task slots for running all the jobs. Verify whether the number of task slots that are displayed in the Flink web interface is equal to, or greater than, the number of running jobs; the sketch after this list shows one way to count the running jobs. If it is not, update your IBM Business Automation Insights configuration with the correct number of task manager replicas and task slots.
  • If the issue happens after you have updated your Business Automation Insights configuration, the problem might indicate that Flink did not correctly update the metadata about task slot assignment after a failing task manager recovered.
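
For example, to compare the number of running jobs with the number of available task slots shown in the Flink web interface, you can count the running Flink jobs from inside the job manager pod. This is a minimal sketch that reuses the component=jobmanager pod label and the /opt/flink/bin/flink path that appear in the other procedures on this page.

# Count the Flink jobs that are currently in RUNNING state
JOB_MANAGER_POD=$(oc get pods -n <bai_namespace> -l component=jobmanager --no-headers -o custom-columns=NAME:.metadata.name | head -n 1)
oc exec -c jobmanager -it ${JOB_MANAGER_POD} -n <bai_namespace> -- /opt/flink/bin/flink list | grep -c RUNNING
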
Solution
  1. Restart each task manager one by one, in any order, by running these delete commands; the sketch after these steps shows one way to automate this rolling restart. The <custom_resource_name> placeholder stands for the name of your custom resource for IBM Business Automation Insights.
    oc delete pod <custom_resource_name>-[...]-ep-taskmanager-0
    oc delete pod <custom_resource_name>-[...]-ep-taskmanager-1
    ...

    These commands redistribute the jobs to the different task slots. The task manager StatefulSet immediately redeploys new instances.

  2. After the restart, verify from the Flink web interface that all jobs are running and that task slots are correctly assigned.
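
If you have many task managers, the following minimal sketch automates the rolling restart of step 1. It assumes that the task manager pod names contain the string taskmanager, as in the examples above; adjust the filter if your pod names differ.

# Restart every task manager pod, one at a time, and wait for each one to come back
for TM in $(oc get pods -n <bai_namespace> --no-headers -o custom-columns=NAME:.metadata.name | grep taskmanager); do
  oc delete pod ${TM} -n <bai_namespace>
  # The StatefulSet re-creates the pod with the same name; wait until it is Ready before continuing
  sleep 10
  oc wait --for=condition=Ready pod/${TM} -n <bai_namespace> --timeout=300s
done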

After an update of the job submitter, the processing job is in Canceled state and issues an error message.

Problem
The following error message is displayed.
Get latest completed checkpoint for <job-id> job
REST endpoint did not return latest completed checkpoint. Getting it from Persistent Volume...
Error: There is no checkpoint to recover from.
Diagnosis
This error can happen when the version of a job is updated, for example to try to fix a failure, and that failure prevented the creation of new checkpoints and savepoints.
Solution
Restart the job from the latest successful checkpoint or savepoint.
  1. You can find the latest successful checkpoint in the <bai-pv>/checkpoints/<job-name>/<job-id> directory.
    <bai-pv>
    The directory where the Business Automation Insights persistent volume (PV) was created. Set this variable to /mnt/pv, which is the folder where the PV is mapped within the job submitters.
    <job-name>
    The name of the failing job, for example bai/bpmn.
    <job-id>
    The job identifier that is indicated by the error message. Pick the most recent checkpoint, that is, the highest <checkpoint-id> value, and verify that the folder is not empty.
  2. If all <checkpoint-id> folders are empty, and only in this case, use the latest savepoint of the corresponding processing job, which you can find in the <bai-pv>/savepoints/<job-name> directory.
  3. Update the <job_name>.recovery_path parameter of the failing job by following the procedure in Updating your Business Automation Insights custom resource.
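
The following sketch can help with steps 1 and 2. It runs inside the job manager pod, where the persistent volume is also mounted under /mnt/pv, and lists the checkpoint folders of the failing job from newest to oldest, together with their contents, so that you can spot the most recent non-empty one. The <job-name> and <job-id> placeholders come from the error message.

# Connect to the job manager pod (the PV is mounted under /mnt/pv there as well)
JOB_MANAGER_POD=$(oc get pods -n <bai_namespace> -l component=jobmanager --no-headers -o custom-columns=NAME:.metadata.name | head -n 1)
oc rsh -c jobmanager ${JOB_MANAGER_POD}

# Inside the pod: list the checkpoint folders of the failing job, newest first, with their contents
ls -lt /mnt/pv/checkpoints/<job-name>/<job-id>/

# If every chk-<checkpoint-id> folder is empty, list the savepoints of the job instead
ls -lt /mnt/pv/savepoints/<job-name>/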

Jobs are not running and the Flink web interface is not accessible after a system restart.

Problem
When you try to access the Flink web interface, you see the following message.
{"errors":["Service temporarily unavailable due to an ongoing leader election. 
Please refresh."]}
The job manager logs the following message.
INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool
- Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{######}]
...
INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool   
- Pending slot request [SlotRequestId{######}] timed out.
The job manager also reports errors such as the following one.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate all requires slots within timeout of 300000 ms. 
Slots required: 6, slots allocated: 0
Cause
These messages indicate that the job manager was not correctly updated after the system restart. As a consequence, the job manager does not have access to the task managers to assign job execution.
Solution
Restart the job manager to update it with the correct data by running the following delete command.
oc delete pod <custom_resource_name>-[...]-ep-jobmanager-<id>

A new job manager instance is deployed. After the redeployment, all jobs should be running again and the Flink web interface should be accessible.
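
If you do not know the exact name of the job manager pod, you can retrieve it by label before deleting it, as in this sketch, which reuses the component=jobmanager label from the other procedures on this page.

# Identify the job manager pod and delete it; a new instance is deployed automatically
JOB_MANAGER_POD=$(oc get pods -n <bai_namespace> -l component=jobmanager --no-headers -o custom-columns=NAME:.metadata.name | head -n 1)
oc delete pod ${JOB_MANAGER_POD} -n <bai_namespace>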

Job pods, such as <custom_resource_name>-bai-bpmn or <custom_resource_name>-bai-icm, are stuck in Init:0/1 status

Problem
The pods of the <custom_resource_name>-bai-bpmn and <custom_resource_name>-bai-icm jobs first require that the <custom_resource_name>-bai-setup job completes successfully. The <custom_resource_name>-bai-setup job attempts up to three retries on failure. After these three retries, it does not trigger any new pod creation. As a side effect, pods of the <custom_resource_name>-bai-bpmn and <custom_resource_name>-bai-icm jobs can remain stuck in Init:0/1 status.
  1. When you run the get pods command, you might observe the following results.
    oc get pods -n <bai_namespace>
    Table 1. Pod retrieval results
    NAME                                 READY   STATUS     RESTARTS   AGE
    ...
    <custom_resource_name>-bai-bpmn-aaaaa   0/1     Init:0/1   0          2h
    ...
    <custom_resource_name>-bai-icm-bbbbb    0/1     Init:0/1   0          2h
    ...

  2. Retrieve the job manager logs and look up the logs of the <custom_resource_name>-bai-bpmn and <custom_resource_name>-bai-icm pods.
    JOB_MANAGER=$(oc get pod -l component=jobmanager --no-headers -o custom-columns=NAME:.metadata.name)
    oc logs $JOB_MANAGER -c jobmanager
Cause
This situation can happen if Elasticsearch was set up incorrectly when you installed the release. Make sure that Elasticsearch is properly up and running before you apply the following solution.
Solution
  1. Delete all the pods that were previously created by the <custom_resource_name>-bai-setup job.
    oc delete pod <custom_resource_name>-bai-setup-aaaaa
    oc delete pod <custom_resource_name>-bai-setup-bbbbb
    oc delete pod <custom_resource_name>-bai-setup-ccccc
    oc delete pod <custom_resource_name>-bai-setup-ddddd
  2. Run the following command to re-create the <custom_resource_name>-bai-setup job.
    oc get job <custom_resource_name>-bai-setup -o json | jq "del(.spec.selector)" | jq "del(.spec.template.metadata.labels)" | oc replace --force -f -
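
After the job is re-created, you can verify that it completes and that the previously stuck pods leave the Init:0/1 status, for example with the following commands.

# Wait for the re-created setup job to complete (adjust the timeout to your environment)
oc wait --for=condition=complete job/<custom_resource_name>-bai-setup -n <bai_namespace> --timeout=600s

# Check that the bai-bpmn and bai-icm pods are no longer stuck in Init:0/1
oc get pods -n <bai_namespace> | grep -E 'bai-(bpmn|icm)'
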

You are trying to remove an operator without first creating savepoints

Problem
The job submitter pods are in Error state and you find errors in the logs, such as the following one.
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint file:/mnt/pv/savepoints/dba/bai-<job-name>/savepoint-<savepoint-id> Cannot map checkpoint/savepoint state for operator xxxxxxxxxxxxxx to the new program, because the operator is not available in the new program.
Cause
This error occurs if you are trying to update your release and remove an operator, for example HDFS, but you did not first create the necessary savepoints and no values were passed to the <job_name>.recovery_path parameter of the jobs.
Solution
The error message contains a path to a savepoint that is created dynamically to try to update the job. You can use that savepoint to restart the jobs by updating the IBM Business Automation Insights release and passing the correct value for each job in its <job_name>.recovery_path parameter. For more information about the parameters to update in the release, see Advanced updates to your IBM Business Automation Insights deployment.
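
For example, for the BPMN processing job, the update to the custom resource might look like the following sketch. The savepoint path is the one reported in the error message; the bpmn section is only an example, use the section of the job that is failing.

spec:
  bai_configuration:
    bpmn:
      recovery_path: /mnt/pv/savepoints/dba/bai-<job-name>/savepoint-<savepoint-id>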

Tracing IBM Cloud Pak for Business Automation raw events in Flink jobs

Problem
The Flink jobs that process IBM Cloud Pak for Business Automation raw events have been successfully submitted to the Flink job manager. However, these jobs do not process the raw event types that they are designed to handle.
Note: This problem does not affect custom events.
Cause
Likely, the connection settings to the Kafka brokers are incorrect, or some Flink jobs failed before they could process the raw event types.
Solution
The solution consists of activating verbose logs, restarting the job manager and task managers, and finally restarting the Flink jobs.
  1. Enable the option for verbose logs in the custom resource (CR) YAML file.
    spec.bai_configuration.flink.verbose_logs: true
  2. Restart the job manager.
    oc delete pod <jobmanager> -n <bai_namespace>
  3. Restart all the task managers.
    oc delete pod <taskmanager> -n <bai_namespace>
  4. Restart the Flink jobs.
    1. Retrieve the names of the jobs.
      oc get jobs -o=custom-columns=Name:.metadata.name --no-headers
    2. For each job name, run the following command by replacing the <job_name> placeholder with the actual name of the job.
      oc get job <job_name> -o json | jq 'del(.spec.selector)' | jq 'del(.spec.template.metadata.labels)' | oc replace --force -f -
    Whenever a Flink job receives an IBM Cloud Pak for Business Automation raw event to process into a time series, and possibly afterward into a summary, the raw event is written at INFO log level to the log of the task manager that the Flink job is deployed to.
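
After verbose logs are enabled and the jobs are restarted, you can search the task manager logs for a specific raw event. The following sketch assumes that the task manager pod names contain the string taskmanager, and that <search-term> is a placeholder for an identifier that you expect to find in the raw event, for example a process instance ID.

# Scan the logs of every task manager pod for the raw event
for TM in $(oc get pods -n <bai_namespace> --no-headers -o custom-columns=NAME:.metadata.name | grep taskmanager); do
  echo "--- ${TM} ---"
  oc logs ${TM} -n <bai_namespace> | grep "<search-term>"
done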

Jobs are not running due to a Flink job manager failure, and the job manager pod is more recent than the task manager pods

Problem
No events are processed and you see no running Flink jobs in the Flink console.
Cause
The job manager pod experienced a failure and was restarted. In that case, the Kubernetes submitter jobs are still present but do not restart.
Solution
  1. Identify the latest checkpoints that were written by the job manager for each Flink job before the outage.
    1. Connect to the job manager.
      JOB_MANAGER_POD=$(oc get pods -l component=jobmanager --no-headers -o custom-columns=NAME:.metadata.name | head -n 1)
      oc rsh -c jobmanager ${JOB_MANAGER_POD}
    2. Identify the latest Flink job identifiers.
      LATEST_JOB_IDS=$(for DIR in $(find /mnt/pv/checkpoints/dba/* -type d -prune); do echo -n $DIR/; ls -t $DIR | head -n 1 | tail -n 1;done)
    3. Retrieve the directory paths containing the latest checkpoints that were written by the job manager for each running Flink job.
      for CHK_DIR in ${LATEST_JOB_IDS}; do ls -t $CHK_DIR/chk*/_metadata 2>/dev/null | head -n 1 | tail -n 1; done | sed 's#/_metadata$##'
  2. Restart the Flink jobs from their respective checkpoints.
    1. Edit the Business Automation Insights custom resource for each Flink job. Example for the BPMN Flink job:
      spec:
        bai_configuration:
          bpmn:
            install: true
            parallelism: 1
            recovery_path: /mnt/pv/checkpoints/dba/bai-bpmn/6d134e8ce753f2a0ec63222532145fef/chk-67
    2. Save the changes.
    3. Check whether the Flink jobs are restarted.
      oc exec -c jobmanager -it ${JOB_MANAGER_POD} -- /opt/flink/bin/flink list | grep RUNNING
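
If several Flink jobs must be restarted, the following sketch combines the commands of steps 1.2 and 1.3 and prints the latest checkpoint path of each job, which you can then copy into the recovery_path parameter of the corresponding section of the custom resource. Run it inside the job manager pod.

# For each job directory under /mnt/pv/checkpoints/dba, print the path of its most recent checkpoint
for DIR in $(find /mnt/pv/checkpoints/dba/* -type d -prune); do
  LATEST_JOB_ID=${DIR}/$(ls -t ${DIR} | head -n 1)
  ls -t ${LATEST_JOB_ID}/chk-*/_metadata 2>/dev/null | head -n 1 | sed 's#/_metadata$##'
done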