/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.common.operators.base;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.builder.Tuple2Builder;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/**
 * Exercises {@code CoGroupOperatorBase.executeOnCollections} on small in-memory inputs,
 * once with object reuse disabled and once with it enabled, and checks that both modes
 * produce the same per-key sums and that the user function goes through its
 * open/close lifecycle.
 *
 * <p>The class implements {@link Serializable} because {@link SumCoGroup} is a
 * non-static inner class and therefore carries a reference to its enclosing instance.
 */
public class CoGroupOperatorCollectionTest
implements Serializable {

    /**
     * Co-groups two lists of {@code (word, 1)} tuples on field 0 and verifies the summed
     * counts per word, then repeats the run with two empty inputs and expects no output.
     *
     * @throws Exception if the operator or the user function fails; the exception is
     *         deliberately propagated so that JUnit reports the original type and stack
     *         trace instead of a (possibly {@code null}) message only
     */
    @Test
    public void testExecuteOnCollection() throws Exception {
        List<Tuple2<String, Integer>> input1 = Arrays.asList(
                new Tuple2Builder<String, Integer>()
                        .add("foo", 1)
                        .add("foobar", 1)
                        .add("foo", 1)
                        .add("bar", 1)
                        .add("foo", 1)
                        .add("foo", 1)
                        .build());
        List<Tuple2<String, Integer>> input2 = Arrays.asList(
                new Tuple2Builder<String, Integer>()
                        .add("foo", 1)
                        .add("foo", 1)
                        .add("bar", 1)
                        .add("foo", 1)
                        .add("barfoo", 1)
                        .add("foo", 1)
                        .build());

        ExecutionConfig executionConfig = new ExecutionConfig();

        // The two maps are the (empty) cached-file and accumulator registries. Their element
        // types are not imported in this file, so they are left raw and the resulting
        // warning is suppressed on this single declaration only.
        @SuppressWarnings({"rawtypes", "unchecked"})
        RuntimeContext ctx = new RuntimeUDFContext(
                "Test UDF", 4, 0, null, executionConfig, new HashMap(), new HashMap());

        SumCoGroup udf1 = new SumCoGroup();
        SumCoGroup udf2 = new SumCoGroup();

        // Same inputs, both object-reuse modes.
        executionConfig.disableObjectReuse();
        List<Tuple2<String, Integer>> resultSafe =
                this.getCoGroupOperator(udf1).executeOnCollections(input1, input2, ctx, executionConfig);
        executionConfig.enableObjectReuse();
        List<Tuple2<String, Integer>> resultRegular =
                this.getCoGroupOperator(udf2).executeOnCollections(input1, input2, ctx, executionConfig);

        Assert.assertTrue(udf1.isClosed);
        Assert.assertTrue(udf2.isClosed);

        // "foo" occurs 4 + 4 times, "bar" 1 + 1, "foobar" and "barfoo" once each.
        HashSet<Tuple2<String, Integer>> expected = new HashSet<Tuple2<String, Integer>>(Arrays.asList(
                new Tuple2Builder<String, Integer>()
                        .add("foo", 8)
                        .add("bar", 2)
                        .add("foobar", 1)
                        .add("barfoo", 1)
                        .build()));

        // Output order is not asserted, only the set of emitted tuples.
        Assert.assertEquals(expected, new HashSet<Tuple2<String, Integer>>(resultSafe));
        Assert.assertEquals(expected, new HashSet<Tuple2<String, Integer>>(resultRegular));

        // Empty inputs must yield empty results in both modes.
        executionConfig.disableObjectReuse();
        List<Tuple2<String, Integer>> resultSafe2 = this.getCoGroupOperator(new SumCoGroup())
                .executeOnCollections(
                        Collections.<Tuple2<String, Integer>>emptyList(),
                        Collections.<Tuple2<String, Integer>>emptyList(),
                        ctx,
                        executionConfig);
        executionConfig.enableObjectReuse();
        List<Tuple2<String, Integer>> resultRegular2 = this.getCoGroupOperator(new SumCoGroup())
                .executeOnCollections(
                        Collections.<Tuple2<String, Integer>>emptyList(),
                        Collections.<Tuple2<String, Integer>>emptyList(),
                        ctx,
                        executionConfig);

        Assert.assertEquals(0, resultSafe2.size());
        Assert.assertEquals(0, resultRegular2.size());
    }

    /**
     * Builds a co-group operator named {@code "coGroup on Collections"} that keys both
     * inputs on tuple field 0 and delegates to the given function.
     *
     * @param udf the user function to wrap
     * @return a new operator over {@code Tuple2<String, Integer>} inputs and output
     */
    private CoGroupOperatorBase<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>> getCoGroupOperator(RichCoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> udf) {
        return new CoGroupOperatorBase<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>>(
                udf,
                new BinaryOperatorInformation<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>(
                        TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"),
                        TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"),
                        TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>")),
                new int[]{0},
                new int[]{0},
                "coGroup on Collections");
    }

    /**
     * Co-group function that emits one {@code (key, sum)} tuple per invocation, where
     * {@code sum} adds up field 1 of every tuple from both inputs. It also records and
     * asserts its own open/close lifecycle.
     */
    private class SumCoGroup
    extends RichCoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {
        private boolean isOpened = false;
        private boolean isClosed = false;

        private SumCoGroup() {
        }

        /**
         * Marks the function as opened and checks that the runtime context exposes the
         * task name, parallelism and subtask index the test configured.
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            this.isOpened = true;
            RuntimeContext ctx = this.getRuntimeContext();
            Assert.assertEquals("Test UDF", ctx.getTaskName());
            Assert.assertEquals(4, ctx.getNumberOfParallelSubtasks());
            Assert.assertEquals(0, ctx.getIndexOfThisSubtask());
        }

        /**
         * Sums field 1 over both groups and emits it together with the group key.
         *
         * @param first  tuples of the current group from the first input
         * @param second tuples of the current group from the second input
         * @param out    collector receiving the single {@code (key, sum)} result
         */
        @Override
        public void coGroup(Iterable<Tuple2<String, Integer>> first, Iterable<Tuple2<String, Integer>> second, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Must run strictly between open() and close().
            Assert.assertTrue(this.isOpened);
            Assert.assertFalse(this.isClosed);

            // The key is taken from the first tuple seen on either side; it stays null
            // only if both iterables are empty.
            String f0 = null;
            int sumF1 = 0;
            for (Tuple2<String, Integer> input : first) {
                if (f0 == null) {
                    f0 = input.f0;
                }
                sumF1 += input.f1;
            }
            for (Tuple2<String, Integer> input : second) {
                if (f0 == null) {
                    f0 = input.f0;
                }
                sumF1 += input.f1;
            }
            out.collect(Tuple2.of(f0, sumF1));
        }

        /** Marks the function as closed so the test can verify the lifecycle completed. */
        @Override
        public void close() throws Exception {
            this.isClosed = true;
        }
    }
}

