package org.apache.flink.kubernetes.operator.reconciler.sessionjob;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
import org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.class */
/**
 * Reconciler for {@link FlinkSessionJob} resources.
 *
 * <p>Based on the code in this class, it:
 * <ul>
 *   <li>only reconciles while the {@link FlinkDeployment} secondary resource is present and its
 *       JobManager deployment status is {@link JobManagerDeploymentStatus#READY};</li>
 *   <li>submits the job through the context's Flink service and records the returned job id;</li>
 *   <li>cancels the job (statelessly) when the resource is being cleaned up.</li>
 * </ul>
 */
public class SessionJobReconciler extends AbstractJobReconciler<FlinkSessionJob, FlinkSessionJobSpec, FlinkSessionJobStatus> {
    private static final Logger LOG = LoggerFactory.getLogger(SessionJobReconciler.class);

    /** Source of the operator configuration; read for the progress check interval during cleanup. */
    private final FlinkConfigManager configManager;

    /**
     * Creates the reconciler. The first three arguments are handed to the parent class together
     * with a {@link NoopJobAutoscalerFactory}.
     *
     * @param kubernetesClient client forwarded to the parent reconciler
     * @param eventRecorder event recorder forwarded to the parent reconciler
     * @param statusRecorder status recorder forwarded to the parent reconciler
     * @param flinkConfigManager configuration manager kept for use in {@link #cleanupInternal}
     */
    public SessionJobReconciler(KubernetesClient kubernetesClient, EventRecorder eventRecorder, StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder, FlinkConfigManager flinkConfigManager) {
        super(kubernetesClient, eventRecorder, statusRecorder, new NoopJobAutoscalerFactory());
        this.configManager = flinkConfigManager;
    }

    /**
     * Returns {@code true} only when the session cluster is ready (see
     * {@link #sessionClusterReady(Optional)}) and the parent implementation also agrees.
     * The parent check is not evaluated when the session cluster is not ready.
     */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler, org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public boolean readyToReconcile(FlinkResourceContext<FlinkSessionJob> flinkResourceContext) {
        Optional<FlinkDeployment> sessionCluster = flinkResourceContext.getJosdkContext().getSecondaryResource(FlinkDeployment.class);
        return sessionClusterReady(sessionCluster) && super.readyToReconcile(flinkResourceContext);
    }

    /**
     * Submits the job to the session cluster and stores the outcome in the resource status:
     * the job id (hex string) and the state {@link JobStatus#RECONCILING}.
     *
     * <p>The compiler generates the erased bridge for the generic parent method, so no explicit
     * raw-typed overload is declared in this class.
     *
     * @param flinkResourceContext context providing the resource and the Flink service
     * @param flinkSessionJobSpec spec passed to the submission call
     * @param configuration configuration passed to the submission call
     * @param optional value forwarded as the last submission argument, or {@code null} when empty
     *     (presumably the savepoint to restore from - confirm against the Flink service API)
     * @param z not used by this implementation
     * @throws Exception whatever the submission call throws
     */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public void deploy(FlinkResourceContext<FlinkSessionJob> flinkResourceContext, FlinkSessionJobSpec flinkSessionJobSpec, Configuration configuration, Optional<String> optional, boolean z) throws Exception {
        JobID submittedJobId = flinkResourceContext.getFlinkService().submitJobToSessionCluster(flinkResourceContext.getResource().getMetadata(), flinkSessionJobSpec, configuration, optional.orElse(null));
        FlinkSessionJobStatus status = (FlinkSessionJobStatus) flinkResourceContext.getResource().getStatus();
        status.getJobStatus().setJobId(submittedJobId.toHexString());
        status.getJobStatus().setState(JobStatus.RECONCILING.name());
    }

    /**
     * Cancels the session job through the Flink service, using the context's observe config.
     *
     * @throws Exception whatever the cancellation call throws
     */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler
    protected void cancelJob(FlinkResourceContext<FlinkSessionJob> flinkResourceContext, UpgradeMode upgradeMode) throws Exception {
        flinkResourceContext.getFlinkService().cancelSessionJob(flinkResourceContext.getResource(), upgradeMode, flinkResourceContext.getObserveConfig());
    }

    /** No cleanup is performed for a failed session job. */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler
    protected void cleanupAfterFailedJob(FlinkResourceContext<FlinkSessionJob> flinkResourceContext) {
    }

    /**
     * Cancels the job (if any) before the resource is deleted.
     *
     * <p>Deletion proceeds immediately when the session cluster deployment is absent, when no job
     * id was recorded, when cancellation succeeds, or when cancellation fails with anything other
     * than an {@link ExecutionException}. An {@link ExecutionException} is triaged by
     * {@link #handleCancelExecutionFailure(String, ExecutionException)}.
     *
     * @return the delete control telling the framework to delete now or to retry later
     */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public DeleteControl cleanupInternal(FlinkResourceContext<FlinkSessionJob> flinkResourceContext) {
        if (!flinkResourceContext.getJosdkContext().getSecondaryResource(FlinkDeployment.class).isPresent()) {
            LOG.info("Session cluster deployment not available");
            return DeleteControl.defaultDelete();
        }

        String jobId = ((FlinkSessionJobStatus) flinkResourceContext.getResource().getStatus()).getJobStatus().getJobId();
        if (jobId == null) {
            // Nothing was ever submitted, so there is nothing to cancel.
            return DeleteControl.defaultDelete();
        }

        try {
            cancelJob(flinkResourceContext, UpgradeMode.STATELESS);
        } catch (ExecutionException e) {
            return handleCancelExecutionFailure(jobId, e);
        } catch (InterruptedException e) {
            LOG.error("Failed to cancel job {}.", jobId, e);
            // Keep the interruption visible to the calling thread instead of silently dropping it.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // Best effort: any other failure is logged and deletion still goes ahead.
            LOG.error("Failed to cancel job {}.", jobId, e);
        }
        return DeleteControl.defaultDelete();
    }

    /**
     * Decides how cleanup continues after job cancellation failed with an {@link ExecutionException}.
     *
     * @param jobId id of the job whose cancellation failed (used for logging)
     * @param failure the failure raised by the cancellation call
     * @return default delete when the cause shows the job is already gone or terminated; otherwise
     *     keeps the finalizer and reschedules after the configured progress check interval
     */
    private DeleteControl handleCancelExecutionFailure(String jobId, ExecutionException failure) {
        Throwable cause = failure.getCause();
        if (cause instanceof FlinkJobNotFoundException) {
            LOG.error("Job {} not found in the Flink cluster.", jobId, failure);
            return DeleteControl.defaultDelete();
        }
        if (cause instanceof FlinkJobTerminatedWithoutCancellationException) {
            LOG.error("Job {} already terminated without cancellation.", jobId, failure);
            return DeleteControl.defaultDelete();
        }
        long retryDelayMillis = this.configManager.getOperatorConfiguration().getProgressCheckInterval().toMillis();
        // The trailing throwable is picked up by SLF4J as the exception to log, not as a placeholder value.
        LOG.error("Failed to cancel job {}, will reschedule after {} milliseconds.", jobId, retryDelayMillis, failure);
        return DeleteControl.noFinalizerRemoval().rescheduleAfter(retryDelayMillis);
    }

    /**
     * Checks whether the session cluster backing a session job can accept work.
     *
     * @param optional the session cluster deployment, if one was found
     * @return {@code true} only when the deployment is present and its JobManager deployment
     *     status is {@link JobManagerDeploymentStatus#READY}
     */
    public static boolean sessionClusterReady(Optional<FlinkDeployment> optional) {
        if (!optional.isPresent()) {
            LOG.warn("Session cluster deployment is not found");
            return false;
        }
        JobManagerDeploymentStatus jmStatus = ((FlinkDeploymentStatus) optional.get().getStatus()).getJobManagerDeploymentStatus();
        if (jmStatus != JobManagerDeploymentStatus.READY) {
            LOG.info("Session cluster deployment is in {} status, not ready for serve", jmStatus);
            return false;
        }
        return true;
    }
}
