package org.apache.flink.kubernetes.operator.reconciler.sessionjob;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import java.util.Optional;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.class */
/**
 * Reconciler for {@link FlinkSessionJob} resources, i.e. jobs that are submitted to a session
 * cluster represented by a {@link FlinkDeployment} secondary resource.
 *
 * <p>Every operation first looks up the {@link FlinkDeployment} through the reconciliation
 * {@link Context}; most of them are gated on {@link #sessionClusterReady(Optional)}.
 *
 * <p>Note: this source was recovered by a decompiler, which widened several overrides from
 * {@code protected} to {@code public}. That widened visibility is kept so existing callers keep
 * compiling.
 */
public class SessionJobReconciler extends AbstractJobReconciler<FlinkSessionJob, FlinkSessionJobSpec, FlinkSessionJobStatus> {
    private static final Logger LOG = LoggerFactory.getLogger(SessionJobReconciler.class);

    /**
     * Creates the reconciler; all collaborators are handed straight to the parent class.
     *
     * @param kubernetesClient client passed to the parent reconciler
     * @param flinkService service used to submit and cancel session jobs
     * @param flinkConfigManager source of the effective session job configuration
     * @param eventRecorder event recorder passed to the parent reconciler
     * @param statusRecorder status recorder passed to the parent reconciler
     */
    public SessionJobReconciler(KubernetesClient kubernetesClient, FlinkService flinkService, FlinkConfigManager flinkConfigManager, EventRecorder eventRecorder, StatusRecorder<FlinkSessionJobStatus> statusRecorder) {
        super(kubernetesClient, flinkService, flinkConfigManager, eventRecorder, statusRecorder);
    }

    /**
     * Returns the configuration used to observe the job, which for session jobs is the same as
     * the deploy configuration built from the resource's current metadata and spec.
     *
     * @param flinkSessionJob the session job resource
     * @param context reconciliation context used to look up the session cluster
     * @return the configuration, or {@code null} if the session cluster is missing or not ready
     */
    @Override
    public Configuration getObserveConfig(FlinkSessionJob flinkSessionJob, Context context) {
        return getDeployConfig(flinkSessionJob.getMetadata(), (FlinkSessionJobSpec) flinkSessionJob.getSpec(), context);
    }

    /**
     * Builds the configuration for deploying the given spec onto the session cluster.
     *
     * @param objectMeta metadata of the session job (not used by this implementation)
     * @param flinkSessionJobSpec spec to build the configuration for
     * @param context reconciliation context used to look up the session cluster
     * @return the session job configuration, or {@code null} if the session cluster is missing
     *     or not ready
     */
    @Override
    public Configuration getDeployConfig(ObjectMeta objectMeta, FlinkSessionJobSpec flinkSessionJobSpec, Context context) {
        // Typed Optional instead of a raw one, so no downcast is needed on get().
        Optional<FlinkDeployment> sessionCluster = context.getSecondaryResource(FlinkDeployment.class);
        if (!sessionClusterReady(sessionCluster)) {
            return null;
        }
        return this.configManager.getSessionJobConfig(sessionCluster.get(), flinkSessionJobSpec);
    }

    /**
     * A session job is ready to be reconciled only when its session cluster is ready and the
     * parent class's own readiness check passes.
     *
     * @param flinkSessionJob the session job resource
     * @param context reconciliation context used to look up the session cluster
     * @param configuration deploy configuration forwarded to the parent check
     * @return {@code true} if reconciliation may proceed
     */
    @Override
    public boolean readyToReconcile(FlinkSessionJob flinkSessionJob, Context context, Configuration configuration) {
        // The decompiled source cast the resource to SessionJobReconciler here, which is an
        // invalid cast between unrelated classes; the resource is passed through as-is.
        return sessionClusterReady(context.getSecondaryResource(FlinkDeployment.class))
                && super.readyToReconcile(flinkSessionJob, context, configuration);
    }

    /**
     * Submits the job to the session cluster and records the resulting job ID in the status
     * with the state set to {@code RECONCILING}.
     *
     * <p>Declared {@code public} (widened from {@code protected}) so that it is a valid override
     * whether the parent method is {@code protected} or {@code public}. The compiler generates
     * the erased bridge method for this override, so none is declared explicitly.
     *
     * @param flinkSessionJob the session job resource
     * @param flinkSessionJobSpec spec to deploy
     * @param flinkSessionJobStatus status object that receives the new job status
     * @param configuration deploy configuration
     * @param optional optional savepoint; {@code null} is passed to the service when empty
     * @param z not used by this implementation
     * @throws Exception if the job submission fails
     */
    @Override
    public void deploy(FlinkSessionJob flinkSessionJob, FlinkSessionJobSpec flinkSessionJobSpec, FlinkSessionJobStatus flinkSessionJobStatus, Configuration configuration, Optional<String> optional, boolean z) throws Exception {
        String jobIdHex =
                this.flinkService
                        .submitJobToSessionCluster(flinkSessionJob.getMetadata(), flinkSessionJobSpec, configuration, optional.orElse(null))
                        .toHexString();
        String initialState = org.apache.flink.api.common.JobStatus.RECONCILING.name();
        flinkSessionJobStatus.setJobStatus(new JobStatus().toBuilder().jobId(jobIdHex).state(initialState).build());
    }

    /**
     * Cancels the session job by delegating to the Flink service.
     *
     * @param flinkSessionJob the session job resource
     * @param upgradeMode upgrade mode forwarded to the service
     * @param configuration configuration forwarded to the service
     * @throws Exception if the cancellation fails
     */
    @Override
    public void cancelJob(FlinkSessionJob flinkSessionJob, UpgradeMode upgradeMode, Configuration configuration) throws Exception {
        this.flinkService.cancelSessionJob(flinkSessionJob, upgradeMode, configuration);
    }

    /**
     * Best-effort cleanup on resource deletion: if the session cluster still exists and a job ID
     * was recorded, the job is cancelled with {@link UpgradeMode#STATELESS}. Cancellation
     * failures are logged and do not block deletion.
     *
     * @param flinkSessionJob the session job being deleted
     * @param context reconciliation context used to look up the session cluster
     * @return always {@link DeleteControl#defaultDelete()}
     */
    @Override
    public DeleteControl cleanupInternal(FlinkSessionJob flinkSessionJob, Context context) {
        if (!context.getSecondaryResource(FlinkDeployment.class).isPresent()) {
            LOG.info("Session cluster deployment not available");
        } else if (((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getJobId() != null) {
            try {
                // NOTE(review): getObserveConfig returns null when the cluster exists but is not
                // READY, so cancelJob may receive a null configuration here; any resulting
                // exception is caught and logged below. Confirm this is the intended behavior.
                cancelJob(flinkSessionJob, UpgradeMode.STATELESS, getObserveConfig(flinkSessionJob, context));
            } catch (Exception e) {
                // Deliberately not rethrown: deletion proceeds even if the cancel fails.
                LOG.error("Failed to cancel job.", e);
            }
        }
        return DeleteControl.defaultDelete();
    }

    /**
     * Checks whether the session cluster exists and its JobManager deployment is
     * {@link JobManagerDeploymentStatus#READY}. The reason is logged when it is not.
     *
     * @param optional the session cluster deployment, if any
     * @return {@code true} only if the deployment is present and READY
     */
    public static boolean sessionClusterReady(Optional<FlinkDeployment> optional) {
        if (!optional.isPresent()) {
            LOG.warn("Session cluster deployment is not found");
            return false;
        }
        JobManagerDeploymentStatus jobManagerDeploymentStatus = ((FlinkDeploymentStatus) optional.get().getStatus()).getJobManagerDeploymentStatus();
        if (jobManagerDeploymentStatus == JobManagerDeploymentStatus.READY) {
            return true;
        }
        LOG.info("Session cluster deployment is in {} status, not ready for serve", jobManagerDeploymentStatus);
        return false;
    }
}
