/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.common.shuffle.FetchResult;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.Fetcher;
import org.apache.tez.runtime.library.common.shuffle.FetcherCallback;
import org.apache.tez.runtime.library.common.shuffle.LocalDiskFetchedInput;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@link Fetcher}.
 *
 * <p>Covers three things: which fetch path ({@code setupLocalDiskFetch} or
 * {@code doHttpFetch}) a fetcher takes for a given builder/work assignment, the
 * callbacks and {@link FetchResult} produced when the local-disk path partially
 * fails, and the contents of {@code srcAttemptsRemaining} after
 * {@code populateRemainingMap}.
 */
public class TestFetcher {
    private static final String SHUFFLE_INPUT_FILE_PREFIX = "shuffle_input_file_";
    private static final String HOST = "localhost";
    private static final int PORT = 41;

    /**
     * Verifies that the local-disk path is taken only when the work is assigned to the
     * same host and port that were handed to the builder and the local-fetch flag is on;
     * every other combination must go through {@code doHttpFetch}.
     */
    @Test(timeout = 3000L)
    public void testLocalFetchModeSetting() throws Exception {
        TezConfiguration conf = new TezConfiguration();
        conf.setBoolean("tez.runtime.optimize.local.fetch", true);
        InputAttemptIdentifier[] srcAttempts = new InputAttemptIdentifier[]{
            new InputAttemptIdentifier(0, 1, "attemptpathComponent_1")
        };
        FetcherCallback fetcherCallback = Mockito.mock(FetcherCallback.class);
        // NOTE(review): presumably these map to the builder's local-fetch flag (the boolean
        // that follows conf) - the values line up with the config key; confirm against
        // Fetcher.FetcherBuilder.
        final boolean enableLocalFetch = true;
        final boolean disableLocalFetch = false;

        FetchResult fr = new FetchResult(HOST, PORT, 0, Arrays.asList(srcAttempts));
        Fetcher.HostFetchResult hfr = new Fetcher.HostFetchResult(fr, srcAttempts, false);

        // Case 1: flag on, work assigned to the builder's own host and port -> local disk fetch.
        Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
            ApplicationId.newInstance(0L, 1), 1, null, "fetcherTest", conf, enableLocalFetch,
            HOST, PORT, false);
        builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
        Fetcher fetcher = Mockito.spy(builder.build());
        Mockito.doReturn(hfr).when(fetcher).setupLocalDiskFetch();
        Mockito.doReturn(null).when(fetcher).doHttpFetch();
        Mockito.doNothing().when(fetcher).shutdown();
        fetcher.call();
        Mockito.verify(fetcher).setupLocalDiskFetch();
        Mockito.verify(fetcher, Mockito.never()).doHttpFetch();

        // Case 2: flag on, but the work is assigned to a different host -> HTTP fetch.
        builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
            ApplicationId.newInstance(0L, 1), -1, null, "fetcherTest", conf, enableLocalFetch,
            HOST, PORT, false);
        builder.assignWork(HOST + "_OTHER", PORT, 0, Arrays.asList(srcAttempts));
        assertHttpFetchUsed(builder, hfr);

        // Case 3: flag on, same host but a different port -> HTTP fetch.
        builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
            ApplicationId.newInstance(0L, 1), -1, null, "fetcherTest", conf, enableLocalFetch,
            HOST, PORT, false);
        builder.assignWork(HOST, PORT + 1, 0, Arrays.asList(srcAttempts));
        assertHttpFetchUsed(builder, hfr);

        // Case 4: same host and port, but the flag (and the config key) is off -> HTTP fetch.
        conf.setBoolean("tez.runtime.optimize.local.fetch", false);
        builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
            ApplicationId.newInstance(0L, 1), 1, null, "fetcherTest", conf, disableLocalFetch,
            HOST, PORT, false);
        builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
        assertHttpFetchUsed(builder, hfr);
    }

    /**
     * Runs a local-disk fetch over five source attempts where reading the index record
     * fails for two of them, then checks that the other three are reported through
     * {@code fetchSucceeded}, the two failures through {@code fetchFailed}, and that the
     * returned {@link FetchResult} lists exactly the failed attempts as pending.
     */
    @Test(timeout = 3000L)
    public void testSetupLocalDiskFetch() throws Exception {
        InputAttemptIdentifier[] srcAttempts = new InputAttemptIdentifier[]{
            new InputAttemptIdentifier(0, 1, "attemptpathComponent_0"),
            new InputAttemptIdentifier(1, 2, "attemptpathComponent_1"),
            new InputAttemptIdentifier(2, 3, "attemptpathComponent_2"),
            new InputAttemptIdentifier(3, 4, "attemptpathComponent_3"),
            new InputAttemptIdentifier(4, 5, "attemptpathComponent_4")
        };
        // Indices into srcAttempts; each also equals the trailing digit of that attempt's
        // path component, which is what the index-record stub below keys on.
        final int firstFailedAttemptIdx = 2;
        final int secondFailedAttemptIdx = 4;
        int[] successfulAttempts = new int[]{0, 1, 3};

        TezConfiguration conf = new TezConfiguration();
        conf.set("tez.runtime.optimize.local.fetch", "true");
        int partition = 42;
        FetcherCallback callback = Mockito.mock(FetcherCallback.class);
        Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
            ApplicationId.newInstance(0L, 1), 1, null, "fetcherTest", conf, true, HOST, PORT,
            false);
        builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
        Fetcher fetcher = Mockito.spy(builder.build());

        // Derive the input file name from the first argument so it can be asserted on later.
        Mockito.doAnswer(new Answer<Path>() {
            @Override
            public Path answer(InvocationOnMock invocation) throws Throwable {
                Object[] args = invocation.getArguments();
                return new Path(SHUFFLE_INPUT_FILE_PREFIX + args[0]);
            }
        }).when(fetcher).getShuffleInputFileName(Matchers.anyString(), Matchers.anyString());

        // Fail for the two designated attempts; otherwise return a record whose three
        // values are distinct multiples of the trailing digit (checked in
        // verifyFetchSucceeded).
        Mockito.doAnswer(new Answer<TezIndexRecord>() {
            @Override
            public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable {
                InputAttemptIdentifier srcAttemptId =
                    (InputAttemptIdentifier) invocation.getArguments()[0];
                long p = trailingDigit(srcAttemptId.getPathComponent());
                if (p == firstFailedAttemptIdx || p == secondFailedAttemptIdx) {
                    throw new IOException("failing on 3/5th input to simulate failure case");
                }
                return new TezIndexRecord(p * 10L, p * 1000L, p * 100L);
            }
        }).when(fetcher).getTezIndexRecord(Mockito.any(InputAttemptIdentifier.class));

        Mockito.doNothing().when(fetcher).shutdown();
        Mockito.doNothing().when(callback).fetchSucceeded(Matchers.anyString(),
            Mockito.any(InputAttemptIdentifier.class), Mockito.any(FetchedInput.class),
            Matchers.anyLong(), Matchers.anyLong(), Matchers.anyLong());
        Mockito.doNothing().when(callback).fetchFailed(Matchers.anyString(),
            Mockito.any(InputAttemptIdentifier.class), Matchers.eq(false));

        FetchResult fetchResult = (FetchResult) fetcher.call();

        Mockito.verify(fetcher).setupLocalDiskFetch();
        for (int i : successfulAttempts) {
            verifyFetchSucceeded(callback, srcAttempts[i], conf);
        }
        Mockito.verify(callback).fetchFailed(Matchers.eq(HOST),
            Matchers.eq(srcAttempts[firstFailedAttemptIdx]), Matchers.eq(false));
        Mockito.verify(callback).fetchFailed(Matchers.eq(HOST),
            Matchers.eq(srcAttempts[secondFailedAttemptIdx]), Matchers.eq(false));

        Assert.assertEquals("fetchResult host", HOST, fetchResult.getHost());
        Assert.assertEquals("fetchResult partition", partition, fetchResult.getPartition());
        Assert.assertEquals("fetchResult port", PORT, fetchResult.getPort());
        ArrayList<InputAttemptIdentifier> pendingInputs =
            Lists.newArrayList(fetchResult.getPendingInputs());
        Assert.assertEquals("fetchResult pendingInput size", 2, pendingInputs.size());
        Assert.assertEquals("fetchResult failed attempt",
            srcAttempts[firstFailedAttemptIdx], pendingInputs.get(0));
        Assert.assertEquals("fetchResult failed attempt",
            srcAttempts[secondFailedAttemptIdx], pendingInputs.get(1));
    }

    /**
     * Asserts that {@code fetchSucceeded} was invoked on {@code callback} for
     * {@code srcAttempId} with a {@link LocalDiskFetchedInput} whose fields match the
     * values the index-record stub in {@link #testSetupLocalDiskFetch()} derives from the
     * trailing digit of the attempt's path component.
     *
     * @param callback    the mocked callback to verify against
     * @param srcAttempId the source attempt expected to have succeeded
     * @param conf        unused by this method; kept so existing callers keep compiling
     * @throws IOException declared for callers; nothing in this body throws it directly
     */
    protected void verifyFetchSucceeded(FetcherCallback callback, InputAttemptIdentifier srcAttempId, Configuration conf) throws IOException {
        String pathComponent = srcAttempId.getPathComponent();
        long p = trailingDigit(pathComponent);
        ArgumentCaptor<LocalDiskFetchedInput> capturedFetchedInput =
            ArgumentCaptor.forClass(LocalDiskFetchedInput.class);
        Mockito.verify(callback).fetchSucceeded(Matchers.eq(HOST), Matchers.eq(srcAttempId),
            capturedFetchedInput.capture(), Matchers.eq(p * 100L), Matchers.eq(p * 1000L),
            Matchers.anyLong());
        LocalDiskFetchedInput f = capturedFetchedInput.getValue();
        Assert.assertEquals("success callback filename",
            SHUFFLE_INPUT_FILE_PREFIX + pathComponent, f.getInputFile().toString());
        Assert.assertTrue("success callback fs", f.getLocalFS() instanceof LocalFileSystem);
        Assert.assertEquals("success callback start offset", p * 10L, f.getStartOffset());
        Assert.assertEquals("success callback raw size", p * 1000L, f.getActualSize());
        Assert.assertEquals("success callback compressed size", p * 100L, f.getCompressedSize());
        Assert.assertEquals("success callback input id", srcAttempId, f.getInputAttemptIdentifier());
        Assert.assertEquals("success callback type", FetchedInput.Type.DISK_DIRECT, f.getType());
    }

    /**
     * Feeds seven attempts (the first two identical) to {@code populateRemainingMap} and
     * checks that {@code srcAttemptsRemaining} ends up with six entries whose keys, in
     * iteration order, equal the {@code toString()} of the expected attempts.
     */
    @Test(timeout = 5000L)
    public void testInputAttemptIdentifierMap() {
        InputAttemptIdentifier[] srcAttempts = new InputAttemptIdentifier[]{
            new InputAttemptIdentifier(new InputIdentifier(0), 1, "attemptpathComponent_0", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
            // Exact duplicate of the entry above; it is absent from the expected array below.
            new InputAttemptIdentifier(new InputIdentifier(0), 1, "attemptpathComponent_0", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
            new InputAttemptIdentifier(new InputIdentifier(1), 1, "attemptpathComponent_1", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
            new InputAttemptIdentifier(new InputIdentifier(1), 2, "attemptpathComponent_1", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
            new InputAttemptIdentifier(new InputIdentifier(1), 1, "attemptpathComponent_2", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
            new InputAttemptIdentifier(new InputIdentifier(1), 1, "attemptpathComponent_3", false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
            new InputAttemptIdentifier(new InputIdentifier(2), 1, "attemptpathComponent_3", false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
        };
        InputAttemptIdentifier[] expectedSrcAttempts = new InputAttemptIdentifier[]{
            new InputAttemptIdentifier(new InputIdentifier(0), 1, "attemptpathComponent_0", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
            new InputAttemptIdentifier(new InputIdentifier(1), 1, "attemptpathComponent_1", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
            new InputAttemptIdentifier(new InputIdentifier(1), 2, "attemptpathComponent_1", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
            new InputAttemptIdentifier(new InputIdentifier(1), 1, "attemptpathComponent_2", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
            new InputAttemptIdentifier(new InputIdentifier(1), 1, "attemptpathComponent_3", false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
            new InputAttemptIdentifier(new InputIdentifier(2), 1, "attemptpathComponent_3", false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
        };
        TezConfiguration conf = new TezConfiguration();
        conf.set("tez.runtime.optimize.local.fetch", "true");
        int partition = 42;
        FetcherCallback callback = Mockito.mock(FetcherCallback.class);
        Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
            ApplicationId.newInstance(0L, 1), 1, null, "fetcherTest", conf, true, HOST, PORT,
            false);
        builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
        Fetcher fetcher = Mockito.spy(builder.build());

        fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts)));

        Assert.assertEquals("srcAttemptsRemaining size",
            expectedSrcAttempts.length, fetcher.srcAttemptsRemaining.size());
        int count = 0;
        // Iterating as Object keeps this loop independent of the map's declared key type.
        for (Object key : fetcher.srcAttemptsRemaining.keySet()) {
            Assert.assertEquals("srcAttemptsRemaining key #" + count,
                expectedSrcAttempts[count].toString(), key);
            count++;
        }
    }

    /**
     * Spies the fetcher built by {@code builder}, stubs both fetch paths, runs it, and
     * asserts that only {@code doHttpFetch} was invoked.
     */
    private static void assertHttpFetchUsed(Fetcher.FetcherBuilder builder, Fetcher.HostFetchResult httpResult) throws Exception {
        Fetcher fetcher = Mockito.spy(builder.build());
        Mockito.doReturn(null).when(fetcher).setupLocalDiskFetch();
        Mockito.doReturn(httpResult).when(fetcher).doHttpFetch();
        Mockito.doNothing().when(fetcher).shutdown();
        fetcher.call();
        Mockito.verify(fetcher, Mockito.never()).setupLocalDiskFetch();
        Mockito.verify(fetcher).doHttpFetch();
    }

    /** Parses the last character of {@code pathComponent} as a decimal number. */
    private static long trailingDigit(String pathComponent) {
        return Long.parseLong(pathComponent.substring(pathComponent.length() - 1));
    }
}

