/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.window;

import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.NavigableSet;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
import org.apache.flink.table.runtime.generated.NamespaceTableAggsHandleFunction;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.window.AggregateWindowOperator;
import org.apache.flink.table.runtime.operators.window.TableAggregateWindowOperator;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.Window;
import org.apache.flink.table.runtime.operators.window.assigners.MergingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.operators.window.triggers.Trigger;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;

public class WindowOperatorContractTest {
    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Test
    public void testAssignerIsInvokedOncePerElement() throws Exception {
        WindowAssigner<TimeWindow> mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = this.mockTrigger();
        NamespaceAggsHandleFunction mockAggregate = WindowOperatorContractTest.mockAggsHandleFunction();
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, mockTrigger, (NamespaceAggsHandleFunctionBase)mockAggregate, 0L);
        testHarness.open();
        PowerMockito.when((Object)mockAssigner.assignWindows((RowData)Matchers.any(), Matchers.anyLong())).thenReturn(Collections.singletonList(new TimeWindow(0L, 0L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("String", 1, 0L));
        ((WindowAssigner)Mockito.verify(mockAssigner, (VerificationMode)Mockito.times((int)1))).assignWindows((RowData)Matchers.eq((Object)StreamRecordUtils.row("String", 1, 0L)), Matchers.eq((long)0L));
        testHarness.processElement(StreamRecordUtils.insertRecord("String", 1, 0L));
        ((WindowAssigner)Mockito.verify(mockAssigner, (VerificationMode)Mockito.times((int)2))).assignWindows((RowData)Matchers.eq((Object)StreamRecordUtils.row("String", 1, 0L)), Matchers.eq((long)0L));
    }

    @Test
    public void testAssignerWithMultipleWindowsForAggregate() throws Exception {
        WindowAssigner<TimeWindow> mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = this.mockTrigger();
        NamespaceAggsHandleFunction mockAggregate = WindowOperatorContractTest.mockAggsHandleFunction();
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, mockTrigger, (NamespaceAggsHandleFunctionBase)mockAggregate, 0L);
        testHarness.open();
        PowerMockito.when((Object)mockAssigner.assignWindows((RowData)Matchers.any(), Matchers.anyLong())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        WindowOperatorContractTest.shouldFireOnElement(mockTrigger);
        testHarness.processElement(StreamRecordUtils.insertRecord("String", 1, 0L));
        ((NamespaceAggsHandleFunction)Mockito.verify(mockAggregate, (VerificationMode)Mockito.times((int)2))).getValue((Object)WindowOperatorContractTest.anyTimeWindow());
        ((NamespaceAggsHandleFunction)Mockito.verify(mockAggregate, (VerificationMode)Mockito.times((int)1))).getValue(Matchers.eq((Object)new TimeWindow(0L, 2L)));
        ((NamespaceAggsHandleFunction)Mockito.verify(mockAggregate, (VerificationMode)Mockito.times((int)1))).getValue(Matchers.eq((Object)new TimeWindow(2L, 4L)));
    }

    @Test
    public void testAssignerWithMultipleWindowsForTableAggregate() throws Exception {
        WindowAssigner<TimeWindow> mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = this.mockTrigger();
        NamespaceTableAggsHandleFunction mockAggregate = WindowOperatorContractTest.mockTableAggsHandleFunction();
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, mockTrigger, (NamespaceAggsHandleFunctionBase)mockAggregate, 0L);
        testHarness.open();
        PowerMockito.when((Object)mockAssigner.assignWindows((RowData)Matchers.any(), Matchers.anyLong())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        WindowOperatorContractTest.shouldFireOnElement(mockTrigger);
        testHarness.processElement(StreamRecordUtils.insertRecord("String", 1, 0L));
        ((NamespaceTableAggsHandleFunction)Mockito.verify(mockAggregate, (VerificationMode)Mockito.times((int)2))).emitValue((Object)WindowOperatorContractTest.anyTimeWindow(), (RowData)Matchers.any(), (Collector)Matchers.any());
        ((NamespaceTableAggsHandleFunction)Mockito.verify(mockAggregate, (VerificationMode)Mockito.times((int)1))).emitValue(Matchers.eq((Object)new TimeWindow(0L, 2L)), (RowData)Matchers.any(), (Collector)Matchers.any());
        ((NamespaceTableAggsHandleFunction)Mockito.verify(mockAggregate, (VerificationMode)Mockito.times((int)1))).emitValue(Matchers.eq((Object)new TimeWindow(2L, 4L)), (RowData)Matchers.any(), (Collector)Matchers.any());
    }

    @Test
    public void testOnElementCalledPerWindow() throws Exception {
        WindowAssigner<TimeWindow> mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = this.mockTrigger();
        NamespaceAggsHandleFunction mockAggregate = WindowOperatorContractTest.mockAggsHandleFunction();
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, mockTrigger, (NamespaceAggsHandleFunctionBase)mockAggregate, 0L);
        testHarness.open();
        PowerMockito.when((Object)mockAssigner.assignWindows((RowData)WindowOperatorContractTest.anyGenericRow(), Matchers.anyLong())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("String", 42, 1L));
        ((Trigger)Mockito.verify(mockTrigger)).onElement(Matchers.eq((Object)StreamRecordUtils.row("String", 42, 1L)), Matchers.eq((long)1L), (Window)Matchers.eq((Object)new TimeWindow(2L, 4L)));
        ((Trigger)Mockito.verify(mockTrigger)).onElement(Matchers.eq((Object)StreamRecordUtils.row("String", 42, 1L)), Matchers.eq((long)1L), (Window)Matchers.eq((Object)new TimeWindow(0L, 2L)));
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.times((int)2))).onElement(Matchers.any(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow());
    }

    @Test
    public void testMergeWindowsIsCalled() throws Exception {
        MergingWindowAssigner<TimeWindow> mockAssigner = WindowOperatorContractTest.mockMergingAssigner();
        Trigger mockTrigger = this.mockTrigger();
        NamespaceAggsHandleFunction mockAggregate = WindowOperatorContractTest.mockAggsHandleFunction();
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, mockTrigger, (NamespaceAggsHandleFunctionBase)mockAggregate, 0L);
        testHarness.open();
        PowerMockito.when((Object)mockAssigner.assignWindows((RowData)WindowOperatorContractTest.anyGenericRow(), Matchers.anyLong())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        testHarness.processElement(StreamRecordUtils.insertRecord("String", 42, 0L));
        ((MergingWindowAssigner)Mockito.verify(mockAssigner)).mergeWindows((Window)Matchers.eq((Object)new TimeWindow(2L, 4L)), (NavigableSet)Matchers.any(), WindowOperatorContractTest.anyMergeCallback());
        ((MergingWindowAssigner)Mockito.verify(mockAssigner)).mergeWindows((Window)Matchers.eq((Object)new TimeWindow(0L, 2L)), (NavigableSet)Matchers.any(), WindowOperatorContractTest.anyMergeCallback());
        ((MergingWindowAssigner)Mockito.verify(mockAssigner, (VerificationMode)Mockito.times((int)2))).mergeWindows((Window)WindowOperatorContractTest.anyTimeWindow(), (NavigableSet)Matchers.any(), WindowOperatorContractTest.anyMergeCallback());
    }

    private <W extends Window> KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createWindowOperator(WindowAssigner<W> assigner, Trigger<W> trigger, NamespaceAggsHandleFunctionBase<W> aggregationsFunction, long allowedLateness) throws Exception {
        boolean sendRetraction;
        LogicalType[] inputTypes = new LogicalType[]{new VarCharType(Integer.MAX_VALUE), new IntType()};
        RowDataKeySelector keySelector = HandwrittenSelectorUtil.getRowDataSelector(new int[]{0}, inputTypes);
        InternalTypeInfo keyType = keySelector.getProducedType();
        LogicalType[] accTypes = new LogicalType[]{new BigIntType(), new BigIntType()};
        LogicalType[] windowTypes = new LogicalType[]{new BigIntType(), new BigIntType()};
        LogicalType[] outputTypeWithoutKeys = new LogicalType[]{new BigIntType(), new BigIntType(), new BigIntType(), new BigIntType()};
        boolean bl = sendRetraction = allowedLateness > 0L;
        if (aggregationsFunction instanceof NamespaceAggsHandleFunction) {
            AggregateWindowOperator operator = new AggregateWindowOperator((NamespaceAggsHandleFunction)aggregationsFunction, (RecordEqualiser)Mockito.mock(RecordEqualiser.class), assigner, trigger, assigner.getWindowSerializer(new ExecutionConfig()), inputTypes, outputTypeWithoutKeys, accTypes, windowTypes, 2, sendRetraction, allowedLateness, UTC_ZONE_ID);
            return new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, (KeySelector)keySelector, (TypeInformation)keyType);
        }
        TableAggregateWindowOperator operator = new TableAggregateWindowOperator((NamespaceTableAggsHandleFunction)aggregationsFunction, assigner, trigger, assigner.getWindowSerializer(new ExecutionConfig()), inputTypes, outputTypeWithoutKeys, accTypes, windowTypes, 2, sendRetraction, allowedLateness, UTC_ZONE_ID);
        return new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, (KeySelector)keySelector, (TypeInformation)keyType);
    }

    private static <W extends Window> NamespaceAggsHandleFunction<W> mockAggsHandleFunction() throws Exception {
        return (NamespaceAggsHandleFunction)Mockito.mock(NamespaceAggsHandleFunction.class);
    }

    private static <W extends Window> NamespaceTableAggsHandleFunction<W> mockTableAggsHandleFunction() throws Exception {
        return (NamespaceTableAggsHandleFunction)Mockito.mock(NamespaceTableAggsHandleFunction.class);
    }

    private <W extends Window> Trigger<W> mockTrigger() throws Exception {
        Trigger mockTrigger = (Trigger)Mockito.mock(Trigger.class);
        PowerMockito.when((Object)mockTrigger.onElement(Matchers.any(), Matchers.anyLong(), (Window)Matchers.any())).thenReturn((Object)false);
        PowerMockito.when((Object)mockTrigger.onEventTime(Matchers.anyLong(), (Window)Matchers.any())).thenReturn((Object)false);
        PowerMockito.when((Object)mockTrigger.onProcessingTime(Matchers.anyLong(), (Window)Matchers.any())).thenReturn((Object)false);
        return mockTrigger;
    }

    private static TimeWindow anyTimeWindow() {
        return (TimeWindow)Mockito.any();
    }

    private static GenericRowData anyGenericRow() {
        return (GenericRowData)Mockito.any();
    }

    private static WindowAssigner<TimeWindow> mockTimeWindowAssigner() throws Exception {
        WindowAssigner mockAssigner = (WindowAssigner)Mockito.mock(WindowAssigner.class);
        PowerMockito.when((Object)mockAssigner.getWindowSerializer((ExecutionConfig)Mockito.any())).thenReturn((Object)new TimeWindow.Serializer());
        PowerMockito.when((Object)mockAssigner.isEventTime()).thenReturn((Object)true);
        return mockAssigner;
    }

    private static MergingWindowAssigner<TimeWindow> mockMergingAssigner() throws Exception {
        MergingWindowAssigner mockAssigner = (MergingWindowAssigner)Mockito.mock(MergingWindowAssigner.class);
        PowerMockito.when((Object)mockAssigner.getWindowSerializer((ExecutionConfig)Mockito.any())).thenReturn((Object)new TimeWindow.Serializer());
        PowerMockito.when((Object)mockAssigner.isEventTime()).thenReturn((Object)true);
        return mockAssigner;
    }

    private static MergingWindowAssigner.MergeCallback<TimeWindow> anyMergeCallback() {
        return (MergingWindowAssigner.MergeCallback)Mockito.any();
    }

    private static <T> void shouldFireOnElement(Trigger<TimeWindow> mockTrigger) throws Exception {
        PowerMockito.when((Object)mockTrigger.onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow())).thenReturn((Object)true);
    }
}

