package org.apache.flink.state.api.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.functions.WindowReaderFunction;
import org.apache.flink.state.api.input.operator.WindowReaderOperator;
import org.apache.flink.state.api.input.operator.window.PassThroughReader;
import org.apache.flink.state.api.input.splits.KeyGroupRangeInputSplit;
import org.apache.flink.state.api.runtime.OperatorIDGenerator;
import org.apache.flink.state.api.utils.AggregateSum;
import org.apache.flink.state.api.utils.ReduceSum;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/state/api/input/WindowReaderTest.class */
public class WindowReaderTest {
    private static final int MAX_PARALLELISM = 128;
    private static final String UID = "uid";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/state/api/input/WindowReaderTest$AlwaysFireTrigger.class */
    public static class AlwaysFireTrigger<W extends Window> extends Trigger<Object, W> {
        private AlwaysFireTrigger() {
        }

        public TriggerResult onElement(Object obj, long j, W w, Trigger.TriggerContext triggerContext) throws Exception {
            return TriggerResult.FIRE;
        }

        public TriggerResult onProcessingTime(long j, W w, Trigger.TriggerContext triggerContext) throws Exception {
            return TriggerResult.FIRE;
        }

        public TriggerResult onEventTime(long j, W w, Trigger.TriggerContext triggerContext) throws Exception {
            return TriggerResult.FIRE;
        }

        public void clear(W w, Trigger.TriggerContext triggerContext) throws Exception {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/state/api/input/WindowReaderTest$IdentityKeySelector.class */
    public static class IdentityKeySelector<T> implements KeySelector<T, T> {
        private IdentityKeySelector() {
        }

        public T getKey(T t) {
            return t;
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/input/WindowReaderTest$MultiFireReaderFunction.class */
    private static class MultiFireReaderFunction extends WindowReaderFunction<Integer, Tuple2<Integer, Integer>, Integer, TimeWindow> {
        private MultiFireReaderFunction() {
        }

        public void readWindow(Integer num, WindowReaderFunction.Context<TimeWindow> context, Iterable<Integer> iterable, Collector<Tuple2<Integer, Integer>> collector) throws Exception {
            collector.collect(Tuple2.of((Integer) context.globalState().getReducingState(new ReducingStateDescriptor("per-key", new ReduceSum(), Types.INT)).get(), (Integer) context.windowState().getReducingState(new ReducingStateDescriptor("per-pane", new ReduceSum(), Types.INT)).get()));
        }

        public /* bridge */ /* synthetic */ void readWindow(Object obj, WindowReaderFunction.Context context, Iterable iterable, Collector collector) throws Exception {
            readWindow((Integer) obj, (WindowReaderFunction.Context<TimeWindow>) context, (Iterable<Integer>) iterable, (Collector<Tuple2<Integer, Integer>>) collector);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/state/api/input/WindowReaderTest$MultiFireWindow.class */
    public static class MultiFireWindow extends ProcessWindowFunction<Integer, Integer, Integer, TimeWindow> {
        private MultiFireWindow() {
        }

        public void process(Integer num, ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>.Context context, Iterable<Integer> iterable, Collector<Integer> collector) throws Exception {
            Integer next = iterable.iterator().next();
            context.globalState().getReducingState(new ReducingStateDescriptor("per-key", new ReduceSum(), Types.INT)).add(next);
            context.windowState().getReducingState(new ReducingStateDescriptor("per-pane", new ReduceSum(), Types.INT)).add(next);
        }

        public /* bridge */ /* synthetic */ void process(Object obj, ProcessWindowFunction.Context context, Iterable iterable, Collector collector) throws Exception {
            process((Integer) obj, (ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>.Context) context, (Iterable<Integer>) iterable, (Collector<Integer>) collector);
        }
    }

    @Test
    public void testReducingWindow() throws Exception {
        Assert.assertEquals(Arrays.asList(1, 1), readState(new KeyedStateInputFormat(getOperatorState(getWindowOperator(keyedStream -> {
            return keyedStream.window(TumblingEventTimeWindows.of(Time.milliseconds(1L))).reduce(new ReduceSum());
        })), new MemoryStateBackend(), new Configuration(), WindowReaderOperator.reduce(new ReduceSum(), new PassThroughReader(), Types.INT, new TimeWindow.Serializer(), Types.INT), new ExecutionConfig())));
    }

    @Test
    public void testSessionWindow() throws Exception {
        Assert.assertEquals(Collections.singletonList(2), readState(new KeyedStateInputFormat(getOperatorState(getWindowOperator(keyedStream -> {
            return keyedStream.window(EventTimeSessionWindows.withGap(Time.milliseconds(3L))).reduce(new ReduceSum());
        })), new MemoryStateBackend(), new Configuration(), WindowReaderOperator.reduce(new ReduceSum(), new PassThroughReader(), Types.INT, new TimeWindow.Serializer(), Types.INT), new ExecutionConfig())));
    }

    @Test
    public void testAggregateWindow() throws Exception {
        Assert.assertEquals(Arrays.asList(1, 1), readState(new KeyedStateInputFormat(getOperatorState(getWindowOperator(keyedStream -> {
            return keyedStream.window(TumblingEventTimeWindows.of(Time.milliseconds(1L))).aggregate(new AggregateSum());
        })), new MemoryStateBackend(), new Configuration(), WindowReaderOperator.aggregate(new AggregateSum(), new PassThroughReader(), Types.INT, new TimeWindow.Serializer(), Types.INT), new ExecutionConfig())));
    }

    @Test
    public void testProcessReader() throws Exception {
        Assert.assertEquals(Arrays.asList(1, 1), readState(new KeyedStateInputFormat(getOperatorState(getWindowOperator(keyedStream -> {
            return keyedStream.window(TumblingEventTimeWindows.of(Time.milliseconds(1L))).process(mockProcessWindowFunction(), Types.INT);
        })), new MemoryStateBackend(), new Configuration(), WindowReaderOperator.process(new PassThroughReader(), Types.INT, new TimeWindow.Serializer(), Types.INT), new ExecutionConfig())));
    }

    @Test
    public void testPerPaneAndPerKeyState() throws Exception {
        Assert.assertEquals(Arrays.asList(Tuple2.of(2, 1), Tuple2.of(2, 1)), readState(new KeyedStateInputFormat(getOperatorState(getWindowOperator(keyedStream -> {
            return keyedStream.window(TumblingEventTimeWindows.of(Time.milliseconds(1L))).trigger(new AlwaysFireTrigger()).process(new MultiFireWindow(), Types.INT);
        })), new MemoryStateBackend(), new Configuration(), WindowReaderOperator.process(new MultiFireReaderFunction(), Types.INT, new TimeWindow.Serializer(), Types.INT), new ExecutionConfig())));
    }

    private static WindowOperator<Integer, Integer, ?, Void, ?> getWindowOperator(Function<KeyedStream<Integer, Integer>, SingleOutputStreamOperator<Integer>> function) {
        return getLastOperator(function.apply(StreamExecutionEnvironment.getExecutionEnvironment().addSource(mockSourceFunction()).returns(Integer.class).keyBy(new IdentityKeySelector())).uid(UID));
    }

    private static SourceFunction<Integer> mockSourceFunction() {
        return (SourceFunction) Mockito.mock(SourceFunction.class);
    }

    private static <W extends Window> ProcessWindowFunction<Integer, Integer, Integer, W> mockProcessWindowFunction() {
        return (ProcessWindowFunction) Mockito.mock(ProcessWindowFunction.class);
    }

    private static OperatorState getOperatorState(WindowOperator<Integer, Integer, ?, Void, ?> windowOperator) throws Exception {
        KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(windowOperator, new IdentityKeySelector(), Types.INT, MAX_PARALLELISM, 1, 0);
        keyedOneInputStreamOperatorTestHarness.open();
        keyedOneInputStreamOperatorTestHarness.processElement(1, 0L);
        keyedOneInputStreamOperatorTestHarness.processElement(1, 1L);
        OperatorSubtaskState snapshot = keyedOneInputStreamOperatorTestHarness.snapshot(0L, 0L);
        keyedOneInputStreamOperatorTestHarness.close();
        OperatorState operatorState = new OperatorState(OperatorIDGenerator.fromUid(UID), 1, MAX_PARALLELISM);
        operatorState.putState(0, snapshot);
        return operatorState;
    }

    private static <T> WindowOperator<Integer, Integer, ?, Void, ?> getLastOperator(DataStream<T> dataStream) {
        OneInputTransformation transformation = dataStream.getTransformation();
        if (!(transformation instanceof OneInputTransformation)) {
            Assert.fail("This test only supports window operators");
        }
        WindowOperator<Integer, Integer, ?, Void, ?> operator = transformation.getOperator();
        if (!(operator instanceof WindowOperator)) {
            Assert.fail("This test only supports window operators");
        }
        return operator;
    }

    @Nonnull
    private <OUT> List<OUT> readState(KeyedStateInputFormat<Integer, TimeWindow, OUT> keyedStateInputFormat) throws IOException {
        KeyGroupRangeInputSplit keyGroupRangeInputSplit = keyedStateInputFormat.createInputSplits(1)[0];
        ArrayList arrayList = new ArrayList();
        keyedStateInputFormat.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 0));
        keyedStateInputFormat.openInputFormat();
        keyedStateInputFormat.open(keyGroupRangeInputSplit);
        while (!keyedStateInputFormat.reachedEnd()) {
            arrayList.add(keyedStateInputFormat.nextRecord((Object) null));
        }
        keyedStateInputFormat.close();
        keyedStateInputFormat.closeInputFormat();
        return arrayList;
    }
}
