package org.apache.beam.runners.direct;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.InProcessPipelineRunner;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.class */
/**
 * A {@link TransformEvaluatorFactory} that hands out evaluators for {@link Read.Bounded}
 * transforms.
 *
 * <p>Evaluators are kept in a queue per {@link EvaluatorKey} (built from the applied transform and
 * the evaluation context). A queue is seeded with exactly one evaluator when it is first created,
 * and {@link #forApplication} removes an evaluator from the queue each time it is called. Nothing
 * in this class puts an evaluator back, so once the seeded evaluator has been handed out,
 * subsequent calls for the same key return {@code null}.
 */
public final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    /** Queues of not-yet-handed-out evaluators, keyed by (applied transform, evaluation context). */
    private final ConcurrentMap<EvaluatorKey, Queue<? extends BoundedReadEvaluator<?>>> sourceEvaluators =
            new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Evaluator that reads the entire contents of a {@link BoundedSource} into a single root
     * bundle when the bundle is finished.
     *
     * @param <OutputT> the type of elements produced by the source
     */
    public static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
        private final AppliedPTransform<?, PCollection<OutputT>, Read.Bounded<OutputT>> transform;
        private final InProcessEvaluationContext evaluationContext;
        private final BoundedSource<OutputT> source;

        /**
         * @param appliedPTransform the {@code Read.Bounded} application whose output is produced
         * @param inProcessEvaluationContext context used to obtain pipeline options and to create
         *     the output bundle
         * @param boundedSource the source to read from
         */
        public BoundedReadEvaluator(
                AppliedPTransform<?, PCollection<OutputT>, Read.Bounded<OutputT>> appliedPTransform,
                InProcessEvaluationContext inProcessEvaluationContext,
                BoundedSource<OutputT> boundedSource) {
            this.transform = appliedPTransform;
            this.evaluationContext = inProcessEvaluationContext;
            this.source = boundedSource;
        }

        /** Intentionally does nothing: all of the work happens in {@link #finishBundle()}. */
        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public void processElement(WindowedValue<Object> windowedValue) {
        }

        /**
         * Reads every element from the source and adds it, with the reader-supplied timestamp and
         * in the global window, to a freshly created root bundle.
         *
         * <p>The reader is closed exactly once, whether or not reading succeeds. If both reading
         * and closing fail, the read failure is propagated and the close failure is attached to it
         * as a suppressed exception.
         *
         * @return a result holding the produced bundle, built with a hold at
         *     {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
         * @throws IOException if creating, reading from, or closing the reader fails
         */
        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public InProcessTransformResult finishBundle() throws IOException {
            try (BoundedSource.BoundedReader<OutputT> reader =
                    this.source.createReader(this.evaluationContext.getPipelineOptions())) {
                InProcessPipelineRunner.UncommittedBundle<OutputT> output =
                        this.evaluationContext.createRootBundle(this.transform.getOutput());
                // start() positions the reader on the first element (if any); advance() moves to
                // each following one. Both return false when nothing is left to read.
                for (boolean available = reader.start(); available; available = reader.advance()) {
                    output.add(
                            WindowedValue.timestampedValueInGlobalWindow(
                                    reader.getCurrent(), reader.getCurrentTimestamp()));
                }
                // The empty array means "no additional output bundles" beyond the first argument.
                return StepTransformResult.withHold(this.transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
                        .addOutput(output, new InProcessPipelineRunner.UncommittedBundle[0])
                        .build();
            }
        }
    }

    /**
     * Returns the next unused evaluator for the given application, or {@code null} if none remains.
     *
     * @param appliedPTransform the transform application; it is treated as a {@code Read.Bounded}
     *     application without a runtime check, so callers must only pass such applications
     * @param committedBundle ignored by this factory
     * @param inProcessEvaluationContext the context the evaluation is occurring in
     * @return an evaluator, or {@code null} when the queue for this application is empty
     * @throws IOException declared by the interface; not thrown by the code in this method
     */
    // The interface only gives us wildcard types, so the narrowing to a Read.Bounded application
    // (and the widening of the result to InputT) cannot be expressed without unchecked/raw casts.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    @Nullable
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, @Nullable InProcessPipelineRunner.CommittedBundle<?> committedBundle, InProcessEvaluationContext inProcessEvaluationContext) throws IOException {
        return (TransformEvaluator<InputT>)
                getTransformEvaluator((AppliedPTransform) appliedPTransform, inProcessEvaluationContext);
    }

    /** Removes and returns the head of the evaluator queue for this application, or {@code null}. */
    private <OutputT> TransformEvaluator<?> getTransformEvaluator(
            AppliedPTransform<?, PCollection<OutputT>, Read.Bounded<OutputT>> appliedPTransform,
            InProcessEvaluationContext inProcessEvaluationContext) {
        return getTransformEvaluatorQueue(appliedPTransform, inProcessEvaluationContext).poll();
    }

    /**
     * Returns the evaluator queue for the given application and context, creating it and seeding
     * it with a single evaluator if it does not exist yet.
     */
    // The map stores queues under a wildcard element type; every queue stored for a key is created
    // below for that same application, so narrowing back to OutputT is expected to be safe.
    @SuppressWarnings("unchecked")
    private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
            AppliedPTransform<?, PCollection<OutputT>, Read.Bounded<OutputT>> appliedPTransform,
            InProcessEvaluationContext inProcessEvaluationContext) {
        EvaluatorKey evaluatorKey = new EvaluatorKey(appliedPTransform, inProcessEvaluationContext);
        Queue<BoundedReadEvaluator<OutputT>> existing =
                (Queue<BoundedReadEvaluator<OutputT>>) this.sourceEvaluators.get(evaluatorKey);
        if (existing != null) {
            return existing;
        }

        Queue<BoundedReadEvaluator<OutputT>> created = new ConcurrentLinkedQueue<>();
        Queue<BoundedReadEvaluator<OutputT>> raced =
                (Queue<BoundedReadEvaluator<OutputT>>) this.sourceEvaluators.putIfAbsent(evaluatorKey, created);
        if (raced != null) {
            // Another caller installed a queue first; use theirs and discard ours unseeded so the
            // key never ends up with more than one evaluator.
            return raced;
        }
        // We installed the queue, so we are responsible for seeding it with its only evaluator.
        BoundedSource<OutputT> boundedSource = appliedPTransform.getTransform().getSource();
        created.offer(new BoundedReadEvaluator<OutputT>(appliedPTransform, inProcessEvaluationContext, boundedSource));
        return created;
    }
}
