package org.apache.beam.runners.direct;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.direct.AggregatorContainer;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.WatermarkManager;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.StateTags;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/EvaluationContextTest.class */
public class EvaluationContextTest {
    private TestPipeline p;
    private EvaluationContext context;
    private PCollection<Integer> created;
    private PCollection<KV<String, Integer>> downstream;
    private PCollectionView<Iterable<Integer>> view;
    private PCollection<Long> unbounded;
    private Collection<AppliedPTransform<?, ?, ?>> rootTransforms;
    private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
    private BundleFactory bundleFactory;

    /* loaded from: input_file:org/apache/beam/runners/direct/EvaluationContextTest$TestBoundedWindow.class */
    private static class TestBoundedWindow extends BoundedWindow {
        private final Instant ts;

        public TestBoundedWindow(Instant instant) {
            this.ts = instant;
        }

        public Instant maxTimestamp() {
            return this.ts;
        }
    }

    @Before
    public void setup() {
        DirectRunner fromOptions = DirectRunner.fromOptions(PipelineOptionsFactory.create());
        this.p = TestPipeline.create();
        this.created = this.p.apply(Create.of(new Integer[]{1, 2, 3}));
        this.downstream = this.created.apply(WithKeys.of("foo"));
        this.view = this.created.apply(View.asIterable());
        this.unbounded = this.p.apply(CountingInput.unbounded());
        ConsumerTrackingPipelineVisitor consumerTrackingPipelineVisitor = new ConsumerTrackingPipelineVisitor();
        this.p.traverseTopologically(consumerTrackingPipelineVisitor);
        this.rootTransforms = consumerTrackingPipelineVisitor.getRootTransforms();
        this.valueToConsumers = consumerTrackingPipelineVisitor.getValueToConsumers();
        this.bundleFactory = ImmutableListBundleFactory.create();
        this.context = EvaluationContext.create(fromOptions.getPipelineOptions(), NanosOffsetClock.create(), ImmutableListBundleFactory.create(), this.rootTransforms, this.valueToConsumers, consumerTrackingPipelineVisitor.getStepNames(), consumerTrackingPipelineVisitor.getViews());
    }

    @Test
    public void writeToViewWriterThenReadReads() {
        DirectRunner.PCollectionViewWriter createPCollectionViewWriter = this.context.createPCollectionViewWriter(PCollection.createPrimitiveOutputInternal(this.p, WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED), this.view);
        TestBoundedWindow testBoundedWindow = new TestBoundedWindow(new Instant(1024L));
        TestBoundedWindow testBoundedWindow2 = new TestBoundedWindow(new Instant(899999L));
        createPCollectionViewWriter.add(ImmutableList.of(WindowedValue.of(1, new Instant(1222L), testBoundedWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING), WindowedValue.of(2, new Instant(8766L), testBoundedWindow2, PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0L, 0L))));
        ReadyCheckingSideInputReader createSideInputReader = this.context.createSideInputReader(ImmutableList.of(this.view));
        Assert.assertThat(createSideInputReader.get(this.view, testBoundedWindow), Matchers.containsInAnyOrder(new Integer[]{1}));
        Assert.assertThat(createSideInputReader.get(this.view, testBoundedWindow2), Matchers.containsInAnyOrder(new Integer[]{2}));
        createPCollectionViewWriter.add(Collections.singleton(WindowedValue.of(4444, new Instant(8677L), testBoundedWindow2, PaneInfo.createPane(false, true, PaneInfo.Timing.LATE, 1L, 1L))));
        Assert.assertThat(createSideInputReader.get(this.view, testBoundedWindow2), Matchers.containsInAnyOrder(new Integer[]{2}));
        Assert.assertThat(this.context.createSideInputReader(ImmutableList.of(this.view)).get(this.view, testBoundedWindow2), Matchers.containsInAnyOrder(new Integer[]{4444}));
    }

    @Test
    public void getExecutionContextSameStepSameKeyState() {
        DirectExecutionContext executionContext = this.context.getExecutionContext(this.created.getProducingTransformInternal(), StructuralKey.of("foo", StringUtf8Coder.of()));
        StateTag bag = StateTags.bag("myBag", VarIntCoder.of());
        DirectExecutionContext.DirectStepContext orCreateStepContext = executionContext.getOrCreateStepContext("s1", "s1");
        orCreateStepContext.stateInternals().state(StateNamespaces.global(), bag).add(1);
        this.context.handleResult(ImmutableListBundleFactory.create().createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), this.created).commit(Instant.now()), ImmutableList.of(), StepTransformResult.withoutHold(this.created.getProducingTransformInternal()).withState(orCreateStepContext.commitState()).build());
        Assert.assertThat(this.context.getExecutionContext(this.created.getProducingTransformInternal(), StructuralKey.of("foo", StringUtf8Coder.of())).getOrCreateStepContext("s1", "s1").stateInternals().state(StateNamespaces.global(), bag).read(), Matchers.contains(new Integer[]{1}));
    }

    @Test
    public void getExecutionContextDifferentKeysIndependentState() {
        DirectExecutionContext executionContext = this.context.getExecutionContext(this.created.getProducingTransformInternal(), StructuralKey.of("foo", StringUtf8Coder.of()));
        StateTag bag = StateTags.bag("myBag", VarIntCoder.of());
        executionContext.getOrCreateStepContext("s1", "s1").stateInternals().state(StateNamespaces.global(), bag).add(1);
        DirectExecutionContext executionContext2 = this.context.getExecutionContext(this.created.getProducingTransformInternal(), StructuralKey.of("bar", StringUtf8Coder.of()));
        Assert.assertThat(executionContext2, Matchers.not(Matchers.equalTo(executionContext)));
        Assert.assertThat(executionContext2.getOrCreateStepContext("s1", "s1").stateInternals().state(StateNamespaces.global(), bag).read(), Matchers.emptyIterable());
    }

    @Test
    public void getExecutionContextDifferentStepsIndependentState() {
        StructuralKey of = StructuralKey.of("foo", StringUtf8Coder.of());
        DirectExecutionContext executionContext = this.context.getExecutionContext(this.created.getProducingTransformInternal(), of);
        StateTag bag = StateTags.bag("myBag", VarIntCoder.of());
        executionContext.getOrCreateStepContext("s1", "s1").stateInternals().state(StateNamespaces.global(), bag).add(1);
        Assert.assertThat(this.context.getExecutionContext(this.downstream.getProducingTransformInternal(), of).getOrCreateStepContext("s1", "s1").stateInternals().state(StateNamespaces.global(), bag).read(), Matchers.emptyIterable());
    }

    @Test
    public void handleResultCommitsAggregators() {
        Class<?> cls = getClass();
        DirectExecutionContext.DirectStepContext createStepContext = this.context.getExecutionContext(this.created.getProducingTransformInternal(), (StructuralKey) null).createStepContext("STEP", this.created.getProducingTransformInternal().getTransform().getName());
        AggregatorContainer aggregatorContainer = this.context.getAggregatorContainer();
        AggregatorContainer.Mutator createMutator = aggregatorContainer.createMutator();
        createMutator.createAggregatorForDoFn(cls, createStepContext, "foo", new Sum.SumLongFn()).addValue(4L);
        this.context.handleResult((DirectRunner.CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.created.getProducingTransformInternal()).withAggregatorChanges(createMutator).build());
        Assert.assertThat((Long) this.context.getAggregatorContainer().getAggregate("STEP", "foo"), Matchers.equalTo(4L));
        AggregatorContainer.Mutator createMutator2 = aggregatorContainer.createMutator();
        createMutator2.createAggregatorForDoFn(cls, createStepContext, "foo", new Sum.SumLongFn()).addValue(12L);
        this.context.handleResult(this.context.createBundle(this.created).commit(Instant.now()), ImmutableList.of(), StepTransformResult.withoutHold(this.downstream.getProducingTransformInternal()).withAggregatorChanges(createMutator2).build());
        Assert.assertThat((Long) this.context.getAggregatorContainer().getAggregate("STEP", "foo"), Matchers.equalTo(16L));
    }

    @Test
    public void handleResultStoresState() {
        StructuralKey of = StructuralKey.of("foo".getBytes(), ByteArrayCoder.of());
        DirectExecutionContext executionContext = this.context.getExecutionContext(this.downstream.getProducingTransformInternal(), of);
        StateTag bag = StateTags.bag("myBag", VarIntCoder.of());
        CopyOnAccessInMemoryStateInternals stateInternals = executionContext.getOrCreateStepContext("s1", "s1").stateInternals();
        BagState state = stateInternals.state(StateNamespaces.global(), bag);
        state.add(1);
        state.add(2);
        state.add(4);
        this.context.handleResult(this.context.createKeyedBundle(of, this.created).commit(Instant.now()), ImmutableList.of(), StepTransformResult.withoutHold(this.downstream.getProducingTransformInternal()).withState(stateInternals).build());
        Assert.assertThat(this.context.getExecutionContext(this.downstream.getProducingTransformInternal(), of).getOrCreateStepContext("s1", "s1").stateInternals().state(StateNamespaces.global(), bag).read(), Matchers.contains(new Integer[]{1, 2, 4}));
    }

    @Test
    public void callAfterOutputMustHaveBeenProducedAfterEndOfWatermarkCallsback() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.context.scheduleAfterOutputWouldBeProduced(this.downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), new Runnable() { // from class: org.apache.beam.runners.direct.EvaluationContextTest.1
            @Override // java.lang.Runnable
            public void run() {
                countDownLatch.countDown();
            }
        });
        this.context.handleResult((DirectRunner.CommittedBundle) null, ImmutableList.of(), StepTransformResult.withHold(this.created.getProducingTransformInternal(), new Instant(0L)).build());
        Assert.assertThat(Boolean.valueOf(countDownLatch.await(500L, TimeUnit.MILLISECONDS)), Matchers.is(false));
        this.context.handleResult((DirectRunner.CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.created.getProducingTransformInternal()).build());
        this.context.forceRefresh();
        Assert.assertThat(Boolean.valueOf(countDownLatch.await(1L, TimeUnit.SECONDS)), Matchers.is(true));
    }

    @Test
    public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception {
        this.context.handleResult((DirectRunner.CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.created.getProducingTransformInternal()).build());
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.context.extractFiredTimers();
        this.context.scheduleAfterOutputWouldBeProduced(this.downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), new Runnable() { // from class: org.apache.beam.runners.direct.EvaluationContextTest.2
            @Override // java.lang.Runnable
            public void run() {
                countDownLatch.countDown();
            }
        });
        Assert.assertThat(Boolean.valueOf(countDownLatch.await(1L, TimeUnit.SECONDS)), Matchers.is(true));
    }

    @Test
    public void extractFiredTimersExtractsTimers() {
        this.context.handleResult((DirectRunner.CommittedBundle) null, ImmutableList.of(), StepTransformResult.withHold(this.created.getProducingTransformInternal(), new Instant(0L)).build());
        StructuralKey of = StructuralKey.of(Integer.valueOf("foo".length()), VarIntCoder.of());
        TimerInternals.TimerData of2 = TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME);
        StepTransformResult build = StepTransformResult.withoutHold(this.downstream.getProducingTransformInternal()).withState(CopyOnAccessInMemoryStateInternals.withUnderlying(of, (CopyOnAccessInMemoryStateInternals) null)).withTimerUpdate(WatermarkManager.TimerUpdate.builder(of).setTimer(of2).build()).build();
        Assert.assertThat(this.context.extractFiredTimers().entrySet(), Matchers.emptyIterable());
        this.context.handleResult(this.context.createKeyedBundle(of, this.created).commit(Instant.now()), ImmutableList.of(), build);
        Assert.assertThat(this.context.extractFiredTimers().entrySet(), Matchers.emptyIterable());
        this.context.handleResult((DirectRunner.CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.created.getProducingTransformInternal()).build());
        Map extractFiredTimers = this.context.extractFiredTimers();
        Assert.assertThat(extractFiredTimers, Matchers.hasKey(this.downstream.getProducingTransformInternal()));
        Map map = (Map) extractFiredTimers.get(this.downstream.getProducingTransformInternal());
        Assert.assertThat(map, Matchers.hasKey(of));
        WatermarkManager.FiredTimers firedTimers = (WatermarkManager.FiredTimers) map.get(of);
        Assert.assertThat(firedTimers.getTimers(TimeDomain.PROCESSING_TIME), Matchers.emptyIterable());
        Assert.assertThat(firedTimers.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), Matchers.emptyIterable());
        Assert.assertThat(firedTimers.getTimers(TimeDomain.EVENT_TIME), Matchers.contains(new TimerInternals.TimerData[]{of2}));
        Assert.assertThat(this.context.extractFiredTimers().entrySet(), Matchers.emptyIterable());
    }

    @Test
    public void createKeyedBundleKeyed() {
        StructuralKey of = StructuralKey.of("foo", StringUtf8Coder.of());
        Assert.assertThat(this.context.createKeyedBundle(of, this.downstream).commit(Instant.now()).getKey(), Matchers.equalTo(of));
    }

    @Test
    public void isDoneWithUnboundedPCollectionAndShutdown() {
        this.context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
        Assert.assertThat(Boolean.valueOf(this.context.isDone(this.unbounded.getProducingTransformInternal())), Matchers.is(false));
        this.context.handleResult((DirectRunner.CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.unbounded.getProducingTransformInternal()).build());
        this.context.extractFiredTimers();
        Assert.assertThat(Boolean.valueOf(this.context.isDone(this.unbounded.getProducingTransformInternal())), Matchers.is(true));
    }

    @Test
    public void isDoneWithUnboundedPCollectionAndNotShutdown() {
        this.context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
        Assert.assertThat(Boolean.valueOf(this.context.isDone(this.unbounded.getProducingTransformInternal())), Matchers.is(false));
        this.context.handleResult((DirectRunner.CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.unbounded.getProducingTransformInternal()).build());
        Assert.assertThat(Boolean.valueOf(this.context.isDone(this.unbounded.getProducingTransformInternal())), Matchers.is(false));
    }

    @Test
    public void isDoneWithOnlyBoundedPCollections() {
        this.context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
        Assert.assertThat(Boolean.valueOf(this.context.isDone(this.created.getProducingTransformInternal())), Matchers.is(false));
        this.context.handleResult((DirectRunner.CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.created.getProducingTransformInternal()).build());
        this.context.extractFiredTimers();
        Assert.assertThat(Boolean.valueOf(this.context.isDone(this.created.getProducingTransformInternal())), Matchers.is(true));
    }

    @Test
    public void isDoneWithPartiallyDone() {
        this.context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
        Assert.assertThat(Boolean.valueOf(this.context.isDone()), Matchers.is(false));
        DirectRunner.UncommittedBundle createBundle = this.context.createBundle(this.created);
        createBundle.add(WindowedValue.valueInGlobalWindow(1));
        DirectRunner.CommittedBundle committedBundle = (DirectRunner.CommittedBundle) Iterables.getOnlyElement(this.context.handleResult((DirectRunner.CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.created.getProducingTransformInternal()).addOutput(createBundle, new DirectRunner.UncommittedBundle[0]).build()).getOutputs());
        this.context.handleResult((DirectRunner.CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.unbounded.getProducingTransformInternal()).build());
        Assert.assertThat(Boolean.valueOf(this.context.isDone()), Matchers.is(false));
        Iterator<AppliedPTransform<?, ?, ?>> it = this.valueToConsumers.get(this.created).iterator();
        while (it.hasNext()) {
            this.context.handleResult(committedBundle, ImmutableList.of(), StepTransformResult.withoutHold(it.next()).build());
        }
        this.context.extractFiredTimers();
        Assert.assertThat(Boolean.valueOf(this.context.isDone()), Matchers.is(true));
    }

    @Test
    public void isDoneWithUnboundedAndNotShutdown() {
        this.context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
        Assert.assertThat(Boolean.valueOf(this.context.isDone()), Matchers.is(false));
        this.context.handleResult((DirectRunner.CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.created.getProducingTransformInternal()).build());
        this.context.handleResult((DirectRunner.CommittedBundle) null, ImmutableList.of(), StepTransformResult.withoutHold(this.unbounded.getProducingTransformInternal()).build());
        this.context.handleResult(this.context.createBundle(this.created).commit(Instant.now()), ImmutableList.of(), StepTransformResult.withoutHold(this.downstream.getProducingTransformInternal()).build());
        this.context.extractFiredTimers();
        Assert.assertThat(Boolean.valueOf(this.context.isDone()), Matchers.is(false));
        this.context.handleResult(this.context.createBundle(this.created).commit(Instant.now()), ImmutableList.of(), StepTransformResult.withoutHold(this.view.getProducingTransformInternal()).build());
        this.context.extractFiredTimers();
        Assert.assertThat(Boolean.valueOf(this.context.isDone()), Matchers.is(false));
    }
}
