package org.apache.beam.runners.direct;

import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.InProcessPipelineRunner;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.class */
/**
 * A {@link TransformEvaluatorFactory} that hands out evaluators for {@link Read.Unbounded}
 * applications.
 *
 * <p>Evaluators are kept in a queue per {@link EvaluatorKey} (built from the applied transform and
 * the evaluation context). An evaluator is removed from its queue when it is handed out by
 * {@link #forApplication} and puts itself back at the end of a successful
 * {@link UnboundedReadEvaluator#finishBundle()}.
 */
public class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    /** Queues of available evaluators, keyed by (applied transform, evaluation context). */
    private final ConcurrentMap<EvaluatorKey, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?>>> sourceEvaluators = new ConcurrentHashMap<>();

    /**
     * Evaluator that reads a bounded number of elements from an {@link UnboundedSource} each time
     * {@link #finishBundle()} is called, carrying the checkpoint mark from one call to the next.
     *
     * @param <OutputT> the type of elements produced by the source
     */
    public static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
        /** Upper bound on the number of elements read in a single {@link #finishBundle()} call. */
        private static final int ARBITRARY_MAX_ELEMENTS = 10;
        private final AppliedPTransform<?, PCollection<OutputT>, Read.Unbounded<OutputT>> transform;
        private final InProcessEvaluationContext evaluationContext;
        /** Queue this evaluator re-adds itself to once a bundle has been produced. */
        private final ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
        private final UnboundedSource<OutputT, ?> source;
        /** Checkpoint from the previous read; {@code null} until the first bundle completes. */
        private UnboundedSource.CheckpointMark checkpointMark = null;

        /**
         * Creates an evaluator for the given read application.
         *
         * @param appliedPTransform the {@link Read.Unbounded} application being evaluated
         * @param inProcessEvaluationContext context used to create output bundles and obtain options
         * @param unboundedSource the source to read from
         * @param concurrentLinkedQueue queue this evaluator offers itself to after each bundle
         */
        public UnboundedReadEvaluator(AppliedPTransform<?, PCollection<OutputT>, Read.Unbounded<OutputT>> appliedPTransform, InProcessEvaluationContext inProcessEvaluationContext, UnboundedSource<OutputT, ?> unboundedSource, ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> concurrentLinkedQueue) {
            this.transform = appliedPTransform;
            this.evaluationContext = inProcessEvaluationContext;
            this.evaluatorQueue = concurrentLinkedQueue;
            this.source = unboundedSource;
        }

        /** Does nothing: all output is produced from the source in {@link #finishBundle()}. */
        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public void processElement(WindowedValue<Object> windowedValue) {
        }

        /**
         * Reads up to {@link #ARBITRARY_MAX_ELEMENTS} elements from the source into a new root
         * bundle, finalizes the reader's checkpoint, and returns this evaluator to its queue.
         *
         * <p>The reader is created from the checkpoint mark stored by the previous call and is
         * always closed before this method returns. If reading, checkpointing or building the
         * result throws, the evaluator is not re-offered to the queue.
         *
         * @return a result holding the reader's watermark and containing the output bundle
         * @throws java.io.IOException if the reader fails while reading, checkpointing or closing
         */
        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public InProcessTransformResult finishBundle() throws java.io.IOException {
            InProcessPipelineRunner.UncommittedBundle<OutputT> output =
                    this.evaluationContext.createRootBundle(this.transform.getOutput());
            try (UnboundedSource.UnboundedReader<OutputT> reader =
                    createReader(this.source, this.evaluationContext.getPipelineOptions())) {
                int numElements = 0;
                if (reader.start()) {
                    do {
                        output.add(
                                WindowedValue.timestampedValueInGlobalWindow(
                                        reader.getCurrent(), reader.getCurrentTimestamp()));
                        numElements++;
                        // Stop at the cap before advancing, so no element is consumed and dropped.
                    } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
                }
                // Remember the checkpoint so the next reader is created from it.
                this.checkpointMark = reader.getCheckpointMark();
                this.checkpointMark.finalizeCheckpoint();
                StepTransformResult result =
                        StepTransformResult.withHold(this.transform, reader.getWatermark())
                                .addOutput(output)
                                .build();
                // Make this evaluator available again only after the bundle was built successfully.
                this.evaluatorQueue.offer(this);
                return result;
            }
        }

        /**
         * Creates a reader for {@code unboundedSource}, passing the stored checkpoint mark.
         *
         * @param unboundedSource the source to create a reader for
         * @param pipelineOptions options passed through to the source
         * @return a new reader; the caller is responsible for closing it
         */
        private <CheckpointMarkT extends UnboundedSource.CheckpointMark> UnboundedSource.UnboundedReader<OutputT> createReader(UnboundedSource<OutputT, CheckpointMarkT> unboundedSource, PipelineOptions pipelineOptions) {
            // The stored mark is either null or was obtained from a reader created by this same
            // method for this evaluator's source, so the cast is assumed safe.
            @SuppressWarnings("unchecked")
            CheckpointMarkT mark = (CheckpointMarkT) this.checkpointMark;
            return unboundedSource.createReader(pipelineOptions, mark);
        }
    }

    /**
     * Returns an available evaluator for the given application, or {@code null} if the queue for
     * that application currently holds none.
     *
     * @param appliedPTransform the transform application; cast unchecked to a
     *     {@link Read.Unbounded} application
     * @param committedBundle unused by this factory
     * @param inProcessEvaluationContext the context the evaluation occurs in
     */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    @Nullable
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, @Nullable InProcessPipelineRunner.CommittedBundle<?> committedBundle, InProcessEvaluationContext inProcessEvaluationContext) {
        // The wildcard-typed application cannot be matched against the generic helper signature,
        // so it is passed as a raw type.
        return getTransformEvaluator((AppliedPTransform) appliedPTransform, inProcessEvaluationContext);
    }

    /** Polls the queue for the application; yields {@code null} when the queue is empty. */
    private <OutputT> TransformEvaluator<?> getTransformEvaluator(AppliedPTransform<?, PCollection<OutputT>, Read.Unbounded<OutputT>> appliedPTransform, InProcessEvaluationContext inProcessEvaluationContext) {
        Queue<UnboundedReadEvaluator<OutputT>> evaluators =
                getTransformEvaluatorQueue(appliedPTransform, inProcessEvaluationContext);
        return evaluators.poll();
    }

    /**
     * Returns the evaluator queue for the (application, context) pair, creating it on first use.
     *
     * <p>Only the caller whose queue is actually installed in the map seeds it with an evaluator;
     * a caller that loses the {@code putIfAbsent} race returns the queue that was installed first.
     */
    @SuppressWarnings("unchecked")
    private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(AppliedPTransform<?, PCollection<OutputT>, Read.Unbounded<OutputT>> appliedPTransform, InProcessEvaluationContext inProcessEvaluationContext) {
        EvaluatorKey evaluatorKey = new EvaluatorKey(appliedPTransform, inProcessEvaluationContext);
        ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> existingQueue =
                (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>>) this.sourceEvaluators.get(evaluatorKey);
        if (existingQueue != null) {
            return existingQueue;
        }
        ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> newQueue = new ConcurrentLinkedQueue<>();
        ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> racedQueue =
                (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>>) this.sourceEvaluators.putIfAbsent(evaluatorKey, newQueue);
        if (racedQueue != null) {
            // Another caller installed a queue first; use theirs and discard ours.
            return racedQueue;
        }
        // Our queue was installed: seed it with the single evaluator for this application.
        UnboundedSource<OutputT, ?> unboundedSource = appliedPTransform.getTransform().getSource();
        newQueue.offer(new UnboundedReadEvaluator<OutputT>(appliedPTransform, inProcessEvaluationContext, unboundedSource, newQueue));
        return newQueue;
    }
}
