package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Supplier;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult;
import com.google.cloud.dataflow.sdk.testing.TestStream;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.PBegin;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;
import com.google.cloud.dataflow.sdk.values.TimestampedValue;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/inprocess/TestStreamEvaluatorFactory.class */
public class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
    // Pool of TestStream evaluators keyed by the applied transform they replay. createEvaluator
    // acquires from this pool, and each Evaluator releases itself back to it in finishBundle().
    private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators = LockedKeyedResourcePool.create();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/inprocess/TestStreamEvaluatorFactory$CreateEvaluator.class */
    /**
     * {@link Callable} that builds a new {@link Evaluator} for a single {@link TestStream}
     * application.
     *
     * @param <OutputT> element type produced by the {@link TestStream}
     */
    public static class CreateEvaluator<OutputT> implements Callable<Evaluator<?>> {
        private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators;
        private final InProcessEvaluationContext evaluationContext;
        private final AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application;

        /**
         * Captures everything the {@link Evaluator} constructor needs.
         *
         * @param appliedPTransform the {@link TestStream} application to evaluate
         * @param inProcessEvaluationContext the evaluation context passed through to the evaluator
         * @param keyedResourcePool the pool passed through to the evaluator
         */
        public CreateEvaluator(AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> appliedPTransform, InProcessEvaluationContext inProcessEvaluationContext, KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> keyedResourcePool) {
            application = appliedPTransform;
            evaluationContext = inProcessEvaluationContext;
            evaluators = keyedResourcePool;
        }

        /** Returns a newly constructed {@link Evaluator} for the captured application. */
        @Override
        public Evaluator<?> call() throws Exception {
            Evaluator<OutputT> created = new Evaluator<OutputT>(application, evaluationContext, evaluators);
            return created;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/inprocess/TestStreamEvaluatorFactory$Evaluator.class */
    /**
     * Evaluator that replays the events of a {@link TestStream}, consuming exactly one event per
     * call to {@link #finishBundle()}.
     *
     * <p>The position in the event list and the most recently seen watermark are kept between
     * bundles. At the end of every bundle the evaluator releases itself back to the pool it was
     * given at construction.
     *
     * @param <T> element type of the {@link TestStream} being replayed
     */
    public static class Evaluator<T> implements TransformEvaluator<Object> {
        private final AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application;
        private final InProcessEvaluationContext context;
        private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> cache;
        private final List<TestStream.Event<T>> events;
        /** Index into {@link #events} of the next event to replay. */
        private int index;
        /** Watermark from the latest watermark event; starts at the minimum timestamp. */
        private Instant currentWatermark;

        private Evaluator(AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> appliedPTransform, InProcessEvaluationContext inProcessEvaluationContext, KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> keyedResourcePool) {
            this.application = appliedPTransform;
            this.context = inProcessEvaluationContext;
            this.cache = keyedResourcePool;
            this.events = appliedPTransform.getTransform().getEvents();
            this.index = 0;
            this.currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
        }

        /** Does nothing with the input; all work happens in {@link #finishBundle()}. */
        @Override
        public void processElement(WindowedValue<Object> windowedValue) throws Exception {
        }

        /**
         * Replays the next event of the stream, if any, and returns the resulting transform result.
         *
         * <ul>
         *   <li>When every event has been consumed, the result is built without a hold.
         *   <li>A watermark event updates the current watermark before the hold is taken.
         *   <li>An element event emits its elements into a new root bundle in the global window.
         *   <li>A processing-time event advances the context's {@link TestClock}.
         * </ul>
         *
         * <p>The evaluator is released back to the pool exactly once, whether this method returns
         * normally or throws.
         *
         * @return the result of this bundle, holding at the current watermark unless the stream is
         *     exhausted
         * @throws Exception if replaying the event fails
         */
        @Override
        public InProcessTransformResult finishBundle() throws Exception {
            try {
                if (index >= events.size()) {
                    return StepTransformResult.withoutHold(application).build();
                }
                TestStream.Event<T> event = events.get(index);
                if (event.getType().equals(TestStream.EventType.WATERMARK)) {
                    currentWatermark = ((TestStream.WatermarkEvent<T>) event).getWatermark();
                }
                // The hold is taken after a watermark event has been applied, so it reflects the new value.
                StepTransformResult.Builder result = StepTransformResult.withHold(application, currentWatermark);
                if (event.getType().equals(TestStream.EventType.ELEMENT)) {
                    InProcessPipelineRunner.UncommittedBundle<T> bundle = context.createRootBundle(application.getOutput());
                    for (TimestampedValue<T> element : ((TestStream.ElementEvent<T>) event).getElements()) {
                        bundle.add(WindowedValue.timestampedValueInGlobalWindow(element.getValue(), element.getTimestamp()));
                    }
                    result.addOutput(bundle);
                }
                if (event.getType().equals(TestStream.EventType.PROCESSING_TIME)) {
                    // NOTE(review): assumes the context clock is the TestClock installed by
                    // DirectTestStream.apply; any other clock fails here with ClassCastException.
                    ((TestClock) context.getClock()).advance(((TestStream.ProcessingTimeEvent<T>) event).getProcessingTimeAdvance());
                }
                index++;
                return result.build();
            } finally {
                // Single release point: runs after the result is built and also on any failure.
                cache.release(application, this);
            }
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/inprocess/TestStreamEvaluatorFactory$InProcessTestStreamFactory.class */
    /**
     * Override factory that replaces a {@link TestStream} with a {@link DirectTestStream}. Any
     * other transform is returned unchanged.
     */
    static class InProcessTestStreamFactory implements PTransformOverrideFactory {

        /**
         * Returns a {@link DirectTestStream} wrapping {@code pTransform} when it is a
         * {@link TestStream}; otherwise returns {@code pTransform} itself.
         */
        @Override
        public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(PTransform<InputT, OutputT> pTransform) {
            if (!(pTransform instanceof TestStream)) {
                return pTransform;
            }
            return new DirectTestStream((TestStream) pTransform);
        }

        /**
         * Replacement for a {@link TestStream} that configures the in-process runner and produces
         * an unbounded, globally windowed output collection.
         */
        private static class DirectTestStream<T> extends PTransform<PBegin, PCollection<T>> {
            private final TestStream<T> original;

            private DirectTestStream(TestStream testStream) {
                this.original = testStream;
            }

            /**
             * Checks that the pipeline runs on the {@link InProcessPipelineRunner}, enables
             * shutdown of unbounded producers at the max watermark, installs a
             * {@link TestClockSupplier} on the runner, and creates the primitive output with the
             * original stream's value coder.
             *
             * @throws IllegalStateException presumably, via {@code checkState}, when the runner is
             *     not an {@link InProcessPipelineRunner}
             */
            @Override
            public PCollection<T> apply(PBegin pBegin) {
                PipelineRunner<?> runner = pBegin.getPipeline().getRunner();
                String transformName = getClass().getSimpleName();
                String runnerName = InProcessPipelineRunner.class.getSimpleName();
                Preconditions.checkState(runner instanceof InProcessPipelineRunner, "%s can only be used when running with the %s", transformName, runnerName);

                InProcessPipelineOptions options = pBegin.getPipeline().getOptions().as(InProcessPipelineOptions.class);
                options.setShutdownUnboundedProducersWithMaxWatermark(true);

                InProcessPipelineRunner inProcessRunner = (InProcessPipelineRunner) runner;
                inProcessRunner.setClockSupplier(new TestClockSupplier());

                return PCollection.createPrimitiveOutputInternal(
                                pBegin.getPipeline(),
                                WindowingStrategy.globalDefault(),
                                PCollection.IsBounded.UNBOUNDED)
                        .setCoder((Coder) this.original.getValueCoder());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/inprocess/TestStreamEvaluatorFactory$TestClock.class */
    /**
     * Manually advanced {@link Clock}. The time starts at
     * {@link BoundedWindow#TIMESTAMP_MIN_VALUE} and only moves when {@link #advance(Duration)} is
     * called.
     */
    public static class TestClock implements Clock {
        private final AtomicReference<Instant> currentTime;

        private TestClock() {
            this.currentTime = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        /**
         * Moves the clock forward by {@code duration}.
         *
         * <p>The compare-and-set is retried until it succeeds, so an advance is never silently
         * dropped when another thread updates the time between the read and the write.
         *
         * @param duration the amount of time to add to the current time
         */
        public void advance(Duration duration) {
            Instant observed;
            do {
                observed = this.currentTime.get();
            } while (!this.currentTime.compareAndSet(observed, observed.plus(duration)));
        }

        /** Returns the current time of this clock. */
        @Override
        public Instant now() {
            return this.currentTime.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/inprocess/TestStreamEvaluatorFactory$TestClockSupplier.class */
    /** {@link Supplier} that hands out a new {@link TestClock} on every call to {@link #get()}. */
    public static class TestClockSupplier implements Supplier<Clock> {
        private TestClockSupplier() {
        }

        /** Returns a newly created {@link TestClock}. */
        @Override
        public Clock get() {
            Clock freshClock = new TestClock();
            return freshClock;
        }
    }

    /**
     * Returns the evaluator for the given {@link TestStream} application, or {@code null} when
     * none could be acquired from the pool.
     *
     * @param appliedPTransform the application to evaluate; NOTE(review): assumed to be a
     *     {@link TestStream} application, which is not checked here
     * @param committedBundle unused by this factory
     * @param inProcessEvaluationContext the evaluation context handed to the evaluator
     * @return the acquired evaluator, or {@code null}
     * @throws Exception if acquiring or creating the evaluator fails
     */
    @Override
    @Nullable
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, @Nullable InProcessPipelineRunner.CommittedBundle<?> committedBundle, InProcessEvaluationContext inProcessEvaluationContext) throws Exception {
        // The wildcard application has to be narrowed to the TestStream parameterization that
        // createEvaluator expects. The evaluator accepts any input type, so viewing it as
        // TransformEvaluator<InputT> is safe.
        @SuppressWarnings({"unchecked", "rawtypes"})
        TransformEvaluator<InputT> evaluator = (TransformEvaluator<InputT>) createEvaluator((AppliedPTransform) appliedPTransform, inProcessEvaluationContext);
        return evaluator;
    }

    /**
     * Tries to acquire the evaluator for {@code appliedPTransform} from the pool, supplying a
     * {@link CreateEvaluator} as the loader for a new one.
     *
     * @return the acquired evaluator, or {@code null} when {@code tryAcquire} yields no value
     *     (presumably because the evaluator is currently held elsewhere; confirm against
     *     {@code KeyedResourcePool})
     * @throws ExecutionException if the pool fails while acquiring the evaluator
     */
    private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator(AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> appliedPTransform, InProcessEvaluationContext inProcessEvaluationContext) throws ExecutionException {
        // Parameterized rather than raw: a raw loader would make the call unchecked and erase the
        // pool's return type, so orNull() would yield Object.
        CreateEvaluator<OutputT> loader = new CreateEvaluator<OutputT>(appliedPTransform, inProcessEvaluationContext, this.evaluators);
        return this.evaluators.tryAcquire(appliedPTransform, loader).orNull();
    }
}
