/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.functions.WindowReaderFunction;
import org.apache.flink.state.api.input.KeyedStateInputFormat;
import org.apache.flink.state.api.input.operator.StateReaderOperator;
import org.apache.flink.state.api.input.operator.WindowReaderOperator;
import org.apache.flink.state.api.input.operator.window.PassThroughReader;
import org.apache.flink.state.api.input.splits.KeyGroupRangeInputSplit;
import org.apache.flink.state.api.runtime.OperatorIDGenerator;
import org.apache.flink.state.api.utils.AggregateSum;
import org.apache.flink.state.api.utils.ReduceSum;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Tests that window operator state snapshotted from a {@link WindowOperator} can be read back
 * through a {@link KeyedStateInputFormat} driven by a {@link WindowReaderOperator}.
 *
 * <p>Every test builds a window operator, feeds it the element {@code 1} twice (timestamps
 * {@code 0} and {@code 1}) inside a test harness, snapshots it, and then reads the snapshot with
 * the matching reader operator.
 */
public class WindowReaderTest {
    /** Max parallelism shared by the test harness and the synthesized {@link OperatorState}. */
    private static final int MAX_PARALLELISM = 128;

    /** Uid assigned to the window operator; the {@link OperatorID} is derived from it. */
    private static final String UID = "uid";

    /** Reads the state of a reducing tumbling window. */
    @Test
    public void testReducingWindow() throws Exception {
        WindowOperator<Integer, Integer, ?, Void, ?> operator =
                getWindowOperator(
                        stream ->
                                stream.window(TumblingEventTimeWindows.of(Time.milliseconds(1L)))
                                        .reduce(new ReduceSum()));
        OperatorState operatorState = getOperatorState(operator);

        KeyedStateInputFormat<Integer, TimeWindow, Integer> format =
                new KeyedStateInputFormat<>(
                        operatorState,
                        new MemoryStateBackend(),
                        new Configuration(),
                        WindowReaderOperator.reduce(
                                new ReduceSum(),
                                new PassThroughReader<>(),
                                Types.INT,
                                new TimeWindow.Serializer(),
                                Types.INT));

        List<Integer> list = readState(format);
        Assert.assertEquals(Arrays.asList(1, 1), list);
    }

    /** Reads the state of a reducing session window with a 3 ms gap. */
    @Test
    public void testSessionWindow() throws Exception {
        WindowOperator<Integer, Integer, ?, Void, ?> operator =
                getWindowOperator(
                        stream ->
                                stream.window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
                                        .reduce(new ReduceSum()));
        OperatorState operatorState = getOperatorState(operator);

        KeyedStateInputFormat<Integer, TimeWindow, Integer> format =
                new KeyedStateInputFormat<>(
                        operatorState,
                        new MemoryStateBackend(),
                        new Configuration(),
                        WindowReaderOperator.reduce(
                                new ReduceSum(),
                                new PassThroughReader<>(),
                                Types.INT,
                                new TimeWindow.Serializer(),
                                Types.INT));

        List<Integer> list = readState(format);
        Assert.assertEquals(Collections.singletonList(2), list);
    }

    /** Reads the state of an aggregating tumbling window. */
    @Test
    public void testAggregateWindow() throws Exception {
        WindowOperator<Integer, Integer, ?, Void, ?> operator =
                getWindowOperator(
                        stream ->
                                stream.window(TumblingEventTimeWindows.of(Time.milliseconds(1L)))
                                        .aggregate(new AggregateSum()));
        OperatorState operatorState = getOperatorState(operator);

        KeyedStateInputFormat<Integer, TimeWindow, Integer> format =
                new KeyedStateInputFormat<>(
                        operatorState,
                        new MemoryStateBackend(),
                        new Configuration(),
                        WindowReaderOperator.aggregate(
                                new AggregateSum(),
                                new PassThroughReader<>(),
                                Types.INT,
                                new TimeWindow.Serializer(),
                                Types.INT));

        List<Integer> list = readState(format);
        Assert.assertEquals(Arrays.asList(1, 1), list);
    }

    /** Reads the buffered elements of a tumbling window that uses a process window function. */
    @Test
    public void testProcessReader() throws Exception {
        WindowOperator<Integer, Integer, ?, Void, ?> operator =
                getWindowOperator(
                        stream ->
                                stream.window(TumblingEventTimeWindows.of(Time.milliseconds(1L)))
                                        .process(mockProcessWindowFunction(), Types.INT));
        OperatorState operatorState = getOperatorState(operator);

        KeyedStateInputFormat<Integer, TimeWindow, Integer> format =
                new KeyedStateInputFormat<>(
                        operatorState,
                        new MemoryStateBackend(),
                        new Configuration(),
                        WindowReaderOperator.process(
                                new PassThroughReader<>(),
                                Types.INT,
                                new TimeWindow.Serializer(),
                                Types.INT));

        List<Integer> list = readState(format);
        Assert.assertEquals(Arrays.asList(1, 1), list);
    }

    /**
     * Reads the "per-key" (global) and "per-pane" (window) reducing states written by
     * {@link MultiFireWindow} on every firing of {@link AlwaysFireTrigger}.
     */
    @Test
    public void testPerPaneAndPerKeyState() throws Exception {
        WindowOperator<Integer, Integer, ?, Void, ?> operator =
                getWindowOperator(
                        stream ->
                                stream.window(TumblingEventTimeWindows.of(Time.milliseconds(1L)))
                                        .trigger(new AlwaysFireTrigger<>())
                                        .process(new MultiFireWindow(), Types.INT));
        OperatorState operatorState = getOperatorState(operator);

        KeyedStateInputFormat<Integer, TimeWindow, Tuple2<Integer, Integer>> format =
                new KeyedStateInputFormat<>(
                        operatorState,
                        new MemoryStateBackend(),
                        new Configuration(),
                        WindowReaderOperator.process(
                                new MultiFireReaderFunction(),
                                Types.INT,
                                new TimeWindow.Serializer(),
                                Types.INT));

        List<Tuple2<Integer, Integer>> list = readState(format);
        Assert.assertEquals(Arrays.asList(Tuple2.of(2, 1), Tuple2.of(2, 1)), list);
    }

    /**
     * Builds a keyed stream over a mocked source, applies {@code window} to it, tags the result
     * with {@link #UID} and extracts the resulting window operator.
     *
     * @param window turns the keyed stream into the windowed operation under test
     * @return the window operator backing the stream produced by {@code window}
     */
    private static WindowOperator<Integer, Integer, ?, Void, ?> getWindowOperator(
            Function<KeyedStream<Integer, Integer>, SingleOutputStreamOperator<Integer>> window) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KeyedStream<Integer, Integer> keyedStream =
                env.addSource(mockSourceFunction())
                        .returns(Integer.class)
                        .keyBy(new IdentityKeySelector<>());
        DataStream<Integer> stream = window.apply(keyedStream).uid(UID);
        return getLastOperator(stream);
    }

    /** Returns a Mockito mock standing in for an integer source. */
    @SuppressWarnings("unchecked") // Mockito can only hand back the raw mocked type
    private static SourceFunction<Integer> mockSourceFunction() {
        return (SourceFunction<Integer>) Mockito.mock(SourceFunction.class);
    }

    /** Returns a Mockito mock standing in for a process window function. */
    @SuppressWarnings("unchecked") // Mockito can only hand back the raw mocked type
    private static <W extends Window>
            ProcessWindowFunction<Integer, Integer, Integer, W> mockProcessWindowFunction() {
        return (ProcessWindowFunction<Integer, Integer, Integer, W>)
                Mockito.mock(ProcessWindowFunction.class);
    }

    /**
     * Runs {@code operator} in a keyed test harness, feeds it the element {@code 1} at timestamps
     * {@code 0} and {@code 1}, snapshots it and wraps the snapshot as subtask 0 of an
     * {@link OperatorState} whose id is derived from {@link #UID}.
     *
     * @param operator the window operator to snapshot
     * @return operator state with parallelism 1 and max parallelism {@link #MAX_PARALLELISM}
     * @throws Exception if the harness fails to open, process, snapshot or close
     */
    private static OperatorState getOperatorState(
            WindowOperator<Integer, Integer, ?, Void, ?> operator) throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, new IdentityKeySelector<>(), Types.INT, MAX_PARALLELISM, 1, 0);

        OperatorSubtaskState state;
        try {
            harness.open();
            harness.processElement(1, 0L);
            harness.processElement(1, 1L);
            state = harness.snapshot(0L, 0L);
        } finally {
            // Release the harness even when processing or snapshotting fails.
            harness.close();
        }

        OperatorID operatorID = OperatorIDGenerator.fromUid(UID);
        OperatorState operatorState = new OperatorState(operatorID, 1, MAX_PARALLELISM);
        operatorState.putState(0, state);
        return operatorState;
    }

    /**
     * Extracts the window operator behind {@code dataStream}, failing the test when the stream's
     * transformation is not a one-input transformation wrapping a {@link WindowOperator}.
     *
     * @param dataStream stream whose transformation is inspected
     * @return the window operator of the stream's transformation
     */
    @SuppressWarnings("unchecked") // the type arguments are fixed by the pipelines built in this test
    private static <T> WindowOperator<Integer, Integer, ?, Void, ?> getLastOperator(
            DataStream<T> dataStream) {
        Transformation<T> transformation = dataStream.getTransformation();
        Assert.assertTrue(
                "This test only supports window operators",
                transformation instanceof OneInputTransformation);

        OneInputTransformation<?, ?> oneInput = (OneInputTransformation<?, ?>) transformation;
        OneInputStreamOperator<?, ?> operator = oneInput.getOperator();
        Assert.assertTrue(
                "This test only supports window operators", operator instanceof WindowOperator);

        return (WindowOperator<Integer, Integer, ?, Void, ?>) operator;
    }

    /**
     * Drains every record from the first of the input splits created by {@code format}.
     *
     * @param format input format to read from
     * @return the records in the order the format produced them
     * @throws IOException if the format fails to open, read or close
     */
    @Nonnull
    private <OUT> List<OUT> readState(KeyedStateInputFormat<Integer, TimeWindow, OUT> format)
            throws IOException {
        KeyGroupRangeInputSplit split = format.createInputSplits(1)[0];
        List<OUT> data = new ArrayList<>();

        format.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 0));
        format.openInputFormat();
        try {
            format.open(split);
            try {
                while (!format.reachedEnd()) {
                    data.add(format.nextRecord(null));
                }
            } finally {
                // Only reached once open(split) succeeded, so there is something to close.
                format.close();
            }
        } finally {
            format.closeInputFormat();
        }
        return data;
    }

    /** Trigger that answers {@link TriggerResult#FIRE} to every callback and keeps no state. */
    private static class AlwaysFireTrigger<W extends Window> extends Trigger<Object, W> {
        @Override
        public TriggerResult onElement(
                Object element, long timestamp, W window, Trigger.TriggerContext ctx)
                throws Exception {
            return TriggerResult.FIRE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx)
                throws Exception {
            return TriggerResult.FIRE;
        }

        @Override
        public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx)
                throws Exception {
            return TriggerResult.FIRE;
        }

        @Override
        public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
            // Nothing to clear: this trigger registers no state or timers.
        }
    }

    /**
     * Reader that emits, for each window, the pair of the "per-key" global reducing state and the
     * "per-pane" window reducing state. The descriptors mirror those used by
     * {@link MultiFireWindow}.
     */
    private static class MultiFireReaderFunction
            extends WindowReaderFunction<Integer, Tuple2<Integer, Integer>, Integer, TimeWindow> {
        @Override
        public void readWindow(
                Integer integer,
                WindowReaderFunction.Context<TimeWindow> context,
                Iterable<Integer> elements,
                Collector<Tuple2<Integer, Integer>> out)
                throws Exception {
            Integer perKey =
                    context.globalState()
                            .getReducingState(
                                    new ReducingStateDescriptor<>(
                                            "per-key", new ReduceSum(), Types.INT))
                            .get();
            Integer perPane =
                    context.windowState()
                            .getReducingState(
                                    new ReducingStateDescriptor<>(
                                            "per-pane", new ReduceSum(), Types.INT))
                            .get();
            out.collect(Tuple2.of(perKey, perPane));
        }
    }

    /**
     * Window function that, on every invocation, adds the first window element to both the
     * "per-key" global reducing state and the "per-pane" window reducing state. It emits nothing.
     */
    private static class MultiFireWindow
            extends ProcessWindowFunction<Integer, Integer, Integer, TimeWindow> {
        @Override
        public void process(
                Integer integer, Context context, Iterable<Integer> elements, Collector<Integer> out)
                throws Exception {
            Integer element = elements.iterator().next();
            context.globalState()
                    .getReducingState(
                            new ReducingStateDescriptor<>("per-key", new ReduceSum(), Types.INT))
                    .add(element);
            context.windowState()
                    .getReducingState(
                            new ReducingStateDescriptor<>("per-pane", new ReduceSum(), Types.INT))
                    .add(element);
        }
    }

    /** Key selector that uses each element as its own key. */
    private static class IdentityKeySelector<T> implements KeySelector<T, T> {
        @Override
        public T getKey(T value) {
            return value;
        }
    }
}

