package org.apache.beam.sdk.io;

import java.io.IOException;
import java.util.Iterator;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 * Reads every {@link FileIO.ReadableFile} of the input collection through a {@link
 * FileBasedSource} obtained from a user-supplied factory function.
 *
 * <p>The transform runs three steps: each file is split into {@link OffsetRange}s, the
 * (file, range) pairs are passed through {@link Reshuffle#viaRandomKey()}, and each range is then
 * read with a reader created from the source the factory returns for that file.
 *
 * @param <T> type of the records produced by the underlying {@link FileBasedSource}
 */
@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/ReadAllViaFileBasedSource.class */
public class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
    /** Bundle size, in bytes, handed to {@link OffsetRange#split} when splitting a file. */
    private final long desiredBundleSizeBytes;

    /** Factory turning a file's resource id string into the source used to read that file. */
    private final SerializableFunction<String, ? extends FileBasedSource<T>> createSource;

    /** Coder set on the output collection. */
    private final Coder<T> coder;

    /**
     * Reads the records contained in one offset range of one file and emits them.
     *
     * @param <T> type of the emitted records
     */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/ReadAllViaFileBasedSource$ReadFileRangesFn.class */
    public static class ReadFileRangesFn<T> extends DoFn<KV<FileIO.ReadableFile, OffsetRange>, T> {
        private final SerializableFunction<String, ? extends FileBasedSource<T>> createSource;

        private ReadFileRangesFn(SerializableFunction<String, ? extends FileBasedSource<T>> createSource) {
            this.createSource = createSource;
        }

        /**
         * Builds a reader restricted to the element's offset range and outputs every record it
         * yields.
         *
         * <p>The source returned by the factory is wrapped in a {@link CompressedSource} configured
         * with the file's compression before being narrowed to the requested sub-range.
         *
         * @param processContext context supplying the (file, range) element and receiving output
         * @throws IOException if starting, advancing, or closing the reader fails
         */
        @DoFn.ProcessElement
        public void process(DoFn<KV<FileIO.ReadableFile, OffsetRange>, T>.ProcessContext processContext) throws IOException {
            FileIO.ReadableFile file = processContext.element().getKey();
            OffsetRange range = processContext.element().getValue();
            String resource = file.getMetadata().resourceId().toString();
            // try-with-resources closes the reader on every path; a failure while closing after
            // a read failure is attached to the read failure as a suppressed exception.
            try (BoundedSource.BoundedReader<T> reader =
                    CompressedSource.from(this.createSource.apply(resource))
                            .withCompression(file.getCompression())
                            .createForSubrangeOfFile(file.getMetadata(), range.getFrom(), range.getTo())
                            .createReader(processContext.getPipelineOptions())) {
                for (boolean more = reader.start(); more; more = reader.advance()) {
                    processContext.output(reader.getCurrent());
                }
            }
        }
    }

    /** Pairs each input file with the offset ranges that should be read from it. */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/ReadAllViaFileBasedSource$SplitIntoRangesFn.class */
    public static class SplitIntoRangesFn extends DoFn<FileIO.ReadableFile, KV<FileIO.ReadableFile, OffsetRange>> {
        private final long desiredBundleSizeBytes;

        private SplitIntoRangesFn(long desiredBundleSizeBytes) {
            this.desiredBundleSizeBytes = desiredBundleSizeBytes;
        }

        /**
         * Emits one (file, range) pair per range of the file.
         *
         * <p>When the file's metadata reports that it is not read-seek efficient, a single range
         * spanning {@code [0, sizeBytes)} is emitted. Otherwise that same range is split using
         * {@code desiredBundleSizeBytes} and one pair is emitted per resulting sub-range.
         *
         * @param processContext context supplying the file element and receiving output
         */
        @DoFn.ProcessElement
        public void process(DoFn<FileIO.ReadableFile, KV<FileIO.ReadableFile, OffsetRange>>.ProcessContext processContext) {
            FileIO.ReadableFile file = processContext.element();
            MatchResult.Metadata metadata = file.getMetadata();
            OffsetRange wholeFile = new OffsetRange(0L, metadata.sizeBytes());
            if (!metadata.isReadSeekEfficient()) {
                processContext.output(KV.of(file, wholeFile));
                return;
            }
            // NOTE(review): the second argument of split is passed as 0; presumably a minimum
            // chunk size - confirm against OffsetRange.split.
            for (OffsetRange range : wholeFile.split(this.desiredBundleSizeBytes, 0L)) {
                processContext.output(KV.of(file, range));
            }
        }
    }

    /**
     * Creates the transform.
     *
     * @param desiredBundleSizeBytes bundle size, in bytes, used when splitting files into ranges
     * @param createSource factory mapping a file's resource id string to the source that reads it
     * @param coder coder to set on the output collection
     */
    public ReadAllViaFileBasedSource(long desiredBundleSizeBytes, SerializableFunction<String, ? extends FileBasedSource<T>> createSource, Coder<T> coder) {
        this.desiredBundleSizeBytes = desiredBundleSizeBytes;
        this.createSource = createSource;
        this.coder = coder;
    }

    /**
     * Applies the split, reshuffle, and read steps to the input files and sets the configured
     * coder on the result.
     *
     * @param pCollection files to read
     * @return the records read from all ranges of all input files
     */
    @Override // org.apache.beam.sdk.transforms.PTransform
    /* renamed from: expand, reason: avoid collision after fix types in other method and merged with bridge method [inline-methods] */
    public PCollection<T> mo3110expand(PCollection<FileIO.ReadableFile> pCollection) {
        PCollection<KV<FileIO.ReadableFile, OffsetRange>> ranges =
                pCollection.apply("Split into ranges", ParDo.of(new SplitIntoRangesFn(this.desiredBundleSizeBytes)));
        PCollection<KV<FileIO.ReadableFile, OffsetRange>> reshuffled =
                ranges.apply("Reshuffle", Reshuffle.<KV<FileIO.ReadableFile, OffsetRange>>viaRandomKey());
        PCollection<T> records =
                reshuffled.apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(this.createSource)));
        return records.setCoder(this.coder);
    }
}
