/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.sort.impl.dflt;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.StringInterner;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.OutputStatisticsReporter;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class TestDefaultSorter {
    private Configuration conf;
    private static final int PORT = 80;
    private static final String UniqueID = "UUID";
    private static FileSystem localFs = null;
    private static Path workingDir = null;

    @Before
    public void setup() throws IOException {
        this.conf = new Configuration();
        this.conf.set("tez.runtime.sorter.class", OrderedPartitionedKVOutputConfig.SorterImpl.LEGACY.name());
        this.conf.set("fs.defaultFS", "file:///");
        localFs = FileSystem.getLocal((Configuration)this.conf);
        workingDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), TestDefaultSorter.class.getName()).makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
        String localDirs = workingDir.toString();
        this.conf.set("tez.runtime.key.class", Text.class.getName());
        this.conf.set("tez.runtime.value.class", Text.class.getName());
        this.conf.set("tez.runtime.partitioner.class", HashPartitioner.class.getName());
        this.conf.setStrings("tez.runtime.framework.local.dirs", new String[]{localDirs});
    }

    @AfterClass
    public static void cleanup() throws IOException {
        localFs.delete(workingDir, true);
    }

    @After
    public void reset() throws IOException {
        TestDefaultSorter.cleanup();
        localFs.mkdirs(workingDir);
    }

    @Test(timeout=5000L)
    public void testSortSpillPercent() throws Exception {
        OutputContext context = this.createTezOutputContext();
        this.conf.setFloat("tez.runtime.sort.spill.percent", 0.0f);
        try {
            new DefaultSorter(context, this.conf, 10, 0xA00000L);
            Assert.fail();
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("tez.runtime.sort.spill.percent"));
        }
        this.conf.setFloat("tez.runtime.sort.spill.percent", 1.1f);
        try {
            new DefaultSorter(context, this.conf, 10, 0xA00000L);
            Assert.fail();
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("tez.runtime.sort.spill.percent"));
        }
    }

    @Test
    @Ignore
    public void testSortLimitsWithSmallRecord() throws IOException {
        this.conf.set("tez.runtime.key.class", Text.class.getName());
        this.conf.set("tez.runtime.value.class", NullWritable.class.getName());
        OutputContext context = this.createTezOutputContext();
        ((OutputContext)Mockito.doReturn((Object)0xAF000000L).when((Object)context)).getTotalMemoryAvailableToTask();
        this.conf.setInt("tez.runtime.io.sort.mb", 2047);
        context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement((Configuration)this.conf, (long)context.getTotalMemoryAvailableToTask()), (MemoryUpdateCallback)new MemoryUpdateCallbackHandler());
        DefaultSorter sorter = new DefaultSorter(context, this.conf, 2, 0x7FF00000L);
        this.conf.set("tez.runtime.key.class", Text.class.getName());
        this.conf.set("tez.runtime.value.class", Text.class.getName());
        int i = 0;
        while (true) {
            Text key = new Text(i + "");
            sorter.write((Object)key, (Object)NullWritable.get());
            i = (i + 1) % 10;
        }
    }

    @Test
    @Ignore
    public void testSortLimitsWithLargeRecords() throws IOException {
        OutputContext context = this.createTezOutputContext();
        ((OutputContext)Mockito.doReturn((Object)0xAF000000L).when((Object)context)).getTotalMemoryAvailableToTask();
        this.conf.setInt("tez.runtime.io.sort.mb", 2047);
        context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement((Configuration)this.conf, (long)context.getTotalMemoryAvailableToTask()), (MemoryUpdateCallback)new MemoryUpdateCallbackHandler());
        DefaultSorter sorter = new DefaultSorter(context, this.conf, 2, 0x7FF00000L);
        int i = 0;
        while (true) {
            Text key = new Text(i + "");
            int valSize = ThreadLocalRandom.current().nextInt(0x100000, 0x6400000);
            String val = StringInterner.weakIntern((String)StringUtils.repeat((String)"v", (int)valSize));
            sorter.write((Object)key, (Object)new Text(val));
            i = (i + 1) % 10;
        }
    }

    @Test(timeout=5000L)
    public void testSortMBLimits() throws Exception {
        Assert.assertTrue((String)"Expected 1800", (DefaultSorter.computeSortBufferSize((int)4096, (String)"") == 1800 ? 1 : 0) != 0);
        Assert.assertTrue((String)"Expected 1800", (DefaultSorter.computeSortBufferSize((int)2047, (String)"") == 1800 ? 1 : 0) != 0);
        Assert.assertTrue((String)"Expected 1024", (DefaultSorter.computeSortBufferSize((int)1024, (String)"") == 1024 ? 1 : 0) != 0);
        try {
            DefaultSorter.computeSortBufferSize((int)0, (String)"");
            Assert.fail((String)"Should have thrown error for setting buffer size to 0");
        }
        catch (RuntimeException re) {
            // empty catch block
        }
        try {
            DefaultSorter.computeSortBufferSize((int)-100, (String)"");
            Assert.fail((String)"Should have thrown error for setting buffer size to negative value");
        }
        catch (RuntimeException runtimeException) {
            // empty catch block
        }
    }

    @Test(timeout=30000L)
    public void basicTest() throws IOException {
        OutputContext context = this.createTezOutputContext();
        MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
        try {
            this.conf.setInt("tez.runtime.io.sort.mb", 300);
            context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement((Configuration)this.conf, (long)context.getTotalMemoryAvailableToTask()), (MemoryUpdateCallback)new MemoryUpdateCallbackHandler());
            Assert.fail();
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("tez.runtime.io.sort.mb"));
        }
        this.conf.setLong("tez.runtime.io.sort.mb", 1L);
        context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement((Configuration)this.conf, (long)context.getTotalMemoryAvailableToTask()), (MemoryUpdateCallback)handler);
        DefaultSorter sorter = new DefaultSorter(context, this.conf, 5, handler.getMemoryAssigned());
        try {
            this.writeData((ExternalSorter)sorter, 1000, 1000);
            Assert.assertTrue((sorter.getNumSpills() > 2 ? 1 : 0) != 0);
            this.verifyCounters(sorter, context);
        }
        catch (IOException ioe) {
            Assert.fail((String)ioe.getMessage());
        }
    }

    @Test(timeout=30000L)
    public void testWithEmptyData() throws IOException {
        OutputContext context = this.createTezOutputContext();
        this.conf.setLong("tez.runtime.io.sort.mb", 1L);
        MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
        context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement((Configuration)this.conf, (long)context.getTotalMemoryAvailableToTask()), (MemoryUpdateCallback)handler);
        DefaultSorter sorter = new DefaultSorter(context, this.conf, 1, handler.getMemoryAssigned());
        try {
            sorter.flush();
            sorter.close();
            Assert.assertTrue((boolean)sorter.getFinalOutputFile().getParent().getName().equalsIgnoreCase(UniqueID));
            this.verifyCounters(sorter, context);
        }
        catch (Exception e) {
            Assert.fail();
        }
    }

    @Test(timeout=30000L)
    public void testWithEmptyDataWithFinalMergeDisabled() throws IOException {
        OutputContext context = this.createTezOutputContext();
        this.conf.setBoolean("tez.runtime.enable.final-merge.in.output", false);
        this.conf.setLong("tez.runtime.io.sort.mb", 1L);
        MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
        context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement((Configuration)this.conf, (long)context.getTotalMemoryAvailableToTask()), (MemoryUpdateCallback)handler);
        DefaultSorter sorter = new DefaultSorter(context, this.conf, 5, handler.getMemoryAssigned());
        try {
            sorter.flush();
            sorter.close();
            Assert.assertTrue((boolean)sorter.getFinalOutputFile().getParent().getName().equalsIgnoreCase("UUID_0"));
            this.verifyCounters(sorter, context);
        }
        catch (Exception e) {
            Assert.fail();
        }
    }

    void testPartitionStats(boolean withStats) throws IOException {
        this.conf.setBoolean("tez.runtime.report.partition.stats", withStats);
        OutputContext context = this.createTezOutputContext();
        this.conf.setBoolean("tez.runtime.enable.final-merge.in.output", false);
        this.conf.setLong("tez.runtime.io.sort.mb", 4L);
        MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
        context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement((Configuration)this.conf, (long)context.getTotalMemoryAvailableToTask()), (MemoryUpdateCallback)handler);
        DefaultSorter sorter = new DefaultSorter(context, this.conf, 1, handler.getMemoryAssigned());
        this.writeData((ExternalSorter)sorter, 1000, 10);
        Assert.assertTrue((sorter.getNumSpills() == 1 ? 1 : 0) != 0);
        this.verifyCounters(sorter, context);
        if (withStats) {
            Assert.assertTrue((sorter.getPartitionStats() != null ? 1 : 0) != 0);
        } else {
            Assert.assertTrue((sorter.getPartitionStats() == null ? 1 : 0) != 0);
        }
    }

    @Test(timeout=60000L)
    public void testWithPartitionStats() throws IOException {
        this.testPartitionStats(true);
    }

    @Test(timeout=60000L)
    public void testWithoutPartitionStats() throws IOException {
        this.testPartitionStats(false);
    }

    @Test(timeout=60000L)
    public void testWithSingleSpillWithFinalMergeDisabled() throws IOException {
        OutputContext context = this.createTezOutputContext();
        this.conf.setBoolean("tez.runtime.enable.final-merge.in.output", false);
        this.conf.setLong("tez.runtime.io.sort.mb", 4L);
        MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
        context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement((Configuration)this.conf, (long)context.getTotalMemoryAvailableToTask()), (MemoryUpdateCallback)handler);
        DefaultSorter sorter = new DefaultSorter(context, this.conf, 1, handler.getMemoryAssigned());
        this.writeData((ExternalSorter)sorter, 1000, 10);
        Assert.assertTrue((sorter.getNumSpills() == 1 ? 1 : 0) != 0);
        ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(List.class);
        ((OutputContext)Mockito.verify((Object)context, (VerificationMode)VerificationModeFactory.times((int)1))).sendEvents((List)eventCaptor.capture());
        List events = (List)eventCaptor.getValue();
        for (Event event : events) {
            if (!(event instanceof CompositeDataMovementEvent)) continue;
            CompositeDataMovementEvent cdme = (CompositeDataMovementEvent)event;
            ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom((ByteString)ByteString.copyFrom((ByteBuffer)cdme.getUserPayload()));
            Assert.assertTrue((boolean)shufflePayload.getPathComponent().equalsIgnoreCase("UUID_0"));
        }
        this.verifyCounters(sorter, context);
    }

    @Test(timeout=60000L)
    public void testWithMultipleSpillsWithFinalMergeDisabled() throws IOException {
        OutputContext context = this.createTezOutputContext();
        this.conf.setBoolean("tez.runtime.enable.final-merge.in.output", false);
        this.conf.setLong("tez.runtime.io.sort.mb", 4L);
        this.conf.setInt("tez.runtime.index.cache.memory.limit.bytes", 1);
        MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
        context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement((Configuration)this.conf, (long)context.getTotalMemoryAvailableToTask()), (MemoryUpdateCallback)handler);
        DefaultSorter sorter = new DefaultSorter(context, this.conf, 1, handler.getMemoryAssigned());
        this.writeData((ExternalSorter)sorter, 10000, 1000);
        int spillCount = sorter.getNumSpills();
        ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(List.class);
        ((OutputContext)Mockito.verify((Object)context, (VerificationMode)VerificationModeFactory.times((int)1))).sendEvents((List)eventCaptor.capture());
        List events = (List)eventCaptor.getValue();
        int spillIndex = 0;
        for (Event event : events) {
            if (!(event instanceof CompositeDataMovementEvent)) continue;
            CompositeDataMovementEvent cdme = (CompositeDataMovementEvent)event;
            ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom((ByteString)ByteString.copyFrom((ByteBuffer)cdme.getUserPayload()));
            Assert.assertTrue((boolean)shufflePayload.getPathComponent().equalsIgnoreCase("UUID_" + spillIndex));
            ++spillIndex;
        }
        Assert.assertTrue((spillIndex == spillCount ? 1 : 0) != 0);
        this.verifyCounters(sorter, context);
    }

    private void verifyCounters(DefaultSorter sorter, OutputContext context) {
        TezCounter finalOutputBytes;
        TezCounter numShuffleChunks = context.getCounters().findCounter((Enum)TaskCounter.SHUFFLE_CHUNK_COUNT);
        TezCounter additionalSpills = context.getCounters().findCounter((Enum)TaskCounter.ADDITIONAL_SPILL_COUNT);
        TezCounter additionalSpillBytesWritten = context.getCounters().findCounter((Enum)TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
        TezCounter additionalSpillBytesRead = context.getCounters().findCounter((Enum)TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
        if (sorter.isFinalMergeEnabled()) {
            Assert.assertTrue((additionalSpills.getValue() == (long)(sorter.getNumSpills() - 1) ? 1 : 0) != 0);
            Assert.assertTrue((1L == numShuffleChunks.getValue() ? 1 : 0) != 0);
            if (sorter.getNumSpills() > 1) {
                Assert.assertTrue((additionalSpillBytesRead.getValue() > 0L ? 1 : 0) != 0);
                Assert.assertTrue((additionalSpillBytesWritten.getValue() > 0L ? 1 : 0) != 0);
            }
        } else {
            Assert.assertTrue((0L == additionalSpills.getValue() ? 1 : 0) != 0);
            Assert.assertTrue(((long)sorter.getNumSpills() == numShuffleChunks.getValue() ? 1 : 0) != 0);
            Assert.assertTrue((additionalSpillBytesRead.getValue() == 0L ? 1 : 0) != 0);
            Assert.assertTrue((additionalSpillBytesWritten.getValue() == 0L ? 1 : 0) != 0);
        }
        Assert.assertTrue(((finalOutputBytes = context.getCounters().findCounter((Enum)TaskCounter.OUTPUT_BYTES_PHYSICAL)).getValue() > 0L ? 1 : 0) != 0);
        TezCounter outputBytesWithOverheadCounter = context.getCounters().findCounter((Enum)TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
        Assert.assertTrue((outputBytesWithOverheadCounter.getValue() > 0L ? 1 : 0) != 0);
        ((OutputContext)Mockito.verify((Object)context, (VerificationMode)Mockito.atLeastOnce())).notifyProgress();
    }

    private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException {
        for (int i = 0; i < numKeys; ++i) {
            Text key = new Text(RandomStringUtils.randomAlphanumeric((int)keyLen));
            Text value = new Text(RandomStringUtils.randomAlphanumeric((int)keyLen));
            sorter.write((Object)key, (Object)value);
        }
        sorter.flush();
        sorter.close();
    }

    private OutputContext createTezOutputContext() throws IOException {
        String[] workingDirs = new String[]{workingDir.toString()};
        UserPayload payLoad = TezUtils.createUserPayloadFromConf((Configuration)this.conf);
        DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
        serviceProviderMetaData.writeInt(80);
        TezCounters counters = new TezCounters();
        OutputContext context = (OutputContext)Mockito.mock(OutputContext.class);
        ExecutionContextImpl execContext = new ExecutionContextImpl("localhost");
        ((OutputContext)Mockito.doReturn((Object)Mockito.mock(OutputStatisticsReporter.class)).when((Object)context)).getStatisticsReporter();
        ((OutputContext)Mockito.doReturn((Object)execContext).when((Object)context)).getExecutionContext();
        ((OutputContext)Mockito.doReturn((Object)counters).when((Object)context)).getCounters();
        ((OutputContext)Mockito.doReturn((Object)workingDirs).when((Object)context)).getWorkDirs();
        ((OutputContext)Mockito.doReturn((Object)payLoad).when((Object)context)).getUserPayload();
        ((OutputContext)Mockito.doReturn((Object)0x500000L).when((Object)context)).getTotalMemoryAvailableToTask();
        ((OutputContext)Mockito.doReturn((Object)UniqueID).when((Object)context)).getUniqueIdentifier();
        ((OutputContext)Mockito.doReturn((Object)"v1").when((Object)context)).getDestinationVertexName();
        ((OutputContext)Mockito.doReturn((Object)ByteBuffer.wrap(serviceProviderMetaData.getData())).when((Object)context)).getServiceProviderMetaData("mapreduce_shuffle");
        ((OutputContext)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                long requestedSize = (Long)invocation.getArguments()[0];
                MemoryUpdateCallbackHandler callback = (MemoryUpdateCallbackHandler)invocation.getArguments()[1];
                callback.memoryAssigned(requestedSize);
                return null;
            }
        }).when((Object)context)).requestInitialMemory(Matchers.anyLong(), (MemoryUpdateCallback)Matchers.any(MemoryUpdateCallback.class));
        return context;
    }
}

