package org.apache.beam.runners.direct;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.repackaged.direct_java.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.direct_java.runners.core.StateTag;
import org.apache.beam.repackaged.direct_java.runners.core.StateTags;
import org.apache.beam.repackaged.direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.WatermarkManager;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.testing.PCollectionViewTesting;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/EvaluationContextTest.class */
public class EvaluationContextTest implements Serializable {
    private transient EvaluationContext context;
    private transient PCollection<byte[]> impulse;
    private transient PCollection<KV<String, byte[]>> downstream;
    private transient PCollectionView<Iterable<byte[]>> view;
    private transient PCollection<byte[]> unbounded;
    private transient DirectGraph graph;
    private transient AppliedPTransform<?, ?, ?> impulseProducer;
    private transient AppliedPTransform<?, ?, ?> downstreamProducer;
    private transient AppliedPTransform<?, ?, ?> unboundedProducer;

    @Rule
    public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    /* loaded from: input_file:org/apache/beam/runners/direct/EvaluationContextTest$TestBoundedWindow.class */
    private static class TestBoundedWindow extends BoundedWindow {
        private final Instant ts;

        public TestBoundedWindow(Instant instant) {
            this.ts = instant;
        }

        public Instant maxTimestamp() {
            return this.ts;
        }
    }

    @Before
    public void setup() {
        DirectRunner fromOptions = DirectRunner.fromOptions(PipelineOptionsFactory.create());
        this.impulse = this.p.apply(Impulse.create());
        this.downstream = this.impulse.apply(WithKeys.of("foo"));
        this.view = this.impulse.apply(View.asIterable());
        this.unbounded = this.p.apply(Impulse.create()).setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
        this.impulse.apply(ParDo.of(new DoFn<byte[], Void>() { // from class: org.apache.beam.runners.direct.EvaluationContextTest.1
            @DoFn.ProcessElement
            public void process() {
            }
        }).withSideInputs(new PCollectionView[]{this.view}));
        fromOptions.performRewrites(this.p);
        KeyedPValueTrackingVisitor create = KeyedPValueTrackingVisitor.create();
        this.p.traverseTopologically(create);
        ImmutableListBundleFactory create2 = ImmutableListBundleFactory.create();
        DirectGraphs.performDirectOverrides(this.p);
        this.graph = DirectGraphs.getGraph(this.p);
        this.context = EvaluationContext.create(NanosOffsetClock.create(), create2, this.graph, create.getKeyedPValues(), Executors.newSingleThreadExecutor());
        this.impulseProducer = this.graph.getProducer(this.impulse);
        this.downstreamProducer = this.graph.getProducer(this.downstream);
        this.unboundedProducer = this.graph.getProducer(this.unbounded);
    }

    @Test
    public void writeToViewWriterThenReadReads() {
        PCollectionViewWriter createPCollectionViewWriter = this.context.createPCollectionViewWriter(PCollection.createPrimitiveOutputInternal(this.p, WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED, IterableCoder.of(KvCoder.of(VoidCoder.of(), VarIntCoder.of()))), this.view);
        TestBoundedWindow testBoundedWindow = new TestBoundedWindow(new Instant(1024L));
        TestBoundedWindow testBoundedWindow2 = new TestBoundedWindow(new Instant(899999L));
        ImmutableList.Builder builder = ImmutableList.builder();
        Iterator it = PCollectionViewTesting.materializeValuesFor(this.view.getPipeline().getOptions(), View.asIterable(), new Object[]{1}).iterator();
        while (it.hasNext()) {
            builder.add(WindowedValue.of(it.next(), new Instant(1222L), testBoundedWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING));
        }
        Iterator it2 = PCollectionViewTesting.materializeValuesFor(this.view.getPipeline().getOptions(), View.asIterable(), new Object[]{2}).iterator();
        while (it2.hasNext()) {
            builder.add(WindowedValue.of(it2.next(), new Instant(8766L), testBoundedWindow2, PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0L, 0L)));
        }
        createPCollectionViewWriter.add(builder.build());
        ReadyCheckingSideInputReader createSideInputReader = this.context.createSideInputReader(ImmutableList.of(this.view));
        Assert.assertThat((Iterable) createSideInputReader.get(this.view, testBoundedWindow), Matchers.containsInAnyOrder(new Serializable[]{1}));
        Assert.assertThat((Iterable) createSideInputReader.get(this.view, testBoundedWindow2), Matchers.containsInAnyOrder(new Serializable[]{2}));
        ImmutableList.Builder builder2 = ImmutableList.builder();
        Iterator it3 = PCollectionViewTesting.materializeValuesFor(this.view.getPipeline().getOptions(), View.asIterable(), new Object[]{4444}).iterator();
        while (it3.hasNext()) {
            builder2.add(WindowedValue.of(it3.next(), new Instant(8677L), testBoundedWindow2, PaneInfo.createPane(false, true, PaneInfo.Timing.LATE, 1L, 1L)));
        }
        createPCollectionViewWriter.add(builder2.build());
        Assert.assertThat((Iterable) createSideInputReader.get(this.view, testBoundedWindow2), Matchers.containsInAnyOrder(new Serializable[]{2}));
        Assert.assertThat((Iterable) this.context.createSideInputReader(ImmutableList.of(this.view)).get(this.view, testBoundedWindow2), Matchers.containsInAnyOrder(new Serializable[]{4444}));
    }

    @Test
    public void getExecutionContextSameStepSameKeyState() {
        DirectExecutionContext executionContext = this.context.getExecutionContext(this.impulseProducer, StructuralKey.of("foo", StringUtf8Coder.of()));
        StateTag bag = StateTags.bag("myBag", VarIntCoder.of());
        DirectExecutionContext.DirectStepContext stepContext = executionContext.getStepContext("s1");
        stepContext.stateInternals().state(StateNamespaces.global(), bag).add(1);
        this.context.handleResult(ImmutableListBundleFactory.create().createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), this.impulse).commit(Instant.now()), ImmutableList.of(), StepTransformResult.withoutHold(this.impulseProducer).withState(stepContext.commitState()).build());
        Assert.assertThat(this.context.getExecutionContext(this.impulseProducer, StructuralKey.of("foo", StringUtf8Coder.of())).getStepContext("s1").stateInternals().state(StateNamespaces.global(), bag).read(), Matchers.contains(new Integer[]{1}));
    }

    @Test
    public void getExecutionContextDifferentKeysIndependentState() {
        DirectExecutionContext executionContext = this.context.getExecutionContext(this.impulseProducer, StructuralKey.of("foo", StringUtf8Coder.of()));
        StateTag bag = StateTags.bag("myBag", VarIntCoder.of());
        executionContext.getStepContext("s1").stateInternals().state(StateNamespaces.global(), bag).add(1);
        DirectExecutionContext executionContext2 = this.context.getExecutionContext(this.impulseProducer, StructuralKey.of("bar", StringUtf8Coder.of()));
        Assert.assertThat(executionContext2, Matchers.not(Matchers.equalTo(executionContext)));
        Assert.assertThat(executionContext2.getStepContext("s1").stateInternals().state(StateNamespaces.global(), bag).read(), Matchers.emptyIterable());
    }

    @Test
    public void getExecutionContextDifferentStepsIndependentState() {
        StructuralKey of = StructuralKey.of("foo", StringUtf8Coder.of());
        DirectExecutionContext executionContext = this.context.getExecutionContext(this.impulseProducer, of);
        StateTag bag = StateTags.bag("myBag", VarIntCoder.of());
        executionContext.getStepContext("s1").stateInternals().state(StateNamespaces.global(), bag).add(1);
        Assert.assertThat(this.context.getExecutionContext(this.downstreamProducer, of).getStepContext("s1").stateInternals().state(StateNamespaces.global(), bag).read(), Matchers.emptyIterable());
    }

    @Test
    public void handleResultStoresState() {
        StructuralKey of = StructuralKey.of("foo".getBytes(StandardCharsets.UTF_8), ByteArrayCoder.of());
        DirectExecutionContext executionContext = this.context.getExecutionContext(this.downstreamProducer, of);
        StateTag bag = StateTags.bag("myBag", VarIntCoder.of());
        CopyOnAccessInMemoryStateInternals stateInternals = executionContext.getStepContext("s1").stateInternals();
        BagState state = stateInternals.state(StateNamespaces.global(), bag);
        state.add(1);
        state.add(2);
        state.add(4);
        this.context.handleResult(this.context.createKeyedBundle(of, this.impulse).commit(Instant.now()), ImmutableList.of(), StepTransformResult.withoutHold(this.downstreamProducer).withState(stateInternals).build());
        Assert.assertThat(this.context.getExecutionContext(this.downstreamProducer, of).getStepContext("s1").stateInternals().state(StateNamespaces.global(), bag).read(), Matchers.contains(new Integer[]{1, 2, 4}));
    }

    @Test
    public void callAfterOutputMustHaveBeenProducedAfterEndOfWatermarkCallsback() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Objects.requireNonNull(countDownLatch);
        this.context.scheduleAfterOutputWouldBeProduced(this.downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), countDownLatch::countDown);
        this.context.handleResult((CommittedBundle) null, ImmutableList.of(), StepTransformResult.withHold(this.impulseProducer, new Instant(0L)).build());
        Assert.assertThat(Boolean.valueOf(countDownLatch.await(500L, TimeUnit.MILLISECONDS)), Matchers.is(false));
        this.context.handleResult((CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.impulseProducer).build());
        this.context.forceRefresh();
        Assert.assertThat(Boolean.valueOf(countDownLatch.await(1L, TimeUnit.SECONDS)), Matchers.is(true));
    }

    @Test
    public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception {
        this.context.handleResult((CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.impulseProducer).build());
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.context.extractFiredTimers();
        Objects.requireNonNull(countDownLatch);
        this.context.scheduleAfterOutputWouldBeProduced(this.downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), countDownLatch::countDown);
        Assert.assertThat(Boolean.valueOf(countDownLatch.await(1L, TimeUnit.SECONDS)), Matchers.is(true));
    }

    @Test
    public void extractFiredTimersExtractsTimers() {
        this.context.handleResult((CommittedBundle) null, ImmutableList.of(), StepTransformResult.withHold(this.impulseProducer, new Instant(0L)).build());
        StructuralKey of = StructuralKey.of(Integer.valueOf("foo".length()), VarIntCoder.of());
        TimerInternals.TimerData of2 = TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(100L), new Instant(100L), TimeDomain.EVENT_TIME);
        StepTransformResult build = StepTransformResult.withoutHold(this.downstreamProducer).withState(CopyOnAccessInMemoryStateInternals.withUnderlying(of, (CopyOnAccessInMemoryStateInternals) null)).withTimerUpdate(WatermarkManager.TimerUpdate.builder(of).setTimer(of2).build()).build();
        Assert.assertThat(this.context.extractFiredTimers(), Matchers.emptyIterable());
        this.context.handleResult(this.context.createKeyedBundle(of, this.impulse).commit(Instant.now()), ImmutableList.of(), build);
        Assert.assertThat(this.context.extractFiredTimers(), Matchers.emptyIterable());
        this.context.handleResult((CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.impulseProducer).build());
        Collection extractFiredTimers = this.context.extractFiredTimers();
        Assert.assertThat(((WatermarkManager.FiredTimers) Iterables.getOnlyElement(extractFiredTimers)).getKey(), Matchers.equalTo(of));
        Assert.assertThat(((WatermarkManager.FiredTimers) Iterables.getOnlyElement(extractFiredTimers)).getTimers(), Matchers.contains(new TimerInternals.TimerData[]{of2}));
        Assert.assertThat(this.context.extractFiredTimers(), Matchers.emptyIterable());
    }

    @Test
    public void createKeyedBundleKeyed() {
        StructuralKey of = StructuralKey.of("foo", StringUtf8Coder.of());
        Assert.assertThat(this.context.createKeyedBundle(of, this.downstream).commit(Instant.now()).getKey(), Matchers.equalTo(of));
    }

    @Test
    public void isDoneWithUnboundedPCollection() {
        Assert.assertThat(Boolean.valueOf(this.context.isDone(this.unboundedProducer)), Matchers.is(false));
        this.context.handleResult((CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.unboundedProducer).build());
        this.context.extractFiredTimers();
        Assert.assertThat(Boolean.valueOf(this.context.isDone(this.unboundedProducer)), Matchers.is(true));
    }

    @Test
    public void isDoneWithPartiallyDone() {
        Assert.assertThat(Boolean.valueOf(this.context.isDone()), Matchers.is(false));
        UncommittedBundle createBundle = this.context.createBundle(this.impulse);
        createBundle.add(WindowedValue.valueInGlobalWindow(new byte[0]));
        CommittedBundle committedBundle = (CommittedBundle) Iterables.getOnlyElement(this.context.handleResult((CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.impulseProducer).addOutput(createBundle, new UncommittedBundle[0]).build()).getOutputs());
        this.context.handleResult((CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.unboundedProducer).build());
        Assert.assertThat(Boolean.valueOf(this.context.isDone()), Matchers.is(false));
        Iterator it = this.graph.getPerElementConsumers(this.impulse).iterator();
        while (it.hasNext()) {
            this.context.handleResult(committedBundle, ImmutableList.of(), StepTransformResult.withoutHold((AppliedPTransform) it.next()).build());
        }
        this.context.extractFiredTimers();
        Assert.assertThat(Boolean.valueOf(this.context.isDone()), Matchers.is(true));
    }
}
