package org.apache.beam.sdk.io;

import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.OffsetBasedSource;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.range.OffsetRangeTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Verify;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSource.class */
public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedSource.class);
    private final ValueProvider<String> fileOrPatternSpec;
    private final EmptyMatchTreatment emptyMatchTreatment;

    @Nullable
    private MatchResult.Metadata singleFileMetadata;
    private final Mode mode;

    /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSource$FileBasedReader.class */
    /**
     * Base reader for a {@link FileBasedSource} that describes a single file or a sub-range of one.
     *
     * <p>{@link #startImpl()} opens the file, positions the channel at the source's start offset
     * when the channel is seekable, hands the channel to {@link #startReading} and then loads the
     * first record through {@link #readNextRecord()}. Sources in {@link Mode#FILEPATTERN} mode are
     * rejected by the constructor.
     */
    public static abstract class FileBasedReader<T> extends OffsetBasedSource.OffsetBasedReader<T> {

        /** Channel opened by {@link #startImpl()}; stays {@code null} until reading has started. */
        @Nullable
        private ReadableByteChannel channel = null;

        /**
         * @param fileBasedSource the source to read; must not be in {@link Mode#FILEPATTERN} mode
         * @throws IllegalArgumentException if the source is a file pattern
         */
        public FileBasedReader(FileBasedSource<T> fileBasedSource) {
            super(fileBasedSource);
            Preconditions.checkArgument(Mode.FILEPATTERN != fileBasedSource.getMode(), "FileBasedReader does not support reading file patterns");
        }

        /** Returns the current source, narrowed to {@link FileBasedSource}. */
        @Override // org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader, org.apache.beam.sdk.io.BoundedSource.BoundedReader, org.apache.beam.sdk.io.Source.Reader
        public synchronized FileBasedSource<T> getCurrentSource() {
            return (FileBasedSource<T>) super.getCurrentSource();
        }

        /**
         * Opens the file behind the current source and reads the first record.
         *
         * @return whatever {@link #advanceImpl()} returns for the first record
         * @throws IllegalArgumentException if the opened channel is not seekable and the source is
         *     a sub-range source or has a non-zero start offset
         */
        @Override // org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader
        protected final boolean startImpl() throws IOException {
            final FileBasedSource<T> source = getCurrentSource();
            final ReadableByteChannel opened = FileSystems.open(source.getSingleFileMetadata().resourceId());
            // Store the channel before anything else can fail so close() is able to release it.
            this.channel = opened;
            if (!(opened instanceof SeekableByteChannel)) {
                // The channel cannot be repositioned, so only an unsplit read from offset 0 is valid.
                Preconditions.checkArgument(source.getMode() != Mode.SINGLE_FILE_OR_SUBRANGE, "Subrange-based sources must only be defined for file types that support seekable  read channels");
                Preconditions.checkArgument(source.getStartOffset() == 0, "Start offset %s is not zero but channel for reading the file is not seekable.", source.getStartOffset());
            } else {
                ((SeekableByteChannel) opened).position(source.getStartOffset());
            }
            startReading(opened);
            // Advance once so the first record is loaded.
            return advanceImpl();
        }

        /** Delegates to {@link #readNextRecord()}. */
        @Override // org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader
        protected final boolean advanceImpl() throws IOException {
            return readNextRecord();
        }

        /** Closes the channel if {@link #startImpl()} opened one; otherwise does nothing. */
        @Override // org.apache.beam.sdk.io.Source.Reader, java.lang.AutoCloseable
        public void close() throws IOException {
            final ReadableByteChannel open = this.channel;
            if (open == null) {
                return;
            }
            open.close();
        }

        /**
         * Reports whether the current source is splittable.
         *
         * @throws RuntimeException wrapping any exception raised by {@link FileBasedSource#isSplittable()}
         */
        @Override // org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader
        public boolean allowsDynamicSplitting() {
            try {
                final FileBasedSource<T> source = getCurrentSource();
                return source.isSplittable();
            } catch (Exception e) {
                throw new RuntimeException(String.format("Error determining if %s allows dynamic splitting", this), e);
            }
        }

        /**
         * Called once from {@link #startImpl()} with the opened (and, if seekable, positioned)
         * channel, before the first call to {@link #readNextRecord()}.
         */
        /* JADX INFO: Access modifiers changed from: protected */
        public abstract void startReading(ReadableByteChannel readableByteChannel) throws IOException;

        /**
         * Reads the next record; its return value is passed straight through as the result of
         * {@link #startImpl()} and {@link #advanceImpl()}.
         */
        /* JADX INFO: Access modifiers changed from: protected */
        public abstract boolean readNextRecord() throws IOException;
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSource$FilePatternReader.class */
    /**
     * Reader that walks a list of per-file readers in order and exposes their records as one
     * stream. Created by {@code createReader} for file-pattern sources.
     *
     * <p>Readers are started lazily, one at a time. A reader is closed as soon as it turns out to
     * be empty or is exhausted and a further reader remains; the last reader and any readers never
     * reached are closed by {@link #close()}. Dynamic splitting is not supported.
     */
    private class FilePatternReader extends BoundedSource.BoundedReader<T> {
        private final FileBasedSource<T> source;
        private final List<FileBasedReader<T>> fileReaders;
        /** Cursor over {@link #fileReaders}; its previous element is {@link #currentReader}. */
        final ListIterator<FileBasedReader<T>> fileReadersIterator;

        /** Reader currently being consumed; {@code null} until {@link #start()} picks one. */
        @Nullable
        FileBasedReader<T> currentReader = null;

        /**
         * @param fileBasedSource the file-pattern source returned by {@link #getCurrentSource()}
         * @param list one reader per file, in the order they are to be read
         */
        public FilePatternReader(FileBasedSource<T> fileBasedSource, List<FileBasedReader<T>> list) {
            this.source = fileBasedSource;
            this.fileReaders = list;
            this.fileReadersIterator = list.listIterator();
        }

        /** Starts the first reader that yields a record; returns {@code false} if none does. */
        @Override // org.apache.beam.sdk.io.Source.Reader
        public boolean start() throws IOException {
            return startNextNonemptyReader();
        }

        /**
         * Advances within the current file, falling through to the next non-empty file when the
         * current one is exhausted.
         *
         * @throws IllegalStateException if no reader has been selected yet
         */
        @Override // org.apache.beam.sdk.io.Source.Reader
        public boolean advance() throws IOException {
            Preconditions.checkState(this.currentReader != null, "Call start() before advance()");
            if (this.currentReader.advance()) {
                return true;
            }
            // The current file is exhausted. Once currentReader moves on, close() no longer visits
            // this reader, so release it now. The final reader is left open for close() to handle.
            if (this.fileReadersIterator.hasNext()) {
                this.currentReader.close();
            }
            return startNextNonemptyReader();
        }

        /** Starts readers in order until one has a record, closing each empty one on the way. */
        private boolean startNextNonemptyReader() throws IOException {
            while (this.fileReadersIterator.hasNext()) {
                this.currentReader = this.fileReadersIterator.next();
                if (this.currentReader.start()) {
                    return true;
                }
                this.currentReader.close();
            }
            return false;
        }

        /** Returns the selected reader, or throws if reading has not started. */
        private FileBasedReader<T> requireCurrentReader() {
            final FileBasedReader<T> reader = this.currentReader;
            if (reader == null) {
                // Honour the declared NoSuchElementException instead of failing with an NPE.
                throw new NoSuchElementException("No current record: start() has not selected a reader");
            }
            return reader;
        }

        /**
         * @throws NoSuchElementException if no reader has been selected yet, or if the current
         *     reader itself reports that it has no current record
         */
        @Override // org.apache.beam.sdk.io.Source.Reader
        public T getCurrent() throws NoSuchElementException {
            return requireCurrentReader().getCurrent();
        }

        /**
         * @throws NoSuchElementException if no reader has been selected yet, or if the current
         *     reader itself reports that it has no current record
         */
        @Override // org.apache.beam.sdk.io.BoundedSource.BoundedReader, org.apache.beam.sdk.io.Source.Reader
        public Instant getCurrentTimestamp() throws NoSuchElementException {
            return requireCurrentReader().getCurrentTimestamp();
        }

        /**
         * Closes the current reader and every reader not yet reached. All of them are attempted
         * even if one fails; the first {@link IOException} is rethrown with later ones attached as
         * suppressed exceptions.
         */
        @Override // org.apache.beam.sdk.io.Source.Reader, java.lang.AutoCloseable
        public void close() throws IOException {
            IOException failure = null;
            if (this.currentReader != null) {
                failure = closeCollecting(this.currentReader, failure);
            }
            while (this.fileReadersIterator.hasNext()) {
                failure = closeCollecting(this.fileReadersIterator.next(), failure);
            }
            if (failure != null) {
                throw failure;
            }
        }

        /** Closes {@code reader}, merging any {@link IOException} into {@code earlier}. */
        @Nullable
        private IOException closeCollecting(FileBasedReader<T> reader, @Nullable IOException earlier) {
            try {
                reader.close();
                return earlier;
            } catch (IOException e) {
                if (earlier == null) {
                    return e;
                }
                earlier.addSuppressed(e);
                return earlier;
            }
        }

        @Override // org.apache.beam.sdk.io.BoundedSource.BoundedReader, org.apache.beam.sdk.io.Source.Reader
        public FileBasedSource<T> getCurrentSource() {
            return this.source;
        }

        /** Always returns {@code null}: this reader cannot be split dynamically. */
        @Override // org.apache.beam.sdk.io.BoundedSource.BoundedReader
        public FileBasedSource<T> splitAtFraction(double d) {
            FileBasedSource.LOG.debug("Dynamic splitting of FilePatternReader is unsupported.");
            return null;
        }

        /**
         * Estimates progress by giving every file an equal share of the range [0, 1] and
         * interpolating within the current file's share using that reader's own fraction.
         *
         * @return 0 before a reader is selected; otherwise the interpolated fraction
         */
        @Override // org.apache.beam.sdk.io.BoundedSource.BoundedReader
        public Double getFractionConsumed() {
            if (this.currentReader == null) {
                return 0.0d;
            }
            if (this.fileReaders.isEmpty()) {
                return 1.0d;
            }
            // previousIndex() is the position of the reader most recently returned by next().
            final int index = this.fileReadersIterator.previousIndex();
            final int readerCount = this.fileReaders.size();
            if (index == readerCount) {
                // Defensive guard kept from the original code.
                return 1.0d;
            }
            final double shareStart = (1.0d * index) / readerCount;
            final double shareEnd = (1.0d * (index + 1)) / readerCount;
            final Double withinFile = this.currentReader.getFractionConsumed();
            if (withinFile == null) {
                return shareStart;
            }
            return shareStart + (withinFile.doubleValue() * (shareEnd - shareStart));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSource$Mode.class */
    /** The two configurations a {@link FileBasedSource} can be in; fixed by the constructor used. */
    public enum Mode {
        /** Assigned by the constructors that take a {@code ValueProvider<String>} file spec. */
        FILEPATTERN,
        /** Assigned by the constructor that takes {@code MatchResult.Metadata} and offsets. */
        SINGLE_FILE_OR_SUBRANGE
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a source in {@link Mode#FILEPATTERN} mode spanning offsets 0 to
     * {@link OffsetRangeTracker#OFFSET_INFINITY}.
     *
     * @param valueProvider provider of the file name or pattern to read
     * @param emptyMatchTreatment passed to {@code FileSystems.match} whenever the pattern is expanded
     * @param j forwarded as the third argument of the {@code OffsetBasedSource} constructor
     *     (presumably the minimum bundle size; confirm against that class)
     */
    public FileBasedSource(ValueProvider<String> valueProvider, EmptyMatchTreatment emptyMatchTreatment, long j) {
        super(0L, OffsetRangeTracker.OFFSET_INFINITY, j);
        this.fileOrPatternSpec = valueProvider;
        this.emptyMatchTreatment = emptyMatchTreatment;
        this.mode = Mode.FILEPATTERN;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a file-pattern source that uses {@link EmptyMatchTreatment#DISALLOW}; delegates to
     * the three-argument constructor.
     *
     * @param valueProvider provider of the file name or pattern to read
     * @param j passed through unchanged to the three-argument constructor
     */
    public FileBasedSource(ValueProvider<String> valueProvider, long j) {
        this(valueProvider, EmptyMatchTreatment.DISALLOW, j);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FileBasedSource(MatchResult.Metadata metadata, long j, long j2, long j3) {
        super(j2, j3, j);
        this.mode = Mode.SINGLE_FILE_OR_SUBRANGE;
        this.singleFileMetadata = (MatchResult.Metadata) Preconditions.checkNotNull(metadata, "fileMetadata");
        this.fileOrPatternSpec = ValueProvider.StaticValueProvider.of(metadata.resourceId().toString());
        this.emptyMatchTreatment = EmptyMatchTreatment.DISALLOW;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the metadata of the single file this source reads.
     *
     * @throws IllegalArgumentException if this source is not in {@link Mode#SINGLE_FILE_OR_SUBRANGE} mode
     * @throws IllegalStateException if the metadata is unexpectedly {@code null}
     */
    public final MatchResult.Metadata getSingleFileMetadata() {
        final boolean isSingleFile = Mode.SINGLE_FILE_OR_SUBRANGE == this.mode;
        Preconditions.checkArgument(isSingleFile, "This function should only be called for a single file, not %s", this);
        final MatchResult.Metadata metadata = this.singleFileMetadata;
        Preconditions.checkState(metadata != null, "It should not be possible to construct a %s in mode %s with null metadata: %s", FileBasedSource.class, this.mode, this);
        return metadata;
    }

    /** Resolves the file-or-pattern provider and returns its current value. */
    public final String getFileOrPatternSpec() {
        return this.fileOrPatternSpec.get();
    }

    /** Returns the file-or-pattern provider itself, without resolving it. */
    public final ValueProvider<String> getFileOrPatternSpecProvider() {
        return this.fileOrPatternSpec;
    }

    /** Returns the treatment passed to {@code FileSystems.match} when the pattern is expanded. */
    public final EmptyMatchTreatment getEmptyMatchTreatment() {
        return this.emptyMatchTreatment;
    }

    /** Returns the mode that was fixed when this source was constructed. */
    public final Mode getMode() {
        return this.mode;
    }

    /**
     * Creates a source for a sub-range of this single-file source by delegating to
     * {@link #createForSubrangeOfFile}.
     *
     * @param j start offset of the sub-range; must not be smaller than this source's start offset
     * @param j2 end offset of the sub-range; must not be larger than this source's end offset
     * @throws IllegalArgumentException if this is a file-pattern source, if the range is outside
     *     this source's range, or if the created source is not a sub-range source although the
     *     start is positive or the end is bounded
     * @throws IllegalStateException if this source has no file metadata
     */
    @Override // org.apache.beam.sdk.io.OffsetBasedSource
    public final FileBasedSource<T> createSourceForSubrange(long j, long j2) {
        Preconditions.checkArgument(Mode.FILEPATTERN != this.mode, "Cannot split a file pattern based source based on positions");
        Preconditions.checkArgument(j >= getStartOffset(), "Start offset value %s of the subrange cannot be smaller than the start offset value %s of the parent source", j, getStartOffset());
        Preconditions.checkArgument(j2 <= getEndOffset(), "End offset value %s of the subrange cannot be larger than the end offset value %s", j2, getEndOffset());
        Preconditions.checkState(this.singleFileMetadata != null, "A single file source should not have null metadata: %s", this);

        final FileBasedSource<T> subrangeSource = createForSubrangeOfFile(this.singleFileMetadata, j, j2);
        final boolean startsLateOrEndsEarly = j > 0 || j2 != OffsetRangeTracker.OFFSET_INFINITY;
        if (startsLateOrEndsEarly) {
            Preconditions.checkArgument(subrangeSource.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE, "Source created for the range [%s,%s) must be a subrange source", j, j2);
        }
        return subrangeSource;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a source for the range [{@code j}, {@code j2}) of the given file. Callers in this
     * class require the result to be in {@link Mode#SINGLE_FILE_OR_SUBRANGE} mode.
     */
    public abstract FileBasedSource<T> createForSubrangeOfFile(MatchResult.Metadata metadata, long j, long j2);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the reader for this source; {@code createReader} only calls it on sources that are
     * not in {@link Mode#FILEPATTERN} mode.
     */
    public abstract FileBasedReader<T> createSingleFileReader(PipelineOptions pipelineOptions);

    /**
     * Estimates the number of bytes this source covers.
     *
     * <p>For a file pattern the pattern is expanded and the sizes of all matched files are summed.
     * For a single file the result is the distance from the start offset to the smaller of the end
     * offset and the file size.
     *
     * @throws IOException if matching the pattern fails
     */
    @Override // org.apache.beam.sdk.io.OffsetBasedSource, org.apache.beam.sdk.io.BoundedSource
    public final long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws IOException {
        final String spec = this.fileOrPatternSpec.get();
        if (this.mode == Mode.FILEPATTERN) {
            final List<MatchResult.Metadata> matches = FileSystems.match(spec, this.emptyMatchTreatment).metadata();
            long totalBytes = 0;
            for (MatchResult.Metadata match : matches) {
                totalBytes += match.sizeBytes();
            }
            LOG.info("Filepattern {} matched {} files with total size {}", spec, matches.size(), totalBytes);
            return totalBytes;
        }
        // An unbounded end offset is capped at the file size.
        final long effectiveEnd = Math.min(getEndOffset(), getMaxEndOffset(pipelineOptions));
        return effectiveEnd - getStartOffset();
    }

    /**
     * Adds the superclass display data and, for file-pattern sources only, a {@code filePattern}
     * item labelled "File Pattern".
     */
    @Override // org.apache.beam.sdk.io.OffsetBasedSource, org.apache.beam.sdk.io.Source, org.apache.beam.sdk.transforms.display.HasDisplayData
    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        if (this.mode != Mode.FILEPATTERN) {
            return;
        }
        final ValueProvider<String> pattern = getFileOrPatternSpecProvider();
        builder.add(DisplayData.item("filePattern", pattern).withLabel("File Pattern"));
    }

    /**
     * Splits this source into bundles.
     *
     * <p>A file pattern is expanded; each matched file becomes a single-file source which is then
     * split in turn. A single-file source is split by the superclass when it is splittable and is
     * otherwise returned whole.
     *
     * @param j desired bundle size, passed on to the per-file and superclass {@code split} calls
     * @param pipelineOptions passed on to the per-file and superclass {@code split} calls
     * @return the resulting sources; a one-element list holding this source if it cannot be split
     */
    @Override // org.apache.beam.sdk.io.OffsetBasedSource, org.apache.beam.sdk.io.BoundedSource
    public final List<? extends FileBasedSource<T>> split(long j, PipelineOptions pipelineOptions) throws Exception {
        final String spec = this.fileOrPatternSpec.get();
        if (this.mode != Mode.FILEPATTERN) {
            if (isSplittable()) {
                // super.split is declared to return OffsetBasedSource elements, so the narrower
                // return type needs an explicit cast. Presumably safe because the superclass builds
                // its results through createSourceForSubrange, which this class overrides to return
                // FileBasedSource; confirm against OffsetBasedSource.
                @SuppressWarnings("unchecked")
                final List<FileBasedSource<T>> subranges = (List<FileBasedSource<T>>) super.split(j, pipelineOptions);
                return subranges;
            }
            LOG.debug("The source for file {} is not split into sub-range based sources since the file is not seekable", spec);
            return ImmutableList.of(this);
        }

        final long startMillis = System.currentTimeMillis();
        final List<MatchResult.Metadata> matches = FileSystems.match(spec, this.emptyMatchTreatment).metadata();
        final List<FileBasedSource<T>> bundles = new ArrayList<>(matches.size());
        for (MatchResult.Metadata match : matches) {
            final FileBasedSource<T> wholeFile = createForSubrangeOfFile(match, 0L, match.sizeBytes());
            Verify.verify(wholeFile.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE, "%s.createForSubrangeOfFile must return a source in mode %s", wholeFile, Mode.SINGLE_FILE_OR_SUBRANGE);
            // wholeFile is verified not to be a file pattern, so this call takes the branch above
            // rather than expanding a pattern again.
            bundles.addAll(wholeFile.split(j, pipelineOptions));
        }
        LOG.info("Splitting filepattern {} into bundles of size {} took {} ms and produced {} files and {} bundles", spec, j, System.currentTimeMillis() - startMillis, matches.size(), bundles.size());
        return bundles;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reports whether this source can be split: always {@code true} for a file pattern, otherwise
     * whatever the file metadata's {@code isReadSeekEfficient()} returns.
     */
    public boolean isSplittable() throws Exception {
        return this.mode == Mode.FILEPATTERN || getSingleFileMetadata().isReadSeekEfficient();
    }

    /**
     * Validates this source and creates a reader for it.
     *
     * <p>A single-file source gets the reader from {@link #createSingleFileReader}. A file pattern
     * is expanded and one single-file reader is created per matched file; if exactly one file
     * matched, that reader is returned directly, otherwise the readers are chained by a
     * {@code FilePatternReader}.
     *
     * @throws IOException if matching the pattern fails
     */
    @Override // org.apache.beam.sdk.io.BoundedSource
    public final BoundedSource.BoundedReader<T> createReader(PipelineOptions pipelineOptions) throws IOException {
        validate();
        final String spec = this.fileOrPatternSpec.get();
        if (this.mode != Mode.FILEPATTERN) {
            return createSingleFileReader(pipelineOptions);
        }

        final long startMillis = System.currentTimeMillis();
        final List<MatchResult.Metadata> matches = FileSystems.match(spec, this.emptyMatchTreatment).metadata();
        LOG.info("Matched {} files for pattern {}", matches.size(), spec);
        // Typed and presized: exactly one reader is added per matched file.
        final List<FileBasedReader<T>> readers = new ArrayList<>(matches.size());
        for (MatchResult.Metadata match : matches) {
            final FileBasedSource<T> wholeFile = createForSubrangeOfFile(match, 0L, match.sizeBytes());
            readers.add(wholeFile.createSingleFileReader(pipelineOptions));
        }
        LOG.debug("Creating a reader for file pattern {} took {} ms", spec, System.currentTimeMillis() - startMillis);

        if (readers.size() == 1) {
            return readers.get(0);
        }
        return new FilePatternReader(this, readers);
    }

    /**
     * Returns the file spec provider's string form for a file pattern, and that string followed by
     * {@code " range "} and the superclass description for a single file.
     */
    @Override // org.apache.beam.sdk.io.OffsetBasedSource
    public String toString() {
        switch (this.mode) {
            case SINGLE_FILE_OR_SUBRANGE: {
                final String spec = String.valueOf(this.fileOrPatternSpec);
                return spec + " range " + super.toString();
            }
            case FILEPATTERN:
                return this.fileOrPatternSpec.toString();
            default:
                throw new IllegalStateException("Unexpected mode: " + this.mode);
        }
    }

    /**
     * Runs the superclass validation and then, for file-pattern sources, requires the offsets to
     * be exactly 0 and {@link OffsetRangeTracker#OFFSET_INFINITY}. Single-file sources get no
     * additional checks here.
     *
     * @throws IllegalArgumentException if a file-pattern source has any other offsets
     */
    @Override // org.apache.beam.sdk.io.OffsetBasedSource, org.apache.beam.sdk.io.Source
    public void validate() {
        super.validate();
        switch (this.mode) {
            case SINGLE_FILE_OR_SUBRANGE:
                break;
            case FILEPATTERN:
                Preconditions.checkArgument(getStartOffset() == 0, "FileBasedSource is based on a file pattern or a full single file but the starting offset proposed %s is not zero", getStartOffset());
                Preconditions.checkArgument(getEndOffset() == OffsetRangeTracker.OFFSET_INFINITY, "FileBasedSource is based on a file pattern or a full single file but the ending offset proposed %s is not Long.MAX_VALUE", getEndOffset());
                break;
            default:
                throw new IllegalStateException("Unknown mode: " + this.mode);
        }
    }

    /**
     * Returns the size in bytes of the single file behind this source.
     *
     * @throws IllegalArgumentException if this source is a file pattern
     */
    @Override // org.apache.beam.sdk.io.OffsetBasedSource
    public final long getMaxEndOffset(PipelineOptions pipelineOptions) throws IOException {
        final boolean isSingleFile = this.mode != Mode.FILEPATTERN;
        Preconditions.checkArgument(isSingleFile, "Cannot determine the exact end offset of a file pattern");
        final MatchResult.Metadata metadata = getSingleFileMetadata();
        return metadata.sizeBytes();
    }
}
