/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.output;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.OutputStatisticsReporter;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.KeyValuesWriter;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig;
import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

@RunWith(value=Parameterized.class)
public class TestOnFileSortedOutput {
    private static final Random rnd = new Random();
    private static final String UniqueID = "UUID";
    private static final String HOST = "localhost";
    private static final int PORT = 80;
    private Configuration conf;
    private FileSystem fs;
    private Path workingDir;
    private int partitions;
    private OrderedPartitionedKVOutputConfig.SorterImpl sorterImpl;
    private int sorterThreads;
    final AtomicLong outputSize = new AtomicLong();
    final AtomicLong numRecords = new AtomicLong();
    private KeyValuesWriter writer;
    private OrderedPartitionedKVOutput sortedOutput;
    private boolean sendEmptyPartitionViaEvent;
    private int emptyPartitionIdx;
    private TezRuntimeConfiguration.ReportPartitionStats reportPartitionStats;

    public TestOnFileSortedOutput(boolean sendEmptyPartitionViaEvent, OrderedPartitionedKVOutputConfig.SorterImpl sorterImpl, int sorterThreads, int emptyPartitionIdx, TezRuntimeConfiguration.ReportPartitionStats reportPartitionStats) throws IOException {
        this.sendEmptyPartitionViaEvent = sendEmptyPartitionViaEvent;
        this.emptyPartitionIdx = emptyPartitionIdx;
        this.sorterImpl = sorterImpl;
        this.sorterThreads = sorterThreads;
        this.reportPartitionStats = reportPartitionStats;
        this.conf = new Configuration();
        this.workingDir = new Path(".", this.getClass().getName());
        String localDirs = this.workingDir.toString();
        this.conf.setStrings("tez.runtime.framework.local.dirs", new String[]{localDirs});
        this.fs = FileSystem.getLocal((Configuration)this.conf);
    }

    @Before
    public void setup() throws Exception {
        this.conf.set("tez.runtime.sorter.class", this.sorterImpl.name());
        this.conf.setInt("tez.runtime.pipelined.sorter.sort.threads", this.sorterThreads);
        this.conf.setInt("tez.runtime.io.sort.mb", 5);
        this.conf.set("tez.runtime.key.class", Text.class.getName());
        this.conf.set("tez.runtime.value.class", Text.class.getName());
        this.conf.set("tez.runtime.partitioner.class", HashPartitioner.class.getName());
        this.conf.setBoolean("tez.runtime.empty.partitions.info-via-events.enabled", this.sendEmptyPartitionViaEvent);
        this.conf.set("tez.runtime.report.partition.stats", this.reportPartitionStats.getType());
        this.outputSize.set(0L);
        this.numRecords.set(0L);
        this.fs.mkdirs(this.workingDir);
        this.partitions = Math.max(1, rnd.nextInt(10));
    }

    @After
    public void cleanup() throws IOException {
        this.fs.delete(this.workingDir, true);
    }

    @Parameterized.Parameters(name="test[{0}, {1}, {2}, {3}, {4}]")
    public static Collection<Object[]> getParameters() {
        ArrayList<Object[]> parameters = new ArrayList<Object[]>();
        parameters.add(new Object[]{false, OrderedPartitionedKVOutputConfig.SorterImpl.LEGACY, 1, -1, TezRuntimeConfiguration.ReportPartitionStats.ENABLED});
        parameters.add(new Object[]{false, OrderedPartitionedKVOutputConfig.SorterImpl.LEGACY, 1, 0, TezRuntimeConfiguration.ReportPartitionStats.ENABLED});
        parameters.add(new Object[]{true, OrderedPartitionedKVOutputConfig.SorterImpl.LEGACY, 1, -1, TezRuntimeConfiguration.ReportPartitionStats.ENABLED});
        parameters.add(new Object[]{true, OrderedPartitionedKVOutputConfig.SorterImpl.LEGACY, 1, 0, TezRuntimeConfiguration.ReportPartitionStats.ENABLED});
        parameters.add(new Object[]{true, OrderedPartitionedKVOutputConfig.SorterImpl.LEGACY, 1, 0, TezRuntimeConfiguration.ReportPartitionStats.PRECISE});
        parameters.add(new Object[]{false, OrderedPartitionedKVOutputConfig.SorterImpl.PIPELINED, 2, -1, TezRuntimeConfiguration.ReportPartitionStats.ENABLED});
        parameters.add(new Object[]{false, OrderedPartitionedKVOutputConfig.SorterImpl.PIPELINED, 2, 0, TezRuntimeConfiguration.ReportPartitionStats.ENABLED});
        parameters.add(new Object[]{true, OrderedPartitionedKVOutputConfig.SorterImpl.PIPELINED, 2, -1, TezRuntimeConfiguration.ReportPartitionStats.ENABLED});
        parameters.add(new Object[]{true, OrderedPartitionedKVOutputConfig.SorterImpl.PIPELINED, 2, 0, TezRuntimeConfiguration.ReportPartitionStats.ENABLED});
        parameters.add(new Object[]{true, OrderedPartitionedKVOutputConfig.SorterImpl.PIPELINED, 2, 0, TezRuntimeConfiguration.ReportPartitionStats.PRECISE});
        return parameters;
    }

    private void startSortedOutput(int partitions) throws Exception {
        OutputContext context = this.createTezOutputContext();
        this.conf.setInt("tez.runtime.io.sort.mb", 4);
        UserPayload payLoad = TezUtils.createUserPayloadFromConf((Configuration)this.conf);
        ((OutputContext)Mockito.doReturn((Object)payLoad).when((Object)context)).getUserPayload();
        this.sortedOutput = new OrderedPartitionedKVOutput(context, partitions);
        this.sortedOutput.initialize();
        this.sortedOutput.start();
        this.writer = this.sortedOutput.getWriter();
    }

    private void _testPipelinedShuffle(String sorterName) throws Exception {
        this.conf.setInt("tez.runtime.io.sort.mb", 3);
        this.conf.set("tez.runtime.sorter.class", sorterName);
        this.conf.setBoolean("tez.runtime.enable.final-merge.in.output", false);
        this.conf.setBoolean("tez.runtime.pipelined-shuffle.enabled", true);
        OutputContext context = this.createTezOutputContext();
        UserPayload payLoad = TezUtils.createUserPayloadFromConf((Configuration)this.conf);
        ((OutputContext)Mockito.doReturn((Object)payLoad).when((Object)context)).getUserPayload();
        this.sortedOutput = new OrderedPartitionedKVOutput(context, this.partitions);
        this.sortedOutput.initialize();
        this.sortedOutput.start();
        Assert.assertFalse((boolean)this.sortedOutput.finalMergeEnabled);
        Assert.assertTrue((boolean)this.sortedOutput.pipelinedShuffle);
    }

    @Test(timeout=5000L)
    public void testPipelinedShuffle() throws Exception {
        this._testPipelinedShuffle(OrderedPartitionedKVOutputConfig.SorterImpl.PIPELINED.name());
    }

    @Test(timeout=5000L)
    public void testPipelinedShuffleWithFinalMerge() throws Exception {
        this.conf.setInt("tez.runtime.io.sort.mb", 3);
        this.conf.set("tez.runtime.sorter.class", OrderedPartitionedKVOutputConfig.SorterImpl.PIPELINED.name());
        this.conf.setBoolean("tez.runtime.enable.final-merge.in.output", true);
        this.conf.setBoolean("tez.runtime.pipelined-shuffle.enabled", true);
        OutputContext context = this.createTezOutputContext();
        UserPayload payLoad = TezUtils.createUserPayloadFromConf((Configuration)this.conf);
        ((OutputContext)Mockito.doReturn((Object)payLoad).when((Object)context)).getUserPayload();
        this.sortedOutput = new OrderedPartitionedKVOutput(context, this.partitions);
        this.sortedOutput.initialize();
        this.sortedOutput.start();
        Assert.assertFalse((boolean)this.sortedOutput.finalMergeEnabled);
        Assert.assertTrue((boolean)this.sortedOutput.pipelinedShuffle);
    }

    @Test
    public void testPipelinedSettingsWithDefaultSorter() throws Exception {
        this.conf.setInt("tez.runtime.io.sort.mb", 3);
        this.conf.set("tez.runtime.sorter.class", OrderedPartitionedKVOutputConfig.SorterImpl.LEGACY.name());
        this.conf.setBoolean("tez.runtime.enable.final-merge.in.output", false);
        this.conf.setBoolean("tez.runtime.pipelined-shuffle.enabled", true);
        OutputContext context = this.createTezOutputContext();
        UserPayload payLoad = TezUtils.createUserPayloadFromConf((Configuration)this.conf);
        ((OutputContext)Mockito.doReturn((Object)payLoad).when((Object)context)).getUserPayload();
        this.sortedOutput = new OrderedPartitionedKVOutput(context, this.partitions);
        this.sortedOutput.initialize();
        try {
            this.sortedOutput.start();
            Assert.fail((String)"Should have thrown illegal argument exception as pipelining is enabled with DefaultSorter");
        }
        catch (IllegalArgumentException ie) {
            Assert.assertTrue((boolean)ie.getMessage().contains("works with PipelinedSorter"));
        }
    }

    @Test(timeout=5000L)
    public void testSortBufferSize() throws Exception {
        OutputContext context = this.createTezOutputContext();
        this.conf.setInt("tez.runtime.io.sort.mb", 2048);
        UserPayload payLoad = TezUtils.createUserPayloadFromConf((Configuration)this.conf);
        ((OutputContext)Mockito.doReturn((Object)payLoad).when((Object)context)).getUserPayload();
        this.sortedOutput = new OrderedPartitionedKVOutput(context, this.partitions);
        try {
            this.sortedOutput.initialize();
            DefaultSorter sorter = new DefaultSorter(context, this.conf, 100, 3670016000L);
            Assert.fail();
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("tez.runtime.io.sort.mb"));
        }
        this.conf.setInt("tez.runtime.io.sort.mb", 0);
        payLoad = TezUtils.createUserPayloadFromConf((Configuration)this.conf);
        ((OutputContext)Mockito.doReturn((Object)payLoad).when((Object)context)).getUserPayload();
        this.sortedOutput = new OrderedPartitionedKVOutput(context, this.partitions);
        try {
            this.sortedOutput.initialize();
            Assert.fail();
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("tez.runtime.io.sort.mb"));
        }
    }

    @Test(timeout=5000L)
    public void baseTest() throws Exception {
        this.startSortedOutput(this.partitions);
        long recordsWritten = this.numRecords.get();
        for (int i = 0; i < Math.max(1, rnd.nextInt(50)); ++i) {
            Text key = new Text(new BigInteger(256, rnd).toString());
            LinkedList<Text> values = new LinkedList<Text>();
            for (int j = 0; j < Math.max(2, rnd.nextInt(10)); ++j) {
                ++recordsWritten;
                values.add(new Text(new BigInteger(256, rnd).toString()));
            }
            this.writer.write((Object)key, values);
        }
        List eventList = this.sortedOutput.close();
        Assert.assertTrue((eventList != null && eventList.size() == 2 ? 1 : 0) != 0);
        Assert.assertEquals((long)recordsWritten, (long)this.numRecords.get());
        ShuffleUserPayloads.DataMovementEventPayloadProto payload = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom((ByteString)ByteString.copyFrom((ByteBuffer)((CompositeDataMovementEvent)eventList.get(1)).getUserPayload()));
        ShuffleUserPayloads.VertexManagerEventPayloadProto vmPayload = ShuffleUserPayloads.VertexManagerEventPayloadProto.parseFrom((ByteString)ByteString.copyFrom((ByteBuffer)((VertexManagerEvent)eventList.get(0)).getUserPayload()));
        if (this.reportPartitionStats.isPrecise()) {
            Assert.assertTrue((boolean)vmPayload.hasDetailedPartitionStats());
        } else {
            Assert.assertTrue((boolean)vmPayload.hasPartitionStats());
        }
        Assert.assertEquals((Object)HOST, (Object)payload.getHost());
        Assert.assertEquals((long)80L, (long)payload.getPort());
        Assert.assertEquals((Object)UniqueID, (Object)payload.getPathComponent());
    }

    @Test(timeout=5000L)
    public void testWithSomeEmptyPartition() throws Exception {
        this.partitions = Math.max(2, this.partitions);
        this.startSortedOutput(this.partitions);
        for (int i = 0; i < 2 * this.partitions; ++i) {
            Text key = new Text(new BigInteger(256, rnd).toString());
            Text value = new Text(new BigInteger(256, rnd).toString());
            if (i % this.partitions == this.emptyPartitionIdx) continue;
            this.writer.write((Object)key, (Object)value);
        }
        List eventList = this.sortedOutput.close();
        Assert.assertTrue((eventList != null && eventList.size() == 2 ? 1 : 0) != 0);
        ShuffleUserPayloads.DataMovementEventPayloadProto payload = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom((ByteString)ByteString.copyFrom((ByteBuffer)((CompositeDataMovementEvent)eventList.get(1)).getUserPayload()));
        Assert.assertEquals((Object)HOST, (Object)payload.getHost());
        Assert.assertEquals((long)80L, (long)payload.getPort());
        Assert.assertEquals((Object)UniqueID, (Object)payload.getPathComponent());
    }

    @Test(timeout=5000L)
    public void testAllEmptyPartition() throws Exception {
        this.startSortedOutput(this.partitions);
        List eventList = this.sortedOutput.close();
        Assert.assertTrue((eventList != null && eventList.size() == 2 ? 1 : 0) != 0);
        ShuffleUserPayloads.DataMovementEventPayloadProto payload = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom((ByteString)ByteString.copyFrom((ByteBuffer)((CompositeDataMovementEvent)eventList.get(1)).getUserPayload()));
        if (this.sendEmptyPartitionViaEvent) {
            Assert.assertEquals((Object)"", (Object)payload.getHost());
            Assert.assertEquals((long)0L, (long)payload.getPort());
            Assert.assertEquals((Object)"", (Object)payload.getPathComponent());
        } else {
            Assert.assertEquals((Object)HOST, (Object)payload.getHost());
            Assert.assertEquals((long)80L, (long)payload.getPort());
            Assert.assertEquals((Object)UniqueID, (Object)payload.getPathComponent());
        }
    }

    private OutputContext createTezOutputContext() throws IOException {
        String[] workingDirs = new String[]{this.workingDir.toString()};
        UserPayload payLoad = TezUtils.createUserPayloadFromConf((Configuration)this.conf);
        DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
        serviceProviderMetaData.writeInt(80);
        TezCounters counters = new TezCounters();
        OutputStatisticsReporter reporter = (OutputStatisticsReporter)Mockito.mock(OutputStatisticsReporter.class);
        ((OutputStatisticsReporter)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                TestOnFileSortedOutput.this.outputSize.set((Long)invocation.getArguments()[0]);
                return null;
            }
        }).when((Object)reporter)).reportDataSize(Matchers.anyLong());
        ((OutputStatisticsReporter)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                TestOnFileSortedOutput.this.numRecords.set((Long)invocation.getArguments()[0]);
                return null;
            }
        }).when((Object)reporter)).reportItemsProcessed(Matchers.anyLong());
        OutputContext context = (OutputContext)Mockito.mock(OutputContext.class);
        ((OutputContext)Mockito.doReturn((Object)counters).when((Object)context)).getCounters();
        ((OutputContext)Mockito.doReturn((Object)workingDirs).when((Object)context)).getWorkDirs();
        ((OutputContext)Mockito.doReturn((Object)payLoad).when((Object)context)).getUserPayload();
        ((OutputContext)Mockito.doReturn((Object)0x500000L).when((Object)context)).getTotalMemoryAvailableToTask();
        ((OutputContext)Mockito.doReturn((Object)UniqueID).when((Object)context)).getUniqueIdentifier();
        ((OutputContext)Mockito.doReturn((Object)"v1").when((Object)context)).getDestinationVertexName();
        ((OutputContext)Mockito.doReturn((Object)ByteBuffer.wrap(serviceProviderMetaData.getData())).when((Object)context)).getServiceProviderMetaData(this.conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle"));
        ((OutputContext)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                long requestedSize = (Long)invocation.getArguments()[0];
                MemoryUpdateCallbackHandler callback = (MemoryUpdateCallbackHandler)invocation.getArguments()[1];
                callback.memoryAssigned(requestedSize);
                return null;
            }
        }).when((Object)context)).requestInitialMemory(Matchers.anyLong(), (MemoryUpdateCallback)Matchers.any(MemoryUpdateCallback.class));
        ExecutionContext ExecutionContext2 = (ExecutionContext)Mockito.mock(ExecutionContext.class);
        ((ExecutionContext)Mockito.doReturn((Object)HOST).when((Object)ExecutionContext2)).getHostName();
        ((OutputContext)Mockito.doReturn((Object)reporter).when((Object)context)).getStatisticsReporter();
        ((OutputContext)Mockito.doReturn((Object)ExecutionContext2).when((Object)context)).getExecutionContext();
        return context;
    }

    @Test(timeout=5000L)
    public void testInvalidSorter() throws Exception {
        try {
            this._testPipelinedShuffle("Foo");
            Assert.fail((String)"Expected start to fail due to invalid sorter");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
    }

    @Test(timeout=5000L)
    public void testLowerCaseNamedSorter() throws Exception {
        this._testPipelinedShuffle("Pipelined");
    }
}

