/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.writers;

import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.Partitioner;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class TestUnorderedPartitionedKVWriter {
    // Logger shared by every test case in this class.
    private static final Logger LOG = LoggerFactory.getLogger(TestUnorderedPartitionedKVWriter.class);
    // Host name the tests expect to find in emitted data-movement payloads.
    private static final String HOST_STRING = "localhost";
    // Shuffle port used by the tests; presumably what the mocked output context advertises — confirm in createMockOutputContext.
    private static final int SHUFFLE_PORT = 4000;
    // Base directory for test data: system property "test.build.data", falling back to "/tmp".
    private static String testTmpDir = System.getProperty("test.build.data", "/tmp");
    // Per-class scratch directory; deleted and recreated in setup(), deleted again in cleanup().
    private static final Path TEST_ROOT_DIR = new Path(testTmpDir, TestUnorderedPartitionedKVWriter.class.getSimpleName());
    // Local file system handle; (re)assigned in setup() before each test.
    private static FileSystem localFs;
    // Parameterized value: first element of each row returned by data().
    private boolean shouldCompress;
    // Parameterized value: second element of each row returned by data().
    private TezRuntimeConfiguration.ReportPartitionStats reportPartitionStats;
    // Default configuration; the tests read the shuffle auxiliary-service id from it.
    private Configuration defaultConf = new Configuration();

    /**
     * Creates one parameterized instance of this test class.
     *
     * @param shouldCompress value stored in the {@code shouldCompress} field and forwarded by the test cases
     * @param reportPartitionStats partition-stats mode stored for later use when decoding partition statistics
     */
    public TestUnorderedPartitionedKVWriter(boolean shouldCompress, TezRuntimeConfiguration.ReportPartitionStats reportPartitionStats) {
        this.reportPartitionStats = reportPartitionStats;
        this.shouldCompress = shouldCompress;
    }

    /**
     * Supplies the parameter rows for the parameterized runner: every combination of the
     * compression flag ({@code false} first, then {@code true}) with the five partition-stats
     * modes, in the order listed below.
     *
     * @return one {@code {Boolean, ReportPartitionStats}} row per combination
     */
    @Parameterized.Parameters(name="test[{0}, {1}]")
    public static Collection<Object[]> data() {
        TezRuntimeConfiguration.ReportPartitionStats[] modes = {
            TezRuntimeConfiguration.ReportPartitionStats.DISABLED,
            TezRuntimeConfiguration.ReportPartitionStats.ENABLED,
            TezRuntimeConfiguration.ReportPartitionStats.NONE,
            TezRuntimeConfiguration.ReportPartitionStats.MEMORY_OPTIMIZED,
            TezRuntimeConfiguration.ReportPartitionStats.PRECISE
        };
        boolean[] compressionFlags = {false, true};
        Object[][] rows = new Object[compressionFlags.length * modes.length][];
        int next = 0;
        for (boolean compress : compressionFlags) {
            for (TezRuntimeConfiguration.ReportPartitionStats mode : modes) {
                rows[next++] = new Object[]{compress, mode};
            }
        }
        return Arrays.asList(rows);
    }

    /**
     * Prepares a clean scratch directory before each test: obtains the local file system,
     * removes any leftover {@code TEST_ROOT_DIR} recursively and recreates it.
     *
     * @throws IOException if the local file system cannot be obtained or modified
     */
    @Before
    public void setup() throws IOException {
        LOG.info("Setup. Using test dir: " + TEST_ROOT_DIR);
        Configuration localConf = new Configuration();
        localFs = FileSystem.getLocal(localConf);
        final boolean recursive = true;
        localFs.delete(TEST_ROOT_DIR, recursive);
        localFs.mkdirs(TEST_ROOT_DIR);
    }

    /**
     * Removes the scratch directory (recursively) after each test.
     *
     * @throws IOException if the delete fails
     */
    @After
    public void cleanup() throws IOException {
        LOG.info("CleanUp");
        final boolean recursive = true;
        localFs.delete(TEST_ROOT_DIR, recursive);
    }

    /**
     * Checks how the writer splits its available memory into buffers for several memory
     * sizes, with a 2047-byte per-buffer cap configured, with a 50% buffer-merge setting,
     * and finally with the per-buffer cap removed.
     */
    @Test(timeout=10000L)
    public void testBufferSizing() throws IOException {
        final int maxSingleBufferSizeBytes = 2047;
        // NOTE(review): 1980 is the buffer size the writer is expected to pick under the
        // 2047-byte cap; how it is derived lives in the writer — confirm there.
        final long sizePerBuffer = 1980L;
        final long halfOfSmallMemory = 1024L;
        final int numOutputs = 10;

        TezCounters counters = new TezCounters();
        ApplicationId appId = ApplicationId.newInstance(10000000L, 1);
        String uniqueId = UUID.randomUUID().toString();
        String auxiliaryService = this.defaultConf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle");
        OutputContext outputContext = this.createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
        Configuration conf = this.createConfiguration(outputContext, IntWritable.class, LongWritable.class, false, maxSingleBufferSizeBytes);

        // 2048 bytes available: expected to be split into two 1024-byte buffers.
        assertBufferLayout(new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, 2048L),
            2L, halfOfSmallMemory, halfOfSmallMemory, 1L, 1L);

        // Exactly three times the cap (6141 bytes).
        assertBufferLayout(new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, 3L * maxSingleBufferSizeBytes),
            3L, sizePerBuffer, sizePerBuffer, 1L, 1L);

        // Two and a half caps (5117 bytes): only two buffers expected.
        long twoAndAHalfCaps = 2L * maxSingleBufferSizeBytes + maxSingleBufferSizeBytes / 2;
        assertBufferLayout(new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, twoAndAHalfCaps),
            2L, sizePerBuffer, sizePerBuffer, 1L, 1L);

        // One byte more (5118 bytes): a third, smaller last buffer is expected.
        assertBufferLayout(new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, twoAndAHalfCaps + 1L),
            3L, sizePerBuffer, halfOfSmallMemory, 1L, 1L);

        // Four caps plus one byte (8189 bytes).
        long fourCapsPlusOne = 4L * maxSingleBufferSizeBytes + 1L;
        assertBufferLayout(new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, fourCapsPlusOne),
            4L, sizePerBuffer, sizePerBuffer, 1L, 1L);

        // Same memory with a 50% buffer-merge setting: only the spill limit is expected to change.
        conf.setInt("tez.runtime.unordered-partitioned-kvwriter.buffer-merge-percent", 50);
        assertBufferLayout(new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, fourCapsPlusOne),
            4L, sizePerBuffer, sizePerBuffer, 1L, 2L);

        // Per-buffer cap removed; 2048 bytes are expected to be split into two 1024-byte buffers.
        conf.unset("tez.runtime.unordered.output.max-per-buffer.size-bytes");
        assertBufferLayout(new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, 2048L),
            2L, halfOfSmallMemory, halfOfSmallMemory, 1L, 1L);
    }

    /** Asserts the five buffer-sizing fields of a freshly constructed writer, in the order given. */
    private static void assertBufferLayout(UnorderedPartitionedKVWriterForTest kvWriter, long numBuffers, long sizePerBuffer, long lastBufferSize, long numInitializedBuffers, long spillLimit) {
        Assert.assertEquals(numBuffers, (long)kvWriter.numBuffers);
        Assert.assertEquals(sizePerBuffer, (long)kvWriter.sizePerBuffer);
        Assert.assertEquals(lastBufferSize, (long)kvWriter.lastBufferSize);
        Assert.assertEquals(numInitializedBuffers, (long)kvWriter.numInitializedBuffers);
        Assert.assertEquals(spillLimit, (long)kvWriter.spillLimit);
    }

    /** Runs the shared {@code baseTest} scenario with the smallest non-empty input used in this class. */
    @Test(timeout=10000L)
    public void testNoSpill() throws IOException, InterruptedException {
        // NOTE(review): roles of the two leading arguments (records, partitions) are assumed — confirm against baseTest.
        final int numRecords = 10;
        final int numPartitions = 10;
        baseTest(numRecords, numPartitions, null, this.shouldCompress, -1, 0);
    }

    /** Runs the shared {@code baseTest} scenario with 50 as its first argument. */
    @Test(timeout=10000L)
    public void testSingleSpill() throws IOException, InterruptedException {
        // NOTE(review): roles of the two leading arguments (records, partitions) are assumed — confirm against baseTest.
        final int numRecords = 50;
        final int numPartitions = 10;
        baseTest(numRecords, numPartitions, null, this.shouldCompress, -1, 0);
    }

    /** Runs the shared {@code baseTest} scenario with 200 as its first argument. */
    @Test(timeout=10000L)
    public void testMultipleSpills() throws IOException, InterruptedException {
        // NOTE(review): roles of the two leading arguments (records, partitions) are assumed — confirm against baseTest.
        final int numRecords = 200;
        final int numPartitions = 10;
        baseTest(numRecords, numPartitions, null, this.shouldCompress, -1, 0);
    }

    /** Runs the eight-argument {@code baseTest} overload with 512, 0, 9600 and {@code false} as its trailing arguments. */
    @Test(timeout=10000L)
    public void testMultipleSpillsWithSmallBuffer() throws IOException, InterruptedException {
        // NOTE(review): roles of the two leading arguments (records, partitions) are assumed — confirm against baseTest.
        final int numRecords = 200;
        final int numPartitions = 10;
        baseTest(numRecords, numPartitions, null, this.shouldCompress, 512, 0, 9600, false);
    }

    /** Runs the shared {@code baseTest} scenario with 2048 and 10 as its last two arguments. */
    @Test(timeout=10000L)
    public void testMergeBuffersAndSpill() throws IOException, InterruptedException {
        // NOTE(review): roles of the two leading arguments (records, partitions) are assumed — confirm against baseTest.
        final int numRecords = 200;
        final int numPartitions = 10;
        baseTest(numRecords, numPartitions, null, this.shouldCompress, 2048, 10);
    }

    /** Runs the shared {@code baseTest} scenario with 0 as its first argument. */
    @Test(timeout=10000L)
    public void testNoRecords() throws IOException, InterruptedException {
        // NOTE(review): roles of the two leading arguments (records, partitions) are assumed — confirm against baseTest.
        final int numRecords = 0;
        final int numPartitions = 10;
        baseTest(numRecords, numPartitions, null, this.shouldCompress, -1, 0);
    }

    /**
     * Runs the eight-argument {@code baseTest} overload twice with 0 and 1 as the leading
     * arguments, first with the trailing flag {@code false}, then {@code true}.
     */
    @Test(timeout=10000L)
    public void testNoRecords_SinglePartition() throws IOException, InterruptedException {
        // NOTE(review): the meaning of the trailing boolean is defined by baseTest — confirm there.
        final boolean[] trailingFlagValues = {false, true};
        for (boolean trailingFlag : trailingFlagValues) {
            baseTest(0, 1, null, this.shouldCompress, -1, 0, 2048, trailingFlag);
        }
    }

    /**
     * Runs the shared {@code baseTest} scenario while passing the set {2, 5} as the third
     * argument (the partitions to skip, judging by the test name and the sibling pipelined helper).
     */
    @Test(timeout=10000L)
    public void testSkippedPartitions() throws IOException, InterruptedException {
        // Build the set from Integer elements directly. The previous (Object[]) cast made
        // Sets.newHashSet infer a set of Object, which is not assignable to a Set<Integer> parameter.
        Set<Integer> skippedPartitions = Sets.newHashSet(2, 5);
        this.baseTest(200, 10, skippedPartitions, this.shouldCompress, -1, 0);
    }

    /** Runs the shared {@code baseTest} scenario with 10 and 1 as its leading arguments. */
    @Test(timeout=10000L)
    public void testNoSpill_SinglePartition() throws IOException, InterruptedException {
        // NOTE(review): roles of the two leading arguments (records, partitions) are assumed — confirm against baseTest.
        final int numRecords = 10;
        final int numPartitions = 1;
        baseTest(numRecords, numPartitions, null, this.shouldCompress, -1, 0);
    }

    /** Runs the eight-argument {@code baseTest} overload with 1000 and 1 as its leading arguments and the trailing flag set. */
    @Test(timeout=10000L)
    public void testSpill_SinglePartition() throws IOException, InterruptedException {
        // NOTE(review): roles of the two leading arguments (records, partitions) are assumed — confirm against baseTest.
        final int numRecords = 1000;
        final int numPartitions = 1;
        baseTest(numRecords, numPartitions, null, this.shouldCompress, -1, 0, 2048, true);
    }

    /** Random text records only (100 regular records, no oversized ones), final merge enabled, no pipelining. */
    @Test(timeout=10000L)
    public void testRandomText() throws IOException, InterruptedException {
        final int numRegularRecords = 100;
        final int numPartitions = 10;
        final long availableMemory = 2048L;
        final boolean pipeliningEnabled = false;
        final boolean finalMergeEnabled = true;
        textTest(numRegularRecords, numPartitions, availableMemory, 0, 0, 0, pipeliningEnabled, finalMergeEnabled);
    }

    /** Ten records with oversized keys only, final merge enabled, no pipelining. */
    @Test(timeout=10000L)
    public void testLargeKeys() throws IOException, InterruptedException {
        final int numPartitions = 10;
        final long availableMemory = 2048L;
        final int numLargeKeys = 10;
        final boolean pipeliningEnabled = false;
        final boolean finalMergeEnabled = true;
        textTest(0, numPartitions, availableMemory, numLargeKeys, 0, 0, pipeliningEnabled, finalMergeEnabled);
    }

    /** Ten records with oversized values only, final merge enabled, no pipelining. */
    @Test(timeout=10000L)
    public void testLargevalues() throws IOException, InterruptedException {
        final int numPartitions = 10;
        final long availableMemory = 2048L;
        final int numLargeValues = 10;
        final boolean pipeliningEnabled = false;
        final boolean finalMergeEnabled = true;
        textTest(0, numPartitions, availableMemory, 0, numLargeValues, 0, pipeliningEnabled, finalMergeEnabled);
    }

    /** Ten records whose key and value are both large, final merge enabled, no pipelining. */
    @Test(timeout=10000L)
    public void testLargeKvPairs() throws IOException, InterruptedException {
        final int numPartitions = 10;
        final long availableMemory = 2048L;
        final int numLargeKvPairs = 10;
        final boolean pipeliningEnabled = false;
        final boolean finalMergeEnabled = true;
        textTest(0, numPartitions, availableMemory, 0, 0, numLargeKvPairs, pipeliningEnabled, finalMergeEnabled);
    }

    /** Mix of 100 regular records and ten of each oversized kind, final merge enabled, no pipelining. */
    @Test(timeout=10000L)
    public void testTextMixedRecords() throws IOException, InterruptedException {
        final int numRegularRecords = 100;
        final int numPartitions = 10;
        final long availableMemory = 2048L;
        final int numOfEachLargeKind = 10;
        final boolean pipeliningEnabled = false;
        final boolean finalMergeEnabled = true;
        textTest(numRegularRecords, numPartitions, availableMemory, numOfEachLargeKind, numOfEachLargeKind, numOfEachLargeKind, pipeliningEnabled, finalMergeEnabled);
    }

    /**
     * Random text records only (100 regular records), with final merge disabled and no pipelining.
     *
     * <p>The timeout is the same 10 seconds used by the other test cases in this class; the
     * previous value of 10,000,000 ms (about 2.8 hours) gave no practical protection against a hang.
     */
    @Test(timeout=10000L)
    public void testRandomTextWithoutFinalMerge() throws IOException, InterruptedException {
        this.textTest(100, 10, 2048L, 0, 0, 0, false, false);
    }

    /** Ten records with oversized keys only, final merge disabled, no pipelining. */
    @Test(timeout=10000L)
    public void testLargeKeysWithoutFinalMerge() throws IOException, InterruptedException {
        final int numPartitions = 10;
        final long availableMemory = 2048L;
        final int numLargeKeys = 10;
        final boolean pipeliningEnabled = false;
        final boolean finalMergeEnabled = false;
        textTest(0, numPartitions, availableMemory, numLargeKeys, 0, 0, pipeliningEnabled, finalMergeEnabled);
    }

    /** Ten records with oversized values only, final merge disabled, no pipelining. */
    @Test(timeout=10000L)
    public void testLargevaluesWithoutFinalMerge() throws IOException, InterruptedException {
        final int numPartitions = 10;
        final long availableMemory = 2048L;
        final int numLargeValues = 10;
        final boolean pipeliningEnabled = false;
        final boolean finalMergeEnabled = false;
        textTest(0, numPartitions, availableMemory, 0, numLargeValues, 0, pipeliningEnabled, finalMergeEnabled);
    }

    /** Ten records whose key and value are both large, final merge disabled, no pipelining. */
    @Test(timeout=10000L)
    public void testLargeKvPairsWithoutFinalMerge() throws IOException, InterruptedException {
        final int numPartitions = 10;
        final long availableMemory = 2048L;
        final int numLargeKvPairs = 10;
        final boolean pipeliningEnabled = false;
        final boolean finalMergeEnabled = false;
        textTest(0, numPartitions, availableMemory, 0, 0, numLargeKvPairs, pipeliningEnabled, finalMergeEnabled);
    }

    /** Mix of 100 regular records and ten of each oversized kind, final merge disabled, no pipelining. */
    @Test(timeout=10000L)
    public void testTextMixedRecordsWithoutFinalMerge() throws IOException, InterruptedException {
        final int numRegularRecords = 100;
        final int numPartitions = 10;
        final long availableMemory = 2048L;
        final int numOfEachLargeKind = 10;
        final boolean pipeliningEnabled = false;
        final boolean finalMergeEnabled = false;
        textTest(numRegularRecords, numPartitions, availableMemory, numOfEachLargeKind, numOfEachLargeKind, numOfEachLargeKind, pipeliningEnabled, finalMergeEnabled);
    }

    /**
     * Writes randomly generated {@link Text} records through the writer and verifies the
     * emitted events, counters and (when a final merge is performed) the merged output file.
     *
     * <p>Records are written in four batches, in this order: regular records (key shorter than
     * 10 characters, value shorter than 20), records whose key is at least one buffer long,
     * records whose value is at least one buffer long, and records whose key and value are each
     * at least half a buffer long. With pipelining enabled, the number of {@code sendEvents}
     * calls is verified after each of the last three batches.
     *
     * <p>When pipelining is enabled or final merge is disabled, only the existence of the
     * per-spill files is checked. Otherwise the two returned events are inspected and, if any
     * record was written, every record read back from the final output must match exactly one
     * expected key/value pair of its partition.
     *
     * @param numRegularRecords number of small records to write
     * @param numPartitions number of output partitions
     * @param availableMemory memory handed to the writer under test
     * @param numLargeKeys number of records with an oversized key
     * @param numLargevalues number of records with an oversized value
     * @param numLargeKvPairs number of records whose key and value are both large
     * @param pipeliningEnabled value for {@code tez.runtime.pipelined-shuffle.enabled}
     * @param isFinalMergeEnabled value for {@code tez.runtime.enable.final-merge.in.output}
     * @throws IOException if writing or reading back the output fails
     * @throws InterruptedException if closing the writer is interrupted
     */
    public void textTest(int numRegularRecords, int numPartitions, long availableMemory, int numLargeKeys, int numLargevalues, int numLargeKvPairs, boolean pipeliningEnabled, boolean isFinalMergeEnabled) throws IOException, InterruptedException {
        HashPartitioner partitioner = new HashPartitioner();
        ApplicationId appId = ApplicationId.newInstance(10000000L, 1);
        TezCounters counters = new TezCounters();
        String uniqueId = UUID.randomUUID().toString();
        int dagId = 1;
        String auxiliaryService = this.defaultConf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle");
        OutputContext outputContext = this.createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
        Random random = new Random();
        Configuration conf = this.createConfiguration(outputContext, Text.class, Text.class, this.shouldCompress, -1, HashPartitioner.class);
        conf.setBoolean("tez.runtime.pipelined-shuffle.enabled", pipeliningEnabled);
        conf.setBoolean("tez.runtime.enable.final-merge.in.output", isFinalMergeEnabled);
        DefaultCodec codec = null;
        if (this.shouldCompress) {
            codec = new DefaultCodec();
            codec.setConf(conf);
        }

        // Expected key/value pairs per partition; entries are removed as they are read back.
        HashMap<Integer, Multimap<String, String>> expectedValues = new HashMap<Integer, Multimap<String, String>>();
        for (int p = 0; p < numPartitions; ++p) {
            expectedValues.put(p, LinkedListMultimap.<String, String>create());
        }
        UnorderedPartitionedKVWriterForTest kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numPartitions, availableMemory);
        int sizePerBuffer = kvWriter.sizePerBuffer;
        BitSet partitionsWithData = new BitSet(numPartitions);

        int numRecordsWritten = 0;
        // Regular records.
        numRecordsWritten += this.writeRandomTextRecords(kvWriter, partitioner, expectedValues, partitionsWithData, random, numRegularRecords, numPartitions, 0, 10, 0, 20);
        // Oversized keys.
        numRecordsWritten += this.writeRandomTextRecords(kvWriter, partitioner, expectedValues, partitionsWithData, random, numLargeKeys, numPartitions, sizePerBuffer, 100, 0, 20);
        if (pipeliningEnabled) {
            Mockito.verify(outputContext, Mockito.times(numLargeKeys)).sendEvents(Matchers.anyListOf(Event.class));
        }
        // Oversized values.
        numRecordsWritten += this.writeRandomTextRecords(kvWriter, partitioner, expectedValues, partitionsWithData, random, numLargevalues, numPartitions, 0, 10, sizePerBuffer, 100);
        if (pipeliningEnabled) {
            Mockito.verify(outputContext, Mockito.times(numLargevalues + numLargeKeys)).sendEvents(Matchers.anyListOf(Event.class));
        }
        // Key and value each at least half a buffer long.
        numRecordsWritten += this.writeRandomTextRecords(kvWriter, partitioner, expectedValues, partitionsWithData, random, numLargeKvPairs, numPartitions, sizePerBuffer / 2, 100, sizePerBuffer / 2, 100);
        if (pipeliningEnabled) {
            Mockito.verify(outputContext, Mockito.times(numLargevalues + numLargeKeys + numLargeKvPairs)).sendEvents(Matchers.anyListOf(Event.class));
        }

        List<Event> events = kvWriter.close();
        Mockito.verify(outputContext, Mockito.never()).reportFailure(Matchers.any(TaskFailureType.class), Matchers.any(Throwable.class), Matchers.any(String.class));

        if (!pipeliningEnabled) {
            // Exactly one VertexManagerEvent is expected among the returned events.
            VertexManagerEvent vmEvent = null;
            for (Event event : events) {
                if (event instanceof VertexManagerEvent) {
                    Assert.assertNull(vmEvent);
                    vmEvent = (VertexManagerEvent)event;
                }
            }
            // Fail with a clear assertion instead of a NullPointerException when the event is missing.
            Assert.assertNotNull("close() returned no VertexManagerEvent", vmEvent);
            ShuffleUserPayloads.VertexManagerEventPayloadProto vmEventPayload = ShuffleUserPayloads.VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vmEvent.getUserPayload().asReadOnlyBuffer()));
            Assert.assertEquals((long)numRecordsWritten, (long)vmEventPayload.getNumRecord());
        }

        TezCounter outputLargeRecordsCounter = counters.findCounter(TaskCounter.OUTPUT_LARGE_RECORDS);
        Assert.assertEquals((long)(numLargeKeys + numLargevalues + numLargeKvPairs), outputLargeRecordsCounter.getValue());

        if (pipeliningEnabled || !isFinalMergeEnabled) {
            // No merged output in this mode: only check that every spill left its data and index file.
            for (int spill = 0; spill < kvWriter.numSpills.get(); ++spill) {
                Assert.assertTrue(localFs.exists(kvWriter.outputFileHandler.getSpillFileForWrite(spill, 0L)));
                Assert.assertTrue(localFs.exists(kvWriter.outputFileHandler.getSpillIndexFileForWrite(spill, 0L)));
            }
            return;
        }

        Assert.assertEquals(2L, (long)events.size());
        Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
        VertexManagerEvent vme = (VertexManagerEvent)events.get(0);
        this.verifyPartitionStats(vme, partitionsWithData);
        Assert.assertTrue(events.get(1) instanceof CompositeDataMovementEvent);
        CompositeDataMovementEvent cdme = (CompositeDataMovementEvent)events.get(1);
        Assert.assertEquals(0L, (long)cdme.getSourceIndexStart());
        Assert.assertEquals((long)numPartitions, (long)cdme.getCount());
        ShuffleUserPayloads.DataMovementEventPayloadProto eventProto = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload()));
        Assert.assertFalse(eventProto.hasData());

        BitSet emptyPartitionBits;
        if (partitionsWithData.cardinality() != numPartitions) {
            Assert.assertTrue(eventProto.hasEmptyPartitions());
            byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(eventProto.getEmptyPartitions());
            emptyPartitionBits = TezUtilsInternal.fromByteArray(emptyPartitions);
            Assert.assertEquals((long)(numPartitions - partitionsWithData.cardinality()), (long)emptyPartitionBits.cardinality());
        } else {
            Assert.assertFalse(eventProto.hasEmptyPartitions());
            emptyPartitionBits = new BitSet(numPartitions);
        }
        Assert.assertEquals(HOST_STRING, eventProto.getHost());
        Assert.assertEquals((long)SHUFFLE_PORT, (long)eventProto.getPort());
        Assert.assertEquals(uniqueId, eventProto.getPathComponent());

        // NOTE(review): this instance is never read below; kept only to avoid dropping the constructor call.
        TezTaskOutputFiles taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId);
        Path outputFilePath = kvWriter.finalOutPath;
        Path spillFilePath = kvWriter.finalIndexPath;
        if (numRecordsWritten <= 0) {
            return;
        }
        Assert.assertTrue(localFs.exists(outputFilePath));
        Assert.assertTrue(localFs.exists(spillFilePath));
        this.checkPermissions(outputFilePath, spillFilePath);

        // Read every non-empty partition back and tick each record off the expected set.
        TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
        DataInputBuffer keyBuffer = new DataInputBuffer();
        DataInputBuffer valBuffer = new DataInputBuffer();
        Text keyDeser = new Text();
        Text valDeser = new Text();
        for (int i = 0; i < numPartitions; ++i) {
            if (emptyPartitionBits.get(i)) {
                continue;
            }
            TezIndexRecord indexRecord = spillRecord.getIndex(i);
            FSDataInputStream inStream = FileSystem.getLocal(conf).open(outputFilePath);
            try {
                inStream.seek(indexRecord.getStartOffset());
                IFile.Reader reader = new IFile.Reader((InputStream)inStream, indexRecord.getPartLength(), (CompressionCodec)codec, null, null, false, 0, -1);
                while (reader.nextRawKey(keyBuffer)) {
                    reader.nextRawValue(valBuffer);
                    keyDeser.readFields(keyBuffer);
                    valDeser.readFields(valBuffer);
                    int readPartition = partitioner.getPartition(keyDeser, valDeser, numPartitions);
                    Assert.assertTrue(expectedValues.get(readPartition).remove(keyDeser.toString(), valDeser.toString()));
                }
            } finally {
                // Close the stream even when an assertion above fails.
                inStream.close();
            }
        }
        for (int i = 0; i < numPartitions; ++i) {
            Assert.assertEquals(0L, (long)expectedValues.get(i).size());
            expectedValues.remove(i);
        }
        Assert.assertEquals(0L, (long)expectedValues.size());
    }

    /**
     * Writes {@code numRecords} random text records, recording each one in
     * {@code expectedValues} and marking its partition in {@code partitionsWithData}.
     * Key length is {@code minKeyLength + random.nextInt(keyLengthJitter)}; value length is
     * computed the same way from the value arguments.
     *
     * @return the number of records written (always {@code numRecords})
     */
    private int writeRandomTextRecords(UnorderedPartitionedKVWriterForTest kvWriter, HashPartitioner partitioner, HashMap<Integer, Multimap<String, String>> expectedValues, BitSet partitionsWithData, Random random, int numRecords, int numPartitions, int minKeyLength, int keyLengthJitter, int minValLength, int valLengthJitter) throws IOException, InterruptedException {
        Text keyText = new Text();
        Text valText = new Text();
        for (int i = 0; i < numRecords; ++i) {
            // Random.nextInt(bound) is never negative, so no Math.abs is needed.
            String key = TestUnorderedPartitionedKVWriter.createRandomString(minKeyLength + random.nextInt(keyLengthJitter));
            String val = TestUnorderedPartitionedKVWriter.createRandomString(minValLength + random.nextInt(valLengthJitter));
            keyText.set(key);
            valText.set(val);
            int partition = partitioner.getPartition(keyText, valText, numPartitions);
            partitionsWithData.set(partition);
            expectedValues.get(partition).put(key, val);
            kvWriter.write(keyText, valText);
        }
        return numRecords;
    }

    /**
     * Decodes the per-partition size statistics carried by a vertex-manager event, according
     * to the partition-stats mode this test instance was parameterized with.
     *
     * <ul>
     *   <li>Stats disabled: asserts the payload carries neither form of stats and returns {@code null}.</li>
     *   <li>Precise mode: asserts detailed stats are present and returns the reported sizes in list order.</li>
     *   <li>Otherwise: asserts bitmap stats are present, decompresses and deserializes the bitmap, and
     *       treats each set bit {@code pos} as (slot {@code pos / rangeCount}, size range
     *       {@code pos % rangeCount}), adding that range's size to the slot when it is positive.</li>
     * </ul>
     *
     * @param vme event whose user payload is parsed
     * @return decoded sizes, or {@code null} when stats reporting is disabled
     * @throws IOException if the payload cannot be parsed, decompressed or deserialized
     */
    private int[] getPartitionStats(VertexManagerEvent vme) throws IOException {
        ShuffleUserPayloads.VertexManagerEventPayloadProto payload = ShuffleUserPayloads.VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vme.getUserPayload()));

        if (!this.reportPartitionStats.isEnabled()) {
            Assert.assertFalse(payload.hasPartitionStats());
            Assert.assertFalse(payload.hasDetailedPartitionStats());
            return null;
        }

        if (this.reportPartitionStats.isPrecise()) {
            Assert.assertTrue(payload.hasDetailedPartitionStats());
            List<?> reportedSizes = payload.getDetailedPartitionStats().getSizeInMbList();
            int[] preciseStats = new int[reportedSizes.size()];
            int slot = 0;
            for (Object size : reportedSizes) {
                preciseStats[slot++] = (Integer)size;
            }
            return preciseStats;
        }

        Assert.assertTrue(payload.hasPartitionStats());
        byte[] rawData = TezCommonUtils.decompressByteStringToByteArray(payload.getPartitionStats());
        RoaringBitmap partitionStats = new RoaringBitmap();
        partitionStats.deserialize((DataInput)new DataInputStream(new ByteArrayInputStream(rawData)));

        // NOTE(review): the array is sized by the number of set bits, which presumably equals
        // the number of partitions (one bit per partition) — confirm against the writer.
        int[] bucketedStats = new int[partitionStats.getCardinality()];
        DATA_RANGE_IN_MB[] ranges = DATA_RANGE_IN_MB.values();
        int rangeCount = ranges.length;
        for (Iterator<?> bits = partitionStats.iterator(); bits.hasNext(); ) {
            int pos = (Integer)bits.next();
            int sizeInMb = ranges[pos % rangeCount].getSizeInMB();
            if (sizeInMb > 0) {
                bucketedStats[pos / rangeCount] += sizeInMb;
            }
        }
        return bucketedStats;
    }

    /**
     * Asserts that, for every slot of the decoded stats, a positive size is reported exactly
     * when the corresponding bit is set in {@code expectedPartitionsWithData}. Does nothing
     * when no stats are available ({@code getPartitionStats} returned {@code null}).
     *
     * @param vme event carrying the stats payload
     * @param expectedPartitionsWithData bit {@code i} set when partition {@code i} received data
     * @throws IOException if the stats cannot be decoded
     */
    private void verifyPartitionStats(VertexManagerEvent vme, BitSet expectedPartitionsWithData) throws IOException {
        int[] stats = this.getPartitionStats(vme);
        if (stats != null) {
            int partition = 0;
            for (int sizeInMb : stats) {
                boolean reportedNonEmpty = sizeInMb > 0;
                Assert.assertTrue(expectedPartitionsWithData.get(partition) == reportedNonEmpty);
                ++partition;
            }
        }
    }

    /** Pipelined-transfer scenario with 10 records across 10 partitions, none skipped. */
    @Test(timeout=10000L)
    public void testNoSpill_WithPipelinedShuffle() throws IOException, InterruptedException {
        final int numRecords = 10;
        final int numPartitions = 10;
        final Set<Integer> skippedPartitions = null;
        baseTestWithPipelinedTransfer(numRecords, numPartitions, skippedPartitions, this.shouldCompress);
    }

    /** Pipelined-transfer scenario with 50 records across 10 partitions, none skipped. */
    @Test(timeout=10000L)
    public void testSingleSpill_WithPipelinedShuffle() throws IOException, InterruptedException {
        final int numRecords = 50;
        final int numPartitions = 10;
        final Set<Integer> skippedPartitions = null;
        baseTestWithPipelinedTransfer(numRecords, numPartitions, skippedPartitions, this.shouldCompress);
    }

    /** Pipelined-transfer scenario with 200 records across 10 partitions, none skipped. */
    @Test(timeout=10000L)
    public void testMultipleSpills_WithPipelinedShuffle() throws IOException, InterruptedException {
        final int numRecords = 200;
        final int numPartitions = 10;
        final Set<Integer> skippedPartitions = null;
        baseTestWithPipelinedTransfer(numRecords, numPartitions, skippedPartitions, this.shouldCompress);
    }

    /** Pipelined-transfer scenario with no records and 10 partitions. */
    @Test(timeout=10000L)
    public void testNoRecords_WithPipelinedShuffle() throws IOException, InterruptedException {
        final int numRecords = 0;
        final int numPartitions = 10;
        final Set<Integer> skippedPartitions = null;
        baseTestWithPipelinedTransfer(numRecords, numPartitions, skippedPartitions, this.shouldCompress);
    }

    /** Pipelined-transfer scenario with no records and a single partition. */
    @Test(timeout=10000L)
    public void testNoRecords_SinglePartition_WithPipelinedShuffle() throws IOException, InterruptedException {
        final int numRecords = 0;
        final int numPartitions = 1;
        final Set<Integer> skippedPartitions = null;
        baseTestWithPipelinedTransfer(numRecords, numPartitions, skippedPartitions, this.shouldCompress);
    }

    /**
     * Pipelined-transfer scenario with 200 candidate records across 10 partitions, where
     * records that map to partitions 2 and 5 are not written.
     */
    @Test(timeout=10000L)
    public void testSkippedPartitions_WithPipelinedShuffle() throws IOException, InterruptedException {
        // Build the set from Integer elements directly. The previous (Object[]) cast made
        // Sets.newHashSet infer a set of Object, which is not assignable to the helper's
        // Set<Integer> parameter.
        Set<Integer> skippedPartitions = Sets.newHashSet(2, 5);
        this.baseTestWithPipelinedTransfer(200, 10, skippedPartitions, this.shouldCompress);
    }

    /** Oversized records only (10 large keys, 20 large values, 50 large pairs) with pipelining on and final merge off. */
    @Test(timeout=10000L)
    public void testLargeKvPairs_WithPipelinedShuffle() throws IOException, InterruptedException {
        final int numRegularRecords = 0;
        final int numPartitions = 10;
        final long availableMemory = 2048L;
        final int numLargeKeys = 10;
        final int numLargeValues = 20;
        final int numLargeKvPairs = 50;
        final boolean pipeliningEnabled = true;
        final boolean finalMergeEnabled = false;
        textTest(numRegularRecords, numPartitions, availableMemory, numLargeKeys, numLargeValues, numLargeKvPairs, pipeliningEnabled, finalMergeEnabled);
    }

    private void baseTestWithPipelinedTransfer(int numRecords, int numPartitions, Set<Integer> skippedPartitions, boolean shouldCompress) throws IOException, InterruptedException {
        PartitionerForTest partitioner = new PartitionerForTest();
        ApplicationId appId = ApplicationId.newInstance((long)10000000L, (int)1);
        TezCounters counters = new TezCounters();
        String uniqueId = UUID.randomUUID().toString();
        int dagId = 1;
        String auxiliaryService = this.defaultConf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle");
        OutputContext outputContext = this.createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
        Configuration conf = this.createConfiguration(outputContext, IntWritable.class, LongWritable.class, shouldCompress, -1);
        conf.setBoolean("tez.runtime.enable.final-merge.in.output", false);
        conf.setBoolean("tez.runtime.pipelined-shuffle.enabled", true);
        DefaultCodec codec = null;
        if (shouldCompress) {
            codec = new DefaultCodec();
            ((Configurable)codec).setConf(conf);
        }
        int numOutputs = numPartitions;
        long availableMemory = 2048L;
        int numRecordsWritten = 0;
        UnorderedPartitionedKVWriterForTest kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, availableMemory);
        int sizePerBuffer = kvWriter.sizePerBuffer;
        int sizePerRecord = 12;
        int sizePerRecordWithOverhead = sizePerRecord + 12;
        BitSet partitionsWithData = new BitSet(numPartitions);
        IntWritable intWritable = new IntWritable();
        LongWritable longWritable = new LongWritable();
        for (int i = 0; i < numRecords; ++i) {
            intWritable.set(i);
            longWritable.set((long)i);
            int partition = partitioner.getPartition(intWritable, longWritable, numOutputs);
            if (skippedPartitions != null && skippedPartitions.contains(partition)) continue;
            partitionsWithData.set(partition);
            kvWriter.write(intWritable, longWritable);
            ++numRecordsWritten;
        }
        int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead;
        int numExpectedSpills = numRecordsWritten / recordsPerBuffer;
        ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(List.class);
        List lastEvents = kvWriter.close();
        if (numPartitions == 1) {
            Assert.assertEquals((Object)false, (Object)kvWriter.skipBuffers);
        }
        Assert.assertTrue((lastEvents.size() == 0 ? 1 : 0) != 0);
        ((OutputContext)Mockito.verify((Object)outputContext, (VerificationMode)Mockito.atLeast((int)numExpectedSpills))).sendEvents((List)eventCaptor.capture());
        int numOfCapturedEvents = eventCaptor.getAllValues().size();
        lastEvents = (List)eventCaptor.getAllValues().get(numOfCapturedEvents - 1);
        VertexManagerEvent VMEvent = (VertexManagerEvent)lastEvents.get(0);
        for (int i = 0; i < numOfCapturedEvents; ++i) {
            List events = (List)eventCaptor.getAllValues().get(i);
            if (i < numOfCapturedEvents - 1) {
                Assert.assertTrue((events.size() == 1 ? 1 : 0) != 0);
                Assert.assertTrue((boolean)(events.get(0) instanceof CompositeDataMovementEvent));
                continue;
            }
            Assert.assertTrue((events.size() == 2 ? 1 : 0) != 0);
            Assert.assertTrue((boolean)(events.get(0) instanceof VertexManagerEvent));
            Assert.assertTrue((boolean)(events.get(1) instanceof CompositeDataMovementEvent));
        }
        this.verifyPartitionStats(VMEvent, partitionsWithData);
        ((OutputContext)Mockito.verify((Object)outputContext, (VerificationMode)Mockito.never())).reportFailure((TaskFailureType)Matchers.any(TaskFailureType.class), (Throwable)Matchers.any(Throwable.class), (String)Matchers.any(String.class));
        Assert.assertNull((Object)kvWriter.currentBuffer);
        Assert.assertEquals((long)0L, (long)kvWriter.availableBuffers.size());
        TezCounter outputRecordBytesCounter = counters.findCounter((Enum)TaskCounter.OUTPUT_BYTES);
        TezCounter outputRecordsCounter = counters.findCounter((Enum)TaskCounter.OUTPUT_RECORDS);
        TezCounter outputBytesWithOverheadCounter = counters.findCounter((Enum)TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
        TezCounter fileOutputBytesCounter = counters.findCounter((Enum)TaskCounter.OUTPUT_BYTES_PHYSICAL);
        TezCounter spilledRecordsCounter = counters.findCounter((Enum)TaskCounter.SPILLED_RECORDS);
        TezCounter additionalSpillBytesWritternCounter = counters.findCounter((Enum)TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
        TezCounter additionalSpillBytesReadCounter = counters.findCounter((Enum)TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
        TezCounter numAdditionalSpillsCounter = counters.findCounter((Enum)TaskCounter.ADDITIONAL_SPILL_COUNT);
        Assert.assertEquals((long)(numRecordsWritten * sizePerRecord), (long)outputRecordBytesCounter.getValue());
        Assert.assertEquals((long)numRecordsWritten, (long)outputRecordsCounter.getValue());
        Assert.assertEquals((long)(numRecordsWritten * sizePerRecordWithOverhead), (long)outputBytesWithOverheadCounter.getValue());
        long fileOutputBytes = fileOutputBytesCounter.getValue();
        if (numRecordsWritten > 0) {
            Assert.assertTrue((fileOutputBytes > 0L ? 1 : 0) != 0);
            if (!shouldCompress) {
                Assert.assertTrue((fileOutputBytes > outputRecordBytesCounter.getValue() ? 1 : 0) != 0);
            }
        } else {
            Assert.assertEquals((long)0L, (long)fileOutputBytes);
        }
        Assert.assertTrue(((long)(recordsPerBuffer * numExpectedSpills) >= spilledRecordsCounter.getValue() ? 1 : 0) != 0);
        long additionalSpillBytesWritten = additionalSpillBytesWritternCounter.getValue();
        long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue();
        Assert.assertEquals((long)additionalSpillBytesWritten, (long)0L);
        Assert.assertTrue((additionalSpillBytesWritten == additionalSpillBytesRead ? 1 : 0) != 0);
        Assert.assertEquals((long)numAdditionalSpillsCounter.getValue(), (long)0L);
        Assert.assertTrue((lastEvents.size() > 0 ? 1 : 0) != 0);
        int index = lastEvents.size() - 1;
        Assert.assertTrue((boolean)(lastEvents.get(index) instanceof CompositeDataMovementEvent));
        CompositeDataMovementEvent cdme = (CompositeDataMovementEvent)lastEvents.get(index);
        Assert.assertEquals((long)0L, (long)cdme.getSourceIndexStart());
        Assert.assertEquals((long)numOutputs, (long)cdme.getCount());
        ShuffleUserPayloads.DataMovementEventPayloadProto eventProto = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom((ByteString)ByteString.copyFrom((ByteBuffer)cdme.getUserPayload()));
        Assert.assertTrue((boolean)eventProto.getLastEvent());
        this.verifyEmptyPartitions(eventProto, numRecordsWritten, numPartitions, skippedPartitions);
        ((OutputContext)Mockito.verify((Object)outputContext, (VerificationMode)Mockito.atLeast((int)1))).notifyProgress();
        TezTaskOutputFiles taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId);
        if (numRecordsWritten > 0) {
            int numSpills = kvWriter.numSpills.get();
            for (int i = 0; i < numSpills; ++i) {
                Path outputFile = taskOutput.getSpillFileForWrite(i, 10L);
                Path indexFile = taskOutput.getSpillIndexFileForWrite(i, 10L);
                Assert.assertTrue((boolean)localFs.exists(outputFile));
                Assert.assertTrue((boolean)localFs.exists(indexFile));
                this.checkPermissions(outputFile, indexFile);
            }
        } else {
            return;
        }
    }

    /**
     * Asserts that both the output file and its index file grant READ_WRITE to the
     * owning user and READ to the group on the local file system.
     *
     * @param outputFile path of the data file to inspect
     * @param indexFile path of the matching index file to inspect
     * @throws IOException if a file status lookup fails
     */
    private void checkPermissions(Path outputFile, Path indexFile) throws IOException {
        // The status is looked up once per assertion, data file first and index file second.
        FsAction outputUserAction = localFs.getFileStatus(outputFile).getPermission().getUserAction();
        Assert.assertEquals("Incorrect output permissions (user)", FsAction.READ_WRITE, outputUserAction);

        FsAction outputGroupAction = localFs.getFileStatus(outputFile).getPermission().getGroupAction();
        Assert.assertEquals("Incorrect output permissions (group)", FsAction.READ, outputGroupAction);

        FsAction indexUserAction = localFs.getFileStatus(indexFile).getPermission().getUserAction();
        Assert.assertEquals("Incorrect index permissions (user)", FsAction.READ_WRITE, indexUserAction);

        FsAction indexGroupAction = localFs.getFileStatus(indexFile).getPermission().getGroupAction();
        Assert.assertEquals("Incorrect index permissions (group)", FsAction.READ, indexGroupAction);
    }

    /**
     * Validates the empty-partition bitmap of a data movement payload together with its
     * host, port and path-component fields.
     *
     * <p>When the payload has no bitmap, or the bitmap does not mark every partition as
     * empty, host/port must match the test constants and a path component must be set.
     * When every partition is marked empty, none of the three fields may be set.
     *
     * @param eventProto payload to inspect
     * @param numRecordsWritten number of records the test wrote
     * @param numPartitions total number of partitions
     * @param skippedPartitions partitions the test deliberately left empty; may be null
     * @throws IOException if the bitmap cannot be decompressed
     */
    private void verifyEmptyPartitions(ShuffleUserPayloads.DataMovementEventPayloadProto eventProto, int numRecordsWritten, int numPartitions, Set<Integer> skippedPartitions) throws IOException {
        if (!eventProto.hasEmptyPartitions()) {
            // No bitmap: the payload must point at real output.
            Assert.assertEquals(HOST_STRING, eventProto.getHost());
            Assert.assertEquals(4000L, eventProto.getPort());
            Assert.assertTrue(eventProto.hasPathComponent());
            return;
        }

        ByteString compressedBits = eventProto.getEmptyPartitions();
        BitSet emptyBits = TezUtilsInternal.fromByteArray(TezCommonUtils.decompressByteStringToByteArray(compressedBits));
        int emptyCount = emptyBits.cardinality();

        if (numRecordsWritten == 0) {
            // Nothing written: every partition has to be flagged empty.
            Assert.assertEquals(numPartitions, emptyCount);
        } else if (skippedPartitions != null) {
            // Exactly the skipped partitions have to be flagged empty.
            for (Integer skipped : skippedPartitions) {
                Assert.assertTrue(emptyBits.get(skipped));
            }
            Assert.assertEquals(skippedPartitions.size(), emptyCount);
        }

        if (emptyCount == numPartitions) {
            Assert.assertFalse(eventProto.hasHost());
            Assert.assertFalse(eventProto.hasPort());
            Assert.assertFalse(eventProto.hasPathComponent());
        } else {
            Assert.assertEquals(HOST_STRING, eventProto.getHost());
            Assert.assertEquals(4000L, eventProto.getPort());
            Assert.assertTrue(eventProto.hasPathComponent());
        }
    }

    /**
     * Final-merge-disabled scenario: 10 records over 10 partitions, nothing skipped.
     */
    @Test(timeout=10000L)
    public void testNoSpill_WithFinalMergeDisabled() throws IOException, InterruptedException {
        final int numRecords = 10;
        final int numPartitions = 10;
        final Set<Integer> skippedPartitions = null;
        this.baseTestWithFinalMergeDisabled(numRecords, numPartitions, skippedPartitions, this.shouldCompress);
    }

    /**
     * Final-merge-disabled scenario: 50 records over 10 partitions, nothing skipped.
     */
    @Test(timeout=10000L)
    public void testSingleSpill_WithFinalMergeDisabled() throws IOException, InterruptedException {
        final int numRecords = 50;
        final int numPartitions = 10;
        final Set<Integer> skippedPartitions = null;
        this.baseTestWithFinalMergeDisabled(numRecords, numPartitions, skippedPartitions, this.shouldCompress);
    }

    /**
     * Final-merge-disabled scenario with a single partition and nothing skipped.
     */
    @Test(timeout=10000L)
    public void testSinglePartition_WithFinalMergeDisabled() throws IOException, InterruptedException {
        // NOTE(review): zero records makes this call identical to
        // testNoRecords_SinglePartition_WithFinalMergeDisabled - confirm that is intended.
        final int numRecords = 0;
        final int numPartitions = 1;
        final Set<Integer> skippedPartitions = null;
        this.baseTestWithFinalMergeDisabled(numRecords, numPartitions, skippedPartitions, this.shouldCompress);
    }

    /**
     * Final-merge-disabled scenario: 200 records over 10 partitions, nothing skipped.
     */
    @Test(timeout=10000L)
    public void testMultipleSpills_WithFinalMergeDisabled() throws IOException, InterruptedException {
        final int numRecords = 200;
        final int numPartitions = 10;
        final Set<Integer> skippedPartitions = null;
        this.baseTestWithFinalMergeDisabled(numRecords, numPartitions, skippedPartitions, this.shouldCompress);
    }

    /**
     * Final-merge-disabled scenario: no records at all, 10 partitions.
     */
    @Test(timeout=10000L)
    public void testNoRecords_WithFinalMergeDisabled() throws IOException, InterruptedException {
        final int numRecords = 0;
        final int numPartitions = 10;
        final Set<Integer> skippedPartitions = null;
        this.baseTestWithFinalMergeDisabled(numRecords, numPartitions, skippedPartitions, this.shouldCompress);
    }

    /**
     * Final-merge-disabled scenario: no records at all, a single partition.
     */
    @Test(timeout=10000L)
    public void testNoRecords_SinglePartition_WithFinalMergeDisabled() throws IOException, InterruptedException {
        final int numRecords = 0;
        final int numPartitions = 1;
        final Set<Integer> skippedPartitions = null;
        this.baseTestWithFinalMergeDisabled(numRecords, numPartitions, skippedPartitions, this.shouldCompress);
    }

    /**
     * Final-merge-disabled scenario: 200 records over 10 partitions, with partitions 2 and 5
     * left without data.
     */
    @Test(timeout=10000L)
    public void testSkippedPartitions_WithFinalMergeDisabled() throws IOException, InterruptedException {
        final int numRecords = 200;
        final int numPartitions = 10;
        Set<Integer> skippedPartitions = Sets.newHashSet(2, 5);
        this.baseTestWithFinalMergeDisabled(numRecords, numPartitions, skippedPartitions, this.shouldCompress);
    }

    /**
     * Shared driver for the tests whose names end in "_WithFinalMergeDisabled".
     *
     * <p>Builds a writer with "tez.runtime.enable.final-merge.in.output" and
     * "tez.runtime.pipelined-shuffle.enabled" both set to false, writes int/long records
     * (leaving out any record that maps to a skipped partition), closes the writer, and then
     * checks the events returned by close(), the task counters, and the spill files on the
     * local file system.
     *
     * @param numRecords number of records generated before skipping is applied
     * @param numPartitions number of output partitions handed to the writer
     * @param skippedPartitions partitions whose records are not written; may be null
     * @param shouldCompress forwarded to createConfiguration; when true a DefaultCodec is also built
     * @throws IOException propagated from the writer and file system calls
     * @throws InterruptedException propagated from the writer
     */
    private void baseTestWithFinalMergeDisabled(int numRecords, int numPartitions, Set<Integer> skippedPartitions, boolean shouldCompress) throws IOException, InterruptedException {
        PartitionerForTest partitioner = new PartitionerForTest();
        ApplicationId appId = ApplicationId.newInstance((long)10000000L, (int)1);
        TezCounters counters = new TezCounters();
        String uniqueId = UUID.randomUUID().toString();
        int dagId = 1;
        String auxiliaryService = this.defaultConf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle");
        OutputContext outputContext = this.createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
        // -1 is passed as maxSingleBufferSizeBytes; how it is interpreted is decided by createConfiguration.
        Configuration conf = this.createConfiguration(outputContext, IntWritable.class, LongWritable.class, shouldCompress, -1);
        // The scenario under test: final merge off, pipelined shuffle off.
        conf.setBoolean("tez.runtime.enable.final-merge.in.output", false);
        conf.setBoolean("tez.runtime.pipelined-shuffle.enabled", false);
        // The codec is configured here but not referenced again in this method.
        DefaultCodec codec = null;
        if (shouldCompress) {
            codec = new DefaultCodec();
            ((Configurable)codec).setConf(conf);
        }
        int numOutputs = numPartitions;
        long availableMemory = 2048L;
        int numRecordsWritten = 0;
        UnorderedPartitionedKVWriterForTest kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, availableMemory);
        int sizePerBuffer = kvWriter.sizePerBuffer;
        // presumably 4-byte int key + 8-byte long value, plus 12 bytes of per-record overhead - TODO confirm
        int sizePerRecord = 12;
        int sizePerRecordWithOverhead = sizePerRecord + 12;
        BitSet partitionsWithData = new BitSet(numPartitions);
        IntWritable intWritable = new IntWritable();
        LongWritable longWritable = new LongWritable();
        // Write record i as (i, i), skipping anything the partitioner routes to a skipped partition.
        for (int i = 0; i < numRecords; ++i) {
            intWritable.set(i);
            longWritable.set((long)i);
            int partition = partitioner.getPartition(intWritable, longWritable, numOutputs);
            if (skippedPartitions != null && skippedPartitions.contains(partition)) continue;
            partitionsWithData.set(partition);
            kvWriter.write(intWritable, longWritable);
            ++numRecordsWritten;
        }
        int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead;
        int numExpectedSpills = numRecordsWritten / recordsPerBuffer;
        ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(List.class);
        List lastEvents = kvWriter.close();
        if (numPartitions == 1) {
            Assert.assertEquals((Object)true, (Object)kvWriter.skipBuffers);
        }
        // close() is expected to return one more event than the spill count (counted as at least 1).
        int spills = Math.max(1, kvWriter.numSpills.get());
        Assert.assertEquals((long)(spills + 1), (long)lastEvents.size());
        // atMost(0): nothing may have been pushed through OutputContext.sendEvents.
        ((OutputContext)Mockito.verify((Object)outputContext, (VerificationMode)Mockito.atMost((int)0))).sendEvents((List)eventCaptor.capture());
        // Partition stats are only checked on VertexManagerEvents, and only when something was written.
        for (int i = 0; i < lastEvents.size(); ++i) {
            Event event = (Event)lastEvents.get(i);
            if (!(event instanceof VertexManagerEvent) || numRecordsWritten <= 0) continue;
            this.verifyPartitionStats((VertexManagerEvent)event, partitionsWithData);
        }
        ((OutputContext)Mockito.verify((Object)outputContext, (VerificationMode)Mockito.never())).reportFailure((TaskFailureType)Matchers.any(TaskFailureType.class), (Throwable)Matchers.any(Throwable.class), (String)Matchers.any(String.class));
        // After close() the writer must hold no current buffer and no available buffers.
        Assert.assertNull((Object)kvWriter.currentBuffer);
        Assert.assertEquals((long)0L, (long)kvWriter.availableBuffers.size());
        TezCounter outputRecordBytesCounter = counters.findCounter((Enum)TaskCounter.OUTPUT_BYTES);
        TezCounter outputRecordsCounter = counters.findCounter((Enum)TaskCounter.OUTPUT_RECORDS);
        TezCounter outputBytesWithOverheadCounter = counters.findCounter((Enum)TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
        TezCounter fileOutputBytesCounter = counters.findCounter((Enum)TaskCounter.OUTPUT_BYTES_PHYSICAL);
        TezCounter spilledRecordsCounter = counters.findCounter((Enum)TaskCounter.SPILLED_RECORDS);
        TezCounter additionalSpillBytesWritternCounter = counters.findCounter((Enum)TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
        TezCounter additionalSpillBytesReadCounter = counters.findCounter((Enum)TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
        TezCounter numAdditionalSpillsCounter = counters.findCounter((Enum)TaskCounter.ADDITIONAL_SPILL_COUNT);
        Assert.assertEquals((long)(numRecordsWritten * sizePerRecord), (long)outputRecordBytesCounter.getValue());
        Assert.assertEquals((long)numRecordsWritten, (long)outputRecordsCounter.getValue());
        if (outputRecordsCounter.getValue() > 0L) {
            Assert.assertEquals((long)(numRecordsWritten * sizePerRecordWithOverhead), (long)outputBytesWithOverheadCounter.getValue());
        } else {
            Assert.assertEquals((long)0L, (long)outputBytesWithOverheadCounter.getValue());
        }
        long fileOutputBytes = fileOutputBytesCounter.getValue();
        if (numRecordsWritten > 0) {
            Assert.assertTrue((fileOutputBytes > 0L ? 1 : 0) != 0);
            // Without compression the physical bytes are expected to exceed the raw record bytes.
            if (!shouldCompress) {
                Assert.assertTrue((String)("fileOutputBytes=" + fileOutputBytes + ", outputRecordBytes=" + outputRecordBytesCounter.getValue()), (fileOutputBytes > outputRecordBytesCounter.getValue() ? 1 : 0) != 0);
            }
        } else {
            Assert.assertEquals((long)0L, (long)fileOutputBytes);
        }
        // Upper bound only: the spilled-records counter may not exceed recordsPerBuffer * numExpectedSpills.
        Assert.assertTrue(((long)(recordsPerBuffer * numExpectedSpills) >= spilledRecordsCounter.getValue() ? 1 : 0) != 0);
        long additionalSpillBytesWritten = additionalSpillBytesWritternCounter.getValue();
        long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue();
        // All "additional spill" counters are expected to stay at zero in this scenario.
        Assert.assertEquals((long)additionalSpillBytesWritten, (long)0L);
        Assert.assertTrue((additionalSpillBytesWritten == additionalSpillBytesRead ? 1 : 0) != 0);
        Assert.assertEquals((long)numAdditionalSpillsCounter.getValue(), (long)0L);
        // The final returned event must be a CompositeDataMovementEvent covering all outputs from index 0.
        Assert.assertTrue((lastEvents.size() > 0 ? 1 : 0) != 0);
        int index = lastEvents.size() - 1;
        Assert.assertTrue((boolean)(lastEvents.get(index) instanceof CompositeDataMovementEvent));
        CompositeDataMovementEvent cdme = (CompositeDataMovementEvent)lastEvents.get(index);
        Assert.assertEquals((long)0L, (long)cdme.getSourceIndexStart());
        Assert.assertEquals((long)numOutputs, (long)cdme.getCount());
        ShuffleUserPayloads.DataMovementEventPayloadProto eventProto = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom((ByteString)ByteString.copyFrom((ByteBuffer)cdme.getUserPayload()));
        this.verifyEmptyPartitions(eventProto, numRecordsWritten, numPartitions, skippedPartitions);
        if (outputRecordsCounter.getValue() > 0L) {
            Assert.assertTrue((boolean)eventProto.getLastEvent());
        }
        // Path components must look like "<prefix>_<digits>"; the prefix is compared with uniqueId below.
        Pattern mergePathComponentPattern = Pattern.compile("(.*)(_\\d+)");
        for (Event event : lastEvents) {
            if (!(event instanceof CompositeDataMovementEvent)) continue;
            // cdme and eventProto are reused for each composite event in the list.
            cdme = (CompositeDataMovementEvent)event;
            eventProto = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom((ByteString)ByteString.copyFrom((ByteBuffer)cdme.getUserPayload()));
            Assert.assertEquals((Object)false, (Object)eventProto.getPipelined());
            if (eventProto.hasPathComponent()) {
                Matcher matcher = mergePathComponentPattern.matcher(eventProto.getPathComponent());
                Assert.assertTrue((String)("spill id should be present in path component " + eventProto.getPathComponent()), (boolean)matcher.matches());
                Assert.assertEquals((long)2L, (long)matcher.groupCount());
                Assert.assertEquals((Object)uniqueId, (Object)matcher.group(1));
                Assert.assertTrue((String)"spill id should be present in path component", (matcher.group(2) != null ? 1 : 0) != 0);
                // Locate the output file under the first work dir and check it and its ".index" sibling.
                Path outputPath = new Path(outputContext.getWorkDirs()[0], "output/" + eventProto.getPathComponent() + "/" + "file.out");
                Path indexPath = outputPath.suffix(".index");
                this.checkPermissions(outputPath, indexPath);
                continue;
            }
            // No path component: spill id must be 0, and either this is the last event (records were
            // written) or every partition is flagged empty (nothing was written).
            Assert.assertEquals((long)0L, (long)eventProto.getSpillId());
            if (outputRecordsCounter.getValue() > 0L) {
                Assert.assertEquals((Object)true, (Object)eventProto.getLastEvent());
                continue;
            }
            byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray((ByteString)eventProto.getEmptyPartitions());
            BitSet emptyPartitionBits = TezUtilsInternal.fromByteArray((byte[])emptyPartitions);
            Assert.assertEquals((long)numPartitions, (long)emptyPartitionBits.cardinality());
        }
        ((OutputContext)Mockito.verify((Object)outputContext, (VerificationMode)Mockito.atLeast((int)1))).notifyProgress();
        TezTaskOutputFiles taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId);
        // With data written, every spill's data file and index file must exist on the local file system.
        if (numRecordsWritten > 0) {
            int numSpills = kvWriter.numSpills.get();
            for (int i = 0; i < numSpills; ++i) {
                Assert.assertTrue((boolean)localFs.exists(taskOutput.getSpillFileForWrite(i, 10L)));
                Assert.assertTrue((boolean)localFs.exists(taskOutput.getSpillIndexFileForWrite(i, 10L)));
            }
        } else {
            return;
        }
    }

    /**
     * Convenience overload of the base test that uses 2048 as the available memory and
     * leaves data-via-events disabled.
     *
     * @param numRecords number of records to generate
     * @param numPartitions number of output partitions
     * @param skippedPartitions partitions to leave without data; may be null
     * @param shouldCompress whether compression is enabled
     * @param maxSingleBufferSizeBytes forwarded unchanged to the full overload
     * @param bufferMergePercent forwarded unchanged to the full overload
     */
    private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions, boolean shouldCompress, int maxSingleBufferSizeBytes, int bufferMergePercent) throws IOException, InterruptedException {
        final int defaultAvailableMemory = 2048;
        final boolean dataViaEventEnabled = false;
        this.baseTest(numRecords, numPartitions, skippedPartitions, shouldCompress, maxSingleBufferSizeBytes, bufferMergePercent, defaultAvailableMemory, dataViaEventEnabled);
    }

    /**
     * Full base test: writes int/long records through an UnorderedPartitionedKVWriterForTest,
     * closes it, verifies the two returned events and the task counters, and finally reads the
     * output back (from the event payload or from the final output file) to confirm that every
     * written key/value pair is present exactly once.
     *
     * <p>Decompiler note (CFR): "WARNING - Removed try catching itself - possible behaviour change."
     *
     * @param numRecords number of records generated before skipping is applied
     * @param numPartitions number of output partitions handed to the writer
     * @param skippedPartitions partitions whose records are not written; may be null
     * @param shouldCompress forwarded to createConfiguration; when true a DefaultCodec is used for reading back
     * @param maxSingleBufferSizeBytes forwarded to createConfiguration
     * @param bufferMergePercent stored under "tez.runtime.unordered-partitioned-kvwriter.buffer-merge-percent"
     * @param availableMemory memory value passed to the writer constructor
     * @param dataViaEventEnabled stored under "tez.runtime.transfer.data-via-events.enabled"; relaxes several counter checks
     * @throws IOException propagated from the writer, readers and file system calls
     * @throws InterruptedException propagated from the writer
     */
    private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions, boolean shouldCompress, int maxSingleBufferSizeBytes, int bufferMergePercent, int availableMemory, boolean dataViaEventEnabled) throws IOException, InterruptedException {
        int i;
        PartitionerForTest partitioner = new PartitionerForTest();
        ApplicationId appId = ApplicationId.newInstance((long)10000000L, (int)1);
        TezCounters counters = new TezCounters();
        String uniqueId = UUID.randomUUID().toString();
        int dagId = 1;
        String auxiliaryService = this.defaultConf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle");
        OutputContext outputContext = this.createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
        Configuration conf = this.createConfiguration(outputContext, IntWritable.class, LongWritable.class, shouldCompress, maxSingleBufferSizeBytes);
        conf.setInt("tez.runtime.unordered-partitioned-kvwriter.buffer-merge-percent", bufferMergePercent);
        conf.setBoolean("tez.runtime.transfer.data-via-events.enabled", dataViaEventEnabled);
        // The codec (null when compression is off) is handed to the IFile readers further down.
        DefaultCodec codec = null;
        if (shouldCompress) {
            codec = new DefaultCodec();
            ((Configurable)codec).setConf(conf);
        }
        int numOutputs = numPartitions;
        int numRecordsWritten = 0;
        // One multimap of expected key -> value pairs per partition; entries are removed as they are read back.
        HashMap<Integer, LinkedListMultimap> expectedValues = new HashMap<Integer, LinkedListMultimap>();
        for (int i2 = 0; i2 < numOutputs; ++i2) {
            expectedValues.put(i2, LinkedListMultimap.create());
        }
        UnorderedPartitionedKVWriterForTest kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, availableMemory);
        int sizePerBuffer = kvWriter.sizePerBuffer;
        // presumably 4-byte int key + 8-byte long value, plus 12 bytes of per-record overhead - TODO confirm
        int sizePerRecord = 12;
        int sizePerRecordWithOverhead = sizePerRecord + 12;
        IntWritable intWritable = new IntWritable();
        LongWritable longWritable = new LongWritable();
        BitSet partitionsWithData = new BitSet(numPartitions);
        // Write record i3 as (i3, i3), skipping anything the partitioner routes to a skipped partition.
        for (int i3 = 0; i3 < numRecords; ++i3) {
            intWritable.set(i3);
            longWritable.set((long)i3);
            int partition = partitioner.getPartition(intWritable, longWritable, numOutputs);
            if (skippedPartitions != null && skippedPartitions.contains(partition)) continue;
            partitionsWithData.set(partition);
            ((Multimap)expectedValues.get(partition)).put((Object)intWritable.get(), (Object)longWritable.get());
            kvWriter.write(intWritable, longWritable);
            ++numRecordsWritten;
        }
        List events = kvWriter.close();
        // Single-partition case: skipBuffers must be set and the payload's record count must match the counter.
        if (numPartitions == 1) {
            Assert.assertEquals((Object)true, (Object)kvWriter.skipBuffers);
            Assert.assertEquals((long)2L, (long)events.size());
            Event event1 = (Event)events.get(1);
            Assert.assertTrue((boolean)(event1 instanceof CompositeDataMovementEvent));
            CompositeDataMovementEvent dme = (CompositeDataMovementEvent)event1;
            ByteBuffer bb = dme.getUserPayload();
            ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom((ByteString)ByteString.copyFrom((ByteBuffer)bb));
            Assert.assertEquals((long)kvWriter.outputRecordsCounter.getValue(), (long)shufflePayload.getNumRecord());
        }
        int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead;
        // Unlike the final-merge-disabled variant, the expected spill count is also divided by the writer's spillLimit.
        int numExpectedSpills = numRecordsWritten / recordsPerBuffer / kvWriter.spillLimit;
        ((OutputContext)Mockito.verify((Object)outputContext, (VerificationMode)Mockito.never())).reportFailure((TaskFailureType)Matchers.any(TaskFailureType.class), (Throwable)Matchers.any(Throwable.class), (String)Matchers.any(String.class));
        // After close() the writer must hold no current buffer and no available buffers.
        Assert.assertNull((Object)kvWriter.currentBuffer);
        Assert.assertEquals((long)0L, (long)kvWriter.availableBuffers.size());
        TezCounter outputRecordBytesCounter = counters.findCounter((Enum)TaskCounter.OUTPUT_BYTES);
        TezCounter outputRecordsCounter = counters.findCounter((Enum)TaskCounter.OUTPUT_RECORDS);
        TezCounter outputBytesWithOverheadCounter = counters.findCounter((Enum)TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
        TezCounter fileOutputBytesCounter = counters.findCounter((Enum)TaskCounter.OUTPUT_BYTES_PHYSICAL);
        TezCounter spilledRecordsCounter = counters.findCounter((Enum)TaskCounter.SPILLED_RECORDS);
        TezCounter additionalSpillBytesWritternCounter = counters.findCounter((Enum)TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
        TezCounter additionalSpillBytesReadCounter = counters.findCounter((Enum)TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
        TezCounter numAdditionalSpillsCounter = counters.findCounter((Enum)TaskCounter.ADDITIONAL_SPILL_COUNT);
        Assert.assertEquals((long)(numRecordsWritten * sizePerRecord), (long)outputRecordBytesCounter.getValue());
        // The with-overhead counter is only checked when there is more than one partition.
        if (numPartitions > 1) {
            Assert.assertEquals((long)(numRecordsWritten * sizePerRecordWithOverhead), (long)outputBytesWithOverheadCounter.getValue());
        }
        Assert.assertEquals((long)numRecordsWritten, (long)outputRecordsCounter.getValue());
        long fileOutputBytes = fileOutputBytesCounter.getValue();
        if (numRecordsWritten > 0) {
            Assert.assertTrue((fileOutputBytes > 0L ? 1 : 0) != 0);
            if (!shouldCompress && !dataViaEventEnabled) {
                Assert.assertTrue((fileOutputBytes > outputRecordBytesCounter.getValue() ? 1 : 0) != 0);
            }
        } else {
            Assert.assertEquals((long)0L, (long)fileOutputBytes);
        }
        if (!dataViaEventEnabled) {
            Assert.assertEquals((long)(recordsPerBuffer * numExpectedSpills), (long)spilledRecordsCounter.getValue());
        }
        long additionalSpillBytesWritten = additionalSpillBytesWritternCounter.getValue();
        long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue();
        if (numExpectedSpills == 0) {
            Assert.assertEquals((long)0L, (long)additionalSpillBytesWritten);
            Assert.assertEquals((long)0L, (long)additionalSpillBytesRead);
        } else {
            Assert.assertTrue((additionalSpillBytesWritten > 0L ? 1 : 0) != 0);
            if (!dataViaEventEnabled) {
                Assert.assertTrue((additionalSpillBytesRead > 0L ? 1 : 0) != 0);
                if (!shouldCompress) {
                    Assert.assertTrue((additionalSpillBytesWritten > (long)(recordsPerBuffer * numExpectedSpills * sizePerRecord) ? 1 : 0) != 0);
                    Assert.assertTrue((additionalSpillBytesRead > (long)(recordsPerBuffer * numExpectedSpills * sizePerRecord) ? 1 : 0) != 0);
                }
            } else if (kvWriter.writer.getCompressedLength() > 512L) {
                // Same assertion as the first statement of this else-branch, repeated under the length condition.
                Assert.assertTrue((additionalSpillBytesWritten > 0L ? 1 : 0) != 0);
            }
        }
        if (!dataViaEventEnabled) {
            Assert.assertEquals((long)additionalSpillBytesWritten, (long)additionalSpillBytesRead);
        }
        Assert.assertTrue(((long)numExpectedSpills >= numAdditionalSpillsCounter.getValue() ? 1 : 0) != 0);
        BitSet emptyPartitionBits = null;
        // close() must return exactly a VertexManagerEvent followed by a CompositeDataMovementEvent.
        Assert.assertEquals((long)2L, (long)events.size());
        Assert.assertTrue((boolean)(events.get(0) instanceof VertexManagerEvent));
        VertexManagerEvent vme = (VertexManagerEvent)events.get(0);
        this.verifyPartitionStats(vme, partitionsWithData);
        Assert.assertTrue((boolean)(events.get(1) instanceof CompositeDataMovementEvent));
        CompositeDataMovementEvent cdme = (CompositeDataMovementEvent)events.get(1);
        Assert.assertEquals((long)0L, (long)cdme.getSourceIndexStart());
        Assert.assertEquals((long)numOutputs, (long)cdme.getCount());
        ShuffleUserPayloads.DataMovementEventPayloadProto eventProto = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom((ByteString)ByteString.copyFrom((ByteBuffer)cdme.getUserPayload()));
        // An empty-partition bitmap is expected exactly when partitions were skipped or nothing was written.
        if (skippedPartitions == null && numRecordsWritten > 0) {
            Assert.assertFalse((boolean)eventProto.hasEmptyPartitions());
            emptyPartitionBits = new BitSet(numPartitions);
        } else {
            Assert.assertTrue((boolean)eventProto.hasEmptyPartitions());
            byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray((ByteString)eventProto.getEmptyPartitions());
            emptyPartitionBits = TezUtilsInternal.fromByteArray((byte[])emptyPartitions);
            if (numRecordsWritten == 0) {
                Assert.assertEquals((long)numPartitions, (long)emptyPartitionBits.cardinality());
            } else {
                for (Integer e : skippedPartitions) {
                    Assert.assertTrue((boolean)emptyPartitionBits.get(e));
                }
                Assert.assertEquals((long)skippedPartitions.size(), (long)emptyPartitionBits.cardinality());
            }
        }
        // Host/port/path are only expected when at least one partition has data.
        if (emptyPartitionBits.cardinality() != numPartitions) {
            Assert.assertEquals((Object)HOST_STRING, (Object)eventProto.getHost());
            Assert.assertEquals((long)4000L, (long)eventProto.getPort());
            Assert.assertEquals((Object)uniqueId, (Object)eventProto.getPathComponent());
        } else {
            Assert.assertFalse((boolean)eventProto.hasHost());
            Assert.assertFalse((boolean)eventProto.hasPort());
            Assert.assertFalse((boolean)eventProto.hasPathComponent());
        }
        TezTaskOutputFiles taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId);
        Path outputFilePath = kvWriter.finalOutPath;
        // Despite its name, spillFilePath holds the writer's final index path.
        Path spillFilePath = kvWriter.finalIndexPath;
        // Nothing was written, so there is no output to read back.
        if (numRecordsWritten <= 0) {
            return;
        }
        // True when the payload itself carries the data, in which case it is read from the event, not the file.
        boolean isInMem = eventProto.getData().hasData();
        Assert.assertTrue((boolean)localFs.exists(outputFilePath));
        Assert.assertEquals((String)"Incorrect output permissions (user)", (Object)FsAction.READ_WRITE, (Object)localFs.getFileStatus(outputFilePath).getPermission().getUserAction());
        Assert.assertEquals((String)"Incorrect output permissions (group)", (Object)FsAction.READ, (Object)localFs.getFileStatus(outputFilePath).getPermission().getGroupAction());
        if (!isInMem) {
            Assert.assertTrue((boolean)localFs.exists(spillFilePath));
            Assert.assertEquals((String)"Incorrect index permissions (user)", (Object)FsAction.READ_WRITE, (Object)localFs.getFileStatus(spillFilePath).getPermission().getUserAction());
            Assert.assertEquals((String)"Incorrect index permissions (group)", (Object)FsAction.READ, (Object)localFs.getFileStatus(spillFilePath).getPermission().getGroupAction());
            // Intermediate spill files recorded by the writer must no longer exist.
            List list = kvWriter.spillInfoList;
            synchronized (list) {
                for (UnorderedPartitionedKVWriter.SpillInfo spill : kvWriter.spillInfoList) {
                    Assert.assertFalse((String)("lingering intermediate spill file " + spill.outPath), (boolean)localFs.exists(spill.outPath));
                }
            }
        }
        DataInputBuffer keyBuffer = new DataInputBuffer();
        DataInputBuffer valBuffer = new DataInputBuffer();
        IntWritable keyDeser = new IntWritable();
        LongWritable valDeser = new LongWritable();
        // Read every partition back and tick each record off the expected multimaps.
        for (i = 0; i < numOutputs; ++i) {
            ByteArrayInputStream inStream;
            IFile.Reader reader = null;
            if (isInMem) {
                int dataLoadSize = eventProto.getData().getData().size();
                inStream = new ByteArrayInputStream(eventProto.getData().getData().toByteArray());
                reader = new IFile.Reader((InputStream)inStream, (long)dataLoadSize, (CompressionCodec)codec, null, null, false, 0, -1);
            } else {
                // The spill record is re-read from the index file on every iteration.
                TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
                TezIndexRecord indexRecord = spillRecord.getIndex(i);
                if (skippedPartitions != null && skippedPartitions.contains(i)) {
                    Assert.assertFalse((String)("The Index Record for partition " + i + " should not have any data"), (boolean)indexRecord.hasData());
                    continue;
                }
                FSDataInputStream tmpStream = FileSystem.getLocal((Configuration)conf).open(outputFilePath);
                tmpStream.seek(indexRecord.getStartOffset());
                inStream = tmpStream;
                reader = new IFile.Reader((InputStream)tmpStream, indexRecord.getPartLength(), (CompressionCodec)codec, null, null, false, 0, -1);
            }
            while (reader.nextRawKey(keyBuffer)) {
                reader.nextRawValue(valBuffer);
                keyDeser.readFields((DataInput)keyBuffer);
                valDeser.readFields((DataInput)valBuffer);
                int partition = partitioner.getPartition(keyDeser, valDeser, numOutputs);
                // remove() returning false means the pair was unexpected or has already been seen.
                Assert.assertTrue((boolean)((Multimap)expectedValues.get(partition)).remove((Object)keyDeser.get(), (Object)valDeser.get()));
            }
            // NOTE(review): only the stream is closed here, and only on the non-failing path; the reader itself is not closed.
            ((InputStream)inStream).close();
        }
        // Every expected pair must have been consumed, leaving all per-partition multimaps empty.
        for (i = 0; i < numOutputs; ++i) {
            Assert.assertEquals((long)0L, (long)((Multimap)expectedValues.get(i)).size());
            expectedValues.remove(i);
        }
        Assert.assertEquals((long)0L, (long)expectedValues.size());
        ((OutputContext)Mockito.verify((Object)outputContext, (VerificationMode)Mockito.atLeast((int)1))).notifyProgress();
    }

    /**
     * Builds a random string made only of the upper-case ASCII letters 'A' to 'Z'.
     *
     * @param size number of characters to generate; 0 yields an empty string
     * @return a string of exactly {@code size} random upper-case letters
     */
    private static String createRandomString(int size) {
        StringBuilder sb = new StringBuilder(size);
        Random random = new Random();
        for (int i = 0; i < size; ++i) {
            // nextInt(26) is always in [0, 26). The former Math.abs(random.nextInt()) % 26
            // went negative for Integer.MIN_VALUE and produced a non-letter character.
            sb.append((char)('A' + random.nextInt(26)));
        }
        return sb.toString();
    }

    /**
     * Creates a Mockito mock of {@link OutputContext} stubbed with the fixed identifiers used
     * by these tests.
     *
     * <p>The mock returns the supplied counters, application id and unique identifier, a
     * 4-byte buffer holding the int 4000 as service provider metadata for
     * {@code auxiliaryService}, and a single work directory named "outDir_" plus the unique
     * identifier under TEST_ROOT_DIR.
     *
     * @param counters counters returned by getCounters()
     * @param appId application id returned by getApplicationId()
     * @param uniqueId value returned by getUniqueIdentifier(), also used in the work dir name
     * @param auxiliaryService service name for which metadata is stubbed
     * @return the stubbed mock
     */
    private OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId, String uniqueId, String auxiliaryService) {
        OutputContext mockContext = Mockito.mock(OutputContext.class);

        Mockito.doReturn(counters).when(mockContext).getCounters();
        Mockito.doReturn(appId).when(mockContext).getApplicationId();
        Mockito.doReturn(1).when(mockContext).getDAGAttemptNumber();
        Mockito.doReturn("dagName").when(mockContext).getDAGName();
        Mockito.doReturn("destinationVertexName").when(mockContext).getDestinationVertexName();
        Mockito.doReturn(1).when(mockContext).getOutputIndex();
        Mockito.doReturn(1).when(mockContext).getTaskAttemptNumber();
        Mockito.doReturn(1).when(mockContext).getTaskIndex();
        Mockito.doReturn(1).when(mockContext).getTaskVertexIndex();
        Mockito.doReturn("vertexName").when(mockContext).getTaskVertexName();
        Mockito.doReturn(uniqueId).when(mockContext).getUniqueIdentifier();

        // A fresh buffer is produced per call; mark/reset rewinds it to position 0 after the write.
        Answer<ByteBuffer> portMetaDataAnswer = new Answer<ByteBuffer>(){

            @Override
            public ByteBuffer answer(InvocationOnMock invocation) throws Throwable {
                ByteBuffer buffer = ByteBuffer.allocate(4);
                buffer.mark();
                buffer.putInt(4000);
                buffer.reset();
                return buffer;
            }
        };
        Mockito.doAnswer(portMetaDataAnswer).when(mockContext).getServiceProviderMetaData(Matchers.eq(auxiliaryService));

        String workDir = new Path(TEST_ROOT_DIR, "outDir_" + uniqueId).toString();
        // The Object cast makes the array the single return value, not a varargs list.
        Mockito.doReturn((Object)new String[]{workDir}).when(mockContext).getWorkDirs();
        return mockContext;
    }

    /**
     * Convenience overload that builds the test configuration using {@link PartitionerForTest}
     * as the partitioner class.
     *
     * @param outputContext context whose work dirs are written into the configuration
     * @param keyClass key class to register
     * @param valClass value class to register
     * @param shouldCompress whether compression is enabled in the configuration
     * @param maxSingleBufferSizeBytes per-buffer size limit; only applied when non-negative
     * @return the populated configuration
     */
    private Configuration createConfiguration(OutputContext outputContext, Class<? extends Writable> keyClass, Class<? extends Writable> valClass, boolean shouldCompress, int maxSingleBufferSizeBytes) {
        Class<? extends Partitioner> defaultPartitioner = PartitionerForTest.class;
        return createConfiguration(outputContext, keyClass, valClass, shouldCompress, maxSingleBufferSizeBytes, defaultPartitioner);
    }

    /**
     * Builds the {@link Configuration} handed to the writer under test.
     *
     * <p>The configuration is created with {@code new Configuration(false)} and then populated
     * with the umask, local dirs (from {@code outputContext.getWorkDirs()}), key/value/partitioner
     * class names, the compression settings and the partition-stats reporting type taken from
     * this test instance's {@code reportPartitionStats} field.
     *
     * @param outputContext context whose work dirs become the runtime local dirs
     * @param keyClass key class whose name is stored under {@code tez.runtime.key.class}
     * @param valClass value class whose name is stored under {@code tez.runtime.value.class}
     * @param shouldCompress enables {@code tez.runtime.compress}; when {@code true} the codec is
     *     set to {@link DefaultCodec}
     * @param maxSingleBufferSizeBytes per-buffer size limit; left unset when negative
     * @param partitionerClass partitioner class whose name is stored in the configuration
     * @return the populated configuration
     */
    private Configuration createConfiguration(OutputContext outputContext, Class<? extends Writable> keyClass, Class<? extends Writable> valClass, boolean shouldCompress, int maxSingleBufferSizeBytes, Class<? extends Partitioner> partitionerClass) {
        // A negative size means "do not override the writer's own per-buffer limit".
        boolean overrideBufferSize = maxSingleBufferSizeBytes >= 0;

        Configuration testConf = new Configuration(false);

        // File-system and directory layout.
        testConf.set("fs.permissions.umask-mode", "077");
        testConf.setStrings("tez.runtime.framework.local.dirs", outputContext.getWorkDirs());

        // Record types and partitioning.
        testConf.set("tez.runtime.key.class", keyClass.getName());
        testConf.set("tez.runtime.value.class", valClass.getName());
        testConf.set("tez.runtime.partitioner.class", partitionerClass.getName());

        if (overrideBufferSize) {
            testConf.setInt("tez.runtime.unordered.output.max-per-buffer.size-bytes", maxSingleBufferSizeBytes);
        }

        // Compression: the codec is only configured when compression is switched on.
        testConf.setBoolean("tez.runtime.compress", shouldCompress);
        if (shouldCompress) {
            testConf.set("tez.runtime.compress.codec", DefaultCodec.class.getName());
        }

        testConf.set("tez.runtime.report.partition.stats", this.reportPartitionStats.getType());
        return testConf;
    }

    /**
     * Test-only subclass of {@link UnorderedPartitionedKVWriter} that reports a fixed host string.
     */
    private static class UnorderedPartitionedKVWriterForTest
    extends UnorderedPartitionedKVWriter {
        /**
         * Forwards all arguments unchanged to the {@link UnorderedPartitionedKVWriter} constructor.
         *
         * @throws IOException declared on this constructor; presumably propagated from the
         *     superclass constructor (not visible here)
         */
        public UnorderedPartitionedKVWriterForTest(OutputContext outputContext, Configuration conf, int numOutputs, long availableMemoryBytes) throws IOException {
            super(outputContext, conf, numOutputs, availableMemoryBytes);
        }

        /**
         * Returns the constant {@code TestUnorderedPartitionedKVWriter.HOST_STRING}.
         * NOTE(review): presumably overrides a package-private hook in the superclass so tests
         * get a deterministic host name; confirm against UnorderedPartitionedKVWriter.
         */
        String getHost() {
            return TestUnorderedPartitionedKVWriter.HOST_STRING;
        }
    }

    /**
     * Test {@link Partitioner} that assigns a record to a partition from its {@link IntWritable} key.
     */
    public static class PartitionerForTest
    implements Partitioner {
        /**
         * Computes {@code key.get() % numPartitions}.
         *
         * <p>The Java remainder operator is used as-is, so the result carries the sign of the
         * key's value. The {@code value} argument is not consulted.
         *
         * @param key the record key; must be an {@link IntWritable}
         * @param value the record value (unused)
         * @param numPartitions the divisor applied to the key's int value
         * @return the remainder of the key's int value divided by {@code numPartitions}
         * @throws UnsupportedOperationException if {@code key} is not an {@link IntWritable}
         */
        public int getPartition(Object key, Object value, int numPartitions) {
            // Reject anything but IntWritable keys up front.
            if (!(key instanceof IntWritable)) {
                throw new UnsupportedOperationException("Test partitioner expected to be called with IntWritable only");
            }
            IntWritable intKey = (IntWritable)key;
            return intKey.get() % numPartitions;
        }
    }
}

