/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.operators;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.test.util.MultipleProgramsTestBaseJUnit4;
import org.apache.flink.util.CloseableIterator;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class ObjectReuseITCase
extends MultipleProgramsTestBaseJUnit4 {
    private static final List<Tuple2<String, Integer>> REDUCE_DATA = Arrays.asList(new Tuple2((Object)"a", (Object)1), new Tuple2((Object)"a", (Object)2), new Tuple2((Object)"a", (Object)3), new Tuple2((Object)"a", (Object)4), new Tuple2((Object)"a", (Object)50));
    private static final List<Tuple2<String, Integer>> GROUP_REDUCE_DATA = Arrays.asList(new Tuple2((Object)"a", (Object)1), new Tuple2((Object)"a", (Object)2), new Tuple2((Object)"a", (Object)3), new Tuple2((Object)"a", (Object)4), new Tuple2((Object)"a", (Object)5));
    private final boolean objectReuse;

    public ObjectReuseITCase(boolean objectReuse) {
        super(MultipleProgramsTestBaseJUnit4.TestExecutionMode.CLUSTER);
        this.objectReuse = objectReuse;
    }

    @Test
    public void testKeyedReduce() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        if (this.objectReuse) {
            env.getConfig().enableObjectReuse();
        } else {
            env.getConfig().disableObjectReuse();
        }
        DataStreamSource input = env.fromData(REDUCE_DATA);
        SingleOutputStreamOperator result = input.keyBy((KeySelector & Serializable)x -> (String)x.f0).reduce((ReduceFunction)new ReduceFunction<Tuple2<String, Integer>>(){

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
                Tuple2<String, Integer> tuple2 = value2;
                tuple2.f1 = (Integer)tuple2.f1 + (Integer)value1.f1;
                return value2;
            }
        });
        Tuple2 res = (Tuple2)result.executeAndCollect().next();
        Assert.assertEquals((Object)new Tuple2((Object)"a", (Object)60), (Object)res);
    }

    @Test
    public void testGlobalReduce() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        if (this.objectReuse) {
            env.getConfig().enableObjectReuse();
        } else {
            env.getConfig().disableObjectReuse();
        }
        DataStreamSource input = env.fromData(REDUCE_DATA);
        SingleOutputStreamOperator result = input.windowAll((WindowAssigner)GlobalWindows.createWithEndOfStreamTrigger()).reduce((ReduceFunction)new ReduceFunction<Tuple2<String, Integer>>(){

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
                if ((Integer)value1.f1 % 3 == 0) {
                    Tuple2<String, Integer> tuple2 = value1;
                    tuple2.f1 = (Integer)tuple2.f1 + (Integer)value2.f1;
                    return value1;
                }
                Tuple2<String, Integer> tuple2 = value2;
                tuple2.f1 = (Integer)tuple2.f1 + (Integer)value1.f1;
                return value2;
            }
        });
        Tuple2 res = (Tuple2)result.executeAndCollect().next();
        Assert.assertEquals((Object)new Tuple2((Object)"a", (Object)60), (Object)res);
    }

    @Parameterized.Parameters(name="Execution mode = CLUSTER, Reuse = {0}")
    public static Collection<Object[]> executionModes() {
        return Arrays.asList({false}, {true});
    }
}

