package dev.responsive.kafka.clients;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/clients/ResponsiveConsumer.class */
public class ResponsiveConsumer<K, V> extends DelegatingConsumer<K, V> {
    private final List<Listener> listeners;
    private final Logger log;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:dev/responsive/kafka/clients/ResponsiveConsumer$Listener.class */
    public interface Listener {
        default void onPartitionsRevoked(Collection<TopicPartition> collection) {
        }

        default void onPartitionsAssigned(Collection<TopicPartition> collection) {
        }

        default void onPartitionsLost(Collection<TopicPartition> collection) {
        }

        default void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        }

        default void onClose() {
        }
    }

    /* loaded from: input_file:dev/responsive/kafka/clients/ResponsiveConsumer$RebalanceListener.class */
    private static class RebalanceListener implements ConsumerRebalanceListener {
        private final ConsumerRebalanceListener wrappedRebalanceListener;
        private final List<Listener> listeners;
        private final Logger log;

        public RebalanceListener(ConsumerRebalanceListener consumerRebalanceListener, List<Listener> list, Logger logger) {
            this.wrappedRebalanceListener = consumerRebalanceListener;
            this.listeners = list;
            this.log = logger;
        }

        public void onPartitionsLost(Collection<TopicPartition> collection) {
            for (Listener listener : this.listeners) {
                ResponsiveConsumer.ignoreException(this.log, () -> {
                    listener.onPartitionsLost(collection);
                });
            }
            this.wrappedRebalanceListener.onPartitionsLost(collection);
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            for (Listener listener : this.listeners) {
                ResponsiveConsumer.ignoreException(this.log, () -> {
                    listener.onPartitionsRevoked(collection);
                });
            }
            this.wrappedRebalanceListener.onPartitionsRevoked(collection);
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            for (Listener listener : this.listeners) {
                ResponsiveConsumer.ignoreException(this.log, () -> {
                    listener.onPartitionsAssigned(collection);
                });
            }
            this.wrappedRebalanceListener.onPartitionsAssigned(collection);
        }
    }

    public ResponsiveConsumer(String str, Consumer<K, V> consumer, List<Listener> list) {
        super(consumer);
        this.log = new LogContext(String.format("responsive-consumer [%s]", Objects.requireNonNull(str))).logger(ResponsiveConsumer.class);
        this.listeners = (List) Objects.requireNonNull(list);
    }

    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void subscribe(Collection<String> collection) {
        throw new IllegalStateException("Unexpected call to subscribe(Collection) on main consumer without a rebalance listener");
    }

    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void subscribe(Collection<String> collection, ConsumerRebalanceListener consumerRebalanceListener) {
        super.subscribe(collection, new RebalanceListener(consumerRebalanceListener, this.listeners, this.log));
    }

    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
        super.subscribe(pattern, new RebalanceListener(consumerRebalanceListener, this.listeners, this.log));
    }

    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void subscribe(Pattern pattern) {
        throw new IllegalStateException("Unexpected call to subscribe(Pattern) on main consumer without a rebalance listener");
    }

    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void unsubscribe() {
        super.unsubscribe();
    }

    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void close() {
        super.close();
        this.listeners.forEach(listener -> {
            Objects.requireNonNull(listener);
            ignoreException(listener::onClose);
        });
    }

    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void close(Duration duration) {
        super.close(duration);
        this.listeners.forEach(listener -> {
            Objects.requireNonNull(listener);
            ignoreException(listener::onClose);
        });
    }

    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void commitSync() {
        throw new UnsupportedOperationException("ResponsiveConsumer only supports commit with offsets");
    }

    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void commitSync(Duration duration) {
        throw new UnsupportedOperationException("ResponsiveConsumer only supports commit with offsets");
    }

    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map) {
        super.commitSync(map);
        this.listeners.forEach(listener -> {
            listener.onCommit(map);
        });
    }

    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map, Duration duration) {
        super.commitSync(map, duration);
        this.listeners.forEach(listener -> {
            listener.onCommit(map);
        });
    }

    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void commitAsync() {
        throw new UnsupportedOperationException("ResponsiveConsumer only supports commit with offsets");
    }

    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void commitAsync(OffsetCommitCallback offsetCommitCallback) {
        throw new UnsupportedOperationException("ResponsiveConsumer only supports commit with offsets");
    }

    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback offsetCommitCallback) {
        throw new UnsupportedOperationException("ResponsiveConsumer only supports commitSync");
    }

    private void ignoreException(Runnable runnable) {
        ignoreException(this.log, runnable);
    }

    private static void ignoreException(Logger logger, Runnable runnable) {
        try {
            runnable.run();
        } catch (Throwable th) {
            logger.error("error calling rebalance listener", th);
        }
    }
}
