package com.google.pubsublite.kafka.source;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;

/* loaded from: input_file:com/google/pubsublite/kafka/source/PollerImpl.class */
class PollerImpl implements Poller {

    @VisibleForTesting
    static final Duration POLL_DURATION = Duration.ofSeconds(10);
    private final String kafkaTopic;
    private final Consumer<byte[], byte[]> consumer;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PollerImpl(String str, Consumer<byte[], byte[]> consumer) {
        this.kafkaTopic = str;
        this.consumer = consumer;
    }

    @Override // com.google.pubsublite.kafka.source.Poller
    @Nullable
    public List<SourceRecord> poll() {
        try {
            ConsumerRecords<byte[], byte[]> poll = this.consumer.poll(POLL_DURATION);
            ImmutableList.Builder builder = ImmutableList.builder();
            poll.forEach(consumerRecord -> {
                ConnectHeaders connectHeaders = new ConnectHeaders();
                for (Header header : consumerRecord.headers()) {
                    connectHeaders.addBytes(header.key(), header.value());
                }
                builder.add((ImmutableList.Builder) new SourceRecord(ImmutableMap.of("topic", (Integer) consumerRecord.topic(), "partition", Integer.valueOf(consumerRecord.partition())), ImmutableMap.of("offset", Long.valueOf(consumerRecord.offset())), this.kafkaTopic, (Integer) null, Schema.OPTIONAL_BYTES_SCHEMA, consumerRecord.key() == null || ((byte[]) consumerRecord.key()).length == 0 ? null : consumerRecord.key(), Schema.BYTES_SCHEMA, consumerRecord.value(), Long.valueOf(consumerRecord.timestamp()), connectHeaders));
            });
            return builder.build();
        } catch (TimeoutException | WakeupException e) {
            return null;
        }
    }

    @Override // com.google.pubsublite.kafka.source.Poller, java.lang.AutoCloseable
    public void close() {
        this.consumer.wakeup();
        this.consumer.unsubscribe();
    }
}
