package org.apache.flink.runtime.executiongraph;

import akka.actor.Actor;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import org.apache.camel.EndpointConfiguration;
import org.apache.flink.api.common.io.StrictlyLocalAssignment;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.hadoop.hdfs.TestBlockReaderLocal;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.class */
public class LocalInputSplitsTest {
    private static final FiniteDuration TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
    private static ActorSystem system;
    private static TestActorRef<? extends Actor> taskManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/LocalInputSplitsTest$TestInputSplitSource.class */
    public static class TestInputSplitSource implements InputSplitSource<TestLocatableInputSplit>, StrictlyLocalAssignment {
        private static final long serialVersionUID = 1;
        private final TestLocatableInputSplit[] splits;

        public TestInputSplitSource(TestLocatableInputSplit[] testLocatableInputSplitArr) {
            this.splits = testLocatableInputSplitArr;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.flink.core.io.InputSplitSource
        public TestLocatableInputSplit[] createInputSplits(int i) {
            return this.splits;
        }

        @Override // org.apache.flink.core.io.InputSplitSource
        public InputSplitAssigner getInputSplitAssigner(TestLocatableInputSplit[] testLocatableInputSplitArr) {
            Assert.fail("This method should not be called on StrictlyLocalAssignment splits.");
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/LocalInputSplitsTest$TestLocatableInputSplit.class */
    public static class TestLocatableInputSplit extends LocatableInputSplit {
        private static final long serialVersionUID = 1;

        public TestLocatableInputSplit(int i, String str) {
            super(i, str);
        }

        public TestLocatableInputSplit(int i, String[] strArr) {
            super(i, strArr);
        }
    }

    @BeforeClass
    public static void setup() {
        system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
        taskManager = TestActorRef.create(system, Props.create((Class<?>) ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class, new Object[0]));
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem(system);
        system = null;
    }

    @Test
    public void testNotEnoughSubtasks() {
        try {
            runTests(3, 1, 2, new TestLocatableInputSplit[]{new TestLocatableInputSplit(1, "host1"), new TestLocatableInputSplit(2, "host2"), new TestLocatableInputSplit(3, "host3")});
            Assert.fail("should throw an exception");
        } catch (JobException e) {
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testDisallowMultipleLocations() {
        try {
            runTests(2, 1, 2, new TestLocatableInputSplit[]{new TestLocatableInputSplit(1, new String[]{"host1", "host2"}), new TestLocatableInputSplit(2, new String[]{"host1", "host2"})});
            Assert.fail("should throw an exception");
        } catch (JobException e) {
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testNonExistentHost() {
        try {
            runTests(2, 1, 2, new TestLocatableInputSplit[]{new TestLocatableInputSplit(1, "host1"), new TestLocatableInputSplit(2, "bogus_host")});
            Assert.fail("should throw an exception");
        } catch (JobException e) {
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testEqualSplitsPerHostAndSubtask() {
        try {
            String[] runTests = runTests(5, 2, 10, new TestLocatableInputSplit[]{new TestLocatableInputSplit(7, "host4"), new TestLocatableInputSplit(8, "host4"), new TestLocatableInputSplit(1, "host1"), new TestLocatableInputSplit(2, "host1"), new TestLocatableInputSplit(3, "host2"), new TestLocatableInputSplit(4, "host2"), new TestLocatableInputSplit(5, "host3"), new TestLocatableInputSplit(6, "host3"), new TestLocatableInputSplit(9, "host5"), new TestLocatableInputSplit(10, "host5")});
            Assert.assertEquals("host1", runTests[0]);
            Assert.assertEquals("host1", runTests[1]);
            Assert.assertEquals("host2", runTests[2]);
            Assert.assertEquals("host2", runTests[3]);
            Assert.assertEquals("host3", runTests[4]);
            Assert.assertEquals("host3", runTests[5]);
            Assert.assertEquals("host4", runTests[6]);
            Assert.assertEquals("host4", runTests[7]);
            Assert.assertEquals("host5", runTests[8]);
            Assert.assertEquals("host5", runTests[9]);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testNonEqualSplitsPerhost() {
        try {
            String[] runTests = runTests(3, 2, 5, new TestLocatableInputSplit[]{new TestLocatableInputSplit(1, "host3"), new TestLocatableInputSplit(2, "host1"), new TestLocatableInputSplit(3, "host1"), new TestLocatableInputSplit(4, "host1"), new TestLocatableInputSplit(5, "host1"), new TestLocatableInputSplit(6, "host1"), new TestLocatableInputSplit(7, "host2"), new TestLocatableInputSplit(8, "host2")});
            Assert.assertEquals("host1", runTests[0]);
            Assert.assertEquals("host1", runTests[1]);
            Assert.assertEquals("host2", runTests[2]);
            Assert.assertEquals("host2", runTests[3]);
            Assert.assertEquals("host3", runTests[4]);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testWithSubtasksEmpty() {
        try {
            String[] runTests = runTests(3, 5, 7, new TestLocatableInputSplit[]{new TestLocatableInputSplit(1, "host1"), new TestLocatableInputSplit(2, "host2"), new TestLocatableInputSplit(3, "host2"), new TestLocatableInputSplit(4, "host2"), new TestLocatableInputSplit(5, "host3"), new TestLocatableInputSplit(6, "host3"), new TestLocatableInputSplit(7, "host3"), new TestLocatableInputSplit(8, "host3"), new TestLocatableInputSplit(9, "host3"), new TestLocatableInputSplit(10, "host3"), new TestLocatableInputSplit(11, "host3"), new TestLocatableInputSplit(12, "host3"), new TestLocatableInputSplit(13, "host3")});
            Assert.assertEquals("host1", runTests[0]);
            Assert.assertEquals("host2", runTests[1]);
            Assert.assertEquals("host2", runTests[2]);
            Assert.assertEquals("host3", runTests[3]);
            Assert.assertEquals("host3", runTests[4]);
            Assert.assertTrue(runTests[5].equals("host1") || runTests[5].equals("host2") || runTests[5].equals("host3"));
            Assert.assertTrue(runTests[6].equals("host1") || runTests[6].equals("host2") || runTests[6].equals("host3"));
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMultipleInstancesPerHost() {
        TestLocatableInputSplit[] testLocatableInputSplitArr = {new TestLocatableInputSplit(1, "host1"), new TestLocatableInputSplit(2, "host1"), new TestLocatableInputSplit(3, "host2"), new TestLocatableInputSplit(4, "host2"), new TestLocatableInputSplit(5, "host3"), new TestLocatableInputSplit(6, "host3")};
        try {
            JobVertex jobVertex = new JobVertex("test vertex");
            jobVertex.setParallelism(6);
            jobVertex.setInvokableClass(DummyInvokable.class);
            jobVertex.setInputSplitSource(new TestInputSplitSource(testLocatableInputSplitArr));
            JobGraph jobGraph = new JobGraph("test job", jobVertex);
            ExecutionGraph executionGraph = new ExecutionGraph(jobGraph.getJobID(), jobGraph.getName(), jobGraph.getJobConfiguration(), TIMEOUT);
            executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
            executionGraph.setQueuedSchedulingAllowed(false);
            Scheduler scheduler = new Scheduler();
            Instance localInputSplitsTest = getInstance(new byte[]{10, 0, 1, 1}, TestBlockReaderLocal.BlockReaderLocalTest.TEST_LENGTH, "host1", 1);
            Instance localInputSplitsTest2 = getInstance(new byte[]{10, 0, 1, 1}, 12346, "host1", 1);
            Instance localInputSplitsTest3 = getInstance(new byte[]{10, 0, 1, 2}, TestBlockReaderLocal.BlockReaderLocalTest.TEST_LENGTH, "host2", 1);
            Instance localInputSplitsTest4 = getInstance(new byte[]{10, 0, 1, 2}, 12346, "host2", 1);
            Instance localInputSplitsTest5 = getInstance(new byte[]{10, 0, 1, 3}, TestBlockReaderLocal.BlockReaderLocalTest.TEST_LENGTH, "host3", 1);
            Instance localInputSplitsTest6 = getInstance(new byte[]{10, 0, 1, 3}, 12346, "host4", 1);
            scheduler.newInstanceAvailable(localInputSplitsTest);
            scheduler.newInstanceAvailable(localInputSplitsTest2);
            scheduler.newInstanceAvailable(localInputSplitsTest3);
            scheduler.newInstanceAvailable(localInputSplitsTest4);
            scheduler.newInstanceAvailable(localInputSplitsTest5);
            scheduler.newInstanceAvailable(localInputSplitsTest6);
            executionGraph.scheduleForExecution(scheduler);
            ExecutionVertex[] taskVertices = executionGraph.getVerticesTopologically().iterator().next().getTaskVertices();
            Assert.assertEquals(6L, taskVertices.length);
            Instance simpleSlot = taskVertices[0].getCurrentAssignedResource().getInstance();
            Instance simpleSlot2 = taskVertices[1].getCurrentAssignedResource().getInstance();
            Instance simpleSlot3 = taskVertices[2].getCurrentAssignedResource().getInstance();
            Instance simpleSlot4 = taskVertices[3].getCurrentAssignedResource().getInstance();
            Instance simpleSlot5 = taskVertices[4].getCurrentAssignedResource().getInstance();
            Instance simpleSlot6 = taskVertices[5].getCurrentAssignedResource().getInstance();
            Assert.assertTrue(simpleSlot == localInputSplitsTest || simpleSlot == localInputSplitsTest2);
            Assert.assertTrue(simpleSlot2 == localInputSplitsTest || simpleSlot2 == localInputSplitsTest2);
            Assert.assertTrue(simpleSlot3 == localInputSplitsTest3 || simpleSlot3 == localInputSplitsTest4);
            Assert.assertTrue(simpleSlot4 == localInputSplitsTest3 || simpleSlot4 == localInputSplitsTest4);
            Assert.assertTrue(simpleSlot5 == localInputSplitsTest5 || simpleSlot5 == localInputSplitsTest6);
            Assert.assertTrue(simpleSlot6 == localInputSplitsTest5 || simpleSlot6 == localInputSplitsTest6);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    private static String[] runTests(int i, int i2, int i3, TestLocatableInputSplit[] testLocatableInputSplitArr) throws Exception {
        JobVertex jobVertex = new JobVertex("test vertex");
        jobVertex.setParallelism(i3);
        jobVertex.setInvokableClass(DummyInvokable.class);
        jobVertex.setInputSplitSource(new TestInputSplitSource(testLocatableInputSplitArr));
        JobGraph jobGraph = new JobGraph("test job", jobVertex);
        ExecutionGraph executionGraph = new ExecutionGraph(jobGraph.getJobID(), jobGraph.getName(), jobGraph.getJobConfiguration(), TIMEOUT);
        executionGraph.setQueuedSchedulingAllowed(false);
        executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        executionGraph.scheduleForExecution(getScheduler(i, i2));
        ExecutionVertex[] taskVertices = executionGraph.getVerticesTopologically().iterator().next().getTaskVertices();
        Assert.assertEquals(i3, taskVertices.length);
        String[] strArr = new String[i3];
        for (int i4 = 0; i4 < i3; i4++) {
            strArr[i4] = taskVertices[i4].getCurrentAssignedResourceLocation().getHostname();
        }
        return strArr;
    }

    private static Scheduler getScheduler(int i, int i2) throws Exception {
        Scheduler scheduler = new Scheduler();
        for (int i3 = 0; i3 < i; i3++) {
            scheduler.newInstanceAvailable(getInstance(new byte[]{10, 0, 1, (byte) (1 + i3)}, 12001 + i3, EndpointConfiguration.URI_HOST + (i3 + 1), i2));
        }
        return scheduler;
    }

    private static Instance getInstance(byte[] bArr, int i, String str, int i2) throws Exception {
        HardwareDescription hardwareDescription = new HardwareDescription(4, NativeIO.Windows.GENERIC_READ, 1073741824L, 536870912L);
        InstanceConnectionInfo instanceConnectionInfo = (InstanceConnectionInfo) Mockito.mock(InstanceConnectionInfo.class);
        Mockito.when(instanceConnectionInfo.address()).thenReturn(InetAddress.getByAddress(bArr));
        Mockito.when(Integer.valueOf(instanceConnectionInfo.dataPort())).thenReturn(Integer.valueOf(i));
        Mockito.when(instanceConnectionInfo.getInetAdress()).thenReturn(InetAddress.getByAddress(bArr).toString());
        Mockito.when(instanceConnectionInfo.getHostname()).thenReturn(str);
        Mockito.when(instanceConnectionInfo.getFQDNHostname()).thenReturn(str);
        return new Instance(taskManager, instanceConnectionInfo, new InstanceID(), hardwareDescription, i2);
    }
}
