package org.apache.beam.runners.core.construction;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;

/* loaded from: input_file:org/apache/beam/runners/core/construction/JavaReadViaImpulse.class */
public class JavaReadViaImpulse {
    /**
     * Bundle size, in bytes, that {@link BoundedReadViaImpulse} hands to {@link SplitBoundedSourceFn}
     * when splitting a source: 67108864 = 64 * 1024 * 1024 (64 MiB).
     */
    private static final long DEFAULT_BUNDLE_SIZE_BYTES = 67108864;

    /**
     * Override factory that replaces a matched bounded read with the transform returned by
     * {@link JavaReadViaImpulse#bounded(BoundedSource)}.
     *
     * @param <T> element type of the {@link PCollection} produced by the read
     */
    private static class BoundedOverrideFactory<T> implements PTransformOverrideFactory<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> {
        private BoundedOverrideFactory() {
        }

        /**
         * Builds the replacement for {@code appliedPTransform}: the source is recovered from the
         * applied transform and wrapped via {@link JavaReadViaImpulse#bounded(BoundedSource)}, rooted
         * at a fresh {@link PBegin} of the same pipeline.
         *
         * @param appliedPTransform the applied read transform being replaced
         * @return the replacement input and transform
         * @throws RuntimeException wrapping the {@link IOException} raised while recovering the source
         */
        @Override
        public PTransformOverrideFactory.PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> appliedPTransform) {
            try {
                PBegin begin = PBegin.in(appliedPTransform.getPipeline());
                BoundedSource<T> source = ReadTranslation.boundedSourceFromTransform(appliedPTransform);
                return PTransformOverrideFactory.PTransformReplacement.of(begin, JavaReadViaImpulse.bounded(source));
            } catch (IOException e) {
                // Keep the cause so the underlying translation failure stays visible to the caller.
                throw new RuntimeException(e);
            }
        }

        /**
         * Maps the outputs of the replaced transform onto the single output of the replacement.
         *
         * <p>No hand-written {@code mapOutputs(Map, POutput)} bridge is declared: the compiler
         * generates that bridge for this typed override, so raw callers still dispatch here.
         *
         * @param map outputs of the original transform, keyed by tag
         * @param pCollection output of the replacement transform
         * @return the replacement-output mapping produced by {@code ReplacementOutputs.singleton}
         */
        @Override
        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> map, PCollection<T> pCollection) {
            return ReplacementOutputs.singleton(map, pCollection);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/core/construction/JavaReadViaImpulse$BoundedReadViaImpulse.class */
    public static class BoundedReadViaImpulse<T> extends PTransform<PBegin, PCollection<T>> {
        private final BoundedSource<T> source;

        private BoundedReadViaImpulse(BoundedSource<T> boundedSource) {
            this.source = boundedSource;
        }

        public PCollection<T> expand(PBegin pBegin) {
            return pBegin.apply(Impulse.create()).apply(ParDo.of(new SplitBoundedSourceFn(this.source, JavaReadViaImpulse.DEFAULT_BUNDLE_SIZE_BYTES))).setCoder(new BoundedSourceCoder()).apply(Reshuffle.viaRandomKey()).apply(ParDo.of(new ReadFromBoundedSourceFn())).setCoder(this.source.getOutputCoder());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/runners/core/construction/JavaReadViaImpulse$BoundedSourceCoder.class */
    public static class BoundedSourceCoder<T> extends CustomCoder<BoundedSource<T>> {
        private final Coder<BoundedSource<T>> coder = SerializableCoder.of(BoundedSource.class);

        BoundedSourceCoder() {
        }

        public void encode(BoundedSource<T> boundedSource, OutputStream outputStream) throws CoderException, IOException {
            this.coder.encode(boundedSource, outputStream);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public BoundedSource<T> m7decode(InputStream inputStream) throws CoderException, IOException {
            return (BoundedSource) this.coder.decode(inputStream);
        }
    }

    /**
     * {@link DoFn} that reads every record of each incoming {@link BoundedSource} and emits it with
     * the timestamp reported by the reader.
     *
     * @param <T> element type produced by the sources
     */
    @VisibleForTesting
    public static class ReadFromBoundedSourceFn<T> extends DoFn<BoundedSource<T>, T> {
        ReadFromBoundedSourceFn() {
        }

        /**
         * Opens a reader on the input source, outputs each record with its timestamp, and closes the
         * reader afterwards.
         *
         * @param processContext supplies the source element, the pipeline options and the output
         * @throws IOException if creating, advancing or closing the reader fails
         */
        @DoFn.ProcessElement
        public void readSource(DoFn<BoundedSource<T>, T>.ProcessContext processContext) throws IOException {
            // try-with-resources closes the reader on every path; if the loop throws and close()
            // throws as well, the close failure is attached as a suppressed exception.
            try (BoundedSource.BoundedReader<T> reader = processContext.element().createReader(processContext.getPipelineOptions())) {
                for (boolean available = reader.start(); available; available = reader.advance()) {
                    processContext.outputWithTimestamp(reader.getCurrent(), reader.getCurrentTimestamp());
                }
            }
        }
    }

    /**
     * {@link DoFn} that, for each input element, splits the configured {@link BoundedSource} using
     * the configured bundle size and emits every resulting sub-source.
     *
     * @param <T> element type produced by the source
     */
    @VisibleForTesting
    public static class SplitBoundedSourceFn<T> extends DoFn<byte[], BoundedSource<T>> {
        private final BoundedSource<T> source;
        private final long bundleSize;

        /**
         * @param boundedSource the source to split
         * @param j bundle size, in bytes, passed to {@code BoundedSource.split}
         */
        public SplitBoundedSourceFn(BoundedSource<T> boundedSource, long j) {
            this.source = boundedSource;
            this.bundleSize = j;
        }

        /**
         * Splits the source and outputs each split. The input element's value is not inspected.
         *
         * @param processContext supplies the pipeline options and receives the splits
         * @throws Exception if splitting the source fails
         */
        @DoFn.ProcessElement
        public void splitSource(DoFn<byte[], BoundedSource<T>>.ProcessContext processContext) throws Exception {
            // A typed for-each replaces the raw Iterator and the unchecked cast on each element.
            for (BoundedSource<T> split : this.source.split(this.bundleSize, processContext.getPipelineOptions())) {
                processContext.output(split);
            }
        }
    }

    /**
     * Creates a transform that reads {@code boundedSource} through the Impulse-based expansion
     * implemented by {@link BoundedReadViaImpulse}.
     *
     * @param boundedSource the source to read
     * @param <T> element type produced by the source
     * @return a root transform producing the source's elements
     */
    public static <T> PTransform<PBegin, PCollection<T>> bounded(BoundedSource<T> boundedSource) {
        // Diamond instead of a raw instantiation, so the result is checked against the return type.
        return new BoundedReadViaImpulse<>(boundedSource);
    }

    /**
     * Returns the override that pairs {@link #boundedMatcher()} with a
     * {@link BoundedOverrideFactory}, replacing matched bounded reads.
     *
     * @return the bounded-read override
     */
    public static PTransformOverride boundedOverride() {
        // Diamond instead of a raw instantiation; the factory's element type is inferred.
        return PTransformOverride.of(boundedMatcher(), new BoundedOverrideFactory<>());
    }

    /**
     * Builds the matcher used by {@link #boundedOverride()}: a transform matches when its URN equals
     * {@code PTransformTranslation.READ_TRANSFORM_URN} and its source is reported as bounded.
     */
    private static PTransformMatcher boundedMatcher() {
        PTransformMatcher isReadTransform = PTransformMatchers.urnEqualTo(PTransformTranslation.READ_TRANSFORM_URN);
        PTransformMatcher readsBoundedSource =
                application -> PCollection.IsBounded.BOUNDED == ReadTranslation.sourceIsBounded(application);
        return isReadTransform.and(readsBoundedSource);
    }
}
