package org.apache.flink.kubernetes.operator.observer;

import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.observer.ObserverContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/observer/JobStatusObserver.class */
public abstract class JobStatusObserver<R extends AbstractFlinkResource<?, ?>, CTX extends ObserverContext> {
    private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
    private static final int MAX_ERROR_STRING_LENGTH = 512;
    public static final String MISSING_SESSION_JOB_ERR = "Missing Session Job";
    protected final FlinkService flinkService;
    protected final EventRecorder eventRecorder;
    protected final FlinkConfigManager configManager;

    public JobStatusObserver(FlinkService flinkService, FlinkConfigManager flinkConfigManager, EventRecorder eventRecorder) {
        this.flinkService = flinkService;
        this.eventRecorder = eventRecorder;
        this.configManager = flinkConfigManager;
    }

    public boolean observe(R r, Context context, CTX ctx) {
        JobStatus jobStatus = ((CommonStatus) r.getStatus()).getJobStatus();
        LOG.info("Observing job status");
        String state = jobStatus.getState();
        try {
            ArrayList arrayList = new ArrayList(this.flinkService.listJobs(ctx.getDeployedConfig()));
            if (arrayList.isEmpty()) {
                LOG.debug("No jobs found on the cluster");
                ifRunningMoveToReconciling(jobStatus, state);
                onNoJobsFound(r, ctx.getDeployedConfig());
                return false;
            }
            Optional<JobStatusMessage> filterTargetJob = filterTargetJob(jobStatus, arrayList);
            if (!filterTargetJob.isEmpty()) {
                updateJobStatus(r, filterTargetJob.get(), ctx.getDeployedConfig());
                ReconciliationUtils.checkAndUpdateStableSpec((CommonStatus) r.getStatus());
                return true;
            }
            LOG.warn("No matching jobs found on the cluster");
            ifRunningMoveToReconciling(jobStatus, state);
            onTargetJobNotFound(r, ctx.getDeployedConfig());
            return false;
        } catch (Exception e) {
            LOG.error("Exception while listing jobs", e);
            ifRunningMoveToReconciling(jobStatus, state);
            if (!(e instanceof TimeoutException)) {
                return false;
            }
            onTimeout(r, context, ctx);
            return false;
        }
    }

    protected abstract void onTargetJobNotFound(R r, Configuration configuration);

    protected void onNoJobsFound(R r, Configuration configuration) {
    }

    private void ifRunningMoveToReconciling(JobStatus jobStatus, String str) {
        if (org.apache.flink.api.common.JobStatus.RUNNING.name().equals(str)) {
            jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
        }
    }

    protected abstract void onTimeout(R r, Context<?> context, CTX ctx);

    protected abstract Optional<JobStatusMessage> filterTargetJob(JobStatus jobStatus, List<JobStatusMessage> list);

    private void updateJobStatus(R r, JobStatusMessage jobStatusMessage, Configuration configuration) {
        JobStatus jobStatus = ((CommonStatus) r.getStatus()).getJobStatus();
        String state = jobStatus.getState();
        jobStatus.setState(jobStatusMessage.getJobState().name());
        jobStatus.setJobName(jobStatusMessage.getJobName());
        jobStatus.setJobId(jobStatusMessage.getJobId().toHexString());
        jobStatus.setStartTime(String.valueOf(jobStatusMessage.getStartTime()));
        if (jobStatus.getState().equals(state)) {
            LOG.info("Job status ({}) unchanged", state);
            return;
        }
        jobStatus.setUpdateTime(String.valueOf(System.currentTimeMillis()));
        String format = state == null ? String.format("Job status changed to %s", jobStatus.getState()) : String.format("Job status changed from %s to %s", state, jobStatus.getState());
        LOG.info(format);
        setErrorIfPresent(r, jobStatusMessage, configuration);
        this.eventRecorder.triggerEvent(r, EventRecorder.Type.Normal, EventRecorder.Reason.JobStatusChanged, EventRecorder.Component.Job, format);
    }

    private void setErrorIfPresent(R r, JobStatusMessage jobStatusMessage, Configuration configuration) {
        if (jobStatusMessage.getJobState() == org.apache.flink.api.common.JobStatus.FAILED) {
            try {
                this.flinkService.requestJobResult(configuration, jobStatusMessage.getJobId()).getSerializedThrowable().ifPresent(serializedThrowable -> {
                    FlinkResourceExceptionUtils.updateFlinkResourceException(serializedThrowable, r, this.configManager.getOperatorConfiguration());
                    LOG.error("Job {} failed with error: {}", jobStatusMessage.getJobId(), serializedThrowable.getFullStringifiedStackTrace());
                });
            } catch (Exception e) {
                LOG.warn("Failed to request the job result", e);
            }
        }
    }
}
