package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.class */
/**
 * Base class for reconcilers of Flink custom resources.
 *
 * <p>{@link #reconcile} is a fixed (final) template that decides which of four paths applies to
 * the resource: first deployment, spec change, rollback, or "other changes". The
 * resource-specific work of each path is delegated to the abstract hooks declared below.
 *
 * @param <CR> custom resource type handled by this reconciler
 * @param <SPEC> spec type of the custom resource
 * @param <STATUS> status type of the custom resource
 */
public abstract class AbstractFlinkResourceReconciler<
                CR extends AbstractFlinkResource<SPEC, STATUS>,
                SPEC extends AbstractFlinkSpec,
                STATUS extends CommonStatus<SPEC>>
        implements Reconciler<CR> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkResourceReconciler.class);

    // Message constants. MSG_SUSPENDED and MSG_SUBMIT are not referenced inside this class;
    // presumably they are used by subclasses (not verifiable from this file).
    public static final String MSG_SUSPENDED = "Suspending existing deployment.";
    public static final String MSG_SPEC_CHANGED = "Detected spec change, starting reconciliation.";
    public static final String MSG_ROLLBACK = "Rolling back failed deployment.";
    public static final String MSG_SUBMIT = "Starting deployment";

    protected final FlinkConfigManager configManager;
    protected final EventRecorder eventRecorder;
    protected final StatusRecorder<STATUS> statusRecorder;
    protected final KubernetesClient kubernetesClient;
    protected final FlinkService flinkService;

    /**
     * Stores the collaborators for use by this class and its subclasses.
     *
     * @param kubernetesClient Kubernetes client
     * @param flinkService service used here to check HA metadata availability before a rollback
     * @param flinkConfigManager configuration manager
     * @param eventRecorder recorder used to emit spec-change and rollback events
     * @param statusRecorder recorder used to patch the status before the first deployment
     */
    public AbstractFlinkResourceReconciler(
            KubernetesClient kubernetesClient,
            FlinkService flinkService,
            FlinkConfigManager flinkConfigManager,
            EventRecorder eventRecorder,
            StatusRecorder<STATUS> statusRecorder) {
        this.kubernetesClient = kubernetesClient;
        this.flinkService = flinkService;
        this.configManager = flinkConfigManager;
        this.eventRecorder = eventRecorder;
        this.statusRecorder = statusRecorder;
    }

    /**
     * Reconciles the given resource by taking exactly one of the following paths.
     *
     * <ol>
     *   <li>Not ready ({@link #readyToReconcile} is false): log and return.
     *   <li>First deployment: record the spec in the status, patch the status, deploy, then mark
     *       the spec as deployed.
     *   <li>Spec changed (or an upgrade is already in progress): run {@link #reconcileSpecChange},
     *       unless the new spec equals the currently deployed one.
     *   <li>Rollback required: the first pass only flips the state to {@code ROLLING_BACK}; the
     *       next pass performs the actual {@link #rollback}.
     *   <li>Otherwise: give {@link #reconcileOtherChanges} a chance to act.
     * </ol>
     *
     * @param cr resource to reconcile
     * @param context reconciliation context handed through to the hooks
     * @throws Exception if any of the delegated steps fails
     */
    @Override
    public final void reconcile(CR cr, Context context) throws Exception {
        // Keep the generic SPEC/STATUS types: the hooks below are declared in terms of them.
        SPEC spec = cr.getSpec();
        Configuration deployConfig = getDeployConfig(cr.getMetadata(), spec, context);
        STATUS status = cr.getStatus();
        ReconciliationStatus<SPEC> reconciliationStatus = status.getReconciliationStatus2();

        if (!readyToReconcile(cr, context, deployConfig)) {
            LOG.info("Not ready for reconciliation yet...");
            return;
        }

        if (reconciliationStatus.isFirstDeployment()) {
            LOG.info("Deploying for the first time");
            // The status is updated and patched before the deployment attempt is made.
            ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig);
            this.statusRecorder.patchAndCacheStatus(cr);
            Optional<String> initialSavepointPath =
                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath);
            deploy(cr, spec, status, deployConfig, initialSavepointPath, false);
            ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig);
            return;
        }

        // Spec and status are deliberately re-read from the resource here, as in the original.
        // The last reconciled spec is only deserialized when no upgrade is in progress.
        boolean specChanged =
                reconciliationStatus.getState() == ReconciliationState.UPGRADING
                        || !cr.getSpec()
                                .equals(
                                        cr.getStatus()
                                                .getReconciliationStatus2()
                                                .deserializeLastReconciledSpec());
        Configuration observeConfig = getObserveConfig(cr, context);

        if (specChanged) {
            if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
                return;
            }
            LOG.info(MSG_SPEC_CHANGED);
            // The event is only emitted when the upgrade is not already in progress.
            if (reconciliationStatus.getState() != ReconciliationState.UPGRADING) {
                this.eventRecorder.triggerEvent(
                        cr,
                        EventRecorder.Type.Normal,
                        EventRecorder.Reason.SpecChanged,
                        EventRecorder.Component.JobManagerDeployment,
                        MSG_SPEC_CHANGED);
            }
            reconcileSpecChange(cr, observeConfig, deployConfig);
        } else if (shouldRollBack(cr, observeConfig)) {
            // Two-step rollback: when the state was just switched to ROLLING_BACK we stop here.
            if (initiateRollBack(status)) {
                return;
            }
            LOG.warn(MSG_ROLLBACK);
            this.eventRecorder.triggerEvent(
                    cr,
                    EventRecorder.Type.Normal,
                    EventRecorder.Reason.Rollback,
                    EventRecorder.Component.JobManagerDeployment,
                    MSG_ROLLBACK);
            rollback(cr, context, observeConfig);
        } else if (!reconcileOtherChanges(cr, observeConfig)) {
            LOG.info("Resource fully reconciled, nothing to do...");
        }
    }

    /**
     * Builds the configuration used for deploying the given spec.
     *
     * <p>Decompiler note: declared {@code protected} in the original class; the access modifier
     * was widened by the decompiler and is kept as-is here.
     *
     * @param objectMeta metadata of the resource
     * @param spec spec to build the configuration for
     * @param context reconciliation context
     * @return the deploy configuration
     */
    public abstract Configuration getDeployConfig(ObjectMeta objectMeta, SPEC spec, Context context);

    /**
     * Builds the configuration passed to the rollback check, {@link #reconcileSpecChange},
     * {@link #rollback} and {@link #reconcileOtherChanges}.
     *
     * @param cr resource being reconciled
     * @param context reconciliation context
     * @return the observe configuration
     */
    protected abstract Configuration getObserveConfig(CR cr, Context context);

    /**
     * Tells whether reconciliation may proceed; when false, {@link #reconcile} returns
     * immediately.
     *
     * @param cr resource being reconciled
     * @param context reconciliation context
     * @param configuration deploy configuration of the resource
     * @return true if the resource is ready to be reconciled
     */
    protected abstract boolean readyToReconcile(CR cr, Context context, Configuration configuration);

    /**
     * Handles a changed spec (or an upgrade that is already in progress).
     *
     * @param cr resource being reconciled
     * @param configuration observe configuration
     * @param configuration2 deploy configuration
     * @throws Exception if the reconciliation fails
     */
    protected abstract void reconcileSpecChange(CR cr, Configuration configuration, Configuration configuration2) throws Exception;

    /**
     * Performs the rollback once the resource is in the {@code ROLLING_BACK} state.
     *
     * @param cr resource being reconciled
     * @param context reconciliation context
     * @param configuration observe configuration
     * @throws Exception if the rollback fails
     */
    protected abstract void rollback(CR cr, Context context, Configuration configuration) throws Exception;

    /**
     * Called when neither a spec change nor a rollback applies.
     *
     * @param cr resource being reconciled
     * @param configuration observe configuration
     * @return true if an action was taken; when false, {@link #reconcile} logs that the resource
     *     is fully reconciled
     * @throws Exception if the reconciliation fails
     */
    protected abstract boolean reconcileOtherChanges(CR cr, Configuration configuration) throws Exception;

    /**
     * Cleans up the resource by delegating to {@link #cleanupInternal}.
     *
     * @param cr resource being deleted
     * @param context reconciliation context
     * @return the delete control produced by {@link #cleanupInternal}
     */
    @Override
    public final DeleteControl cleanup(CR cr, Context context) {
        return cleanupInternal(cr, context);
    }

    /**
     * Deploys the resource.
     *
     * <p>Decompiler note: declared {@code protected} in the original class; the access modifier
     * was widened by the decompiler and is kept as-is here.
     *
     * @param cr resource being deployed
     * @param spec spec to deploy
     * @param status status of the resource
     * @param configuration deploy configuration
     * @param optional optional savepoint path; on first deployment {@link #reconcile} passes the
     *     job's initial savepoint path, if any
     * @param z flag whose meaning is defined by the implementations (not visible in this class);
     *     {@link #reconcile} passes {@code false} on first deployment
     * @throws Exception if the deployment fails
     */
    public abstract void deploy(CR cr, SPEC spec, STATUS status, Configuration configuration, Optional<String> optional, boolean z) throws Exception;

    /**
     * Resource-specific cleanup invoked by {@link #cleanup}.
     *
     * @param cr resource being deleted
     * @param context reconciliation context
     * @return the delete control to hand back to the caller of {@link #cleanup}
     */
    protected abstract DeleteControl cleanupInternal(CR cr, Context context);

    /**
     * Checks whether the current spec equals the spec that is already deployed, in which case
     * the status is updated for the deployed spec and no upgrade is needed.
     *
     * @return true if the spec was already deployed and the status has been updated; always false
     *     while an upgrade is in progress
     */
    private boolean checkNewSpecAlreadyDeployed(CR cr, Configuration configuration) {
        if (cr.getStatus().getReconciliationStatus2().getState() == ReconciliationState.UPGRADING) {
            return false;
        }
        if (!cr.getSpec().equals(ReconciliationUtils.getDeployedSpec(cr))) {
            return false;
        }
        LOG.info("The new spec matches the currently deployed last stable spec. No upgrade needed.");
        ReconciliationUtils.updateStatusForDeployedSpec(cr, configuration);
        return true;
    }

    /**
     * Decides whether the resource should be (or already is being) rolled back.
     *
     * <p>Returns true right away when the state is already {@code ROLLING_BACK}. Otherwise a
     * rollback is requested only if all of the following hold: rollbacks are enabled, the state
     * is not {@code ROLLED_BACK}, the last reconciled spec is not stable, a last stable spec
     * exists whose job is not suspended, the Flink version did not change, the readiness timeout
     * has elapsed since the last reconciliation, and HA metadata is available.
     */
    private boolean shouldRollBack(AbstractFlinkResource<SPEC, STATUS> abstractFlinkResource, Configuration configuration) {
        ReconciliationStatus<SPEC> reconciliationStatus =
                abstractFlinkResource.getStatus().getReconciliationStatus2();
        if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
            return true;
        }

        boolean rollbackEnabled =
                (Boolean) configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED);
        if (!rollbackEnabled
                || reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK
                || reconciliationStatus.isLastReconciledSpecStable()) {
            return false;
        }

        SPEC lastStableSpec = reconciliationStatus.deserializeLastStableSpec();
        if (lastStableSpec == null) {
            // Nothing to roll back to.
            return false;
        }

        JobSpec lastStableJob = lastStableSpec.getJob();
        if (lastStableJob != null && lastStableJob.getState() == JobState.SUSPENDED) {
            return false;
        }
        if (flinkVersionChanged(abstractFlinkResource.getSpec(), lastStableSpec)) {
            return false;
        }

        // Only roll back once the readiness timeout has fully elapsed since the last
        // reconciliation timestamp.
        TemporalAmount readinessTimeout =
                (TemporalAmount) configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT);
        Instant lastReconciliation = Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp());
        if (!Instant.now().minus(readinessTimeout).isAfter(lastReconciliation)) {
            return false;
        }

        boolean haMetadataAvailable = this.flinkService.isHaMetadataAvailable(configuration);
        if (!haMetadataAvailable) {
            LOG.warn("Rollback is not possible due to missing HA metadata");
        }
        return haMetadataAvailable;
    }

    /**
     * First step of the two-step rollback: switches the reconciliation state to
     * {@code ROLLING_BACK} and sets a default error message if the status has none.
     *
     * @return true if the rollback was initiated by this call, false if the state was already
     *     {@code ROLLING_BACK}
     */
    private boolean initiateRollBack(STATUS status) {
        ReconciliationStatus<SPEC> reconciliationStatus = status.getReconciliationStatus2();
        if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
            return false;
        }
        LOG.warn("Preparing to roll back to last stable spec.");
        if (StringUtils.isEmpty(status.getError())) {
            status.setError("Deployment is not ready within the configured timeout, rolling back.");
        }
        reconciliationStatus.setState(ReconciliationState.ROLLING_BACK);
        return true;
    }

    /**
     * Tells whether a deployment whose JobManager deployment is missing should be recovered.
     *
     * <p>Recovery requires that the JobManager deployment is missing for a running deployment,
     * that JobManager deployment recovery is enabled, and that Kubernetes HA is activated; a
     * warning is logged when only the HA requirement is not met.
     *
     * <p>Decompiler note: declared {@code protected} in the original class; the access modifier
     * was widened by the decompiler and is kept as-is here.
     *
     * @param configuration configuration to read the recovery and HA settings from
     * @param flinkDeployment deployment to check
     * @return true if the deployment should be recovered
     */
    public static boolean shouldRecoverDeployment(Configuration configuration, FlinkDeployment flinkDeployment) {
        if (!jmMissingForRunningDeployment(flinkDeployment)) {
            return false;
        }
        boolean recoveryEnabled =
                (Boolean) configuration.get(KubernetesOperatorConfigOptions.OPERATOR_JM_DEPLOYMENT_RECOVERY_ENABLED);
        if (!recoveryEnabled) {
            return false;
        }
        if (FlinkUtils.isKubernetesHAActivated(configuration)) {
            return true;
        }
        LOG.warn("Could not recover lost deployment without HA enabled");
        return false;
    }

    /**
     * Returns true when the deployed spec has no job or a job in state {@code RUNNING}, and the
     * JobManager deployment status is {@code MISSING}.
     */
    private static boolean jmMissingForRunningDeployment(FlinkDeployment flinkDeployment) {
        JobSpec deployedJob = ((FlinkDeploymentSpec) ReconciliationUtils.getDeployedSpec(flinkDeployment)).getJob();
        boolean expectedToRun = deployedJob == null || deployedJob.getState() == JobState.RUNNING;
        return expectedToRun
                && ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobManagerDeploymentStatus()
                        == JobManagerDeploymentStatus.MISSING;
    }

    /**
     * Tells whether the Flink version differs between two specs.
     *
     * <p>Only {@link FlinkDeploymentSpec}s are compared; for any other type of {@code spec} the
     * result is false. {@code spec2} is cast without a check, so it must be a non-null
     * {@link FlinkDeploymentSpec} whenever {@code spec} is one.
     *
     * <p>Decompiler note: declared {@code protected} in the original class; the access modifier
     * was widened by the decompiler and is kept as-is here.
     *
     * @param spec first spec (the rollback check passes the current spec)
     * @param spec2 second spec (the rollback check passes the last stable spec)
     * @return true if both are deployment specs with different Flink versions
     */
    public boolean flinkVersionChanged(SPEC spec, SPEC spec2) {
        if (!(spec instanceof FlinkDeploymentSpec)) {
            return false;
        }
        // Reference comparison, exactly as in the original code.
        return ((FlinkDeploymentSpec) spec).getFlinkVersion() != ((FlinkDeploymentSpec) spec2).getFlinkVersion();
    }
}
