package dev.responsive.internal.clients;

import dev.responsive.internal.clients.ResponsiveConsumer;
import dev.responsive.internal.clients.ResponsiveProducer;
import dev.responsive.internal.metrics.EndOffsetsPoller;
import dev.responsive.internal.metrics.MetricPublishingCommitListener;
import dev.responsive.internal.stores.ResponsiveStoreRegistry;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/internal/clients/ResponsiveKafkaClientSupplier.class */
public final class ResponsiveKafkaClientSupplier implements KafkaClientSupplier {
    // Class-wide logger used by the client factory methods below.
    private static final Logger LOG = LoggerFactory.getLogger(ResponsiveKafkaClientSupplier.class);
    // Registry of per-thread listener bundles, keyed by stream thread id and reference counted.
    private final SharedListeners sharedListeners;
    // Delegate supplier; its clients are wrapped (or, for the admin client, returned as-is).
    private final KafkaClientSupplier wrapped;
    // Handed to StoreCommitListener and queried for committed offsets by the restore consumer.
    private final ResponsiveStoreRegistry storeRegistry;
    // Creation hooks for the poller, offset recorder, commit listener and client wrappers.
    private final Factories factories;
    // Passed to the end-offsets poller and to the commit metric listener.
    private final Metrics metrics;
    // Created once in the constructor; each stream thread registers with it via addForThread.
    private final EndOffsetsPoller endOffsetsPoller;
    // Value of the "application.id" streams config.
    private final String applicationId;
    // False only when "processing.guarantee" equals "at_least_once"; forwarded to the OffsetRecorder factory.
    private final boolean eos;

    /* loaded from: input_file:dev/responsive/internal/clients/ResponsiveKafkaClientSupplier$CloseListener.class */
    /**
     * Close hook attached to every wrapped producer and main consumer. When the client reports
     * that it was closed, the reference it held on its stream thread's shared listeners is
     * released.
     */
    class CloseListener implements ResponsiveConsumer.Listener, ResponsiveProducer.Listener {
        /** Id of the stream thread whose shared listeners this hook releases. */
        private final String threadId;

        private CloseListener(String threadId) {
            this.threadId = threadId;
        }

        /** Gives back one reference on the shared listeners registered under {@code threadId}. */
        @Override
        public void onClose() {
            sharedListeners.derefListenersForThread(threadId);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:dev/responsive/internal/clients/ResponsiveKafkaClientSupplier$Factories.class */
    /**
     * Creation hooks for the collaborators this supplier builds. Every method has a default
     * implementation that simply invokes the matching constructor; the package-private
     * supplier constructor accepts an alternative implementation (presumably so tests can
     * substitute fakes).
     */
    public interface Factories {
        /**
         * Builds the end-offsets poller.
         *
         * @param map     configuration passed to the poller; the supplier passes the original streams config
         * @param metrics metrics instance passed to the poller
         * @return a new {@link EndOffsetsPoller}
         */
        default EndOffsetsPoller createEndOffsetPoller(Map<String, ?> map, Metrics metrics) {
            return new EndOffsetsPoller(map, metrics);
        }

        /**
         * Wraps a producer.
         *
         * @param str      client id of the producer
         * @param producer producer to wrap
         * @param list     listeners to attach to the wrapper
         * @return a new {@link ResponsiveProducer} around {@code producer}
         */
        default <K, V> ResponsiveProducer<K, V> createResponsiveProducer(String str, Producer<K, V> producer, List<ResponsiveProducer.Listener> list) {
            return new ResponsiveProducer<K, V>(str, producer, list);
        }

        /**
         * Wraps a consumer.
         *
         * @param str      client id of the consumer
         * @param consumer consumer to wrap
         * @param list     listeners to attach to the wrapper
         * @return a new {@link ResponsiveConsumer} around {@code consumer}
         */
        default <K, V> ResponsiveConsumer<K, V> createResponsiveConsumer(String str, Consumer<K, V> consumer, List<ResponsiveConsumer.Listener> list) {
            return new ResponsiveConsumer<K, V>(str, consumer, list);
        }

        /**
         * Builds an offset recorder.
         *
         * @param z forwarded to the {@link OffsetRecorder} constructor; the supplier passes its
         *          exactly-once flag here
         * @return a new {@link OffsetRecorder}
         */
        default OffsetRecorder createOffsetRecorder(boolean z) {
            return new OffsetRecorder(z);
        }

        /**
         * Builds the listener that publishes commit metrics.
         *
         * @param metrics        metrics instance passed to the listener
         * @param str            first string argument; the supplier passes the stream thread id
         * @param str2           second string argument; the supplier passes the application id
         * @param offsetRecorder recorder passed to the listener
         * @return a new {@link MetricPublishingCommitListener}
         */
        default MetricPublishingCommitListener createMetricsPublishingCommitListener(Metrics metrics, String str, String str2, OffsetRecorder offsetRecorder) {
            return new MetricPublishingCommitListener(metrics, str, str2, offsetRecorder);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dev/responsive/internal/clients/ResponsiveKafkaClientSupplier$ListenersForThread.class */
    /**
     * Bundle of the listeners that are created once per stream thread and shared by the
     * clients of that thread. Closing the bundle closes the commit metric listener and the
     * end-offsets poller listener; the offset recorder and store commit listener are not
     * closed here.
     */
    public static class ListenersForThread implements Closeable {
        final String threadId;
        final OffsetRecorder offsetRecorder;
        final MetricPublishingCommitListener committedOffsetMetricListener;
        final StoreCommitListener storeCommitListener;
        final EndOffsetsPoller.Listener endOffsetsPollerListener;

        /**
         * @param str                            id of the stream thread the bundle belongs to
         * @param offsetRecorder                 offset recorder for the thread
         * @param metricPublishingCommitListener commit metric listener, closed by {@link #close()}
         * @param storeCommitListener            store commit listener for the thread
         * @param listener                       end-offsets poller listener, closed by {@link #close()}
         */
        public ListenersForThread(String str, OffsetRecorder offsetRecorder, MetricPublishingCommitListener metricPublishingCommitListener, StoreCommitListener storeCommitListener, EndOffsetsPoller.Listener listener) {
            this.threadId = str;
            this.offsetRecorder = offsetRecorder;
            this.committedOffsetMetricListener = metricPublishingCommitListener;
            this.storeCommitListener = storeCommitListener;
            this.endOffsetsPollerListener = listener;
        }

        /**
         * Closes the commit metric listener and then the end-offsets poller listener. The
         * second close runs even when the first one throws, so a failure in one listener
         * cannot leak the other.
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            try {
                this.committedOffsetMetricListener.close();
            } finally {
                this.endOffsetsPollerListener.close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dev/responsive/internal/clients/ResponsiveKafkaClientSupplier$ReferenceCounted.class */
    /**
     * Pairs a closeable value with a reference count that starts at one. The value is closed
     * when a call to {@code deref()} brings the count to exactly zero. The class does no
     * locking of its own.
     *
     * @param <T> type of the closeable value being counted
     */
    public static class ReferenceCounted<T extends Closeable> {
        final T val;
        final Logger refLog;
        int count = 1;

        private ReferenceCounted(String name, T value) {
            final LogContext logContext = new LogContext(String.format("[%s]: ", name));
            this.refLog = logContext.logger(ReferenceCounted.class);
            this.val = value;
        }

        /** Logs the new count and adds one reference. */
        private void ref() {
            final int bumped = count + 1;
            refLog.info("Bumping ref count to {}", bumped);
            count = bumped;
        }

        /**
         * Logs the new count and drops one reference, closing the value if the count reaches zero.
         *
         * @return {@code true} if this call closed the value, {@code false} otherwise
         * @throws RuntimeException wrapping the {@link IOException} if closing the value fails
         */
        private boolean deref() {
            final int remaining = count - 1;
            refLog.info("Reducing ref count to {}", remaining);
            count = remaining;
            if (remaining == 0) {
                refLog.info("Closing reference value");
                try {
                    val.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                return true;
            }
            return false;
        }

        private T getVal() {
            return val;
        }
    }

    /* loaded from: input_file:dev/responsive/internal/clients/ResponsiveKafkaClientSupplier$SharedListeners.class */
    /**
     * Registry of reference-counted {@link ListenersForThread} bundles keyed by stream thread
     * id. Both operations are {@code synchronized} on the registry instance.
     */
    private static class SharedListeners {
        private final Map<String, ReferenceCounted<ListenersForThread>> threadListeners = new HashMap<>();

        private SharedListeners() {
        }

        /**
         * Returns the listener bundle for a stream thread, creating and registering it on
         * first use. Every call takes one reference that must later be given back through
         * {@link #derefListenersForThread(String)}.
         *
         * @param z                       flag forwarded to {@link Factories#createOffsetRecorder(boolean)}
         * @param str                     stream thread id used as the registry key
         * @param metrics                 metrics instance for the commit metric listener
         * @param str2                    second string argument of the commit metric listener factory
         * @param map                     client configuration; not read by this method
         * @param endOffsetsPoller        poller the thread is registered with on first use
         * @param responsiveStoreRegistry store registry given to the new {@link StoreCommitListener}
         * @param factories               factory hooks used to create the recorder and metric listener
         * @return the bundle shared by all clients of the thread
         */
        private synchronized ListenersForThread getAndMaybeInitListenersForThread(boolean z, String str, Metrics metrics, String str2, Map<String, Object> map, EndOffsetsPoller endOffsetsPoller, ResponsiveStoreRegistry responsiveStoreRegistry, Factories factories) {
            // Values are never null, so a single lookup is enough to tell "present" from "absent".
            final ReferenceCounted<ListenersForThread> existing = this.threadListeners.get(str);
            if (existing != null) {
                existing.ref();
                return existing.getVal();
            }
            final OffsetRecorder recorder = factories.createOffsetRecorder(z);
            final ListenersForThread listeners = new ListenersForThread(
                str,
                recorder,
                factories.createMetricsPublishingCommitListener(metrics, str, str2, recorder),
                new StoreCommitListener(responsiveStoreRegistry, recorder),
                endOffsetsPoller.addForThread(str));
            final ReferenceCounted<ListenersForThread> counted =
                new ReferenceCounted<>(String.format("ListenersForThread(%s)", str), listeners);
            this.threadListeners.put(str, counted);
            return listeners;
        }

        /**
         * Gives back one reference on the bundle of the given thread and unregisters the
         * bundle once its last reference is gone. A thread id with no registered bundle
         * (for example because a client reported its close more than once) is logged and
         * ignored instead of failing with a {@link NullPointerException}.
         *
         * @param str stream thread id whose bundle is released
         */
        private synchronized void derefListenersForThread(String str) {
            final ReferenceCounted<ListenersForThread> tracked = this.threadListeners.get(str);
            if (tracked == null) {
                LOG.warn("No listeners registered for thread {}; ignoring release", str);
                return;
            }
            if (tracked.deref()) {
                this.threadListeners.remove(str);
            }
        }
    }

    /**
     * Creates a supplier that uses the default {@link Factories} implementations.
     *
     * @param kafkaClientSupplier     supplier whose clients are wrapped
     * @param streamsConfig           streams configuration read for the application id and processing guarantee
     * @param responsiveStoreRegistry store registry shared with the listeners and the restore consumer
     * @param metrics                 metrics instance passed to the poller and metric listeners
     */
    public ResponsiveKafkaClientSupplier(KafkaClientSupplier kafkaClientSupplier, StreamsConfig streamsConfig, ResponsiveStoreRegistry responsiveStoreRegistry, Metrics metrics) {
        this(
            // An empty anonymous implementation picks up every default method of Factories.
            new Factories() {
            },
            kafkaClientSupplier,
            streamsConfig,
            responsiveStoreRegistry,
            metrics);
    }

    ResponsiveKafkaClientSupplier(Factories factories, KafkaClientSupplier kafkaClientSupplier, StreamsConfig streamsConfig, ResponsiveStoreRegistry responsiveStoreRegistry, Metrics metrics) {
        this.sharedListeners = new SharedListeners();
        this.factories = factories;
        this.wrapped = kafkaClientSupplier;
        this.storeRegistry = responsiveStoreRegistry;
        this.metrics = metrics;
        this.eos = !"at_least_once".equals(streamsConfig.getString("processing.guarantee"));
        this.endOffsetsPoller = factories.createEndOffsetPoller(streamsConfig.originals(), metrics);
        this.applicationId = streamsConfig.getString("application.id");
    }

    /**
     * Returns the admin client of the wrapped supplier without any additional wrapping.
     *
     * @param map admin client configuration, passed through unchanged
     * @return the admin client created by the wrapped supplier
     */
    public Admin getAdmin(Map<String, Object> map) {
        final Admin admin = wrapped.getAdmin(map);
        return admin;
    }

    /**
     * Creates the producer for a stream thread and wraps it so that the thread's offset
     * recorder observes it and the shared listeners are released when it is closed.
     *
     * <p>The reference on the thread's shared listeners is taken before the producer is
     * created, as in {@code getConsumer}, and is given back if creating or wrapping the
     * producer fails; otherwise the reference would never be released.
     *
     * @param map producer configuration; must contain a {@code client.id} from which the
     *            stream thread id can be parsed
     * @return the wrapped producer
     * @throws RuntimeException if the thread id cannot be parsed from the client id
     */
    public Producer<byte[], byte[]> getProducer(Map<String, Object> map) {
        final String clientId = (String) map.get("client.id");
        LOG.info("Creating responsive producer: {}", clientId);
        final String threadId = threadIdFromProducerConfig(clientId);
        final ListenersForThread listeners = this.sharedListeners.getAndMaybeInitListenersForThread(
            this.eos,
            threadId,
            this.metrics,
            this.applicationId,
            map,
            this.endOffsetsPoller,
            this.storeRegistry,
            this.factories);
        try {
            return this.factories.createResponsiveProducer(
                clientId,
                this.wrapped.getProducer(map),
                List.of(listeners.offsetRecorder.getProducerListener(), new CloseListener(threadId)));
        } catch (RuntimeException | Error failure) {
            // No wrapped producer reaches the caller, so no close callback will release the
            // reference taken above; give it back here.
            try {
                this.sharedListeners.derefListenersForThread(threadId);
            } catch (RuntimeException releaseFailure) {
                failure.addSuppressed(releaseFailure);
            }
            throw failure;
        }
    }

    /**
     * Creates the main consumer for a stream thread and wraps it with the thread's shared
     * listeners: the commit metric listener, the offset recorder's consumer listener, the
     * end-offsets poller listener and a close hook that releases the shared listeners.
     *
     * <p>If creating or wrapping the consumer fails, the reference taken on the shared
     * listeners is given back; otherwise it would never be released.
     *
     * @param map consumer configuration; must contain a {@code client.id} from which the
     *            stream thread id can be parsed
     * @return the wrapped consumer
     * @throws RuntimeException if the thread id cannot be parsed from the client id
     */
    public Consumer<byte[], byte[]> getConsumer(Map<String, Object> map) {
        final String clientId = (String) map.get("client.id");
        LOG.info("Creating responsive main consumer: {}", clientId);
        final String threadId = threadIdFromConsumerConfig(clientId);
        final ListenersForThread listeners = this.sharedListeners.getAndMaybeInitListenersForThread(
            this.eos,
            threadId,
            this.metrics,
            this.applicationId,
            map,
            this.endOffsetsPoller,
            this.storeRegistry,
            this.factories);
        try {
            return this.factories.createResponsiveConsumer(
                clientId,
                this.wrapped.getConsumer(map),
                List.of(
                    listeners.committedOffsetMetricListener,
                    listeners.offsetRecorder.getConsumerListener(),
                    listeners.endOffsetsPollerListener,
                    new CloseListener(threadId)));
        } catch (RuntimeException | Error failure) {
            // No wrapped consumer reaches the caller, so no close callback will release the
            // reference taken above; give it back here.
            try {
                this.sharedListeners.derefListenersForThread(threadId);
            } catch (RuntimeException releaseFailure) {
                failure.addSuppressed(releaseFailure);
            }
            throw failure;
        }
    }

    /**
     * Creates the restore consumer and wraps it in a {@code ResponsiveRestoreConsumer} that is
     * given the store registry's {@code getCommittedOffset} lookup.
     *
     * @param map restore consumer configuration; its {@code client.id} is passed to the wrapper
     * @return the wrapped restore consumer
     */
    public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> map) {
        final String clientId = (String) map.get("client.id");
        LOG.info("Creating responsive restore consumer: {}", clientId);
        final Consumer<byte[], byte[]> delegate = this.wrapped.getRestoreConsumer(map);
        // Evaluating the bound method reference null-checks the registry, as the explicit check did.
        return new ResponsiveRestoreConsumer(clientId, delegate, this.storeRegistry::getCommittedOffset);
    }

    /**
     * Creates the global consumer with a group id of {@code <application id>-global}, offset
     * reset {@code earliest} and auto-commit enabled, and wraps it in a
     * {@code ResponsiveGlobalConsumer} together with an admin client built from the same
     * configuration.
     *
     * <p>The overrides are applied to a copy of the configuration, so the caller's map is
     * left untouched and an unmodifiable map is accepted.
     *
     * @param map global consumer configuration supplied by the caller; not modified
     * @return the wrapped global consumer
     */
    public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> map) {
        LOG.info("Creating responsive global consumer");
        final Map<String, Object> globalConfig = new HashMap<>(map);
        globalConfig.put("group.id", this.applicationId + "-global");
        globalConfig.put("auto.offset.reset", "earliest");
        globalConfig.put("enable.auto.commit", true);
        return new ResponsiveGlobalConsumer(globalConfig, this.wrapped.getGlobalConsumer(globalConfig), getAdmin(globalConfig));
    }

    /** Captures the {@code StreamThread-<n>} part of a producer client id; compiled once and reused. */
    private static final Pattern PRODUCER_THREAD_ID_PATTERN = Pattern.compile(".*-(StreamThread-\\d+)-producer");

    /**
     * Extracts the stream thread id from a producer client id.
     *
     * @param str producer client id, expected to contain {@code -StreamThread-<n>-producer}
     * @return the {@code StreamThread-<n>} portion of the client id
     * @throws RuntimeException if the client id does not match the expected form
     */
    private String threadIdFromProducerConfig(String str) {
        final Matcher matcher = PRODUCER_THREAD_ID_PATTERN.matcher(str);
        if (matcher.find()) {
            return matcher.group(1);
        }
        LOG.error("Unable to parse thread id from producer client id = {}", str);
        throw new RuntimeException("unexpected client id " + str);
    }

    /** Captures the {@code StreamThread-<n>} part of a consumer client id; compiled once and reused. */
    private static final Pattern CONSUMER_THREAD_ID_PATTERN = Pattern.compile(".*-(StreamThread-\\d+)-consumer$");

    /**
     * Extracts the stream thread id from a main consumer client id.
     *
     * @param str consumer client id, expected to end with {@code -StreamThread-<n>-consumer}
     * @return the {@code StreamThread-<n>} portion of the client id
     * @throws RuntimeException if the client id does not match the expected form
     */
    private String threadIdFromConsumerConfig(String str) {
        final Matcher matcher = CONSUMER_THREAD_ID_PATTERN.matcher(str);
        if (matcher.find()) {
            return matcher.group(1);
        }
        LOG.error("Unable to parse thread id from consumer client id = {}", str);
        throw new RuntimeException("Unexpected client id " + str);
    }
}
