/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.window;

import java.util.Arrays;
import java.util.Collections;
import java.util.NavigableSet;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.Window;
import org.apache.flink.table.runtime.operators.window.WindowOperator;
import org.apache.flink.table.runtime.operators.window.assigners.MergingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.operators.window.triggers.Trigger;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.runtime.util.BinaryRowKeySelector;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;

/**
 * Contract tests for {@link WindowOperator}.
 *
 * <p>Every collaborator (window assigner, trigger, aggregation handler) is a Mockito mock, so
 * the tests only check which calls the operator makes on them while it processes elements;
 * no real windowing or aggregation logic is exercised.
 *
 * <p>Each test runs its harness inside a try-with-resources block so the operator is closed
 * even when a verification fails.
 */
public class WindowOperatorContractTest {
    /** JUnit rule for expected exceptions; none of the tests in this class currently use it. */
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /** The assigner must be asked for windows exactly once for every processed element. */
    @Test
    public void testAssignerIsInvokedOncePerElement() throws Exception {
        WindowAssigner<TimeWindow> mockAssigner = mockTimeWindowAssigner();
        Trigger<TimeWindow> mockTrigger = mockTrigger();
        NamespaceAggsHandleFunction<TimeWindow> mockAggregate = mockAggsHandleFunction();

        try (KeyedOneInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow> testHarness =
                createWindowOperator(mockAssigner, mockTrigger, mockAggregate, 0L)) {
            testHarness.open();

            PowerMockito.when(mockAssigner.assignWindows(Matchers.any(), Matchers.anyLong()))
                    .thenReturn(Collections.singletonList(new TimeWindow(0L, 0L)));

            testHarness.processElement(StreamRecordUtils.record("String", 1, 0L));
            Mockito.verify(mockAssigner, Mockito.times(1))
                    .assignWindows(Matchers.eq(StreamRecordUtils.baserow("String", 1, 0L)), Matchers.eq(0L));

            // A second, identical element must lead to a second (cumulative) invocation.
            testHarness.processElement(StreamRecordUtils.record("String", 1, 0L));
            Mockito.verify(mockAssigner, Mockito.times(2))
                    .assignWindows(Matchers.eq(StreamRecordUtils.baserow("String", 1, 0L)), Matchers.eq(0L));
        }
    }

    /**
     * When the assigner returns two windows and the trigger fires on the element, the
     * aggregation value must be requested exactly once for each of the two windows.
     */
    @Test
    public void testAssignerWithMultipleWindows() throws Exception {
        WindowAssigner<TimeWindow> mockAssigner = mockTimeWindowAssigner();
        Trigger<TimeWindow> mockTrigger = mockTrigger();
        NamespaceAggsHandleFunction<TimeWindow> mockAggregate = mockAggsHandleFunction();

        try (KeyedOneInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow> testHarness =
                createWindowOperator(mockAssigner, mockTrigger, mockAggregate, 0L)) {
            testHarness.open();

            PowerMockito.when(mockAssigner.assignWindows(Matchers.any(), Matchers.anyLong()))
                    .thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
            shouldFireOnElement(mockTrigger);

            testHarness.processElement(StreamRecordUtils.record("String", 1, 0L));

            // Two getValue calls in total, one per assigned window.
            Mockito.verify(mockAggregate, Mockito.times(2)).getValue(anyTimeWindow());
            Mockito.verify(mockAggregate, Mockito.times(1)).getValue(Matchers.eq(new TimeWindow(0L, 2L)));
            Mockito.verify(mockAggregate, Mockito.times(1)).getValue(Matchers.eq(new TimeWindow(2L, 4L)));
        }
    }

    /** {@code Trigger.onElement} must be invoked once for every window assigned to an element. */
    @Test
    public void testOnElementCalledPerWindow() throws Exception {
        WindowAssigner<TimeWindow> mockAssigner = mockTimeWindowAssigner();
        Trigger<TimeWindow> mockTrigger = mockTrigger();
        NamespaceAggsHandleFunction<TimeWindow> mockAggregate = mockAggsHandleFunction();

        try (KeyedOneInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow> testHarness =
                createWindowOperator(mockAssigner, mockTrigger, mockAggregate, 0L)) {
            testHarness.open();

            PowerMockito.when(mockAssigner.assignWindows(anyGenericRow(), Matchers.anyLong()))
                    .thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));

            testHarness.processElement(StreamRecordUtils.record("String", 42, 1L));

            Mockito.verify(mockTrigger).onElement(
                    Matchers.eq(StreamRecordUtils.baserow("String", 42, 1L)),
                    Matchers.eq(1L),
                    Matchers.eq(new TimeWindow(2L, 4L)));
            Mockito.verify(mockTrigger).onElement(
                    Matchers.eq(StreamRecordUtils.baserow("String", 42, 1L)),
                    Matchers.eq(1L),
                    Matchers.eq(new TimeWindow(0L, 2L)));
            // No invocations beyond the two verified above.
            Mockito.verify(mockTrigger, Mockito.times(2))
                    .onElement(Matchers.any(), Matchers.anyLong(), anyTimeWindow());
        }
    }

    /** A merging assigner must have {@code mergeWindows} invoked once per newly assigned window. */
    @Test
    public void testMergeWindowsIsCalled() throws Exception {
        MergingWindowAssigner<TimeWindow> mockAssigner = mockMergingAssigner();
        Trigger<TimeWindow> mockTrigger = mockTrigger();
        NamespaceAggsHandleFunction<TimeWindow> mockAggregate = mockAggsHandleFunction();

        try (KeyedOneInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow> testHarness =
                createWindowOperator(mockAssigner, mockTrigger, mockAggregate, 0L)) {
            testHarness.open();

            PowerMockito.when(mockAssigner.assignWindows(anyGenericRow(), Matchers.anyLong()))
                    .thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));

            // Nothing may have been emitted before the first element is processed.
            Assert.assertEquals(0, testHarness.getOutput().size());

            testHarness.processElement(StreamRecordUtils.record("String", 42, 0L));

            Mockito.verify(mockAssigner).mergeWindows(
                    Matchers.eq(new TimeWindow(2L, 4L)), Matchers.any(), anyMergeCallback());
            Mockito.verify(mockAssigner).mergeWindows(
                    Matchers.eq(new TimeWindow(0L, 2L)), Matchers.any(), anyMergeCallback());
            Mockito.verify(mockAssigner, Mockito.times(2))
                    .mergeWindows(anyTimeWindow(), Matchers.any(), anyMergeCallback());
        }
    }

    /**
     * Builds a keyed test harness around a {@link WindowOperator} wired with the given
     * collaborators.
     *
     * <p>Input rows are (VARCHAR, INT) and are keyed on field 0. Retraction sending is enabled
     * only when {@code allowedLateness} is positive.
     *
     * @param assigner window assigner; also supplies the window serializer
     * @param trigger trigger handed to the operator
     * @param aggregationsFunction aggregation handler handed to the operator
     * @param allowedLateness allowed lateness passed through to the operator
     * @return a harness that has not been opened yet; the caller is responsible for closing it
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // WindowOperator's generic signature is not visible here
    private <W extends Window> KeyedOneInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow> createWindowOperator(WindowAssigner<W> assigner, Trigger<W> trigger, NamespaceAggsHandleFunction<W> aggregationsFunction, long allowedLateness) throws Exception {
        LogicalType[] inputTypes = new LogicalType[]{new VarCharType(Integer.MAX_VALUE), new IntType()};
        BinaryRowKeySelector keySelector = new BinaryRowKeySelector(new int[]{0}, inputTypes);
        BaseRowTypeInfo keyType = keySelector.getProducedType();

        LogicalType[] accTypes = new LogicalType[]{new BigIntType(), new BigIntType()};
        LogicalType[] windowTypes = new LogicalType[]{new BigIntType(), new BigIntType()};
        LogicalType[] outputTypeWithoutKeys =
                new LogicalType[]{new BigIntType(), new BigIntType(), new BigIntType(), new BigIntType()};

        boolean sendRetraction = allowedLateness > 0L;

        WindowOperator operator = new WindowOperator(
                aggregationsFunction,
                Mockito.mock(RecordEqualiser.class),
                assigner,
                trigger,
                assigner.getWindowSerializer(new ExecutionConfig()),
                inputTypes,
                outputTypeWithoutKeys,
                accTypes,
                windowTypes,
                2,
                sendRetraction,
                allowedLateness);

        return new KeyedOneInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow>(
                (OneInputStreamOperator) operator, (KeySelector) keySelector, (TypeInformation) keyType);
    }

    /** Creates an unstubbed mock aggregation handler. */
    @SuppressWarnings("unchecked") // Mockito.mock(Class) cannot express the type argument
    private static <W extends Window> NamespaceAggsHandleFunction<W> mockAggsHandleFunction() throws Exception {
        return Mockito.mock(NamespaceAggsHandleFunction.class);
    }

    /** Creates a mock trigger whose element, event-time and processing-time callbacks return {@code false}. */
    @SuppressWarnings("unchecked") // Mockito.mock(Class) cannot express the type argument
    private static <W extends Window> Trigger<W> mockTrigger() throws Exception {
        Trigger<W> mockTrigger = Mockito.mock(Trigger.class);
        PowerMockito.when(mockTrigger.onElement(Matchers.any(), Matchers.anyLong(), Matchers.any())).thenReturn(false);
        PowerMockito.when(mockTrigger.onEventTime(Matchers.anyLong(), Matchers.any())).thenReturn(false);
        PowerMockito.when(mockTrigger.onProcessingTime(Matchers.anyLong(), Matchers.any())).thenReturn(false);
        return mockTrigger;
    }

    /** Argument matcher accepting any value, typed as {@link TimeWindow}. */
    private static TimeWindow anyTimeWindow() {
        return Mockito.any();
    }

    /** Argument matcher accepting any value, typed as {@link GenericRow}. */
    private static GenericRow anyGenericRow() {
        return Mockito.any();
    }

    /** Creates a mock assigner that reports event time and serializes windows as {@link TimeWindow}. */
    @SuppressWarnings("unchecked") // Mockito.mock(Class) cannot express the type argument
    private static WindowAssigner<TimeWindow> mockTimeWindowAssigner() throws Exception {
        WindowAssigner<TimeWindow> mockAssigner = Mockito.mock(WindowAssigner.class);
        PowerMockito.when(mockAssigner.getWindowSerializer(Mockito.any())).thenReturn(new TimeWindow.Serializer());
        PowerMockito.when(mockAssigner.isEventTime()).thenReturn(true);
        return mockAssigner;
    }

    /** Creates a mock merging assigner that reports event time and serializes windows as {@link TimeWindow}. */
    @SuppressWarnings("unchecked") // Mockito.mock(Class) cannot express the type argument
    private static MergingWindowAssigner<TimeWindow> mockMergingAssigner() throws Exception {
        MergingWindowAssigner<TimeWindow> mockAssigner = Mockito.mock(MergingWindowAssigner.class);
        PowerMockito.when(mockAssigner.getWindowSerializer(Mockito.any())).thenReturn(new TimeWindow.Serializer());
        PowerMockito.when(mockAssigner.isEventTime()).thenReturn(true);
        return mockAssigner;
    }

    /** Argument matcher accepting any value, typed as a {@link TimeWindow} merge callback. */
    private static MergingWindowAssigner.MergeCallback<TimeWindow> anyMergeCallback() {
        return Mockito.any();
    }

    /** Re-stubs the trigger so that {@code onElement} returns {@code true} for every call. */
    private static void shouldFireOnElement(Trigger<TimeWindow> mockTrigger) throws Exception {
        PowerMockito.when(mockTrigger.onElement(Matchers.anyObject(), Matchers.anyLong(), anyTimeWindow())).thenReturn(true);
    }
}

