package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import java.time.Clock;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.diff.DiffResult;
import org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.class */
/**
 * Base class with the reconciliation flow shared by Flink custom resources.
 *
 * <p>The {@link #reconcile} entry point decides, in this order, between a first-time deployment, a
 * spec change, a rollback to the last stable spec, or resource specific follow-up work. The
 * concrete actions are delegated to the abstract hooks declared below.
 *
 * @param <CR> custom resource type handled by this reconciler
 * @param <SPEC> spec type of the custom resource
 * @param <STATUS> status type of the custom resource
 */
public abstract class AbstractFlinkResourceReconciler<
                CR extends AbstractFlinkResource<SPEC, STATUS>,
                SPEC extends AbstractFlinkSpec,
                STATUS extends CommonStatus<SPEC>>
        implements Reconciler<CR> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkResourceReconciler.class);

    public static final String MSG_SUSPENDED = "Suspending existing deployment.";
    public static final String MSG_SPEC_CHANGED = "%s change(s) detected (%s), starting reconciliation.";
    public static final String MSG_ROLLBACK = "Rolling back failed deployment.";
    public static final String MSG_SUBMIT = "Starting deployment";

    protected final FlinkConfigManager configManager;
    protected final EventRecorder eventRecorder;
    protected final StatusRecorder<CR, STATUS> statusRecorder;
    protected final KubernetesClient kubernetesClient;

    /** Time source used for the readiness timeout check; replaceable through {@link #setClock}. */
    protected Clock clock = Clock.systemDefaultZone();

    /**
     * Creates the reconciler and stores its collaborators.
     *
     * @param kubernetesClient Kubernetes client made available to subclasses
     * @param flinkConfigManager configuration manager made available to subclasses
     * @param eventRecorder recorder used to emit spec-change and rollback events
     * @param statusRecorder recorder used to patch the resource status before the first deployment
     */
    public AbstractFlinkResourceReconciler(
            KubernetesClient kubernetesClient,
            FlinkConfigManager flinkConfigManager,
            EventRecorder eventRecorder,
            StatusRecorder<CR, STATUS> statusRecorder) {
        this.kubernetesClient = kubernetesClient;
        this.configManager = flinkConfigManager;
        this.eventRecorder = eventRecorder;
        this.statusRecorder = statusRecorder;
    }

    /**
     * Reconciles the given resource.
     *
     * <ol>
     *   <li>Returns immediately when {@link #readyToReconcile} reports {@code false}.
     *   <li>Before the first deployment: updates and patches the status, calls {@link #deploy}
     *       with the job's initial savepoint path (if any) and records the deployed spec.
     *   <li>When the spec differs from the last reconciled one, or the resource is already in
     *       {@code UPGRADING} state: delegates to {@link #reconcileSpecChange}, unless the new
     *       spec equals the spec that is already deployed.
     *   <li>Otherwise either drives a rollback or calls {@link #reconcileOtherChanges}.
     * </ol>
     *
     * @param cr resource to reconcile
     * @param context operator SDK context of the current reconciliation
     * @throws Exception if one of the delegated hooks fails
     */
    @Override
    public final void reconcile(CR cr, Context<?> context) throws Exception {
        SPEC spec = cr.getSpec();
        Configuration deployConfig = getDeployConfig(cr.getMetadata(), spec, context);
        STATUS status = cr.getStatus();
        ReconciliationStatus<SPEC> reconciliationStatus = status.getReconciliationStatus();

        if (!readyToReconcile(cr, context, deployConfig)) {
            LOG.info("Not ready for reconciliation yet...");
            return;
        }

        if (reconciliationStatus.isBeforeFirstDeployment()) {
            LOG.info("Deploying for the first time");
            // Persist the deployment attempt before touching the cluster.
            ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig);
            this.statusRecorder.patchAndCacheStatus(cr);
            deploy(
                    cr,
                    spec,
                    status,
                    context,
                    deployConfig,
                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
                    false);
            ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig);
            return;
        }

        SPEC currentSpec = cr.getSpec();
        SPEC lastReconciledSpec =
                cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
        // NOTE(review): `m37build` looks like a decompiler-renamed `build()` - confirm against
        // ReflectiveDiffBuilder before renaming.
        DiffResult<SPEC> specDiff =
                new ReflectiveDiffBuilder<>(currentSpec, lastReconciledSpec).m37build();

        FlinkService flinkService = getFlinkService(cr, context);
        // An upgrade that is already in progress is handled like a spec change.
        boolean specChanged =
                DiffType.IGNORE != specDiff.getType()
                        || reconciliationStatus.getState() == ReconciliationState.UPGRADING;
        Configuration observeConfig = getObserveConfig(cr, context);

        if (specChanged) {
            if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
                return;
            }
            String specChangeMessage = String.format(MSG_SPEC_CHANGED, specDiff.getType(), specDiff);
            LOG.info(specChangeMessage);
            // The event is only emitted when the upgrade is not already in progress.
            if (reconciliationStatus.getState() != ReconciliationState.UPGRADING) {
                this.eventRecorder.triggerEvent(
                        (AbstractFlinkResource<?, ?>) cr,
                        EventRecorder.Type.Normal,
                        EventRecorder.Reason.SpecChanged,
                        EventRecorder.Component.JobManagerDeployment,
                        specChangeMessage);
            }
            reconcileSpecChange(cr, context, observeConfig, deployConfig, specDiff.getType());
            return;
        }

        if (shouldRollBack(cr, observeConfig, flinkService)) {
            // The first pass only marks the status as ROLLING_BACK; the rollback itself is
            // executed once the resource is already in that state.
            if (initiateRollBack(status)) {
                return;
            }
            LOG.warn(MSG_ROLLBACK);
            this.eventRecorder.triggerEvent(
                    (AbstractFlinkResource<?, ?>) cr,
                    EventRecorder.Type.Normal,
                    EventRecorder.Reason.Rollback,
                    EventRecorder.Component.JobManagerDeployment,
                    MSG_ROLLBACK);
            rollback(cr, context, observeConfig);
            return;
        }

        if (!reconcileOtherChanges(cr, context, observeConfig)) {
            LOG.info("Resource fully reconciled, nothing to do...");
        }
    }

    /**
     * Builds the configuration used for deploying the given spec.
     *
     * @param objectMeta metadata of the resource
     * @param spec spec to deploy
     * @param context operator SDK context of the current reconciliation
     * @return deploy configuration
     */
    public abstract Configuration getDeployConfig(ObjectMeta objectMeta, SPEC spec, Context<?> context);

    /**
     * Builds the configuration used for observing the currently deployed resource.
     *
     * @param cr resource being reconciled
     * @param context operator SDK context of the current reconciliation
     * @return observe configuration
     */
    protected abstract Configuration getObserveConfig(CR cr, Context<?> context);

    /**
     * Checks whether reconciliation may proceed; {@link #reconcile} returns without any action
     * when this yields {@code false}.
     *
     * @param cr resource being reconciled
     * @param context operator SDK context of the current reconciliation
     * @param configuration deploy configuration of the resource
     * @return {@code true} if the resource can be reconciled now
     */
    protected abstract boolean readyToReconcile(CR cr, Context<?> context, Configuration configuration);

    /**
     * Applies a detected spec change.
     *
     * @param cr resource being reconciled
     * @param context operator SDK context of the current reconciliation
     * @param configuration observe configuration of the resource
     * @param configuration2 deploy configuration of the resource
     * @param diffType type of the difference between the current and the last reconciled spec
     * @throws Exception if the change cannot be applied
     */
    protected abstract void reconcileSpecChange(CR cr, Context<?> context, Configuration configuration, Configuration configuration2, DiffType diffType) throws Exception;

    /**
     * Rolls the resource back; invoked once the status is already {@code ROLLING_BACK}.
     *
     * @param cr resource being reconciled
     * @param context operator SDK context of the current reconciliation
     * @param configuration observe configuration of the resource
     * @throws Exception if the rollback fails
     */
    protected abstract void rollback(CR cr, Context<?> context, Configuration configuration) throws Exception;

    /**
     * Hook for resource specific reconciliation when neither a spec change nor a rollback applies.
     *
     * @param cr resource being reconciled
     * @param context operator SDK context of the current reconciliation
     * @param configuration observe configuration of the resource
     * @return {@code true} to end the reconciliation silently; {@code false} makes the caller log
     *     that the resource is fully reconciled
     * @throws Exception if the hook fails
     */
    protected abstract boolean reconcileOtherChanges(CR cr, Context<?> context, Configuration configuration) throws Exception;

    /**
     * Cleans up the resource by delegating to {@link #cleanupInternal}.
     *
     * @param cr resource being deleted
     * @param context operator SDK context of the current reconciliation
     * @return delete control returned by {@link #cleanupInternal}
     */
    @Override
    public final DeleteControl cleanup(CR cr, Context<?> context) {
        return cleanupInternal(cr, context);
    }

    /**
     * Deploys the resource with the given spec.
     *
     * @param cr resource being reconciled
     * @param spec spec to deploy
     * @param status status of the resource
     * @param context operator SDK context of the current reconciliation
     * @param configuration deploy configuration
     * @param optional savepoint path to start from, if any; the first deployment passes the job's
     *     initial savepoint path
     * @param z implementation defined flag, {@code false} for the first deployment (presumably
     *     whether HA metadata is required - TODO confirm against the implementations)
     * @throws Exception if the deployment fails
     */
    public abstract void deploy(CR cr, SPEC spec, STATUS status, Context<?> context, Configuration configuration, Optional<String> optional, boolean z) throws Exception;

    /**
     * Resource specific cleanup logic backing {@link #cleanup}.
     *
     * @param cr resource being deleted
     * @param context operator SDK context of the current reconciliation
     * @return delete control to hand back to the operator SDK
     */
    protected abstract DeleteControl cleanupInternal(CR cr, Context<?> context);

    /**
     * Returns the Flink service to use for the given resource.
     *
     * @param cr resource being reconciled
     * @param context operator SDK context of the current reconciliation
     * @return Flink service for the resource
     */
    public abstract FlinkService getFlinkService(CR cr, Context<?> context);

    /**
     * Detects a spec change that merely restores the spec which is already deployed. In that case
     * the status is updated for the deployed spec and no upgrade is needed.
     *
     * @return {@code true} if the new spec equals the deployed spec and the status was updated
     */
    private boolean checkNewSpecAlreadyDeployed(CR resource, Configuration deployConfig) {
        boolean upgrading =
                resource.getStatus().getReconciliationStatus().getState()
                        == ReconciliationState.UPGRADING;
        if (upgrading) {
            return false;
        }
        if (!resource.getSpec().equals(ReconciliationUtils.getDeployedSpec(resource))) {
            return false;
        }
        LOG.info("The new spec matches the currently deployed last stable spec. No upgrade needed.");
        ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig);
        return true;
    }

    /**
     * Decides whether the resource has to be rolled back to its last stable spec.
     *
     * @return {@code true} if a rollback is already in progress, or if rollbacks are enabled, the
     *     last reconciled spec is not stable, a suitable stable spec exists, the readiness timeout
     *     has elapsed since the last reconciliation and HA metadata is available
     */
    private boolean shouldRollBack(
            AbstractFlinkResource<SPEC, STATUS> resource,
            Configuration observeConfig,
            FlinkService flinkService) {
        ReconciliationStatus<SPEC> reconciliationStatus =
                resource.getStatus().getReconciliationStatus();
        ReconciliationState state = reconciliationStatus.getState();

        // A rollback that was already initiated always has to be carried on.
        if (state == ReconciliationState.ROLLING_BACK) {
            return true;
        }

        if (!observeConfig.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED)
                || state == ReconciliationState.ROLLED_BACK
                || reconciliationStatus.isLastReconciledSpecStable()) {
            return false;
        }

        SPEC lastStableSpec = reconciliationStatus.deserializeLastStableSpec();
        if (lastStableSpec == null) {
            // Nothing to roll back to.
            return false;
        }

        JobSpec lastStableJob = lastStableSpec.getJob();
        if (lastStableJob != null && lastStableJob.getState() == JobState.SUSPENDED) {
            // A suspended job is not a rollback target.
            return false;
        }

        if (flinkVersionChanged(resource.getSpec(), lastStableSpec)) {
            // Flink version changes are not rolled back.
            return false;
        }

        Instant now = this.clock.instant();
        TemporalAmount readinessTimeout =
                observeConfig.get(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT);
        Instant lastReconciliation =
                Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp());
        if (!now.minus(readinessTimeout).isAfter(lastReconciliation)) {
            // The deployment still has time left to become ready.
            return false;
        }

        boolean haMetadataAvailable = flinkService.isHaMetadataAvailable(observeConfig);
        if (!haMetadataAvailable) {
            LOG.warn("Rollback is not possible due to missing HA metadata");
        }
        return haMetadataAvailable;
    }

    /**
     * Marks the status as {@code ROLLING_BACK} unless it already is, setting a default error
     * message when the status carries none.
     *
     * @return {@code true} if the rollback was initiated by this call, {@code false} if the status
     *     was already {@code ROLLING_BACK}
     */
    private boolean initiateRollBack(STATUS status) {
        ReconciliationStatus<SPEC> reconciliationStatus = status.getReconciliationStatus();
        if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
            return false;
        }
        LOG.warn("Preparing to roll back to last stable spec.");
        if (StringUtils.isEmpty(status.getError())) {
            status.setError("Deployment is not ready within the configured timeout, rolling back.");
        }
        reconciliationStatus.setState(ReconciliationState.ROLLING_BACK);
        return true;
    }

    /**
     * Checks whether a missing JobManager deployment should be recovered.
     *
     * @param configuration configuration holding the recovery and HA settings
     * @param flinkDeployment deployment to check
     * @return {@code true} only if JobManager deployment recovery is enabled, the JobManager
     *     deployment is missing although the deployed job is expected to run, and Kubernetes HA is
     *     activated
     */
    public boolean shouldRecoverDeployment(Configuration configuration, FlinkDeployment flinkDeployment) {
        if (!configuration.get(KubernetesOperatorConfigOptions.OPERATOR_JM_DEPLOYMENT_RECOVERY_ENABLED)) {
            return false;
        }
        LOG.debug("Checking whether jobmanager deployment needs recovery");
        if (!jmMissingForRunningDeployment(flinkDeployment)) {
            return false;
        }
        LOG.debug("Jobmanager deployment is missing, trying to recover");
        if (!FlinkUtils.isKubernetesHAActivated(configuration)) {
            LOG.warn("Could not recover lost jobmanager deployment without HA enabled");
            return false;
        }
        LOG.debug("HA is enabled, recovering lost jobmanager deployment");
        return true;
    }

    /**
     * Tells whether the JobManager deployment is reported as missing while the deployed spec has
     * either no job or a job in {@code RUNNING} state.
     */
    private boolean jmMissingForRunningDeployment(FlinkDeployment flinkDeployment) {
        JobSpec deployedJob = ReconciliationUtils.getDeployedSpec(flinkDeployment).getJob();
        boolean expectedToRun = deployedJob == null || deployedJob.getState() == JobState.RUNNING;
        return expectedToRun
                && flinkDeployment.getStatus().getJobManagerDeploymentStatus()
                        == JobManagerDeploymentStatus.MISSING;
    }

    /**
     * Compares the Flink versions of two specs.
     *
     * @param spec first spec; only {@link FlinkDeploymentSpec} instances are compared
     * @param spec2 second spec, cast to {@link FlinkDeploymentSpec} whenever {@code spec} is one
     * @return {@code true} if {@code spec} is a {@link FlinkDeploymentSpec} and the versions
     *     differ, {@code false} otherwise
     */
    public boolean flinkVersionChanged(SPEC spec, SPEC spec2) {
        if (!(spec instanceof FlinkDeploymentSpec)) {
            return false;
        }
        return ((FlinkDeploymentSpec) spec).getFlinkVersion()
                != ((FlinkDeploymentSpec) spec2).getFlinkVersion();
    }

    /**
     * Stores an owner reference pointing at the given resource in the configuration, with
     * {@code blockOwnerDeletion} and {@code controller} both set to {@code "false"}.
     *
     * @param cr resource that becomes the owner
     * @param configuration configuration receiving the owner reference
     */
    public void setOwnerReference(CR cr, Configuration configuration) {
        Map<String, String> ownerReference =
                Map.of(
                        "apiVersion", cr.getApiVersion(),
                        "kind", cr.getKind(),
                        "name", cr.getMetadata().getName(),
                        "uid", cr.getMetadata().getUid(),
                        "blockOwnerDeletion", "false",
                        "controller", "false");
        configuration.set(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE, List.of(ownerReference));
    }

    /**
     * Replaces the clock used for the readiness timeout check.
     *
     * @param clock clock to use from now on
     */
    @VisibleForTesting
    protected void setClock(Clock clock) {
        this.clock = clock;
    }
}
