package org.apache.flink.kubernetes.operator.reconciler.sessionjob;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import java.util.Objects;
import java.util.Optional;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.class */
/**
 * Reconciler for {@link FlinkSessionJob} resources.
 *
 * <p>A session job runs on a session cluster that is looked up as a {@link FlinkDeployment}
 * secondary resource of the reconciliation {@link Context}. Jobs are submitted to and cancelled on
 * that cluster through a {@link FlinkService} obtained from the {@link FlinkServiceFactory}.
 *
 * <p>Several lookups in this class return {@code null} while the session cluster is missing or not
 * in {@link JobManagerDeploymentStatus#READY} state; see {@link #sessionClusterReady(Optional)}.
 */
public class SessionJobReconciler extends AbstractJobReconciler<FlinkSessionJob, FlinkSessionJobSpec, FlinkSessionJobStatus> {
    private static final Logger LOG = LoggerFactory.getLogger(SessionJobReconciler.class);

    /** Factory used to obtain the {@link FlinkService} that talks to the session cluster. */
    private final FlinkServiceFactory flinkServiceFactory;

    /**
     * Creates the reconciler.
     *
     * @param kubernetesClient client forwarded to the parent reconciler
     * @param flinkServiceFactory factory that resolves a {@link FlinkService} per session cluster
     * @param flinkConfigManager configuration manager forwarded to the parent reconciler
     * @param eventRecorder event recorder forwarded to the parent reconciler
     * @param statusRecorder status recorder forwarded to the parent reconciler
     */
    public SessionJobReconciler(KubernetesClient kubernetesClient, FlinkServiceFactory flinkServiceFactory, FlinkConfigManager flinkConfigManager, EventRecorder eventRecorder, StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder) {
        super(kubernetesClient, flinkConfigManager, eventRecorder, statusRecorder);
        this.flinkServiceFactory = flinkServiceFactory;
    }

    /**
     * Resolves the {@link FlinkService} of the session cluster this job belongs to.
     *
     * @param flinkSessionJob the session job being reconciled (not inspected here)
     * @param context reconciliation context holding the {@link FlinkDeployment} secondary resource
     * @return the service for the session cluster, or {@code null} if the cluster is missing or
     *     not ready
     */
    protected FlinkService getFlinkService(FlinkSessionJob flinkSessionJob, Context<?> context) {
        Optional<FlinkDeployment> sessionCluster = context.getSecondaryResource(FlinkDeployment.class);
        if (!sessionClusterReady(sessionCluster)) {
            return null;
        }
        return this.flinkServiceFactory.getOrCreate(sessionCluster.get());
    }

    /**
     * Builds the configuration used for observing the job; identical to the deploy configuration
     * derived from the resource's current metadata and spec.
     *
     * @return the configuration, or {@code null} if the session cluster is missing or not ready
     */
    protected Configuration getObserveConfig(FlinkSessionJob flinkSessionJob, Context<?> context) {
        return getDeployConfig(flinkSessionJob.getMetadata(), (FlinkSessionJobSpec) flinkSessionJob.getSpec(), context);
    }

    /**
     * Builds the session job configuration from the session cluster deployment and the given spec.
     *
     * @param objectMeta metadata of the session job (not inspected here)
     * @param flinkSessionJobSpec spec to derive the configuration from
     * @param context reconciliation context holding the {@link FlinkDeployment} secondary resource
     * @return the configuration, or {@code null} if the session cluster is missing or not ready
     */
    protected Configuration getDeployConfig(ObjectMeta objectMeta, FlinkSessionJobSpec flinkSessionJobSpec, Context<?> context) {
        Optional<FlinkDeployment> sessionCluster = context.getSecondaryResource(FlinkDeployment.class);
        if (!sessionClusterReady(sessionCluster)) {
            return null;
        }
        return this.configManager.getSessionJobConfig(sessionCluster.get(), flinkSessionJobSpec);
    }

    /**
     * Reports whether reconciliation may proceed: the session cluster must be ready and the parent
     * reconciler's own readiness check must pass.
     */
    public boolean readyToReconcile(FlinkSessionJob flinkSessionJob, Context<?> context, Configuration configuration) {
        // The cluster check runs first so the parent check is skipped while the cluster is down.
        if (!sessionClusterReady(context.getSecondaryResource(FlinkDeployment.class))) {
            return false;
        }
        return super.readyToReconcile(flinkSessionJob, context, configuration);
    }

    /**
     * Submits the job to the session cluster and records the returned job id on the status with
     * state {@code RECONCILING}.
     *
     * @param optional optional savepoint value handed to the submission call ({@code null} is
     *     passed when empty)
     * @param z unused by this implementation
     * @throws NullPointerException if no {@link FlinkService} is available because the session
     *     cluster is missing or not ready
     * @throws Exception if the submission fails
     */
    protected void deploy(FlinkSessionJob flinkSessionJob, FlinkSessionJobSpec flinkSessionJobSpec, FlinkSessionJobStatus flinkSessionJobStatus, Context<?> context, Configuration configuration, Optional<String> optional, boolean z) throws Exception {
        FlinkService flinkService = requireFlinkService(flinkSessionJob, context);
        String submittedJobId = flinkService
                .submitJobToSessionCluster(flinkSessionJob.getMetadata(), flinkSessionJobSpec, configuration, optional.orElse(null))
                .toHexString();
        JobStatus submittedStatus = new JobStatus().toBuilder()
                .jobId(submittedJobId)
                .state(org.apache.flink.api.common.JobStatus.RECONCILING.name())
                .build();
        flinkSessionJobStatus.setJobStatus(submittedStatus);
    }

    /**
     * Cancels the session job on the session cluster.
     *
     * @throws NullPointerException if no {@link FlinkService} is available because the session
     *     cluster is missing or not ready
     * @throws Exception if the cancellation fails
     */
    /* renamed from: cancelJob, reason: avoid collision after fix types in other method */
    protected void cancelJob2(FlinkSessionJob flinkSessionJob, Context<?> context, UpgradeMode upgradeMode, Configuration configuration) throws Exception {
        requireFlinkService(flinkSessionJob, context).cancelSessionJob(flinkSessionJob, upgradeMode, configuration);
    }

    /** Intentionally a no-op: this reconciler performs no extra cleanup after a failed job. */
    /* renamed from: cleanupAfterFailedJob, reason: avoid collision after fix types in other method */
    protected void cleanupAfterFailedJob2(FlinkSessionJob flinkSessionJob, Context<?> context, Configuration configuration) throws Exception {
    }

    /**
     * Best-effort cleanup on resource deletion: if the session cluster deployment still exists and
     * the status carries a job id, the job is cancelled with {@link UpgradeMode#STATELESS}.
     * Cancellation failures are logged and do not block deletion.
     *
     * @return always {@link DeleteControl#defaultDelete()}
     */
    public DeleteControl cleanupInternal(FlinkSessionJob flinkSessionJob, Context<?> context) {
        if (!context.getSecondaryResource(FlinkDeployment.class).isPresent()) {
            LOG.info("Session cluster deployment not available");
            return DeleteControl.defaultDelete();
        }
        String jobId = ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getJobId();
        if (jobId != null) {
            try {
                cancelJob2(flinkSessionJob, context, UpgradeMode.STATELESS, getObserveConfig(flinkSessionJob, context));
            } catch (Exception e) {
                // Deletion must still go through even if the job could not be cancelled.
                LOG.error("Failed to cancel job {}.", jobId, e);
            }
        }
        return DeleteControl.defaultDelete();
    }

    /**
     * Checks whether the session cluster deployment exists and its JobManager deployment status is
     * {@link JobManagerDeploymentStatus#READY}. Logs the reason when it is not.
     *
     * @param optional the session cluster deployment, if any
     * @return {@code true} only if the deployment is present and ready
     */
    public static boolean sessionClusterReady(Optional<FlinkDeployment> optional) {
        if (!optional.isPresent()) {
            LOG.warn("Session cluster deployment is not found");
            return false;
        }
        JobManagerDeploymentStatus jobManagerDeploymentStatus = ((FlinkDeploymentStatus) optional.get().getStatus()).getJobManagerDeploymentStatus();
        if (jobManagerDeploymentStatus != JobManagerDeploymentStatus.READY) {
            LOG.info("Session cluster deployment is in {} status, not ready for serve", jobManagerDeploymentStatus);
            return false;
        }
        return true;
    }

    /**
     * Returns the {@link FlinkService} for the session cluster, failing with a descriptive message
     * instead of a bare null dereference when none is available.
     */
    private FlinkService requireFlinkService(FlinkSessionJob flinkSessionJob, Context<?> context) {
        return Objects.requireNonNull(
                getFlinkService(flinkSessionJob, context),
                "FlinkService is unavailable because the session cluster deployment is missing or not ready");
    }

    // ---------------------------------------------------------------------------------------
    // The methods below carry decompiler "bridge"/"synthetic" markers; each one only casts its
    // arguments and forwards to the typed method of this class.
    // ---------------------------------------------------------------------------------------

    /** Forwards to {@link #cleanupAfterFailedJob2(FlinkSessionJob, Context, Configuration)}. */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler
    protected /* bridge */ /* synthetic */ void cleanupAfterFailedJob(FlinkSessionJob flinkSessionJob, Context context, Configuration configuration) throws Exception {
        cleanupAfterFailedJob2(flinkSessionJob, (Context<?>) context, configuration);
    }

    /** Forwards to {@link #cancelJob2(FlinkSessionJob, Context, UpgradeMode, Configuration)}. */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler
    protected /* bridge */ /* synthetic */ void cancelJob(FlinkSessionJob flinkSessionJob, Context context, UpgradeMode upgradeMode, Configuration configuration) throws Exception {
        cancelJob2(flinkSessionJob, (Context<?>) context, upgradeMode, configuration);
    }

    /** Forwards to the {@link FlinkSessionJob}-typed {@code readyToReconcile}. */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler, org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public /* bridge */ /* synthetic */ boolean readyToReconcile(AbstractFlinkResource abstractFlinkResource, Context context, Configuration configuration) {
        return readyToReconcile((FlinkSessionJob) abstractFlinkResource, (Context<?>) context, configuration);
    }

    /** Forwards to the {@link FlinkSessionJob}-typed {@code getFlinkService}. */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public /* bridge */ /* synthetic */ FlinkService getFlinkService(AbstractFlinkResource abstractFlinkResource, Context context) {
        return getFlinkService((FlinkSessionJob) abstractFlinkResource, (Context<?>) context);
    }

    /** Forwards to the {@link FlinkSessionJob}-typed {@code cleanupInternal}. */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public /* bridge */ /* synthetic */ DeleteControl cleanupInternal(AbstractFlinkResource abstractFlinkResource, Context context) {
        return cleanupInternal((FlinkSessionJob) abstractFlinkResource, (Context<?>) context);
    }

    /** Forwards to the {@link FlinkSessionJob}-typed {@code deploy}. */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public /* bridge */ /* synthetic */ void deploy(AbstractFlinkResource abstractFlinkResource, AbstractFlinkSpec abstractFlinkSpec, CommonStatus commonStatus, Context context, Configuration configuration, Optional optional, boolean z) throws Exception {
        deploy((FlinkSessionJob) abstractFlinkResource, (FlinkSessionJobSpec) abstractFlinkSpec, (FlinkSessionJobStatus) commonStatus, (Context<?>) context, configuration, (Optional<String>) optional, z);
    }

    /** Forwards to the {@link FlinkSessionJob}-typed {@code getObserveConfig}. */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    protected /* bridge */ /* synthetic */ Configuration getObserveConfig(AbstractFlinkResource abstractFlinkResource, Context context) {
        return getObserveConfig((FlinkSessionJob) abstractFlinkResource, (Context<?>) context);
    }

    /** Forwards to the {@link FlinkSessionJobSpec}-typed {@code getDeployConfig}. */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public /* bridge */ /* synthetic */ Configuration getDeployConfig(ObjectMeta objectMeta, AbstractFlinkSpec abstractFlinkSpec, Context context) {
        return getDeployConfig(objectMeta, (FlinkSessionJobSpec) abstractFlinkSpec, (Context<?>) context);
    }
}
