package org.apache.flink.kubernetes.operator.utils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.time.Instant;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.exception.StatusConflictException;
import org.apache.flink.kubernetes.operator.listener.AuditUtils;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/utils/StatusRecorder.class */
public class StatusRecorder<CR extends AbstractFlinkResource<?, STATUS>, STATUS extends CommonStatus<?>> {
    private static final Logger LOG = LoggerFactory.getLogger(StatusRecorder.class);
    protected final ObjectMapper objectMapper = new ObjectMapper();
    protected final ConcurrentHashMap<ResourceID, ObjectNode> statusCache = new ConcurrentHashMap<>();
    private final KubernetesClient client;
    private final MetricManager<CR> metricManager;
    private final BiConsumer<CR, STATUS> statusUpdateListener;

    public StatusRecorder(KubernetesClient kubernetesClient, MetricManager<CR> metricManager, BiConsumer<CR, STATUS> biConsumer) {
        this.client = kubernetesClient;
        this.statusUpdateListener = biConsumer;
        this.metricManager = metricManager;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void patchAndCacheStatus(CR cr) {
        ObjectNode objectNode = (ObjectNode) this.objectMapper.convertValue(cr.getStatus(), ObjectNode.class);
        ResourceID fromResource = ResourceID.fromResource(cr);
        ObjectNode objectNode2 = this.statusCache.get(fromResource);
        if (objectNode.equals(objectNode2)) {
            LOG.debug("No status change.");
            return;
        }
        CommonStatus commonStatus = (CommonStatus) this.objectMapper.convertValue(objectNode2, cr instanceof FlinkDeployment ? FlinkDeploymentStatus.class : FlinkSessionJobStatus.class);
        KubernetesClientException kubernetesClientException = null;
        for (int i = 0; i < 3; i++) {
            try {
                replaceStatus(cr, commonStatus);
                kubernetesClientException = null;
            } catch (KubernetesClientException e) {
                LOG.error("Error while patching status, retrying {}/3...", Integer.valueOf(i + 1), e);
                Thread.sleep(1000L);
                kubernetesClientException = e;
            }
        }
        if (kubernetesClientException != null) {
            throw kubernetesClientException;
        }
        this.statusCache.put(fromResource, objectNode);
        this.statusUpdateListener.accept(cr, commonStatus);
        this.metricManager.onUpdate(cr);
    }

    private void replaceStatus(CR cr, STATUS status) throws JsonProcessingException {
        int i = 0;
        while (true) {
            try {
                cr.getMetadata().setResourceVersion(((AbstractFlinkResource) this.client.resource(cr).lockResourceVersion().updateStatus()).getMetadata().getResourceVersion());
                return;
            } catch (KubernetesClientException e) {
                if (e.getCode() != 409) {
                    throw e;
                }
                String resourceVersion = cr.getMetadata().getResourceVersion();
                LOG.debug("Could not apply status update for resource version {}", resourceVersion);
                AbstractFlinkResource abstractFlinkResource = (AbstractFlinkResource) this.client.resource(cr).get();
                String resourceVersion2 = abstractFlinkResource.getMetadata().getResourceVersion();
                if (resourceVersion2.equals(resourceVersion)) {
                    LOG.error("Unable to fetch latest resource version");
                    throw e;
                }
                if (!((CommonStatus) abstractFlinkResource.getStatus()).equals(status)) {
                    throw new StatusConflictException("Status have been modified externally in version " + resourceVersion2 + " Previous: " + this.objectMapper.writeValueAsString(status) + " Latest: " + this.objectMapper.writeValueAsString(abstractFlinkResource.getStatus()));
                }
                int i2 = i;
                i++;
                if (i2 >= 3) {
                    throw e;
                }
                LOG.debug("Retrying status update for latest version {}", resourceVersion2);
                cr.getMetadata().setResourceVersion(resourceVersion2);
            }
        }
    }

    public void updateStatusFromCache(CR cr) {
        ResourceID fromResource = ResourceID.fromResource(cr);
        ObjectNode objectNode = this.statusCache.get(fromResource);
        if (objectNode != null) {
            cr.setStatus((CommonStatus) this.objectMapper.convertValue(objectNode, ((CommonStatus) cr.getStatus()).getClass()));
        } else {
            this.statusCache.put(fromResource, (ObjectNode) this.objectMapper.convertValue(cr.getStatus(), ObjectNode.class));
            if (ResourceLifecycleState.CREATED.equals(((CommonStatus) cr.getStatus()).getLifecycleState())) {
                this.statusUpdateListener.accept(cr, (CommonStatus) cr.getStatus());
            }
        }
        this.metricManager.onUpdate(cr);
    }

    public void removeCachedStatus(CR cr) {
        this.statusCache.remove(ResourceID.fromResource(cr));
        this.metricManager.onRemove(cr);
    }

    public static <S extends CommonStatus<?>, CR extends AbstractFlinkResource<?, S>> StatusRecorder<CR, S> create(KubernetesClient kubernetesClient, MetricManager<CR> metricManager, Collection<FlinkResourceListener> collection) {
        return new StatusRecorder<>(kubernetesClient, metricManager, (abstractFlinkResource, commonStatus) -> {
            final Instant now = Instant.now();
            ?? r0 = new FlinkResourceListener.StatusUpdateContext() { // from class: org.apache.flink.kubernetes.operator.utils.StatusRecorder.1
                /* JADX WARN: Incorrect return type in method signature: ()TS; */
                /* JADX WARN: Unknown type variable: S in type: S */
                public CommonStatus getPreviousStatus() {
                    return commonStatus;
                }

                /* JADX WARN: Unknown type variable: S in type: org.apache.flink.kubernetes.operator.api.AbstractFlinkResource<?, S> */
                public AbstractFlinkResource<?, S> getFlinkResource() {
                    return abstractFlinkResource;
                }

                public KubernetesClient getKubernetesClient() {
                    return kubernetesClient;
                }

                public Instant getTimestamp() {
                    return now;
                }
            };
            collection.forEach(flinkResourceListener -> {
                if (abstractFlinkResource instanceof FlinkDeployment) {
                    flinkResourceListener.onDeploymentStatusUpdate(r0);
                } else {
                    flinkResourceListener.onSessionJobStatusUpdate(r0);
                }
            });
            AuditUtils.logContext((FlinkResourceListener.StatusUpdateContext) r0);
        });
    }
}
