package org.apache.flink.table.runtime.operators.window;

import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.NavigableSet;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
import org.apache.flink.table.runtime.generated.NamespaceTableAggsHandleFunction;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.assigners.MergingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.operators.window.triggers.Trigger;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.util.Collector;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;

/**
 * Contract tests for the window operators.
 *
 * <p>Every collaborator of the operator (window assigner, trigger and aggregation handler) is a
 * Mockito mock, so each test only checks how often, and with which arguments, the operator calls
 * into that collaborator while processing a single record.
 */
public class WindowOperatorContractTest {
    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");

    /** The assigner must be consulted exactly once for every processed element. */
    @Test
    public void testAssignerIsInvokedOncePerElement() throws Exception {
        WindowAssigner<TimeWindow> assigner = mockTimeWindowAssigner();
        Trigger<TimeWindow> trigger = mockTrigger();
        NamespaceAggsHandleFunction<TimeWindow> aggsFunction = mockAggsHandleFunction();

        // try-with-resources so the harness is released even when a verification fails.
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createWindowOperator(assigner, trigger, aggsFunction, 0L)) {
            harness.open();

            PowerMockito.when(assigner.assignWindows(Matchers.any(), Matchers.anyLong()))
                    .thenReturn(Collections.singletonList(new TimeWindow(0L, 0L)));

            harness.processElement(StreamRecordUtils.insertRecord("String", 1, 0L));
            Mockito.verify(assigner, Mockito.times(1))
                    .assignWindows(
                            Matchers.eq(StreamRecordUtils.row("String", 1, 0L)), Matchers.eq(0L));

            harness.processElement(StreamRecordUtils.insertRecord("String", 1, 0L));
            Mockito.verify(assigner, Mockito.times(2))
                    .assignWindows(
                            Matchers.eq(StreamRecordUtils.row("String", 1, 0L)), Matchers.eq(0L));
        }
    }

    /**
     * With a firing trigger and two assigned windows, the aggregate handler's {@code getValue}
     * must be called once per window.
     */
    @Test
    public void testAssignerWithMultipleWindowsForAggregate() throws Exception {
        WindowAssigner<TimeWindow> assigner = mockTimeWindowAssigner();
        Trigger<TimeWindow> trigger = mockTrigger();
        NamespaceAggsHandleFunction<TimeWindow> aggsFunction = mockAggsHandleFunction();

        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createWindowOperator(assigner, trigger, aggsFunction, 0L)) {
            harness.open();

            PowerMockito.when(assigner.assignWindows(Matchers.any(), Matchers.anyLong()))
                    .thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
            shouldFireOnElement(trigger);

            harness.processElement(StreamRecordUtils.insertRecord("String", 1, 0L));

            Mockito.verify(aggsFunction, Mockito.times(2)).getValue(anyTimeWindow());
            Mockito.verify(aggsFunction, Mockito.times(1))
                    .getValue(Matchers.eq(new TimeWindow(0L, 2L)));
            Mockito.verify(aggsFunction, Mockito.times(1))
                    .getValue(Matchers.eq(new TimeWindow(2L, 4L)));
        }
    }

    /**
     * With a firing trigger and two assigned windows, the table-aggregate handler's {@code
     * emitValue} must be called once per window.
     */
    @Test
    public void testAssignerWithMultipleWindowsForTableAggregate() throws Exception {
        WindowAssigner<TimeWindow> assigner = mockTimeWindowAssigner();
        Trigger<TimeWindow> trigger = mockTrigger();
        NamespaceTableAggsHandleFunction<TimeWindow> tableAggsFunction =
                mockTableAggsHandleFunction();

        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createWindowOperator(assigner, trigger, tableAggsFunction, 0L)) {
            harness.open();

            PowerMockito.when(assigner.assignWindows(Matchers.any(), Matchers.anyLong()))
                    .thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
            shouldFireOnElement(trigger);

            harness.processElement(StreamRecordUtils.insertRecord("String", 1, 0L));

            Mockito.verify(tableAggsFunction, Mockito.times(2))
                    .emitValue(anyTimeWindow(), Matchers.any(), Matchers.any());
            Mockito.verify(tableAggsFunction, Mockito.times(1))
                    .emitValue(
                            Matchers.eq(new TimeWindow(0L, 2L)), Matchers.any(), Matchers.any());
            Mockito.verify(tableAggsFunction, Mockito.times(1))
                    .emitValue(
                            Matchers.eq(new TimeWindow(2L, 4L)), Matchers.any(), Matchers.any());
        }
    }

    /** {@code Trigger.onElement} must be invoked once for each window the element falls into. */
    @Test
    public void testOnElementCalledPerWindow() throws Exception {
        WindowAssigner<TimeWindow> assigner = mockTimeWindowAssigner();
        Trigger<TimeWindow> trigger = mockTrigger();
        NamespaceAggsHandleFunction<TimeWindow> aggsFunction = mockAggsHandleFunction();

        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createWindowOperator(assigner, trigger, aggsFunction, 0L)) {
            harness.open();

            PowerMockito.when(assigner.assignWindows(anyGenericRow(), Matchers.anyLong()))
                    .thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));

            harness.processElement(StreamRecordUtils.insertRecord("String", 42, 1L));

            Mockito.verify(trigger)
                    .onElement(
                            Matchers.eq(StreamRecordUtils.row("String", 42, 1L)),
                            Matchers.eq(1L),
                            Matchers.eq(new TimeWindow(2L, 4L)));
            Mockito.verify(trigger)
                    .onElement(
                            Matchers.eq(StreamRecordUtils.row("String", 42, 1L)),
                            Matchers.eq(1L),
                            Matchers.eq(new TimeWindow(0L, 2L)));
            Mockito.verify(trigger, Mockito.times(2))
                    .onElement(Matchers.any(), Matchers.anyLong(), anyTimeWindow());
        }
    }

    /** A merging assigner must have {@code mergeWindows} called once per newly assigned window. */
    @Test
    public void testMergeWindowsIsCalled() throws Exception {
        MergingWindowAssigner<TimeWindow> assigner = mockMergingAssigner();
        Trigger<TimeWindow> trigger = mockTrigger();
        NamespaceAggsHandleFunction<TimeWindow> aggsFunction = mockAggsHandleFunction();

        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createWindowOperator(assigner, trigger, aggsFunction, 0L)) {
            harness.open();

            PowerMockito.when(assigner.assignWindows(anyGenericRow(), Matchers.anyLong()))
                    .thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));

            // Nothing may have been emitted before the first element arrives.
            Assertions.assertThat(harness.getOutput()).isEmpty();

            harness.processElement(StreamRecordUtils.insertRecord("String", 42, 0L));

            Mockito.verify(assigner)
                    .mergeWindows(
                            Matchers.eq(new TimeWindow(2L, 4L)), Matchers.any(), anyMergeCallback());
            Mockito.verify(assigner)
                    .mergeWindows(
                            Matchers.eq(new TimeWindow(0L, 2L)), Matchers.any(), anyMergeCallback());
            Mockito.verify(assigner, Mockito.times(2))
                    .mergeWindows(anyTimeWindow(), Matchers.any(), anyMergeCallback());
        }
    }

    /**
     * Builds a window operator around the given collaborators and wraps it in a keyed test
     * harness whose key selector reads field index 0 of a (STRING, INT) input row.
     *
     * <p>An {@link AggregateWindowOperator} is created when the handler is a {@link
     * NamespaceAggsHandleFunction}; any other handler is treated as a {@link
     * NamespaceTableAggsHandleFunction} and yields a {@link TableAggregateWindowOperator}.
     *
     * @param windowAssigner assigner handed to the operator; also supplies the window serializer
     * @param trigger trigger handed to the operator
     * @param namespaceAggsHandleFunctionBase aggregation handler selecting the operator flavour
     * @param j long value forwarded unchanged to the operator constructor; the boolean argument
     *     passed just before it is {@code j > 0}
     * @return an unopened harness; the caller is responsible for opening and closing it
     */
    private <W extends Window> KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createWindowOperator(WindowAssigner<W> windowAssigner, Trigger<W> trigger, NamespaceAggsHandleFunctionBase<W> namespaceAggsHandleFunctionBase, long j) throws Exception {
        LogicalType[] inputFieldTypes = {VarCharType.STRING_TYPE, new IntType()};
        RowDataKeySelector keySelector =
                HandwrittenSelectorUtil.getRowDataSelector(new int[] {0}, inputFieldTypes);
        TypeInformation<RowData> keyType = keySelector.getProducedType();

        // NOTE(review): the role of each type array is fixed by its position in the operator
        // constructors, which are not visible here - confirm against those signatures.
        LogicalType[] firstBigIntPair = {new BigIntType(), new BigIntType()};
        LogicalType[] secondBigIntPair = {new BigIntType(), new BigIntType()};
        LogicalType[] bigIntQuad = {
            new BigIntType(), new BigIntType(), new BigIntType(), new BigIntType()
        };
        boolean positiveJ = j > 0;

        OneInputStreamOperator<RowData, RowData> operator;
        if (namespaceAggsHandleFunctionBase instanceof NamespaceAggsHandleFunction) {
            operator =
                    new AggregateWindowOperator<RowData, W>(
                            (NamespaceAggsHandleFunction<W>) namespaceAggsHandleFunctionBase,
                            Mockito.mock(RecordEqualiser.class),
                            windowAssigner,
                            trigger,
                            windowAssigner.getWindowSerializer(new ExecutionConfig()),
                            inputFieldTypes,
                            bigIntQuad,
                            firstBigIntPair,
                            secondBigIntPair,
                            2,
                            positiveJ,
                            j,
                            UTC_ZONE_ID,
                            -1);
        } else {
            operator =
                    new TableAggregateWindowOperator<RowData, W>(
                            (NamespaceTableAggsHandleFunction<W>) namespaceAggsHandleFunctionBase,
                            windowAssigner,
                            trigger,
                            windowAssigner.getWindowSerializer(new ExecutionConfig()),
                            inputFieldTypes,
                            bigIntQuad,
                            firstBigIntPair,
                            secondBigIntPair,
                            2,
                            positiveJ,
                            j,
                            UTC_ZONE_ID,
                            -1);
        }
        return new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType);
    }

    /** Creates an unstubbed mock of {@link NamespaceAggsHandleFunction}. */
    private static <W extends Window> NamespaceAggsHandleFunction<W> mockAggsHandleFunction() throws Exception {
        // Mockito can only hand back the raw type for a class literal.
        @SuppressWarnings("unchecked")
        NamespaceAggsHandleFunction<W> aggsFunction =
                (NamespaceAggsHandleFunction<W>) Mockito.mock(NamespaceAggsHandleFunction.class);
        return aggsFunction;
    }

    /**
     * Creates a mock of {@link NamespaceTableAggsHandleFunction} whose {@code getAccumulators}
     * returns an empty row.
     */
    private static <W extends Window> NamespaceTableAggsHandleFunction<W> mockTableAggsHandleFunction() throws Exception {
        // Mockito can only hand back the raw type for a class literal.
        @SuppressWarnings("unchecked")
        NamespaceTableAggsHandleFunction<W> tableAggsFunction =
                (NamespaceTableAggsHandleFunction<W>)
                        Mockito.mock(NamespaceTableAggsHandleFunction.class);
        PowerMockito.when(tableAggsFunction.getAccumulators()).thenReturn(GenericRowData.of());
        return tableAggsFunction;
    }

    /**
     * Creates a mock {@link Trigger} that answers {@code false} from {@code onElement}, {@code
     * onEventTime} and {@code onProcessingTime}.
     */
    private <W extends Window> Trigger<W> mockTrigger() throws Exception {
        // Mockito can only hand back the raw type for a class literal.
        @SuppressWarnings("unchecked")
        Trigger<W> trigger = (Trigger<W>) Mockito.mock(Trigger.class);
        PowerMockito.when(trigger.onElement(Matchers.any(), Matchers.anyLong(), Matchers.any()))
                .thenReturn(false);
        PowerMockito.when(trigger.onEventTime(Matchers.anyLong(), Matchers.any()))
                .thenReturn(false);
        PowerMockito.when(trigger.onProcessingTime(Matchers.anyLong(), Matchers.any()))
                .thenReturn(false);
        return trigger;
    }

    /** Argument matcher accepting anything, typed as {@link TimeWindow}. */
    private static TimeWindow anyTimeWindow() {
        return Mockito.any();
    }

    /** Argument matcher accepting anything, typed as {@link GenericRowData}. */
    private static GenericRowData anyGenericRow() {
        return Mockito.any();
    }

    /**
     * Creates a mock {@link WindowAssigner} that returns a {@link TimeWindow.Serializer} and
     * reports itself as event-time based.
     */
    private static WindowAssigner<TimeWindow> mockTimeWindowAssigner() throws Exception {
        // Mockito can only hand back the raw type for a class literal.
        @SuppressWarnings("unchecked")
        WindowAssigner<TimeWindow> assigner =
                (WindowAssigner<TimeWindow>) Mockito.mock(WindowAssigner.class);
        stubTimeWindowAssigner(assigner);
        return assigner;
    }

    /**
     * Creates a mock {@link MergingWindowAssigner} that returns a {@link TimeWindow.Serializer}
     * and reports itself as event-time based.
     */
    private static MergingWindowAssigner<TimeWindow> mockMergingAssigner() throws Exception {
        // Mockito can only hand back the raw type for a class literal.
        @SuppressWarnings("unchecked")
        MergingWindowAssigner<TimeWindow> assigner =
                (MergingWindowAssigner<TimeWindow>) Mockito.mock(MergingWindowAssigner.class);
        stubTimeWindowAssigner(assigner);
        return assigner;
    }

    /** Applies the stubbing shared by both assigner mocks: serializer and event-time flag. */
    private static void stubTimeWindowAssigner(WindowAssigner<TimeWindow> assigner) {
        PowerMockito.when(assigner.getWindowSerializer(Mockito.any()))
                .thenReturn(new TimeWindow.Serializer());
        PowerMockito.when(assigner.isEventTime()).thenReturn(true);
    }

    /** Argument matcher accepting anything, typed as a {@link TimeWindow} merge callback. */
    private static MergingWindowAssigner.MergeCallback<TimeWindow> anyMergeCallback() {
        return Mockito.any();
    }

    /** Re-stubs the trigger so that {@code onElement} answers {@code true} for every call. */
    private static void shouldFireOnElement(Trigger<TimeWindow> trigger) throws Exception {
        PowerMockito.when(
                        trigger.onElement(
                                Matchers.anyObject(), Matchers.anyLong(), anyTimeWindow()))
                .thenReturn(true);
    }
}
