package org.apache.beam.runners.direct;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.class */
public final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    private static final Logger LOG = LoggerFactory.getLogger(BoundedReadEvaluatorFactory.class);
    private final EvaluationContext evaluationContext;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/BoundedReadEvaluatorFactory$BoundedReadEvaluator.class */
    /**
     * {@link TransformEvaluator} that drains the {@link BoundedSource} wrapped by each incoming
     * {@link BoundedSourceShard} into an output bundle.
     *
     * <p>Every call to {@link #processElement} opens a reader over the shard's source, copies all
     * of its elements into a newly created bundle for the transform's output, and registers that
     * bundle with the pending result. {@link #finishBundle} then builds the accumulated result.
     *
     * @param <OutputT> the element type produced by the source
     */
    public static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<BoundedSourceShard<OutputT>> {
        private final AppliedPTransform<?, PCollection<OutputT>, ?> transform;
        private final EvaluationContext evaluationContext;
        private final StepTransformResult.Builder resultBuilder;

        /**
         * Creates an evaluator for the given read application.
         *
         * @param appliedPTransform the transform application whose output receives the elements
         * @param evaluationContext supplies pipeline options and creates output bundles
         */
        public BoundedReadEvaluator(AppliedPTransform<?, PCollection<OutputT>, ?> appliedPTransform, EvaluationContext evaluationContext) {
            this.transform = appliedPTransform;
            this.evaluationContext = evaluationContext;
            this.resultBuilder = StepTransformResult.withoutHold(appliedPTransform);
        }

        /**
         * Reads the entire source carried by {@code windowedValue} into one new output bundle.
         *
         * <p>Each element is emitted in the global window with the timestamp reported by the
         * reader. The reader is always closed, including when reading fails; a failure raised by
         * {@code close()} while another exception is propagating is attached to that exception as
         * suppressed.
         *
         * @param windowedValue the shard whose source should be read
         * @throws IOException if creating, reading from, or closing the reader fails
         */
        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public void processElement(WindowedValue<BoundedSourceShard<OutputT>> windowedValue) throws IOException {
            BoundedSource<OutputT> source = windowedValue.getValue().getSource();
            try (BoundedSource.BoundedReader<OutputT> reader = source.createReader(this.evaluationContext.getPipelineOptions())) {
                // The bundle is created before the reader is started, matching the original call order.
                DirectRunner.UncommittedBundle<OutputT> output = this.evaluationContext.createBundle(this.transform.getOutput());
                for (boolean available = reader.start(); available; available = reader.advance()) {
                    output.add(WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp()));
                }
                // The empty array is the (empty) list of additional output bundles.
                this.resultBuilder.addOutput(output, new DirectRunner.UncommittedBundle[0]);
            }
        }

        /**
         * Builds the result accumulated by the preceding {@link #processElement} calls.
         *
         * @return the result produced by the underlying {@code StepTransformResult.Builder}
         */
        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public TransformResult finishBundle() {
            return this.resultBuilder.build();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/BoundedReadEvaluatorFactory$BoundedSourceShard.class */
    /**
     * Holder for a single {@link BoundedSource}, used as the element type consumed by
     * {@code BoundedReadEvaluator}.
     *
     * <p>Instances are obtained through {@link #of}; the concrete implementation is
     * {@code AutoValue_BoundedReadEvaluatorFactory_BoundedSourceShard}.
     *
     * @param <T> the element type produced by the wrapped source
     */
    public abstract static class BoundedSourceShard<T> {
        /**
         * Wraps {@code boundedSource} in a new shard.
         *
         * @param boundedSource the source the shard should expose
         * @return a shard whose {@link #getSource()} is expected to return {@code boundedSource}
         */
        static <T> BoundedSourceShard<T> of(BoundedSource<T> boundedSource) {
            // Diamond instead of a raw type: assumes the AutoValue_ implementation is generic in T.
            return new AutoValue_BoundedReadEvaluatorFactory_BoundedSourceShard<>(boundedSource);
        }

        /**
         * Returns the source wrapped by this shard.
         *
         * @return the wrapped {@link BoundedSource}
         */
        public abstract BoundedSource<T> getSource();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/BoundedReadEvaluatorFactory$InputProvider.class */
    /**
     * {@link RootInputProvider} that splits the {@link BoundedSource} of a {@link Read.Bounded}
     * application and wraps every split in its own committed root bundle.
     */
    public static class InputProvider implements RootInputProvider {
        private final EvaluationContext evaluationContext;

        /**
         * Creates a provider backed by the given context.
         *
         * @param evaluationContext supplies pipeline options and creates root bundles
         */
        public InputProvider(EvaluationContext evaluationContext) {
            this.evaluationContext = evaluationContext;
        }

        /**
         * Produces the initial input bundles for {@code appliedPTransform}.
         *
         * @param appliedPTransform the application to split; it is treated as an application of
         *     {@link Read.Bounded} without a runtime check
         * @param i divisor applied to the source's estimated size to obtain the requested size of
         *     each split
         * @return one committed bundle per split, each containing a single {@link BoundedSourceShard}
         * @throws Exception if estimating the size of the source or splitting it fails
         */
        @Override // org.apache.beam.runners.direct.RootInputProvider
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Collection<DirectRunner.CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> appliedPTransform, int i) throws Exception {
            // The wildcard-typed application cannot be passed to the typed helper directly; the raw
            // cast makes the unchecked conversion explicit.
            return createInitialSplits((AppliedPTransform) appliedPTransform, i);
        }

        /**
         * Splits the transform's source and commits one root bundle per resulting split.
         *
         * <p>The requested split size is the source's estimated size in bytes divided by
         * {@code desiredSplits}; the integer division throws {@link ArithmeticException} when
         * {@code desiredSplits} is zero.
         */
        private <OutputT> Collection<DirectRunner.CommittedBundle<BoundedSourceShard<OutputT>>> createInitialSplits(AppliedPTransform<PBegin, ?, Read.Bounded<OutputT>> appliedPTransform, int desiredSplits) throws Exception {
            BoundedSource<OutputT> source = appliedPTransform.getTransform().getSource();
            DirectOptions options = this.evaluationContext.getPipelineOptions();
            long bytesPerSplit = source.getEstimatedSizeBytes(options) / desiredSplits;
            List<? extends BoundedSource<OutputT>> splits = source.splitIntoBundles(bytesPerSplit, options);

            ImmutableList.Builder<DirectRunner.CommittedBundle<BoundedSourceShard<OutputT>>> shards = ImmutableList.builder();
            for (BoundedSource<OutputT> split : splits) {
                DirectRunner.UncommittedBundle<BoundedSourceShard<OutputT>> rootBundle = this.evaluationContext.createRootBundle();
                shards.add(rootBundle.add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(split))).commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
            }
            return shards.build();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a factory whose evaluators all share the given context.
     *
     * @param evaluationContext the context handed to every evaluator this factory creates
     */
    public BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    /**
     * Creates a new {@code BoundedReadEvaluator} for {@code appliedPTransform}.
     *
     * <p>The application is treated as one producing a {@link PCollection} without a runtime
     * check, and {@code committedBundle} is not consulted.
     *
     * @param appliedPTransform the application to evaluate
     * @param committedBundle the input bundle; unused by this factory
     * @return a freshly created evaluator, viewed as a {@code TransformEvaluator<InputT>}
     * @throws IOException declared by the interface; nothing in this method body throws it
     */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    @Nullable
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, DirectRunner.CommittedBundle<?> committedBundle) throws IOException {
        // The wildcard-typed application cannot be passed to createEvaluator directly; the raw
        // cast makes the unchecked conversion explicit.
        return (TransformEvaluator<InputT>) createEvaluator((AppliedPTransform) appliedPTransform);
    }

    /**
     * Instantiates the evaluator for a read application, sharing this factory's context.
     *
     * @param appliedPTransform the application whose output the evaluator writes to
     * @return a new {@code BoundedReadEvaluator} bound to {@code appliedPTransform}
     */
    private <OutputT> TransformEvaluator<?> createEvaluator(AppliedPTransform<?, PCollection<OutputT>, ?> appliedPTransform) {
        // Diamond rather than a raw type so OutputT is carried through to the evaluator.
        return new BoundedReadEvaluator<>(appliedPTransform, this.evaluationContext);
    }

    /** Does nothing: this factory has an empty cleanup step. */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public void cleanup() {
    }
}
