package org.apache.flink.api.common.operators.base;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.builder.Tuple2Builder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.Collector;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.class */
class CoGroupOperatorCollectionTest implements Serializable {

    /* loaded from: input_file:org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest$SumCoGroup.class */
    private static class SumCoGroup extends RichCoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {
        private boolean isOpened;
        private boolean isClosed;

        private SumCoGroup() {
            this.isOpened = false;
            this.isClosed = false;
        }

        public void open(Configuration configuration) throws Exception {
            this.isOpened = true;
            RuntimeContext runtimeContext = getRuntimeContext();
            Assertions.assertThat(runtimeContext.getTaskName()).isEqualTo("Test UDF");
            Assertions.assertThat(runtimeContext.getNumberOfParallelSubtasks()).isEqualTo(4);
            Assertions.assertThat(runtimeContext.getIndexOfThisSubtask()).isZero();
        }

        public void coGroup(Iterable<Tuple2<String, Integer>> iterable, Iterable<Tuple2<String, Integer>> iterable2, Collector<Tuple2<String, Integer>> collector) throws Exception {
            Assertions.assertThat(this.isOpened).isTrue();
            Assertions.assertThat(this.isClosed).isFalse();
            String str = null;
            int i = 0;
            for (Tuple2<String, Integer> tuple2 : iterable) {
                str = str == null ? (String) tuple2.f0 : str;
                i += ((Integer) tuple2.f1).intValue();
            }
            for (Tuple2<String, Integer> tuple22 : iterable2) {
                str = str == null ? (String) tuple22.f0 : str;
                i += ((Integer) tuple22.f1).intValue();
            }
            collector.collect(Tuple2.of(str, Integer.valueOf(i)));
        }

        public void close() throws Exception {
            this.isClosed = true;
        }
    }

    CoGroupOperatorCollectionTest() {
    }

    @Test
    void testExecuteOnCollection() {
        try {
            List asList = Arrays.asList(new Tuple2Builder().add("foo", 1).add("foobar", 1).add("foo", 1).add("bar", 1).add("foo", 1).add("foo", 1).build());
            List asList2 = Arrays.asList(new Tuple2Builder().add("foo", 1).add("foo", 1).add("bar", 1).add("foo", 1).add("barfoo", 1).add("foo", 1).build());
            ExecutionConfig executionConfig = new ExecutionConfig();
            HashMap hashMap = new HashMap();
            RuntimeUDFContext runtimeUDFContext = new RuntimeUDFContext(new TaskInfo("Test UDF", 4, 0, 4, 0), (ClassLoader) null, executionConfig, new HashMap(), hashMap, UnregisteredMetricsGroup.createOperatorMetricGroup());
            SumCoGroup sumCoGroup = new SumCoGroup();
            SumCoGroup sumCoGroup2 = new SumCoGroup();
            executionConfig.disableObjectReuse();
            List executeOnCollections = getCoGroupOperator(sumCoGroup).executeOnCollections(asList, asList2, runtimeUDFContext, executionConfig);
            executionConfig.enableObjectReuse();
            List executeOnCollections2 = getCoGroupOperator(sumCoGroup2).executeOnCollections(asList, asList2, runtimeUDFContext, executionConfig);
            Assertions.assertThat(sumCoGroup.isClosed).isTrue();
            Assertions.assertThat(sumCoGroup2.isClosed).isTrue();
            HashSet hashSet = new HashSet(Arrays.asList(new Tuple2Builder().add("foo", 8).add("bar", 2).add("foobar", 1).add("barfoo", 1).build()));
            Assertions.assertThat(new HashSet(executeOnCollections)).containsExactlyInAnyOrderElementsOf(hashSet);
            Assertions.assertThat(new HashSet(executeOnCollections2)).containsExactlyInAnyOrderElementsOf(hashSet);
            executionConfig.disableObjectReuse();
            List executeOnCollections3 = getCoGroupOperator(new SumCoGroup()).executeOnCollections(Collections.emptyList(), Collections.emptyList(), runtimeUDFContext, executionConfig);
            executionConfig.enableObjectReuse();
            List executeOnCollections4 = getCoGroupOperator(new SumCoGroup()).executeOnCollections(Collections.emptyList(), Collections.emptyList(), runtimeUDFContext, executionConfig);
            Assertions.assertThat(executeOnCollections3).isEmpty();
            Assertions.assertThat(executeOnCollections4).isEmpty();
        } catch (Throwable th) {
            th.printStackTrace();
            Assertions.fail(th.getMessage());
        }
    }

    private CoGroupOperatorBase<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>> getCoGroupOperator(RichCoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> richCoGroupFunction) {
        TypeInformation of = TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() { // from class: org.apache.flink.api.common.operators.base.CoGroupOperatorCollectionTest.1
        });
        return new CoGroupOperatorBase<>(richCoGroupFunction, new BinaryOperatorInformation(of, of, of), new int[]{0}, new int[]{0}, "coGroup on Collections");
    }
}
