/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.LinkedList;
import java.util.Set;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FileChunk;
import org.apache.hadoop.io.IntWritable;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ExceptionReporter;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetchedInputAllocatorOrderedGrouped;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestMergeManager {
    private static final Logger LOG = LoggerFactory.getLogger(TestMergeManager.class);
    private static Configuration defaultConf = new TezConfiguration();
    private static FileSystem localFs = null;
    private static Path workDir = null;

    @Before
    @After
    public void cleanup() throws IOException {
        localFs.delete(workDir, true);
    }

    @Test(timeout=10000L)
    public void testConfigs() throws IOException {
        long maxTaskMem = 0x200000000L;
        TezConfiguration conf = new TezConfiguration(defaultConf);
        conf.setFloat("tez.runtime.shuffle.fetch.buffer.percent", 0.8f);
        conf.setFloat("tez.runtime.task.input.post-merge.buffer.percent", 0.5f);
        Assert.assertTrue((MergeManager.getInitialMemoryRequirement((Configuration)conf, (long)maxTaskMem) == 6871947776L ? 1 : 0) != 0);
        conf.setFloat("tez.runtime.shuffle.fetch.buffer.percent", 0.5f);
        conf.setFloat("tez.runtime.task.input.post-merge.buffer.percent", 0.5f);
        Assert.assertTrue((MergeManager.getInitialMemoryRequirement((Configuration)conf, (long)maxTaskMem) > Integer.MAX_VALUE ? 1 : 0) != 0);
        conf.setFloat("tez.runtime.shuffle.fetch.buffer.percent", 0.4f);
        conf.setFloat("tez.runtime.task.input.post-merge.buffer.percent", 0.9f);
        Assert.assertTrue((MergeManager.getInitialMemoryRequirement((Configuration)conf, (long)maxTaskMem) > Integer.MAX_VALUE ? 1 : 0) != 0);
        conf.setFloat("tez.runtime.shuffle.fetch.buffer.percent", 0.1f);
        conf.setFloat("tez.runtime.task.input.post-merge.buffer.percent", 0.1f);
        Assert.assertTrue((MergeManager.getInitialMemoryRequirement((Configuration)conf, (long)maxTaskMem) < Integer.MAX_VALUE ? 1 : 0) != 0);
        try {
            conf.setFloat("tez.runtime.shuffle.fetch.buffer.percent", 2.4f);
            MergeManager.getInitialMemoryRequirement((Configuration)conf, (long)maxTaskMem);
            Assert.fail((String)"Should have thrown wrong buffer percent configuration exception");
        }
        catch (IllegalArgumentException ie) {
            // empty catch block
        }
        try {
            conf.setFloat("tez.runtime.shuffle.fetch.buffer.percent", -2.4f);
            MergeManager.getInitialMemoryRequirement((Configuration)conf, (long)maxTaskMem);
            Assert.fail((String)"Should have thrown wrong buffer percent configuration exception");
        }
        catch (IllegalArgumentException ie) {
            // empty catch block
        }
        try {
            conf.setFloat("tez.runtime.task.input.post-merge.buffer.percent", 1.4f);
            MergeManager.getInitialMemoryRequirement((Configuration)conf, (long)maxTaskMem);
            Assert.fail((String)"Should have thrown wrong post merge buffer percent configuration exception");
        }
        catch (IllegalArgumentException ie) {
            // empty catch block
        }
        try {
            conf.setFloat("tez.runtime.task.input.post-merge.buffer.percent", -1.4f);
            MergeManager.getInitialMemoryRequirement((Configuration)conf, (long)maxTaskMem);
            Assert.fail((String)"Should have thrown wrong post merge buffer percent configuration exception");
        }
        catch (IllegalArgumentException ie) {
            // empty catch block
        }
        try {
            conf.setFloat("tez.runtime.shuffle.fetch.buffer.percent", 1.4f);
            MergeManager.getInitialMemoryRequirement((Configuration)conf, (long)maxTaskMem);
            Assert.fail((String)"Should have thrown wrong shuffle fetch buffer percent configuration exception");
        }
        catch (IllegalArgumentException ie) {
            // empty catch block
        }
        try {
            conf.setFloat("tez.runtime.shuffle.fetch.buffer.percent", -1.4f);
            MergeManager.getInitialMemoryRequirement((Configuration)conf, (long)maxTaskMem);
            Assert.fail((String)"Should have thrown wrong shuffle fetch buffer percent configuration exception");
        }
        catch (IllegalArgumentException ie) {
            // empty catch block
        }
        conf.setFloat("tez.runtime.shuffle.fetch.buffer.percent", 0.4f);
        conf.setFloat("tez.runtime.task.input.post-merge.buffer.percent", 0.8f);
        LocalFileSystem localFs = FileSystem.getLocal((Configuration)conf);
        LocalDirAllocator localDirAllocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        InputContext t0inputContext = this.createMockInputContext(UUID.randomUUID().toString(), maxTaskMem);
        ExceptionReporter t0exceptionReporter = (ExceptionReporter)Mockito.mock(ExceptionReporter.class);
        long initialMemoryAvailable = (long)((double)maxTaskMem * 0.8);
        MergeManager mergeManager = new MergeManager((Configuration)conf, (FileSystem)localFs, localDirAllocator, t0inputContext, null, null, null, null, t0exceptionReporter, initialMemoryAvailable, null, false, -1);
        Assert.assertTrue((mergeManager.postMergeMemLimit > Integer.MAX_VALUE ? 1 : 0) != 0);
        initialMemoryAvailable = 0xC800000L;
        mergeManager = new MergeManager((Configuration)conf, (FileSystem)localFs, localDirAllocator, t0inputContext, null, null, null, null, t0exceptionReporter, initialMemoryAvailable, null, false, -1);
        Assert.assertTrue((mergeManager.postMergeMemLimit == initialMemoryAvailable ? 1 : 0) != 0);
    }

    @Test(timeout=10000L)
    public void testReservationAccounting() throws IOException {
        TezConfiguration conf = new TezConfiguration(defaultConf);
        LocalFileSystem localFs = FileSystem.getLocal((Configuration)conf);
        InputContext inputContext = this.createMockInputContext(UUID.randomUUID().toString());
        MergeManager mergeManager = new MergeManager((Configuration)conf, (FileSystem)localFs, null, inputContext, null, null, null, null, (ExceptionReporter)Mockito.mock(ExceptionReporter.class), 2000000L, null, false, -1);
        mergeManager.configureAndStart();
        Assert.assertEquals((long)0L, (long)mergeManager.getUsedMemory());
        Assert.assertEquals((long)0L, (long)mergeManager.getCommitMemory());
        MapOutput mapOutput = mergeManager.reserve(null, 1L, 1L, 0);
        Assert.assertEquals((long)1L, (long)mergeManager.getUsedMemory());
        Assert.assertEquals((long)0L, (long)mergeManager.getCommitMemory());
        mapOutput.abort();
        Assert.assertEquals((long)0L, (long)mergeManager.getUsedMemory());
        Assert.assertEquals((long)0L, (long)mergeManager.getCommitMemory());
        mapOutput = mergeManager.reserve(null, 2L, 2L, 0);
        mergeManager.closeInMemoryFile(mapOutput);
        Assert.assertEquals((long)2L, (long)mergeManager.getUsedMemory());
        Assert.assertEquals((long)2L, (long)mergeManager.getCommitMemory());
        mergeManager.releaseCommittedMemory(2L);
        Assert.assertEquals((long)0L, (long)mergeManager.getUsedMemory());
        Assert.assertEquals((long)0L, (long)mergeManager.getCommitMemory());
    }

    @Test(timeout=20000L)
    public void testIntermediateMemoryMergeAccounting() throws Exception {
        TezConfiguration conf = new TezConfiguration(defaultConf);
        conf.setBoolean("tez.runtime.compress", false);
        conf.set("tez.runtime.key.class", IntWritable.class.getName());
        conf.set("tez.runtime.value.class", IntWritable.class.getName());
        conf.setBoolean("tez.runtime.shuffle.memory-to-memory.enable", true);
        conf.setInt("tez.runtime.shuffle.memory-to-memory.segments", 2);
        Path localDir = new Path(workDir, "local");
        Path srcDir = new Path(workDir, "srcData");
        localFs.mkdirs(localDir);
        localFs.mkdirs(srcDir);
        conf.setStrings("tez.runtime.framework.local.dirs", new String[]{localDir.toString()});
        LocalFileSystem localFs = FileSystem.getLocal((Configuration)conf);
        LocalDirAllocator localDirAllocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        InputContext inputContext = this.createMockInputContext(UUID.randomUUID().toString());
        ExceptionReporter exceptionReporter = (ExceptionReporter)Mockito.mock(ExceptionReporter.class);
        MergeManager mergeManager = new MergeManager((Configuration)conf, (FileSystem)localFs, localDirAllocator, inputContext, null, null, null, null, exceptionReporter, 2000000L, null, false, -1);
        mergeManager.configureAndStart();
        Assert.assertEquals((long)0L, (long)mergeManager.getUsedMemory());
        Assert.assertEquals((long)0L, (long)mergeManager.getCommitMemory());
        byte[] data1 = this.generateData((Configuration)conf, 10);
        byte[] data2 = this.generateData((Configuration)conf, 20);
        MapOutput firstMapOutput = mergeManager.reserve(null, (long)data1.length, (long)data1.length, 0);
        MapOutput secondMapOutput = mergeManager.reserve(null, (long)data2.length, (long)data2.length, 0);
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)firstMapOutput.getType());
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)secondMapOutput.getType());
        Assert.assertEquals((long)0L, (long)mergeManager.getCommitMemory());
        Assert.assertEquals((long)(data1.length + data2.length), (long)mergeManager.getUsedMemory());
        System.arraycopy(data1, 0, firstMapOutput.getMemory(), 0, data1.length);
        System.arraycopy(data2, 0, secondMapOutput.getMemory(), 0, data2.length);
        secondMapOutput.commit();
        Assert.assertEquals((long)data2.length, (long)mergeManager.getCommitMemory());
        Assert.assertEquals((long)(data1.length + data2.length), (long)mergeManager.getUsedMemory());
        firstMapOutput.commit();
        mergeManager.waitForMemToMemMerge();
        Assert.assertEquals((long)(data1.length + data2.length), (long)mergeManager.getCommitMemory());
        Assert.assertEquals((long)(data1.length + data2.length), (long)mergeManager.getUsedMemory());
    }

    @Test(timeout=60000L)
    public void testIntermediateMemoryMerge() throws Throwable {
        TezConfiguration conf = new TezConfiguration(defaultConf);
        conf.setBoolean("tez.runtime.compress", false);
        conf.set("tez.runtime.key.class", IntWritable.class.getName());
        conf.set("tez.runtime.value.class", IntWritable.class.getName());
        conf.setBoolean("tez.runtime.shuffle.memory-to-memory.enable", true);
        conf.setInt("tez.runtime.shuffle.memory-to-memory.segments", 3);
        Path localDir = new Path(workDir, "local");
        Path srcDir = new Path(workDir, "srcData");
        localFs.mkdirs(localDir);
        localFs.mkdirs(srcDir);
        conf.setStrings("tez.runtime.framework.local.dirs", new String[]{localDir.toString()});
        LocalFileSystem localFs = FileSystem.getLocal((Configuration)conf);
        LocalDirAllocator localDirAllocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        InputContext inputContext = this.createMockInputContext(UUID.randomUUID().toString());
        ExceptionReporter exceptionReporter = (ExceptionReporter)Mockito.mock(ExceptionReporter.class);
        MergeManager mergeManager = new MergeManager((Configuration)conf, (FileSystem)localFs, localDirAllocator, inputContext, null, null, null, null, exceptionReporter, 2000000L, null, false, -1);
        mergeManager.configureAndStart();
        Assert.assertEquals((long)0L, (long)mergeManager.getUsedMemory());
        Assert.assertEquals((long)0L, (long)mergeManager.getCommitMemory());
        byte[] data1 = this.generateDataBySize((Configuration)conf, 10);
        byte[] data2 = this.generateDataBySize((Configuration)conf, 20);
        byte[] data3 = this.generateDataBySize((Configuration)conf, 200);
        byte[] data4 = this.generateDataBySize((Configuration)conf, 20000);
        MapOutput mo1 = mergeManager.reserve(new InputAttemptIdentifier(0, 0), (long)data1.length, (long)data1.length, 0);
        MapOutput mo2 = mergeManager.reserve(new InputAttemptIdentifier(1, 0), (long)data2.length, (long)data2.length, 0);
        MapOutput mo3 = mergeManager.reserve(new InputAttemptIdentifier(2, 0), (long)data3.length, (long)data3.length, 0);
        MapOutput mo4 = mergeManager.reserve(new InputAttemptIdentifier(3, 0), (long)data4.length, (long)data4.length, 0);
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo1.getType());
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo2.getType());
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo3.getType());
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo4.getType());
        Assert.assertEquals((long)0L, (long)mergeManager.getCommitMemory());
        Assert.assertEquals((long)(data1.length + data2.length + data3.length + data4.length), (long)mergeManager.getUsedMemory());
        System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length);
        System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length);
        System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length);
        System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length);
        mo1.commit();
        mo2.commit();
        mo3.commit();
        mo4.commit();
        mergeManager.waitForMemToMemMerge();
        Assert.assertEquals((long)1L, (long)mergeManager.inMemoryMergedMapOutputs.size());
        Assert.assertEquals((long)1L, (long)mergeManager.inMemoryMapOutputs.size());
        mergeManager.close(true);
        mergeManager = new MergeManager((Configuration)conf, (FileSystem)localFs, localDirAllocator, inputContext, null, null, null, null, exceptionReporter, 2000000L, null, false, -1);
        mergeManager.configureAndStart();
        data1 = this.generateDataBySize((Configuration)conf, 10);
        data2 = this.generateDataBySize((Configuration)conf, 400000);
        data3 = this.generateDataBySize((Configuration)conf, 400000);
        data4 = this.generateDataBySize((Configuration)conf, 400000);
        mo1 = mergeManager.reserve(new InputAttemptIdentifier(0, 0), (long)data1.length, (long)data1.length, 0);
        mo2 = mergeManager.reserve(new InputAttemptIdentifier(1, 0), (long)data2.length, (long)data2.length, 0);
        mo3 = mergeManager.reserve(new InputAttemptIdentifier(2, 0), (long)data3.length, (long)data3.length, 0);
        mo4 = mergeManager.reserve(new InputAttemptIdentifier(3, 0), (long)data4.length, (long)data4.length, 0);
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo1.getType());
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo2.getType());
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo3.getType());
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo4.getType());
        Assert.assertEquals((long)0L, (long)mergeManager.getCommitMemory());
        Assert.assertEquals((long)(data1.length + data2.length + data3.length + data4.length), (long)mergeManager.getUsedMemory());
        System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length);
        System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length);
        System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length);
        System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length);
        mo1.commit();
        mo2.commit();
        mo3.commit();
        mo4.commit();
        mergeManager.waitForMemToMemMerge();
        Assert.assertEquals((long)1L, (long)mergeManager.inMemoryMergedMapOutputs.size());
        Assert.assertEquals((long)2L, (long)mergeManager.inMemoryMapOutputs.size());
        mergeManager.close(true);
        mergeManager = new MergeManager((Configuration)conf, (FileSystem)localFs, localDirAllocator, inputContext, null, null, null, null, exceptionReporter, 2000000L, null, false, -1);
        mergeManager.configureAndStart();
        data1 = this.generateDataBySize((Configuration)conf, 400000);
        data2 = this.generateDataBySize((Configuration)conf, 400000);
        data3 = this.generateDataBySize((Configuration)conf, 400000);
        data4 = this.generateDataBySize((Configuration)conf, 400000);
        mo1 = mergeManager.reserve(new InputAttemptIdentifier(0, 0), (long)data1.length, (long)data1.length, 0);
        mo2 = mergeManager.reserve(new InputAttemptIdentifier(1, 0), (long)data2.length, (long)data2.length, 0);
        mo3 = mergeManager.reserve(new InputAttemptIdentifier(2, 0), (long)data3.length, (long)data3.length, 0);
        mo4 = mergeManager.reserve(new InputAttemptIdentifier(3, 0), (long)data4.length, (long)data4.length, 0);
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo1.getType());
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo2.getType());
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo3.getType());
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo4.getType());
        Assert.assertEquals((long)0L, (long)mergeManager.getCommitMemory());
        Assert.assertEquals((long)(data1.length + data2.length + data3.length + data4.length), (long)mergeManager.getUsedMemory());
        System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length);
        System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length);
        System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length);
        System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length);
        mo1.commit();
        mo2.commit();
        mo3.commit();
        mo4.commit();
        mergeManager.waitForMemToMemMerge();
        Assert.assertEquals((long)0L, (long)mergeManager.inMemoryMergedMapOutputs.size());
        Assert.assertEquals((long)4L, (long)mergeManager.inMemoryMapOutputs.size());
        mergeManager.close(true);
        conf.setInt("tez.runtime.shuffle.memory-to-memory.segments", 4);
        mergeManager = new MergeManager((Configuration)conf, (FileSystem)localFs, localDirAllocator, inputContext, null, null, null, null, exceptionReporter, 2000000L, null, false, -1);
        mergeManager.configureAndStart();
        data1 = this.generateDataBySize((Configuration)conf, 490000);
        data2 = this.generateDataBySize((Configuration)conf, 490000);
        data3 = this.generateDataBySize((Configuration)conf, 490000);
        data4 = this.generateDataBySize((Configuration)conf, 230000);
        mo1 = mergeManager.reserve(new InputAttemptIdentifier(0, 0), (long)data1.length, (long)data1.length, 0);
        mo2 = mergeManager.reserve(new InputAttemptIdentifier(1, 0), (long)data2.length, (long)data2.length, 0);
        mo3 = mergeManager.reserve(new InputAttemptIdentifier(2, 0), (long)data3.length, (long)data3.length, 0);
        mo4 = mergeManager.reserve(new InputAttemptIdentifier(3, 0), (long)data4.length, (long)data4.length, 0);
        Assert.assertTrue((mergeManager.getUsedMemory() >= 1493000L ? 1 : 0) != 0);
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo1.getType());
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo2.getType());
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo3.getType());
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo4.getType());
        Assert.assertEquals((long)0L, (long)mergeManager.getCommitMemory());
        Assert.assertEquals((long)(data1.length + data2.length + data3.length + data4.length), (long)mergeManager.getUsedMemory());
        System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length);
        System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length);
        System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length);
        System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length);
        mo1.commit();
        mo2.commit();
        mo3.commit();
        mo4.commit();
        int numberOfMapOutputs = 4;
        mergeManager.waitForMemToMemMerge();
        Assert.assertEquals((long)numberOfMapOutputs, (long)mergeManager.inMemoryMapOutputs.size());
        mergeManager.close(true);
        conf.setInt("tez.runtime.shuffle.memory-to-memory.segments", 4);
        mergeManager = new MergeManager((Configuration)conf, (FileSystem)localFs, localDirAllocator, inputContext, null, null, null, null, exceptionReporter, 2000000L, null, false, -1);
        mergeManager.configureAndStart();
        data1 = this.generateDataBySize((Configuration)conf, 490000);
        data2 = this.generateDataBySize((Configuration)conf, 490000);
        data3 = this.generateDataBySize((Configuration)conf, 490000);
        data4 = this.generateDataBySize((Configuration)conf, 230000);
        mo1 = mergeManager.reserve(new InputAttemptIdentifier(0, 0), (long)data1.length, (long)data1.length, 0);
        mo2 = mergeManager.reserve(new InputAttemptIdentifier(1, 0), (long)data2.length, (long)data2.length, 0);
        mo3 = mergeManager.reserve(new InputAttemptIdentifier(2, 0), (long)data3.length, (long)data3.length, 0);
        mo4 = mergeManager.reserve(new InputAttemptIdentifier(3, 0), (long)data4.length, (long)data4.length, 0);
        Assert.assertTrue((mergeManager.getUsedMemory() >= 1493000L ? 1 : 0) != 0);
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo1.getType());
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo2.getType());
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo3.getType());
        Assert.assertEquals((Object)MapOutput.Type.MEMORY, (Object)mo4.getType());
        Assert.assertEquals((long)0L, (long)mergeManager.getCommitMemory());
        Assert.assertEquals((long)(data1.length + data2.length + data3.length + data4.length), (long)mergeManager.getUsedMemory());
        System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length);
        System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length);
        System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length);
        System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length);
        mo1.commit();
        mo2.commit();
        mo3.commit();
        mo4.commit();
        numberOfMapOutputs = 4;
        mergeManager.waitForMemToMemMerge();
        Assert.assertEquals((long)numberOfMapOutputs, (long)mergeManager.inMemoryMapOutputs.size());
        Assert.assertNull((Object)mergeManager.close(false));
        Assert.assertFalse((boolean)mergeManager.isMergeComplete());
    }

    private byte[] generateDataBySize(Configuration conf, int rawLen) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        FSDataOutputStream fsdos = new FSDataOutputStream((OutputStream)baos, null);
        IFile.Writer writer = new IFile.Writer(conf, fsdos, IntWritable.class, IntWritable.class, null, null, null);
        int i = 0;
        do {
            writer.append((Object)new IntWritable(i), (Object)new IntWritable(i));
            ++i;
        } while (writer.getRawLength() <= (long)rawLen);
        writer.close();
        int compressedLength = (int)writer.getCompressedLength();
        int rawLength = (int)writer.getRawLength();
        byte[] data = new byte[rawLength];
        ShuffleUtils.shuffleToMemory((byte[])data, (InputStream)new ByteArrayInputStream(baos.toByteArray()), (int)rawLength, (int)compressedLength, null, (boolean)false, (int)0, (Logger)LOG, (String)"sometask");
        return data;
    }

    private byte[] generateData(Configuration conf, int numEntries) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        FSDataOutputStream fsdos = new FSDataOutputStream((OutputStream)baos, null);
        IFile.Writer writer = new IFile.Writer(conf, fsdos, IntWritable.class, IntWritable.class, null, null, null);
        for (int i = 0; i < numEntries; ++i) {
            writer.append((Object)new IntWritable(i), (Object)new IntWritable(i));
        }
        writer.close();
        int compressedLength = (int)writer.getCompressedLength();
        int rawLength = (int)writer.getRawLength();
        byte[] data = new byte[rawLength];
        ShuffleUtils.shuffleToMemory((byte[])data, (InputStream)new ByteArrayInputStream(baos.toByteArray()), (int)rawLength, (int)compressedLength, null, (boolean)false, (int)0, (Logger)LOG, (String)"sometask");
        return data;
    }

    @Test(timeout=10000L)
    public void testLocalDiskMergeMultipleTasks() throws IOException, InterruptedException {
        this.testLocalDiskMergeMultipleTasks(false);
        this.testLocalDiskMergeMultipleTasks(true);
    }

    @Test(timeout=10000L)
    public void testOnDiskMergerFilenames() throws IOException, InterruptedException {
        TezConfiguration conf = new TezConfiguration(defaultConf);
        conf.setBoolean("tez.runtime.compress", false);
        conf.set("tez.runtime.key.class", IntWritable.class.getName());
        conf.set("tez.runtime.value.class", IntWritable.class.getName());
        Path localDir = new Path(workDir, "local");
        Path srcDir = new Path(workDir, "srcData");
        localFs.mkdirs(localDir);
        localFs.mkdirs(srcDir);
        conf.setStrings("tez.runtime.framework.local.dirs", new String[]{localDir.toString()});
        LocalFileSystem localFs = FileSystem.getLocal((Configuration)conf);
        LocalDirAllocator localDirAllocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        InputContext inputContext = this.createMockInputContext(UUID.randomUUID().toString());
        ExceptionReporter exceptionReporter = (ExceptionReporter)Mockito.mock(ExceptionReporter.class);
        MergeManager mergeManagerReal = new MergeManager((Configuration)conf, (FileSystem)localFs, localDirAllocator, inputContext, null, null, null, null, exceptionReporter, 0x100000L, null, false, -1);
        MergeManager mergeManager = (MergeManager)Mockito.spy((Object)mergeManagerReal);
        SrcFileInfo file1Info = this.createFile((Configuration)conf, (FileSystem)localFs, new Path(srcDir, "attemptsrc1.out"), 2, 3, 6);
        SrcFileInfo file2Info = this.createFile((Configuration)conf, (FileSystem)localFs, new Path(srcDir, "attemptsrc2.out"), 2, 3, 0);
        InputAttemptIdentifier iIdentifier1 = new InputAttemptIdentifier(0, 0, file1Info.path.getName());
        InputAttemptIdentifier iIdentifier2 = new InputAttemptIdentifier(1, 0, file2Info.path.getName());
        MapOutput mapOutput1 = TestMergeManager.getMapOutputForDirectDiskFetch(iIdentifier1, file1Info.path, file1Info.indexedRecords[0], mergeManager);
        MapOutput mapOutput2 = TestMergeManager.getMapOutputForDirectDiskFetch(iIdentifier2, file2Info.path, file2Info.indexedRecords[0], mergeManager);
        mapOutput1.commit();
        mapOutput2.commit();
        ((MergeManager)Mockito.verify((Object)mergeManager)).closeOnDiskFile(mapOutput1.getOutputPath());
        ((MergeManager)Mockito.verify((Object)mergeManager)).closeOnDiskFile(mapOutput2.getOutputPath());
        LinkedList mergeFiles = new LinkedList();
        mergeFiles.addAll(mergeManager.onDiskMapOutputs);
        mergeManager.onDiskMapOutputs.clear();
        mergeManager.onDiskMerger.merge(mergeFiles);
        Assert.assertEquals((long)1L, (long)mergeManager.onDiskMapOutputs.size());
        FileChunk fcMerged1 = (FileChunk)mergeManager.onDiskMapOutputs.iterator().next();
        Path m1Path = fcMerged1.getPath();
        Assert.assertTrue((boolean)m1Path.toString().endsWith("merged0"));
        SrcFileInfo file3Info = this.createFile((Configuration)conf, (FileSystem)localFs, new Path(srcDir, "attemptsrc3.out"), 2, 22, 5);
        InputAttemptIdentifier iIdentifier3 = new InputAttemptIdentifier(2, 0, file1Info.path.getName());
        MapOutput mapOutput3 = TestMergeManager.getMapOutputForDirectDiskFetch(iIdentifier3, file3Info.path, file3Info.indexedRecords[0], mergeManager);
        mapOutput3.commit();
        ((MergeManager)Mockito.verify((Object)mergeManager)).closeOnDiskFile(mapOutput3.getOutputPath());
        mergeFiles = new LinkedList();
        mergeFiles.addAll(mergeManager.onDiskMapOutputs);
        mergeManager.onDiskMapOutputs.clear();
        mergeManager.onDiskMerger.merge(mergeFiles);
        Assert.assertEquals((long)1L, (long)mergeManager.onDiskMapOutputs.size());
        FileChunk fcMerged2 = (FileChunk)mergeManager.onDiskMapOutputs.iterator().next();
        Path m2Path = fcMerged2.getPath();
        Assert.assertTrue((boolean)m2Path.toString().endsWith("merged1"));
        Assert.assertNotEquals((Object)m1Path, (Object)m2Path);
        SrcFileInfo file4Info = this.createFile((Configuration)conf, (FileSystem)localFs, new Path(srcDir, "attemptsrc4.out"), 2, 45, 35);
        InputAttemptIdentifier iIdentifier4 = new InputAttemptIdentifier(3, 0, file4Info.path.getName());
        MapOutput mapOutput4 = TestMergeManager.getMapOutputForDirectDiskFetch(iIdentifier4, file4Info.path, file4Info.indexedRecords[0], mergeManager);
        mapOutput4.commit();
        ((MergeManager)Mockito.verify((Object)mergeManager)).closeOnDiskFile(mapOutput4.getOutputPath());
        LinkedList tmpList = new LinkedList();
        mergeFiles = new LinkedList();
        Assert.assertEquals((long)2L, (long)mergeManager.onDiskMapOutputs.size());
        tmpList.addAll(mergeManager.onDiskMapOutputs);
        mergeFiles.add(tmpList.get(1));
        mergeFiles.add(tmpList.get(0));
        mergeManager.onDiskMapOutputs.clear();
        mergeManager.onDiskMerger.merge(mergeFiles);
        Assert.assertEquals((long)1L, (long)mergeManager.onDiskMapOutputs.size());
        FileChunk fcMerged3 = (FileChunk)mergeManager.onDiskMapOutputs.iterator().next();
        Path m3Path = fcMerged3.getPath();
        Assert.assertTrue((boolean)m3Path.toString().endsWith("merged2"));
        Assert.assertNotEquals((Object)m2Path, (Object)m3Path);
        Assert.assertEquals((long)m1Path.toString().length(), (long)m2Path.toString().length());
        Assert.assertEquals((long)m2Path.toString().length(), (long)m3Path.toString().length());
        String m1Prefix = m1Path.toString().substring(0, m1Path.toString().indexOf("."));
        String m2Prefix = m2Path.toString().substring(0, m2Path.toString().indexOf("."));
        String m3Prefix = m3Path.toString().substring(0, m3Path.toString().indexOf("."));
        Assert.assertEquals((Object)m1Prefix, (Object)m2Prefix);
        Assert.assertNotEquals((Object)m1Prefix, (Object)m3Prefix);
        Assert.assertNotEquals((Object)m2Prefix, (Object)m3Prefix);
        ((InputContext)Mockito.verify((Object)inputContext, (VerificationMode)Mockito.atLeastOnce())).notifyProgress();
    }

    void testLocalDiskMergeMultipleTasks(boolean interruptInMiddle) throws IOException, InterruptedException {
        TezConfiguration conf = new TezConfiguration(defaultConf);
        conf.setBoolean("tez.runtime.compress", false);
        conf.set("tez.runtime.key.class", IntWritable.class.getName());
        conf.set("tez.runtime.value.class", IntWritable.class.getName());
        Path localDir = new Path(workDir, "local");
        Path srcDir = new Path(workDir, "srcData");
        localFs.mkdirs(localDir);
        localFs.mkdirs(srcDir);
        conf.setStrings("tez.runtime.framework.local.dirs", new String[]{localDir.toString()});
        LocalFileSystem localFs = FileSystem.getLocal((Configuration)conf);
        LocalDirAllocator localDirAllocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        InputContext t0inputContext = this.createMockInputContext(UUID.randomUUID().toString());
        InputContext t1inputContext = this.createMockInputContext(UUID.randomUUID().toString());
        ExceptionReporter t0exceptionReporter = (ExceptionReporter)Mockito.mock(ExceptionReporter.class);
        ExceptionReporter t1exceptionReporter = (ExceptionReporter)Mockito.mock(ExceptionReporter.class);
        MergeManager t0mergeManagerReal = new MergeManager((Configuration)conf, (FileSystem)localFs, localDirAllocator, t0inputContext, null, null, null, null, t0exceptionReporter, 2000000L, null, false, -1);
        MergeManager t0mergeManager = (MergeManager)Mockito.spy((Object)t0mergeManagerReal);
        t0mergeManager.configureAndStart();
        MergeManager t1mergeManagerReal = new MergeManager((Configuration)conf, (FileSystem)localFs, localDirAllocator, t1inputContext, null, null, null, null, t1exceptionReporter, 2000000L, null, false, -1);
        MergeManager t1mergeManager = (MergeManager)Mockito.spy((Object)t1mergeManagerReal);
        SrcFileInfo src1Info = this.createFile((Configuration)conf, (FileSystem)localFs, new Path(srcDir, "attemptsrc1.out"), 2, 3, 0);
        SrcFileInfo src2Info = this.createFile((Configuration)conf, (FileSystem)localFs, new Path(srcDir, "attemptsrc2.out"), 2, 3, 6);
        InputAttemptIdentifier t0Identifier0 = new InputAttemptIdentifier(0, 0, src1Info.path.getName());
        InputAttemptIdentifier t0Identifier1 = new InputAttemptIdentifier(1, 0, src2Info.path.getName());
        InputAttemptIdentifier t1Identifier0 = new InputAttemptIdentifier(0, 0, src1Info.path.getName());
        InputAttemptIdentifier t1Identifier1 = new InputAttemptIdentifier(1, 0, src2Info.path.getName());
        MapOutput t0MapOutput0 = TestMergeManager.getMapOutputForDirectDiskFetch(t0Identifier0, src1Info.path, src1Info.indexedRecords[0], t0mergeManager);
        MapOutput t0MapOutput1 = TestMergeManager.getMapOutputForDirectDiskFetch(t0Identifier1, src2Info.path, src2Info.indexedRecords[0], t0mergeManager);
        MapOutput t1MapOutput0 = TestMergeManager.getMapOutputForDirectDiskFetch(t1Identifier0, src1Info.path, src1Info.indexedRecords[1], t1mergeManager);
        MapOutput t1MapOutput1 = TestMergeManager.getMapOutputForDirectDiskFetch(t1Identifier1, src2Info.path, src2Info.indexedRecords[1], t1mergeManager);
        t0MapOutput0.commit();
        t0MapOutput1.commit();
        ((MergeManager)Mockito.verify((Object)t0mergeManager)).closeOnDiskFile(t0MapOutput0.getOutputPath());
        ((MergeManager)Mockito.verify((Object)t0mergeManager)).closeOnDiskFile(t0MapOutput1.getOutputPath());
        LinkedList t0MergeFiles = new LinkedList();
        t0MergeFiles.addAll(t0mergeManager.onDiskMapOutputs);
        t0mergeManager.onDiskMapOutputs.clear();
        if (!interruptInMiddle) {
            t0mergeManager.onDiskMerger.merge(t0MergeFiles);
            Assert.assertEquals((long)1L, (long)t0mergeManager.onDiskMapOutputs.size());
        } else {
            ((MergeManager)Mockito.doAnswer((Answer)new Answer(){

                public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                    Thread.sleep(2000L);
                    return invocationOnMock.callRealMethod();
                }
            }).when((Object)t0mergeManager)).closeOnDiskFile((FileChunk)Matchers.any(FileChunk.class));
            Thread interruptingThread = new Thread(new InterruptingThread(t0mergeManager.onDiskMerger));
            interruptingThread.start();
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            t0mergeManager.onDiskMerger.startMerge((Set)Sets.newHashSet(t0MergeFiles));
            t0mergeManager.onDiskMerger.waitForMerge();
            Assert.assertNotEquals((long)1L, (long)t0mergeManager.onDiskMapOutputs.size());
        }
        if (!interruptInMiddle) {
            t1MapOutput0.commit();
            t1MapOutput1.commit();
            ((MergeManager)Mockito.verify((Object)t1mergeManager)).closeOnDiskFile(t1MapOutput0.getOutputPath());
            ((MergeManager)Mockito.verify((Object)t1mergeManager)).closeOnDiskFile(t1MapOutput1.getOutputPath());
            LinkedList t1MergeFiles = new LinkedList();
            t1MergeFiles.addAll(t1mergeManager.onDiskMapOutputs);
            t1mergeManager.onDiskMapOutputs.clear();
            t1mergeManager.onDiskMerger.merge(t1MergeFiles);
            Assert.assertEquals((long)1L, (long)t1mergeManager.onDiskMapOutputs.size());
            Assert.assertNotEquals((Object)((FileChunk)t0mergeManager.onDiskMapOutputs.iterator().next()).getPath(), (Object)((FileChunk)t1mergeManager.onDiskMapOutputs.iterator().next()).getPath());
            Assert.assertTrue((boolean)((FileChunk)t0mergeManager.onDiskMapOutputs.iterator().next()).getPath().toString().contains(t0inputContext.getUniqueIdentifier()));
            Assert.assertTrue((boolean)((FileChunk)t1mergeManager.onDiskMapOutputs.iterator().next()).getPath().toString().contains(t1inputContext.getUniqueIdentifier()));
        }
    }

    private InputContext createMockInputContext(String uniqueId) {
        return this.createMockInputContext(uniqueId, 0xC800000L);
    }

    private InputContext createMockInputContext(String uniqueId, long mem) {
        InputContext inputContext = (InputContext)Mockito.mock(InputContext.class);
        ((InputContext)Mockito.doReturn((Object)new TezCounters()).when((Object)inputContext)).getCounters();
        ((InputContext)Mockito.doReturn((Object)mem).when((Object)inputContext)).getTotalMemoryAvailableToTask();
        ((InputContext)Mockito.doReturn((Object)"srcVertexName").when((Object)inputContext)).getSourceVertexName();
        ((InputContext)Mockito.doReturn((Object)uniqueId).when((Object)inputContext)).getUniqueIdentifier();
        return inputContext;
    }

    private SrcFileInfo createFile(Configuration conf, FileSystem fs, Path path, int numPartitions, int numKeysPerPartition, int startKey) throws IOException {
        FSDataOutputStream outStream = fs.create(path);
        int currentKey = startKey;
        SrcFileInfo srcFileInfo = new SrcFileInfo();
        SrcFileInfo.access$102(srcFileInfo, new TezIndexRecord[numPartitions]);
        srcFileInfo.path = path;
        for (int i = 0; i < numPartitions; ++i) {
            long pos = outStream.getPos();
            IFile.Writer writer = new IFile.Writer(conf, outStream, IntWritable.class, IntWritable.class, null, null, null);
            for (int j = 0; j < numKeysPerPartition; ++j) {
                writer.append((Object)new IntWritable(currentKey), (Object)new IntWritable(currentKey));
                ++currentKey;
            }
            writer.close();
            ((SrcFileInfo)srcFileInfo).indexedRecords[i] = new TezIndexRecord(pos, writer.getRawLength(), writer.getCompressedLength());
        }
        outStream.close();
        return srcFileInfo;
    }

    private static MapOutput getMapOutputForDirectDiskFetch(InputAttemptIdentifier srcAttemptId, Path filename, TezIndexRecord indexRecord, MergeManager merger) throws IOException {
        return MapOutput.createLocalDiskMapOutput((InputAttemptIdentifier)srcAttemptId, (FetchedInputAllocatorOrderedGrouped)merger, (Path)filename, (long)indexRecord.getStartOffset(), (long)indexRecord.getPartLength(), (boolean)true);
    }

    static {
        try {
            defaultConf.set("fs.defaultFS", "file:///");
            localFs = FileSystem.getLocal((Configuration)defaultConf);
            workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), TestMergeManager.class.getSimpleName());
            workDir = localFs.makeQualified(workDir);
            localFs.mkdirs(workDir);
            LOG.info("Using workDir: " + workDir);
        }
        catch (IOException e) {
            throw new RuntimeException("init failure", e);
        }
    }

    private class SrcFileInfo {
        private Path path;
        private TezIndexRecord[] indexedRecords;

        private SrcFileInfo() {
        }

        static /* synthetic */ TezIndexRecord[] access$102(SrcFileInfo x0, TezIndexRecord[] x1) {
            x0.indexedRecords = x1;
            return x1;
        }
    }

    class InterruptingThread
    implements Runnable {
        MergeManager.OnDiskMerger mergeThread;

        public InterruptingThread(MergeManager.OnDiskMerger mergeThread) {
            this.mergeThread = mergeThread;
        }

        @Override
        public void run() {
            while (this.mergeThread.tmpDir == null) {
            }
            this.mergeThread.interrupt();
        }
    }
}

