/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api;

import java.io.Serializable;
import java.util.List;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.WindowReaderFunction;
import org.apache.flink.state.api.utils.AggregateSum;
import org.apache.flink.state.api.utils.ReduceSum;
import org.apache.flink.state.api.utils.SavepointTestBase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.util.Collector;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public abstract class DataSetSavepointWindowReaderITCase<B extends StateBackend>
extends SavepointTestBase {
    /**
     * Operator uid assigned to the windowed operator in every streaming job of this test and used
     * again to address that operator's state when reading the savepoint.
     */
    private static final String uid = "stateful-operator";

    /** Elements fed into each job; they also serve as the keys and as the expected results. */
    private static final Integer[] numbers = {1, 2, 3};

    /**
     * Supplies the state backend under test. The tests set it on the streaming environment that
     * writes the savepoint and pass it to {@code Savepoint.load} when reading the savepoint back.
     *
     * @return the state backend to use
     */
    protected abstract B getStateBackend();

    /**
     * Runs a keyed tumbling event-time window with a {@link ReduceSum} reduce function, takes a
     * savepoint, and checks that reading the reduce window state back yields every input number.
     *
     * @throws Exception if running the job, taking the savepoint, or reading it fails
     */
    @Test
    public void testReduceWindowStateReader() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(getStateBackend());
        env.setParallelism(4);

        env.addSource(createSource(numbers))
                .rebalance()
                // Every element is assigned timestamp 0 and the strategy emits no watermarks.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Integer>noWatermarks()
                                .withTimestampAssigner((event, timestamp) -> 0L))
                .keyBy(id -> id)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                .reduce(new ReduceSum())
                .uid(uid)
                .addSink(new DiscardingSink<>());

        String savepointPath = takeSavepoint(env);

        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend());

        List<Integer> results =
                savepoint
                        .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                        .reduce(uid, new ReduceSum(), Types.INT, Types.INT)
                        .collect();

        Assert.assertThat(
                "Unexpected results from keyed state",
                results,
                Matchers.containsInAnyOrder(numbers));
    }

    /**
     * Same as the plain reduce test, but the window operator is configured with a
     * {@link NoOpEvictor} and the state is read through the evictor variant of the window reader.
     *
     * @throws Exception if running the job, taking the savepoint, or reading it fails
     */
    @Test
    public void testReduceEvictorWindowStateReader() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(getStateBackend());
        env.setParallelism(4);

        env.addSource(createSource(numbers))
                .rebalance()
                // Every element is assigned timestamp 0 and the strategy emits no watermarks.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Integer>noWatermarks()
                                .withTimestampAssigner((event, timestamp) -> 0L))
                .keyBy(id -> id)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                .evictor(new NoOpEvictor<>())
                .reduce(new ReduceSum())
                .uid(uid)
                .addSink(new DiscardingSink<>());

        String savepointPath = takeSavepoint(env);

        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend());

        List<Integer> results =
                savepoint
                        .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                        .evictor()
                        .reduce(uid, new ReduceSum(), Types.INT, Types.INT)
                        .collect();

        Assert.assertThat(
                "Unexpected results from keyed state",
                results,
                Matchers.containsInAnyOrder(numbers));
    }

    /**
     * Runs a keyed tumbling event-time window with an {@link AggregateSum} aggregate function,
     * takes a savepoint, and checks that reading the aggregate window state back yields every
     * input number.
     *
     * @throws Exception if running the job, taking the savepoint, or reading it fails
     */
    @Test
    public void testAggregateWindowStateReader() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(getStateBackend());
        env.setParallelism(4);

        env.addSource(createSource(numbers))
                .rebalance()
                // Every element is assigned timestamp 0 and the strategy emits no watermarks.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Integer>noWatermarks()
                                .withTimestampAssigner((event, timestamp) -> 0L))
                .keyBy(id -> id)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                .aggregate(new AggregateSum())
                .uid(uid)
                .addSink(new DiscardingSink<>());

        String savepointPath = takeSavepoint(env);

        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend());

        List<Integer> results =
                savepoint
                        .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                        .aggregate(uid, new AggregateSum(), Types.INT, Types.INT, Types.INT)
                        .collect();

        Assert.assertThat(
                "Unexpected results from keyed state",
                results,
                Matchers.containsInAnyOrder(numbers));
    }

    /**
     * Same as the plain aggregate test, but the window operator is configured with a
     * {@link NoOpEvictor} and the state is read through the evictor variant of the window reader.
     *
     * @throws Exception if running the job, taking the savepoint, or reading it fails
     */
    @Test
    public void testAggregateEvictorWindowStateReader() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(getStateBackend());
        env.setParallelism(4);

        env.addSource(createSource(numbers))
                .rebalance()
                // Every element is assigned timestamp 0 and the strategy emits no watermarks.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Integer>noWatermarks()
                                .withTimestampAssigner((event, timestamp) -> 0L))
                .keyBy(id -> id)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                .evictor(new NoOpEvictor<>())
                .aggregate(new AggregateSum())
                .uid(uid)
                .addSink(new DiscardingSink<>());

        String savepointPath = takeSavepoint(env);

        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend());

        List<Integer> results =
                savepoint
                        .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                        .evictor()
                        .aggregate(uid, new AggregateSum(), Types.INT, Types.INT, Types.INT)
                        .collect();

        Assert.assertThat(
                "Unexpected results from keyed state",
                results,
                Matchers.containsInAnyOrder(numbers));
    }

    /**
     * Runs a keyed tumbling event-time window with a {@link NoOpProcessWindowFunction}, takes a
     * savepoint, and reads the window contents back with {@link BasicReaderFunction}, which also
     * asserts on the window bounds and the registered event-time timers.
     *
     * @throws Exception if running the job, taking the savepoint, or reading it fails
     */
    @Test
    public void testProcessWindowStateReader() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(getStateBackend());
        env.setParallelism(4);

        env.addSource(createSource(numbers))
                .rebalance()
                // Every element is assigned timestamp 0 and the strategy emits no watermarks.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Integer>noWatermarks()
                                .withTimestampAssigner((event, timestamp) -> 0L))
                .keyBy(id -> id)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                .process(new NoOpProcessWindowFunction())
                .uid(uid)
                .addSink(new DiscardingSink<>());

        String savepointPath = takeSavepoint(env);

        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend());

        List<Integer> results =
                savepoint
                        .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                        .process(uid, new BasicReaderFunction(), Types.INT, Types.INT, Types.INT)
                        .collect();

        Assert.assertThat(
                "Unexpected results from keyed state",
                results,
                Matchers.containsInAnyOrder(numbers));
    }

    /**
     * Same as the plain process-function test, but the window operator is configured with a
     * {@link NoOpEvictor} and the state is read through the evictor variant of the window reader.
     *
     * @throws Exception if running the job, taking the savepoint, or reading it fails
     */
    @Test
    public void testProcessEvictorWindowStateReader() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(getStateBackend());
        env.setParallelism(4);

        env.addSource(createSource(numbers))
                .rebalance()
                // Every element is assigned timestamp 0 and the strategy emits no watermarks.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Integer>noWatermarks()
                                .withTimestampAssigner((event, timestamp) -> 0L))
                .keyBy(id -> id)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                .evictor(new NoOpEvictor<>())
                .process(new NoOpProcessWindowFunction())
                .uid(uid)
                .addSink(new DiscardingSink<>());

        String savepointPath = takeSavepoint(env);

        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend());

        List<Integer> results =
                savepoint
                        .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                        .evictor()
                        .process(uid, new BasicReaderFunction(), Types.INT, Types.INT, Types.INT)
                        .collect();

        Assert.assertThat(
                "Unexpected results from keyed state",
                results,
                Matchers.containsInAnyOrder(numbers));
    }

    /**
     * Runs a keyed tumbling event-time window with a {@link NoOpWindowFunction} applied via
     * {@code apply}, takes a savepoint, and reads the window contents back with
     * {@link BasicReaderFunction}.
     *
     * @throws Exception if running the job, taking the savepoint, or reading it fails
     */
    @Test
    public void testApplyWindowStateReader() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(getStateBackend());
        env.setParallelism(4);

        env.addSource(createSource(numbers))
                .rebalance()
                // Every element is assigned timestamp 0 and the strategy emits no watermarks.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Integer>noWatermarks()
                                .withTimestampAssigner((event, timestamp) -> 0L))
                .keyBy(id -> id)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                .apply(new NoOpWindowFunction())
                .uid(uid)
                .addSink(new DiscardingSink<>());

        String savepointPath = takeSavepoint(env);

        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend());

        List<Integer> results =
                savepoint
                        .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                        .process(uid, new BasicReaderFunction(), Types.INT, Types.INT, Types.INT)
                        .collect();

        Assert.assertThat(
                "Unexpected results from keyed state",
                results,
                Matchers.containsInAnyOrder(numbers));
    }

    /**
     * Same as the plain {@code apply} test, but the window operator is configured with a
     * {@link NoOpEvictor} and the state is read through the evictor variant of the window reader.
     *
     * @throws Exception if running the job, taking the savepoint, or reading it fails
     */
    @Test
    public void testApplyEvictorWindowStateReader() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(getStateBackend());
        env.setParallelism(4);

        env.addSource(createSource(numbers))
                .rebalance()
                // Every element is assigned timestamp 0 and the strategy emits no watermarks.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Integer>noWatermarks()
                                .withTimestampAssigner((event, timestamp) -> 0L))
                .keyBy(id -> id)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                .evictor(new NoOpEvictor<>())
                .apply(new NoOpWindowFunction())
                .uid(uid)
                .addSink(new DiscardingSink<>());

        String savepointPath = takeSavepoint(env);

        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend());

        List<Integer> results =
                savepoint
                        // Read with the same 10 ms tumbling assigner the job above was written
                        // with, matching the other tests in this class.
                        .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                        .evictor()
                        .process(uid, new BasicReaderFunction(), Types.INT, Types.INT, Types.INT)
                        .collect();

        Assert.assertThat(
                "Unexpected results from keyed state",
                results,
                Matchers.containsInAnyOrder(numbers));
    }

    /**
     * Runs a keyed global window with a purging count trigger (threshold 10), takes a savepoint,
     * and reads each window's trigger state back through {@link TriggerReaderFunction}. The
     * expected trigger count is 1 for each of the three keys.
     *
     * @throws Exception if running the job, taking the savepoint, or reading it fails
     */
    @Test
    public void testWindowTriggerStateReader() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(getStateBackend());
        env.setParallelism(4);

        env.addSource(createSource(numbers))
                .rebalance()
                .keyBy(id -> id)
                .window(GlobalWindows.create())
                .trigger(PurgingTrigger.of(CountTrigger.of(10L)))
                .reduce(new ReduceSum())
                .uid(uid)
                .addSink(new DiscardingSink<>());

        String savepointPath = takeSavepoint(env);

        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend());

        List<Long> results =
                savepoint
                        .window(new GlobalWindow.Serializer())
                        .reduce(
                                uid,
                                new ReduceSum(),
                                new TriggerReaderFunction(),
                                Types.INT,
                                Types.INT,
                                Types.LONG)
                        .collect();

        Assert.assertThat(
                "Unexpected results from trigger state",
                results,
                Matchers.contains(1L, 1L, 1L));
    }

    private static class LongSum
    implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private LongSum() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }

    private static class TriggerReaderFunction
    extends WindowReaderFunction<Integer, Long, Integer, GlobalWindow> {
        private final ReducingStateDescriptor<Long> triggerCountDesc = new ReducingStateDescriptor("count", (ReduceFunction)new LongSum(), (TypeSerializer)LongSerializer.INSTANCE);

        private TriggerReaderFunction() {
        }

        public void readWindow(Integer integer, WindowReaderFunction.Context<GlobalWindow> context, Iterable<Integer> elements, Collector<Long> out) throws Exception {
            ReducingState state = (ReducingState)context.triggerState(this.triggerCountDesc);
            out.collect(state.get());
        }
    }

    private static class NoOpEvictor<W extends Window>
    implements Evictor<Integer, W> {
        private NoOpEvictor() {
        }

        public void evictBefore(Iterable<TimestampedValue<Integer>> elements, int size, W window, Evictor.EvictorContext evictorContext) {
        }

        public void evictAfter(Iterable<TimestampedValue<Integer>> elements, int size, W window, Evictor.EvictorContext evictorContext) {
        }
    }

    private static class BasicReaderFunction
    extends WindowReaderFunction<Integer, Integer, Integer, TimeWindow> {
        private BasicReaderFunction() {
        }

        public void readWindow(Integer key, WindowReaderFunction.Context<TimeWindow> context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
            Assert.assertEquals((String)"Unexpected window", (Object)new TimeWindow(0L, 10L), (Object)context.window());
            Assert.assertThat((String)"Unexpected registered timers", (Object)context.registeredEventTimeTimers(), (Matcher)Matchers.contains((Object[])new Long[]{9L}));
            out.collect((Object)elements.iterator().next());
        }
    }

    private static class NoOpWindowFunction
    implements WindowFunction<Integer, Integer, Integer, TimeWindow> {
        private NoOpWindowFunction() {
        }

        public void apply(Integer integer, TimeWindow window, Iterable<Integer> input, Collector<Integer> out) {
        }
    }

    private static class NoOpProcessWindowFunction
    extends ProcessWindowFunction<Integer, Integer, Integer, TimeWindow> {
        private NoOpProcessWindowFunction() {
        }

        public void process(Integer integer, ProcessWindowFunction.Context context, Iterable<Integer> elements, Collector<Integer> out) {
        }
    }
}

