package dev.responsive.kafka.clients;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/clients/ResponsiveRestoreConsumer.class */
/**
 * Consumer wrapper used for restoration that never seeks a partition below the
 * start offset supplied for it, and that refuses to {@link #poll(Duration)} while
 * any newly assigned partition has not yet been positioned through one of the
 * {@code seek} methods of this class.
 *
 * <p>The set of not-yet-positioned partitions is kept in a plain {@link HashSet}
 * with no synchronization in this class.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class ResponsiveRestoreConsumer<K, V> extends DelegatingConsumer<K, V> {
    private final Logger log;

    /** Per-partition lower bound for seeks; empty when no bound is known for a partition. */
    private final Function<TopicPartition, OptionalLong> startOffsets;

    /** Partitions added by {@link #assign(Collection)} that have not been seeked yet. */
    private final Set<TopicPartition> uninitializedOffsets = new HashSet<>();

    /**
     * Creates the wrapper.
     *
     * @param str identifier embedded in the prefix of every log line
     * @param consumer consumer that all calls are forwarded to
     * @param function lookup returning the minimum offset to restore from for a partition,
     *     or an empty {@link OptionalLong} when there is none
     * @throws NullPointerException if {@code str} or {@code function} is {@code null}
     */
    public ResponsiveRestoreConsumer(String str, Consumer<K, V> consumer, Function<TopicPartition, OptionalLong> function) {
        super(consumer);
        this.startOffsets = Objects.requireNonNull(function, "function");
        // NOTE(review): the logger category is ResponsiveConsumer, not this class. It looks like
        // a copy-paste slip, but log configuration may rely on it, so it is left unchanged here.
        this.log = new LogContext(String.format("responsive-restore-consumer [%s]", Objects.requireNonNull(str, "str"))).logger(ResponsiveConsumer.class);
    }

    /**
     * Seeks every partition that has a known start offset to that offset and marks it
     * as initialized.
     *
     * @param collection partitions to position
     * @return unmodifiable set of the partitions for which no start offset is known; these
     *     are NOT marked as initialized here, the caller does that once it has positioned them
     */
    private Set<TopicPartition> initializeOffsets(Collection<TopicPartition> collection) {
        Set<TopicPartition> withoutStartOffset = new HashSet<>();
        for (TopicPartition topicPartition : collection) {
            OptionalLong startOffset = this.startOffsets.apply(topicPartition);
            if (startOffset.isPresent()) {
                super.seek(topicPartition, startOffset.getAsLong());
                // Only flag the partition as initialized once its seek has actually succeeded.
                this.uninitializedOffsets.remove(topicPartition);
            } else {
                withoutStartOffset.add(topicPartition);
            }
        }
        return Collections.unmodifiableSet(withoutStartOffset);
    }

    /**
     * Replaces the assignment. Partitions that are new relative to the current assignment
     * become "uninitialized" until they are seeked; partitions no longer assigned stop
     * being tracked.
     *
     * @param collection the complete new assignment
     */
    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void assign(Collection<TopicPartition> collection) {
        // Look the current assignment up once instead of once per candidate partition.
        Set<TopicPartition> currentAssignment = super.assignment();
        for (TopicPartition topicPartition : collection) {
            if (!currentAssignment.contains(topicPartition)) {
                this.uninitializedOffsets.add(topicPartition);
            }
        }
        this.uninitializedOffsets.retainAll(collection);
        super.assign(collection);
    }

    /** Forgets all pending uninitialized partitions, then unsubscribes the delegate. */
    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void unsubscribe() {
        this.uninitializedOffsets.clear();
        super.unsubscribe();
    }

    /**
     * Polls the delegate, but only when every assigned partition has been seeked.
     *
     * @param duration maximum time to block, passed through to the delegate
     * @return the records returned by the delegate
     * @throws IllegalStateException if at least one partition added through
     *     {@link #assign(Collection)} has not been seeked yet
     */
    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public ConsumerRecords<K, V> poll(Duration duration) {
        if (!this.uninitializedOffsets.isEmpty()) {
            this.log.error("Found uninitialized changelog partitions during poll: {}", this.uninitializedOffsets);
            throw new IllegalStateException("Restore consumer invoked poll without initializing offsets");
        }
        return super.poll(duration);
    }

    /**
     * Seeks to the larger of the requested offset and the partition's start offset (if one
     * is known), then marks the partition as initialized.
     *
     * @param topicPartition partition to position
     * @param j requested offset
     */
    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void seek(TopicPartition topicPartition, long j) {
        long lowerBound = this.startOffsets.apply(topicPartition).orElse(j);
        super.seek(topicPartition, Math.max(j, lowerBound));
        this.uninitializedOffsets.remove(topicPartition);
    }

    /**
     * Same clamping as {@link #seek(TopicPartition, long)}; the leader epoch and metadata of
     * the requested position are carried over unchanged.
     *
     * @param topicPartition partition to position
     * @param offsetAndMetadata requested position
     */
    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        long requested = offsetAndMetadata.offset();
        long lowerBound = this.startOffsets.apply(topicPartition).orElse(requested);
        OffsetAndMetadata clamped = new OffsetAndMetadata(
                Math.max(requested, lowerBound),
                offsetAndMetadata.leaderEpoch(),
                offsetAndMetadata.metadata());
        super.seek(topicPartition, clamped);
        this.uninitializedOffsets.remove(topicPartition);
    }

    /**
     * Positions each partition at its start offset when one is known, and at the beginning of
     * the log (via the delegate) otherwise.
     *
     * <p>NOTE(review): an empty {@code collection} results in no delegate call at all. Kafka's
     * own consumer is documented to treat an empty collection as "all assigned partitions" -
     * confirm whether callers ever rely on that.
     *
     * @param collection partitions to position
     */
    @Override // dev.responsive.kafka.clients.DelegatingConsumer
    public void seekToBeginning(Collection<TopicPartition> collection) {
        Set<TopicPartition> withoutStartOffset = initializeOffsets(collection);
        if (withoutStartOffset.isEmpty()) {
            return;
        }
        super.seekToBeginning(withoutStartOffset);
        // Cleared only after the delegate call returned, so a failed seek keeps poll() guarded.
        this.uninitializedOffsets.removeAll(withoutStartOffset);
    }
}
