package org.apache.flink.kubernetes.operator.observer.deployment;

import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.exception.MissingJobManagerException;
import org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.class */
public abstract class AbstractFlinkDeploymentObserver extends AbstractFlinkResourceObserver<FlinkDeployment> {
    protected final Logger logger;

    public AbstractFlinkDeploymentObserver(EventRecorder eventRecorder) {
        super(eventRecorder);
        this.logger = LoggerFactory.getLogger(getClass());
    }

    @Override // org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver
    public void observeInternal(FlinkResourceContext<FlinkDeployment> flinkResourceContext) {
        FlinkDeployment resource = flinkResourceContext.getResource();
        if (!isJmDeploymentReady(resource)) {
            observeJmDeployment(flinkResourceContext);
        }
        if (isJmDeploymentReady(resource)) {
            observeFlinkCluster(flinkResourceContext);
        }
        if (isJmDeploymentReady(resource)) {
            observeClusterInfo(flinkResourceContext);
        }
        clearErrorsIfDeploymentIsHealthy(resource);
    }

    private void observeClusterInfo(FlinkResourceContext<FlinkDeployment> flinkResourceContext) {
        FlinkDeployment resource = flinkResourceContext.getResource();
        try {
            ((FlinkDeploymentStatus) resource.getStatus()).getClusterInfo().putAll(flinkResourceContext.getFlinkService().getClusterInfo(flinkResourceContext.getObserveConfig()));
            this.logger.debug("ClusterInfo: {}", ((FlinkDeploymentStatus) resource.getStatus()).getClusterInfo());
        } catch (Exception e) {
            this.logger.error("Exception while fetching cluster info", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void observeJmDeployment(FlinkResourceContext<FlinkDeployment> flinkResourceContext) {
        FlinkDeployment resource = flinkResourceContext.getResource();
        FlinkDeploymentStatus flinkDeploymentStatus = (FlinkDeploymentStatus) resource.getStatus();
        JobManagerDeploymentStatus jobManagerDeploymentStatus = flinkDeploymentStatus.getJobManagerDeploymentStatus();
        if (isSuspendedJob(resource)) {
            this.logger.debug("Skipping observe step for suspended application deployments");
            return;
        }
        this.logger.info("Observing JobManager deployment. Previous status: {}", jobManagerDeploymentStatus.name());
        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == jobManagerDeploymentStatus) {
            this.logger.info("JobManager deployment is ready");
            flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
            return;
        }
        Optional secondaryResource = flinkResourceContext.getJosdkContext().getSecondaryResource(Deployment.class);
        if (!secondaryResource.isPresent()) {
            flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
            flinkDeploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
            if (jobManagerDeploymentStatus == JobManagerDeploymentStatus.MISSING || jobManagerDeploymentStatus == JobManagerDeploymentStatus.ERROR) {
                return;
            }
            onMissingDeployment(flinkResourceContext);
            return;
        }
        DeploymentStatus status = ((Deployment) secondaryResource.get()).getStatus();
        DeploymentSpec spec = ((Deployment) secondaryResource.get()).getSpec();
        if (status != null && status.getAvailableReplicas() != null && spec.getReplicas().intValue() == status.getReplicas().intValue() && spec.getReplicas().intValue() == status.getAvailableReplicas().intValue() && flinkResourceContext.getFlinkService().isJobManagerPortReady(flinkResourceContext.getObserveConfig())) {
            this.logger.info("JobManager deployment port is ready, waiting for the Flink REST API...");
            flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
            return;
        }
        try {
            checkFailedCreate(status);
            checkContainerBackoff(flinkResourceContext);
            this.logger.info("JobManager is being deployed");
            flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
        } catch (DeploymentFailedException e) {
            flinkDeploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
            if (!JobManagerDeploymentStatus.ERROR.equals(flinkDeploymentStatus.getJobManagerDeploymentStatus())) {
                throw e;
            }
        }
    }

    private void checkFailedCreate(DeploymentStatus deploymentStatus) {
        for (DeploymentCondition deploymentCondition : deploymentStatus.getConditions()) {
            if ("FailedCreate".equals(deploymentCondition.getReason()) && "ReplicaFailure".equals(deploymentCondition.getType())) {
                throw new DeploymentFailedException(deploymentCondition);
            }
        }
    }

    private void checkContainerBackoff(FlinkResourceContext<FlinkDeployment> flinkResourceContext) {
        Iterator it = flinkResourceContext.getFlinkService().getJmPodList(flinkResourceContext.getResource(), flinkResourceContext.getObserveConfig()).getItems().iterator();
        while (it.hasNext()) {
            Iterator it2 = ((Pod) it.next()).getStatus().getContainerStatuses().iterator();
            while (it2.hasNext()) {
                ContainerStateWaiting waiting = ((ContainerStatus) it2.next()).getState().getWaiting();
                if (waiting != null && Set.of(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF, DeploymentFailedException.REASON_IMAGE_PULL_BACKOFF, DeploymentFailedException.REASON_ERR_IMAGE_PULL).contains(waiting.getReason())) {
                    throw new DeploymentFailedException(waiting);
                }
            }
        }
    }

    protected boolean isJmDeploymentReady(FlinkDeployment flinkDeployment) {
        return ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.READY;
    }

    protected void clearErrorsIfDeploymentIsHealthy(FlinkDeployment flinkDeployment) {
        FlinkDeploymentStatus flinkDeploymentStatus = (FlinkDeploymentStatus) flinkDeployment.getStatus();
        FlinkDeploymentReconciliationStatus reconciliationStatus = flinkDeploymentStatus.getReconciliationStatus();
        if (flinkDeploymentStatus.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.ERROR || JobStatus.FAILED.name().equals(((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobStatus().getState()) || !reconciliationStatus.isLastReconciledSpecStable()) {
            return;
        }
        flinkDeploymentStatus.setError((String) null);
    }

    protected boolean isSuspendedJob(FlinkDeployment flinkDeployment) {
        JobSpec job = ((FlinkDeploymentSpec) flinkDeployment.getSpec()).getJob();
        if (job == null) {
            return false;
        }
        FlinkDeploymentStatus flinkDeploymentStatus = (FlinkDeploymentStatus) flinkDeployment.getStatus();
        FlinkDeploymentSpec deserializeLastReconciledSpec = flinkDeploymentStatus.getReconciliationStatus().deserializeLastReconciledSpec();
        return flinkDeploymentStatus.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING && job.getState() == JobState.SUSPENDED && deserializeLastReconciledSpec != null && deserializeLastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
    }

    private void onMissingDeployment(FlinkResourceContext<FlinkDeployment> flinkResourceContext) {
        this.logger.error("Missing JobManager deployment");
        ReconciliationUtils.updateForReconciliationError(flinkResourceContext, new MissingJobManagerException("Missing JobManager deployment"));
        this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>) flinkResourceContext.getResource(), EventRecorder.Type.Warning, EventRecorder.Reason.Missing, EventRecorder.Component.JobManagerDeployment, "Missing JobManager deployment");
    }

    @Override // org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver
    protected boolean checkIfAlreadyUpgraded(FlinkResourceContext<FlinkDeployment> flinkResourceContext) {
        FlinkDeployment resource = flinkResourceContext.getResource();
        FlinkDeploymentStatus flinkDeploymentStatus = (FlinkDeploymentStatus) resource.getStatus();
        if (flinkDeploymentStatus.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING) {
            return false;
        }
        Optional secondaryResource = flinkResourceContext.getJosdkContext().getSecondaryResource(Deployment.class);
        if (!secondaryResource.isPresent()) {
            return false;
        }
        Deployment deployment = (Deployment) secondaryResource.get();
        if (deployment.isMarkedForDeletion()) {
            this.logger.debug("Deployment already marked for deletion, ignoring...");
            return false;
        }
        Map annotations = deployment.getMetadata().getAnnotations();
        if (annotations == null) {
            this.logger.warn("Running deployment doesn't have any annotations. This could indicate a deployment error.");
            return false;
        }
        Long l = (Long) Optional.ofNullable((String) annotations.get(FlinkUtils.CR_GENERATION_LABEL)).map(Long::valueOf).orElse(-1L);
        Long upgradeTargetGeneration = ReconciliationUtils.getUpgradeTargetGeneration(resource);
        if (!l.equals(upgradeTargetGeneration)) {
            this.logger.warn("Running deployment generation {} doesn't match upgrade target generation {}.", l, upgradeTargetGeneration);
            return false;
        }
        this.logger.info("Pending upgrade is already deployed, updating status.");
        flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
        return true;
    }

    protected abstract void observeFlinkCluster(FlinkResourceContext<FlinkDeployment> flinkResourceContext);
}
