package org.apache.beam.runners.direct;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Supplier;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/TestStreamEvaluatorFactory.class */
public class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
    private final EvaluationContext evaluationContext;

    /* loaded from: input_file:org/apache/beam/runners/direct/TestStreamEvaluatorFactory$DirectTestStreamFactory.class */
    /**
     * {@link PTransformOverrideFactory} that substitutes a {@link DirectTestStream} for any
     * {@link TestStream}; every other transform is passed through untouched.
     */
    static class DirectTestStreamFactory implements PTransformOverrideFactory {

        /**
         * Stand-in for a {@link TestStream}. Applying it installs a {@link TestClockSupplier} on
         * the {@link DirectRunner} and produces an unbounded, globally windowed output collection
         * that uses the value coder of the wrapped stream.
         */
        private static class DirectTestStream<T> extends PTransform<PBegin, PCollection<T>> {
            /** The user-supplied stream this transform replaces. */
            private final TestStream<T> original;

            private DirectTestStream(TestStream<T> testStream) {
                this.original = testStream;
            }

            /**
             * Creates the primitive output for the wrapped stream.
             *
             * @param pBegin the pipeline root this transform is applied to
             * @return an unbounded {@link PCollection} coded with the stream's value coder
             * @throws IllegalStateException if the pipeline's runner is not a {@link DirectRunner}
             */
            public PCollection<T> apply(PBegin pBegin) {
                PipelineRunner<?> pipelineRunner = pBegin.getPipeline().getRunner();
                boolean isDirectRunner = pipelineRunner instanceof DirectRunner;
                Preconditions.checkState(
                        isDirectRunner,
                        "%s can only be used when running with the %s",
                        getClass().getSimpleName(),
                        DirectRunner.class.getSimpleName());

                DirectRunner directRunner = (DirectRunner) pipelineRunner;
                directRunner.setClockSupplier(new TestClockSupplier());

                PCollection<T> output =
                        PCollection.createPrimitiveOutputInternal(
                                pBegin.getPipeline(),
                                WindowingStrategy.globalDefault(),
                                PCollection.IsBounded.UNBOUNDED);
                return output.setCoder(this.original.getValueCoder());
            }
        }

        /**
         * Returns the direct-runner replacement for {@code pTransform}.
         *
         * @param pTransform the transform being considered for replacement
         * @return a new {@link DirectTestStream} when {@code pTransform} is a {@link TestStream},
         *     otherwise {@code pTransform} itself
         */
        @Override // org.apache.beam.runners.direct.PTransformOverrideFactory
        @SuppressWarnings({"unchecked", "rawtypes"})
        public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(PTransform<InputT, OutputT> pTransform) {
            if (!(pTransform instanceof TestStream)) {
                return pTransform;
            }
            return new DirectTestStream((TestStream) pTransform);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/TestStreamEvaluatorFactory$Evaluator.class */
    /**
     * {@link TransformEvaluator} that plays back one {@link TestStream} event per input element.
     *
     * <p>Each input is a {@link TestStreamIndex} naming the stream and the position of the event
     * to play. After handling that event the evaluator registers the following index as an
     * unprocessed element, so the stream is consumed one event at a time.
     *
     * @param <T> the element type of the test stream
     */
    public static class Evaluator<T> implements TransformEvaluator<TestStreamIndex<T>> {
        private final AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application;
        private final EvaluationContext context;
        private final StepTransformResult.Builder resultBuilder;

        private Evaluator(AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> appliedPTransform, EvaluationContext evaluationContext) {
            this.application = appliedPTransform;
            this.context = evaluationContext;
            this.resultBuilder = StepTransformResult.withoutHold(appliedPTransform);
        }

        /**
         * Plays the event addressed by the incoming index.
         *
         * <ul>
         *   <li>{@code ELEMENT}: every element of the event is added to a new output bundle.
         *   <li>{@code WATERMARK}: the event's watermark becomes the timestamp of the follow-up
         *       index element.
         *   <li>{@code PROCESSING_TIME}: the context's {@link TestClock} is advanced.
         * </ul>
         *
         * @param windowedValue the index of the event to play; its timestamp is carried over to
         *     the follow-up index unless a watermark event replaces it
         * @throws ClassCastException if a processing-time event is played while the context's
         *     clock is not a {@link TestClock}
         */
        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public void processElement(WindowedValue<TestStreamIndex<T>> windowedValue) throws Exception {
            TestStreamIndex<T> streamIndex = windowedValue.getValue();
            List<TestStream.Event<T>> events = streamIndex.getTestStream().getEvents();
            // Keep the event typed as the common supertype; each branch downcasts to the
            // concrete event class only after its type tag has been checked.
            TestStream.Event<T> event = events.get(streamIndex.getIndex());
            Instant nextTimestamp = windowedValue.getTimestamp();

            if (event.getType().equals(TestStream.EventType.ELEMENT)) {
                DirectRunner.UncommittedBundle<T> bundle = this.context.createBundle(this.application.getOutput());
                for (TimestampedValue<T> element : ((TestStream.ElementEvent<T>) event).getElements()) {
                    bundle.add(WindowedValue.timestampedValueInGlobalWindow(element.getValue(), element.getTimestamp()));
                }
                this.resultBuilder.addOutput(bundle);
            }
            if (event.getType().equals(TestStream.EventType.WATERMARK)) {
                nextTimestamp = ((TestStream.WatermarkEvent<T>) event).getWatermark();
            }
            if (event.getType().equals(TestStream.EventType.PROCESSING_TIME)) {
                ((TestClock) this.context.getClock()).advance(((TestStream.ProcessingTimeEvent<T>) event).getProcessingTimeAdvance());
            }

            // Re-enqueue the next position only while events remain; otherwise the stream is done.
            TestStreamIndex<T> next = streamIndex.next();
            if (next.getIndex() < events.size()) {
                this.resultBuilder.addUnprocessedElements(Collections.singleton(WindowedValue.timestampedValueInGlobalWindow(next, nextTimestamp)));
            }
        }

        /**
         * Builds the result accumulated by {@link #processElement}.
         *
         * @return the outputs and unprocessed elements recorded so far
         */
        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public TransformResult finishBundle() throws Exception {
            return this.resultBuilder.build();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/TestStreamEvaluatorFactory$InputProvider.class */
    /**
     * {@link RootInputProvider} that seeds a {@link TestStream} with a single bundle holding the
     * index of the stream's first event.
     */
    public static class InputProvider implements RootInputProvider {
        private final EvaluationContext evaluationContext;

        /**
         * @param evaluationContext the context used to create the root bundle
         */
        public InputProvider(EvaluationContext evaluationContext) {
            this.evaluationContext = evaluationContext;
        }

        /**
         * Produces the initial input for a {@link TestStream} application.
         *
         * @param appliedPTransform the application to seed; treated as a {@link TestStream}
         *     application without a runtime check
         * @param i not used by this provider
         * @return a singleton collection containing the committed root bundle
         */
        @Override // org.apache.beam.runners.direct.RootInputProvider
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Collection<DirectRunner.CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> appliedPTransform, int i) {
            // The wildcard-typed application cannot be passed to createInputBundle directly;
            // go through the raw type, as the type parameters cannot be recovered here.
            return createInputBundle((AppliedPTransform) appliedPTransform);
        }

        /**
         * Creates and commits a root bundle whose only element is the index of the first event.
         */
        private <T> Collection<DirectRunner.CommittedBundle<?>> createInputBundle(AppliedPTransform<PBegin, ?, TestStream<T>> appliedPTransform) {
            // The explicit type witness pins the bundle's element type so the chained add()
            // accepts the TestStreamIndex element.
            DirectRunner.CommittedBundle<?> initialBundle =
                    this.evaluationContext
                            .<TestStreamIndex<T>>createRootBundle()
                            .add(WindowedValue.valueInGlobalWindow(TestStreamIndex.of(appliedPTransform.getTransform())))
                            .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
            return Collections.<DirectRunner.CommittedBundle<?>>singleton(initialBundle);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * A {@link Clock} whose time changes only when {@link #advance(Duration)} is called.
     *
     * <p>The clock starts at {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. The current time is kept
     * in an {@link AtomicReference}, and {@link #advance(Duration)} retries until its update is
     * applied, so concurrent advances are not lost.
     */
    @VisibleForTesting
    public static class TestClock implements Clock {
        private final AtomicReference<Instant> currentTime = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);

        TestClock() {
        }

        /**
         * Moves the clock by {@code duration}.
         *
         * @param duration the amount to add to the current time
         */
        public void advance(Duration duration) {
            Instant observed;
            // A failed compareAndSet means another thread changed the time after it was read;
            // re-read and retry so this advance is applied on top of the newer value instead of
            // being dropped.
            do {
                observed = this.currentTime.get();
            } while (!this.currentTime.compareAndSet(observed, observed.plus(duration)));
        }

        /**
         * @return the time most recently established by construction or {@link #advance(Duration)}
         */
        @Override // org.apache.beam.runners.direct.Clock
        public Instant now() {
            return this.currentTime.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/TestStreamEvaluatorFactory$TestClockSupplier.class */
    /**
     * {@link Supplier} that hands out a brand-new {@link TestClock} on every call.
     */
    public static class TestClockSupplier implements Supplier<Clock> {
        private TestClockSupplier() {
        }

        /**
         * @return a newly constructed {@link TestClock}; instances are never shared between calls
         */
        @Override // org.apache.beam.runners.direct.repackaged.com.google.common.base.Supplier
        public Clock get() {
            Clock freshClock = new TestClock();
            return freshClock;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/TestStreamEvaluatorFactory$TestStreamIndex.class */
    /**
     * A position within a {@link TestStream}: the stream itself plus the index of one of its
     * events.
     *
     * @param <T> the element type of the stream
     */
    public abstract static class TestStreamIndex<T> {
        /**
         * @param testStream the stream to point into
         * @return an index positioned at event {@code 0} of {@code testStream}
         */
        static <T> TestStreamIndex<T> of(TestStream<T> testStream) {
            return at(testStream, 0);
        }

        /** @return the stream this index points into */
        public abstract TestStream<T> getTestStream();

        /** @return the position of the addressed event */
        public abstract int getIndex();

        /**
         * @return an index into the same stream, one position past this one; no bounds check is
         *     performed here
         */
        TestStreamIndex<T> next() {
            return at(getTestStream(), getIndex() + 1);
        }

        /** Single construction point for the generated value class. */
        @SuppressWarnings({"unchecked", "rawtypes"})
        private static <T> TestStreamIndex<T> at(TestStream<T> stream, int position) {
            return new AutoValue_TestStreamEvaluatorFactory_TestStreamIndex(stream, position);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a factory whose evaluators share the given context.
     *
     * @param evaluationContext the context handed to every evaluator this factory creates
     */
    public TestStreamEvaluatorFactory(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    /**
     * Creates an evaluator for a {@link TestStream} application.
     *
     * @param appliedPTransform the application to evaluate; treated as a {@link TestStream}
     *     application without a runtime check
     * @param committedBundle not used by this factory
     * @return a new evaluator bound to this factory's evaluation context
     */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    @Nullable
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, DirectRunner.CommittedBundle<?> committedBundle) {
        // The wildcard-typed application cannot be passed to createEvaluator directly, and its
        // result is not a TransformEvaluator<InputT>; bridge both through the raw type. The
        // suppression is scoped to this single declaration.
        @SuppressWarnings({"unchecked", "rawtypes"})
        TransformEvaluator<InputT> evaluator = createEvaluator((AppliedPTransform) appliedPTransform);
        return evaluator;
    }

    /**
     * Intentionally empty: this implementation performs no work on cleanup.
     */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public void cleanup() throws Exception {
    }

    /**
     * Builds the {@link Evaluator} for the given {@link TestStream} application, bound to this
     * factory's evaluation context.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator(AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> appliedPTransform) {
        TransformEvaluator<? super InputT> evaluator = new Evaluator(appliedPTransform, this.evaluationContext);
        return evaluator;
    }
}
