package org.apache.beam.runners.direct;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.repackaged.direct_java.runners.core.construction.ReplacementOutputs;
import org.apache.beam.repackaged.direct_java.runners.core.construction.TestStreamTranslation;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/direct/TestStreamEvaluatorFactory.class */
class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
    private final EvaluationContext evaluationContext;

    /* loaded from: input_file:org/apache/beam/runners/direct/TestStreamEvaluatorFactory$DirectTestStreamFactory.class */
    /**
     * Override factory that replaces an applied {@link TestStream} with a {@link DirectTestStream}
     * bound to a specific {@link DirectRunner}.
     *
     * @param <T> element type of the test stream
     */
    static class DirectTestStreamFactory<T> implements PTransformOverrideFactory<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> {
        static final String DIRECT_TEST_STREAM_URN = "beam:directrunner:transforms:test_stream:v1";

        private final DirectRunner runner;

        /**
         * Replacement transform that wraps the original {@link TestStream} and, when expanded,
         * installs a {@link TestClockSupplier} on the runner.
         *
         * @param <T> element type of the wrapped test stream
         */
        static class DirectTestStream<T> extends PTransform<PBegin, PCollection<T>> {
            // transient: the runner reference is excluded from Java serialization of this transform.
            private final transient DirectRunner runner;
            private final TestStream<T> original;

            /**
             * @param directRunner runner whose clock supplier is replaced during {@link #expand}
             * @param testStream the test stream this transform stands in for
             */
            @VisibleForTesting
            DirectTestStream(DirectRunner directRunner, TestStream<T> testStream) {
                this.runner = directRunner;
                this.original = testStream;
            }

            /**
             * Switches the runner to a {@link TestClockSupplier} and produces the primitive output
             * collection: unbounded, globally windowed, and encoded with the original stream's
             * value coder.
             *
             * @param pBegin pipeline start the transform is applied to
             * @return the primitive output {@link PCollection}
             */
            @Override // org.apache.beam.sdk.transforms.PTransform
            public PCollection<T> expand(PBegin pBegin) {
                this.runner.setClockSupplier(new TestClockSupplier());
                return PCollection.createPrimitiveOutputInternal(pBegin.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED, this.original.getValueCoder());
            }
        }

        /**
         * @param directRunner runner handed to every {@link DirectTestStream} this factory creates
         */
        public DirectTestStreamFactory(DirectRunner directRunner) {
            this.runner = directRunner;
        }

        /**
         * Builds the replacement for an applied test stream by recovering the {@link TestStream}
         * from the application and wrapping it in a {@link DirectTestStream}.
         *
         * @param appliedPTransform the application being replaced
         * @return a replacement rooted at the pipeline's {@code begin()}
         * @throws RuntimeException wrapping the {@link IOException} raised when the test stream
         *     cannot be recovered from the application
         */
        @Override // org.apache.beam.sdk.runners.PTransformOverrideFactory
        public PTransformOverrideFactory.PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> appliedPTransform) {
            try {
                // Diamond instead of a raw type so the element type T is checked by the compiler.
                return PTransformOverrideFactory.PTransformReplacement.of(appliedPTransform.getPipeline().begin(), new DirectTestStream<>(this.runner, TestStreamTranslation.getTestStream(appliedPTransform)));
            } catch (IOException e) {
                throw new RuntimeException(String.format("Transform could not be converted to %s", TestStream.class.getSimpleName()), e);
            }
        }

        /**
         * Maps the outputs of the replaced transform onto the single replacement output by
         * delegating to {@link ReplacementOutputs#singleton}.
         *
         * <p>The erased {@code mapOutputs(Map, POutput)} bridge is generated by the compiler from
         * this method, so it is not declared by hand.
         *
         * @param map outputs of the original transform, keyed by tag
         * @param pCollection output of the replacement transform
         * @return the mapping produced by {@link ReplacementOutputs#singleton}
         */
        @Override // org.apache.beam.sdk.runners.PTransformOverrideFactory
        public Map<PCollection<?>, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PCollection<?>> map, PCollection<T> pCollection) {
            return ReplacementOutputs.singleton(map, pCollection);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/TestStreamEvaluatorFactory$Evaluator.class */
    /**
     * Evaluator that replays one {@link TestStream} event per input element.
     *
     * <p>Each input is a {@link TestStreamIndex} pointing at the event to run. After handling that
     * event, the evaluator re-queues the next index as an unprocessed element when more events
     * remain.
     *
     * @param <T> element type of the test stream
     */
    public static class Evaluator<T> implements TransformEvaluator<TestStreamIndex<T>> {
        private final AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application;
        private final EvaluationContext context;
        private final StepTransformResult.Builder<TestStreamIndex<T>> resultBuilder;

        /**
         * @param appliedPTransform the test stream application being evaluated
         * @param evaluationContext context used to create output bundles and to reach the clock
         */
        private Evaluator(AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> appliedPTransform, EvaluationContext evaluationContext) {
            this.application = appliedPTransform;
            this.context = evaluationContext;
            this.resultBuilder = StepTransformResult.withoutHold(appliedPTransform);
        }

        /**
         * Executes the event addressed by the incoming index.
         *
         * <ul>
         *   <li>{@code ELEMENT}: the event's elements are written to a new output bundle.
         *   <li>{@code WATERMARK}: the event's watermark becomes the timestamp of the next index.
         *   <li>{@code PROCESSING_TIME}: the context's {@link TestClock} is advanced.
         * </ul>
         *
         * @param windowedValue index of the event to run; its timestamp is carried over to the
         *     next index unless a watermark event overrides it
         * @throws Exception declared by the {@link TransformEvaluator} contract
         */
        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public void processElement(WindowedValue<TestStreamIndex<T>> windowedValue) throws Exception {
            TestStreamIndex<T> streamIndex = windowedValue.getValue();
            List<TestStream.Event<T>> events = streamIndex.getTestStream().getEvents();
            Instant nextTimestamp = windowedValue.getTimestamp();
            TestStream.Event<T> event = events.get(streamIndex.getIndex());

            switch (event.getType()) {
                case ELEMENT:
                    emitElements((TestStream.ElementEvent<T>) event);
                    break;
                case WATERMARK:
                    nextTimestamp = ((TestStream.WatermarkEvent<T>) event).getWatermark();
                    break;
                case PROCESSING_TIME:
                    // The cast requires the context clock to be a TestClock; anything else fails
                    // here with a ClassCastException.
                    ((TestClock) this.context.getClock()).advance(((TestStream.ProcessingTimeEvent<T>) event).getProcessingTimeAdvance());
                    break;
                default:
                    // Other event types produce no output and change no state.
                    break;
            }

            TestStreamIndex<T> next = streamIndex.next();
            if (next.getIndex() < events.size()) {
                // More events remain: hand the successor index back as unprocessed input.
                this.resultBuilder.addUnprocessedElements(Collections.singleton(WindowedValue.timestampedValueInGlobalWindow(next, nextTimestamp)));
            }
        }

        /** Writes every element of the event into a fresh bundle on the transform's only output. */
        private void emitElements(TestStream.ElementEvent<T> elementEvent) {
            @SuppressWarnings("unchecked") // the application declares PCollection<T> as its output type
            PCollection<T> output = (PCollection<T>) Iterables.getOnlyElement(this.application.getOutputs().values());
            UncommittedBundle<T> bundle = this.context.createBundle(output);
            for (TimestampedValue<T> element : elementEvent.getElements()) {
                bundle.add(WindowedValue.timestampedValueInGlobalWindow(element.getValue(), element.getTimestamp()));
            }
            this.resultBuilder.addOutput(bundle);
        }

        /**
         * @return the result accumulated by {@link #processElement}
         * @throws Exception declared by the {@link TransformEvaluator} contract
         */
        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public TransformResult<TestStreamIndex<T>> finishBundle() throws Exception {
            return this.resultBuilder.build();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/TestStreamEvaluatorFactory$InputProvider.class */
    /**
     * Root input provider that seeds a test stream with a single bundle holding the index of its
     * first event.
     *
     * @param <T> element type of the test stream
     */
    static class InputProvider<T> implements RootInputProvider<T, TestStreamIndex<T>, PBegin> {
        private final EvaluationContext evaluationContext;

        /**
         * @param evaluationContext context used to create the root bundle
         */
        public InputProvider(EvaluationContext evaluationContext) {
            this.evaluationContext = evaluationContext;
        }

        /**
         * Creates the one initial bundle for the transform: a globally windowed
         * {@link TestStreamIndex} at position 0, committed at
         * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
         *
         * @param appliedPTransform application whose transform must be a
         *     {@link DirectTestStreamFactory.DirectTestStream}; any other transform fails the cast
         * @param i not used by this provider (presumably the requested parallelism; confirm
         *     against {@link RootInputProvider})
         * @return a singleton collection containing the initial bundle
         */
        @Override // org.apache.beam.runners.direct.RootInputProvider
        public Collection<CommittedBundle<TestStreamIndex<T>>> getInitialInputs(AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> appliedPTransform, int i) {
            DirectTestStreamFactory.DirectTestStream<T> testStream = (DirectTestStreamFactory.DirectTestStream<T>) appliedPTransform.getTransform();
            // The explicit type witness is needed: in a call chain the compiler has no target type
            // from which to infer the bundle's element type.
            CommittedBundle<TestStreamIndex<T>> initialBundle = this.evaluationContext.<TestStreamIndex<T>>createRootBundle().add(WindowedValue.valueInGlobalWindow(TestStreamIndex.of(testStream.original))).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
            return Collections.singleton(initialBundle);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Manually driven {@link Clock}: time starts at {@link BoundedWindow#TIMESTAMP_MIN_VALUE} and
     * moves only when {@link #advance} is called. Reads and updates of the current time are
     * guarded by this instance's monitor.
     */
    @VisibleForTesting
    public static class TestClock implements Clock {
        private Instant currentTime = BoundedWindow.TIMESTAMP_MIN_VALUE;

        TestClock() {
        }

        /**
         * Moves the clock by the given amount.
         *
         * @param duration amount added to the current time
         */
        public void advance(Duration duration) {
            synchronized (this) {
                Instant advanced = this.currentTime.plus(duration);
                this.currentTime = advanced;
            }
        }

        /**
         * @return the current time of this clock
         */
        @Override // org.apache.beam.runners.direct.Clock
        public Instant now() {
            synchronized (this) {
                return this.currentTime;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/TestStreamEvaluatorFactory$TestClockSupplier.class */
    /** {@link Supplier} that hands out a newly constructed {@link TestClock} on every call. */
    public static class TestClockSupplier implements Supplier<Clock> {
        private TestClockSupplier() {
        }

        /**
         * @return a new {@link TestClock}; no instance is shared between calls
         */
        @Override // org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier, java.util.function.Supplier
        public Clock get() {
            final Clock freshClock = new TestClock();
            return freshClock;
        }
    }

    /**
     * Cursor into a {@link TestStream}: the stream together with the position of one of its
     * events.
     *
     * @param <T> element type of the test stream
     */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/runners/direct/TestStreamEvaluatorFactory$TestStreamIndex.class */
    abstract static class TestStreamIndex<T> {
        /**
         * @param testStream stream to point into
         * @return an index at position 0 of {@code testStream}
         */
        static <T> TestStreamIndex<T> of(TestStream<T> testStream) {
            // Diamond rather than a raw type, so the result is a checked TestStreamIndex<T>.
            return new AutoValue_TestStreamEvaluatorFactory_TestStreamIndex<>(testStream, 0);
        }

        /**
         * @return the stream this index points into
         */
        public abstract TestStream<T> getTestStream();

        /**
         * @return the zero-based position within the stream's events
         */
        public abstract int getIndex();

        /**
         * @return a new index over the same stream, one position further along; no bounds check
         *     is performed here
         */
        TestStreamIndex<T> next() {
            return new AutoValue_TestStreamEvaluatorFactory_TestStreamIndex<>(getTestStream(), getIndex() + 1);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a factory bound to the given evaluation context.
     *
     * @param evaluationContext context stored and passed to every evaluator this factory creates
     */
    public TestStreamEvaluatorFactory(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, CommittedBundle<?> committedBundle) {
        return createEvaluator(appliedPTransform);
    }

    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    /**
     * Intentionally does nothing; the method body is empty.
     *
     * @throws Exception declared by the overridden method; never thrown by this implementation
     */
    public void cleanup() throws Exception {
    }

    /**
     * Instantiates an {@link Evaluator} for the given test stream application, sharing this
     * factory's evaluation context.
     *
     * @param appliedPTransform the test stream application to evaluate
     * @return the new evaluator, viewed as an evaluator of the caller's input type
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // InputT is unconstrained, so the evaluator's type cannot be checked against it
    private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator(AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> appliedPTransform) {
        TransformEvaluator<? super InputT> evaluator = new Evaluator(appliedPTransform, this.evaluationContext);
        return evaluator;
    }
}
