package org.apache.beam.runners.core;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.LateDataDroppingDoFnRunner;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;

/* loaded from: input_file:org/apache/beam/runners/core/ReduceFnTester.class */
public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
    private static final String KEY = "TEST_KEY";
    private final WindowFn<Object, W> windowFn;
    private final SideInputReader sideInputReader;
    private final Coder<OutputT> outputCoder;
    private final WindowingStrategy<Object, W> objectStrategy;
    private final ExecutableTriggerStateMachine executableTriggerStateMachine;
    private final ReduceFn<String, InputT, OutputT, W> reduceFn;
    private final PipelineOptions options;
    private final TestInMemoryStateInternals<String> stateInternals = new TestInMemoryStateInternals<>(KEY);
    private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
    private boolean autoAdvanceOutputWatermark = true;
    private final ReduceFnTester<InputT, OutputT, W>.TestOutputWindowedValue testOutputter = new TestOutputWindowedValue();

    /* loaded from: input_file:org/apache/beam/runners/core/ReduceFnTester$TestAssignContext.class */
    private static class TestAssignContext<W extends BoundedWindow> extends WindowFn<Object, W>.AssignContext {
        private Object element;
        private Instant timestamp;
        private BoundedWindow window;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public TestAssignContext(WindowFn<Object, W> windowFn, Object obj, Instant instant, BoundedWindow boundedWindow) {
            super(windowFn);
            Objects.requireNonNull(windowFn);
            this.element = obj;
            this.timestamp = instant;
            this.window = boundedWindow;
        }

        public Object element() {
            return this.element;
        }

        public Instant timestamp() {
            return this.timestamp;
        }

        public BoundedWindow window() {
            return this.window;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/core/ReduceFnTester$TestOutputWindowedValue.class */
    public class TestOutputWindowedValue implements OutputWindowedValue<KV<String, OutputT>> {
        private List<WindowedValue<KV<String, OutputT>>> outputs;

        private TestOutputWindowedValue() {
            this.outputs = new ArrayList();
        }

        public void outputWindowedValue(KV<String, OutputT> kv, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            this.outputs.add(WindowedValue.of((KV) SerializableUtils.ensureSerializableByCoder(KvCoder.of(StringUtf8Coder.of(), ReduceFnTester.this.outputCoder), kv, "outputForWindow"), instant, collection, paneInfo));
        }

        public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tupleTag, AdditionalOutputT additionaloutputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs");
        }

        public /* bridge */ /* synthetic */ void outputWindowedValue(Object obj, Instant instant, Collection collection, PaneInfo paneInfo) {
            outputWindowedValue((KV) obj, instant, (Collection<? extends BoundedWindow>) collection, paneInfo);
        }
    }

    public static <W extends BoundedWindow> ReduceFnTester<Integer, Iterable<Integer>, W> nonCombining(WindowingStrategy<?, W> windowingStrategy) throws Exception {
        return new ReduceFnTester<>(windowingStrategy, TriggerStateMachines.stateMachineForTrigger(TriggerTranslation.toProto(windowingStrategy.getTrigger())), SystemReduceFn.buffering(VarIntCoder.of()), IterableCoder.of(VarIntCoder.of()), PipelineOptionsFactory.create(), NullSideInputReader.empty());
    }

    public static <W extends BoundedWindow> ReduceFnTester<Integer, Iterable<Integer>, W> nonCombining(WindowingStrategy<?, W> windowingStrategy, TriggerStateMachine triggerStateMachine) throws Exception {
        return new ReduceFnTester<>(windowingStrategy, triggerStateMachine, SystemReduceFn.buffering(VarIntCoder.of()), IterableCoder.of(VarIntCoder.of()), PipelineOptionsFactory.create(), NullSideInputReader.empty());
    }

    public static <W extends BoundedWindow> ReduceFnTester<Integer, Iterable<Integer>, W> nonCombining(WindowFn<?, W> windowFn, TriggerStateMachine triggerStateMachine, WindowingStrategy.AccumulationMode accumulationMode, Duration duration, Window.ClosingBehavior closingBehavior) throws Exception {
        return nonCombining(WindowingStrategy.of(windowFn).withTimestampCombiner(TimestampCombiner.EARLIEST).withMode(accumulationMode).withAllowedLateness(duration).withClosingBehavior(closingBehavior), triggerStateMachine);
    }

    public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W> combining(WindowingStrategy<?, W> windowingStrategy, Combine.CombineFn<Integer, AccumT, OutputT> combineFn, Coder<OutputT> coder) throws Exception {
        AppliedCombineFn.withInputCoder(combineFn, CoderRegistry.createDefault(), KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
        return combining(windowingStrategy, TriggerStateMachines.stateMachineForTrigger(TriggerTranslation.toProto(windowingStrategy.getTrigger())), combineFn, coder);
    }

    public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W> combining(WindowingStrategy<?, W> windowingStrategy, TriggerStateMachine triggerStateMachine, Combine.CombineFn<Integer, AccumT, OutputT> combineFn, Coder<OutputT> coder) throws Exception {
        return new ReduceFnTester<>(windowingStrategy, triggerStateMachine, SystemReduceFn.combining(StringUtf8Coder.of(), AppliedCombineFn.withInputCoder(combineFn, CoderRegistry.createDefault(), KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))), coder, PipelineOptionsFactory.create(), NullSideInputReader.empty());
    }

    public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W> combining(WindowingStrategy<?, W> windowingStrategy, CombineWithContext.CombineFnWithContext<Integer, AccumT, OutputT> combineFnWithContext, Coder<OutputT> coder, PipelineOptions pipelineOptions, SideInputReader sideInputReader) throws Exception {
        AppliedCombineFn.withInputCoder(combineFnWithContext, CoderRegistry.createDefault(), KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
        return combining(windowingStrategy, TriggerStateMachines.stateMachineForTrigger(TriggerTranslation.toProto(windowingStrategy.getTrigger())), combineFnWithContext, coder, pipelineOptions, sideInputReader);
    }

    public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W> combining(WindowingStrategy<?, W> windowingStrategy, TriggerStateMachine triggerStateMachine, CombineWithContext.CombineFnWithContext<Integer, AccumT, OutputT> combineFnWithContext, Coder<OutputT> coder, PipelineOptions pipelineOptions, SideInputReader sideInputReader) throws Exception {
        return new ReduceFnTester<>(windowingStrategy, triggerStateMachine, SystemReduceFn.combining(StringUtf8Coder.of(), AppliedCombineFn.withInputCoder(combineFnWithContext, CoderRegistry.createDefault(), KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))), coder, pipelineOptions, sideInputReader);
    }

    private ReduceFnTester(WindowingStrategy<?, W> windowingStrategy, TriggerStateMachine triggerStateMachine, ReduceFn<String, InputT, OutputT, W> reduceFn, Coder<OutputT> coder, PipelineOptions pipelineOptions, SideInputReader sideInputReader) throws Exception {
        this.objectStrategy = windowingStrategy;
        this.reduceFn = reduceFn;
        this.windowFn = windowingStrategy.getWindowFn();
        this.sideInputReader = sideInputReader;
        this.executableTriggerStateMachine = ExecutableTriggerStateMachine.create(triggerStateMachine);
        this.outputCoder = coder;
        this.options = pipelineOptions;
    }

    public void setAutoAdvanceOutputWatermark(boolean z) {
        this.autoAdvanceOutputWatermark = z;
    }

    @Nullable
    public Instant getNextTimer(TimeDomain timeDomain) {
        return this.timerInternals.getNextTimer(timeDomain);
    }

    ReduceFnRunner<String, InputT, OutputT, W> createRunner() {
        return new ReduceFnRunner<>(KEY, this.objectStrategy, this.executableTriggerStateMachine, this.stateInternals, this.timerInternals, this.testOutputter, this.sideInputReader, this.reduceFn, this.options);
    }

    public boolean isMarkedFinished(W w) {
        return createRunner().isFinished(w);
    }

    public boolean hasNoActiveWindows() {
        return createRunner().hasNoActiveWindows();
    }

    @SafeVarargs
    public final void assertHasOnlyGlobalAndFinishedSetsFor(W... wArr) {
        assertHasOnlyGlobalAndAllowedTags(ImmutableSet.copyOf(wArr), ImmutableSet.of(TriggerStateMachineRunner.FINISHED_BITS_TAG));
    }

    @SafeVarargs
    public final void assertHasOnlyGlobalAndStateFor(W... wArr) {
        assertHasOnlyGlobalAndAllowedTags(ImmutableSet.copyOf(wArr), ImmutableSet.of(this.reduceFn.getBufferTag(), TriggerStateMachineRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG, WatermarkHold.watermarkHoldTagForTimestampCombiner(this.objectStrategy.getTimestampCombiner()), WatermarkHold.EXTRA_HOLD_TAG));
    }

    @SafeVarargs
    public final void assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(W... wArr) {
        assertHasOnlyGlobalAndAllowedTags(ImmutableSet.copyOf(wArr), ImmutableSet.of(TriggerStateMachineRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG, WatermarkHold.watermarkHoldTagForTimestampCombiner(this.objectStrategy.getTimestampCombiner()), WatermarkHold.EXTRA_HOLD_TAG));
    }

    public final void assertHasOnlyGlobalState() {
        assertHasOnlyGlobalAndAllowedTags(Collections.emptySet(), Collections.emptySet());
    }

    @SafeVarargs
    public final void assertHasOnlyGlobalAndPaneInfoFor(W... wArr) {
        assertHasOnlyGlobalAndAllowedTags(ImmutableSet.copyOf(wArr), ImmutableSet.of(PaneInfoTracker.PANE_INFO_TAG, WatermarkHold.watermarkHoldTagForTimestampCombiner(this.objectStrategy.getTimestampCombiner()), WatermarkHold.EXTRA_HOLD_TAG));
    }

    private void assertHasOnlyGlobalAndAllowedTags(Set<W> set, Set<StateTag<?>> set2) {
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        Iterator<StateTag<?>> it = set2.iterator();
        while (it.hasNext()) {
            hashSet2.add(StateTags.ID_EQUIVALENCE.wrap(it.next()));
        }
        Iterator<W> it2 = set.iterator();
        while (it2.hasNext()) {
            hashSet.add(windowNamespace(it2.next()));
        }
        HashMap hashMap = new HashMap();
        for (StateNamespace stateNamespace : this.stateInternals.getNamespacesInUse()) {
            if (!(stateNamespace instanceof StateNamespaces.GlobalNamespace)) {
                if (stateNamespace instanceof StateNamespaces.WindowNamespace) {
                    HashSet hashSet3 = new HashSet();
                    Iterator it3 = this.stateInternals.getTagsInUse(stateNamespace).iterator();
                    while (it3.hasNext()) {
                        hashSet3.add(StateTags.ID_EQUIVALENCE.wrap((StateTag) it3.next()));
                    }
                    if (!hashSet3.isEmpty()) {
                        hashMap.put(stateNamespace, hashSet3);
                        if (!Sets.difference(hashSet3, hashSet2).isEmpty()) {
                            Assert.fail(stateNamespace + " has unexpected states: " + hashSet3);
                        }
                    }
                } else if (stateNamespace instanceof StateNamespaces.WindowAndTriggerNamespace) {
                    HashSet hashSet4 = new HashSet();
                    Iterator it4 = this.stateInternals.getTagsInUse(stateNamespace).iterator();
                    while (it4.hasNext()) {
                        hashSet4.add(StateTags.ID_EQUIVALENCE.wrap((StateTag) it4.next()));
                    }
                    Assert.assertTrue(stateNamespace + " contains " + hashSet4, hashSet4.isEmpty());
                } else {
                    Assert.fail("Unrecognized namespace " + stateNamespace);
                }
            }
        }
        Assert.assertEquals("Still in use: " + hashMap.toString(), hashSet, hashMap.keySet());
    }

    private StateNamespace windowNamespace(W w) {
        return StateNamespaces.window(this.windowFn.windowCoder(), w);
    }

    public Instant getWatermarkHold() {
        return this.stateInternals.earliestWatermarkHold();
    }

    public Instant getOutputWatermark() {
        return this.timerInternals.currentOutputWatermarkTime();
    }

    public int getOutputSize() {
        return ((TestOutputWindowedValue) this.testOutputter).outputs.size();
    }

    public List<WindowedValue<OutputT>> extractOutput() {
        ImmutableList list = FluentIterable.from(((TestOutputWindowedValue) this.testOutputter).outputs).transform(windowedValue -> {
            return windowedValue.withValue(((KV) windowedValue.getValue()).getValue());
        }).toList();
        ((TestOutputWindowedValue) this.testOutputter).outputs.clear();
        return list;
    }

    public void advanceInputWatermarkNoTimers(Instant instant) throws Exception {
        this.timerInternals.advanceInputWatermark(instant);
    }

    public void advanceInputWatermark(Instant instant) throws Exception {
        this.timerInternals.advanceInputWatermark(instant);
        ReduceFnRunner<String, InputT, OutputT, W> createRunner = createRunner();
        while (true) {
            ArrayList arrayList = new ArrayList();
            while (true) {
                TimerInternals.TimerData removeNextEventTimer = this.timerInternals.removeNextEventTimer();
                if (removeNextEventTimer == null) {
                    break;
                } else {
                    arrayList.add(removeNextEventTimer);
                }
            }
            if (arrayList.isEmpty()) {
                break;
            } else {
                createRunner.onTimers(arrayList);
            }
        }
        if (this.autoAdvanceOutputWatermark) {
            Instant earliestWatermarkHold = this.stateInternals.earliestWatermarkHold();
            if (earliestWatermarkHold == null) {
                WindowTracing.trace("TestInMemoryTimerInternals.advanceInputWatermark: no holds, so output watermark = input watermark", new Object[0]);
                earliestWatermarkHold = this.timerInternals.currentInputWatermarkTime();
            }
            advanceOutputWatermark(earliestWatermarkHold);
        }
        createRunner.persist();
    }

    public void advanceProcessingTimeNoTimers(Instant instant) throws Exception {
        this.timerInternals.advanceProcessingTime(instant);
    }

    public void advanceOutputWatermark(Instant instant) throws Exception {
        this.timerInternals.advanceOutputWatermark(instant);
    }

    public void advanceProcessingTime(Instant instant) throws Exception {
        this.timerInternals.advanceProcessingTime(instant);
        ReduceFnRunner<String, InputT, OutputT, W> createRunner = createRunner();
        while (true) {
            ArrayList arrayList = new ArrayList();
            while (true) {
                TimerInternals.TimerData removeNextProcessingTimer = this.timerInternals.removeNextProcessingTimer();
                if (removeNextProcessingTimer == null) {
                    break;
                } else {
                    arrayList.add(removeNextProcessingTimer);
                }
            }
            if (arrayList.isEmpty()) {
                createRunner.persist();
                return;
            }
            createRunner.onTimers(arrayList);
        }
    }

    public void advanceSynchronizedProcessingTime(Instant instant) throws Exception {
        this.timerInternals.advanceSynchronizedProcessingTime(instant);
        ReduceFnRunner<String, InputT, OutputT, W> createRunner = createRunner();
        while (true) {
            ArrayList arrayList = new ArrayList();
            while (true) {
                TimerInternals.TimerData removeNextSynchronizedProcessingTimer = this.timerInternals.removeNextSynchronizedProcessingTimer();
                if (removeNextSynchronizedProcessingTimer == null) {
                    break;
                } else {
                    arrayList.add(removeNextSynchronizedProcessingTimer);
                }
            }
            if (arrayList.isEmpty()) {
                createRunner.persist();
                return;
            }
            createRunner.onTimers(arrayList);
        }
    }

    @SafeVarargs
    public final void injectElements(TimestampedValue<InputT>... timestampedValueArr) throws Exception {
        injectElements(Arrays.asList(timestampedValueArr));
    }

    public final void injectElements(List<TimestampedValue<InputT>> list) throws Exception {
        Iterator<TimestampedValue<InputT>> it = list.iterator();
        while (it.hasNext()) {
            WindowTracing.trace("TriggerTester.injectElements: {}", new Object[]{it.next()});
        }
        Iterable iterable = (Iterable) list.stream().map(timestampedValue -> {
            try {
                Object value = timestampedValue.getValue();
                Instant timestamp = timestampedValue.getTimestamp();
                return WindowedValue.of(value, timestamp, this.windowFn.assignWindows(new TestAssignContext(this.windowFn, value, timestamp, GlobalWindow.INSTANCE)), PaneInfo.NO_FIRING);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toList());
        ReduceFnRunner<String, InputT, OutputT, W> createRunner = createRunner();
        createRunner.processElements(new LateDataDroppingDoFnRunner.LateDataFilter(this.objectStrategy, this.timerInternals).filter(KEY, iterable));
        createRunner.persist();
    }

    public void fireTimer(W w, Instant instant, TimeDomain timeDomain) throws Exception {
        ReduceFnRunner<String, InputT, OutputT, W> createRunner = createRunner();
        ArrayList arrayList = new ArrayList(1);
        arrayList.add(TimerInternals.TimerData.of(StateNamespaces.window(this.windowFn.windowCoder(), w), instant, timeDomain));
        createRunner.onTimers(arrayList);
        createRunner.persist();
    }

    public void fireTimers(W w, TimestampedValue<TimeDomain>... timestampedValueArr) throws Exception {
        ReduceFnRunner<String, InputT, OutputT, W> createRunner = createRunner();
        ArrayList arrayList = new ArrayList(timestampedValueArr.length);
        for (TimestampedValue<TimeDomain> timestampedValue : timestampedValueArr) {
            arrayList.add(TimerInternals.TimerData.of(StateNamespaces.window(this.windowFn.windowCoder(), w), timestampedValue.getTimestamp(), (TimeDomain) timestampedValue.getValue()));
        }
        createRunner.onTimers(arrayList);
        createRunner.persist();
    }
}
