package org.apache.flink.kubernetes.operator.observer.deployment;

import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.exception.MissingJobManagerException;
import org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.class */
/**
 * Base observer for {@link FlinkDeployment} resources.
 *
 * <p>It first tracks the state of the JobManager Kubernetes {@link Deployment} and records it as a
 * {@link JobManagerDeploymentStatus} on the resource status. Once the JobManager is considered
 * {@link JobManagerDeploymentStatus#READY}, observation of the Flink cluster itself is delegated to
 * {@link #observeFlinkCluster}, which subclasses implement.
 */
public abstract class AbstractFlinkDeploymentObserver extends AbstractFlinkResourceObserver<FlinkDeployment, FlinkDeploymentObserverContext> {
    /** Container waiting reasons that are reported as a failed JobManager deployment. */
    private static final Set<String> CONTAINER_BACKOFF_REASONS = Set.of(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF, DeploymentFailedException.REASON_IMAGE_PULL_BACKOFF);

    /** Logger named after the concrete observer class (see constructor). */
    protected final Logger logger;

    /** Service used to query the JobManager port, its pods and the cluster info. */
    protected final FlinkService flinkService;

    /**
     * Creates the observer.
     *
     * @param flinkService service used to talk to the deployed cluster
     * @param flinkConfigManager passed to the superclass constructor
     * @param eventRecorder passed to the superclass constructor
     */
    public AbstractFlinkDeploymentObserver(FlinkService flinkService, FlinkConfigManager flinkConfigManager, EventRecorder eventRecorder) {
        super(flinkConfigManager, eventRecorder);
        this.logger = LoggerFactory.getLogger(getClass());
        this.flinkService = flinkService;
    }

    /**
     * Builds the observer context for the given deployment from the config manager.
     *
     * @param flinkDeployment resource being observed
     * @param context reconciler context (not used here)
     * @return a new {@link FlinkDeploymentObserverContext}
     */
    /* renamed from: getObserverContext, reason: avoid collision after fix types in other method */
    protected FlinkDeploymentObserverContext getObserverContext2(FlinkDeployment flinkDeployment, Context<?> context) {
        return new FlinkDeploymentObserverContext(flinkDeployment, this.configManager);
    }

    /**
     * Runs one observation pass: JobManager deployment, then the Flink cluster, then the cluster
     * info, and finally clears stale errors if the deployment looks healthy.
     *
     * @param flinkDeployment resource being observed; its status is updated in place
     * @param context reconciler context giving access to the secondary {@link Deployment}
     * @param flinkDeploymentObserverContext supplies the deployed configuration
     */
    /* renamed from: observeInternal, reason: avoid collision after fix types in other method */
    public void observeInternal2(FlinkDeployment flinkDeployment, Context<?> context, FlinkDeploymentObserverContext flinkDeploymentObserverContext) {
        // Only look at the Kubernetes Deployment while the JobManager is not yet marked READY.
        if (!isJmDeploymentReady(flinkDeployment)) {
            observeJmDeployment(flinkDeployment, context, flinkDeploymentObserverContext.getDeployedConfig());
        }
        // Readiness is re-evaluated on purpose: observeJmDeployment may just have set READY.
        if (isJmDeploymentReady(flinkDeployment)) {
            observeFlinkCluster(flinkDeployment, context, flinkDeploymentObserverContext);
        }
        // Checked once more because the subclass hook above may presumably change the status again.
        if (isJmDeploymentReady(flinkDeployment)) {
            observeClusterInfo(flinkDeployment, flinkDeploymentObserverContext.getDeployedConfig());
        }
        clearErrorsIfDeploymentIsHealthy(flinkDeployment);
    }

    /**
     * Merges the cluster info reported by {@link FlinkService#getClusterInfo} into the resource
     * status. Best effort: any exception is logged and otherwise ignored.
     */
    private void observeClusterInfo(FlinkDeployment flinkDeployment, Configuration configuration) {
        try {
            FlinkDeploymentStatus deploymentStatus = (FlinkDeploymentStatus) flinkDeployment.getStatus();
            deploymentStatus.getClusterInfo().putAll(this.flinkService.getClusterInfo(configuration));
            this.logger.debug("ClusterInfo: {}", deploymentStatus.getClusterInfo());
        } catch (Exception e) {
            this.logger.error("Exception while fetching cluster info", e);
        }
    }

    /**
     * Observes the JobManager Kubernetes {@link Deployment} and advances the
     * {@link JobManagerDeploymentStatus} stored on the resource:
     *
     * <ul>
     *   <li>suspended jobs (see {@link #isSuspendedJob}) are skipped;</li>
     *   <li>{@code DEPLOYED_NOT_READY} is promoted to {@code READY};</li>
     *   <li>a missing Deployment sets {@code MISSING} and, the first time, reports an error event;</li>
     *   <li>a Deployment whose replicas are all available and whose JobManager port answers
     *       becomes {@code DEPLOYED_NOT_READY};</li>
     *   <li>otherwise the Deployment is checked for failures and marked {@code DEPLOYING}.</li>
     * </ul>
     *
     * @param flinkDeployment resource being observed; its status is updated in place
     * @param context reconciler context giving access to the secondary {@link Deployment}
     * @param configuration deployed configuration used for the port and pod lookups
     * @throws DeploymentFailedException if a deployment failure is detected and the resource is not
     *     already in {@code ERROR} status
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void observeJmDeployment(FlinkDeployment flinkDeployment, Context<?> context, Configuration configuration) {
        FlinkDeploymentStatus flinkDeploymentStatus = (FlinkDeploymentStatus) flinkDeployment.getStatus();
        JobManagerDeploymentStatus previousJmStatus = flinkDeploymentStatus.getJobManagerDeploymentStatus();
        if (isSuspendedJob(flinkDeployment)) {
            this.logger.debug("Skipping observe step for suspended application deployments");
            return;
        }
        this.logger.info("Observing JobManager deployment. Previous status: {}", previousJmStatus.name());
        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
            this.logger.info("JobManager deployment is ready");
            flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
            return;
        }

        Optional<Deployment> jmDeployment = context.getSecondaryResource(Deployment.class);
        if (!jmDeployment.isPresent()) {
            flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
            flinkDeploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
            // Report the missing deployment only on the transition, not on every observation.
            boolean alreadyReported = previousJmStatus == JobManagerDeploymentStatus.MISSING || previousJmStatus == JobManagerDeploymentStatus.ERROR;
            if (!alreadyReported) {
                onMissingDeployment(flinkDeployment);
            }
            return;
        }

        Deployment deployment = jmDeployment.get();
        DeploymentStatus status = deployment.getStatus();
        DeploymentSpec spec = deployment.getSpec();
        if (allReplicasAvailable(spec, status) && this.flinkService.isJobManagerPortReady(configuration)) {
            this.logger.info("JobManager deployment port is ready, waiting for the Flink REST API...");
            flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
            return;
        }

        try {
            checkFailedCreate(status);
            checkContainerBackoff(flinkDeployment, configuration);
            this.logger.info("JobManager is being deployed");
            flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
        } catch (DeploymentFailedException e) {
            flinkDeploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
            // Propagate the failure only if the resource is not already in ERROR status.
            if (!JobManagerDeploymentStatus.ERROR.equals(flinkDeploymentStatus.getJobManagerDeploymentStatus())) {
                throw e;
            }
        }
    }

    /**
     * Returns whether the desired replica count equals both the current and the available replica
     * count. Any missing value (null spec, status or count) is treated as "not available" instead
     * of failing on unboxing.
     */
    private static boolean allReplicasAvailable(DeploymentSpec spec, DeploymentStatus status) {
        if (spec == null || status == null) {
            return false;
        }
        Integer desired = spec.getReplicas();
        Integer current = status.getReplicas();
        Integer available = status.getAvailableReplicas();
        if (desired == null || current == null || available == null) {
            return false;
        }
        return desired.intValue() == current.intValue() && desired.intValue() == available.intValue();
    }

    /**
     * Throws if the Deployment reports a {@code ReplicaFailure} condition with reason
     * {@code FailedCreate}. A null status or null condition list is treated as "no conditions";
     * the caller may pass a null status when the Deployment has none yet.
     *
     * @throws DeploymentFailedException for the first matching condition
     */
    private void checkFailedCreate(DeploymentStatus deploymentStatus) {
        if (deploymentStatus == null || deploymentStatus.getConditions() == null) {
            return;
        }
        for (DeploymentCondition deploymentCondition : deploymentStatus.getConditions()) {
            if ("FailedCreate".equals(deploymentCondition.getReason()) && "ReplicaFailure".equals(deploymentCondition.getType())) {
                throw new DeploymentFailedException(deploymentCondition);
            }
        }
    }

    /**
     * Throws if any container of any JobManager pod is waiting with one of the
     * {@link #CONTAINER_BACKOFF_REASONS}. Pods without status, container statuses without state and
     * waiting states without a reason are skipped.
     *
     * @throws DeploymentFailedException for the first container found in a back-off state
     */
    private void checkContainerBackoff(FlinkDeployment flinkDeployment, Configuration configuration) {
        for (Pod pod : this.flinkService.getJmPodList(flinkDeployment, configuration).getItems()) {
            if (pod.getStatus() == null || pod.getStatus().getContainerStatuses() == null) {
                continue;
            }
            for (ContainerStatus containerStatus : pod.getStatus().getContainerStatuses()) {
                if (containerStatus.getState() == null) {
                    continue;
                }
                ContainerStateWaiting waiting = containerStatus.getState().getWaiting();
                if (waiting == null) {
                    continue;
                }
                String reason = waiting.getReason();
                // Set.of(...) rejects null lookups with a NullPointerException, so test for null first.
                if (reason != null && CONTAINER_BACKOFF_REASONS.contains(reason)) {
                    throw new DeploymentFailedException(waiting);
                }
            }
        }
    }

    /**
     * @return {@code true} if the recorded JobManager deployment status is
     *     {@link JobManagerDeploymentStatus#READY}
     */
    protected boolean isJmDeploymentReady(FlinkDeployment flinkDeployment) {
        return ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.READY;
    }

    /**
     * Clears the error recorded on the status unless the JobManager deployment is in
     * {@code ERROR}, the job state is {@code FAILED}, or the last reconciled spec is not stable.
     */
    protected void clearErrorsIfDeploymentIsHealthy(FlinkDeployment flinkDeployment) {
        FlinkDeploymentStatus flinkDeploymentStatus = (FlinkDeploymentStatus) flinkDeployment.getStatus();
        FlinkDeploymentReconciliationStatus reconciliationStatus = flinkDeploymentStatus.getReconciliationStatus();
        boolean unhealthy = flinkDeploymentStatus.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.ERROR
                || JobStatus.FAILED.name().equals(flinkDeploymentStatus.getJobStatus().getState())
                || !reconciliationStatus.isLastReconciledSpecStable();
        if (unhealthy) {
            return;
        }
        flinkDeploymentStatus.setError((String) null);
    }

    /**
     * Tells whether the deployment is an intentionally suspended job: the JobManager deployment
     * status is {@code MISSING} and both the current spec and the last reconciled spec have their
     * job in {@link JobState#SUSPENDED}.
     *
     * @return {@code false} when the current spec has no job, when there is no last reconciled
     *     spec, or when the last reconciled spec has no job
     */
    protected boolean isSuspendedJob(FlinkDeployment flinkDeployment) {
        JobSpec job = ((FlinkDeploymentSpec) flinkDeployment.getSpec()).getJob();
        if (job == null) {
            return false;
        }
        FlinkDeploymentStatus flinkDeploymentStatus = (FlinkDeploymentStatus) flinkDeployment.getStatus();
        FlinkDeploymentSpec lastReconciledSpec = flinkDeploymentStatus.getReconciliationStatus().deserializeLastReconciledSpec();
        return flinkDeploymentStatus.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING
                && job.getState() == JobState.SUSPENDED
                && lastReconciledSpec != null
                // The last reconciled spec may carry no job, just like the current one above.
                && lastReconciledSpec.getJob() != null
                && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
    }

    /**
     * Logs the missing JobManager deployment, records it as a reconciliation error on the resource
     * and triggers a warning event.
     */
    private void onMissingDeployment(FlinkDeployment flinkDeployment) {
        this.logger.error("Missing JobManager deployment");
        ReconciliationUtils.updateForReconciliationError(flinkDeployment, new MissingJobManagerException("Missing JobManager deployment"), this.configManager.getOperatorConfiguration());
        this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>) flinkDeployment, EventRecorder.Type.Warning, EventRecorder.Reason.Missing, EventRecorder.Component.JobManagerDeployment, "Missing JobManager deployment");
    }

    /**
     * If the existing Kubernetes {@link Deployment} is annotated with the generation that the
     * pending upgrade targets, marks the upgrade as already done and sets the JobManager status to
     * {@code DEPLOYING}. Does nothing when there is no Deployment or it has no annotations; logs a
     * warning when the generations differ.
     *
     * @param flinkDeployment resource being observed; its status is updated in place
     * @param context reconciler context giving access to the secondary {@link Deployment}
     * @param flinkDeploymentObserverContext not used by this implementation
     */
    /* renamed from: updateStatusToDeployedIfAlreadyUpgraded, reason: avoid collision after fix types in other method */
    protected void updateStatusToDeployedIfAlreadyUpgraded2(FlinkDeployment flinkDeployment, Context<?> context, FlinkDeploymentObserverContext flinkDeploymentObserverContext) {
        FlinkDeploymentStatus flinkDeploymentStatus = (FlinkDeploymentStatus) flinkDeployment.getStatus();
        context.getSecondaryResource(Deployment.class).ifPresent(deployment -> {
            Map<String, String> annotations = deployment.getMetadata().getAnnotations();
            if (annotations == null) {
                return;
            }
            // A missing generation annotation is mapped to -1.
            Long deployedGeneration = Optional.ofNullable(annotations.get(FlinkUtils.CR_GENERATION_LABEL)).map(Long::valueOf).orElse(-1L);
            Long upgradeTargetGeneration = ReconciliationUtils.getUpgradeTargetGeneration(flinkDeployment);
            if (!deployedGeneration.equals(upgradeTargetGeneration)) {
                this.logger.warn("Running deployment generation {} doesn't match upgrade target generation {}.", deployedGeneration, upgradeTargetGeneration);
                return;
            }
            this.logger.info("Pending upgrade is already deployed, updating status.");
            ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkDeployment);
            if (((FlinkDeploymentSpec) flinkDeployment.getSpec()).getJob() != null) {
                flinkDeploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
            }
            flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
        });
    }

    /**
     * Observes the Flink cluster itself. Called from {@link #observeInternal2} only while the
     * JobManager deployment is {@code READY}.
     *
     * @param flinkDeployment resource being observed
     * @param context reconciler context
     * @param flinkDeploymentObserverContext observer context for this resource
     */
    protected abstract void observeFlinkCluster(FlinkDeployment flinkDeployment, Context<?> context, FlinkDeploymentObserverContext flinkDeploymentObserverContext);

    /** Bridge override that forwards to {@link #updateStatusToDeployedIfAlreadyUpgraded2}. */
    @Override // org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver
    protected /* bridge */ /* synthetic */ void updateStatusToDeployedIfAlreadyUpgraded(FlinkDeployment flinkDeployment, Context context, FlinkDeploymentObserverContext flinkDeploymentObserverContext) {
        updateStatusToDeployedIfAlreadyUpgraded2(flinkDeployment, (Context<?>) context, flinkDeploymentObserverContext);
    }

    /** Bridge override that forwards to {@link #observeInternal2}. */
    @Override // org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver
    public /* bridge */ /* synthetic */ void observeInternal(FlinkDeployment flinkDeployment, Context context, FlinkDeploymentObserverContext flinkDeploymentObserverContext) {
        observeInternal2(flinkDeployment, (Context<?>) context, flinkDeploymentObserverContext);
    }

    /** Bridge override that forwards to {@link #getObserverContext2}. */
    @Override // org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver
    protected /* bridge */ /* synthetic */ FlinkDeploymentObserverContext getObserverContext(FlinkDeployment flinkDeployment, Context context) {
        return getObserverContext2(flinkDeployment, (Context<?>) context);
    }
}
