package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MROutputFiles;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
import org.apache.xerces.dom3.as.ASContentModel;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;

/* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.class */
public class TestMergeManager {

    /* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager$StubbedMergeManager.class */
    /**
     * A {@link MergeManagerImpl} whose in-memory merger is replaced by a
     * {@link TestMergeThread}, so a test can observe and pace merges through
     * a pair of barriers.
     */
    private static class StubbedMergeManager extends MergeManagerImpl<Text, Text> {
        // Intentionally declared without an initializer. The constructor below
        // dereferences this field right after super(...) returns, and the only
        // assignment is in createInMemoryMerger(), so this class relies on the
        // superclass constructor invoking that factory. An explicit initializer
        // would run after super(...) and overwrite that value.
        private TestMergeThread mergeThread;

        /**
         * Builds the manager around mocked file-system collaborators and hands
         * the two barriers to the stub merge thread.
         *
         * @param jobConf job configuration passed to the superclass
         * @param exceptionReporter reporter passed to the superclass
         * @param cyclicBarrier barrier the stub merge thread awaits first in each merge
         * @param cyclicBarrier2 barrier the stub merge thread awaits second in each merge
         */
        public StubbedMergeManager(JobConf jobConf, ExceptionReporter exceptionReporter, CyclicBarrier cyclicBarrier, CyclicBarrier cyclicBarrier2) {
            super(
                    null,
                    jobConf,
                    mockedFileSystem(),
                    null, null, null, null, null, null, null, null,
                    exceptionReporter,
                    null,
                    mockedOutputFile());
            mergeThread.setSyncBarriers(cyclicBarrier, cyclicBarrier2);
        }

        /** Returns a Mockito mock of {@link LocalFileSystem}, typed as {@link FileSystem}. */
        private static FileSystem mockedFileSystem() {
            return (FileSystem) Mockito.mock(LocalFileSystem.class);
        }

        /** Returns a Mockito mock of {@link MapOutputFile}. */
        private static MapOutputFile mockedOutputFile() {
            return (MapOutputFile) Mockito.mock(MapOutputFile.class);
        }

        /**
         * Creates the stub merge thread, remembers it for later inspection and
         * returns it as the in-memory merger.
         *
         * @return the newly created {@link TestMergeThread}
         */
        @Override // org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl
        public MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> createInMemoryMerger() {
            TestMergeThread stub = new TestMergeThread(this, getExceptionReporter());
            mergeThread = stub;
            return stub;
        }

        /**
         * @return how many times the stub merge thread's {@code merge} has been entered
         */
        public int getNumMerges() {
            return mergeThread.getNumMerges();
        }
    }

    /* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager$TestExceptionReporter.class */
    /**
     * An {@link ExceptionReporter} that records every reported throwable so a
     * test can assert on how many were raised.
     */
    private static class TestExceptionReporter implements ExceptionReporter {
        // Parameterized (the original used a raw ArrayList) and never reassigned.
        private final List<Throwable> exceptions = new ArrayList<>();

        private TestExceptionReporter() {
        }

        /**
         * Records the throwable and prints its stack trace to standard error so
         * the failure is visible in the test output.
         *
         * @param th the reported throwable
         */
        @Override // org.apache.hadoop.mapreduce.task.reduce.ExceptionReporter
        public void reportException(Throwable th) {
            exceptions.add(th);
            th.printStackTrace();
        }

        /**
         * @return the number of throwables reported so far
         */
        public int getNumExceptions() {
            return exceptions.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager$TestMergeThread.class */
    /**
     * A {@link MergeThread} stub that performs no real merge. Each call to
     * {@link #merge(List)} bumps a counter, releases the memory reserved for
     * the inputs, and then rendezvouses with the test on two barriers so the
     * test controls when a merge "starts" and "completes".
     */
    public static class TestMergeThread extends MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> {
        private final AtomicInteger numMerges = new AtomicInteger(0);
        private CyclicBarrier mergeStart;
        private CyclicBarrier mergeComplete;

        /**
         * @param mergeManagerImpl manager whose memory is released by {@link #merge(List)}
         * @param exceptionReporter reporter passed to the superclass
         */
        public TestMergeThread(MergeManagerImpl<Text, Text> mergeManagerImpl, ExceptionReporter exceptionReporter) {
            // Integer.MAX_VALUE is the value the previously referenced Xerces
            // constant (ASContentModel.AS_UNBOUNDED) stands for; using the JDK
            // constant avoids an unrelated XML dependency in this test.
            super(mergeManagerImpl, Integer.MAX_VALUE, exceptionReporter);
        }

        /**
         * Installs the barriers awaited by {@link #merge(List)}.
         *
         * @param cyclicBarrier barrier awaited first in each merge
         * @param cyclicBarrier2 barrier awaited second in each merge
         */
        public synchronized void setSyncBarriers(CyclicBarrier cyclicBarrier, CyclicBarrier cyclicBarrier2) {
            this.mergeStart = cyclicBarrier;
            this.mergeComplete = cyclicBarrier2;
        }

        /**
         * @return the number of times {@link #merge(List)} has been entered
         */
        public int getNumMerges() {
            return numMerges.get();
        }

        /**
         * Counts the merge, unreserves the size of every input on the manager,
         * then waits on the start barrier followed by the complete barrier.
         *
         * @param list the in-memory outputs handed to this merge
         * @throws IOException if either barrier is broken while this thread waits on it
         */
        @Override // org.apache.hadoop.mapreduce.task.reduce.MergeThread
        public void merge(List<InMemoryMapOutput<Text, Text>> list) throws IOException {
            final CyclicBarrier start;
            final CyclicBarrier complete;
            synchronized (this) {
                numMerges.incrementAndGet();
                for (InMemoryMapOutput<Text, Text> input : list) {
                    manager.unreserve(input.getSize());
                }
                // Read the barriers under the same monitor that guards
                // setSyncBarriers() so the reads pair with its writes.
                start = mergeStart;
                complete = mergeComplete;
            }
            try {
                start.await();
                complete.await();
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever runs this thread's loop
                // instead of silently discarding it.
                Thread.currentThread().interrupt();
            } catch (BrokenBarrierException e) {
                // A broken barrier means the test and this thread are out of
                // step; surface it rather than pretending the merge finished.
                throw new IOException("Merge synchronization barrier was broken", e);
            }
        }
    }

    /**
     * Drives two rounds of in-memory merging through a {@link StubbedMergeManager}
     * and checks, at each barrier rendezvous, how many merges have been entered.
     * Each round reserves two outputs that fit in memory, verifies that a third
     * reservation is refused (null), and then commits the two outputs.
     */
    @Test(timeout = 10000)
    public void testMemoryMerge() throws Exception {
        final long outputSize = 7950L;

        JobConf conf = new JobConf();
        conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f);
        conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, 10000L);
        conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.8f);
        conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 0.9f);

        TestExceptionReporter reporter = new TestExceptionReporter();
        CyclicBarrier mergeStart = new CyclicBarrier(2);
        CyclicBarrier mergeComplete = new CyclicBarrier(2);
        StubbedMergeManager manager = new StubbedMergeManager(conf, reporter, mergeStart, mergeComplete);

        // Round one: two reservations succeed in memory, the third must wait.
        InMemoryMapOutput<Text, Text> first = reserveFilledInMemoryOutput(manager, outputSize);
        InMemoryMapOutput<Text, Text> second = reserveFilledInMemoryOutput(manager, outputSize);
        MapOutput<Text, Text> refusedFirstRound = manager.reserve(null, outputSize, 0);
        Assert.assertEquals("Should be told to wait", (Object) null, refusedFirstRound);

        first.commit();
        second.commit();

        // Rendezvous with the stub merge thread on its first barrier; by then
        // it has already counted the merge.
        mergeStart.await();
        Assert.assertEquals(1L, manager.getNumMerges());

        // Round two, performed while the first merge is still parked on the
        // completion barrier.
        InMemoryMapOutput<Text, Text> third = reserveFilledInMemoryOutput(manager, outputSize);
        InMemoryMapOutput<Text, Text> fourth = reserveFilledInMemoryOutput(manager, outputSize);
        MapOutput<Text, Text> refusedSecondRound = manager.reserve(null, outputSize, 0);
        Assert.assertEquals("Should be told to wait", (Object) null, refusedSecondRound);

        third.commit();
        fourth.commit();

        // Let the first merge finish, then meet the second one at its start.
        mergeComplete.await();
        mergeStart.await();
        Assert.assertEquals(2L, manager.getNumMerges());

        // Let the second merge finish; no further merge should have been counted.
        mergeComplete.await();
        Assert.assertEquals(2L, manager.getNumMerges());
        Assert.assertEquals("exception reporter invoked", 0L, reporter.getNumExceptions());
    }

    /**
     * Reserves {@code size} bytes on the manager, asserts that the reservation
     * is an in-memory output, fills its buffer and returns it.
     */
    private InMemoryMapOutput<Text, Text> reserveFilledInMemoryOutput(MergeManagerImpl<Text, Text> manager, long size) throws IOException {
        MapOutput<Text, Text> reserved = manager.reserve(null, size, 0);
        Assert.assertTrue("Should be a memory merge", reserved instanceof InMemoryMapOutput);
        InMemoryMapOutput<Text, Text> inMemory = (InMemoryMapOutput<Text, Text>) reserved;
        fillOutput(inMemory);
        return inMemory;
    }

    /**
     * Writes into the output's array stream until the stream's limit has been
     * reached, one {@code write(int)} call per position, passing the position
     * index as the value.
     *
     * @param inMemoryMapOutput the output whose backing stream is filled
     * @throws IOException if a write fails
     */
    private void fillOutput(InMemoryMapOutput<Text, Text> inMemoryMapOutput) throws IOException {
        BoundedByteArrayOutputStream sink = inMemoryMapOutput.getArrayStream();
        final int capacity = sink.getLimit();
        int position = 0;
        while (position < capacity) {
            sink.write(position);
            position++;
        }
    }

    /**
     * Checks the values a fresh {@link JobConf} yields for the sort factor and
     * the sort buffer size. The fallback passed to each {@code getInt} call
     * differs from the expected value, so the fallback itself cannot satisfy
     * the assertion.
     */
    @Test
    public void testIoSortDefaults() {
        final JobConf conf = new JobConf();

        final int sortFactor = conf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
        Assert.assertEquals(10L, sortFactor);

        final int sortMb = conf.getInt(MRJobConfig.IO_SORT_MB, 10);
        Assert.assertEquals(100L, sortMb);
    }

    /**
     * Verifies that the on-disk merger picks up the configured sort factor and
     * that every batch queued for merging holds between one and
     * {@code sortFactor} inputs ordered by non-decreasing compressed size.
     *
     * @throws IOException if the local file system or the manager cannot be set up
     * @throws URISyntaxException declared for compatibility; not thrown by the visible code
     * @throws InterruptedException declared for compatibility; not thrown by the visible code
     */
    @Test(timeout = 10000)
    @SuppressWarnings("unchecked") // casts of reflective Whitebox lookups, which return Object
    public void testOnDiskMerger() throws IOException, URISyntaxException, InterruptedException {
        final int sortFactor = 5;
        JobConf jobConf = new JobConf();
        jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, sortFactor);

        MergeManagerImpl<Text, Text> manager = new MergeManagerImpl<>(null, jobConf, FileSystem.getLocal(jobConf), null, null, null, null, null, null, null, null, null, null, new MROutputFiles());
        MergeThread<MergeManagerImpl.CompressAwarePath, Text, Text> onDiskMerger =
                (MergeThread<MergeManagerImpl.CompressAwarePath, Text, Text>) Whitebox.getInternalState(manager, "onDiskMerger");

        // Expected value first, so a failure message reports the two the right way round.
        int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger, "mergeFactor");
        Assert.assertEquals(sortFactor, mergeFactor);

        // Freeze the merger so the queued batches can be inspected before it
        // consumes them.
        // NOTE(review): Thread.suspend() is deprecated and throws
        // UnsupportedOperationException on recent JDKs; replacing it needs a
        // different interception mechanism and is left for a follow-up.
        onDiskMerger.suspend();

        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            manager.closeOnDiskFile(new MergeManagerImpl.CompressAwarePath(new Path("somePath"), 1L, random.nextInt()));
        }

        LinkedList<List<MergeManagerImpl.CompressAwarePath>> pending =
                (LinkedList<List<MergeManagerImpl.CompressAwarePath>>) Whitebox.getInternalState(onDiskMerger, "pendingToBeMerged");
        Assert.assertTrue("No inputs were added to list pending to merge", pending.size() > 0);

        for (List<MergeManagerImpl.CompressAwarePath> inputs : pending) {
            // Checked once per batch. Previously this sat inside the pairwise
            // loop below and was therefore skipped for batches of size 0 or 1.
            Assert.assertTrue("Not enough / too many inputs were going to be merged", !inputs.isEmpty() && inputs.size() <= sortFactor);
            for (int i = 1; i < inputs.size(); i++) {
                Assert.assertTrue("Inputs to be merged were not sorted according to size: ", inputs.get(i).getCompressedSize() >= inputs.get(i - 1).getCompressedSize());
            }
        }
    }

    /**
     * Configures an 8 GiB reduce memory budget and checks that the manager's
     * overall memory limit and in-memory reduce limit both exceed
     * {@link Integer#MAX_VALUE}, that the single-shuffle limit equals
     * {@link Integer#MAX_VALUE}, and which output type small and oversized
     * reservations receive.
     */
    @Test
    public void testLargeMemoryLimits() throws Exception {
        final long intMax = Integer.MAX_VALUE;
        final long eightGiB = 8L * 1024 * 1024 * 1024;

        final JobConf conf = new JobConf();
        conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, eightGiB);
        conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f);
        conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.95f);
        conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 1.0f);
        conf.setFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 1.0f);

        final FileSystem mockedFs = (FileSystem) Mockito.mock(LocalFileSystem.class);
        final MergeManagerImpl<Text, Text> manager = new MergeManagerImpl<>(null, conf, mockedFs, null, null, null, null, null, null, null, null, null, null, new MROutputFiles());

        Assert.assertTrue("Large shuffle area unusable: " + manager.memoryLimit, manager.memoryLimit > intMax);

        final long inMemReduceLimit = manager.getMaxInMemReduceLimit();
        Assert.assertTrue("Large in-memory reduce area unusable: " + inMemReduceLimit, inMemReduceLimit > intMax);

        Assert.assertEquals("maxSingleShuffleLimit to be capped at Integer.MAX_VALUE", intMax, manager.maxSingleShuffleLimit);

        // A tiny request is expected in memory; one just past Integer.MAX_VALUE on disk.
        verifyReservedMapOutputType(manager, 10L, "MEMORY");
        verifyReservedMapOutputType(manager, intMax + 1L, "DISK");
    }

    /**
     * Reserves {@code j} bytes for a fixed map attempt, asserts that the
     * returned output describes itself as {@code str}, and then unreserves
     * the same number of bytes.
     *
     * @param mergeManagerImpl the manager under test
     * @param j the number of bytes to reserve
     * @param str the expected output description
     * @throws IOException if the reservation fails
     */
    private void verifyReservedMapOutputType(MergeManagerImpl<Text, Text> mergeManagerImpl, long j, String str) throws IOException {
        final TaskAttemptID mapId = TaskAttemptID.forName("attempt_0_1_m_1_1");
        final MapOutput<Text, Text> reserved = mergeManagerImpl.reserve(mapId, j, 1);
        final String actualDescription = reserved.getDescription();
        Assert.assertEquals("Shuffled bytes: " + j, str, actualDescription);
        mergeManagerImpl.unreserve(j);
    }

    /**
     * With the shuffle memory limit percent set to zero, even a 10-byte
     * reservation is expected to be described as a disk output.
     */
    @Test
    public void testZeroShuffleMemoryLimitPercent() throws Exception {
        final JobConf conf = new JobConf();
        conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f);

        final FileSystem mockedFs = (FileSystem) Mockito.mock(LocalFileSystem.class);
        final MergeManagerImpl<Text, Text> manager = new MergeManagerImpl<>(null, conf, mockedFs, null, null, null, null, null, null, null, null, null, null, new MROutputFiles());

        verifyReservedMapOutputType(manager, 10L, "DISK");
    }
}
