package org.apache.flink.kubernetes.operator.reconciler;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.class */
public class ReconciliationUtils {
    public static final String INTERNAL_METADATA_JSON_KEY = "resource_metadata";
    private static final Logger LOG = LoggerFactory.getLogger(ReconciliationUtils.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public static <SPEC extends AbstractFlinkSpec> void updateStatusForDeployedSpec(AbstractFlinkResource<SPEC, ?> abstractFlinkResource, Configuration configuration) {
        JobSpec job = ((AbstractFlinkSpec) abstractFlinkResource.getSpec()).getJob();
        updateStatusForSpecReconciliation(abstractFlinkResource, job != null ? job.getState() : null, configuration, false);
    }

    public static <SPEC extends AbstractFlinkSpec> void updateStatusBeforeDeploymentAttempt(AbstractFlinkResource<SPEC, ?> abstractFlinkResource, Configuration configuration) {
        updateStatusForSpecReconciliation(abstractFlinkResource, JobState.SUSPENDED, configuration, true);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private static <SPEC extends AbstractFlinkSpec> void updateStatusForSpecReconciliation(AbstractFlinkResource<SPEC, ?> abstractFlinkResource, JobState jobState, Configuration configuration, boolean z) {
        CommonStatus commonStatus = (CommonStatus) abstractFlinkResource.getStatus();
        AbstractFlinkSpec abstractFlinkSpec = (AbstractFlinkSpec) abstractFlinkResource.getSpec();
        ReconciliationStatus reconciliationStatus2 = commonStatus.getReconciliationStatus2();
        commonStatus.setError("");
        if (abstractFlinkSpec.getJob() != null) {
            AbstractFlinkSpec abstractFlinkSpec2 = (AbstractFlinkSpec) clone(abstractFlinkSpec);
            JobSpec job = abstractFlinkSpec2.getJob();
            job.setState(jobState);
            AbstractFlinkSpec deserializeLastReconciledSpec = reconciliationStatus2.deserializeLastReconciledSpec();
            if (deserializeLastReconciledSpec != null) {
                job.setSavepointTriggerNonce(deserializeLastReconciledSpec.getJob().getSavepointTriggerNonce());
            }
            if (abstractFlinkResource instanceof FlinkDeployment) {
                ((FlinkDeploymentStatus) commonStatus).setTaskManager(getTaskManagerInfo(abstractFlinkResource.getMetadata().getName(), configuration, jobState));
            }
            reconciliationStatus2.serializeAndSetLastReconciledSpec(abstractFlinkSpec2, abstractFlinkResource);
            if (abstractFlinkSpec.getJob().getState() == JobState.SUSPENDED) {
                reconciliationStatus2.markReconciledSpecAsStable();
            }
        } else {
            reconciliationStatus2.serializeAndSetLastReconciledSpec(abstractFlinkSpec, abstractFlinkResource);
        }
        reconciliationStatus2.setReconciliationTimestamp(System.currentTimeMillis());
        reconciliationStatus2.setState(z ? ReconciliationState.UPGRADING : ReconciliationState.DEPLOYED);
    }

    public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledSavepointTriggerNonce(SavepointInfo savepointInfo, AbstractFlinkResource<SPEC, ?> abstractFlinkResource) {
        if (savepointInfo.getTriggerType() != SavepointTriggerType.MANUAL) {
            return;
        }
        CommonStatus commonStatus = (CommonStatus) abstractFlinkResource.getStatus();
        AbstractFlinkSpec abstractFlinkSpec = (AbstractFlinkSpec) abstractFlinkResource.getSpec();
        ReconciliationStatus<SPEC> reconciliationStatus2 = commonStatus.getReconciliationStatus2();
        SPEC deserializeLastReconciledSpec = reconciliationStatus2.deserializeLastReconciledSpec();
        deserializeLastReconciledSpec.getJob().setSavepointTriggerNonce(abstractFlinkSpec.getJob().getSavepointTriggerNonce());
        reconciliationStatus2.serializeAndSetLastReconciledSpec(deserializeLastReconciledSpec, abstractFlinkResource);
        reconciliationStatus2.setReconciliationTimestamp(System.currentTimeMillis());
    }

    private static TaskManagerInfo getTaskManagerInfo(String str, Configuration configuration, JobState jobState) {
        return jobState == JobState.RUNNING ? new TaskManagerInfo("component=taskmanager,app=" + str, FlinkUtils.getNumTaskManagers(configuration)) : new TaskManagerInfo("", 0);
    }

    public static void updateForReconciliationError(AbstractFlinkResource<?, ?> abstractFlinkResource, String str) {
        ((CommonStatus) abstractFlinkResource.getStatus()).setError(str);
    }

    public static <T> T clone(T t) {
        if (t == null) {
            return null;
        }
        try {
            return (T) objectMapper.readValue(objectMapper.writeValueAsString(t), t.getClass());
        } catch (JsonProcessingException e) {
            throw new IllegalStateException((Throwable) e);
        }
    }

    public static <SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>, R extends CustomResource<SPEC, STATUS>> UpdateControl<R> toUpdateControl(FlinkOperatorConfiguration flinkOperatorConfiguration, R r, R r2, boolean z) {
        CommonStatus commonStatus = (CommonStatus) r.getStatus();
        UpdateControl<R> noUpdate = UpdateControl.noUpdate();
        return !z ? noUpdate : upgradeStarted(commonStatus.getReconciliationStatus2().getState(), ((CommonStatus) r2.getStatus()).getReconciliationStatus2().getState()) ? noUpdate.rescheduleAfter(0L) : commonStatus instanceof FlinkDeploymentStatus ? noUpdate.rescheduleAfter(((FlinkDeploymentStatus) commonStatus).getJobManagerDeploymentStatus().rescheduleAfter((FlinkDeployment) r, flinkOperatorConfiguration).toMillis()) : noUpdate.rescheduleAfter(flinkOperatorConfiguration.getReconcileInterval().toMillis());
    }

    public static boolean isUpgradeModeChangedToLastStateAndHADisabledPreviously(AbstractFlinkResource<?, ?> abstractFlinkResource, Configuration configuration) {
        return (getDeployedSpec(abstractFlinkResource).getJob().getUpgradeMode() == UpgradeMode.LAST_STATE || ((AbstractFlinkSpec) abstractFlinkResource.getSpec()).getJob().getUpgradeMode() != UpgradeMode.LAST_STATE || FlinkUtils.isKubernetesHAActivated(configuration)) ? false : true;
    }

    public static <SPEC extends AbstractFlinkSpec> SPEC getDeployedSpec(AbstractFlinkResource<SPEC, ?> abstractFlinkResource) {
        ReconciliationStatus<SPEC> reconciliationStatus2 = ((CommonStatus) abstractFlinkResource.getStatus()).getReconciliationStatus2();
        return reconciliationStatus2.getState() != ReconciliationState.ROLLED_BACK ? reconciliationStatus2.deserializeLastReconciledSpec() : reconciliationStatus2.deserializeLastStableSpec();
    }

    private static boolean upgradeStarted(ReconciliationState reconciliationState, ReconciliationState reconciliationState2) {
        if (reconciliationState == reconciliationState2) {
            return false;
        }
        return reconciliationState == ReconciliationState.ROLLING_BACK || reconciliationState == ReconciliationState.UPGRADING;
    }

    public static <T extends AbstractFlinkSpec> Tuple2<T, ObjectNode> deserializeSpecWithMeta(@Nullable String str, Class<T> cls) {
        if (str == null) {
            return null;
        }
        try {
            ObjectNode readTree = objectMapper.readTree(str);
            ObjectNode remove = readTree.remove(INTERNAL_METADATA_JSON_KEY);
            if (remove != null) {
                return Tuple2.of((AbstractFlinkSpec) objectMapper.treeToValue(readTree.get("spec"), cls), remove);
            }
            readTree.remove("apiVersion");
            return Tuple2.of((AbstractFlinkSpec) objectMapper.treeToValue(readTree, cls), (Object) null);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Could not deserialize spec, this indicates a bug...", e);
        }
    }

    public static String writeSpecWithMeta(AbstractFlinkSpec abstractFlinkSpec, AbstractFlinkResource<?, ?> abstractFlinkResource) {
        ObjectNode createObjectNode = objectMapper.createObjectNode();
        createObjectNode.put("apiVersion", abstractFlinkResource.getApiVersion());
        createObjectNode.putObject("metadata").put("generation", abstractFlinkResource.getMetadata().getGeneration());
        return writeSpecWithMeta(abstractFlinkSpec, createObjectNode);
    }

    public static String writeSpecWithMeta(AbstractFlinkSpec abstractFlinkSpec, ObjectNode objectNode) {
        ObjectNode createObjectNode = objectMapper.createObjectNode();
        createObjectNode.set("spec", objectMapper.valueToTree(Preconditions.checkNotNull(abstractFlinkSpec)));
        createObjectNode.set(INTERNAL_METADATA_JSON_KEY, objectNode);
        try {
            return objectMapper.writeValueAsString(createObjectNode);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Could not serialize spec, this indicates a bug...", e);
        }
    }

    public static boolean isJobInTerminalState(CommonStatus<?> commonStatus) {
        return JobStatus.valueOf(commonStatus.getJobStatus().getState()).isGloballyTerminalState();
    }

    public static boolean isJobRunning(CommonStatus<?> commonStatus) {
        return JobStatus.RUNNING.name().equals(commonStatus.getJobStatus().getState());
    }

    public static <SPEC extends AbstractFlinkSpec> boolean applyValidationErrorAndResetSpec(AbstractFlinkResource<SPEC, ?> abstractFlinkResource, String str) {
        CommonStatus commonStatus = (CommonStatus) abstractFlinkResource.getStatus();
        if (!str.equals(commonStatus.getError())) {
            LOG.error("Validation failed: " + str);
            updateForReconciliationError(abstractFlinkResource, str);
        }
        SPEC deserializeLastReconciledSpec = commonStatus.getReconciliationStatus2().deserializeLastReconciledSpec();
        if (deserializeLastReconciledSpec == null) {
            return false;
        }
        abstractFlinkResource.setSpec(deserializeLastReconciledSpec);
        if (commonStatus.getReconciliationStatus2().getState() != ReconciliationState.UPGRADING) {
            return true;
        }
        ((AbstractFlinkSpec) abstractFlinkResource.getSpec()).getJob().setState(JobState.RUNNING);
        return true;
    }

    public static <STATUS extends CommonStatus<?>, R extends AbstractFlinkResource<?, STATUS>> ErrorStatusUpdateControl<R> toErrorStatusUpdateControl(R r, Optional<RetryInfo> optional, Exception exc, StatusRecorder<STATUS> statusRecorder) {
        optional.ifPresent(retryInfo -> {
            LOG.warn("Attempt count: {}, last attempt: {}", Integer.valueOf(retryInfo.getAttemptCount()), Boolean.valueOf(retryInfo.isLastAttempt()));
        });
        statusRecorder.updateStatusFromCache(r);
        updateForReconciliationError(r, exc instanceof ReconciliationException ? exc.getCause().toString() : exc.toString());
        statusRecorder.patchAndCacheStatus(r);
        return ErrorStatusUpdateControl.noStatusUpdate();
    }

    public static Long getUpgradeTargetGeneration(AbstractFlinkResource<?, ?> abstractFlinkResource) {
        Tuple2 deserializeLastReconciledSpecWithMeta = ((CommonStatus) abstractFlinkResource.getStatus()).getReconciliationStatus2().deserializeLastReconciledSpecWithMeta();
        if (deserializeLastReconciledSpecWithMeta.f1 == null) {
            return -1L;
        }
        return Long.valueOf(((ObjectNode) deserializeLastReconciledSpecWithMeta.f1).get("metadata").get("generation").asLong(-1L));
    }

    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec] */
    public static void checkAndUpdateStableSpec(CommonStatus<?> commonStatus) {
        JobStatus valueOf = JobStatus.valueOf(commonStatus.getJobStatus().getState());
        if (commonStatus.getReconciliationStatus2().getState() != ReconciliationState.DEPLOYED) {
            return;
        }
        if (valueOf == JobStatus.RUNNING) {
            commonStatus.getReconciliationStatus2().markReconciledSpecAsStable();
        } else if (commonStatus.getReconciliationStatus2().deserializeLastReconciledSpec().getJob().getState() == JobState.RUNNING && valueOf == JobStatus.FINISHED) {
            commonStatus.getReconciliationStatus2().markReconciledSpecAsStable();
        }
    }

    public static <SPEC extends AbstractFlinkSpec> void updateStatusForAlreadyUpgraded(AbstractFlinkResource<SPEC, ?> abstractFlinkResource) {
        ReconciliationStatus<SPEC> reconciliationStatus2 = ((CommonStatus) abstractFlinkResource.getStatus()).getReconciliationStatus2();
        Tuple2<SPEC, ObjectNode> deserializeLastReconciledSpecWithMeta = reconciliationStatus2.deserializeLastReconciledSpecWithMeta();
        JobSpec job = ((AbstractFlinkSpec) deserializeLastReconciledSpecWithMeta.f0).getJob();
        if (job != null) {
            job.setState(JobState.RUNNING);
        }
        reconciliationStatus2.setState(ReconciliationState.DEPLOYED);
        reconciliationStatus2.setLastReconciledSpec(writeSpecWithMeta((AbstractFlinkSpec) deserializeLastReconciledSpecWithMeta.f0, (ObjectNode) deserializeLastReconciledSpecWithMeta.f1));
    }
}
