package dev.responsive.kafka.internal.metrics;

import dev.responsive.kafka.internal.clients.ResponsiveConsumer;
import dev.responsive.kafka.internal.utils.Constants;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/metrics/EndOffsetsPoller.class */
public class EndOffsetsPoller {
    private static final Logger LOG = LoggerFactory.getLogger(EndOffsetsPoller.class);
    private static final int ADMIN_RECREATE_THRESHOLD = 3;
    private final Map<String, Listener> threadIdToMetrics;
    private final ResponsiveMetrics metrics;
    private final Map<String, Object> configs;
    private final Factories factories;
    private final ScheduledExecutorService executor;
    private Poller poller;

    /**
     * Factory hook used by the poller to obtain its Kafka {@link Admin} client.
     *
     * <p>The poller passes its stored configuration map to {@link #createAdminClient(Map)} each
     * time it needs a new client.
     */
    @FunctionalInterface
    public interface Factories {
        /**
         * Creates an admin client.
         *
         * @param map the configuration to build the client from
         * @return the admin client to use for listing offsets
         */
        Admin createAdminClient(Map<String, Object> map);
    }

    /**
     * Rebalance listener for one thread that tracks the end offset of each partition currently
     * assigned to that thread and exposes every tracked offset as a metric.
     *
     * <p>A partition starts at {@code -1} when it is assigned and is overwritten by
     * {@link #update(Map)}. Its metric is removed when the partition is revoked or lost, or when
     * the listener is closed.
     */
    public static class Listener implements ResponsiveConsumer.Listener {
        /** Value reported for a partition whose end offset has not been fetched yet. */
        private static final long UNKNOWN_OFFSET = -1L;

        private final String threadId;
        // Written by the rebalance callbacks and update(); read by the metric lambdas and the poller.
        private final Map<TopicPartition, Long> endOffsets = new ConcurrentHashMap<>();
        private final Logger log;
        private final ResponsiveMetrics metrics;
        private final Consumer<String> onClose;

        /**
         * @param threadId id of the thread these metrics belong to; also used as logger suffix
         * @param metrics registry the end-offset metrics are added to and removed from
         * @param onClose callback invoked with {@code threadId} at the end of {@link #close()}
         */
        private Listener(String threadId, ResponsiveMetrics metrics, Consumer<String> onClose) {
            this.threadId = threadId;
            this.metrics = metrics;
            this.onClose = onClose;
            this.log = LoggerFactory.getLogger(EndOffsetsPoller.class.getName() + "." + threadId);
        }

        /** Treats lost partitions exactly like revoked ones. */
        @Override // dev.responsive.kafka.internal.clients.ResponsiveConsumer.Listener
        public void onPartitionsLost(Collection<TopicPartition> collection) {
            onPartitionsRevoked(collection);
        }

        /** Removes the end-offset metric and the tracked offset of every given partition. */
        @Override // dev.responsive.kafka.internal.clients.ResponsiveConsumer.Listener
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            for (final TopicPartition partition : collection) {
                this.metrics.removeMetric(endOffsetMetric(partition));
                this.endOffsets.remove(partition);
            }
        }

        /**
         * Starts tracking every given partition with an unknown ({@code -1}) offset and registers
         * a metric that reports the currently tracked value.
         */
        @Override // dev.responsive.kafka.internal.clients.ResponsiveConsumer.Listener
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            for (final TopicPartition partition : collection) {
                this.endOffsets.put(partition, UNKNOWN_OFFSET);
                // The metric falls back to -1 if the partition is no longer tracked when read.
                this.metrics.addMetric(
                    endOffsetMetric(partition),
                    (metricConfig, now) -> this.endOffsets.getOrDefault(partition, UNKNOWN_OFFSET));
            }
        }

        /**
         * Applies freshly listed offsets to the partitions this listener tracks.
         *
         * <p>NOTE(review): the decompiler reports this method was originally private; the
         * visibility is left as found.
         *
         * @param map listed offsets keyed by partition; entries for untracked partitions are ignored
         */
        public void update(Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> map) {
            // computeIfPresent never re-adds a partition that was revoked in the meantime.
            map.forEach((partition, info) ->
                this.endOffsets.computeIfPresent(partition, (key, previous) -> info.offset()));
            if (this.log.isDebugEnabled()) {
                final String rendered = this.endOffsets.entrySet().stream()
                    .map(tracked -> "(" + tracked.getKey() + ": " + tracked.getValue() + ")")
                    .collect(Collectors.joining(", "));
                this.log.debug("Updated end offsets to [{}].", rendered);
            }
        }

        /**
         * Removes the metric of every tracked partition and then notifies the {@code onClose}
         * callback with this listener's thread id.
         */
        public void close() {
            this.log.info("Cleaning up offset metrics");
            for (final TopicPartition partition : this.endOffsets.keySet()) {
                this.metrics.removeMetric(endOffsetMetric(partition));
            }
            this.onClose.accept(this.threadId);
        }

        /** Builds the topic-level end-offset metric name for this thread and the given partition. */
        private MetricName endOffsetMetric(TopicPartition topicPartition) {
            return this.metrics.metricName(
                TopicMetrics.END_OFFSET,
                TopicMetrics.END_OFFSET_DESCRIPTION,
                this.metrics.topicLevelMetric(
                    TopicMetrics.TOPIC_METRIC_GROUP, this.threadId, topicPartition));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dev/responsive/kafka/internal/metrics/EndOffsetsPoller$Poller.class */
    public static class Poller {
        private Admin adminClient;
        private int failuresWithoutReinit = 0;
        private final Supplier<Admin> adminSupplier;
        private final ScheduledFuture<?> future;
        private final ScheduledExecutorService executor;
        private final Supplier<Collection<Listener>> threadMetricsSupplier;

        private Poller(Supplier<Admin> supplier, ScheduledExecutorService scheduledExecutorService, Supplier<Collection<Listener>> supplier2) {
            this.adminSupplier = supplier;
            this.executor = scheduledExecutorService;
            this.threadMetricsSupplier = supplier2;
            init();
            this.future = scheduledExecutorService.scheduleAtFixedRate(this::pollEndOffsets, 0L, 30L, TimeUnit.SECONDS);
        }

        private void stop() {
            this.future.cancel(true);
            this.executor.schedule(this::close, 0L, TimeUnit.SECONDS);
        }

        private void init() {
            this.adminClient = this.adminSupplier.get();
            this.failuresWithoutReinit = 0;
        }

        private void close() {
            this.adminClient.close(Duration.ofNanos(0L));
        }

        private void maybeReinit() {
            if (this.failuresWithoutReinit >= 3) {
                EndOffsetsPoller.LOG.info("reinitializing admin client");
                close();
                init();
            }
        }

        private void doPollEndOffsets() {
            EndOffsetsPoller.LOG.info("Polling end offsets");
            maybeReinit();
            HashMap hashMap = new HashMap();
            Collection<Listener> collection = this.threadMetricsSupplier.get();
            Iterator<Listener> it = collection.iterator();
            while (it.hasNext()) {
                Iterator<TopicPartition> it2 = it.next().endOffsets.keySet().iterator();
                while (it2.hasNext()) {
                    hashMap.put(it2.next(), OffsetSpec.latest());
                }
            }
            try {
                Map map = (Map) this.adminClient.listOffsets(hashMap).all().get(10L, Constants.BLOCKING_TIMEOUT_UNIT);
                collection.forEach(listener -> {
                    listener.update(map);
                });
                this.failuresWithoutReinit = 0;
                EndOffsetsPoller.LOG.info("Finished updating end offsets");
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                throw new RuntimeException(e);
            }
        }

        private void pollEndOffsets() {
            try {
                doPollEndOffsets();
            } catch (RuntimeException e) {
                EndOffsetsPoller.LOG.error("error polling end offsets. will retry at next poll interval", e);
                this.failuresWithoutReinit++;
            }
        }
    }

    public EndOffsetsPoller(Map<String, ?> map, ResponsiveMetrics responsiveMetrics, Factories factories) {
        this(map, responsiveMetrics, Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("responsive-end-offsets-poller");
            return thread;
        }), factories);
    }

    /**
     * Creates a poller that schedules its work on the given executor. No polling starts here; the
     * poller is created when the first thread is registered through {@code addForThread}.
     *
     * @param map configuration handed to the admin client factory; an unmodifiable copy is kept
     * @param responsiveMetrics registry the end-offset metrics are registered with
     * @param scheduledExecutorService executor that runs the periodic poll
     * @param factories factory used to create the admin client
     * @throws NullPointerException if any argument is {@code null}
     */
    public EndOffsetsPoller(Map<String, ?> map, ResponsiveMetrics responsiveMetrics, ScheduledExecutorService scheduledExecutorService, Factories factories) {
        this.threadIdToMetrics = new HashMap<>();
        this.poller = null;
        this.configs = Map.copyOf(Objects.requireNonNull(map));
        this.metrics = Objects.requireNonNull(responsiveMetrics);
        this.executor = Objects.requireNonNull(scheduledExecutorService);
        this.factories = Objects.requireNonNull(factories);
    }

    /**
     * Registers end-offset metrics for a thread and returns the listener that maintains them.
     * Registering the first thread also starts the poller.
     *
     * @param str id of the thread to register
     * @return the listener created for that thread
     * @throws RuntimeException if metrics are already registered for {@code str}
     */
    public synchronized Listener addForThread(String str) {
        LOG.debug("Adding end offset metrics for thread {}", str);
        final Listener created = new Listener(str, this.metrics, this::removeForThread);

        final boolean alreadyRegistered = this.threadIdToMetrics.containsKey(str);
        if (alreadyRegistered) {
            final String message = String.format("End offset poller already has metrics for %s", str);
            final RuntimeException failure = new RuntimeException(message);
            LOG.error(message, failure);
            throw failure;
        }

        // The poller only exists while at least one thread is registered.
        final boolean firstThread = this.threadIdToMetrics.isEmpty();
        if (firstThread) {
            initPoller();
        }
        this.threadIdToMetrics.put(str, created);
        return created;
    }

    /**
     * Unregisters the metrics of a thread. Removing the last registered thread stops the poller;
     * an unknown thread id only produces a warning.
     *
     * @param str id of the thread to unregister
     */
    private synchronized void removeForThread(String str) {
        LOG.debug("Removing end offset metrics for thread {}", str);
        final Listener removed = this.threadIdToMetrics.remove(str);
        if (removed == null) {
            LOG.warn("No metrics found for thread {}", str);
            return;
        }
        if (this.threadIdToMetrics.isEmpty()) {
            stopPoller();
        }
    }

    /**
     * Returns an unmodifiable snapshot of the currently registered listeners, taken while holding
     * this instance's monitor.
     */
    private synchronized Collection<Listener> getThreadMetrics() {
        final Collection<Listener> registered = this.threadIdToMetrics.values();
        return List.copyOf(registered);
    }

    /**
     * Creates the poller, wiring it to the admin client factory, the executor and the listener
     * snapshot supplier.
     *
     * @throws IllegalStateException if a poller already exists
     */
    private void initPoller() {
        LOG.info("Initializing end offsets poller");
        if (this.poller == null) {
            final Supplier<Admin> adminSupplier = () -> this.factories.createAdminClient(this.configs);
            this.poller = new Poller(adminSupplier, this.executor, this::getThreadMetrics);
            return;
        }
        throw new IllegalStateException("Poller was already initialized");
    }

    /**
     * Stops the current poller and forgets it. A runtime failure while stopping is logged and
     * otherwise ignored, so the reference is cleared in that case as well.
     */
    private void stopPoller() {
        LOG.info("Stopping end offsets poller");
        final Poller active = this.poller;
        try {
            active.stop();
        } catch (RuntimeException stopFailure) {
            LOG.warn("Poller stop returned an unexpected error. It will be ignored, and the poller task + admin client might be leaked.", stopFailure);
        }
        this.poller = null;
    }
}
