/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.output;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.api.impl.IOStatistics;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.common.resources.MemoryDistributor;
import org.apache.tez.runtime.library.api.KeyValuesWriter;
import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
import org.apache.tez.runtime.library.output.UnorderedKVOutput;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.testutils.KVDataGen;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link UnorderedKVOutput} that write to a work directory on the local file system.
 *
 * <p>Each test builds a spied {@link OutputContext} around a real {@link TezOutputContextImpl},
 * writes key/value pairs through the output's {@link KeyValuesWriter}, and then inspects the
 * {@link CompositeDataMovementEvent} that was produced.
 */
public class TestOnFileUnorderedKVOutput {
    private static final Logger LOG = LoggerFactory.getLogger(TestOnFileUnorderedKVOutput.class);
    private static Configuration defaultConf = new Configuration();
    private static FileSystem localFs = null;
    private static Path workDir = null;
    /** Port advertised through the auxiliary-service environment and expected back in the payload. */
    private static final int shufflePort = 2112;
    /** Host name handed to the execution context and expected back in the payload. */
    private static final String HOST = "localhost";

    /** The un-spied runtime task created by {@link #createOutputContext(Configuration)}. */
    LogicalIOProcessorRuntimeTask task;

    static {
        defaultConf.set("fs.defaultFS", "file:///");
        try {
            localFs = FileSystem.getLocal(defaultConf);
            Path baseDir = new Path(System.getProperty("test.build.data", "/tmp"));
            workDir = new Path(baseDir, TestOnFileUnorderedKVOutput.class.getName())
                    .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
            LOG.info("Using workDir: {}", workDir);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Creates the work directory before every test. */
    @Before
    public void setup() throws Exception {
        localFs.mkdirs(workDir);
    }

    /** Recursively removes the work directory after every test. */
    @After
    public void cleanup() throws Exception {
        localFs.delete(workDir, true);
    }

    /**
     * Writes the generated test data and checks that {@code close()} returns exactly one
     * {@link CompositeDataMovementEvent} carrying the expected shuffle payload, and that the task's
     * IO statistics report the expected data size and item count.
     */
    @Test(timeout=5000L)
    public void testGeneratedDataMovementEvent() throws Exception {
        OutputContext outputContext = this.createOutputContext(createKeyValueConf());
        UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1);

        List<?> events = kvOutput.initialize();
        kvOutput.start();
        assertNoEvents(events);

        KeyValuesWriter kvWriter = kvOutput.getWriter();
        List<KVDataGen.KVPair> data = KVDataGen.generateTestData(true, 0);
        for (KVDataGen.KVPair kvp : data) {
            kvWriter.write(kvp.getKey(), kvp.getvalue());
        }
        events = kvOutput.close();

        IOStatistics ioStats = this.task.getTaskStatistics().getIOStatistics().values().iterator().next();
        Assert.assertEquals(45L, ioStats.getDataSize());
        Assert.assertEquals(5L, ioStats.getItemsProcessed());

        Assert.assertTrue(events != null && events.size() == 1);
        CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent) events.get(0);
        Assert.assertEquals("Invalid source index", 0, dmEvent.getSourceIndexStart());

        ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = parsePayload(dmEvent);
        Assert.assertFalse(shufflePayload.hasEmptyPartitions());
        Assert.assertEquals(outputContext.getUniqueIdentifier(), shufflePayload.getPathComponent());
        assertShuffleEndpoint(shufflePayload);
    }

    /**
     * Enables pipelined shuffle with a 1 MB output buffer, writes 500 records with 10000-character
     * random keys, and checks that {@code close()} returns no events while at least one event
     * batch was pushed through {@link OutputContext#sendEvents(List)}.
     */
    @Test(timeout=30000L)
    public void testWithPipelinedShuffle() throws Exception {
        Configuration conf = createKeyValueConf();
        conf.setBoolean("tez.runtime.pipelined-shuffle.enabled", true);
        conf.setInt("tez.runtime.unordered.output.buffer.size-mb", 1);
        OutputContext outputContext = this.createOutputContext(conf);
        UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1);

        List<?> events = kvOutput.initialize();
        kvOutput.start();
        assertNoEvents(events);

        KeyValuesWriter kvWriter = kvOutput.getWriter();
        for (int i = 0; i < 500; ++i) {
            kvWriter.write(new Text(RandomStringUtils.randomAscii(10000)), new IntWritable(i));
        }
        events = kvOutput.close();
        // Nothing is returned from close(); the events are looked up on the context below.
        assertNoEvents(events);

        ArgumentCaptor<List> eventsCaptor = ArgumentCaptor.forClass(List.class);
        Mockito.verify(outputContext, Mockito.atLeast(1)).sendEvents(eventsCaptor.capture());
        // getValue() yields the argument of the most recent captured sendEvents call.
        events = eventsCaptor.getValue();

        CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent) events.get(0);
        Assert.assertEquals("Invalid source index", 0, dmEvent.getSourceIndexStart());

        ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = parsePayload(dmEvent);
        Assert.assertTrue(shufflePayload.hasLastEvent());
        Assert.assertFalse(shufflePayload.hasEmptyPartitions());
        assertShuffleEndpoint(shufflePayload);
    }

    /** Returns a fresh configuration with {@link Text} keys and {@link IntWritable} values. */
    private static Configuration createKeyValueConf() {
        Configuration conf = new Configuration();
        conf.set("tez.runtime.key.class", Text.class.getName());
        conf.set("tez.runtime.value.class", IntWritable.class.getName());
        return conf;
    }

    /** Asserts that {@code events} is a non-null, empty list. */
    private static void assertNoEvents(List<?> events) {
        Assert.assertTrue(events != null && events.size() == 0);
    }

    /** Decodes the shuffle payload protobuf carried in the user payload of {@code dmEvent}. */
    private static ShuffleUserPayloads.DataMovementEventPayloadProto parsePayload(CompositeDataMovementEvent dmEvent) throws IOException {
        return ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(dmEvent.getUserPayload()));
    }

    /** Asserts that the payload advertises the shuffle port and host configured by this test. */
    private static void assertShuffleEndpoint(ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload) {
        Assert.assertEquals(shufflePort, shufflePayload.getPort());
        Assert.assertEquals(HOST, shufflePayload.getHost());
    }

    /**
     * Builds a spied {@link OutputContext} around a real {@link TezOutputContextImpl}.
     *
     * <p>As side effects this stores a new {@link LogicalIOProcessorRuntimeTask} in {@link #task},
     * verifies that constructing the context registered a counter and statistics entry for the
     * destination vertex, and stubs {@code requestInitialMemory} so that every request is granted
     * in full immediately.
     *
     * @param conf configuration serialized into the user payload and passed to the context
     * @return the spied output context
     * @throws IOException if the configuration cannot be converted into a user payload
     */
    private OutputContext createOutputContext(Configuration conf) throws IOException {
        int appAttemptNumber = 1;
        TezUmbilical tezUmbilical = Mockito.mock(TezUmbilical.class);
        String dagName = "currentDAG";
        String taskVertexName = "currentVertex";
        String destinationVertexName = "destinationVertex";
        TezDAGID dagID = TezDAGID.getInstance("2000", 1, 1);
        TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
        TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
        TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
        UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);

        TaskSpec mockSpec = Mockito.mock(TaskSpec.class);
        Mockito.when(mockSpec.getInputs()).thenReturn(Collections.singletonList(Mockito.mock(InputSpec.class)));
        Mockito.when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(Mockito.mock(OutputSpec.class)));
        this.task = new LogicalIOProcessorRuntimeTask(mockSpec, appAttemptNumber, new Configuration(), new String[]{"/"}, tezUmbilical, null, null, null, null, "", null, 1024L, false, new DefaultHadoopShim());
        LogicalIOProcessorRuntimeTask runtimeTask = Mockito.spy(this.task);

        // Publish the shuffle port under the "mapreduce_shuffle" service name in the aux env.
        HashMap<String, String> auxEnv = new HashMap<String, String>();
        ByteBuffer bb = ByteBuffer.allocate(4);
        bb.putInt(shufflePort);
        bb.position(0);
        AuxiliaryServiceHelper.setServiceDataIntoEnv("mapreduce_shuffle", bb, auxEnv);

        OutputDescriptor outputDescriptor = Mockito.mock(OutputDescriptor.class);
        Mockito.when(outputDescriptor.getClassName()).thenReturn("OutputDescriptor");

        TezOutputContextImpl realOutputContext = new TezOutputContextImpl(conf, new String[]{workDir.toString()}, appAttemptNumber, tezUmbilical, dagName, taskVertexName, destinationVertexName, -1, taskAttemptID, 0, userPayload, runtimeTask, null, auxEnv, new MemoryDistributor(1, 1, conf), outputDescriptor, null, new ExecutionContextImpl(HOST), 2048L);
        Mockito.verify(runtimeTask, Mockito.times(1)).addAndGetTezCounter(destinationVertexName);
        Mockito.verify(runtimeTask, Mockito.times(1)).getTaskStatistics();
        Assert.assertTrue(this.task.getTaskStatistics().getIOStatistics().containsKey(destinationVertexName));

        OutputContext outputContext = Mockito.spy(realOutputContext);
        Mockito.doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                long requestedSize = (Long) invocation.getArguments()[0];
                // Cast to the matched type rather than a concrete handler so any callback works.
                MemoryUpdateCallback callback = (MemoryUpdateCallback) invocation.getArguments()[1];
                callback.memoryAssigned(requestedSize);
                return null;
            }
        }).when(outputContext).requestInitialMemory(Matchers.anyLong(), Matchers.any(MemoryUpdateCallback.class));
        return outputContext;
    }
}

