/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import java.io.File;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class StateDescriptorPassingTest {
    @Test
    public void testFoldWindowState() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
        DataStreamSource src = env.fromElements((Object[])new String[]{"abc"});
        SingleOutputStreamOperator result = src.keyBy((KeySelector)new KeySelector<String, String>(){

            public String getKey(String value) {
                return null;
            }
        }).timeWindow(Time.milliseconds((long)1000L)).fold((Object)new File("/"), (FoldFunction)new FoldFunction<String, File>(){

            public File fold(File a, String e) {
                return null;
            }
        });
        this.validateStateDescriptorConfigured(result);
    }

    @Test
    public void testReduceWindowState() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
        DataStreamSource src = env.fromElements((Object[])new File[]{new File("/")});
        SingleOutputStreamOperator result = src.keyBy((KeySelector)new KeySelector<File, String>(){

            public String getKey(File value) {
                return null;
            }
        }).timeWindow(Time.milliseconds((long)1000L)).reduce((ReduceFunction)new ReduceFunction<File>(){

            public File reduce(File value1, File value2) {
                return null;
            }
        });
        this.validateStateDescriptorConfigured(result);
    }

    @Test
    public void testApplyWindowState() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
        DataStreamSource src = env.fromElements((Object[])new File[]{new File("/")});
        SingleOutputStreamOperator result = src.keyBy((KeySelector)new KeySelector<File, String>(){

            public String getKey(File value) {
                return null;
            }
        }).timeWindow(Time.milliseconds((long)1000L)).apply((WindowFunction)new WindowFunction<File, String, String, TimeWindow>(){

            public void apply(String s, TimeWindow window, Iterable<File> input, Collector<String> out) {
            }
        });
        this.validateListStateDescriptorConfigured(result);
    }

    @Test
    public void testProcessWindowState() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
        DataStreamSource src = env.fromElements((Object[])new File[]{new File("/")});
        SingleOutputStreamOperator result = src.keyBy((KeySelector)new KeySelector<File, String>(){

            public String getKey(File value) {
                return null;
            }
        }).timeWindow(Time.milliseconds((long)1000L)).process((ProcessWindowFunction)new ProcessWindowFunction<File, String, String, TimeWindow>(){

            public void process(String s, ProcessWindowFunction.Context ctx, Iterable<File> input, Collector<String> out) {
            }
        });
        this.validateListStateDescriptorConfigured(result);
    }

    @Test
    public void testProcessAllWindowState() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
        DataStreamSource src = env.fromElements((Object[])new File[]{new File("/")});
        SingleOutputStreamOperator result = src.timeWindowAll(Time.milliseconds((long)1000L)).process((ProcessAllWindowFunction)new ProcessAllWindowFunction<File, String, TimeWindow>(){

            public void process(ProcessAllWindowFunction.Context ctx, Iterable<File> input, Collector<String> out) {
            }
        });
        this.validateListStateDescriptorConfigured(result);
    }

    @Test
    public void testFoldWindowAllState() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
        DataStreamSource src = env.fromElements((Object[])new String[]{"abc"});
        SingleOutputStreamOperator result = src.timeWindowAll(Time.milliseconds((long)1000L)).fold((Object)new File("/"), (FoldFunction)new FoldFunction<String, File>(){

            public File fold(File a, String e) {
                return null;
            }
        });
        this.validateStateDescriptorConfigured(result);
    }

    @Test
    public void testReduceWindowAllState() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
        DataStreamSource src = env.fromElements((Object[])new File[]{new File("/")});
        SingleOutputStreamOperator result = src.timeWindowAll(Time.milliseconds((long)1000L)).reduce((ReduceFunction)new ReduceFunction<File>(){

            public File reduce(File value1, File value2) {
                return null;
            }
        });
        this.validateStateDescriptorConfigured(result);
    }

    @Test
    public void testApplyWindowAllState() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
        DataStreamSource src = env.fromElements((Object[])new File[]{new File("/")});
        SingleOutputStreamOperator result = src.timeWindowAll(Time.milliseconds((long)1000L)).apply((AllWindowFunction)new AllWindowFunction<File, String, TimeWindow>(){

            public void apply(TimeWindow window, Iterable<File> input, Collector<String> out) {
            }
        });
        this.validateListStateDescriptorConfigured(result);
    }

    private void validateStateDescriptorConfigured(SingleOutputStreamOperator<?> result) {
        OneInputTransformation transform = (OneInputTransformation)result.getTransformation();
        WindowOperator op = (WindowOperator)transform.getOperator();
        StateDescriptor descr = op.getStateDescriptor();
        TypeSerializer serializer = descr.getSerializer();
        Assert.assertTrue((boolean)(serializer instanceof KryoSerializer));
        Kryo kryo = ((KryoSerializer)serializer).getKryo();
        Assert.assertTrue((String)"serializer registration was not properly passed on", (boolean)(kryo.getSerializer(File.class) instanceof JavaSerializer));
    }

    private void validateListStateDescriptorConfigured(SingleOutputStreamOperator<?> result) {
        OneInputTransformation transform = (OneInputTransformation)result.getTransformation();
        WindowOperator op = (WindowOperator)transform.getOperator();
        StateDescriptor descr = op.getStateDescriptor();
        Assert.assertTrue((boolean)(descr instanceof ListStateDescriptor));
        ListStateDescriptor listDescr = (ListStateDescriptor)descr;
        TypeSerializer serializer = listDescr.getSerializer();
        Assert.assertTrue((boolean)(serializer instanceof ListSerializer));
        TypeSerializer elementSerializer = listDescr.getElementSerializer();
        Assert.assertTrue((boolean)(elementSerializer instanceof KryoSerializer));
        Kryo kryo = ((KryoSerializer)elementSerializer).getKryo();
        Assert.assertTrue((String)"serializer registration was not properly passed on", (boolean)(kryo.getSerializer(File.class) instanceof JavaSerializer));
    }
}

