package org.apache.beam.sdk.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.io.range.OffsetRangeTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/OffsetBasedSource.class */
public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
    private final long startOffset;
    private final long endOffset;
    private final long minBundleSize;

    /* loaded from: input_file:org/apache/beam/sdk/io/OffsetBasedSource$OffsetBasedReader.class */
    public static abstract class OffsetBasedReader<T> extends BoundedSource.BoundedReader<T> {
        private static final Logger LOG = LoggerFactory.getLogger(OffsetBasedReader.class);
        private OffsetBasedSource<T> source;
        private final OffsetRangeTracker rangeTracker;

        public final boolean isDone() {
            return this.rangeTracker.isDone();
        }

        public final boolean isStarted() {
            return this.rangeTracker.isStarted();
        }

        public OffsetBasedReader(OffsetBasedSource<T> offsetBasedSource) {
            this.source = offsetBasedSource;
            this.rangeTracker = new OffsetRangeTracker(offsetBasedSource.getStartOffset(), offsetBasedSource.getEndOffset());
        }

        protected abstract long getCurrentOffset() throws NoSuchElementException;

        protected boolean isAtSplitPoint() throws NoSuchElementException {
            return true;
        }

        @Override // org.apache.beam.sdk.io.Source.Reader
        public final boolean start() throws IOException {
            return (startImpl() && this.rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset())) || this.rangeTracker.markDone();
        }

        @Override // org.apache.beam.sdk.io.Source.Reader
        public final boolean advance() throws IOException {
            return (advanceImpl() && this.rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset())) || this.rangeTracker.markDone();
        }

        protected abstract boolean startImpl() throws IOException;

        protected abstract boolean advanceImpl() throws IOException;

        @Override // org.apache.beam.sdk.io.BoundedSource.BoundedReader, org.apache.beam.sdk.io.Source.Reader
        public synchronized OffsetBasedSource<T> getCurrentSource() {
            return this.source;
        }

        @Override // org.apache.beam.sdk.io.BoundedSource.BoundedReader
        public Double getFractionConsumed() {
            return Double.valueOf(this.rangeTracker.getFractionConsumed());
        }

        @Override // org.apache.beam.sdk.io.BoundedSource.BoundedReader
        public long getSplitPointsConsumed() {
            return this.rangeTracker.getSplitPointsProcessed();
        }

        @Override // org.apache.beam.sdk.io.BoundedSource.BoundedReader
        public long getSplitPointsRemaining() {
            if (isDone()) {
                return 0L;
            }
            if (!isStarted()) {
                return -1L;
            }
            if (allowsDynamicSplitting() && getCurrentOffset() < this.rangeTracker.getStopPosition().longValue() - 1) {
                return super.getSplitPointsRemaining();
            }
            return 1L;
        }

        public boolean allowsDynamicSplitting() {
            return true;
        }

        @Override // org.apache.beam.sdk.io.BoundedSource.BoundedReader
        public final synchronized OffsetBasedSource<T> splitAtFraction(double d) {
            if (!allowsDynamicSplitting()) {
                return null;
            }
            if (this.rangeTracker.getStopPosition().longValue() == OffsetRangeTracker.OFFSET_INFINITY) {
                LOG.debug("Refusing to split unbounded OffsetBasedReader {} at fraction {}", this.rangeTracker, Double.valueOf(d));
                return null;
            }
            long positionForFractionConsumed = this.rangeTracker.getPositionForFractionConsumed(d);
            LOG.debug("Proposing to split OffsetBasedReader {} at fraction {} (offset {})", new Object[]{this.rangeTracker, Double.valueOf(d), Long.valueOf(positionForFractionConsumed)});
            long startOffset = this.source.getStartOffset();
            long endOffset = this.source.getEndOffset();
            OffsetBasedSource<T> createSourceForSubrange = this.source.createSourceForSubrange(startOffset, positionForFractionConsumed);
            OffsetBasedSource<T> createSourceForSubrange2 = this.source.createSourceForSubrange(positionForFractionConsumed, endOffset);
            if (!this.rangeTracker.trySplitAtPosition(positionForFractionConsumed)) {
                return null;
            }
            this.source = createSourceForSubrange;
            return createSourceForSubrange2;
        }
    }

    public OffsetBasedSource(long j, long j2, long j3) {
        this.startOffset = j;
        this.endOffset = j2;
        this.minBundleSize = j3;
    }

    public long getStartOffset() {
        return this.startOffset;
    }

    public long getEndOffset() {
        return this.endOffset;
    }

    public long getMinBundleSize() {
        return this.minBundleSize;
    }

    @Override // org.apache.beam.sdk.io.BoundedSource
    public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
        return getBytesPerOffset() * ((this.endOffset == OffsetRangeTracker.OFFSET_INFINITY ? getMaxEndOffset(pipelineOptions) : this.endOffset) - getStartOffset());
    }

    @Override // org.apache.beam.sdk.io.BoundedSource
    public List<? extends OffsetBasedSource<T>> split(long j, PipelineOptions pipelineOptions) throws Exception {
        long max = Math.max(Math.max(1L, j / getBytesPerOffset()), this.minBundleSize);
        ArrayList arrayList = new ArrayList();
        for (OffsetRange offsetRange : new OffsetRange(this.startOffset, Math.min(this.endOffset, getMaxEndOffset(pipelineOptions))).split(max, this.minBundleSize)) {
            arrayList.add(createSourceForSubrange(offsetRange.getFrom(), offsetRange.getTo()));
        }
        return arrayList;
    }

    @Override // org.apache.beam.sdk.io.Source
    public void validate() {
        Preconditions.checkArgument(this.startOffset >= 0, "Start offset has value %s, must be non-negative", this.startOffset);
        Preconditions.checkArgument(this.endOffset >= 0, "End offset has value %s, must be non-negative", this.endOffset);
        Preconditions.checkArgument(this.startOffset <= this.endOffset, "Start offset %s may not be larger than end offset %s", this.startOffset, this.endOffset);
        Preconditions.checkArgument(this.minBundleSize >= 0, "minBundleSize has value %s, must be non-negative", this.minBundleSize);
    }

    public String toString() {
        return "[" + this.startOffset + ", " + this.endOffset + ")";
    }

    public long getBytesPerOffset() {
        return 1L;
    }

    public abstract long getMaxEndOffset(PipelineOptions pipelineOptions) throws Exception;

    public abstract OffsetBasedSource<T> createSourceForSubrange(long j, long j2);

    @Override // org.apache.beam.sdk.io.Source, org.apache.beam.sdk.transforms.display.HasDisplayData
    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        builder.addIfNotDefault(DisplayData.item("minBundleSize", Long.valueOf(this.minBundleSize)).withLabel("Minimum Bundle Size"), 1L).addIfNotDefault(DisplayData.item("startOffset", Long.valueOf(this.startOffset)).withLabel("Start Read Offset"), 0L).addIfNotDefault(DisplayData.item("endOffset", Long.valueOf(this.endOffset)).withLabel("End Read Offset"), Long.valueOf(OffsetRangeTracker.OFFSET_INFINITY));
    }
}
