/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle;

import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

public class TestShuffleUtils {
    private static final String HOST = "localhost";
    private static final int PORT = 8080;
    private static final String PATH_COMPONENT = "attempt";
    private OutputContext outputContext;
    private Configuration conf;
    private FileSystem localFs;
    private Path workingDir;

    private InputContext createTezInputContext() {
        ApplicationId applicationId = ApplicationId.newInstance((long)1L, (int)1);
        InputContext inputContext = (InputContext)Mockito.mock(InputContext.class);
        ((InputContext)Mockito.doReturn((Object)applicationId).when((Object)inputContext)).getApplicationId();
        ((InputContext)Mockito.doReturn((Object)"sourceVertex").when((Object)inputContext)).getSourceVertexName();
        Mockito.when((Object)inputContext.getCounters()).thenReturn((Object)new TezCounters());
        return inputContext;
    }

    private OutputContext createTezOutputContext() throws IOException {
        ApplicationId applicationId = ApplicationId.newInstance((long)1L, (int)1);
        OutputContext outputContext = (OutputContext)Mockito.mock(OutputContext.class);
        ExecutionContextImpl executionContext = (ExecutionContextImpl)Mockito.mock(ExecutionContextImpl.class);
        ((ExecutionContextImpl)Mockito.doReturn((Object)HOST).when((Object)executionContext)).getHostName();
        ((OutputContext)Mockito.doReturn((Object)executionContext).when((Object)outputContext)).getExecutionContext();
        DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
        serviceProviderMetaData.writeInt(80);
        ((OutputContext)Mockito.doReturn((Object)ByteBuffer.wrap(serviceProviderMetaData.getData())).when((Object)outputContext)).getServiceProviderMetaData("mapreduce_shuffle");
        ((OutputContext)Mockito.doReturn((Object)1).when((Object)outputContext)).getTaskVertexIndex();
        ((OutputContext)Mockito.doReturn((Object)1).when((Object)outputContext)).getOutputIndex();
        ((OutputContext)Mockito.doReturn((Object)0).when((Object)outputContext)).getDAGAttemptNumber();
        ((OutputContext)Mockito.doReturn((Object)"destVertex").when((Object)outputContext)).getDestinationVertexName();
        Mockito.when((Object)outputContext.getCounters()).thenReturn((Object)new TezCounters());
        return outputContext;
    }

    @Before
    public void setup() throws Exception {
        this.outputContext = this.createTezOutputContext();
        this.conf = new Configuration();
        this.conf.set("fs.defaultFS", "file:///");
        this.localFs = FileSystem.getLocal((Configuration)this.conf);
        this.workingDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), TestShuffleUtils.class.getName()).makeQualified(this.localFs.getUri(), this.localFs.getWorkingDirectory());
        String localDirs = this.workingDir.toString();
        this.conf.set("tez.runtime.key.class", Text.class.getName());
        this.conf.set("tez.runtime.value.class", Text.class.getName());
        this.conf.set("tez.runtime.partitioner.class", HashPartitioner.class.getName());
        this.conf.setStrings("tez.runtime.framework.local.dirs", new String[]{localDirs});
    }

    private Path createIndexFile(int numPartitions, boolean allEmptyPartitions) throws IOException {
        Path path = new Path(this.workingDir, "file.index.out");
        TezSpillRecord spillRecord = new TezSpillRecord(numPartitions);
        long startOffset = 0L;
        long partLen = 200L;
        Random rnd = new Random();
        for (int i = 0; i < numPartitions; ++i) {
            long rawLen = rnd.nextLong();
            if (i % 2 == 0 || allEmptyPartitions) {
                rawLen = 6L;
            }
            TezIndexRecord indexRecord = new TezIndexRecord(startOffset, rawLen, partLen);
            startOffset += partLen;
            spillRecord.putIndex(indexRecord, i);
        }
        spillRecord.writeToFile(path, this.conf);
        return path;
    }

    @Test
    public void testGenerateOnSpillEvent() throws Exception {
        LinkedList events = Lists.newLinkedList();
        Path indexFile = this.createIndexFile(10, false);
        boolean finalMergeEnabled = false;
        boolean isLastEvent = false;
        int spillId = 0;
        int physicalOutputs = 10;
        String pathComponent = "/attempt_x_y_0/file.out";
        ShuffleUtils.generateEventOnSpill((List)events, (boolean)finalMergeEnabled, (boolean)isLastEvent, (OutputContext)this.outputContext, (int)spillId, (TezSpillRecord)new TezSpillRecord(indexFile, this.conf), (int)physicalOutputs, (boolean)true, (String)pathComponent, null);
        Assert.assertTrue((events.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)(events.get(0) instanceof CompositeDataMovementEvent));
        CompositeDataMovementEvent cdme = (CompositeDataMovementEvent)events.get(0);
        Assert.assertTrue((cdme.getCount() == physicalOutputs ? 1 : 0) != 0);
        Assert.assertTrue((cdme.getSourceIndexStart() == 0 ? 1 : 0) != 0);
        ByteBuffer payload = cdme.getUserPayload();
        ShuffleUserPayloads.DataMovementEventPayloadProto dmeProto = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom((ByteString)ByteString.copyFrom((ByteBuffer)payload));
        Assert.assertTrue((dmeProto.getSpillId() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((dmeProto.hasLastEvent() && !dmeProto.getLastEvent() ? 1 : 0) != 0);
        byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray((ByteString)dmeProto.getEmptyPartitions());
        BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray((byte[])emptyPartitions);
        Assert.assertTrue((String)("emptyPartitionBitSet cardinality (expecting 5) = " + emptyPartitionsBitSet.cardinality()), (emptyPartitionsBitSet.cardinality() == 5 ? 1 : 0) != 0);
        events.clear();
    }

    @Test
    public void testGenerateOnSpillEvent_With_FinalMerge() throws Exception {
        LinkedList events = Lists.newLinkedList();
        Path indexFile = this.createIndexFile(10, false);
        boolean finalMergeEnabled = true;
        boolean isLastEvent = true;
        int spillId = 0;
        int physicalOutputs = 10;
        String pathComponent = "/attempt_x_y_0/file.out";
        ShuffleUtils.generateEventOnSpill((List)events, (boolean)finalMergeEnabled, (boolean)isLastEvent, (OutputContext)this.outputContext, (int)spillId, (TezSpillRecord)new TezSpillRecord(indexFile, this.conf), (int)physicalOutputs, (boolean)true, (String)pathComponent, null);
        Assert.assertTrue((events.size() == 2 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)(events.get(0) instanceof VertexManagerEvent));
        Assert.assertTrue((boolean)(events.get(1) instanceof CompositeDataMovementEvent));
        CompositeDataMovementEvent cdme = (CompositeDataMovementEvent)events.get(1);
        Assert.assertTrue((cdme.getCount() == physicalOutputs ? 1 : 0) != 0);
        Assert.assertTrue((cdme.getSourceIndexStart() == 0 ? 1 : 0) != 0);
        ShuffleUserPayloads.DataMovementEventPayloadProto dmeProto = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom((ByteString)ByteString.copyFrom((ByteBuffer)cdme.getUserPayload()));
        Assert.assertFalse((boolean)dmeProto.hasSpillId());
        Assert.assertFalse((dmeProto.hasLastEvent() || dmeProto.getLastEvent() ? 1 : 0) != 0);
        byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray((ByteString)dmeProto.getEmptyPartitions());
        BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray((byte[])emptyPartitions);
        Assert.assertTrue((String)("emptyPartitionBitSet cardinality (expecting 5) = " + emptyPartitionsBitSet.cardinality()), (emptyPartitionsBitSet.cardinality() == 5 ? 1 : 0) != 0);
    }

    @Test
    public void testGenerateOnSpillEvent_With_All_EmptyPartitions() throws Exception {
        LinkedList events = Lists.newLinkedList();
        Path indexFile = this.createIndexFile(10, true);
        boolean finalMergeDisabled = false;
        boolean isLastEvent = true;
        int spillId = 0;
        int physicalOutputs = 10;
        String pathComponent = "/attempt_x_y_0/file.out";
        ShuffleUtils.generateEventOnSpill((List)events, (boolean)finalMergeDisabled, (boolean)isLastEvent, (OutputContext)this.outputContext, (int)spillId, (TezSpillRecord)new TezSpillRecord(indexFile, this.conf), (int)physicalOutputs, (boolean)true, (String)pathComponent, null);
        Assert.assertTrue((events.size() == 2 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)(events.get(0) instanceof VertexManagerEvent));
        Assert.assertTrue((boolean)(events.get(1) instanceof CompositeDataMovementEvent));
        CompositeDataMovementEvent cdme = (CompositeDataMovementEvent)events.get(1);
        Assert.assertTrue((cdme.getCount() == physicalOutputs ? 1 : 0) != 0);
        Assert.assertTrue((cdme.getSourceIndexStart() == 0 ? 1 : 0) != 0);
        ShuffleUserPayloads.DataMovementEventPayloadProto dmeProto = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom((ByteString)ByteString.copyFrom((ByteBuffer)cdme.getUserPayload()));
        Assert.assertTrue((dmeProto.getSpillId() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((dmeProto.hasLastEvent() && dmeProto.getLastEvent() ? 1 : 0) != 0);
        Assert.assertTrue((boolean)dmeProto.getPathComponent().equals(""));
        byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray((ByteString)dmeProto.getEmptyPartitions());
        BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray((byte[])emptyPartitions);
        Assert.assertTrue((String)("emptyPartitionBitSet cardinality (expecting 10) = " + emptyPartitionsBitSet.cardinality()), (emptyPartitionsBitSet.cardinality() == 10 ? 1 : 0) != 0);
    }
}

