package io.micronaut.kubernetes.client.openapi.informer;

import io.micronaut.core.util.CollectionUtils;
import io.micronaut.kubernetes.client.openapi.common.KubernetesObject;
import io.micronaut.kubernetes.client.openapi.informer.DeltaFifo;
import io.micronaut.kubernetes.client.openapi.informer.ProcessorListener;
import io.micronaut.kubernetes.client.openapi.informer.cache.Indexer;
import java.util.AbstractMap;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/micronaut/kubernetes/client/openapi/informer/DeltaConsumer.class */
/**
 * Background task that repeatedly pops batches of deltas from a {@link DeltaFifo}, applies each
 * delta to the {@link Indexer} and hands a matching notification to the {@link SharedProcessor}.
 *
 * <p>The loop is controlled by an internal flag: {@link #run()} raises it on entry and
 * {@link #stop()} clears it. Because {@code run()} raises the flag itself, a {@code stop()} call
 * made before {@code run()} has started has no lasting effect.
 *
 * @param <ApiType> the kubernetes object type kept in the indexer
 */
final class DeltaConsumer<ApiType extends KubernetesObject> implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(DeltaConsumer.class);

    /** {@code true} while the consuming loop in {@link #run()} should keep going. */
    private final AtomicBoolean active = new AtomicBoolean(false);
    private final DeltaFifo deltaQueue;
    private final Indexer<ApiType> indexer;
    private final SharedProcessor<ApiType> processor;
    /** Optional transformation applied to every object before it is indexed; may be {@code null}. */
    private final TransformFunc transformFunc;

    /**
     * Creates a consumer wired to the given queue, indexer and processor.
     *
     * @param deltaFifo the queue the deltas are popped from
     * @param indexer the store that is kept in sync with the popped deltas
     * @param sharedProcessor the processor that receives add/update/delete notifications
     * @param transformFunc transformation applied to each object before it is handled, or
     *     {@code null} to handle objects unchanged
     */
    public DeltaConsumer(DeltaFifo deltaFifo, Indexer<ApiType> indexer, SharedProcessor<ApiType> sharedProcessor, TransformFunc transformFunc) {
        this.deltaQueue = deltaFifo;
        this.indexer = indexer;
        this.processor = sharedProcessor;
        this.transformFunc = transformFunc;
    }

    /**
     * Pops deltas from the queue and processes them with {@link #handleDeltas(Deque)} until the
     * consumer is stopped or the thread is interrupted.
     *
     * <p>An {@link InterruptedException} ends the loop and restores the thread's interrupt flag.
     * Any other exception is logged and the loop carries on with the next pop.
     */
    @Override
    public void run() {
        LOG.debug("Starting delta consumer");
        this.active.set(true);
        while (this.active.get()) {
            try {
                this.deltaQueue.pop(this::handleDeltas);
            } catch (InterruptedException e) {
                LOG.warn("Delta consumer thread has been interrupted", e);
                this.active.set(false);
                // Restore the flag so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // Keep consuming: a failure while handling one batch must not end the loop.
                LOG.error("Delta consumer recovered from crashing {}", e.getMessage(), e);
            }
        }
        LOG.debug("Stopping delta consumer");
    }

    /**
     * Clears the active flag so the loop in {@link #run()} exits the next time it checks the flag.
     * This method only flips the flag; it does not interrupt a pop that is already in progress.
     */
    public void stop() {
        this.active.set(false);
    }

    /**
     * Applies a batch of deltas in the iteration order of the given deque.
     *
     * @param deque the deltas to apply; a {@code null} or empty deque is ignored
     */
    void handleDeltas(Deque<AbstractMap.SimpleEntry<DeltaFifo.DeltaType, KubernetesObject>> deque) {
        if (CollectionUtils.isEmpty(deque)) {
            return;
        }
        deque.forEach(this::handleDelta);
    }

    /**
     * Applies a single delta to the indexer and notifies the processor.
     *
     * <p>The object is first passed through the transform function, when one is configured. The
     * transformed object is both stored in the indexer and delivered to listeners, so listeners
     * always see the same representation the indexer holds. In particular, the old and the new
     * object of an update notification are both transformed objects.
     *
     * @param delta the delta type paired with the object it refers to
     */
    private void handleDelta(AbstractMap.SimpleEntry<DeltaFifo.DeltaType, KubernetesObject> delta) {
        DeltaFifo.DeltaType deltaType = delta.getKey();
        KubernetesObject rawObject = delta.getValue();
        KubernetesObject object = this.transformFunc == null ? rawObject : this.transformFunc.transform(rawObject);
        switch (deltaType) {
            case SYNC:
            case ADDED:
            case UPDATED:
                // The flag handed to distribute() is true only for SYNC deltas.
                boolean isSync = deltaType == DeltaFifo.DeltaType.SYNC;
                ApiType existing = this.indexer.getByKey((String) this.indexer.getKeyFunction().apply(object));
                if (existing != null) {
                    // Key already indexed: report an update regardless of the delta type.
                    this.indexer.update(object);
                    this.processor.distribute(new ProcessorListener.UpdateNotification(existing, object), isSync);
                } else {
                    // Key not indexed yet: report an add regardless of the delta type.
                    this.indexer.add(object);
                    this.processor.distribute(new ProcessorListener.AddNotification(object), isSync);
                }
                break;
            case DELETED:
                this.indexer.delete(object);
                this.processor.distribute(new ProcessorListener.DeleteNotification(object), false);
                break;
            default:
                // Any other delta type is ignored.
                break;
        }
    }
}
