/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.impl;

import com.google.common.annotations.VisibleForTesting;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.FetchResult;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.Fetcher;
import org.apache.tez.runtime.library.common.shuffle.InputAttemptFetchFailure;
import org.apache.tez.runtime.library.common.shuffle.InputHost;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleInputEventHandlerImpl;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class TestShuffleManager {
    /** Host name returned by the mocked {@code ExecutionContext#getHostName()} in {@code createInputContext()}. */
    private static final String FETCHER_HOST = "localhost";
    /** Port number constant (8080); the same value is written into the service metadata and the event payloads. */
    private static final int PORT = 8080;
    /** Path component placed into every data movement event payload built by {@code createDataMovementEvent}. */
    private static final String PATH_COMPONENT = "attempttmp";
    /**
     * Configuration handed to the shared executor and to every shuffle manager created by this class.
     * It is mutated by {@code createShuffleManager} and {@code testUseSharedExecutor}.
     */
    private final Configuration conf = new Configuration();
    /** Executor factory created in {@code setup()} and shut down in {@code cleanup()}. */
    private TezExecutors sharedExecutor;

    /**
     * Creates the shared executor from the test configuration before each test.
     */
    @Before
    public void setup() {
        sharedExecutor = new TezSharedExecutor(conf);
    }

    /**
     * Shuts down the shared executor created in {@link #setup()} after each test.
     */
    @After
    public void cleanup() {
        sharedExecutor.shutdownNow();
    }

    /**
     * Delivers data movement events for several mapper hosts, two batches per
     * host, and then waits until the fetcher executor has shut down and every
     * expected input has been recorded as completed.
     *
     * @throws Exception if event handling or waiting fails
     */
    @Test(timeout=50000L)
    public void testMultiplePartitions() throws Exception {
        final int numOfMappers = 3;
        final int numOfPartitions = 5;
        final int firstPart = 2;
        final int expectedInputs = numOfMappers * numOfPartitions;

        InputContext inputContext = createInputContext();
        ShuffleManagerForTest shuffleManager = createShuffleManager(inputContext, expectedInputs);
        FetchedInputAllocator inputAllocator = Mockito.mock(FetchedInputAllocator.class);
        ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(
            inputContext, shuffleManager, inputAllocator, null, false, 0, false);
        shuffleManager.run();

        List<Event> batch = new LinkedList<Event>();
        // Target index keeps increasing across all mappers; source index restarts at 20 per mapper.
        int targetIndex = 0;
        for (int mapper = 0; mapper < numOfMappers; mapper++) {
            String mapperHost = "host" + mapper;
            int srcIndex = 20;

            // First batch: the first 'firstPart' events for this host.
            batch.clear();
            for (int sent = 0; sent < firstPart; sent++) {
                batch.add(createDataMovementEvent(mapperHost, srcIndex++, targetIndex++));
            }
            handler.handleEvents(batch);

            // Pause before delivering the remaining events for the same host.
            Thread.sleep(500L);

            // Second batch: the remaining events up to 'numOfPartitions'.
            batch.clear();
            for (int sent = firstPart; sent < numOfPartitions; sent++) {
                batch.add(createDataMovementEvent(mapperHost, srcIndex++, targetIndex++));
            }
            handler.handleEvents(batch);
        }

        // Poll up to 100 times, 100 ms apart, for shutdown plus full completion.
        int remainingPolls = 100;
        while (remainingPolls-- > 0) {
            boolean finished = shuffleManager.isFetcherExecutorShutdown()
                && expectedInputs == shuffleManager.getNumOfCompletedInputs();
            if (finished) {
                break;
            }
            Thread.sleep(100L);
        }

        Assert.assertTrue(shuffleManager.isFetcherExecutorShutdown());
        Assert.assertEquals(expectedInputs, shuffleManager.getNumOfCompletedInputs());
    }

    /**
     * Builds a mocked {@link InputContext} for the tests.
     *
     * <p>The mock returns fresh counters, the source vertex name
     * {@code "sourceVertex"}, an execution context whose host name is
     * {@link #FETCHER_HOST}, and service provider metadata holding the port as a
     * single serialized int. Calls to {@code createTezFrameworkExecutorService}
     * are answered by {@link #sharedExecutor}.
     *
     * @return the configured mock
     * @throws IOException if serializing the port fails
     */
    private InputContext createInputContext() throws IOException {
        // Serialize the port and wrap only the bytes that were actually written.
        DataOutputBuffer portBuffer = new DataOutputBuffer();
        portBuffer.writeInt(PORT);
        ByteBuffer shuffleMetaData = ByteBuffer.wrap(portBuffer.getData(), 0, portBuffer.getLength());
        portBuffer.close();

        ExecutionContext executionContext = Mockito.mock(ExecutionContext.class);
        Mockito.doReturn(FETCHER_HOST).when(executionContext).getHostName();

        String auxiliaryServiceId = conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle");

        InputContext inputContext = Mockito.mock(InputContext.class);
        Mockito.doReturn(new TezCounters()).when(inputContext).getCounters();
        Mockito.doReturn("sourceVertex").when(inputContext).getSourceVertexName();
        Mockito.doReturn(shuffleMetaData).when(inputContext).getServiceProviderMetaData(auxiliaryServiceId);
        Mockito.doReturn(executionContext).when(inputContext).getExecutionContext();

        // Forward executor creation to the shared executor with the arguments the caller supplied.
        Answer<ExecutorService> delegateToSharedExecutor = new Answer<ExecutorService>() {

            public ExecutorService answer(InvocationOnMock invocation) throws Throwable {
                int parallelism = invocation.getArgumentAt(0, Integer.class).intValue();
                String threadNameFormat = invocation.getArgumentAt(1, String.class);
                return sharedExecutor.createExecutorService(parallelism, threadNameFormat);
            }
        };
        Mockito.when(inputContext.createTezFrameworkExecutorService(Matchers.anyInt(), Matchers.anyString()))
            .thenAnswer(delegateToSharedExecutor);
        return inputContext;
    }

    /**
     * Checks that creating a shuffle manager requests a framework executor
     * service from the input context only when
     * {@code tez.runtime.shuffle.fetcher.use-shared-pool} is set to true.
     *
     * @throws Exception if building the context or the manager fails
     */
    @Test(timeout=5000L)
    public void testUseSharedExecutor() throws Exception {
        // With the configuration untouched, no executor is requested from the context.
        InputContext contextWithoutSharedPool = createInputContext();
        createShuffleManager(contextWithoutSharedPool, 2);
        Mockito.verify(contextWithoutSharedPool, Mockito.times(0))
            .createTezFrameworkExecutorService(Matchers.anyInt(), Matchers.anyString());

        // With the flag enabled, exactly one executor is requested (verify defaults to one call).
        InputContext contextWithSharedPool = createInputContext();
        conf.setBoolean("tez.runtime.shuffle.fetcher.use-shared-pool", true);
        createShuffleManager(contextWithSharedPool, 2);
        Mockito.verify(contextWithSharedPool)
            .createTezFrameworkExecutorService(Matchers.anyInt(), Matchers.anyString());
    }

    /**
     * Runs a shuffle manager that receives no events for four seconds and checks
     * that {@code notifyProgress()} was called on the input context at least
     * three times in that window.
     *
     * @throws Exception if setup or waiting fails
     */
    @Test(timeout=20000L)
    public void testProgressWithEmptyPendingHosts() throws Exception {
        InputContext inputContext = createInputContext();
        final ShuffleManager shuffleManager = Mockito.spy(createShuffleManager(inputContext, 1));

        Runnable runShuffleManager = new Runnable() {

            @Override
            public void run() {
                try {
                    shuffleManager.run();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        Thread schedulerGetHostThread = new Thread(runShuffleManager);
        schedulerGetHostThread.start();

        // Leave the manager running with nothing to fetch.
        Thread.sleep(4000L);
        schedulerGetHostThread.interrupt();

        Mockito.verify(inputContext, Mockito.atLeast(3)).notifyProgress();
    }

    /**
     * Reports fetch failures for one input attempt and checks how they are
     * forwarded to the input context as {@link InputReadErrorEvent}s.
     *
     * <p>Sequence asserted by this test:
     * <ol>
     *   <li>After the first failure, one {@code sendEvents} call is observed
     *       carrying a single event with {@code numFailures == 1}.</li>
     *   <li>After two more failures, one second later there is still only one
     *       {@code sendEvents} call.</li>
     *   <li>Five seconds after that, a second call is observed carrying a single
     *       event with {@code numFailures == 2}.</li>
     * </ol>
     * NOTE(review): the five second delay presumably corresponds to
     * {@code tez.runtime.shuffle.batch.wait}, which {@code createShuffleManager}
     * sets to 5000 - confirm against ShuffleManager.
     *
     * @throws Exception if setup, waiting or verification fails
     */
    @Test(timeout=200000L)
    @SuppressWarnings({"rawtypes", "unchecked"}) // ArgumentCaptor.forClass(List.class) can only yield a raw List
    public void testFetchFailed() throws Exception {
        InputContext inputContext = createInputContext();
        final ShuffleManager shuffleManager = Mockito.spy(createShuffleManager(inputContext, 1));
        Thread schedulerGetHostThread = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    shuffleManager.run();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        InputAttemptFetchFailure inputAttemptFetchFailure =
            new InputAttemptFetchFailure(new InputAttemptIdentifier(1, 1));

        schedulerGetHostThread.start();
        try {
            Thread.sleep(1000L);
            shuffleManager.fetchFailed("host1", inputAttemptFetchFailure, false);
            Thread.sleep(1000L);

            // First failure: exactly one sendEvents call with one event counting one failure.
            ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
            Mockito.verify(inputContext, Mockito.times(1)).sendEvents(captor.capture());
            Assert.assertEquals("Size was: " + captor.getAllValues().size(), 1, captor.getAllValues().size());
            List<?> capturedList = captor.getAllValues().get(0);
            Assert.assertEquals("Size was: " + capturedList.size(), 1, capturedList.size());
            InputReadErrorEvent inputEvent = (InputReadErrorEvent) capturedList.get(0);
            Assert.assertEquals("Number of failures was: " + inputEvent.getNumFailures(),
                1, inputEvent.getNumFailures());

            // Two more failures must not produce another sendEvents call within one second.
            shuffleManager.fetchFailed("host1", inputAttemptFetchFailure, false);
            shuffleManager.fetchFailed("host1", inputAttemptFetchFailure, false);
            Thread.sleep(1000L);
            Mockito.verify(inputContext, Mockito.times(1)).sendEvents(Matchers.<List<Event>>any());

            // After a further five seconds both failures arrive together in a second call.
            Thread.sleep(5000L);
            captor = ArgumentCaptor.forClass(List.class);
            Mockito.verify(inputContext, Mockito.times(2)).sendEvents(captor.capture());
            Assert.assertEquals("Size was: " + captor.getAllValues().size(), 2, captor.getAllValues().size());
            capturedList = captor.getAllValues().get(1);
            Assert.assertEquals("Size was: " + capturedList.size(), 1, capturedList.size());
            inputEvent = (InputReadErrorEvent) capturedList.get(0);
            Assert.assertEquals("Number of failures was: " + inputEvent.getNumFailures(),
                2, inputEvent.getNumFailures());
        } finally {
            // Interrupt the launcher thread even when an assertion above fails.
            schedulerGetHostThread.interrupt();
        }
    }

    /**
     * Creates a {@link ShuffleManagerForTest} wired to the given mocked context.
     *
     * <p>Side effects: stubs {@code getWorkDirs()} and
     * {@code getServiceConsumerMetaData(...)} on {@code inputContext}, and writes
     * {@code tez.runtime.framework.local.dirs} and
     * {@code tez.runtime.shuffle.batch.wait} (5000) into {@link #conf}.
     *
     * @param inputContext mocked input context to stub and pass to the manager
     * @param expectedNumOfPhysicalInputs number of inputs passed to the manager
     * @return the new shuffle manager
     * @throws IOException if token serialization or manager construction fails
     */
    private ShuffleManagerForTest createShuffleManager(InputContext inputContext, int expectedNumOfPhysicalInputs) throws IOException {
        Path outDirBase = new Path(".", "outDir");
        String[] outDirs = new String[]{outDirBase.toString()};
        // Cast keeps the array as the single value to return.
        Mockito.doReturn((Object) outDirs).when(inputContext).getWorkDirs();
        conf.setStrings("tez.runtime.framework.local.dirs", inputContext.getWorkDirs());
        conf.setInt("tez.runtime.shuffle.batch.wait", 5000);

        DataOutputBuffer out = new DataOutputBuffer();
        Token<JobTokenIdentifier> token =
            new Token<JobTokenIdentifier>(new JobTokenIdentifier(), new JobTokenSecretManager(null));
        token.write(out);
        // getData() exposes the whole backing array; only the first getLength() bytes hold
        // the serialized token, so bound the buffer the same way createInputContext() does.
        ByteBuffer serializedToken = ByteBuffer.wrap(out.getData(), 0, out.getLength());
        String auxiliaryServiceId = conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle");
        Mockito.doReturn(serializedToken).when(inputContext).getServiceConsumerMetaData(auxiliaryServiceId);

        FetchedInputAllocator inputAllocator = Mockito.mock(FetchedInputAllocator.class);
        return new ShuffleManagerForTest(inputContext, conf, expectedNumOfPhysicalInputs, 1024, false, -1, null, inputAllocator);
    }

    /**
     * Builds a data movement event whose payload carries the given host together
     * with {@link #PORT} and {@link #PATH_COMPONENT}.
     *
     * @param host host name stored in the payload
     * @param srcIndex source index passed to {@code DataMovementEvent.create}
     * @param targetIndex target index passed to {@code DataMovementEvent.create}
     * @return the event, created with version 0
     */
    private Event createDataMovementEvent(String host, int srcIndex, int targetIndex) {
        ByteBuffer payload = ShuffleUserPayloads.DataMovementEventPayloadProto.newBuilder()
            .setHost(host)
            .setPort(PORT)
            .setPathComponent(PATH_COMPONENT)
            .build()
            .toByteString()
            .asReadOnlyByteBuffer();
        return DataMovementEvent.create(srcIndex, targetIndex, 0, payload);
    }

    /**
     * Stub {@link FetchedInput} handed to the shuffle manager by the fake fetcher.
     * It holds no data: the size is -1, both streams are {@code null}, and the
     * lifecycle methods do nothing.
     */
    @VisibleForTesting
    static class TestFetchedInput
    extends FetchedInput {
        /**
         * @param inputAttemptIdentifier attempt this input stands for; passed to the
         *        superclass together with a {@code null} second argument
         */
        public TestFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
            super(inputAttemptIdentifier, null);
        }

        /** @return always {@link FetchedInput.Type#MEMORY} */
        public FetchedInput.Type getType() {
            return FetchedInput.Type.MEMORY;
        }

        /** @return always -1 */
        public long getSize() {
            return -1L;
        }

        /** @return always {@code null}; nothing is ever read from this input */
        public InputStream getInputStream() throws IOException {
            return null;
        }

        /** @return always {@code null}; nothing is ever written to this input */
        public OutputStream getOutputStream() throws IOException {
            return null;
        }

        /** Does nothing. */
        public void commit() throws IOException {
        }

        /** Does nothing. */
        public void abort() throws IOException {
        }

        /** Does nothing. */
        public void free() {
        }
    }

    /**
     * {@link ShuffleManager} subclass whose fetchers never do a real fetch.
     * Each fetcher's {@code callInternal()} is stubbed to report every source
     * attempt as successfully fetched. The class also exposes the completed-input
     * count and the fetcher executor's shutdown state to the tests.
     */
    private static class ShuffleManagerForTest
    extends ShuffleManager {
        /** Passes all arguments unchanged to the {@link ShuffleManager} constructor. */
        public ShuffleManagerForTest(InputContext inputContext, Configuration conf, int numInputs, int bufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength, CompressionCodec codec, FetchedInputAllocator inputAllocator) throws IOException {
            super(inputContext, conf, numInputs, bufferSize, ifileReadAheadEnabled, ifileReadAheadLength, codec, inputAllocator);
        }

        /**
         * Wraps the fetcher built by the superclass in a Mockito spy and replaces
         * {@code callInternal()} with an answer that calls
         * {@code fetchSucceeded} once per source attempt with a
         * {@link TestFetchedInput} and zero for the three trailing long arguments.
         *
         * @param inputHost host the fetcher is built for
         * @param conf configuration forwarded to the superclass
         * @return the spied fetcher with {@code callInternal()} stubbed
         * @throws IllegalStateException if the stubbing call itself fails
         */
        Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {
            final Fetcher fetcher = Mockito.spy(super.constructFetcherForHost(inputHost, conf));
            final FetchResult mockFetcherResult = Mockito.mock(FetchResult.class);
            try {
                Mockito.doAnswer(new Answer<FetchResult>() {

                    public FetchResult answer(InvocationOnMock invocation) throws Throwable {
                        for (InputAttemptIdentifier input : fetcher.getSrcAttempts()) {
                            // Qualified 'this': fetchSucceeded belongs to the enclosing manager,
                            // not to this anonymous Answer.
                            ShuffleManagerForTest.this.fetchSucceeded(
                                fetcher.getHost(), input, new TestFetchedInput(input), 0L, 0L, 0L);
                        }
                        return mockFetcherResult;
                    }
                }).when(fetcher).callInternal();
            }
            catch (Exception e) {
                // The try exists only because callInternal() declares a checked exception.
                // If stubbing ever fails, stop here rather than return an unstubbed spy
                // that would run the real callInternal().
                throw new IllegalStateException("Failed to stub Fetcher.callInternal()", e);
            }
            return fetcher;
        }

        /** @return the number of bits set in {@code completedInputSet} */
        public int getNumOfCompletedInputs() {
            return this.completedInputSet.cardinality();
        }

        /** @return whether {@code fetcherExecutor} reports that it has been shut down */
        boolean isFetcherExecutorShutdown() {
            return this.fetcherExecutor.isShutdown();
        }
    }
}

