package org.apache.flink.runtime.operators;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.runtime.operators.sort.ExternalSorter;
import org.apache.flink.runtime.operators.sort.Sorter;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/ReduceTaskExternalITCase.class */
public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunction<Record, Record>> {
    private static final Logger LOG = LoggerFactory.getLogger(ReduceTaskExternalITCase.class);
    private final RecordComparator comparator;
    private final List<Record> outList;

    /* loaded from: input_file:org/apache/flink/runtime/operators/ReduceTaskExternalITCase$MockCombiningReduceStub.class */
    public static class MockCombiningReduceStub implements GroupReduceFunction<Record, Record>, GroupCombineFunction<Record, Record> {
        private static final long serialVersionUID = 1;
        private final IntValue key = new IntValue();
        private final IntValue value = new IntValue();
        private final IntValue combineValue = new IntValue();

        public void reduce(Iterable<Record> iterable, Collector<Record> collector) {
            Record record = null;
            int i = 0;
            Iterator<Record> it = iterable.iterator();
            while (it.hasNext()) {
                record = it.next();
                record.getField(1, this.value);
                i += this.value.getValue();
            }
            record.getField(0, this.key);
            this.value.setValue(i - this.key.getValue());
            record.setField(1, this.value);
            collector.collect(record);
        }

        public void combine(Iterable<Record> iterable, Collector<Record> collector) {
            Record record = null;
            int i = 0;
            Iterator<Record> it = iterable.iterator();
            while (it.hasNext()) {
                record = it.next();
                record.getField(1, this.combineValue);
                i += this.combineValue.getValue();
            }
            this.combineValue.setValue(i);
            record.setField(1, this.combineValue);
            collector.collect(record);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/ReduceTaskExternalITCase$MockReduceStub.class */
    public static class MockReduceStub extends RichGroupReduceFunction<Record, Record> {
        private static final long serialVersionUID = 1;
        private final IntValue key = new IntValue();
        private final IntValue value = new IntValue();

        public void reduce(Iterable<Record> iterable, Collector<Record> collector) {
            Record record = null;
            int i = 0;
            Iterator<Record> it = iterable.iterator();
            while (it.hasNext()) {
                record = it.next();
                i++;
            }
            record.getField(0, this.key);
            this.value.setValue(i - this.key.getValue());
            record.setField(1, this.value);
            collector.collect(record);
        }
    }

    public ReduceTaskExternalITCase(ExecutionConfig executionConfig) {
        super(executionConfig, 0L, 1, 3145728L);
        this.comparator = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
        this.outList = new ArrayList();
    }

    @Test
    public void testSingleLevelMergeReduceTask() {
        setNumFileHandlesForSort(2);
        addDriverComparator(this.comparator);
        setOutput(this.outList);
        getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
        try {
            addInputSorted(new UniformRecordGenerator(8192, 8, false), this.comparator.m535duplicate());
            testDriver(new GroupReduceDriver(), MockReduceStub.class);
        } catch (Exception e) {
            LOG.info("Exception while running the test task.", e);
            Assert.fail("Exception in Test: " + e.getMessage());
        }
        Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was 8192", this.outList.size() == 8192);
        for (Record record : this.outList) {
            Assert.assertTrue("Incorrect result", record.getField(1, IntValue.class).getValue() == 8 - record.getField(0, IntValue.class).getValue());
        }
        this.outList.clear();
    }

    @Test
    public void testMultiLevelMergeReduceTask() {
        setNumFileHandlesForSort(2);
        addDriverComparator(this.comparator);
        setOutput(this.outList);
        getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
        try {
            addInputSorted(new UniformRecordGenerator(32768, 8, false), this.comparator.m535duplicate());
            testDriver(new GroupReduceDriver(), MockReduceStub.class);
        } catch (Exception e) {
            LOG.info("Exception while running the test task.", e);
            Assert.fail("Exception in Test: " + e.getMessage());
        }
        Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was 32768", this.outList.size() == 32768);
        for (Record record : this.outList) {
            Assert.assertTrue("Incorrect result", record.getField(1, IntValue.class).getValue() == 8 - record.getField(0, IntValue.class).getValue());
        }
        this.outList.clear();
    }

    @Test
    public void testSingleLevelMergeCombiningReduceTask() throws IOException {
        addDriverComparator(this.comparator);
        setOutput(this.outList);
        getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
        Sorter sorter = null;
        try {
            try {
                sorter = ExternalSorter.newBuilder(getMemoryManager(), getContainingTask(), RecordSerializerFactory.get().getSerializer(), this.comparator.m535duplicate()).maxNumFileHandles(2).withCombiner(new MockCombiningReduceStub()).enableSpilling(getIOManager(), 0.800000011920929d).memoryFraction(this.perSortFractionMem).objectReuse(true).largeRecords(true).build(new UniformRecordGenerator(8192, 8, false));
                addInput(sorter.getIterator());
                testDriver(new GroupReduceDriver(), MockCombiningReduceStub.class);
                if (sorter != null) {
                    sorter.close();
                }
            } catch (Exception e) {
                LOG.info("Exception while running the test task.", e);
                Assert.fail("Invoke method caused exception: " + e.getMessage());
                if (sorter != null) {
                    sorter.close();
                }
            }
            int i = 0;
            for (int i2 = 1; i2 < 8; i2++) {
                i += i2;
            }
            Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was 8192", this.outList.size() == 8192);
            for (Record record : this.outList) {
                Assert.assertTrue("Incorrect result", record.getField(1, IntValue.class).getValue() == i - record.getField(0, IntValue.class).getValue());
            }
            this.outList.clear();
        } catch (Throwable th) {
            if (sorter != null) {
                sorter.close();
            }
            throw th;
        }
    }

    @Test
    public void testMultiLevelMergeCombiningReduceTask() throws IOException {
        addDriverComparator(this.comparator);
        setOutput(this.outList);
        getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
        Sorter sorter = null;
        try {
            try {
                sorter = ExternalSorter.newBuilder(getMemoryManager(), getContainingTask(), RecordSerializerFactory.get().getSerializer(), this.comparator.m535duplicate()).maxNumFileHandles(2).withCombiner(new MockCombiningReduceStub()).enableSpilling(getIOManager(), 0.800000011920929d).memoryFraction(this.perSortFractionMem).objectReuse(false).largeRecords(true).build(new UniformRecordGenerator(32768, 8, false));
                addInput(sorter.getIterator());
                testDriver(new GroupReduceDriver(), MockCombiningReduceStub.class);
                if (sorter != null) {
                    sorter.close();
                }
            } catch (Exception e) {
                LOG.info("Exception while running the test task.", e);
                Assert.fail("Invoke method caused exception: " + e.getMessage());
                if (sorter != null) {
                    sorter.close();
                }
            }
            int i = 0;
            for (int i2 = 1; i2 < 8; i2++) {
                i += i2;
            }
            Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was 32768", this.outList.size() == 32768);
            for (Record record : this.outList) {
                Assert.assertTrue("Incorrect result", record.getField(1, IntValue.class).getValue() == i - record.getField(0, IntValue.class).getValue());
            }
            this.outList.clear();
        } catch (Throwable th) {
            if (sorter != null) {
                sorter.close();
            }
            throw th;
        }
    }
}
