/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ExceptionReporter;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetchedInputAllocatorOrderedGrouped;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class TestShuffleScheduler {
    /**
     * Checks that the scheduler constructs exactly as many fetchers as configured through
     * {@code tez.runtime.shuffle.parallel.copies} (10 here), even though 50 distinct hosts
     * have output pending.
     *
     * <p>The scheduler is built with {@code fetcherShouldWait = true}, so every mocked
     * fetcher sleeps inside {@code callInternal()}. The fetcher count is sampled after a
     * fixed two-second wait.
     */
    @Test(timeout=10000L)
    public void testNumParallelScheduledFetchers() throws IOException, InterruptedException {
        InputContext inputContext = this.createTezInputContext();
        TezConfiguration conf = new TezConfiguration();
        conf.setInt("tez.runtime.shuffle.parallel.copies", 10);
        int numInputs = 50;
        Shuffle shuffle = Mockito.mock(Shuffle.class);
        MergeManager mergeManager = Mockito.mock(MergeManager.class);
        // The trailing 'true' is fetcherShouldWait: mocked fetchers block once started.
        final ShuffleSchedulerForTest scheduler = new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager, (FetchedInputAllocatorOrderedGrouped)mergeManager, System.currentTimeMillis(), null, false, 0, "srcName", true);
        Future<Void> executorFuture = null;
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            // start() runs on a separate thread so this thread can keep feeding inputs.
            // NOTE(review): presumably start() blocks until the shuffle finishes or the
            // scheduler is closed - confirm against ShuffleScheduler.
            executorFuture = executor.submit(new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    scheduler.start();
                    return null;
                }
            });
            // One output per host, so there are 50 candidate hosts for only 10 fetcher slots.
            for (int i = 0; i < numInputs; ++i) {
                InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
                scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier);
            }
            // Give the scheduler thread time to hand out fetchers before sampling the counter.
            Thread.sleep(2000L);
            Assert.assertEquals(10L, (long)scheduler.numFetchersCreated.get());
        }
        finally {
            scheduler.close();
            if (executorFuture != null) {
                executorFuture.cancel(true);
            }
            executor.shutdownNow();
        }
    }

    /**
     * Happy-path run: ten inputs on ten hosts are announced, each one is reported as
     * successfully copied and its host is freed. The test then expects
     * {@code notifyProgress()} to have been called at least once per input and waits for
     * the background {@code start()} call to return.
     */
    @Test(timeout=5000L)
    public void testSimpleFlow() throws Exception {
        InputContext context = this.createTezInputContext();
        TezConfiguration tezConf = new TezConfiguration();
        final int inputCount = 10;
        Shuffle shuffleMock = Mockito.mock(Shuffle.class);
        MergeManager merger = Mockito.mock(MergeManager.class);
        final ShuffleSchedulerForTest scheduler = new ShuffleSchedulerForTest(context, tezConf, inputCount, shuffleMock, merger, (FetchedInputAllocatorOrderedGrouped)merger, System.currentTimeMillis(), null, false, 0, "srcName");
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // Run the scheduler loop in the background while this thread plays the fetchers.
            Future<Void> startResult = pool.submit(new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    scheduler.start();
                    return null;
                }
            });

            // Announce one output per host and remember the attempt identifiers.
            InputAttemptIdentifier[] attempts = new InputAttemptIdentifier[inputCount];
            for (int idx = 0; idx < inputCount; ++idx) {
                attempts[idx] = new InputAttemptIdentifier(new InputIdentifier(idx), 0, "attempt_");
                scheduler.addKnownMapOutput("host" + idx, 10000, 1, "hostUrl", attempts[idx]);
            }

            // Snapshot the hosts the scheduler registered, in the map's iteration order.
            MapHost[] hosts = new MapHost[inputCount];
            int next = 0;
            for (MapHost location : scheduler.mapLocations.values()) {
                hosts[next] = location;
                next++;
            }

            // Report every input as fetched, then release its host.
            for (int idx = 0; idx < inputCount; ++idx) {
                FetchedInputAllocatorOrderedGrouped allocator = Mockito.mock(FetchedInputAllocatorOrderedGrouped.class);
                MapOutput fetched = MapOutput.createMemoryMapOutput(attempts[idx], allocator, 100, false);
                scheduler.copySucceeded(attempts[idx], hosts[idx], 20L, 25L, 100L, fetched, false);
                scheduler.freeHost(hosts[idx]);
            }

            Mockito.verify(context, Mockito.atLeast(inputCount)).notifyProgress();
            startResult.get();
        }
        finally {
            scheduler.close();
            pool.shutdownNow();
        }
    }

    /**
     * Runs the shared reducer-health scenario twice on the same configuration object:
     * first with default settings, then with
     * {@code tez.runtime.shuffle.min.failures.per.host} raised to 4000.
     */
    @Test(timeout=60000L)
    public void testReducerHealth_1() throws IOException {
        TezConfiguration tezConf = new TezConfiguration();

        // Pass 1: defaults.
        this._testReducerHealth_1(tezConf);

        // Pass 2: very high per-host failure threshold.
        tezConf.setInt("tez.runtime.shuffle.min.failures.per.host", 4000);
        this._testReducerHealth_1(tezConf);
    }

    /**
     * Scenario body for {@code testReducerHealth_1}.
     *
     * <p>320 inputs spread over 20 hosts are announced, inputs 0..99 succeed, inputs
     * 100..198 and input 200 each fail once. The final expectation depends on the
     * configured {@code tez.runtime.shuffle.min.failures.per.host}.
     *
     * @param conf configuration handed to the scheduler and consulted for the threshold
     * @throws IOException propagated from scheduler construction or the copy callbacks
     */
    public void _testReducerHealth_1(Configuration conf) throws IOException {
        // Pretend the shuffle began 500 seconds ago.
        final long startTime = System.currentTimeMillis() - 500000L;
        Shuffle shuffleMock = Mockito.mock(Shuffle.class);
        ShuffleSchedulerForTest scheduler = this.createScheduler(startTime, 320, shuffleMock, conf);
        final int producerNodes = 20;

        for (int idx = 0; idx < 320; ++idx) {
            InputAttemptIdentifier attempt = new InputAttemptIdentifier(new InputIdentifier(idx), 0, "attempt_");
            scheduler.addKnownMapOutput("host" + idx % producerNodes, 10000, idx, "hostUrl", attempt);
        }

        // The first 100 inputs are fetched successfully.
        for (int idx = 0; idx < 100; ++idx) {
            InputAttemptIdentifier attempt = new InputAttemptIdentifier(new InputIdentifier(idx), 0, "attempt_");
            FetchedInputAllocatorOrderedGrouped allocator = Mockito.mock(FetchedInputAllocatorOrderedGrouped.class);
            MapOutput fetched = MapOutput.createMemoryMapOutput(attempt, allocator, 100, false);
            MapHost source = new MapHost(idx, "host" + idx % producerNodes + ":" + 10000, "");
            scheduler.copySucceeded(attempt, source, 100L, 200L, startTime + (long)(idx * 100), fetched, false);
        }

        // Inputs 100..198 fail once each (199 is deliberately not touched here).
        for (int idx = 100; idx < 199; ++idx) {
            InputAttemptIdentifier attempt = new InputAttemptIdentifier(new InputIdentifier(idx), 0, "attempt_");
            MapHost source = new MapHost(idx, "host" + idx % producerNodes + ":" + 10000, "");
            scheduler.copyFailed(attempt, source, false, true, false);
        }

        // One more failure, for input 200.
        InputAttemptIdentifier lastAttempt = new InputAttemptIdentifier(new InputIdentifier(200), 0, "attempt_");
        MapHost lastSource = new MapHost(200, "host" + 200 % producerNodes + ":" + 10000, "");
        scheduler.copyFailed(lastAttempt, lastSource, false, true, false);

        int minFailurePerHost = conf.getInt("tez.runtime.shuffle.min.failures.per.host", 4);
        if (minFailurePerHost <= 4) {
            // atLeast(0) is satisfied by any number of calls, so this only exercises the mock.
            Mockito.verify(shuffleMock, Mockito.atLeast(0)).reportException(Matchers.any(Throwable.class));
        } else if (minFailurePerHost > 100) {
            Mockito.verify(shuffleMock, Mockito.atLeast(1)).reportException(Matchers.any(Throwable.class));
        }
    }

    /**
     * Reducer-health scenario with a mix of outcomes:
     * inputs 200..319 are completed through {@code copySucceeded} with a null host and
     * null output, inputs 0..189 are fetched normally, and inputs 190..199 fail repeatedly.
     * Checks the remaining-input counter and the failure counter along the way, and
     * finally expects at least one exception report after inputs 110..119 also fail.
     */
    @Test(timeout=60000L)
    public void testReducerHealth_2() throws IOException, InterruptedException {
        // Pretend the shuffle began 500 seconds ago.
        final long startTime = System.currentTimeMillis() - 500000L;
        Shuffle shuffleMock = Mockito.mock(Shuffle.class);
        ShuffleSchedulerForTest scheduler = this.createScheduler(startTime, 320, shuffleMock);
        final int producerNodes = 20;

        // Only the first 200 of the 320 inputs are announced with a location.
        for (int idx = 0; idx < 200; ++idx) {
            InputAttemptIdentifier attempt = new InputAttemptIdentifier(new InputIdentifier(idx), 0, "attempt_");
            scheduler.addKnownMapOutput("host" + idx % producerNodes, 10000, idx, "hostUrl", attempt);
        }
        Assert.assertEquals(320L, (long)scheduler.remainingMaps.get());

        // Inputs 200..319 complete without a host or a map output (last flag is true).
        for (int idx = 200; idx < 320; ++idx) {
            InputAttemptIdentifier attempt = new InputAttemptIdentifier(new InputIdentifier(idx), 0, "attempt_");
            scheduler.copySucceeded(attempt, null, 0L, 0L, 0L, null, true);
        }
        Assert.assertEquals(200L, (long)scheduler.remainingMaps.get());

        // Inputs 0..189 are fetched successfully.
        for (int idx = 0; idx < 190; ++idx) {
            InputAttemptIdentifier attempt = new InputAttemptIdentifier(new InputIdentifier(idx), 0, "attempt_");
            FetchedInputAllocatorOrderedGrouped allocator = Mockito.mock(FetchedInputAllocatorOrderedGrouped.class);
            MapOutput fetched = MapOutput.createMemoryMapOutput(attempt, allocator, 100, false);
            MapHost source = new MapHost(idx, "host" + idx % producerNodes + ":" + 10000, "");
            scheduler.copySucceeded(attempt, source, 100L, 200L, startTime + (long)(idx * 100), fetched, false);
        }
        Assert.assertEquals(10L, (long)scheduler.remainingMaps.get());

        // Inputs 190..199 fail once each; nothing must be reported yet.
        for (int idx = 190; idx < 200; ++idx) {
            InputAttemptIdentifier attempt = new InputAttemptIdentifier(new InputIdentifier(idx), 0, "attempt_");
            MapHost source = new MapHost(idx, "host" + idx % producerNodes + ":" + 10000, "");
            scheduler.copyFailed(attempt, source, false, true, false);
        }
        Mockito.verify(scheduler.reporter, Mockito.times(0)).reportException(Matchers.any(Throwable.class));

        // Move the last-progress timestamp 250 seconds back and fail input 190 once more.
        scheduler.lastProgressTime = System.currentTimeMillis() - 250000L;
        InputAttemptIdentifier retried = new InputAttemptIdentifier(new InputIdentifier(190), 0, "attempt_");
        scheduler.copyFailed(retried, new MapHost(190, "host" + 190 % producerNodes + ":" + 10000, ""), false, true, false);
        Mockito.verify(scheduler.reporter, Mockito.times(0)).reportException(Matchers.any(Throwable.class));
        Assert.assertEquals(11L, (long)scheduler.failedShufflesSinceLastCompletion);

        // Five further failures for each of inputs 190..199: 11 + 10 * 5 = 61.
        for (int idx = 190; idx < 200; ++idx) {
            retried = new InputAttemptIdentifier(new InputIdentifier(idx), 0, "attempt_");
            for (int round = 0; round < 5; ++round) {
                scheduler.copyFailed(retried, new MapHost(idx, "host" + idx % producerNodes + ":" + 10000, ""), false, true, false);
            }
        }
        Assert.assertEquals(61L, (long)scheduler.failedShufflesSinceLastCompletion);
        Assert.assertEquals(10L, (long)scheduler.remainingMaps.get());
        // atLeast(0) is satisfied by any number of calls.
        Mockito.verify(shuffleMock, Mockito.atLeast(0)).reportException(Matchers.any(Throwable.class));

        // Three failures for each of inputs 110..119; a report is expected by the end.
        for (int idx = 110; idx < 120; ++idx) {
            retried = new InputAttemptIdentifier(new InputIdentifier(idx), 0, "attempt_");
            for (int round = 0; round < 3; ++round) {
                scheduler.copyFailed(retried, new MapHost(idx, "host" + idx % producerNodes + ":" + 10000, ""), false, true, false);
            }
        }
        Mockito.verify(shuffleMock, Mockito.atLeast(1)).reportException(Matchers.any(Throwable.class));
    }

    /**
     * 319 of 320 inputs are fetched successfully; the last one (input 319) fails four
     * times, with {@code lastProgressTime} moved 1,000,000 ms into the past after the
     * first failure. The test expects exactly one input to remain and
     * {@code reportException} never to be invoked on the shuffle mock.
     */
    @Test(timeout=60000L)
    public void testReducerHealth_3() throws IOException {
        // Pretend the shuffle began 500 seconds ago.
        long startTime = System.currentTimeMillis() - 500000L;
        Shuffle shuffle = Mockito.mock(Shuffle.class);
        ShuffleSchedulerForTest scheduler = this.createScheduler(startTime, 320, shuffle);
        int totalProducerNodes = 20;
        for (int i = 0; i < 320; ++i) {
            InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
            scheduler.addKnownMapOutput("host" + i % totalProducerNodes, 10000, i, "hostUrl", inputAttemptIdentifier);
        }
        // Everything except input 319 succeeds.
        for (int i = 0; i < 319; ++i) {
            InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
            MapOutput mapOutput = MapOutput.createMemoryMapOutput(inputAttemptIdentifier, Mockito.mock(FetchedInputAllocatorOrderedGrouped.class), 100, false);
            scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + i % totalProducerNodes + ":" + 10000, ""), 100L, 200L, startTime + (long)(i * 100), mapOutput, false);
        }
        InputAttemptIdentifier lastAttempt = new InputAttemptIdentifier(new InputIdentifier(319), 0, "attempt_");
        scheduler.copyFailed(lastAttempt, new MapHost(319, "host" + 319 % totalProducerNodes + ":" + 10000, ""), false, true, false);
        scheduler.lastProgressTime = System.currentTimeMillis() - 1000000L;
        // JUnit's contract is assertEquals(expected, actual); the expected value goes first
        // so a failure message reads correctly.
        Assert.assertEquals(1L, (long)scheduler.remainingMaps.get());
        // Three more failures for the same input.
        for (int attempt = 0; attempt < 3; ++attempt) {
            scheduler.copyFailed(lastAttempt, new MapHost(319, "host" + 319 % totalProducerNodes + ":" + 10000, ""), false, true, false);
        }
        Mockito.verify(shuffle, Mockito.times(0)).reportException(Matchers.any(Throwable.class));
    }

    /**
     * Inputs 0..63 each fail three times before succeeding, inputs 64..318 succeed
     * directly, and input 319 keeps failing.
     *
     * <p>With {@code lastProgressTime} set 100 seconds back, four failures of input 319
     * must not trigger {@code reportException}. After moving it 300 seconds back, one more
     * failure is expected to produce exactly one report.
     */
    @Test(timeout=60000L)
    public void testReducerHealth_4() throws IOException {
        // Pretend the shuffle began 500 seconds ago.
        long startTime = System.currentTimeMillis() - 500000L;
        Shuffle shuffle = Mockito.mock(Shuffle.class);
        ShuffleSchedulerForTest scheduler = this.createScheduler(startTime, 320, shuffle);
        int totalProducerNodes = 20;
        for (int i = 0; i < 320; ++i) {
            InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
            scheduler.addKnownMapOutput("host" + i % totalProducerNodes, 10000, i, "hostUrl", inputAttemptIdentifier);
        }
        // Inputs 0..63: three failures each, followed by a successful copy.
        for (int i = 0; i < 64; ++i) {
            InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
            for (int attempt = 0; attempt < 3; ++attempt) {
                scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + i % totalProducerNodes + ":" + 10000, ""), false, true, false);
            }
            MapOutput mapOutput = MapOutput.createMemoryMapOutput(inputAttemptIdentifier, Mockito.mock(FetchedInputAllocatorOrderedGrouped.class), 100, false);
            scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + i % totalProducerNodes + ":" + 10000, ""), 100L, 200L, startTime + (long)(i * 100), mapOutput, false);
        }
        // Inputs 64..318 succeed on the first try.
        for (int i = 64; i < 319; ++i) {
            InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
            MapOutput mapOutput = MapOutput.createMemoryMapOutput(inputAttemptIdentifier, Mockito.mock(FetchedInputAllocatorOrderedGrouped.class), 100, false);
            scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + i % totalProducerNodes + ":" + 10000, ""), 100L, 200L, startTime + (long)(i * 100), mapOutput, false);
        }
        InputAttemptIdentifier lastAttempt = new InputAttemptIdentifier(new InputIdentifier(319), 0, "attempt_");
        scheduler.copyFailed(lastAttempt, new MapHost(319, "host" + 319 % totalProducerNodes + ":" + 10000, ""), false, true, false);
        scheduler.lastProgressTime = System.currentTimeMillis() - 100000L;
        // JUnit's contract is assertEquals(expected, actual); the expected value goes first
        // so a failure message reads correctly.
        Assert.assertEquals(1L, (long)scheduler.remainingMaps.get());
        // Three more failures while the last progress is only 100 seconds old: no report.
        for (int attempt = 0; attempt < 3; ++attempt) {
            scheduler.copyFailed(lastAttempt, new MapHost(319, "host" + 319 % totalProducerNodes + ":" + 10000, ""), false, true, false);
        }
        Mockito.verify(shuffle, Mockito.times(0)).reportException(Matchers.any(Throwable.class));
        // With the last progress 300 seconds old, the next failure must be reported once.
        scheduler.lastProgressTime = System.currentTimeMillis() - 300000L;
        scheduler.copyFailed(lastAttempt, new MapHost(319, "host" + 319 % totalProducerNodes + ":" + 10000, ""), false, true, false);
        Mockito.verify(shuffle, Mockito.times(1)).reportException(Matchers.any(Throwable.class));
    }

    /**
     * Only 319 of the 320 expected inputs (0..318) are ever announced, and all of them
     * are fetched successfully. Input 318 is then reported as failed four times, with
     * {@code lastProgressTime} moved 1,000,000 ms into the past after the first failure.
     * The test expects one input to remain and {@code reportException} never to be
     * invoked on the shuffle mock.
     */
    @Test(timeout=60000L)
    public void testReducerHealth_5() throws IOException {
        // Pretend the shuffle began 500 seconds ago.
        long startTime = System.currentTimeMillis() - 500000L;
        Shuffle shuffle = Mockito.mock(Shuffle.class);
        ShuffleSchedulerForTest scheduler = this.createScheduler(startTime, 320, shuffle);
        int totalProducerNodes = 20;
        // Input 319 is intentionally never announced.
        for (int i = 0; i < 319; ++i) {
            InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
            scheduler.addKnownMapOutput("host" + i % totalProducerNodes, 10000, i, "hostUrl", inputAttemptIdentifier);
        }
        for (int i = 0; i < 319; ++i) {
            InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
            MapOutput mapOutput = MapOutput.createMemoryMapOutput(inputAttemptIdentifier, Mockito.mock(FetchedInputAllocatorOrderedGrouped.class), 100, false);
            scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + i % totalProducerNodes + ":" + 10000, ""), 100L, 200L, startTime + (long)(i * 100), mapOutput, false);
        }
        // Failures are reported for input 318, which has already been fetched above.
        InputAttemptIdentifier failedAttempt = new InputAttemptIdentifier(new InputIdentifier(318), 0, "attempt_");
        scheduler.copyFailed(failedAttempt, new MapHost(318, "host" + 318 % totalProducerNodes + ":" + 10000, ""), false, true, false);
        scheduler.lastProgressTime = System.currentTimeMillis() - 1000000L;
        // JUnit's contract is assertEquals(expected, actual); the expected value goes first
        // so a failure message reads correctly.
        Assert.assertEquals(1L, (long)scheduler.remainingMaps.get());
        for (int attempt = 0; attempt < 3; ++attempt) {
            scheduler.copyFailed(failedAttempt, new MapHost(318, "host" + 318 % totalProducerNodes + ":" + 10000, ""), false, true, false);
        }
        Mockito.verify(shuffle, Mockito.times(0)).reportException(Matchers.any(Throwable.class));
    }

    /**
     * Runs the shared scenario once with
     * {@code tez.runtime.shuffle.failed.check.since-last.completion} enabled and once
     * with it disabled, reusing the same configuration object.
     */
    @Test(timeout=60000L)
    public void testReducerHealth_6() throws IOException {
        TezConfiguration tezConf = new TezConfiguration();

        // Pass 1: flag on.
        tezConf.setBoolean("tez.runtime.shuffle.failed.check.since-last.completion", true);
        this._testReducerHealth_6(tezConf);

        // Pass 2: flag off.
        tezConf.setBoolean("tez.runtime.shuffle.failed.check.since-last.completion", false);
        this._testReducerHealth_6(tezConf);
    }

    /**
     * Scenario body for {@code testReducerHealth_6}.
     *
     * <p>320 inputs are announced, inputs 0..9 succeed and inputs 10..14 fail once each.
     * At that point at least five failure counts must be tracked, 310 inputs must remain,
     * and nothing may have been reported. Inputs 10..14 then fail twice more each. The
     * final expectation depends on
     * {@code tez.runtime.shuffle.failed.check.since-last.completion}.
     *
     * @param conf configuration handed to the scheduler and consulted for the flag
     * @throws IOException propagated from scheduler construction or the copy callbacks
     */
    public void _testReducerHealth_6(Configuration conf) throws IOException {
        // Pretend the shuffle began 500 seconds ago.
        long startTime = System.currentTimeMillis() - 500000L;
        Shuffle shuffle = Mockito.mock(Shuffle.class);
        ShuffleSchedulerForTest scheduler = this.createScheduler(startTime, 320, shuffle, conf);
        int totalProducerNodes = 20;
        for (int i = 0; i < 320; ++i) {
            InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
            scheduler.addKnownMapOutput("host" + i % totalProducerNodes, 10000, i, "hostUrl", inputAttemptIdentifier);
        }
        // Inputs 0..9 are fetched successfully.
        for (int i = 0; i < 10; ++i) {
            InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
            MapOutput mapOutput = MapOutput.createMemoryMapOutput(inputAttemptIdentifier, Mockito.mock(FetchedInputAllocatorOrderedGrouped.class), 100, false);
            scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + i % totalProducerNodes + ":" + 10000, ""), 100L, 200L, startTime + (long)(i * 100), mapOutput, false);
        }
        // Inputs 10..14 fail once each.
        for (int i = 10; i < 15; ++i) {
            InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
            scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + i % totalProducerNodes + ":" + 10000, ""), false, true, false);
        }
        Assert.assertTrue(scheduler.failureCounts.size() >= 5);
        // JUnit's contract is assertEquals(expected, actual); the expected value goes first
        // so a failure message reads correctly.
        Assert.assertEquals(310L, (long)scheduler.remainingMaps.get());
        Mockito.verify(scheduler.reporter, Mockito.times(0)).reportException(Matchers.any(Throwable.class));
        // Two more failures for each of inputs 10..14.
        for (int i = 10; i < 15; ++i) {
            InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
            for (int attempt = 0; attempt < 2; ++attempt) {
                scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + i % totalProducerNodes + ":" + 10000, ""), false, true, false);
            }
        }
        boolean checkFailedFetchSinceLastCompletion = conf.getBoolean("tez.runtime.shuffle.failed.check.since-last.completion", true);
        if (checkFailedFetchSinceLastCompletion) {
            Mockito.verify(shuffle, Mockito.atLeast(1)).reportException(Matchers.any(Throwable.class));
        } else {
            // atLeast(0) is satisfied by any number of calls, so this branch asserts nothing.
            Mockito.verify(shuffle, Mockito.atLeast(0)).reportException(Matchers.any(Throwable.class));
        }
    }

    /**
     * 320 inputs are announced, inputs 0..99 are fetched successfully and inputs
     * 100..198 fail four times each. The test expects {@code reportException} to have
     * been invoked on the shuffle mock at least once.
     */
    @Test(timeout=60000L)
    public void testReducerHealth_7() throws IOException {
        // Pretend the shuffle began 500 seconds ago.
        final long startTime = System.currentTimeMillis() - 500000L;
        Shuffle shuffleMock = Mockito.mock(Shuffle.class);
        ShuffleSchedulerForTest scheduler = this.createScheduler(startTime, 320, shuffleMock);
        final int producerNodes = 20;

        for (int idx = 0; idx < 320; ++idx) {
            InputAttemptIdentifier attempt = new InputAttemptIdentifier(new InputIdentifier(idx), 0, "attempt_");
            scheduler.addKnownMapOutput("host" + idx % producerNodes, 10000, idx, "hostUrl", attempt);
        }

        // Successful copies for inputs 0..99.
        for (int idx = 0; idx < 100; ++idx) {
            InputAttemptIdentifier attempt = new InputAttemptIdentifier(new InputIdentifier(idx), 0, "attempt_");
            FetchedInputAllocatorOrderedGrouped allocator = Mockito.mock(FetchedInputAllocatorOrderedGrouped.class);
            MapOutput fetched = MapOutput.createMemoryMapOutput(attempt, allocator, 100, false);
            MapHost source = new MapHost(idx, "host" + idx % producerNodes + ":" + 10000, "");
            scheduler.copySucceeded(attempt, source, 100L, 200L, startTime + (long)(idx * 100), fetched, false);
        }

        // Four failures for each of inputs 100..198, each reported against a fresh MapHost.
        for (int idx = 100; idx < 199; ++idx) {
            InputAttemptIdentifier attempt = new InputAttemptIdentifier(new InputIdentifier(idx), 0, "attempt_");
            for (int round = 0; round < 4; ++round) {
                scheduler.copyFailed(attempt, new MapHost(idx, "host" + idx % producerNodes + ":" + 10000, ""), false, true, false);
            }
        }

        Mockito.verify(shuffleMock, Mockito.atLeast(1)).reportException(Matchers.any(Throwable.class));
    }

    /**
     * Builds a test scheduler backed by a fresh mocked input context and a mocked
     * {@code MergeManager} (which also serves as the allocator).
     *
     * @param startTime  start timestamp handed to the scheduler
     * @param numInputs  number of inputs the scheduler should expect
     * @param shuffle    shuffle (mock) that receives exception reports
     * @param conf       configuration handed to the scheduler
     * @return a scheduler whose fetchers do not block
     * @throws IOException propagated from context or scheduler construction
     */
    private ShuffleSchedulerForTest createScheduler(long startTime, int numInputs, Shuffle shuffle, Configuration conf) throws IOException {
        InputContext context = this.createTezInputContext();
        MergeManager merger = Mockito.mock(MergeManager.class);
        return new ShuffleSchedulerForTest(context, conf, numInputs, shuffle, merger, (FetchedInputAllocatorOrderedGrouped)merger, startTime, null, false, 0, "srcName");
    }

    /**
     * Convenience overload that builds the test scheduler with a fresh, unmodified
     * {@code TezConfiguration}.
     */
    private ShuffleSchedulerForTest createScheduler(long startTime, int numInputs, Shuffle shuffle) throws IOException {
        Configuration defaultConf = new TezConfiguration();
        return this.createScheduler(startTime, numInputs, shuffle, defaultConf);
    }

    /**
     * Announces a single output on {@code host0}, checks that exactly one host is pending
     * in state {@code PENDING}, and then reports a failed copy from it. Both immediately
     * afterwards and again after a three-second sleep, the host returned by
     * {@code getHost()} is asserted not to have the identifier {@code host0:10000}.
     */
    @Test(timeout=60000L)
    public void testPenalty() throws IOException, InterruptedException {
        final long startTime = System.currentTimeMillis();
        Shuffle shuffleMock = Mockito.mock(Shuffle.class);
        ShuffleSchedulerForTest scheduler = this.createScheduler(startTime, 1, shuffleMock);

        InputAttemptIdentifier attempt = new InputAttemptIdentifier(new InputIdentifier(0), 0, "attempt_");
        scheduler.addKnownMapOutput("host0", 10000, 0, "hostUrl", attempt);

        // Exactly one host is queued, and it is in the PENDING state.
        Assert.assertTrue(scheduler.pendingHosts.size() == 1);
        Assert.assertTrue(((MapHost)scheduler.pendingHosts.iterator().next()).getState() == MapHost.State.PENDING);
        MapHost failingHost = (MapHost)scheduler.pendingHosts.iterator().next();

        // Report a failed copy from host0.
        scheduler.copyFailed(attempt, failingHost, false, true, false);

        // The identifier doubles as the assertion message.
        MapHost offered = scheduler.getHost();
        Assert.assertFalse(offered.getIdentifier(), offered.getIdentifier().equalsIgnoreCase("host0:10000"));

        // NOTE(review): the three-second wait presumably outlasts the scheduler's initial
        // penalty period - confirm against ShuffleScheduler.
        Thread.sleep(3000L);
        offered = scheduler.getHost();
        Assert.assertFalse(offered.getIdentifier(), offered.getIdentifier().equalsIgnoreCase("host0:10000"));
    }

    /**
     * Closes the scheduler while one of ten inputs is still outstanding and expects the
     * background {@code start()} call to return within the test timeout.
     * {@code close()} is invoked a second time in the {@code finally} block.
     */
    @Test(timeout=5000L)
    public void testShutdown() throws Exception {
        InputContext context = this.createTezInputContext();
        TezConfiguration tezConf = new TezConfiguration();
        final int inputCount = 10;
        Shuffle shuffleMock = Mockito.mock(Shuffle.class);
        MergeManager merger = Mockito.mock(MergeManager.class);
        final ShuffleSchedulerForTest scheduler = new ShuffleSchedulerForTest(context, tezConf, inputCount, shuffleMock, merger, (FetchedInputAllocatorOrderedGrouped)merger, System.currentTimeMillis(), null, false, 0, "srcName");
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // Run the scheduler loop in the background while this thread plays the fetchers.
            Future<Void> startResult = pool.submit(new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    scheduler.start();
                    return null;
                }
            });

            // Announce one output per host and remember the attempt identifiers.
            InputAttemptIdentifier[] attempts = new InputAttemptIdentifier[inputCount];
            for (int idx = 0; idx < inputCount; ++idx) {
                attempts[idx] = new InputAttemptIdentifier(new InputIdentifier(idx), 0, "attempt_");
                scheduler.addKnownMapOutput("host" + idx, 10000, 1, "hostUrl", attempts[idx]);
            }

            // Snapshot the hosts the scheduler registered, in the map's iteration order.
            MapHost[] hosts = new MapHost[inputCount];
            int next = 0;
            for (MapHost location : scheduler.mapLocations.values()) {
                hosts[next] = location;
                next++;
            }

            // Complete all inputs except the last one.
            for (int idx = 0; idx < inputCount - 1; ++idx) {
                FetchedInputAllocatorOrderedGrouped allocator = Mockito.mock(FetchedInputAllocatorOrderedGrouped.class);
                MapOutput fetched = MapOutput.createMemoryMapOutput(attempts[idx], allocator, 100, false);
                scheduler.copySucceeded(attempts[idx], hosts[idx], 20L, 25L, 100L, fetched, false);
                scheduler.freeHost(hosts[idx]);
            }

            // Shut down before the shuffle is complete; start() must still return.
            scheduler.close();
            startResult.get();
        }
        finally {
            scheduler.close();
            pool.shutdownNow();
        }
    }

    /**
     * Creates a Mockito-backed {@code InputContext} stubbed with what the scheduler tests
     * need: application id, source vertex name, counters, an execution context for
     * {@code localhost}, a four-byte provider metadata buffer holding the int 4, and a
     * serialized job token as consumer metadata.
     *
     * @return the stubbed input context mock
     * @throws IOException propagated from token serialization
     */
    private InputContext createTezInputContext() throws IOException {
        ApplicationId appId = ApplicationId.newInstance(1L, 1);
        InputContext context = Mockito.mock(InputContext.class);

        // Identity and counters.
        Mockito.doReturn(appId).when(context).getApplicationId();
        Mockito.doReturn("sourceVertex").when(context).getSourceVertexName();
        Mockito.when(context.getCounters()).thenReturn(new TezCounters());

        // Execution context.
        ExecutionContextImpl execContext = new ExecutionContextImpl("localhost");
        Mockito.doReturn(execContext).when(context).getExecutionContext();

        // Provider metadata: the int 4 written at offset 0, returned for any service name.
        ByteBuffer providerMeta = ByteBuffer.allocate(4).putInt(0, 4);
        Mockito.doReturn(providerMeta).when(context).getServiceProviderMetaData(Matchers.anyString());

        // Consumer metadata: a serialized job token, returned for any service name.
        Token jobToken = new Token((TokenIdentifier)new JobTokenIdentifier(new Text("text")), (SecretManager)new JobTokenSecretManager());
        ByteBuffer consumerMeta = TezCommonUtils.serializeServiceData(jobToken);
        Mockito.doReturn(consumerMeta).when(context).getServiceConsumerMetaData(Matchers.anyString());

        return context;
    }

    /**
     * {@link ShuffleScheduler} subclass used by these tests. It counts how many fetchers
     * are requested and hands out Mockito mocks instead of real fetchers. It also keeps
     * its own reference to the shuffle passed in, so tests can verify exception reports.
     */
    private static class ShuffleSchedulerForTest
    extends ShuffleScheduler {
        /** Number of times {@link #constructFetcherForHost(MapHost)} has been called. */
        private final AtomicInteger numFetchersCreated = new AtomicInteger(0);
        /** When true, each mocked fetcher sleeps for 100 seconds inside {@code callInternal()}. */
        private final boolean fetcherShouldWait;
        /** The shuffle passed to the constructor, kept for verification in tests. */
        private final ExceptionReporter reporter;

        /**
         * Creates a scheduler whose mocked fetchers return immediately
         * ({@code fetcherShouldWait = false}).
         */
        public ShuffleSchedulerForTest(InputContext inputContext, Configuration conf, int numberOfInputs, Shuffle shuffle, MergeManager mergeManager, FetchedInputAllocatorOrderedGrouped allocator, long startTime, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, String srcNameTrimmed) throws IOException {
            this(inputContext, conf, numberOfInputs, shuffle, mergeManager, allocator, startTime, codec, ifileReadAhead, ifileReadAheadLength, srcNameTrimmed, false);
        }

        /**
         * Creates a scheduler, forwarding everything but {@code fetcherShouldWait} to the
         * superclass constructor.
         *
         * @param fetcherShouldWait whether mocked fetchers should block once invoked
         * @throws IOException propagated from the superclass constructor
         */
        public ShuffleSchedulerForTest(InputContext inputContext, Configuration conf, int numberOfInputs, Shuffle shuffle, MergeManager mergeManager, FetchedInputAllocatorOrderedGrouped allocator, long startTime, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, String srcNameTrimmed, boolean fetcherShouldWait) throws IOException {
            super(inputContext, conf, numberOfInputs, (ExceptionReporter)shuffle, mergeManager, allocator, startTime, codec, ifileReadAhead, ifileReadAheadLength, srcNameTrimmed);
            this.fetcherShouldWait = fetcherShouldWait;
            this.reporter = shuffle;
        }

        /**
         * Returns a mocked fetcher and bumps {@link #numFetchersCreated}.
         *
         * <p>NOTE(review): presumably replaces the fetcher factory hook of
         * {@code ShuffleScheduler}; the superclass is not visible here, so confirm before
         * adding {@code @Override}.
         *
         * @param mapHost host the fetcher is requested for (unused by the mock)
         * @return a mock whose {@code callInternal()} returns null, after sleeping if
         *         {@code fetcherShouldWait} is set
         */
        FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
            this.numFetchersCreated.incrementAndGet();
            FetcherOrderedGrouped mockFetcher = Mockito.mock(FetcherOrderedGrouped.class);
            // Typed Answer<Object> instead of the raw Answer the decompiler produced.
            Mockito.doAnswer(new Answer<Object>(){

                public Object answer(InvocationOnMock invocation) throws Throwable {
                    if (ShuffleSchedulerForTest.this.fetcherShouldWait) {
                        // Keep the fetcher "busy" far longer than any test timeout.
                        Thread.sleep(100000L);
                    }
                    return null;
                }
            }).when(mockFetcher).callInternal();
            return mockFetcher;
        }
    }
}

