package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedSourceImpl.class */
/**
 * An {@link UnboundedSource} of {@link SequencedMessage}s whose checkpoints are
 * {@link CheckpointMarkImpl}s.
 *
 * <p>An instance is in one of two states, fixed at construction time:
 *
 * <ul>
 *   <li><b>unbound</b> - no partition is set; such an instance may only be {@link #split split}
 *       into per-partition sources;
 *   <li><b>bound</b> - exactly one partition is set; such an instance may only be used to
 *       {@link #createReader create readers}.
 * </ul>
 *
 * <p>Calling the operation that does not match the state fails with an
 * {@link IllegalStateException}.
 */
public class UnboundedSourceImpl extends UnboundedSource<SequencedMessage, CheckpointMarkImpl> {
    private final SubscriberOptions subscriberOptions;
    private final SubscriberFactory subscriberFactory;
    private final BacklogReaderFactory readerFactory;

    /** The partition this source reads from; absent for the unbound (not yet split) source. */
    private final Optional<Partition> partition;

    /** Serializable factory producing the backlog reader for one subscription partition. */
    public interface BacklogReaderFactory extends Serializable {
        /**
         * Creates a backlog reader.
         *
         * @param subscriptionPartition the subscription/partition pair to create the reader for
         * @return the backlog reader handed to the reader built by {@link #createReader}
         */
        TopicBacklogReader create(SubscriptionPartition subscriptionPartition);
    }

    /** Serializable factory producing the subscriber for one subscription partition. */
    public interface SubscriberFactory extends Serializable {
        /**
         * Creates a subscriber.
         *
         * @param subscriptionPartition the subscription/partition pair to subscribe to
         * @param offset the initial offset computed by {@link #createReader}
         * @return the subscriber handed to the reader built by {@link #createReader}
         */
        MemoryBufferedSubscriber create(SubscriptionPartition subscriptionPartition, Offset offset);
    }

    /**
     * Creates an unbound source, i.e. one without a partition. It must be {@link #split split}
     * before readers can be created from the resulting per-partition sources.
     *
     * @param subscriberOptions options identifying the subscription to read
     * @param subscriberFactory factory used to create the subscriber of each reader
     * @param backlogReaderFactory factory used to create the backlog reader of each reader
     */
    public UnboundedSourceImpl(SubscriberOptions subscriberOptions, SubscriberFactory subscriberFactory, BacklogReaderFactory backlogReaderFactory) {
        this.subscriberOptions = subscriberOptions;
        this.subscriberFactory = subscriberFactory;
        this.readerFactory = backlogReaderFactory;
        this.partition = Optional.absent();
    }

    /**
     * Creates a source bound to a single partition. Only used by {@link #split}.
     *
     * @param partition the partition this source reads; must not be null ({@code Optional.of})
     */
    private UnboundedSourceImpl(SubscriberOptions subscriberOptions, SubscriberFactory subscriberFactory, BacklogReaderFactory backlogReaderFactory, Partition partition) {
        this.subscriberOptions = subscriberOptions;
        this.subscriberFactory = subscriberFactory;
        this.readerFactory = backlogReaderFactory;
        this.partition = Optional.of(partition);
    }

    /**
     * Splits this unbound source into one bound source per partition.
     *
     * <p>The number of partitions is looked up from the subscription path of the subscriber
     * options; the returned list holds one source for each partition index from {@code 0}
     * (inclusive) to that count (exclusive), in ascending order.
     *
     * @param i the requested number of splits; not used, the partition count decides
     * @param pipelineOptions not used
     * @return one partition-bound source per partition
     * @throws IllegalStateException if this source is already bound to a partition
     * @throws Exception if the partition count lookup fails
     */
    public List<? extends UnboundedSource<SequencedMessage, CheckpointMarkImpl>> split(int i, PipelineOptions pipelineOptions) throws Exception {
        Preconditions.checkState(
                !this.partition.isPresent(),
                "split() may only be called on a source that is not bound to a partition");
        final int partitionCount = PartitionLookupUtils.numPartitions(this.subscriberOptions.subscriptionPath());
        // No raw List cast needed: List<UnboundedSourceImpl> already satisfies the wildcard return type.
        return IntStream.range(0, partitionCount)
                .mapToObj(partitionIndex -> new UnboundedSourceImpl(
                        this.subscriberOptions,
                        this.subscriberFactory,
                        this.readerFactory,
                        Partition.of(partitionIndex)))
                .collect(Collectors.toList());
    }

    /**
     * Creates a reader for the partition this source is bound to.
     *
     * <p>The reader starts at the offset stored in {@code checkpointMarkImpl} when one is given;
     * otherwise the initial offset is read through the assembler's initial offset reader.
     *
     * @param pipelineOptions not used
     * @param checkpointMarkImpl checkpoint to resume from, or {@code null} to start from the
     *     initial offset
     * @return a reader over this source's partition
     * @throws IllegalStateException if this source is not bound to a partition
     * @throws IOException declared by the overridden signature
     */
    public UnboundedSource.UnboundedReader<SequencedMessage> createReader(PipelineOptions pipelineOptions, CheckpointMarkImpl checkpointMarkImpl) throws IOException {
        Preconditions.checkState(
                this.partition.isPresent(),
                "createReader() may only be called on a source that is bound to a partition; call split() first");
        final Partition boundPartition = this.partition.get();
        SubscriberAssembler subscriberAssembler = new SubscriberAssembler(this.subscriberOptions, boundPartition);

        final Offset initialOffset;
        if (checkpointMarkImpl == null) {
            // No checkpoint to resume from: ask the assembler where reading should begin.
            initialOffset = subscriberAssembler.getInitialOffsetReader().read();
        } else {
            initialOffset = checkpointMarkImpl.offset;
        }

        SubscriptionPartition subscriptionPartition =
                SubscriptionPartition.of(this.subscriberOptions.subscriptionPath(), boundPartition);
        // Argument order matters: subscriber, backlog reader and committer are created in that order.
        return new UnboundedReaderImpl(
                this,
                this.subscriberFactory.create(subscriptionPartition, initialOffset),
                this.readerFactory.create(subscriptionPartition),
                CloserReference.of(subscriberAssembler.newCommitter()),
                initialOffset);
    }

    /**
     * Returns the coder for this source's checkpoint marks.
     *
     * @return the coder supplied by {@link CheckpointMarkImpl#coder()}
     */
    public Coder<CheckpointMarkImpl> getCheckpointMarkCoder() {
        return CheckpointMarkImpl.coder();
    }

    /**
     * Returns the coder for the elements this source produces.
     *
     * @return a {@link ProtoCoder} for {@link SequencedMessage}
     */
    public Coder<SequencedMessage> getOutputCoder() {
        return ProtoCoder.of(SequencedMessage.class);
    }
}
