package org.apache.flink.kubernetes.operator.utils;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import java.time.Instant;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.listener.AuditUtils;
import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/utils/StatusRecorder.class */
public class StatusRecorder<CR extends AbstractFlinkResource<?, STATUS>, STATUS extends CommonStatus<?>> {
    private static final Logger LOG = LoggerFactory.getLogger(StatusRecorder.class);
    protected final ObjectMapper objectMapper = new ObjectMapper();
    protected final ConcurrentHashMap<Tuple2<String, String>, ObjectNode> statusCache = new ConcurrentHashMap<>();
    private final KubernetesClient client;
    private final MetricManager<CR> metricManager;
    private final BiConsumer<CR, STATUS> statusUpdateListener;

    public StatusRecorder(KubernetesClient kubernetesClient, MetricManager<CR> metricManager, BiConsumer<CR, STATUS> biConsumer) {
        this.client = kubernetesClient;
        this.statusUpdateListener = biConsumer;
        this.metricManager = metricManager;
    }

    public void patchAndCacheStatus(CR cr) {
        Class<?> cls = cr.getClass();
        String namespace = cr.getMetadata().getNamespace();
        String name = cr.getMetadata().getName();
        cr.getMetadata().setResourceVersion((String) null);
        ObjectNode objectNode = (ObjectNode) this.objectMapper.convertValue(cr.getStatus(), ObjectNode.class);
        ObjectNode put = this.statusCache.put(getKey(cr), objectNode);
        if (objectNode.equals(put)) {
            LOG.debug("No status change.");
            return;
        }
        CommonStatus commonStatus = (CommonStatus) this.objectMapper.convertValue(put, cr instanceof FlinkDeployment ? FlinkDeploymentStatus.class : FlinkSessionJobStatus.class);
        Exception exc = null;
        for (int i = 0; i < 3; i++) {
            try {
                ((Resource) ((NonNamespaceOperation) this.client.resources(cls).inNamespace(namespace)).withName(name)).patchStatus(cr);
                this.statusUpdateListener.accept(cr, commonStatus);
                this.metricManager.onUpdate(cr);
                return;
            } catch (Exception e) {
                LOG.error("Error while patching status, retrying {}/3...", Integer.valueOf(i + 1), e);
                Thread.sleep(1000L);
                exc = e;
            }
        }
        throw exc;
    }

    public void updateStatusFromCache(CR cr) {
        Tuple2<String, String> key = getKey(cr);
        ObjectNode objectNode = this.statusCache.get(key);
        if (objectNode != null) {
            cr.setStatus((CommonStatus) this.objectMapper.convertValue(objectNode, ((CommonStatus) cr.getStatus()).getClass()));
        } else {
            this.statusCache.put(key, (ObjectNode) this.objectMapper.convertValue(cr.getStatus(), ObjectNode.class));
            if (ResourceLifecycleState.CREATED.equals(((CommonStatus) cr.getStatus()).getLifecycleState())) {
                this.statusUpdateListener.accept(cr, (CommonStatus) cr.getStatus());
            }
        }
        this.metricManager.onUpdate(cr);
    }

    public void removeCachedStatus(CR cr) {
        this.statusCache.remove(getKey(cr));
        this.metricManager.onRemove(cr);
    }

    protected static Tuple2<String, String> getKey(HasMetadata hasMetadata) {
        return Tuple2.of(hasMetadata.getMetadata().getNamespace(), hasMetadata.getMetadata().getName());
    }

    public static <S extends CommonStatus<?>, CR extends AbstractFlinkResource<?, S>> StatusRecorder<CR, S> create(KubernetesClient kubernetesClient, MetricManager<CR> metricManager, Collection<FlinkResourceListener> collection) {
        return new StatusRecorder<>(kubernetesClient, metricManager, (abstractFlinkResource, commonStatus) -> {
            final Instant now = Instant.now();
            ?? r0 = new FlinkResourceListener.StatusUpdateContext() { // from class: org.apache.flink.kubernetes.operator.utils.StatusRecorder.1
                /* JADX WARN: Incorrect return type in method signature: ()TS; */
                /* JADX WARN: Unknown type variable: S in type: S */
                @Override // org.apache.flink.kubernetes.operator.listener.FlinkResourceListener.StatusUpdateContext
                public CommonStatus getPreviousStatus() {
                    return CommonStatus.this;
                }

                /* JADX WARN: Unknown type variable: S in type: org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource<?, S> */
                @Override // org.apache.flink.kubernetes.operator.listener.FlinkResourceListener.ResourceContext
                public AbstractFlinkResource<?, S> getFlinkResource() {
                    return abstractFlinkResource;
                }

                @Override // org.apache.flink.kubernetes.operator.listener.FlinkResourceListener.ResourceContext
                public KubernetesClient getKubernetesClient() {
                    return kubernetesClient;
                }

                @Override // org.apache.flink.kubernetes.operator.listener.FlinkResourceListener.ResourceContext
                public Instant getTimestamp() {
                    return now;
                }
            };
            collection.forEach(flinkResourceListener -> {
                if (abstractFlinkResource instanceof FlinkDeployment) {
                    flinkResourceListener.onDeploymentStatusUpdate(r0);
                } else {
                    flinkResourceListener.onSessionJobStatusUpdate(r0);
                }
            });
            AuditUtils.logContext((FlinkResourceListener.StatusUpdateContext) r0);
        });
    }
}
