package org.apache.flink.runtime.source.coordinator;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.class */
public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
    @Test
    public void testRegisterReader() throws Exception {
        sourceReady();
        List<ReaderInfo> registerReaders = registerReaders();
        Assert.assertTrue(this.context.registeredReaders().containsKey(0));
        Assert.assertTrue(this.context.registeredReaders().containsKey(1));
        Assert.assertEquals(registerReaders.get(0), this.context.registeredReaders().get(0));
        Assert.assertEquals(registerReaders.get(1), this.context.registeredReaders().get(1));
        Assert.assertThat(getEnumerator().getRegisteredReaders(), Matchers.containsInAnyOrder(new Integer[]{0, 1, 2}));
    }

    @Test
    public void testTaskFailureUnregistersReader() throws Exception {
        sourceReady();
        List<ReaderInfo> registerReaders = registerReaders();
        this.sourceCoordinator.subtaskFailed(0, (Throwable) null);
        waitForCoordinatorToProcessActions();
        Assert.assertEquals("Only reader 2 should be registered.", 2L, this.context.registeredReaders().size());
        Assert.assertNull(this.context.registeredReaders().get(0));
        Assert.assertEquals(registerReaders.get(1), this.context.registeredReaders().get(1));
        Assert.assertEquals(registerReaders.get(2), this.context.registeredReaders().get(2));
    }

    @Test
    public void testUnregisterUnregisteredReader() {
        this.context.unregisterSourceReader(0);
    }

    @Test
    public void testAssignSplitsFromCoordinatorExecutor() throws Exception {
        testAssignSplits(true);
    }

    @Test
    public void testAssignSplitsFromOtherThread() throws Exception {
        testAssignSplits(false);
    }

    private void testAssignSplits(boolean z) throws Exception {
        sourceReady();
        registerReaders();
        SplitsAssignment<MockSourceSplit> splitsAssignment = CoordinatorTestUtils.getSplitsAssignment(2, 0);
        if (z) {
            this.coordinatorExecutor.submit(() -> {
                this.context.assignSplits(splitsAssignment);
            }).get();
        } else {
            this.context.assignSplits(splitsAssignment);
        }
        CoordinatorTestUtils.verifyAssignment(Collections.singletonList("0"), (Collection) this.splitSplitAssignmentTracker.uncheckpointedAssignments().get(0));
        CoordinatorTestUtils.verifyAssignment(Arrays.asList("1", "2"), (Collection) this.splitSplitAssignmentTracker.uncheckpointedAssignments().get(1));
        Assert.assertEquals("There should be two events sent to the subtasks.", 2L, this.receivingTasks.getNumberOfSentEvents());
        List<OperatorEvent> sentEventsForSubtask = this.receivingTasks.getSentEventsForSubtask(0);
        Assert.assertEquals(1L, sentEventsForSubtask.size());
        AddSplitEvent addSplitEvent = (OperatorEvent) sentEventsForSubtask.get(0);
        Assert.assertTrue(addSplitEvent instanceof AddSplitEvent);
        CoordinatorTestUtils.verifyAssignment(Collections.singletonList("0"), addSplitEvent.splits(new MockSourceSplitSerializer()));
    }

    @Test
    public void testAssignSplitToUnregisteredReaderFromCoordinatorExecutor() throws Exception {
        testAssignSplitToUnregisterdReader(true);
    }

    @Test
    public void testAssignSplitToUnregisteredReaderFromOtherThread() throws Exception {
        testAssignSplitToUnregisterdReader(false);
    }

    private void testAssignSplitToUnregisterdReader(boolean z) throws Exception {
        sourceReady();
        SplitsAssignment<MockSourceSplit> splitsAssignment = CoordinatorTestUtils.getSplitsAssignment(2, 0);
        CoordinatorTestUtils.verifyException(() -> {
            if (z) {
                this.coordinatorExecutor.submit(() -> {
                    this.context.assignSplits(splitsAssignment);
                }).get();
            } else {
                this.context.assignSplits(splitsAssignment);
            }
        }, "assignSplits() should fail to assign the splits to a reader that is not registered.", "Cannot assign splits");
    }

    @Test
    public void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedException {
        AtomicReference atomicReference = new AtomicReference(null);
        ManuallyTriggeredScheduledExecutorService manuallyTriggeredScheduledExecutorService = new ManuallyTriggeredScheduledExecutorService();
        ManuallyTriggeredScheduledExecutorService manuallyTriggeredScheduledExecutorService2 = new ManuallyTriggeredScheduledExecutorService();
        SourceCoordinatorContext sourceCoordinatorContext = new SourceCoordinatorContext(manuallyTriggeredScheduledExecutorService2, manuallyTriggeredScheduledExecutorService, new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(TEST_OPERATOR_ID.toHexString(), getClass().getClassLoader()), this.operatorCoordinatorContext, new MockSourceSplitSerializer(), this.splitSplitAssignmentTracker);
        sourceCoordinatorContext.callAsync(() -> {
            throw new InterruptedException();
        }, (obj, th) -> {
            if (th != null) {
                atomicReference.set(th);
                throw new RuntimeException(th);
            }
        });
        manuallyTriggeredScheduledExecutorService.triggerAll();
        sourceCoordinatorContext.close();
        manuallyTriggeredScheduledExecutorService2.triggerAll();
        Assert.assertTrue(atomicReference.get() instanceof InterruptedException);
        Assert.assertFalse(this.operatorCoordinatorContext.isJobFailed());
    }

    private List<ReaderInfo> registerReaders() {
        List<ReaderInfo> asList = Arrays.asList(new ReaderInfo(0, "subtask_0_location"), new ReaderInfo(1, "subtask_1_location"), new ReaderInfo(2, "subtask_2_location"));
        for (ReaderInfo readerInfo : asList) {
            this.sourceCoordinator.handleEventFromOperator(readerInfo.getSubtaskId(), new ReaderRegistrationEvent(readerInfo.getSubtaskId(), readerInfo.getLocation()));
        }
        waitForCoordinatorToProcessActions();
        return asList;
    }
}
