package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.class */
public class SessionReconciler extends AbstractFlinkResourceReconciler<FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> {
    private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class);
    private final FlinkConfigManager configManager;

    public SessionReconciler(KubernetesClient kubernetesClient, EventRecorder eventRecorder, StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder, FlinkConfigManager flinkConfigManager) {
        super(kubernetesClient, eventRecorder, statusRecorder, new NoopJobAutoscalerFactory());
        this.configManager = flinkConfigManager;
    }

    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    protected boolean readyToReconcile(FlinkResourceContext<FlinkDeployment> flinkResourceContext) {
        return true;
    }

    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    protected boolean reconcileSpecChange(FlinkResourceContext<FlinkDeployment> flinkResourceContext, Configuration configuration) throws Exception {
        FlinkDeployment resource = flinkResourceContext.getResource();
        deleteSessionCluster(flinkResourceContext);
        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, configuration, this.clock);
        this.statusRecorder.patchAndCacheStatus(resource);
        deploy2(flinkResourceContext, (FlinkDeploymentSpec) resource.getSpec(), configuration, Optional.empty(), false);
        ReconciliationUtils.updateStatusForDeployedSpec(resource, configuration, this.clock);
        return true;
    }

    private void deleteSessionCluster(FlinkResourceContext<FlinkDeployment> flinkResourceContext) {
        FlinkDeployment resource = flinkResourceContext.getResource();
        flinkResourceContext.getFlinkService().deleteClusterDeployment(resource.getMetadata(), (FlinkDeploymentStatus) resource.getStatus(), flinkResourceContext.getDeployConfig((AbstractFlinkSpec) flinkResourceContext.getResource().getSpec()), false);
        flinkResourceContext.getFlinkService().waitForClusterShutdown(flinkResourceContext.getObserveConfig());
    }

    /* renamed from: deploy, reason: avoid collision after fix types in other method */
    public void deploy2(FlinkResourceContext<FlinkDeployment> flinkResourceContext, FlinkDeploymentSpec flinkDeploymentSpec, Configuration configuration, Optional<String> optional, boolean z) throws Exception {
        FlinkDeployment resource = flinkResourceContext.getResource();
        setOwnerReference(resource, configuration);
        flinkResourceContext.getFlinkService().submitSessionCluster(configuration);
        ((FlinkDeploymentStatus) resource.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
        IngressUtils.updateIngressRules(resource.getMetadata(), flinkDeploymentSpec, configuration, this.kubernetesClient);
    }

    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    protected void rollback(FlinkResourceContext<FlinkDeployment> flinkResourceContext) throws Exception {
        FlinkDeploymentReconciliationStatus reconciliationStatus = ((FlinkDeploymentStatus) flinkResourceContext.getResource().getStatus()).getReconciliationStatus();
        FlinkDeploymentSpec flinkDeploymentSpec = (FlinkDeploymentSpec) reconciliationStatus.deserializeLastStableSpec();
        deleteSessionCluster(flinkResourceContext);
        deploy2(flinkResourceContext, flinkDeploymentSpec, flinkResourceContext.getDeployConfig(flinkDeploymentSpec), Optional.empty(), false);
        reconciliationStatus.setState(ReconciliationState.ROLLED_BACK);
    }

    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public boolean reconcileOtherChanges(FlinkResourceContext<FlinkDeployment> flinkResourceContext) throws Exception {
        if (!shouldRecoverDeployment(flinkResourceContext.getObserveConfig(), flinkResourceContext.getResource())) {
            return false;
        }
        recoverSession(flinkResourceContext);
        return true;
    }

    private void recoverSession(FlinkResourceContext<FlinkDeployment> flinkResourceContext) throws Exception {
        flinkResourceContext.getFlinkService().submitSessionCluster(flinkResourceContext.getObserveConfig());
        ((FlinkDeploymentStatus) flinkResourceContext.getResource().getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
    }

    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> flinkResourceContext) {
        Set secondaryResources = flinkResourceContext.getJosdkContext().getSecondaryResources(FlinkSessionJob.class);
        AbstractFlinkResource<?, ?> abstractFlinkResource = (FlinkDeployment) flinkResourceContext.getResource();
        if (secondaryResources.isEmpty()) {
            LOG.info("Stopping session cluster");
            flinkResourceContext.getFlinkService().deleteClusterDeployment(abstractFlinkResource.getMetadata(), (FlinkDeploymentStatus) abstractFlinkResource.getStatus(), flinkResourceContext.getDeployConfig((AbstractFlinkSpec) flinkResourceContext.getResource().getSpec()), true);
            return DeleteControl.defaultDelete();
        }
        String format = String.format("The session jobs %s should be deleted first", secondaryResources.stream().map(flinkSessionJob -> {
            return flinkSessionJob.getMetadata().getName();
        }).collect(Collectors.toList()));
        if (this.eventRecorder.triggerEvent(abstractFlinkResource, EventRecorder.Type.Warning, EventRecorder.Reason.CleanupFailed, EventRecorder.Component.Operator, format)) {
            LOG.warn(format);
        }
        return DeleteControl.noFinalizerRemoval().rescheduleAfter(this.configManager.getOperatorConfiguration().getReconcileInterval().toMillis());
    }

    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public /* bridge */ /* synthetic */ void deploy(FlinkResourceContext<FlinkDeployment> flinkResourceContext, FlinkDeploymentSpec flinkDeploymentSpec, Configuration configuration, Optional optional, boolean z) throws Exception {
        deploy2(flinkResourceContext, flinkDeploymentSpec, configuration, (Optional<String>) optional, z);
    }
}
