/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Collections;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
import org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link WindowDoFnOperator}, driven through Flink's
 * {@link KeyedOneInputStreamOperatorTestHarness}.
 *
 * <p>The operator under test is built by {@link #getWindowDoFnOperator()} with a
 * {@code Sum.ofLongs()} combining reduce function, so each test expects one summed
 * {@code KV<Long, Long>} per key and window.
 */
@RunWith(value=JUnit4.class)
public class WindowDoFnOperatorTest {
    /**
     * Snapshots the harness after three elements have been processed, restores a fresh
     * operator from that snapshot, and checks that advancing the watermark to 10000 then
     * produces the per-key sums of the elements fed before the snapshot.
     *
     * @throws Exception if the harness or operator fails
     */
    @Test
    public void testRestore() throws @UnknownKeyFor @NonNull @Initialized Exception {
        KeyedOneInputStreamOperatorTestHarness<ByteBuffer, WindowedValue<KeyedWorkItem<Long, Long>>, WindowedValue<KV<Long, Long>>> testHarness = this.createTestHarness(this.getWindowDoFnOperator());
        testHarness.open();

        // Three elements in one window [0, 10000): two for key 1 (100 + 20), one for key 2 (77).
        IntervalWindow window = new IntervalWindow(new Instant(0L), (ReadableDuration)Duration.millis(10000L));
        testHarness.processWatermark(0L);
        testHarness.processElement(Item.builder().key(1L).timestamp(1L).value(100L).window(window).build().toStreamRecord());
        testHarness.processElement(Item.builder().key(1L).timestamp(2L).value(20L).window(window).build().toStreamRecord());
        testHarness.processElement(Item.builder().key(2L).timestamp(3L).value(77L).window(window).build().toStreamRecord());

        // Take a snapshot and shut the first harness down.
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();

        // Start a brand-new operator and harness from the snapshot; no elements are re-sent.
        testHarness = this.createTestHarness(this.getWindowDoFnOperator());
        testHarness.initializeState(snapshot);
        testHarness.open();

        // Watermark 10000 equals the window's end (0 + 10000 ms).
        testHarness.processWatermark(10000L);

        Iterable output = StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput());
        Assert.assertEquals(2L, (long)Iterables.size(output));
        MatcherAssert.assertThat(output, (Matcher)Matchers.containsInAnyOrder((Object[])new WindowedValue[]{
            WindowDoFnOperatorTest.expectedOnTime(1L, 120L, 9999L, window),
            WindowDoFnOperatorTest.expectedOnTime(2L, 77L, 9999L, window)}));

        testHarness.close();
    }

    /**
     * Feeds one element for each of two keys into two consecutive 100 ms windows and checks,
     * after each watermark advance, the size of the operator's pending-timer map, the number
     * of keyed state entries reported by the harness, and the operator's output watermark.
     *
     * @throws Exception if the harness or operator fails
     */
    @Test
    public void testTimerCleanupOfPendingTimerList() throws @UnknownKeyFor @NonNull @Initialized Exception {
        WindowDoFnOperator<Long, Long, Long> windowDoFnOperator = this.getWindowDoFnOperator();
        KeyedOneInputStreamOperatorTestHarness<ByteBuffer, WindowedValue<KeyedWorkItem<Long, Long>>, WindowedValue<KV<Long, Long>>> testHarness = this.createTestHarness(windowDoFnOperator);
        testHarness.open();
        // Read after open(); the assertions below inspect its pendingTimersById map directly.
        DoFnOperator.FlinkTimerInternals timerInternals = windowDoFnOperator.timerInternals;

        // window = [0, 100), window2 = [100, 200); each receives one element under a different key.
        IntervalWindow window = new IntervalWindow(new Instant(0L), (ReadableDuration)Duration.millis(100L));
        IntervalWindow window2 = new IntervalWindow(new Instant(100L), (ReadableDuration)Duration.millis(100L));
        testHarness.processWatermark(0L);
        testHarness.processElement(Item.builder().key(1L).timestamp(1L).value(100L).window(window).build().toStreamRecord());
        testHarness.processElement(Item.builder().key(2L).timestamp(150L).value(150L).window(window2).build().toStreamRecord());

        testHarness.processWatermark(1L);
        // NOTE(review): two keys were fed but only one pending timer is visible here, presumably
        // because the map is scoped to the current key - confirm against FlinkTimerInternals.
        MatcherAssert.assertThat((Object)Iterables.size((Iterable)timerInternals.pendingTimersById.keys()), (Matcher)Is.is((Object)1));
        // NOTE(review): 6 is the total reported for both keys; the per-key breakdown is not
        // visible in this file.
        MatcherAssert.assertThat((Object)testHarness.numKeyedStateEntries(), (Matcher)Is.is((Object)6));
        MatcherAssert.assertThat((Object)windowDoFnOperator.getCurrentOutputWatermark(), (Matcher)Is.is((Object)1L));

        // Watermark 100 equals the end of the first window.
        testHarness.processWatermark(100L);
        MatcherAssert.assertThat((Object)Iterables.size((Iterable)timerInternals.pendingTimersById.keys()), (Matcher)Is.is((Object)0));
        // Half of the keyed state entries remain after the first window's watermark.
        MatcherAssert.assertThat((Object)testHarness.numKeyedStateEntries(), (Matcher)Is.is((Object)3));
        MatcherAssert.assertThat((Object)windowDoFnOperator.getCurrentOutputWatermark(), (Matcher)Is.is((Object)100L));

        // Watermark 200 equals the end of the second window; no keyed state may remain.
        testHarness.processWatermark(200L);
        MatcherAssert.assertThat((Object)testHarness.numKeyedStateEntries(), (Matcher)Is.is((Object)0));

        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()), (Matcher)Matchers.containsInAnyOrder((Object[])new WindowedValue[]{
            WindowDoFnOperatorTest.expectedOnTime(1L, 100L, 99L, window),
            WindowDoFnOperatorTest.expectedOnTime(2L, 150L, 199L, window2)}));

        testHarness.close();
    }

    /**
     * Builds an expected output element: {@code KV.of(key, sum)} at {@code timestampMillis} in
     * {@code window}, with the pane returned by
     * {@code PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)}.
     *
     * @param key the expected key (boxed to {@code Long})
     * @param sum the expected combined value (boxed to {@code Long})
     * @param timestampMillis the expected element timestamp in milliseconds
     * @param window the window the element is expected in
     * @return the windowed value to compare operator output against
     */
    private static WindowedValue expectedOnTime(long key, long sum, long timestampMillis, IntervalWindow window) {
        return WindowedValue.of((Object)KV.of((Object)key, (Object)sum), (Instant)new Instant(timestampMillis), (BoundedWindow)window, (PaneInfo)PaneInfo.createPane(true, true, (PaneInfo.Timing)PaneInfo.Timing.ON_TIME));
    }

    /**
     * Creates the operator under test.
     *
     * <p>Configuration: a combining {@link SystemReduceFn} over {@code Sum.ofLongs()},
     * {@link VarLongCoder} for keys and values, a {@code FixedWindows} strategy of one minute,
     * a single output tagged {@code "main-output"}, no side inputs or additional outputs, and
     * {@code FlinkPipelineOptions.defaults()}.
     *
     * <p>NOTE(review): the tests attach explicit {@link IntervalWindow}s to their elements
     * (10 s / 100 ms), so the one-minute strategy here presumably only supplies the window
     * coder and trigger defaults - confirm.
     *
     * @return a new, unopened operator
     */
    private @UnknownKeyFor @NonNull @Initialized WindowDoFnOperator<@UnknownKeyFor @NonNull @Initialized Long, @UnknownKeyFor @NonNull @Initialized Long, @UnknownKeyFor @NonNull @Initialized Long> getWindowDoFnOperator() {
        WindowingStrategy windowingStrategy = WindowingStrategy.of((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes(1L)));
        TupleTag outputTag = new TupleTag("main-output");
        SystemReduceFn reduceFn = SystemReduceFn.combining((Coder)VarLongCoder.of(), (AppliedCombineFn)AppliedCombineFn.withInputCoder((CombineFnBase.GlobalCombineFn)Sum.ofLongs(), (CoderRegistry)CoderRegistry.createDefault(), (KvCoder)KvCoder.of((Coder)VarLongCoder.of(), (Coder)VarLongCoder.of())));
        Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
        // Input elements are windowed SingletonKeyedWorkItems; outputs are windowed KV<Long, Long>.
        SingletonKeyedWorkItemCoder workItemCoder = SingletonKeyedWorkItemCoder.of((Coder)VarLongCoder.of(), (Coder)VarLongCoder.of(), (Coder)windowCoder);
        WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.getFullCoder((Coder)workItemCoder, (Coder)windowCoder);
        WindowedValue.FullWindowedValueCoder outputCoder = WindowedValue.getFullCoder((Coder)KvCoder.of((Coder)VarLongCoder.of(), (Coder)VarLongCoder.of()), (Coder)windowCoder);
        return new WindowDoFnOperator(reduceFn, "stepName", (Coder)inputCoder, outputTag, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)new DoFnOperator.MultiOutputOutputManagerFactory(outputTag, (Coder)outputCoder, new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults())), windowingStrategy, Collections.emptyMap(), Collections.emptyList(), (PipelineOptions)FlinkPipelineOptions.defaults(), (Coder)VarLongCoder.of(), (KeySelector)new WorkItemKeySelector((Coder)VarLongCoder.of(), new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults())));
    }

    /**
     * Wraps {@code windowDoFnOperator} in a keyed test harness.
     *
     * <p>The harness keys each record by the work item's key, encoded with
     * {@link VarLongCoder} and wrapped in a {@link ByteBuffer}; the key type is described to
     * Flink as {@code GenericTypeInfo(ByteBuffer.class)}.
     *
     * @param windowDoFnOperator the operator to drive
     * @return a harness that has not been opened yet
     * @throws Exception if the harness cannot be constructed
     */
    private @UnknownKeyFor @NonNull @Initialized KeyedOneInputStreamOperatorTestHarness<@UnknownKeyFor @NonNull @Initialized ByteBuffer, @UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<@UnknownKeyFor @NonNull @Initialized Long, @UnknownKeyFor @NonNull @Initialized Long>>, @UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized Long, @UnknownKeyFor @NonNull @Initialized Long>>> createTestHarness(@UnknownKeyFor @NonNull @Initialized WindowDoFnOperator<@UnknownKeyFor @NonNull @Initialized Long, @UnknownKeyFor @NonNull @Initialized Long, @UnknownKeyFor @NonNull @Initialized Long> windowDoFnOperator) throws @UnknownKeyFor @NonNull @Initialized Exception {
        // The selector must be cast to a parameterized KeySelector: with a raw target type the
        // lambda parameter is typed Object and o.getValue() does not resolve.
        return new KeyedOneInputStreamOperatorTestHarness(windowDoFnOperator, (KeySelector<WindowedValue<KeyedWorkItem<Long, Long>>, ByteBuffer> & Serializable)o -> {
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
                VarLongCoder.of().encode(o.getValue().key(), (OutputStream)baos);
                return ByteBuffer.wrap(baos.toByteArray());
            }
        }, (TypeInformation)new GenericTypeInfo(ByteBuffer.class));
    }

    /**
     * Test fixture describing one input element (key, value, timestamp, window) that can be
     * turned into the {@link StreamRecord} shape the operator consumes.
     */
    private static class Item {
        private @UnknownKeyFor @NonNull @Initialized long key;
        private @UnknownKeyFor @NonNull @Initialized long value;
        private @UnknownKeyFor @NonNull @Initialized long timestamp;
        private @UnknownKeyFor @NonNull @Initialized IntervalWindow window;

        /** Instances are created through {@link #builder()}. */
        private Item() {
        }

        /** Returns a new builder with all numeric fields at 0 and no window set. */
        static @UnknownKeyFor @NonNull @Initialized ItemBuilder builder() {
            return new ItemBuilder();
        }

        /**
         * Converts this item into operator input.
         *
         * <p>The value is wrapped in a {@link WindowedValue}, placed in a
         * {@link SingletonKeyedWorkItem} under this item's key, wrapped in a second
         * {@link WindowedValue}, and finally in a {@link StreamRecord}. Both windowed values use
         * this item's timestamp and window with {@code PaneInfo.NO_FIRING}.
         *
         * @return the stream record to pass to the test harness
         */
        @UnknownKeyFor @NonNull @Initialized StreamRecord<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<@UnknownKeyFor @NonNull @Initialized Long, @UnknownKeyFor @NonNull @Initialized Long>>> toStreamRecord() {
            WindowedValue item = WindowedValue.of((Object)this.value, (Instant)new Instant(this.timestamp), (BoundedWindow)this.window, (PaneInfo)PaneInfo.NO_FIRING);
            WindowedValue keyedItem = WindowedValue.of((Object)new SingletonKeyedWorkItem((Object)this.key, item), (Instant)new Instant(this.timestamp), (BoundedWindow)this.window, (PaneInfo)PaneInfo.NO_FIRING);
            return new StreamRecord((Object)keyedItem);
        }

        /** Fluent builder for {@link Item}; each setter returns {@code this}. */
        private static final class ItemBuilder {
            private @UnknownKeyFor @NonNull @Initialized long key;
            private @UnknownKeyFor @NonNull @Initialized long value;
            private @UnknownKeyFor @NonNull @Initialized long timestamp;
            private @UnknownKeyFor @NonNull @Initialized IntervalWindow window;

            private ItemBuilder() {
            }

            /** Sets the element key. */
            @UnknownKeyFor @NonNull @Initialized ItemBuilder key(@UnknownKeyFor @NonNull @Initialized long key) {
                this.key = key;
                return this;
            }

            /** Sets the element value. */
            @UnknownKeyFor @NonNull @Initialized ItemBuilder value(@UnknownKeyFor @NonNull @Initialized long value) {
                this.value = value;
                return this;
            }

            /** Sets the element timestamp, passed to {@code new Instant(long)} when converted. */
            @UnknownKeyFor @NonNull @Initialized ItemBuilder timestamp(@UnknownKeyFor @NonNull @Initialized long timestamp) {
                this.timestamp = timestamp;
                return this;
            }

            /** Sets the window the element is placed in. */
            @UnknownKeyFor @NonNull @Initialized ItemBuilder window(@UnknownKeyFor @NonNull @Initialized IntervalWindow window) {
                this.window = window;
                return this;
            }

            /** Copies the builder's current field values into a new {@link Item}. */
            @UnknownKeyFor @NonNull @Initialized Item build() {
                Item item = new Item();
                item.key = this.key;
                item.value = this.value;
                item.window = this.window;
                item.timestamp = this.timestamp;
                return item;
            }
        }
    }
}

