package com.google.cloud.pubsublite.kafka;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.internal.wire.Assigner;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
import com.google.cloud.pubsublite.internal.wire.PartitionAssignmentReceiver;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import com.google.protobuf.util.Timestamps;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.class */
/**
 * A Kafka {@link Consumer} of raw byte records backed by a single Pub/Sub Lite subscription.
 *
 * <p>The instance is bound to exactly one topic and one subscription. Every call that names a
 * topic is validated against that topic. The consumer is in one of three states: unassigned,
 * manually assigned via {@link #assign}, or automatically assigned via {@link #subscribe}. The
 * state is tracked by the two optional fields {@code consumer} and {@code assigner}.
 */
public class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
    /** Timeout used by the overloads that take no explicit timeout. */
    private static final Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
    private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

    private final SubscriptionPath subscriptionPath;
    private final TopicPath topicPath;
    private final SharedBehavior shared;
    private final ConsumerFactory consumerFactory;
    private final AssignerFactory assignerFactory;
    private final CursorClient cursorClient;
    private final TopicStatsClient topicStatsClient;
    /** Resources closed, in order, by {@link #close(Duration)}. */
    private final List<AutoCloseable> toClose;
    /** Present only while in automatic-assignment (subscribe) mode. */
    private Optional<Assigner> assigner = Optional.empty();
    /** Present once either {@code subscribe} or {@code assign} has succeeded. */
    private Optional<SingleSubscriptionConsumer> consumer = Optional.empty();

    /** Rebalance listener used by the {@code subscribe} overloads that take no listener. */
    private static class NoOpRebalanceListener implements ConsumerRebalanceListener {
        private NoOpRebalanceListener() {
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            // Intentionally empty.
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            // Intentionally empty.
        }
    }

    /**
     * Creates a consumer bound to one subscription and its topic.
     *
     * @param subscriptionPath subscription whose cursors are read by {@code committed}
     * @param topicPath the only topic this consumer accepts
     * @param sharedBehavior helper used for partition metadata; closed on {@link #close()}
     * @param consumerFactory creates the underlying consumer on first subscribe/assign
     * @param assignerFactory creates the assigner used in subscribe mode
     * @param cursorClient client used to list committed cursors; closed on {@link #close()}
     * @param topicStatsClient client used for head/event-time cursors; closed on {@link #close()}
     * @param autoCloseableArr extra resources to close first on {@link #close()}
     */
    public PubsubLiteConsumer(SubscriptionPath subscriptionPath, TopicPath topicPath, SharedBehavior sharedBehavior, ConsumerFactory consumerFactory, AssignerFactory assignerFactory, CursorClient cursorClient, TopicStatsClient topicStatsClient, AutoCloseable... autoCloseableArr) {
        this.subscriptionPath = subscriptionPath;
        this.topicPath = topicPath;
        this.shared = sharedBehavior;
        this.consumerFactory = consumerFactory;
        this.assignerFactory = assignerFactory;
        this.cursorClient = cursorClient;
        this.topicStatsClient = topicStatsClient;
        // Caller-supplied resources first, then the clients owned by this instance.
        this.toClose = ImmutableList.<AutoCloseable>builder()
                .add(autoCloseableArr)
                .add(cursorClient)
                .add(topicStatsClient)
                .add(sharedBehavior)
                .build();
    }

    /**
     * Converts a Pub/Sub Lite partition into a Kafka {@link TopicPartition} on this consumer's
     * topic. The partition number is narrowed to {@code int}.
     */
    public TopicPartition toTopicPartition(Partition partition) {
        return new TopicPartition(this.topicPath.toString(), (int) partition.value());
    }

    /**
     * Returns the underlying consumer.
     *
     * @throws IllegalStateException if neither subscribe nor assign has been called
     */
    private SingleSubscriptionConsumer requireValidConsumer() {
        if (!this.consumer.isPresent()) {
            throw new IllegalStateException("Neither subscribe nor assign has been called.");
        }
        return this.consumer.get();
    }

    /**
     * Not supported.
     *
     * @throws IllegalStateException always
     */
    @Override
    public Uuid clientInstanceId(Duration duration) {
        throw new IllegalStateException("Pub/Sub Lite Kafka Connector does not support telemetry");
    }

    /**
     * Returns the currently assigned partitions as Kafka topic-partitions.
     *
     * @throws IllegalStateException if neither subscribe nor assign has been called
     */
    @Override
    public Set<TopicPartition> assignment() {
        Set<Partition> partitions = requireValidConsumer().assignment();
        return partitions.stream().map(this::toTopicPartition).collect(Collectors.toSet());
    }

    /** Returns the configured topic when a consumer exists, otherwise an empty set. */
    @Override
    public Set<String> subscription() {
        if (this.consumer.isPresent()) {
            return ImmutableSet.of(this.topicPath.toString());
        }
        return ImmutableSet.of();
    }

    /**
     * Not supported; delegates to {@link #subscribe(Pattern, ConsumerRebalanceListener)}.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void subscribe(Pattern pattern) {
        subscribe(pattern, new NoOpRebalanceListener());
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
        throw new UnsupportedOperationException("Pattern assignment is not available for Pub/Sub Lite.");
    }

    /**
     * Verifies that {@code str} parses to this consumer's topic.
     *
     * @throws UnsupportedOperationException if it names a different topic
     */
    private void checkTopic(String str) {
        try {
            if (!TopicPath.parse(str).equals(this.topicPath)) {
                throw new UnsupportedOperationException("Pub/Sub Lite consumers may only interact with the one topic they are configured for.");
            }
        } catch (ApiException e) {
            // Parse failures are surfaced as Kafka exceptions.
            throw KafkaExceptionUtils.toKafka(e);
        }
    }

    /** Validates the topic of {@code topicPartition} and returns its partition. */
    private Partition checkTopicGetPartition(TopicPartition topicPartition) {
        checkTopic(topicPartition.topic());
        try {
            return Partition.of(topicPartition.partition());
        } catch (ApiException e) {
            throw KafkaExceptionUtils.toKafka(e);
        }
    }

    /**
     * Builds the receiver that is handed to the assigner in subscribe mode.
     *
     * <p>On each new assignment the receiver diffs it against the previous one, reports removed
     * partitions through {@code onPartitionsLost} and added partitions through
     * {@code onPartitionsAssigned}, then pushes the new assignment to the underlying consumer.
     */
    private PartitionAssignmentReceiver newAssignmentReceiver(ConsumerRebalanceListener consumerRebalanceListener) {
        AtomicReference<Set<Partition>> lastAssignment = new AtomicReference<>(ImmutableSet.<Partition>of());
        return newAssignment -> {
            Set<Partition> previousAssignment = lastAssignment.get();

            // Partitions held before but absent from the new assignment.
            Set<Partition> lostPartitions = new HashSet<>(previousAssignment);
            lostPartitions.removeAll(newAssignment);

            // Partitions in the new assignment that were not held before.
            Set<Partition> addedPartitions = new HashSet<>(newAssignment);
            addedPartitions.removeAll(previousAssignment);

            if (!lostPartitions.isEmpty()) {
                consumerRebalanceListener.onPartitionsLost(lostPartitions.stream().map(this::toTopicPartition).collect(Collectors.toSet()));
            }
            if (!addedPartitions.isEmpty()) {
                consumerRebalanceListener.onPartitionsAssigned(addedPartitions.stream().map(this::toTopicPartition).collect(Collectors.toSet()));
            }
            this.consumer.get().setAssignment(newAssignment);
            lastAssignment.set(newAssignment);
        };
    }

    /** Subscribes to the single configured topic with a no-op rebalance listener. */
    @Override
    public void subscribe(Collection<String> collection) {
        subscribe(collection, new NoOpRebalanceListener());
    }

    /**
     * Enters automatic-assignment mode for the configured topic.
     *
     * <p>If already subscribed this is a no-op; the new listener is not installed.
     *
     * @param collection must contain exactly the configured topic
     * @param consumerRebalanceListener notified of lost and newly assigned partitions
     * @throws UnsupportedOperationException if the collection does not have exactly one element
     *     or names a different topic
     * @throws IllegalStateException if {@code assign} was used previously
     */
    @Override
    public void subscribe(Collection<String> collection, ConsumerRebalanceListener consumerRebalanceListener) {
        if (collection.size() != 1) {
            throw new UnsupportedOperationException("Subscribing to multiple topics is not available for Pub/Sub Lite.");
        }
        checkTopic(collection.iterator().next());
        if (this.consumer.isPresent()) {
            // A consumer without an assigner means manual assignment is in effect.
            if (!this.assigner.isPresent()) {
                throw new IllegalStateException("Called subscribe after calling assign.");
            }
            return;
        }
        this.consumer = Optional.of(this.consumerFactory.newConsumer());
        try {
            this.assigner = Optional.of(this.assignerFactory.New(newAssignmentReceiver(consumerRebalanceListener)));
            this.assigner.get().startAsync().awaitRunning();
        } catch (ApiException e) {
            throw KafkaExceptionUtils.toKafka(e);
        }
    }

    /**
     * Manually assigns partitions. An empty collection is treated as {@link #unsubscribe()}.
     *
     * @throws IllegalStateException if {@code subscribe} was used previously
     */
    @Override
    public void assign(Collection<TopicPartition> collection) {
        if (collection.isEmpty()) {
            unsubscribe();
            return;
        }
        if (this.assigner.isPresent()) {
            throw new IllegalStateException("Called assign after calling subscribe.");
        }
        // Validate every partition before creating or touching the consumer.
        Set<Partition> partitions = collection.stream().map(this::checkTopicGetPartition).collect(Collectors.toSet());
        if (!this.consumer.isPresent()) {
            this.consumer = Optional.of(this.consumerFactory.newConsumer());
        }
        this.consumer.get().setAssignment(partitions);
    }

    /** Stops the assigner (if any), closes the underlying consumer (if any) and clears both. */
    @Override
    public void unsubscribe() {
        this.assigner.ifPresent(activeAssigner -> activeAssigner.stopAsync().awaitTerminated());
        this.assigner = Optional.empty();
        this.consumer.ifPresent(activeConsumer -> activeConsumer.close(INFINITE_DURATION));
        this.consumer = Optional.empty();
    }

    /** Polls with a timeout given in milliseconds. */
    @Override
    public ConsumerRecords<byte[], byte[]> poll(long j) {
        return poll(Duration.ofMillis(j));
    }

    /**
     * Polls the underlying consumer.
     *
     * @throws IllegalStateException if neither subscribe nor assign has been called
     */
    @Override
    public ConsumerRecords<byte[], byte[]> poll(Duration duration) {
        return requireValidConsumer().poll(duration);
    }

    /**
     * Validates each key against the configured topic and converts the map to Pub/Sub Lite
     * partitions and offsets. Any failure is rethrown as a Kafka exception.
     */
    Map<Partition, Offset> checkAndTransformOffsets(Map<TopicPartition, OffsetAndMetadata> map) {
        ImmutableMap.Builder<Partition, Offset> builder = ImmutableMap.builder();
        try {
            map.forEach((topicPartition, offsetAndMetadata) ->
                    builder.put(checkTopicGetPartition(topicPartition), Offset.of(offsetAndMetadata.offset())));
            return builder.build();
        } catch (Throwable th) {
            throw KafkaExceptionUtils.toKafka(th);
        }
    }

    /** Commits the given offsets, waiting without a practical time limit. */
    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map) {
        commitSync(map, INFINITE_DURATION);
    }

    /** Commits the given offsets and waits up to {@code duration} for completion. */
    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map, Duration duration) {
        try {
            requireValidConsumer().commit(checkAndTransformOffsets(map)).get(duration.toMillis(), TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            throw KafkaExceptionUtils.toKafka(th);
        }
    }

    /**
     * Commits the given offsets asynchronously.
     *
     * @param map offsets to commit; handed back to the callback on success
     * @param offsetCommitCallback invoked with {@code (map, null)} on success or
     *     {@code (null, exception)} on failure
     */
    @Override
    public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> map, final OffsetCommitCallback offsetCommitCallback) {
        ApiFuture<Void> commitFuture = requireValidConsumer().commit(checkAndTransformOffsets(map));
        ApiFutures.addCallback(commitFuture, new ApiFutureCallback<Void>() {
            @Override
            public void onFailure(Throwable th) {
                offsetCommitCallback.onComplete(null, KafkaExceptionUtils.toKafka(th));
            }

            @Override
            public void onSuccess(Void unused) {
                offsetCommitCallback.onComplete(map, null);
            }
        });
    }

    /** Commits all offsets tracked by the underlying consumer, waiting without a practical limit. */
    @Override
    public void commitSync() {
        commitSync(INFINITE_DURATION);
    }

    /** Commits all offsets tracked by the underlying consumer, waiting up to {@code duration}. */
    @Override
    public void commitSync(Duration duration) {
        try {
            requireValidConsumer().commitAll().get(duration.toMillis(), TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            throw KafkaExceptionUtils.toKafka(th);
        }
    }

    /**
     * Commits all tracked offsets asynchronously.
     *
     * @param offsetCommitCallback invoked with the committed offsets (converted to Kafka types)
     *     on success, or with {@code (null, exception)} on failure
     */
    @Override
    public void commitAsync(final OffsetCommitCallback offsetCommitCallback) {
        ApiFuture<Map<Partition, Offset>> commitFuture = requireValidConsumer().commitAll();
        ApiFutures.addCallback(commitFuture, new ApiFutureCallback<Map<Partition, Offset>>() {
            @Override
            public void onFailure(Throwable th) {
                offsetCommitCallback.onComplete(null, KafkaExceptionUtils.toKafka(th));
            }

            @Override
            public void onSuccess(Map<Partition, Offset> map) {
                ImmutableMap.Builder<TopicPartition, OffsetAndMetadata> committed = ImmutableMap.builder();
                map.forEach((partition, offset) ->
                        committed.put(toTopicPartition(partition), new OffsetAndMetadata(offset.value())));
                offsetCommitCallback.onComplete(committed.build(), null);
            }
        });
    }

    /** Commits all tracked offsets asynchronously; failures are only logged at warning level. */
    @Override
    public void commitAsync() {
        commitAsync((offsets, failure) -> {
            if (failure != null) {
                logger.atWarning().withCause(failure).log("Failed to commit offsets.");
            }
        });
    }

    /** Seeks the given partition to the absolute offset {@code j}. */
    @Override
    public void seek(TopicPartition topicPartition, long j) {
        SeekRequest request = SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(j)).build();
        requireValidConsumer().doSeek(checkTopicGetPartition(topicPartition), request);
    }

    /** Seeks to the offset carried by {@code offsetAndMetadata}; the metadata is not used. */
    @Override
    public void seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        seek(topicPartition, offsetAndMetadata.offset());
    }

    /** Seeks each listed partition to offset 0. */
    @Override
    public void seekToBeginning(Collection<TopicPartition> collection) {
        for (TopicPartition topicPartition : collection) {
            // The consumer is looked up per element, so an empty collection never throws.
            requireValidConsumer().doSeek(
                    checkTopicGetPartition(topicPartition),
                    SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(0L).build()).build());
        }
    }

    /** Seeks each listed partition to the {@code HEAD} named target. */
    @Override
    public void seekToEnd(Collection<TopicPartition> collection) {
        for (TopicPartition topicPartition : collection) {
            // The consumer is looked up per element, so an empty collection never throws.
            requireValidConsumer().doSeek(
                    checkTopicGetPartition(topicPartition),
                    SeekRequest.newBuilder().setNamedTarget(SeekRequest.NamedTarget.HEAD).build());
        }
    }

    /** Returns the position for the partition, waiting without a practical time limit. */
    @Override
    public long position(TopicPartition topicPartition) {
        return position(topicPartition, INFINITE_DURATION);
    }

    /**
     * Returns the underlying consumer's position for the partition, falling back to the committed
     * offset when the consumer reports none.
     */
    @Override
    public long position(TopicPartition topicPartition, Duration duration) {
        Optional<Long> current = requireValidConsumer().position(checkTopicGetPartition(topicPartition));
        if (current.isPresent()) {
            return current.get();
        }
        return committed(topicPartition, duration).offset();
    }

    /** Returns the committed offset for one partition, waiting without a practical time limit. */
    @Override
    public OffsetAndMetadata committed(TopicPartition topicPartition) {
        return committed(topicPartition, INFINITE_DURATION);
    }

    /** Returns the committed offset for one partition, waiting up to {@code duration}. */
    @Override
    public OffsetAndMetadata committed(TopicPartition topicPartition, Duration duration) {
        return committed(ImmutableSet.of(topicPartition), duration).get(topicPartition);
    }

    /** Returns committed offsets for the partitions, waiting without a practical time limit. */
    @Override
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> set) {
        return committed(set, INFINITE_DURATION);
    }

    /**
     * Returns committed offsets for the requested partitions by listing the subscription's
     * cursors. Partitions without a cursor are reported at offset 0.
     */
    @Override
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> set, Duration duration) {
        Set<Partition> partitions = set.stream().map(this::checkTopicGetPartition).collect(Collectors.toSet());
        try {
            Map<Partition, Offset> cursors = this.cursorClient.listPartitionCursors(this.subscriptionPath).get(duration.toMillis(), TimeUnit.MILLISECONDS);
            ImmutableMap.Builder<TopicPartition, OffsetAndMetadata> builder = ImmutableMap.builder();
            partitions.forEach(partition ->
                    builder.put(toTopicPartition(partition), new OffsetAndMetadata(cursors.getOrDefault(partition, Offset.of(0L)).value())));
            return builder.build();
        } catch (Throwable th) {
            throw KafkaExceptionUtils.toKafka(th);
        }
    }

    /** Always returns an empty map. */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return ImmutableMap.of();
    }

    /** Returns partition metadata for the topic, waiting without a practical time limit. */
    @Override
    public List<PartitionInfo> partitionsFor(String str) {
        return partitionsFor(str, INFINITE_DURATION);
    }

    /** Validates the topic name and returns partition metadata for the configured topic. */
    @Override
    public List<PartitionInfo> partitionsFor(String str, Duration duration) {
        checkTopic(str);
        return this.shared.partitionsFor(this.topicPath, duration);
    }

    /** Lists the single configured topic, waiting without a practical time limit. */
    @Override
    public Map<String, List<PartitionInfo>> listTopics() {
        return listTopics(INFINITE_DURATION);
    }

    /** Returns a one-entry map from the configured topic to its partition metadata. */
    @Override
    public Map<String, List<PartitionInfo>> listTopics(Duration duration) {
        String topic = this.topicPath.toString();
        return ImmutableMap.of(topic, partitionsFor(topic, duration));
    }

    /** Resolves offsets for timestamps, waiting without a practical time limit. */
    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map) {
        return offsetsForTimes(map, INFINITE_DURATION);
    }

    /**
     * Resolves, for each partition, the cursor for the given event time in milliseconds.
     *
     * @return a map in which a partition maps to {@code null} when no cursor was found for its
     *     timestamp
     */
    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map, Duration duration) {
        try {
            // Issue all lookups first so they proceed concurrently.
            Map<TopicPartition, ApiFuture<Optional<Cursor>>> cursorFutures = map.entrySet().stream().collect(Collectors.toMap(
                    Map.Entry::getKey,
                    entry -> this.topicStatsClient.computeCursorForEventTime(
                            this.topicPath,
                            checkTopicGetPartition(entry.getKey()),
                            Timestamps.fromMillis(entry.getValue()))));
            // The single timed wait covers every lookup; the per-entry get() calls below return
            // immediately afterwards.
            ApiFutures.allAsList(cursorFutures.values()).get(duration.toMillis(), TimeUnit.MILLISECONDS);
            // HashMap rather than ImmutableMap because values may be null.
            Map<TopicPartition, OffsetAndTimestamp> results = new HashMap<>();
            for (Map.Entry<TopicPartition, ApiFuture<Optional<Cursor>>> entry : cursorFutures.entrySet()) {
                OffsetAndTimestamp resolved = null;
                Optional<Cursor> cursor = entry.getValue().get();
                if (cursor.isPresent()) {
                    resolved = new OffsetAndTimestamp(cursor.get().getOffset(), Preconditions.checkNotNull(map.get(entry.getKey())));
                }
                results.put(entry.getKey(), resolved);
            }
            return results;
        } catch (Throwable th) {
            throw KafkaExceptionUtils.toKafka(th);
        }
    }

    /** Returns beginning offsets; see {@link #beginningOffsets(Collection, Duration)}. */
    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection) {
        return beginningOffsets(collection, INFINITE_DURATION);
    }

    /**
     * Reports offset 0 for every listed partition after validating its topic. No remote call is
     * made and {@code duration} is not consulted.
     */
    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection, Duration duration) {
        ImmutableMap.Builder<TopicPartition, Long> builder = ImmutableMap.builder();
        for (TopicPartition topicPartition : collection) {
            checkTopic(topicPartition.topic());
            builder.put(topicPartition, 0L);
        }
        return builder.build();
    }

    /** Returns end offsets, waiting without a practical time limit. */
    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection) {
        return endOffsets(collection, INFINITE_DURATION);
    }

    /** Returns the head cursor offset of each listed partition, waiting up to {@code duration}. */
    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection, Duration duration) {
        try {
            // Issue all lookups first so they proceed concurrently.
            Map<TopicPartition, ApiFuture<Cursor>> headFutures = collection.stream().collect(Collectors.toMap(
                    topicPartition -> topicPartition,
                    topicPartition -> this.topicStatsClient.computeHeadCursor(this.topicPath, checkTopicGetPartition(topicPartition))));
            ApiFutures.allAsList(headFutures.values()).get(duration.toMillis(), TimeUnit.MILLISECONDS);
            ImmutableMap.Builder<TopicPartition, Long> builder = ImmutableMap.builder();
            for (Map.Entry<TopicPartition, ApiFuture<Cursor>> entry : headFutures.entrySet()) {
                builder.put(entry.getKey(), entry.getValue().get().getOffset());
            }
            return builder.build();
        } catch (Throwable th) {
            throw KafkaExceptionUtils.toKafka(th);
        }
    }

    /** Closes this consumer; equivalent to {@code close(INFINITE_DURATION)}. */
    @Override
    public void close() {
        close(INFINITE_DURATION);
    }

    /**
     * Unsubscribes and then closes every owned resource. A failure to close one resource is
     * logged at severe level and does not prevent the remaining resources from being closed.
     *
     * <p>Note: {@code duration} is currently not consulted by this implementation.
     */
    @Override
    public void close(Duration duration) {
        unsubscribe();
        for (AutoCloseable resource : this.toClose) {
            try {
                resource.close();
            } catch (Exception e) {
                logger.atSevere().withCause(e).log("Error closing %s during Consumer shutdown.", resource.getClass().getSimpleName());
            }
        }
    }

    /** Returns group metadata whose group id is the subscription path. */
    @Override
    public ConsumerGroupMetadata groupMetadata() {
        return new ConsumerGroupMetadata(this.subscriptionPath.toString());
    }

    /** Always returns an empty set; pausing is not supported. */
    @Override
    public Set<TopicPartition> paused() {
        return ImmutableSet.of();
    }

    /** No-op other than logging a warning. */
    @Override
    public void pause(Collection<TopicPartition> collection) {
        logger.atWarning().log("Calling pause on a Pub/Sub Lite Consumer is a no-op. Configure the amount of outstanding bytes and messages instead.");
    }

    /** No-op other than logging a warning. */
    @Override
    public void resume(Collection<TopicPartition> collection) {
        logger.atWarning().log("Calling resume on a Pub/Sub Lite Consumer is a no-op. Configure the amount of outstanding bytes and messages instead.");
    }

    /** No-op other than logging a warning. */
    @Override
    public void enforceRebalance() {
        logger.atWarning().log("Calling enforceRebalance on a Pub/Sub Lite Consumer is a no-op.");
    }

    /** No-op other than logging a warning; the reason string is ignored. */
    @Override
    public void enforceRebalance(String str) {
        logger.atWarning().log("Calling enforceRebalance on a Pub/Sub Lite Consumer is a no-op.");
    }

    /** Logs a warning and always returns an empty value. */
    @Override
    public OptionalLong currentLag(TopicPartition topicPartition) {
        logger.atWarning().log("Calling currentLag on a Pub/Sub Lite Consumer always returns empty.");
        return OptionalLong.empty();
    }

    /**
     * Forwards the wakeup to the underlying consumer.
     *
     * @throws IllegalStateException if neither subscribe nor assign has been called
     */
    @Override
    public void wakeup() {
        requireValidConsumer().wakeup();
    }
}
