package org.apache.spark.network.shuffle;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.sasl.SaslTimeoutException;
import org.apache.spark.network.shuffle.RetryingBlockTransferor;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.stubbing.Stubber;

/* loaded from: input_file:org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.class */
public class RetryingBlockTransferorSuite {
    // Zero-filled payload buffers of distinct sizes (13, 7 and 19 bytes) used as the
    // "fetched" data for block ids b0, b1 and b2 in the tests below.
    private final ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
    private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
    private final ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
    // Transport configuration; rebuilt by initMap() before every test, and extended by
    // individual tests (e.g. to enable SASL retries) before performInteractions() reads it.
    private static Map<String, String> configMap;
    // The transferor created by the most recent performInteractions() call; kept so tests
    // can inspect its retry count afterwards.
    private static RetryingBlockTransferor _retryingBlockTransferor;
    // Value written to "spark.shuffle.io.maxRetries" by initMap().
    private static final int MAX_RETRIES = 2;
    // Decompiler-exposed assertion-status flag: set by the static initializer to true when
    // assertions are disabled for this class. It guards the retry-count checks in the tests.
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Rebuilds {@code configMap} before every test so that keys added by one test
     * (for example the SASL retry switch) do not leak into the next one.
     *
     * <p>A plain {@link HashMap} is used instead of double-brace initialization, which
     * would create an anonymous subclass holding a reference to this test instance.
     */
    @Before
    public void initMap() {
        Map<String, String> freshConfig = new HashMap<>();
        freshConfig.put("spark.shuffle.io.maxRetries", Integer.toString(MAX_RETRIES));
        // No wait between attempts, so the retry tests do not sleep.
        freshConfig.put("spark.shuffle.io.retryWait", "0");
        configMap = freshConfig;
    }

    /**
     * Scripts a single attempt in which both blocks arrive, and expects each success to be
     * forwarded to the listener exactly once with no other listener interaction.
     */
    @Test
    public void testNoFailures() throws IOException, InterruptedException {
        BlockFetchingListener listener = Mockito.mock(BlockFetchingListener.class);

        // The explicit <String, Object> witness is required: without it the builder chain
        // is typed ImmutableMap<Object, Object> and does not match performInteractions().
        List<ImmutableMap<String, Object>> interactions = Arrays.asList(
            // Attempt 0: both blocks are delivered.
            ImmutableMap.<String, Object>builder()
                .put("b0", block0)
                .put("b1", block1)
                .build());

        performInteractions(interactions, listener);

        Mockito.verify(listener).onBlockTransferSuccess("b0", block0);
        Mockito.verify(listener).onBlockTransferSuccess("b1", block1);
        Mockito.verifyNoMoreInteractions(listener);
    }

    /**
     * Scripts a single attempt in which b0 fails with a {@link RuntimeException} and b1
     * succeeds. The test expects the failure to be forwarded for b0 and the success for b1;
     * no second attempt is scripted.
     */
    @Test
    public void testUnrecoverableFailure() throws IOException, InterruptedException {
        BlockFetchingListener listener = Mockito.mock(BlockFetchingListener.class);

        // The explicit <String, Object> witness is required: without it the builder chain
        // is typed ImmutableMap<Object, Object> and does not match performInteractions().
        List<ImmutableMap<String, Object>> interactions = Arrays.asList(
            // Attempt 0: b0 fails with a non-IO exception, b1 is delivered.
            ImmutableMap.<String, Object>builder()
                .put("b0", new RuntimeException("Ouch!"))
                .put("b1", block1)
                .build());

        performInteractions(interactions, listener);

        Mockito.verify(listener).onBlockTransferFailure(Mockito.eq("b0"), Mockito.any());
        Mockito.verify(listener).onBlockTransferSuccess("b1", block1);
        Mockito.verify(listener, Mockito.atLeastOnce()).getTransferType();
        Mockito.verifyNoMoreInteractions(listener);
    }

    /**
     * Scripts an {@link IOException} for b0 on the first attempt and full delivery on the
     * second. The test expects both blocks to be reported as successful exactly once.
     */
    @Test
    public void testSingleIOExceptionOnFirst() throws IOException, InterruptedException {
        BlockFetchingListener listener = Mockito.mock(BlockFetchingListener.class);

        // The explicit <String, Object> witness is required: without it the builder chain
        // is typed ImmutableMap<Object, Object> and does not match performInteractions().
        List<ImmutableMap<String, Object>> interactions = Arrays.asList(
            // Attempt 0: b0 fails with an IOException, b1 is delivered.
            ImmutableMap.<String, Object>builder()
                .put("b0", new IOException("Connection failed or something"))
                .put("b1", block1)
                .build(),
            // Attempt 1: both blocks are requested again and delivered.
            ImmutableMap.<String, Object>builder()
                .put("b0", block0)
                .put("b1", block1)
                .build());

        performInteractions(interactions, listener);

        // timeout(): wait up to 5 s for the callback rather than requiring it immediately.
        Mockito.verify(listener, Mockito.timeout(5000L)).onBlockTransferSuccess("b0", block0);
        Mockito.verify(listener, Mockito.timeout(5000L)).onBlockTransferSuccess("b1", block1);
        Mockito.verify(listener, Mockito.atLeastOnce()).getTransferType();
        Mockito.verifyNoMoreInteractions(listener);
    }

    /**
     * Scripts a successful b0 and an {@link IOException} for b1 on the first attempt, then
     * delivery of b1 alone on the second. The test expects both blocks to be reported as
     * successful exactly once.
     */
    @Test
    public void testSingleIOExceptionOnSecond() throws IOException, InterruptedException {
        BlockFetchingListener listener = Mockito.mock(BlockFetchingListener.class);

        // The explicit <String, Object> witness is required: without it the builder chain
        // is typed ImmutableMap<Object, Object> and does not match performInteractions().
        List<ImmutableMap<String, Object>> interactions = Arrays.asList(
            // Attempt 0: b0 is delivered, b1 fails with an IOException.
            ImmutableMap.<String, Object>builder()
                .put("b0", block0)
                .put("b1", new IOException("Connection failed or something"))
                .build(),
            // Attempt 1: only b1 is requested, and it is delivered.
            ImmutableMap.<String, Object>builder()
                .put("b1", block1)
                .build());

        performInteractions(interactions, listener);

        // timeout(): wait up to 5 s for the callback rather than requiring it immediately.
        Mockito.verify(listener, Mockito.timeout(5000L)).onBlockTransferSuccess("b0", block0);
        Mockito.verify(listener, Mockito.timeout(5000L)).onBlockTransferSuccess("b1", block1);
        Mockito.verify(listener, Mockito.atLeastOnce()).getTransferType();
        Mockito.verifyNoMoreInteractions(listener);
    }

    /**
     * Scripts two consecutive attempts that contain an {@link IOException}, followed by a
     * third attempt that delivers the remaining block. With {@code MAX_RETRIES} set to 2 the
     * test expects both blocks to end up reported as successful.
     */
    @Test
    public void testTwoIOExceptions() throws IOException, InterruptedException {
        BlockFetchingListener listener = Mockito.mock(BlockFetchingListener.class);

        // The explicit <String, Object> witness is required: without it the builder chain
        // is typed ImmutableMap<Object, Object> and does not match performInteractions().
        List<ImmutableMap<String, Object>> interactions = Arrays.asList(
            // Attempt 0: both blocks fail with an IOException.
            ImmutableMap.<String, Object>builder()
                .put("b0", new IOException())
                .put("b1", new IOException())
                .build(),
            // Attempt 1: b0 is delivered, b1 fails again.
            ImmutableMap.<String, Object>builder()
                .put("b0", block0)
                .put("b1", new IOException())
                .build(),
            // Attempt 2: b1 is delivered.
            ImmutableMap.<String, Object>builder()
                .put("b1", block1)
                .build());

        performInteractions(interactions, listener);

        // timeout(): wait up to 5 s for the callback rather than requiring it immediately.
        Mockito.verify(listener, Mockito.timeout(5000L)).onBlockTransferSuccess("b0", block0);
        Mockito.verify(listener, Mockito.timeout(5000L)).onBlockTransferSuccess("b1", block1);
        Mockito.verify(listener, Mockito.atLeastOnce()).getTransferType();
        Mockito.verifyNoMoreInteractions(listener);
    }

    /**
     * Scripts three consecutive attempts in which b1 fails with an {@link IOException}.
     * With {@code MAX_RETRIES} set to 2 the test expects b0 to succeed and b1 to be
     * reported as failed, even though a fourth, successful attempt is scripted.
     */
    @Test
    public void testThreeIOExceptions() throws IOException, InterruptedException {
        BlockFetchingListener listener = Mockito.mock(BlockFetchingListener.class);

        // The explicit <String, Object> witness is required: without it the builder chain
        // is typed ImmutableMap<Object, Object> and does not match performInteractions().
        List<ImmutableMap<String, Object>> interactions = Arrays.asList(
            // Attempt 0: both blocks fail with an IOException.
            ImmutableMap.<String, Object>builder()
                .put("b0", new IOException())
                .put("b1", new IOException())
                .build(),
            // Attempt 1: b0 is delivered, b1 fails again.
            ImmutableMap.<String, Object>builder()
                .put("b0", block0)
                .put("b1", new IOException())
                .build(),
            // Attempt 2: b1 fails a third time.
            ImmutableMap.<String, Object>builder()
                .put("b1", new IOException())
                .build(),
            // Scripted fourth attempt; the assertions below expect it never to be used.
            ImmutableMap.<String, Object>builder()
                .put("b1", block1)
                .build());

        performInteractions(interactions, listener);

        // timeout(): wait up to 5 s for the callback rather than requiring it immediately.
        Mockito.verify(listener, Mockito.timeout(5000L)).onBlockTransferSuccess("b0", block0);
        Mockito.verify(listener, Mockito.timeout(5000L))
            .onBlockTransferFailure(Mockito.eq("b1"), Mockito.any());
        Mockito.verify(listener, Mockito.atLeastOnce()).getTransferType();
        Mockito.verifyNoMoreInteractions(listener);
    }

    /**
     * Mixes retryable ({@link IOException}) and non-retryable ({@link RuntimeException})
     * failures across three blocks. The test expects b0 and b2 to succeed eventually and
     * b1 to be reported as failed exactly once.
     */
    @Test
    public void testRetryAndUnrecoverable() throws IOException, InterruptedException {
        BlockFetchingListener listener = Mockito.mock(BlockFetchingListener.class);

        // The explicit <String, Object> witness is required: without it the builder chain
        // is typed ImmutableMap<Object, Object> and does not match performInteractions().
        List<ImmutableMap<String, Object>> interactions = Arrays.asList(
            // Attempt 0: b0 fails with an IOException, b1 with a RuntimeException, b2 arrives.
            ImmutableMap.<String, Object>builder()
                .put("b0", new IOException())
                .put("b1", new RuntimeException())
                .put("b2", block2)
                .build(),
            // Attempt 1: b0 arrives, b1 fails with a RuntimeException, b2 with an IOException.
            ImmutableMap.<String, Object>builder()
                .put("b0", block0)
                .put("b1", new RuntimeException())
                .put("b2", new IOException())
                .build(),
            // Attempt 2: b2 arrives.
            ImmutableMap.<String, Object>builder()
                .put("b2", block2)
                .build());

        performInteractions(interactions, listener);

        // timeout(): wait up to 5 s for the callback rather than requiring it immediately.
        Mockito.verify(listener, Mockito.timeout(5000L)).onBlockTransferSuccess("b0", block0);
        Mockito.verify(listener, Mockito.timeout(5000L))
            .onBlockTransferFailure(Mockito.eq("b1"), Mockito.any());
        Mockito.verify(listener, Mockito.timeout(5000L)).onBlockTransferSuccess("b2", block2);
        Mockito.verify(listener, Mockito.atLeastOnce()).getTransferType();
        Mockito.verifyNoMoreInteractions(listener);
    }

    /**
     * Scripts a {@code SaslTimeoutException} for b0 without setting
     * "spark.shuffle.sasl.enableRetries". The test expects that exact exception to be
     * forwarded as a failure, even though a second, successful attempt is scripted.
     */
    @Test
    public void testSaslTimeoutFailure() throws IOException, InterruptedException {
        BlockFetchingListener listener = Mockito.mock(BlockFetchingListener.class);
        SaslTimeoutException saslTimeout = new SaslTimeoutException(new TimeoutException());

        // The explicit <String, Object> witness is required: without it the builder chain
        // is typed ImmutableMap<Object, Object> and does not match performInteractions().
        List<ImmutableMap<String, Object>> interactions = Arrays.asList(
            // Attempt 0: b0 fails with the SASL timeout.
            ImmutableMap.<String, Object>builder()
                .put("b0", saslTimeout)
                .build(),
            // Scripted second attempt; the assertions below expect it never to be used.
            ImmutableMap.<String, Object>builder()
                .put("b0", block0)
                .build());

        performInteractions(interactions, listener);

        // timeout(): wait up to 5 s for the callback rather than requiring it immediately.
        Mockito.verify(listener, Mockito.timeout(5000L)).onBlockTransferFailure("b0", saslTimeout);
        Mockito.verify(listener).getTransferType();
        Mockito.verifyNoMoreInteractions(listener);
    }

    /**
     * With "spark.shuffle.sasl.enableRetries" set to true, scripts one
     * {@code SaslTimeoutException} followed by a successful delivery of b0. The test
     * expects the success to be forwarded and the transferor's retry count to be 0.
     */
    @Test
    public void testRetryOnSaslTimeout() throws IOException, InterruptedException {
        BlockFetchingListener listener = Mockito.mock(BlockFetchingListener.class);

        List<ImmutableMap<String, Object>> interactions = Arrays.asList(
            // Attempt 0: b0 fails with a SASL timeout.
            ImmutableMap.<String, Object>builder()
                .put("b0", new SaslTimeoutException(new TimeoutException()))
                .build(),
            // Attempt 1: b0 is delivered.
            ImmutableMap.<String, Object>builder()
                .put("b0", block0)
                .build());
        configMap.put("spark.shuffle.sasl.enableRetries", "true");

        performInteractions(interactions, listener);

        // timeout(): wait up to 5 s for the callback rather than requiring it immediately.
        Mockito.verify(listener, Mockito.timeout(5000L)).onBlockTransferSuccess("b0", block0);
        Mockito.verify(listener).getTransferType();
        Mockito.verifyNoMoreInteractions(listener);

        // Checked unconditionally via JUnit, so the test does not silently skip this
        // verification when the JVM runs without -ea.
        int retryCount = _retryingBlockTransferor.getRetryCount();
        Assert.assertEquals(0, retryCount);
    }

    /**
     * With "spark.shuffle.sasl.enableRetries" set to true, scripts three attempts that all
     * fail with the same {@code SaslTimeoutException}. The test expects that exception to
     * be forwarded as a failure, {@code getTransferType()} to be called three times, and
     * the transferor's retry count to equal {@code MAX_RETRIES}.
     */
    @Test
    public void testRepeatedSaslRetryFailures() throws IOException, InterruptedException {
        BlockFetchingListener listener = Mockito.mock(BlockFetchingListener.class);
        SaslTimeoutException saslTimeout = new SaslTimeoutException(new TimeoutException());

        // Three identical failing attempts for b0.
        List<ImmutableMap<String, Object>> interactions = new ArrayList<>();
        for (int attempt = 0; attempt < 3; attempt++) {
            interactions.add(ImmutableMap.<String, Object>builder()
                .put("b0", saslTimeout)
                .build());
        }
        configMap.put("spark.shuffle.sasl.enableRetries", "true");

        performInteractions(interactions, listener);

        // timeout(): wait up to 5 s for the callback rather than requiring it immediately.
        Mockito.verify(listener, Mockito.timeout(5000L)).onBlockTransferFailure("b0", saslTimeout);
        Mockito.verify(listener, Mockito.times(3)).getTransferType();
        Mockito.verifyNoMoreInteractions(listener);

        // Checked unconditionally via JUnit, so the test does not silently skip this
        // verification when the JVM runs without -ea.
        int retryCount = _retryingBlockTransferor.getRetryCount();
        Assert.assertEquals(MAX_RETRIES, retryCount);
    }

    /**
     * With "spark.shuffle.sasl.enableRetries" set to true, scripts a SASL timeout for b0
     * together with repeated {@link IOException}s for b1. The test expects both blocks to
     * succeed eventually and the transferor's retry count to be 1 afterwards.
     */
    @Test
    public void testBlockTransferFailureAfterSasl() throws IOException, InterruptedException {
        BlockFetchingListener listener = Mockito.mock(BlockFetchingListener.class);

        List<ImmutableMap<String, Object>> interactions = Arrays.asList(
            // Attempt 0: b0 fails with a SASL timeout, b1 with an IOException.
            ImmutableMap.<String, Object>builder()
                .put("b0", new SaslTimeoutException(new TimeoutException()))
                .put("b1", new IOException())
                .build(),
            // Attempt 1: b0 is delivered, b1 fails with an IOException again.
            ImmutableMap.<String, Object>builder()
                .put("b0", block0)
                .put("b1", new IOException())
                .build(),
            // Attempt 2: b1 is delivered.
            ImmutableMap.<String, Object>builder()
                .put("b1", block1)
                .build());
        configMap.put("spark.shuffle.sasl.enableRetries", "true");

        performInteractions(interactions, listener);

        // timeout(): wait up to 5 s for the callback rather than requiring it immediately.
        Mockito.verify(listener, Mockito.timeout(5000L)).onBlockTransferSuccess("b0", block0);
        Mockito.verify(listener, Mockito.timeout(5000L)).onBlockTransferSuccess("b1", block1);
        Mockito.verify(listener, Mockito.atLeastOnce()).getTransferType();
        Mockito.verifyNoMoreInteractions(listener);

        // Checked unconditionally via JUnit, so the test does not silently skip this
        // verification when the JVM runs without -ea.
        int retryCount = _retryingBlockTransferor.getRetryCount();
        Assert.assertEquals(1, retryCount);
    }

    /**
     * With "spark.shuffle.sasl.enableRetries" set to true, alternates SASL timeouts and
     * {@link IOException}s for b0. The test expects the failure reported to the listener
     * to be the "final" SASL exception and the transferor's retry count to equal
     * {@code MAX_RETRIES}; the trailing scripted attempts are expected to go unused.
     */
    @Test
    public void testIOExceptionFailsConnectionEvenWithSaslException() throws IOException, InterruptedException {
        BlockFetchingListener listener = Mockito.mock(BlockFetchingListener.class);
        SaslTimeoutException saslInitial = new SaslTimeoutException("initial", new TimeoutException());
        SaslTimeoutException saslFinal = new SaslTimeoutException("final", new TimeoutException());
        IOException ioFailure = new IOException();

        // Scripted attempts for b0, in order.
        List<ImmutableMap<String, Object>> interactions = Arrays.asList(
            ImmutableMap.<String, Object>of("b0", saslInitial),
            ImmutableMap.<String, Object>of("b0", ioFailure),
            ImmutableMap.<String, Object>of("b0", saslInitial),
            ImmutableMap.<String, Object>of("b0", ioFailure),
            ImmutableMap.<String, Object>of("b0", saslFinal),
            ImmutableMap.<String, Object>of("b0", ioFailure),
            ImmutableMap.<String, Object>of("b0", block0));
        configMap.put("spark.shuffle.sasl.enableRetries", "true");

        performInteractions(interactions, listener);

        // timeout(): wait up to 5 s for the callback rather than requiring it immediately.
        Mockito.verify(listener, Mockito.timeout(5000L)).onBlockTransferFailure("b0", saslFinal);
        Mockito.verify(listener, Mockito.atLeastOnce()).getTransferType();
        Mockito.verifyNoMoreInteractions(listener);

        // Checked unconditionally via JUnit, so the test does not silently skip this
        // verification when the JVM runs without -ea.
        int retryCount = _retryingBlockTransferor.getRetryCount();
        Assert.assertEquals(MAX_RETRIES, retryCount);
    }

    /**
     * Scripts a mocked {@code BlockTransferStarter} with one answer per element of
     * {@code list}, then creates a {@code RetryingBlockTransferor} over it, stores it in
     * {@code _retryingBlockTransferor} and starts it.
     *
     * <p>Each element of {@code list} describes one expected {@code createAndStart} call:
     * its keys, in iteration order, are the block ids that call must request, and each value
     * is either a {@link ManagedBuffer} (reported as a fetch success) or an
     * {@link Exception} (reported as a fetch failure). The transferor is created for the
     * union of all keys, in first-seen order.
     *
     * @param list scripted attempts, consumed in order by successive {@code createAndStart} calls
     * @param blockFetchingListener listener handed to the transferor; tests verify against it
     */
    private static void performInteractions(List<? extends Map<String, Object>> list, BlockFetchingListener blockFetchingListener) throws IOException, InterruptedException {
        TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
        RetryingBlockTransferor.BlockTransferStarter starter =
            Mockito.mock(RetryingBlockTransferor.BlockTransferStarter.class);

        Stubber stub = null;
        // Insertion-ordered so the requested block ids keep their first-seen order.
        LinkedHashSet<String> blockIds = Sets.newLinkedHashSet();

        for (Map<String, Object> interaction : list) {
            blockIds.addAll(interaction.keySet());

            Answer<Void> answer = invocation -> {
                try {
                    // The attempt must ask for exactly the scripted block ids, in order.
                    String[] expectedIds = interaction.keySet().toArray(new String[0]);
                    String[] requestedIds = (String[]) invocation.getArguments()[0];
                    Assert.assertArrayEquals(expectedIds, requestedIds);

                    // Replay the scripted outcome of every block to the listener that the
                    // transferor passed in for this attempt.
                    BlockFetchingListener attemptListener =
                        (BlockFetchingListener) invocation.getArguments()[1];
                    for (Map.Entry<String, Object> outcome : interaction.entrySet()) {
                        String blockId = outcome.getKey();
                        Object value = outcome.getValue();
                        if (value instanceof ManagedBuffer) {
                            attemptListener.onBlockFetchSuccess(blockId, (ManagedBuffer) value);
                        } else if (value instanceof Exception) {
                            attemptListener.onBlockFetchFailure(blockId, (Exception) value);
                        } else {
                            Assert.fail("Can only handle ManagedBuffers and Exceptions, got " + value);
                        }
                    }
                    return null;
                } catch (Throwable th) {
                    // Print before rethrowing so the failure stays visible even if whoever
                    // invoked the mock swallows the exception.
                    th.printStackTrace();
                    throw th;
                }
            };

            // Chain the answers so consecutive createAndStart calls consume them in order.
            if (stub == null) {
                stub = Mockito.doAnswer(answer);
            } else {
                stub.doAnswer(answer);
            }
        }

        // An empty script would leave nothing to stub.
        Assert.assertNotNull(stub);
        stub.when(starter).createAndStart(Mockito.any(), Mockito.any());

        _retryingBlockTransferor = new RetryingBlockTransferor(
            conf, starter, blockIds.toArray(new String[0]), blockFetchingListener);
        _retryingBlockTransferor.start();
    }

    // Decompiler-emitted initializer for the assertion-status flag: records whether
    // assertions are disabled for this class at load time.
    static {
        $assertionsDisabled = !RetryingBlockTransferorSuite.class.desiredAssertionStatus();
    }
}
