package org.apache.beam.sdk.io;

import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.OffsetBasedSource;
import org.apache.beam.sdk.io.range.OffsetRangeTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.repackaged.com.google.common.base.Ascii;
import org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.repackaged.com.google.common.util.concurrent.Futures;
import org.apache.beam.sdk.repackaged.com.google.common.util.concurrent.ListenableFuture;
import org.apache.beam.sdk.repackaged.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.sdk.repackaged.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSource.class */
/**
 * Base class for bounded sources that read from files.
 *
 * <p>An instance is in one of two {@link Mode}s:
 * <ul>
 *   <li>{@link Mode#FILEPATTERN}: the spec is a file pattern that is expanded into individual
 *       files when estimating size, splitting, or reading. Such a source must cover the offset
 *       range {@code [0, OFFSET_INFINITY)} (see {@link #validate()}).</li>
 *   <li>{@link Mode#SINGLE_FILE_OR_SUBRANGE}: the spec names a single file and the source covers
 *       the byte range {@code [startOffset, endOffset)} of it.</li>
 * </ul>
 *
 * <p>Subclasses supply {@link #createForSubrangeOfFile} (to build single-file sources) and
 * {@link #createSingleFileReader} (to read one such source).
 *
 * @param <T> the type of records produced by this source
 */
public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedSource.class);

    /** Fraction of matched files whose size is fetched when the total is estimated by sampling. */
    private static final float FRACTION_OF_FILES_TO_STAT = 0.01f;

    /**
     * Patterns matching at most this many files have every file's size fetched; larger matches
     * are estimated by sampling. Also the minimum sample size when sampling.
     */
    static final int MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT = 100;

    /** Number of threads used to stat or split files in parallel. */
    static final int THREAD_POOL_SIZE = 128;

    private final ValueProvider<String> fileOrPatternSpec;
    private final Mode mode;

    /**
     * Reader for a single file (or a byte subrange of it) produced by a
     * {@link Mode#SINGLE_FILE_OR_SUBRANGE} source.
     *
     * <p>The channel is opened in {@link #startImpl()}. It is positioned at the source's start
     * offset when the channel is seekable, and is handed to {@link #startReading}. Records are
     * then pulled one by one via {@link #readNextRecord()}.
     */
    public static abstract class FileBasedReader<T> extends OffsetBasedSource.OffsetBasedReader<T> {
        /** Channel opened by {@link #startImpl()}; {@code null} until then. */
        private ReadableByteChannel channel;

        /**
         * @param fileBasedSource the single-file source to read
         * @throws IllegalArgumentException if {@code fileBasedSource} is in {@link Mode#FILEPATTERN}
         */
        public FileBasedReader(FileBasedSource<T> fileBasedSource) {
            super(fileBasedSource);
            Preconditions.checkArgument(fileBasedSource.getMode() != Mode.FILEPATTERN, "FileBasedReader does not support reading file patterns");
        }

        @Override
        @SuppressWarnings("unchecked") // The reader is only ever constructed over a FileBasedSource<T>.
        public synchronized FileBasedSource<T> getCurrentSource() {
            return (FileBasedSource<T>) super.getCurrentSource();
        }

        /**
         * Opens the file of the current source, seeks to its start offset and reads the first
         * record.
         *
         * @return whether a first record is available
         */
        @Override
        protected final boolean startImpl() throws IOException {
            FileBasedSource<T> source = getCurrentSource();
            String fileName = source.getFileOrPatternSpecProvider().get();
            channel = IOChannelUtils.getFactory(fileName).open(fileName);

            if (channel instanceof SeekableByteChannel) {
                ((SeekableByteChannel) channel).position(source.getStartOffset());
            } else {
                // A non-seekable channel can only be read from the very beginning.
                // NOTE(review): since the constructor rejects FILEPATTERN sources, the first check
                // appears to reject every non-seekable channel - confirm that this is intended.
                Preconditions.checkArgument(source.getMode() != Mode.SINGLE_FILE_OR_SUBRANGE, "Subrange-based sources must only be defined for file types that support seekable  read channels");
                Preconditions.checkArgument(source.getStartOffset() == 0, "Start offset %s is not zero but channel for reading the file is not seekable.", Long.valueOf(source.getStartOffset()));
            }
            startReading(channel);
            return advanceImpl();
        }

        /** Reads the next record by delegating to {@link #readNextRecord()}. */
        @Override
        protected final boolean advanceImpl() throws IOException {
            return readNextRecord();
        }

        /** Closes the underlying channel if it was opened. */
        @Override
        public void close() throws IOException {
            if (channel != null) {
                channel.close();
            }
        }

        /**
         * Prepares to read records from {@code readableByteChannel}, which is already positioned at
         * the source's start offset when it is seekable.
         */
        public abstract void startReading(ReadableByteChannel readableByteChannel) throws IOException;

        /**
         * Advances to the next record.
         *
         * @return {@code true} if a record is available, {@code false} at end of input
         */
        public abstract boolean readNextRecord() throws IOException;
    }

    /**
     * Reader for a {@link Mode#FILEPATTERN} source that reads the per-file readers one after
     * another, skipping files that yield no records.
     */
    private static class FilePatternReader<T> extends BoundedSource.BoundedReader<T> {
        private final FileBasedSource<T> source;
        private final List<FileBasedReader<T>> fileReaders;
        private final ListIterator<FileBasedReader<T>> fileReadersIterator;
        /** Reader currently being consumed; {@code null} before {@link #start()}. */
        private FileBasedReader<T> currentReader = null;

        FilePatternReader(FileBasedSource<T> fileBasedSource, List<FileBasedReader<T>> list) {
            this.source = fileBasedSource;
            this.fileReaders = list;
            this.fileReadersIterator = list.listIterator();
        }

        @Override
        public boolean start() throws IOException {
            return startNextNonemptyReader();
        }

        @Override
        public boolean advance() throws IOException {
            Preconditions.checkState(currentReader != null, "Call start() before advance()");
            return currentReader.advance() || startNextNonemptyReader();
        }

        /**
         * Starts the remaining readers in order until one yields a record. Readers that turn out
         * to be empty are closed right away.
         *
         * @return {@code true} if some reader yielded a record, {@code false} if all are exhausted
         */
        private boolean startNextNonemptyReader() throws IOException {
            while (fileReadersIterator.hasNext()) {
                currentReader = fileReadersIterator.next();
                if (currentReader.start()) {
                    return true;
                }
                currentReader.close();
            }
            return false;
        }

        @Override
        public T getCurrent() throws NoSuchElementException {
            return requireStarted().getCurrent();
        }

        @Override
        public Instant getCurrentTimestamp() throws NoSuchElementException {
            return requireStarted().getCurrentTimestamp();
        }

        /** Returns the current reader, or throws as the Reader contract requires if none is active. */
        private FileBasedReader<T> requireStarted() {
            if (currentReader == null) {
                throw new NoSuchElementException("Call start() before reading the current record");
            }
            return currentReader;
        }

        /**
         * Closes the current reader and every reader not yet reached.
         *
         * <p>All readers are closed even if some fail. The first failure is rethrown, and any
         * later failures are attached to it as suppressed exceptions.
         */
        @Override
        public void close() throws IOException {
            List<FileBasedReader<T>> toClose = new ArrayList<>();
            if (currentReader != null) {
                toClose.add(currentReader);
            }
            while (fileReadersIterator.hasNext()) {
                toClose.add(fileReadersIterator.next());
            }

            IOException firstFailure = null;
            for (FileBasedReader<T> reader : toClose) {
                try {
                    reader.close();
                } catch (IOException e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else {
                        firstFailure.addSuppressed(e);
                    }
                }
            }
            if (firstFailure != null) {
                throw firstFailure;
            }
        }

        @Override
        public FileBasedSource<T> getCurrentSource() {
            return source;
        }

        /** Dynamic splitting is not supported; always returns {@code null}. */
        @Override
        public FileBasedSource<T> splitAtFraction(double d) {
            FileBasedSource.LOG.debug("Dynamic splitting of FilePatternReader is unsupported.");
            return null;
        }

        /**
         * Estimates progress by treating each file as an equal share of the total. Within the
         * current file's share, the current reader's own fraction is interpolated.
         */
        @Override
        public Double getFractionConsumed() {
            if (currentReader == null) {
                return 0.0;
            }
            int numReaders = fileReaders.size();
            if (numReaders == 0) {
                return 1.0;
            }
            // Index of the reader most recently returned by next(), i.e. currentReader.
            int index = fileReadersIterator.previousIndex();
            if (index == numReaders) {
                return 1.0;
            }
            double before = (double) index / numReaders;
            double after = (double) (index + 1) / numReaders;
            Double fractionOfCurrentReader = currentReader.getFractionConsumed();
            if (fractionOfCurrentReader == null) {
                return before;
            }
            return before + fractionOfCurrentReader * (after - before);
        }
    }

    /** Whether a source describes a file pattern or a single file (possibly a subrange of it). */
    public enum Mode {
        /** The spec is a file pattern that may expand to any number of files. */
        FILEPATTERN,
        /** The spec names one file; the source covers the byte range [start, end) of it. */
        SINGLE_FILE_OR_SUBRANGE
    }

    /**
     * Creates a {@link Mode#FILEPATTERN} source for a fixed pattern.
     *
     * @param fileOrPatternSpec file name or glob-like pattern
     * @param minBundleSize minimum bundle size passed to {@link OffsetBasedSource}
     */
    public FileBasedSource(String fileOrPatternSpec, long minBundleSize) {
        this(ValueProvider.StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
    }

    /**
     * Creates a {@link Mode#FILEPATTERN} source whose pattern may only be known at execution time.
     *
     * @param fileOrPatternSpec provider of the file name or pattern
     * @param minBundleSize minimum bundle size passed to {@link OffsetBasedSource}
     */
    public FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
        super(0L, OffsetRangeTracker.OFFSET_INFINITY, minBundleSize);
        this.mode = Mode.FILEPATTERN;
        this.fileOrPatternSpec = fileOrPatternSpec;
    }

    /**
     * Creates a {@link Mode#SINGLE_FILE_OR_SUBRANGE} source covering {@code [startOffset, endOffset)}
     * of {@code fileName}.
     */
    public FileBasedSource(String fileName, long minBundleSize, long startOffset, long endOffset) {
        super(startOffset, endOffset, minBundleSize);
        this.mode = Mode.SINGLE_FILE_OR_SUBRANGE;
        this.fileOrPatternSpec = ValueProvider.StaticValueProvider.of(fileName);
    }

    /** Returns the resolved file name or pattern; the provider must be accessible. */
    public final String getFileOrPatternSpec() {
        return fileOrPatternSpec.get();
    }

    /** Returns the provider of the file name or pattern. */
    public final ValueProvider<String> getFileOrPatternSpecProvider() {
        return fileOrPatternSpec;
    }

    /** Returns whether this source is a file pattern or a single file / subrange. */
    public final Mode getMode() {
        return mode;
    }

    /**
     * Creates a source for the subrange {@code [start, end)} of this single-file source.
     *
     * @throws IllegalArgumentException if this is a file-pattern source or the range lies outside
     *     this source's range
     * @throws IllegalStateException if the file name is not yet accessible
     */
    @Override
    public final FileBasedSource<T> createSourceForSubrange(long start, long end) {
        Preconditions.checkArgument(mode != Mode.FILEPATTERN, "Cannot split a file pattern based source based on positions");
        Preconditions.checkArgument(start >= getStartOffset(), "Start offset value %s of the subrange cannot be smaller than the start offset value %s of the parent source", Long.valueOf(start), Long.valueOf(getStartOffset()));
        Preconditions.checkArgument(end <= getEndOffset(), "End offset value %s of the subrange cannot be larger than the end offset value %s", Long.valueOf(end), Long.valueOf(getEndOffset()));
        Preconditions.checkState(fileOrPatternSpec.isAccessible(), "Subrange creation should only happen at execution time.");

        FileBasedSource<T> subrangeSource = createForSubrangeOfFile(fileOrPatternSpec.get(), start, end);
        // Anything narrower than the whole file must be a subrange source.
        if (start > 0 || end != OffsetRangeTracker.OFFSET_INFINITY) {
            Preconditions.checkArgument(subrangeSource.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE, "Source created for the range [%s,%s) must be a subrange source", Long.valueOf(start), Long.valueOf(end));
        }
        return subrangeSource;
    }

    /** Creates a single-file source covering {@code [j, j2)} of file {@code str}. */
    public abstract FileBasedSource<T> createForSubrangeOfFile(String str, long j, long j2);

    /** Creates a reader for this single-file source. */
    public abstract FileBasedReader<T> createSingleFileReader(PipelineOptions pipelineOptions);

    /**
     * Estimates the size in bytes of this source.
     *
     * <p>For a single file this is the length of the covered range. For a pattern, the sizes of
     * all matched files are fetched when there are at most
     * {@link #MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT} of them; otherwise the total is estimated
     * from a random sample.
     */
    @Override
    public final long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws IOException {
        if (mode != Mode.FILEPATTERN) {
            return Math.min(getEndOffset(), getMaxEndOffset(pipelineOptions)) - getStartOffset();
        }
        Preconditions.checkState(fileOrPatternSpec.isAccessible(), "Size estimation should be done at execution time.");
        String pattern = fileOrPatternSpec.get();
        IOChannelFactory factory = IOChannelUtils.getFactory(pattern);

        long startTime = System.currentTimeMillis();
        Collection<String> paths = factory.match(pattern);
        long estimatedSize;
        if (paths.size() <= MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT) {
            estimatedSize = getExactTotalSizeOfFiles(paths, factory);
            LOG.debug("Size estimation of all files of pattern {} took {} ms", fileOrPatternSpec, Long.valueOf(System.currentTimeMillis() - startTime));
        } else {
            estimatedSize = getEstimatedSizeOfFilesBySampling(paths, factory);
            LOG.debug("Size estimation of pattern {} by sampling took {} ms", fileOrPatternSpec, Long.valueOf(System.currentTimeMillis() - startTime));
        }
        return estimatedSize;
    }

    /**
     * Fetches the size of every path in parallel and returns their sum.
     *
     * @throws IOException if any size lookup fails (its cause is wrapped) or the wait is
     *     interrupted (the interrupt flag is restored)
     */
    private static long getExactTotalSizeOfFiles(Collection<String> paths, IOChannelFactory factory) throws IOException {
        List<ListenableFuture<Long>> sizeFutures = new ArrayList<>();
        ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
        try {
            for (String path : paths) {
                sizeFutures.add(createFutureForSizeEstimation(path, factory, service));
            }
            long totalSize = 0;
            for (Long size : Futures.allAsList(sizeFutures).get()) {
                totalSize += size.longValue();
            }
            return totalSize;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (ExecutionException e) {
            throw new IOException(e.getCause());
        } finally {
            service.shutdown();
        }
    }

    /** Submits a task to {@code service} that returns the size in bytes of {@code file}. */
    private static ListenableFuture<Long> createFutureForSizeEstimation(final String file, final IOChannelFactory factory, ListeningExecutorService service) {
        return service.submit(new Callable<Long>() {
            @Override
            public Long call() throws IOException {
                return Long.valueOf(factory.getSizeBytes(file));
            }
        });
    }

    /**
     * Estimates the total size of {@code paths} by fetching the sizes of a random sample. The
     * sample holds {@link #FRACTION_OF_FILES_TO_STAT} of the files, but at least
     * {@link #MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT} and at most all of them. The sampled average
     * is then scaled to the full count.
     */
    private static long getEstimatedSizeOfFilesBySampling(Collection<String> paths, IOChannelFactory factory) throws IOException {
        int totalFiles = paths.size();
        int sampleSize = Math.max(MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT, (int) (FRACTION_OF_FILES_TO_STAT * totalFiles));
        sampleSize = Math.min(sampleSize, totalFiles);
        if (sampleSize == 0) {
            return 0L;
        }

        List<String> shuffled = new ArrayList<>(paths);
        Collections.shuffle(shuffled);
        long sampledTotal = getExactTotalSizeOfFiles(shuffled.subList(0, sampleSize), factory);

        // floor(sampledTotal * totalFiles / sampleSize), split into quotient and remainder so the
        // intermediate product cannot overflow a long. remainder < sampleSize <= totalFiles.
        long quotient = sampledTotal / sampleSize;
        long remainder = sampledTotal % sampleSize;
        return quotient * totalFiles + remainder * totalFiles / sampleSize;
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        ValueProvider<String> spec = getFileOrPatternSpecProvider();
        String patternDisplay = spec.isAccessible() ? spec.get() : spec.toString();
        builder.add(DisplayData.item("filePattern", patternDisplay).withLabel("File Pattern"));
    }

    /**
     * Submits a task to {@code service} that creates a whole-file source for {@code file} and
     * splits it into bundles of roughly {@code desiredBundleSizeBytes}.
     */
    private ListenableFuture<List<? extends FileBasedSource<T>>> createFutureForFileSplit(final String file, final long desiredBundleSizeBytes, final PipelineOptions options, ListeningExecutorService service) {
        return service.submit(new Callable<List<? extends FileBasedSource<T>>>() {
            @Override
            public List<? extends FileBasedSource<T>> call() throws Exception {
                return createForSubrangeOfFile(file, 0L, OffsetRangeTracker.OFFSET_INFINITY).splitIntoBundles(desiredBundleSizeBytes, options);
            }
        });
    }

    /**
     * Splits this source into bundles.
     *
     * <p>A single-file source is split by offset if {@link #isSplittable()}; otherwise it is
     * returned unchanged. A pattern is expanded and each matched file is split in parallel; the
     * per-file results are concatenated in match order.
     */
    @Override
    public final List<? extends FileBasedSource<T>> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions pipelineOptions) throws Exception {
        if (mode != Mode.FILEPATTERN) {
            if (!isSplittable()) {
                LOG.debug("The source for file {} is not split into sub-range based sources since the file is not seekable", fileOrPatternSpec);
                return ImmutableList.of(this);
            }
            List<FileBasedSource<T>> splitResults = new ArrayList<>();
            for (OffsetBasedSource<T> split : super.splitIntoBundles(desiredBundleSizeBytes, pipelineOptions)) {
                // Offset-based splitting goes through createSourceForSubrange, which yields FileBasedSources.
                @SuppressWarnings("unchecked")
                FileBasedSource<T> fileSplit = (FileBasedSource<T>) split;
                splitResults.add(fileSplit);
            }
            return splitResults;
        }

        long startTime = System.currentTimeMillis();
        List<ListenableFuture<List<? extends FileBasedSource<T>>>> futures = new ArrayList<>();
        ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
        try {
            Preconditions.checkState(fileOrPatternSpec.isAccessible(), "Bundle splitting should only happen at execution time.");
            for (String file : expandFilePattern(fileOrPatternSpec.get())) {
                futures.add(createFutureForFileSplit(file, desiredBundleSizeBytes, pipelineOptions, service));
            }
            List<List<? extends FileBasedSource<T>>> perFileSplits = Futures.allAsList(futures).get();
            List<FileBasedSource<T>> splitResults = ImmutableList.copyOf(Iterables.concat(perFileSplits));
            LOG.debug("Splitting the source based on file pattern {} took {} ms", fileOrPatternSpec, Long.valueOf(System.currentTimeMillis() - startTime));
            return splitResults;
        } finally {
            service.shutdown();
        }
    }

    /**
     * Returns whether this single-file source may be split into byte subranges, i.e. whether
     * seeking in the file is efficient.
     */
    protected boolean isSplittable() throws Exception {
        Preconditions.checkState(fileOrPatternSpec.isAccessible(), "isSplittable should only be called at runtime.");
        String fileName = fileOrPatternSpec.get();
        return IOChannelUtils.getFactory(fileName).isReadSeekEfficient(fileName);
    }

    /**
     * Creates a reader for this source.
     *
     * <p>A single-file source gets its own reader. For a pattern, one whole-file reader is built
     * per matched file. If exactly one file matches, its reader is returned directly; otherwise
     * the readers are chained in a {@code FilePatternReader}. A file whose size cannot be fetched
     * is read up to {@code Long.MAX_VALUE}.
     */
    @Override
    public final BoundedSource.BoundedReader<T> createReader(PipelineOptions pipelineOptions) throws IOException {
        validate();
        if (mode != Mode.FILEPATTERN) {
            return createSingleFileReader(pipelineOptions);
        }

        long startTime = System.currentTimeMillis();
        List<FileBasedReader<T>> fileReaders = new ArrayList<>();
        for (String fileName : expandFilePattern(fileOrPatternSpec.get())) {
            long endOffset;
            try {
                endOffset = IOChannelUtils.getFactory(fileName).getSizeBytes(fileName);
            } catch (IOException e) {
                LOG.warn("Failed to get size of {}", fileName, e);
                endOffset = Long.MAX_VALUE;
            }
            fileReaders.add(createForSubrangeOfFile(fileName, 0L, endOffset).createSingleFileReader(pipelineOptions));
        }
        LOG.debug("Creating a reader for file pattern {} took {} ms", fileOrPatternSpec, Long.valueOf(System.currentTimeMillis() - startTime));

        if (fileReaders.size() == 1) {
            return fileReaders.get(0);
        }
        return new FilePatternReader<T>(this, fileReaders);
    }

    @Override
    public String toString() {
        String fileString = fileOrPatternSpec.isAccessible() ? fileOrPatternSpec.get() : fileOrPatternSpec.toString();
        switch (mode) {
            case FILEPATTERN:
                return fileString;
            case SINGLE_FILE_OR_SUBRANGE:
                return fileString + " range " + super.toString();
            default:
                throw new IllegalStateException("Unexpected mode: " + mode);
        }
    }

    /**
     * Validates the offset range: a {@link Mode#FILEPATTERN} source must cover exactly
     * {@code [0, OFFSET_INFINITY)}.
     */
    @Override
    public void validate() {
        super.validate();
        switch (mode) {
            case FILEPATTERN:
                Preconditions.checkArgument(getStartOffset() == 0, "FileBasedSource is based on a file pattern or a full single file but the starting offset proposed %s is not zero", Long.valueOf(getStartOffset()));
                Preconditions.checkArgument(getEndOffset() == OffsetRangeTracker.OFFSET_INFINITY, "FileBasedSource is based on a file pattern or a full single file but the ending offset proposed %s is not Long.MAX_VALUE", Long.valueOf(getEndOffset()));
                return;
            case SINGLE_FILE_OR_SUBRANGE:
                return;
            default:
                throw new IllegalStateException("Unknown mode: " + mode);
        }
    }

    /**
     * Returns the end offset of this single-file source. When the end offset is unbounded, the
     * file's actual size is returned instead.
     *
     * @throws IllegalArgumentException for a file-pattern source
     */
    @Override
    public final long getMaxEndOffset(PipelineOptions pipelineOptions) throws IOException {
        Preconditions.checkArgument(mode != Mode.FILEPATTERN, "Cannot determine the exact end offset of a file pattern");
        if (getEndOffset() != OffsetRangeTracker.OFFSET_INFINITY) {
            return getEndOffset();
        }
        String fileName = fileOrPatternSpec.get();
        return IOChannelUtils.getFactory(fileName).getSizeBytes(fileName);
    }

    /** Expands {@code str} into the matching file names and logs how many matched. */
    public static final Collection<String> expandFilePattern(String str) throws IOException {
        Collection<String> matches = IOChannelUtils.getFactory(str).match(str);
        LOG.info("Matched {} files for pattern {}", Integer.valueOf(matches.size()), str);
        return matches;
    }
}
