package org.apache.flink.kubernetes.operator.observer;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/observer/JobStatusObserver.class */
/**
 * Base class for observing the status of a job by listing the jobs reported by a
 * {@link FlinkService} and delegating the actual status resolution to subclasses.
 *
 * @param <CTX> type of the context object handed through to {@link #onTimeout(Object)}
 */
public abstract class JobStatusObserver<CTX> {
    private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
    private final FlinkService flinkService;

    /**
     * Creates an observer that lists jobs through the given service.
     *
     * @param flinkService service used by {@link #observe} to list jobs
     */
    public JobStatusObserver(FlinkService flinkService) {
        this.flinkService = flinkService;
    }

    /**
     * Observes the job status and updates {@code jobStatus} accordingly.
     *
     * <p>The jobs are listed via the {@link FlinkService}; when at least one job is returned,
     * {@link #updateJobStatus(JobStatus, List)} is asked to resolve the new state.
     *
     * @param jobStatus status object to inspect and update
     * @param configuration configuration passed to {@code FlinkService.listJobs}
     * @param ctx context forwarded to {@link #onTimeout(Object)} when a
     *     {@link TimeoutException} is caught
     * @return {@code true} if {@link #updateJobStatus(JobStatus, List)} produced a state;
     *     {@code false} if no jobs were listed, no state could be resolved, or an exception
     *     was caught
     */
    public boolean observe(JobStatus jobStatus, Configuration configuration, CTX ctx) {
        LOG.info("Observing job status");
        // Remember the state before observation so it can be compared and logged afterwards.
        String previousState = jobStatus.getState();
        try {
            // Parameterized copy of the listed jobs (previously a raw ArrayList, which made the
            // call to updateJobStatus an unchecked conversion).
            List<JobStatusMessage> clusterJobStatuses =
                    new ArrayList<>(this.flinkService.listJobs(configuration));
            if (clusterJobStatuses.isEmpty()) {
                ifRunningMoveToReconciling(jobStatus, previousState);
                return false;
            }

            Optional<String> targetState = updateJobStatus(jobStatus, clusterJobStatuses);
            if (targetState.isEmpty()) {
                // Jobs were listed but none could be resolved to a state for this resource.
                jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
                return false;
            }

            if (targetState.get().equals(previousState)) {
                LOG.info("Job status ({}) unchanged", previousState);
            } else {
                LOG.info(
                        "Job status successfully updated from {} to {}",
                        previousState,
                        targetState.get());
            }
            return true;
        } catch (Exception e) {
            // NOTE(review): this handler also covers exceptions thrown by updateJobStatus, not
            // only by listJobs, although the message mentions listing only.
            LOG.error("Exception while listing jobs", e);
            if (e instanceof TimeoutException) {
                onTimeout(ctx);
            }
            ifRunningMoveToReconciling(jobStatus, previousState);
            return false;
        }
    }

    /**
     * Sets the state of {@code jobStatus} to {@code RECONCILING} if the previously observed
     * state was {@code RUNNING}; otherwise leaves it untouched.
     *
     * @param jobStatus status object to update
     * @param previousState state read before the current observation; may be {@code null}
     */
    private void ifRunningMoveToReconciling(JobStatus jobStatus, String previousState) {
        // Constant-first equals keeps this null-safe for a missing previous state.
        if (org.apache.flink.api.common.JobStatus.RUNNING.name().equals(previousState)) {
            jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
        }
    }

    /**
     * Hook invoked by {@link #observe} when a {@link TimeoutException} is caught.
     *
     * @param ctx context object that was passed to {@link #observe}
     */
    protected abstract void onTimeout(CTX ctx);

    /**
     * Resolves the new job state from the jobs listed by the service.
     *
     * @param jobStatus status object that {@link #observe} is updating
     * @param list non-empty list of job status messages returned by the service
     * @return the resolved state, or an empty {@link Optional} if none could be determined, in
     *     which case {@link #observe} sets the state to {@code RECONCILING}
     */
    protected abstract Optional<String> updateJobStatus(JobStatus jobStatus, List<JobStatusMessage> list);
}
