/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.NewSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.WindowedOperatorTransformation;
import org.apache.flink.state.api.functions.Timestamper;
import org.apache.flink.state.api.utils.MaxWatermarkSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.util.StreamCollector;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class SavepointWriterWindowITCase
extends AbstractTestBase {
    private static final String UID = "uid";
    private static final Collection<String> WORDS = Arrays.asList("hello", "world", "hello", "everyone");
    private static final Matcher<Iterable<? extends Tuple2<String, Integer>>> STANDARD_MATCHER = Matchers.containsInAnyOrder((Object[])new Tuple2[]{Tuple2.of((Object)"hello", (Object)2), Tuple2.of((Object)"world", (Object)1), Tuple2.of((Object)"everyone", (Object)1)});
    private static final Matcher<Iterable<? extends Tuple2<String, Integer>>> EVICTOR_MATCHER = Matchers.containsInAnyOrder((Object[])new Tuple2[]{Tuple2.of((Object)"hello", (Object)1), Tuple2.of((Object)"world", (Object)1), Tuple2.of((Object)"everyone", (Object)1)});
    private static final TypeInformation<Tuple2<String, Integer>> TUPLE_TYPE_INFO = new TypeHint<Tuple2<String, Integer>>(){}.getTypeInfo();
    private static final List<Tuple3<String, WindowBootstrap, WindowStream>> SETUP_FUNCTIONS = Arrays.asList(Tuple3.of((Object)"reduce", transformation -> transformation.reduce((ReduceFunction)new Reducer()), stream -> stream.reduce((ReduceFunction)new Reducer())), Tuple3.of((Object)"aggregate", transformation -> transformation.aggregate((AggregateFunction)new Aggregator()), stream -> stream.aggregate((AggregateFunction)new Aggregator())), Tuple3.of((Object)"apply", transformation -> transformation.apply((WindowFunction)new CustomWindowFunction()), stream -> stream.apply((WindowFunction)new CustomWindowFunction())), Tuple3.of((Object)"process", transformation -> transformation.process((ProcessWindowFunction)new CustomProcessWindowFunction()), stream -> stream.process((ProcessWindowFunction)new CustomProcessWindowFunction())));
    private static final List<Tuple2<String, StateBackend>> STATE_BACKENDS = Arrays.asList(Tuple2.of((Object)"MemoryStateBackend", (Object)new MemoryStateBackend()), Tuple2.of((Object)"RocksDB", (Object)new RocksDBStateBackend((StateBackend)new MemoryStateBackend())));
    @Rule
    public StreamCollector collector = new StreamCollector();
    private final WindowBootstrap windowBootstrap;
    private final WindowStream windowStream;
    private final StateBackend stateBackend;

    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> data() {
        ArrayList<Object[]> parameterList = new ArrayList<Object[]>();
        for (Tuple2<String, StateBackend> stateBackend : STATE_BACKENDS) {
            for (Tuple3<String, WindowBootstrap, WindowStream> setup : SETUP_FUNCTIONS) {
                Object[] parameters = new Object[]{(String)stateBackend.f0 + ": " + (String)setup.f0, setup.f1, setup.f2, stateBackend.f1};
                parameterList.add(parameters);
            }
        }
        return parameterList;
    }

    public SavepointWriterWindowITCase(String ignore, WindowBootstrap windowBootstrap, WindowStream windowStream, StateBackend stateBackend) {
        this.windowBootstrap = windowBootstrap;
        this.windowStream = windowStream;
        this.stateBackend = stateBackend;
    }

    @Test
    public void testTumbleWindow() throws Exception {
        String savepointPath = this.getTempDirPath(new AbstractID().toHexString());
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        SingleInputUdfOperator bootstrapData = bEnv.fromCollection(WORDS).map((MapFunction & Serializable)word -> Tuple2.of((Object)word, (Object)1)).returns(TUPLE_TYPE_INFO);
        WindowedOperatorTransformation transformation = OperatorTransformation.bootstrapWith((DataSet)bootstrapData).assignTimestamps((Timestamper & Serializable)record -> 2L).keyBy((KeySelector & Serializable)tuple -> (String)tuple.f0, Types.STRING).window((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.milliseconds((long)5L)));
        ((NewSavepoint)Savepoint.create((StateBackend)this.stateBackend, (int)128).withOperator(UID, this.windowBootstrap.bootstrap((WindowedOperatorTransformation<Tuple2<String, Integer>, String, TimeWindow>)transformation))).write(savepointPath);
        bEnv.execute("write state");
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        sEnv.setStateBackend(this.stateBackend);
        WindowedStream stream = sEnv.addSource(new MaxWatermarkSource()).returns(TUPLE_TYPE_INFO).keyBy((KeySelector & Serializable)tuple -> (String)tuple.f0).window((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.milliseconds((long)5L)));
        SingleOutputStreamOperator windowed = this.windowStream.window((WindowedStream<Tuple2<String, Integer>, String, TimeWindow>)stream).uid(UID);
        CompletableFuture future = this.collector.collect((DataStream)windowed);
        this.submitJob(savepointPath, sEnv);
        Collection results = (Collection)future.get();
        Assert.assertThat((String)"Incorrect results from bootstrapped windows", (Object)results, STANDARD_MATCHER);
    }

    @Test
    public void testTumbleWindowWithEvictor() throws Exception {
        String savepointPath = this.getTempDirPath(new AbstractID().toHexString());
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        SingleInputUdfOperator bootstrapData = bEnv.fromCollection(WORDS).map((MapFunction & Serializable)word -> Tuple2.of((Object)word, (Object)1)).returns(TUPLE_TYPE_INFO);
        WindowedOperatorTransformation transformation = OperatorTransformation.bootstrapWith((DataSet)bootstrapData).assignTimestamps((Timestamper & Serializable)record -> 2L).keyBy((KeySelector & Serializable)tuple -> (String)tuple.f0, Types.STRING).window((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.milliseconds((long)5L))).evictor((Evictor)CountEvictor.of((long)1L));
        ((NewSavepoint)Savepoint.create((StateBackend)new MemoryStateBackend(), (int)128).withOperator(UID, this.windowBootstrap.bootstrap((WindowedOperatorTransformation<Tuple2<String, Integer>, String, TimeWindow>)transformation))).write(savepointPath);
        bEnv.execute("write state");
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        WindowedStream stream = sEnv.addSource(new MaxWatermarkSource(), TUPLE_TYPE_INFO).keyBy((KeySelector & Serializable)tuple -> (String)tuple.f0).window((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.milliseconds((long)5L))).evictor((Evictor)CountEvictor.of((long)1L));
        SingleOutputStreamOperator windowed = this.windowStream.window((WindowedStream<Tuple2<String, Integer>, String, TimeWindow>)stream).uid(UID);
        CompletableFuture future = this.collector.collect((DataStream)windowed);
        this.submitJob(savepointPath, sEnv);
        Collection results = (Collection)future.get();
        Assert.assertThat((String)"Incorrect results from bootstrapped windows", (Object)results, EVICTOR_MATCHER);
    }

    @Test
    public void testSlideWindow() throws Exception {
        String savepointPath = this.getTempDirPath(new AbstractID().toHexString());
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        SingleInputUdfOperator bootstrapData = bEnv.fromCollection(WORDS).map((MapFunction & Serializable)word -> Tuple2.of((Object)word, (Object)1)).returns(TUPLE_TYPE_INFO);
        WindowedOperatorTransformation transformation = OperatorTransformation.bootstrapWith((DataSet)bootstrapData).assignTimestamps((Timestamper & Serializable)record -> 2L).keyBy((KeySelector & Serializable)tuple -> (String)tuple.f0, Types.STRING).window((WindowAssigner)SlidingEventTimeWindows.of((Time)Time.milliseconds((long)5L), (Time)Time.milliseconds((long)1L)));
        ((NewSavepoint)Savepoint.create((StateBackend)new MemoryStateBackend(), (int)128).withOperator(UID, this.windowBootstrap.bootstrap((WindowedOperatorTransformation<Tuple2<String, Integer>, String, TimeWindow>)transformation))).write(savepointPath);
        bEnv.execute("write state");
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        WindowedStream stream = sEnv.addSource(new MaxWatermarkSource()).returns(TUPLE_TYPE_INFO).keyBy((KeySelector & Serializable)tuple -> (String)tuple.f0).window((WindowAssigner)SlidingEventTimeWindows.of((Time)Time.milliseconds((long)5L), (Time)Time.milliseconds((long)1L)));
        SingleOutputStreamOperator windowed = this.windowStream.window((WindowedStream<Tuple2<String, Integer>, String, TimeWindow>)stream).uid(UID);
        CompletableFuture future = this.collector.collect((DataStream)windowed);
        this.submitJob(savepointPath, sEnv);
        Collection results = (Collection)future.get();
        Assert.assertEquals((String)"Incorrect number of results", (long)15L, (long)results.size());
        Assert.assertThat((String)"Incorrect bootstrap state", new HashSet(results), STANDARD_MATCHER);
    }

    @Test
    public void testSlideWindowWithEvictor() throws Exception {
        String savepointPath = this.getTempDirPath(new AbstractID().toHexString());
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        SingleInputUdfOperator bootstrapData = bEnv.fromCollection(WORDS).map((MapFunction & Serializable)word -> Tuple2.of((Object)word, (Object)1)).returns(TUPLE_TYPE_INFO);
        WindowedOperatorTransformation transformation = OperatorTransformation.bootstrapWith((DataSet)bootstrapData).assignTimestamps((Timestamper & Serializable)record -> 2L).keyBy((KeySelector & Serializable)tuple -> (String)tuple.f0, Types.STRING).window((WindowAssigner)SlidingEventTimeWindows.of((Time)Time.milliseconds((long)5L), (Time)Time.milliseconds((long)1L))).evictor((Evictor)CountEvictor.of((long)1L));
        ((NewSavepoint)Savepoint.create((StateBackend)new MemoryStateBackend(), (int)128).withOperator(UID, this.windowBootstrap.bootstrap((WindowedOperatorTransformation<Tuple2<String, Integer>, String, TimeWindow>)transformation))).write(savepointPath);
        bEnv.execute("write state");
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        WindowedStream stream = sEnv.addSource(new MaxWatermarkSource()).returns(TUPLE_TYPE_INFO).keyBy((KeySelector & Serializable)tuple -> (String)tuple.f0).window((WindowAssigner)SlidingEventTimeWindows.of((Time)Time.milliseconds((long)5L), (Time)Time.milliseconds((long)1L))).evictor((Evictor)CountEvictor.of((long)1L));
        SingleOutputStreamOperator windowed = this.windowStream.window((WindowedStream<Tuple2<String, Integer>, String, TimeWindow>)stream).uid(UID);
        CompletableFuture future = this.collector.collect((DataStream)windowed);
        this.submitJob(savepointPath, sEnv);
        Collection results = (Collection)future.get();
        Assert.assertEquals((String)"Incorrect number of results", (long)15L, (long)results.size());
        Assert.assertThat((String)"Incorrect bootstrap state", new HashSet(results), EVICTOR_MATCHER);
    }

    private void submitJob(String savepointPath, StreamExecutionEnvironment sEnv) {
        JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)savepointPath, (boolean)true));
        ClusterClient client = miniClusterResource.getClusterClient();
        try {
            Optional serializedThrowable = ((JobResult)((CompletableFuture)client.submitJob(jobGraph).thenCompose(arg_0 -> ((ClusterClient)client).requestJobResult(arg_0))).get()).getSerializedThrowable();
            Assert.assertFalse((boolean)serializedThrowable.isPresent());
        }
        catch (Throwable t) {
            Assert.fail((String)"Failed to submit job");
        }
    }

    @FunctionalInterface
    private static interface WindowStream {
        public SingleOutputStreamOperator<Tuple2<String, Integer>> window(WindowedStream<Tuple2<String, Integer>, String, TimeWindow> var1);
    }

    @FunctionalInterface
    private static interface WindowBootstrap {
        public BootstrapTransformation<Tuple2<String, Integer>> bootstrap(WindowedOperatorTransformation<Tuple2<String, Integer>, String, TimeWindow> var1);
    }

    private static class CustomProcessWindowFunction
    extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
        private CustomProcessWindowFunction() {
        }

        public void process(String s, ProcessWindowFunction.Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) {
            Iterator<Tuple2<String, Integer>> iterator = elements.iterator();
            Tuple2<String, Integer> acc = iterator.next();
            while (iterator.hasNext()) {
                Tuple2<String, Integer> next = iterator.next();
                Tuple2<String, Integer> tuple2 = acc;
                Integer.valueOf((Integer)tuple2.f1 + (Integer)next.f1);
                tuple2.f1 = tuple2.f1;
            }
            out.collect(acc);
        }
    }

    private static class CustomWindowFunction
    implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
        private CustomWindowFunction() {
        }

        public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {
            Iterator<Tuple2<String, Integer>> iterator = input.iterator();
            Tuple2<String, Integer> acc = iterator.next();
            while (iterator.hasNext()) {
                Tuple2<String, Integer> next = iterator.next();
                Tuple2<String, Integer> tuple2 = acc;
                Integer.valueOf((Integer)tuple2.f1 + (Integer)next.f1);
                tuple2.f1 = tuple2.f1;
            }
            out.collect(acc);
        }
    }

    private static class Aggregator
    implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {
        private Aggregator() {
        }

        public Tuple2<String, Integer> createAccumulator() {
            return null;
        }

        public Tuple2<String, Integer> add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
            if (accumulator == null) {
                return Tuple2.of((Object)value.f0, (Object)value.f1);
            }
            Tuple2<String, Integer> tuple2 = accumulator;
            tuple2.f1 = (Integer)tuple2.f1 + (Integer)value.f1;
            return accumulator;
        }

        public Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {
            return accumulator;
        }

        public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
            Tuple2<String, Integer> tuple2 = a;
            tuple2.f1 = (Integer)tuple2.f1 + (Integer)b.f1;
            return a;
        }
    }

    private static class Reducer
    implements ReduceFunction<Tuple2<String, Integer>> {
        private Reducer() {
        }

        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
            return Tuple2.of((Object)value1.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1));
        }
    }
}

