/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class FoldApplyWindowFunctionTest {
    @Test
    public void testFoldWindowFunctionOutputTypeConfigurable() throws Exception {
        DummyStreamExecutionEnvironment env = new DummyStreamExecutionEnvironment();
        ArrayList<OneInputTransformation> transformations = new ArrayList<OneInputTransformation>();
        int initValue = 1;
        FoldApplyWindowFunction foldWindowFunction = new FoldApplyWindowFunction((Object)initValue, (FoldFunction)new FoldFunction<Integer, Integer>(){
            private static final long serialVersionUID = -4849549768529720587L;

            public Integer fold(Integer accumulator, Integer value) throws Exception {
                return accumulator + value;
            }
        }, (WindowFunction)new WindowFunction<Integer, Integer, Integer, TimeWindow>(){

            public void apply(Integer integer, TimeWindow window, Iterable<Integer> input, Collector<Integer> out) throws Exception {
                for (Integer in : input) {
                    out.collect((Object)in);
                }
            }
        });
        AccumulatingProcessingTimeWindowOperator windowOperator = new AccumulatingProcessingTimeWindowOperator((WindowFunction)foldWindowFunction, (KeySelector)new KeySelector<Integer, Integer>(){
            private static final long serialVersionUID = -7951310554369722809L;

            public Integer getKey(Integer value) throws Exception {
                return value;
            }
        }, (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)IntSerializer.INSTANCE, 3000L, 3000L);
        SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>(){
            private static final long serialVersionUID = 8297735565464653028L;

            public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            }

            public void cancel() {
            }
        };
        SourceTransformation source = new SourceTransformation("", new StreamSource((SourceFunction)sourceFunction), (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, 1);
        transformations.add(new OneInputTransformation((StreamTransformation)source, "test", (OneInputStreamOperator)windowOperator, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, 1));
        StreamGraph streamGraph = StreamGraphGenerator.generate((StreamExecutionEnvironment)env, transformations);
        ArrayList result = new ArrayList();
        ArrayList<Integer> input = new ArrayList<Integer>();
        ArrayList<Integer> expected = new ArrayList<Integer>();
        input.add(1);
        input.add(2);
        input.add(3);
        Iterator iterator = input.iterator();
        while (iterator.hasNext()) {
            int value = (Integer)iterator.next();
            initValue += value;
        }
        expected.add(initValue);
        foldWindowFunction.apply((Object)0, (Window)new TimeWindow(0L, 1L), input, (Collector)new ListCollector(result));
        Assert.assertEquals(expected, result);
    }

    public static class DummyStreamExecutionEnvironment
    extends StreamExecutionEnvironment {
        public JobExecutionResult execute(String jobName) throws Exception {
            return null;
        }
    }
}

