/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle;

import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.BitSet;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.zip.Deflater;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;

public class TestShuffleUtils {
    private OutputContext outputContext;
    private Configuration conf;
    private FileSystem localFs;
    private Path workingDir;

    /**
     * Builds a Mockito mock of {@link InputContext} for use in tests.
     *
     * <p>The mock is stubbed to return an application id created with
     * {@code ApplicationId.newInstance(1L, 1)}, the source vertex name
     * {@code "sourceVertex"} and a fresh {@link TezCounters} instance.
     *
     * @return the stubbed mock input context
     */
    private InputContext createTezInputContext() {
        InputContext mockContext = Mockito.mock(InputContext.class);
        Mockito.doReturn(ApplicationId.newInstance(1L, 1)).when(mockContext).getApplicationId();
        Mockito.doReturn("sourceVertex").when(mockContext).getSourceVertexName();
        Mockito.when(mockContext.getCounters()).thenReturn(new TezCounters());
        return mockContext;
    }

    /**
     * Builds a Mockito mock of {@link OutputContext} for the event-generation tests.
     *
     * <p>The mock reports host {@code "localhost"} through its execution context, task vertex
     * index 1, output index 1, DAG attempt number 0, destination vertex {@code "destVertex"}
     * and a fresh {@link TezCounters}. For the auxiliary service id read from {@link #conf}
     * (key {@code tez.am.shuffle.auxiliary-service.id}, default {@code mapreduce_shuffle}) it
     * returns service-provider metadata consisting of the single serialized int {@code 80}.
     *
     * @return the stubbed mock output context
     * @throws IOException if serializing the metadata into the in-memory buffer fails
     */
    private OutputContext createTezOutputContext() throws IOException {
        OutputContext context = Mockito.mock(OutputContext.class);

        ExecutionContextImpl executionContext = Mockito.mock(ExecutionContextImpl.class);
        Mockito.doReturn("localhost").when(executionContext).getHostName();
        Mockito.doReturn(executionContext).when(context).getExecutionContext();

        DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
        serviceProviderMetaData.writeInt(80);
        // getData() exposes the whole backing array, of which only the first getLength() bytes
        // were actually written; limit the ByteBuffer to the valid region so the mock does not
        // hand out trailing unused capacity as if it were metadata.
        ByteBuffer metaData = ByteBuffer.wrap(
            serviceProviderMetaData.getData(), 0, serviceProviderMetaData.getLength());
        String auxiliaryService =
            this.conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle");
        Mockito.doReturn(metaData).when(context).getServiceProviderMetaData(auxiliaryService);

        Mockito.doReturn(1).when(context).getTaskVertexIndex();
        Mockito.doReturn(1).when(context).getOutputIndex();
        Mockito.doReturn(0).when(context).getDAGAttemptNumber();
        Mockito.doReturn("destVertex").when(context).getDestinationVertexName();
        Mockito.when(context.getCounters()).thenReturn(new TezCounters());
        return context;
    }

    /**
     * Per-test fixture: creates a fresh {@link Configuration}, the mocked output context,
     * the local file system handle and a qualified working directory named after this class
     * under the {@code test.build.data} system property (falling back to {@code /tmp}).
     *
     * <p>The configuration is then populated with {@link Text} as key and value class,
     * {@link HashPartitioner} as partitioner and the working directory as the only
     * framework local dir.
     *
     * @throws Exception if the mock context or the local file system cannot be created
     */
    @Before
    public void setup() throws Exception {
        this.conf = new Configuration();
        // The output context is built before any key is set on conf, exactly as before.
        this.outputContext = this.createTezOutputContext();
        this.conf.set("fs.defaultFS", "file:///");
        this.localFs = FileSystem.getLocal(this.conf);

        Path buildDataRoot = new Path(System.getProperty("test.build.data", "/tmp"));
        Path unqualifiedDir = new Path(buildDataRoot, TestShuffleUtils.class.getName());
        this.workingDir =
            unqualifiedDir.makeQualified(this.localFs.getUri(), this.localFs.getWorkingDirectory());

        String textClassName = Text.class.getName();
        this.conf.set("tez.runtime.key.class", textClassName);
        this.conf.set("tez.runtime.value.class", textClassName);
        this.conf.set("tez.runtime.partitioner.class", HashPartitioner.class.getName());
        this.conf.setStrings("tez.runtime.framework.local.dirs", this.workingDir.toString());
    }

    /**
     * Writes a spill index file named {@code file.index.out} into the working directory.
     *
     * <p>Every partition gets a fixed partition length of 200 and consecutive start offsets
     * (0, 200, 400, ...). The raw length is 0 for every even-numbered partition, or for all
     * partitions when {@code allEmptyPartitions} is set; otherwise it is a random value in
     * {@code [100, 200)}. The tests in this class expect the zero-raw-length partitions to be
     * reported as empty.
     *
     * @param numPartitions number of index records to write
     * @param allEmptyPartitions whether every partition should get raw length 0
     * @return path of the written index file
     * @throws IOException if the index file cannot be written
     */
    private Path createIndexFile(int numPartitions, boolean allEmptyPartitions) throws IOException {
        final long partitionLength = 200L;
        TezSpillRecord record = new TezSpillRecord(numPartitions);
        for (int partition = 0; partition < numPartitions; partition++) {
            // The random value is always drawn, even when it ends up being discarded.
            long randomRawLength = ThreadLocalRandom.current().nextLong(100L, 200L);
            boolean zeroRawLength = allEmptyPartitions || partition % 2 == 0;
            long rawLength = zeroRawLength ? 0L : randomRawLength;
            long startOffset = partition * partitionLength;
            record.putIndex(new TezIndexRecord(startOffset, rawLength, partitionLength), partition);
        }
        Path indexPath = new Path(this.workingDir, "file.index.out");
        record.writeToFile(indexPath, this.conf, FileSystem.getLocal(this.conf).getRaw());
        return indexPath;
    }

    /**
     * Generates events for a non-final spill (final merge disabled, not the last event) over
     * an index file with 10 partitions, half of which have raw length 0.
     *
     * <p>Expects exactly one {@link CompositeDataMovementEvent} covering all 10 outputs from
     * source index 0, whose payload carries spill id 0, an explicitly set but {@code false}
     * last-event flag, and an empty-partition bit set with 5 bits set.
     *
     * @throws Exception if the index file cannot be created or the payload cannot be parsed
     */
    @Test
    public void testGenerateOnSpillEvent() throws Exception {
        // Kept as a raw list (as in the original) so no additional import is required here.
        List events = Lists.newLinkedList();
        Path indexFile = this.createIndexFile(10, false);
        boolean finalMergeEnabled = false;
        boolean isLastEvent = false;
        int spillId = 0;
        int physicalOutputs = 10;
        String pathComponent = "/attempt_x_y_0/file.out";
        String auxiliaryService =
            this.conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle");

        ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
            this.outputContext, spillId, new TezSpillRecord(indexFile, this.conf),
            physicalOutputs, true, pathComponent, null, false, auxiliaryService,
            TezCommonUtils.newBestCompressionDeflater());

        // assertEquals (rather than assertTrue on ==) reports expected vs. actual on failure.
        Assert.assertEquals(1, events.size());
        Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
        CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(0);
        Assert.assertEquals(physicalOutputs, cdme.getCount());
        Assert.assertEquals(0, cdme.getSourceIndexStart());

        ByteBuffer payload = cdme.getUserPayload();
        ShuffleUserPayloads.DataMovementEventPayloadProto dmeProto =
            ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(payload));
        Assert.assertEquals(0, dmeProto.getSpillId());
        Assert.assertTrue(dmeProto.hasLastEvent());
        Assert.assertFalse(dmeProto.getLastEvent());

        byte[] emptyPartitions =
            TezCommonUtils.decompressByteStringToByteArray(dmeProto.getEmptyPartitions());
        BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
        Assert.assertEquals(
            "emptyPartitionBitSet cardinality (expecting 5) = " + emptyPartitionsBitSet.cardinality(),
            5, emptyPartitionsBitSet.cardinality());
    }

    /**
     * Generates events with final merge enabled and the last-event flag set, over an index
     * file with 10 partitions, half of which have raw length 0.
     *
     * <p>Expects a {@link VertexManagerEvent} followed by a {@link CompositeDataMovementEvent}
     * covering all 10 outputs from source index 0. The payload must carry neither a spill id
     * nor a last-event flag, and its empty-partition bit set must have 5 bits set.
     *
     * @throws Exception if the index file cannot be created or the payload cannot be parsed
     */
    @Test
    public void testGenerateOnSpillEvent_With_FinalMerge() throws Exception {
        // Kept as a raw list (as in the original) so no additional import is required here.
        List events = Lists.newLinkedList();
        Path indexFile = this.createIndexFile(10, false);
        boolean finalMergeEnabled = true;
        boolean isLastEvent = true;
        int spillId = 0;
        int physicalOutputs = 10;
        String pathComponent = "/attempt_x_y_0/file.out";
        String auxiliaryService =
            this.conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle");

        ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
            this.outputContext, spillId, new TezSpillRecord(indexFile, this.conf),
            physicalOutputs, true, pathComponent, null, false, auxiliaryService,
            TezCommonUtils.newBestCompressionDeflater());

        // assertEquals (rather than assertTrue on ==) reports expected vs. actual on failure.
        Assert.assertEquals(2, events.size());
        Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
        Assert.assertTrue(events.get(1) instanceof CompositeDataMovementEvent);
        CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(1);
        Assert.assertEquals(physicalOutputs, cdme.getCount());
        Assert.assertEquals(0, cdme.getSourceIndexStart());

        ShuffleUserPayloads.DataMovementEventPayloadProto dmeProto =
            ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(
                ByteString.copyFrom(cdme.getUserPayload()));
        Assert.assertFalse(dmeProto.hasSpillId());
        // Checked separately so a failure identifies which of the two conditions broke.
        Assert.assertFalse(dmeProto.hasLastEvent());
        Assert.assertFalse(dmeProto.getLastEvent());

        byte[] emptyPartitions =
            TezCommonUtils.decompressByteStringToByteArray(dmeProto.getEmptyPartitions());
        BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
        Assert.assertEquals(
            "emptyPartitionBitSet cardinality (expecting 5) = " + emptyPartitionsBitSet.cardinality(),
            5, emptyPartitionsBitSet.cardinality());
    }

    /**
     * Generates events with final merge disabled and the last-event flag set, over an index
     * file in which all 10 partitions have raw length 0.
     *
     * <p>Expects a {@link VertexManagerEvent} followed by a {@link CompositeDataMovementEvent}
     * covering all 10 outputs from source index 0. The payload must carry spill id 0, a
     * {@code true} last-event flag, an empty path component and an empty-partition bit set
     * with all 10 bits set.
     *
     * @throws Exception if the index file cannot be created or the payload cannot be parsed
     */
    @Test
    public void testGenerateOnSpillEvent_With_All_EmptyPartitions() throws Exception {
        // Kept as a raw list (as in the original) so no additional import is required here.
        List events = Lists.newLinkedList();
        Path indexFile = this.createIndexFile(10, true);
        // Named after the parameter it feeds; the value false means final merge is disabled.
        boolean finalMergeEnabled = false;
        boolean isLastEvent = true;
        int spillId = 0;
        int physicalOutputs = 10;
        String pathComponent = "/attempt_x_y_0/file.out";
        String auxiliaryService =
            this.conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle");

        ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
            this.outputContext, spillId, new TezSpillRecord(indexFile, this.conf),
            physicalOutputs, true, pathComponent, null, false, auxiliaryService,
            TezCommonUtils.newBestCompressionDeflater());

        Assert.assertEquals(2, events.size());
        Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
        Assert.assertTrue(events.get(1) instanceof CompositeDataMovementEvent);
        CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(1);
        // Expected value first, actual second, so failure messages read correctly.
        Assert.assertEquals(physicalOutputs, cdme.getCount());
        Assert.assertEquals(0, cdme.getSourceIndexStart());

        ShuffleUserPayloads.DataMovementEventPayloadProto dmeProto =
            ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(
                ByteString.copyFrom(cdme.getUserPayload()));
        Assert.assertEquals(0, dmeProto.getSpillId());
        Assert.assertTrue(dmeProto.hasLastEvent());
        Assert.assertTrue(dmeProto.getLastEvent());
        Assert.assertEquals("", dmeProto.getPathComponent());

        byte[] emptyPartitions =
            TezCommonUtils.decompressByteStringToByteArray(dmeProto.getEmptyPartitions());
        BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
        Assert.assertEquals(
            "emptyPartitionBitSet cardinality (expecting 10) = " + emptyPartitionsBitSet.cardinality(),
            10, emptyPartitionsBitSet.cardinality());
    }

    /**
     * Checks that an {@link InternalError} thrown by the codec's input stream while shuffling
     * to memory reaches the caller as an {@link IOException} whose cause is that error and
     * whose message contains the error's message.
     *
     * @throws Exception if stubbing the mocks fails
     */
    @Test
    public void testInternalErrorTranslation() throws Exception {
        final String codecErrorMsg = "codec failure";

        // Stream whose three-argument read always fails with an InternalError.
        CompressionInputStream failingStream = Mockito.mock(CompressionInputStream.class);
        Mockito.when(failingStream.read(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt()))
            .thenThrow(new InternalError(codecErrorMsg));

        // Codec mock wired to hand out the failing stream.
        Decompressor decompressor = Mockito.mock(Decompressor.class);
        ConfigurableCodecForTest codec = Mockito.mock(ConfigurableCodecForTest.class);
        Mockito.when(codec.getConf()).thenReturn(Mockito.mock(Configuration.class));
        Mockito.when(codec.createDecompressor()).thenReturn(decompressor);
        Mockito.when(codec.createInputStream((InputStream) Mockito.any(), (Decompressor) Mockito.any()))
            .thenReturn(failingStream);

        byte[] header = new byte[]{84, 73, 70, 1};
        IOException thrown = null;
        try {
            ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(header), 1024, 128,
                codec, false, 0, Mockito.mock(Logger.class), null);
        } catch (IOException e) {
            thrown = e;
        }

        Assert.assertNotNull("shuffle was supposed to throw!", thrown);
        Assert.assertTrue(thrown.getCause() instanceof InternalError);
        Assert.assertTrue(thrown.getMessage().contains(codecErrorMsg));
    }

    /**
     * Checks how failures raised by the codec's input stream surface from
     * {@code ShuffleUtils.shuffleToMemory}:
     * <ul>
     *   <li>an {@link IllegalArgumentException} arrives as the cause of an {@link IOException};</li>
     *   <li>a {@link SocketTimeoutException} arrives as a {@link SocketTimeoutException};</li>
     *   <li>an {@link InternalError} arrives as the cause of an {@link IOException}.</li>
     * </ul>
     * In every case the resulting exception's message must contain the original message.
     *
     * @throws Exception if stubbing the mocks fails
     */
    @Test
    public void testExceptionTranslation() throws Exception {
        String codecErrorMsg = "codec failure";
        // One decompressor mock is shared by all three codecs, as in the original test.
        Decompressor mockDecoder = Mockito.mock(Decompressor.class);

        IOException fromRuntimeException = shuffleToMemoryExpectingIOException(
            newCodecWithFailingStream(new IllegalArgumentException(codecErrorMsg), mockDecoder));
        Assert.assertTrue(fromRuntimeException.getCause() instanceof IllegalArgumentException);
        Assert.assertTrue(fromRuntimeException.getMessage().contains(codecErrorMsg));

        IOException fromTimeout = shuffleToMemoryExpectingIOException(
            newCodecWithFailingStream(new SocketTimeoutException(codecErrorMsg), mockDecoder));
        Assert.assertTrue(fromTimeout instanceof SocketTimeoutException);
        Assert.assertTrue(fromTimeout.getMessage().contains(codecErrorMsg));

        IOException fromInternalError = shuffleToMemoryExpectingIOException(
            newCodecWithFailingStream(new InternalError(codecErrorMsg), mockDecoder));
        Assert.assertTrue(fromInternalError.getCause() instanceof InternalError);
        Assert.assertTrue(fromInternalError.getMessage().contains(codecErrorMsg));
    }

    /**
     * Creates a mocked codec whose input stream throws {@code failure} from its
     * three-argument {@code read}.
     *
     * @param failure the throwable the stream's read should raise
     * @param decompressor the decompressor the codec should hand out
     * @return the stubbed codec mock
     * @throws IOException never in practice; declared only because the stubbed calls declare it
     */
    private static ConfigurableCodecForTest newCodecWithFailingStream(
            Throwable failure, Decompressor decompressor) throws IOException {
        CompressionInputStream failingStream = Mockito.mock(CompressionInputStream.class);
        Mockito.when(failingStream.read(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt()))
            .thenThrow(failure);

        ConfigurableCodecForTest codec = Mockito.mock(ConfigurableCodecForTest.class);
        Mockito.when(codec.getConf()).thenReturn(Mockito.mock(Configuration.class));
        Mockito.when(codec.createDecompressor()).thenReturn(decompressor);
        Mockito.when(codec.createInputStream((InputStream) Mockito.any(), (Decompressor) Mockito.any()))
            .thenReturn(failingStream);
        return codec;
    }

    /**
     * Runs {@code ShuffleUtils.shuffleToMemory} against a 4-byte header using the given codec
     * and returns the {@link IOException} it throws, failing the test if it does not throw.
     *
     * @param codec the codec to shuffle with
     * @return the exception thrown by the shuffle call
     */
    private static IOException shuffleToMemoryExpectingIOException(CompressionCodec codec) {
        byte[] header = new byte[]{84, 73, 70, 1};
        try {
            ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(header), 1024, 128,
                codec, false, 0, Mockito.mock(Logger.class), null);
        } catch (IOException e) {
            return e;
        }
        // Same outcome as Assert.fail(message), expressed as a throw so the method can return.
        throw new AssertionError("shuffle was supposed to throw!");
    }

    /**
     * Shuffles 1000 zero bytes to disk twice: once with the final boolean argument
     * {@code false}, expecting the bytes to be copied through unchanged, and once with it
     * {@code true}, expecting an {@link IOException}.
     *
     * <p>NOTE(review): judging by the test name the final argument presumably toggles checksum
     * verification; confirm against {@code ShuffleUtils.shuffleToDisk}.
     *
     * @throws Exception if the first, non-failing shuffle call throws
     */
    @Test
    public void testShuffleToDiskChecksum() throws Exception {
        byte[] zeroes = new byte[1000];
        Arrays.fill(zeroes, (byte) 0);
        ByteArrayInputStream source = new ByteArrayInputStream(zeroes);

        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        ShuffleUtils.shuffleToDisk(sink, "somehost", source, zeroes.length, 2000L,
            Mockito.mock(Logger.class), null, false, 0, false);
        Assert.assertArrayEquals(zeroes, sink.toByteArray());

        // Rewind and replay the same bytes, this time with the final flag switched on.
        source.reset();
        IOException expected = null;
        try {
            ShuffleUtils.shuffleToDisk(Mockito.mock(OutputStream.class), "somehost", source,
                zeroes.length, 2000L, Mockito.mock(Logger.class), null, false, 0, true);
        } catch (IOException e) {
            expected = e;
        }
        Assert.assertNotNull("shuffle was supposed to throw!", expected);
    }

    /**
     * Exercises {@code ShuffleUtils.FetchStatsLogger} with 1000 fetch completions while the
     * active logger has info disabled, then another 1000 with info enabled.
     *
     * <p>After the first batch the active logger must not have logged and the aggregate
     * logger must have logged exactly once. After the second batch the active logger must
     * have logged 1000 times while the aggregate logger's total is still one.
     *
     * @throws Exception declared for parity with the other tests in this class
     */
    @Test
    public void testFetchStatsLogger() throws Exception {
        final int fetchesPerBatch = 1000;
        Logger activeLogger = Mockito.mock(Logger.class);
        Logger aggregateLogger = Mockito.mock(Logger.class);
        ShuffleUtils.FetchStatsLogger statsLogger =
            new ShuffleUtils.FetchStatsLogger(activeLogger, aggregateLogger);
        InputAttemptIdentifier attempt = new InputAttemptIdentifier(1, 1);

        // Batch 1: info disabled on the active logger.
        Mockito.when(activeLogger.isInfoEnabled()).thenReturn(false);
        for (int fetch = 0; fetch < fetchesPerBatch; fetch++) {
            statsLogger.logIndividualFetchComplete(10L, 100L, 1000L, "testType", attempt);
        }
        Mockito.verify(activeLogger, Mockito.never()).info(Mockito.anyString());
        Mockito.verify(aggregateLogger, Mockito.times(1))
            .info(Mockito.anyString(), (Object[]) ArgumentMatchers.any());

        // Batch 2: info enabled on the active logger. Verification counts are cumulative.
        Mockito.when(activeLogger.isInfoEnabled()).thenReturn(true);
        for (int fetch = 0; fetch < fetchesPerBatch; fetch++) {
            statsLogger.logIndividualFetchComplete(10L, 100L, 1000L, "testType", attempt);
        }
        Mockito.verify(activeLogger, Mockito.times(1000)).info(Mockito.anyString());
        Mockito.verify(aggregateLogger, Mockito.times(1))
            .info(Mockito.anyString(), (Object[]) ArgumentMatchers.any());
    }

    /**
     * Do-nothing codec that is both a {@link CompressionCodec} and {@link Configurable}.
     *
     * <p>Every method returns {@code null} (or does nothing). The tests in this class only
     * use it as a concrete type to mock with Mockito, stubbing the methods they need.
     */
    public static class ConfigurableCodecForTest
    implements CompressionCodec,
    Configurable {

        // ---- Configurable ----

        /** Ignores the supplied configuration. */
        public void setConf(Configuration configuration) {
        }

        /** @return always {@code null} */
        public Configuration getConf() {
            return null;
        }

        // ---- CompressionCodec: compression side ----

        /** @return always {@code null} */
        public Class<? extends Compressor> getCompressorType() {
            return null;
        }

        /** @return always {@code null} */
        public Compressor createCompressor() {
            return null;
        }

        /** @return always {@code null} */
        public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
            return null;
        }

        /** @return always {@code null} */
        public CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor) throws IOException {
            return null;
        }

        // ---- CompressionCodec: decompression side ----

        /** @return always {@code null} */
        public Class<? extends Decompressor> getDecompressorType() {
            return null;
        }

        /** @return always {@code null} */
        public Decompressor createDecompressor() {
            return null;
        }

        /** @return always {@code null} */
        public CompressionInputStream createInputStream(InputStream in) throws IOException {
            return null;
        }

        /** @return always {@code null} */
        public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor) throws IOException {
            return null;
        }

        // ---- CompressionCodec: misc ----

        /** @return always {@code null} */
        public String getDefaultExtension() {
            return null;
        }
    }
}

