package org.apache.beam.sdk.io;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.Iterator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.ReadAllViaFileBasedSource;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/* loaded from: input_file:org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.class */
public abstract class ReadAllViaFileBasedSourceTransform<InT, T> extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
    public static final boolean DEFAULT_USES_RESHUFFLE = true;
    protected final long desiredBundleSizeBytes;
    protected final SerializableFunction<String, ? extends FileBasedSource<InT>> createSource;
    protected final Coder<T> coder;
    protected final ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler exceptionHandler;
    protected final boolean usesReshuffle;

    /* loaded from: input_file:org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform$AbstractReadFileRangesFn.class */
    public static abstract class AbstractReadFileRangesFn<InT, T> extends DoFn<KV<FileIO.ReadableFile, OffsetRange>, T> {
        private final SerializableFunction<String, ? extends FileBasedSource<InT>> createSource;
        private final ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler exceptionHandler;

        public AbstractReadFileRangesFn(SerializableFunction<String, ? extends FileBasedSource<InT>> serializableFunction, ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler readFileRangesFnExceptionHandler) {
            this.createSource = serializableFunction;
            this.exceptionHandler = readFileRangesFnExceptionHandler;
        }

        protected abstract T makeOutput(FileIO.ReadableFile readableFile, OffsetRange offsetRange, FileBasedSource<InT> fileBasedSource, BoundedSource.BoundedReader<InT> boundedReader);

        /* JADX WARN: Finally extract failed */
        @DoFn.ProcessElement
        @SuppressFBWarnings(value = {"RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE"}, justification = "https://github.com/spotbugs/spotbugs/issues/756")
        public void process(DoFn<KV<FileIO.ReadableFile, OffsetRange>, T>.ProcessContext processContext) throws IOException {
            FileIO.ReadableFile key = processContext.element().getKey();
            OffsetRange value = processContext.element().getValue();
            CompressedSource<T> withCompression = CompressedSource.from(this.createSource.apply(key.getMetadata().resourceId().toString())).withCompression(key.getCompression());
            try {
                BoundedSource.BoundedReader<InT> createReader = withCompression.createForSubrangeOfFile(key.getMetadata(), value.getFrom(), value.getTo()).createReader(processContext.getPipelineOptions());
                Throwable th = null;
                try {
                    for (boolean start = createReader.start(); start; start = createReader.advance()) {
                        processContext.output(makeOutput(key, value, withCompression, createReader));
                    }
                    if (createReader != null) {
                        if (0 != 0) {
                            try {
                                createReader.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createReader.close();
                        }
                    }
                } catch (Throwable th3) {
                    if (createReader != null) {
                        if (0 != 0) {
                            try {
                                createReader.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            createReader.close();
                        }
                    }
                    throw th3;
                }
            } catch (RuntimeException e) {
                if (this.exceptionHandler.apply(key, value, e)) {
                    throw e;
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform$SplitIntoRangesFn.class */
    public static class SplitIntoRangesFn extends DoFn<FileIO.ReadableFile, KV<FileIO.ReadableFile, OffsetRange>> {
        private final long desiredBundleSizeBytes;

        public SplitIntoRangesFn(long j) {
            this.desiredBundleSizeBytes = j;
        }

        @DoFn.ProcessElement
        public void process(DoFn<FileIO.ReadableFile, KV<FileIO.ReadableFile, OffsetRange>>.ProcessContext processContext) {
            MatchResult.Metadata metadata = processContext.element().getMetadata();
            if (!metadata.isReadSeekEfficient()) {
                processContext.output(KV.of(processContext.element(), new OffsetRange(0L, metadata.sizeBytes())));
                return;
            }
            Iterator<OffsetRange> it = new OffsetRange(0L, metadata.sizeBytes()).split(this.desiredBundleSizeBytes, 0L).iterator();
            while (it.hasNext()) {
                processContext.output(KV.of(processContext.element(), it.next()));
            }
        }
    }

    public ReadAllViaFileBasedSourceTransform(long j, SerializableFunction<String, ? extends FileBasedSource<InT>> serializableFunction, Coder<T> coder) {
        this(j, serializableFunction, coder, true, new ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler());
    }

    public ReadAllViaFileBasedSourceTransform(long j, SerializableFunction<String, ? extends FileBasedSource<InT>> serializableFunction, Coder<T> coder, boolean z, ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler readFileRangesFnExceptionHandler) {
        this.desiredBundleSizeBytes = j;
        this.createSource = serializableFunction;
        this.coder = coder;
        this.usesReshuffle = z;
        this.exceptionHandler = readFileRangesFnExceptionHandler;
    }

    @Override // org.apache.beam.sdk.transforms.PTransform
    /* renamed from: expand, reason: avoid collision after fix types in other method and merged with bridge method [inline-methods] */
    public PCollection<T> mo321expand(PCollection<FileIO.ReadableFile> pCollection) {
        PCollection pCollection2 = (PCollection) pCollection.apply("Split into ranges", ParDo.of(new SplitIntoRangesFn(this.desiredBundleSizeBytes)));
        if (this.usesReshuffle) {
            pCollection2 = (PCollection) pCollection2.apply("Reshuffle", Reshuffle.viaRandomKey());
        }
        return ((PCollection) pCollection2.apply("Read ranges", ParDo.of(readRangesFn()))).setCoder(this.coder);
    }

    protected abstract DoFn<KV<FileIO.ReadableFile, OffsetRange>, T> readRangesFn();
}
