package org.apache.flink.runtime.source.coordinator;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.class */
/**
 * Unit tests for the source coordinator.
 *
 * <p>The fixtures used below ({@code sourceCoordinator}, {@code enumerator}, {@code context},
 * {@code splitSplitAssignmentTracker}, {@code operatorCoordinatorContext} and
 * {@code coordinatorExecutor}) are inherited from {@link SourceCoordinatorTestBase}.
 * Assertions that inspect coordinator state are submitted to {@code coordinatorExecutor}
 * through {@link #check(Runnable)}.
 */
public class SourceCoordinatorTest extends SourceCoordinatorTestBase {

    /** Every guarded coordinator call must be rejected while the coordinator is not started. */
    @Test
    public void testThrowExceptionWhenNotStarted() {
        final String failureMessage = "Call should fail when source coordinator has not started yet.";
        final String expectedError = "The coordinator has not started yet.";

        CoordinatorTestUtils.verifyException(
                () -> sourceCoordinator.checkpointComplete(100L), failureMessage, expectedError);
        CoordinatorTestUtils.verifyException(
                () -> sourceCoordinator.handleEventFromOperator(0, (OperatorEvent) null),
                failureMessage,
                expectedError);
        CoordinatorTestUtils.verifyException(
                () -> sourceCoordinator.subtaskFailed(0, (Throwable) null),
                failureMessage,
                expectedError);
        CoordinatorTestUtils.verifyException(
                () -> sourceCoordinator.checkpointCoordinator(100L, new CompletableFuture<>()),
                failureMessage,
                expectedError);
    }

    /** Resetting to a checkpoint must be rejected once the coordinator has been started. */
    @Test
    public void testRestCheckpointAfterCoordinatorStarted() throws Exception {
        sourceCoordinator.start();

        final String expectedError = String.format(
                "The coordinator for source %s has started. The source coordinator state can only be reset to a checkpoint before it starts.",
                "TestOperator");
        CoordinatorTestUtils.verifyException(
                () -> sourceCoordinator.resetToCheckpoint((byte[]) null),
                "Reset to checkpoint should fail after the coordinator has started",
                expectedError);
    }

    /** Starting the coordinator must start the enumerator. */
    @Test
    public void testStart() throws Exception {
        Assert.assertFalse(enumerator.started());
        sourceCoordinator.start();
        Assert.assertTrue(enumerator.started());
    }

    /** Closing a started coordinator must close the enumerator. */
    @Test
    public void testClosed() throws Exception {
        Assert.assertFalse(enumerator.closed());
        sourceCoordinator.start();
        sourceCoordinator.close();
        Assert.assertTrue(enumerator.closed());
    }

    /** Registering reader 0 must record the reader and assign splits "0" and "3" to it. */
    @Test
    public void testReaderRegistration() throws Exception {
        sourceCoordinator.start();
        sourceCoordinator.handleEventFromOperator(0, new ReaderRegistrationEvent(0, "location_0"));

        check(() -> {
            Assert.assertEquals(
                    "2 splits should have been assigned to reader 0",
                    4L,
                    enumerator.getUnassignedSplits().size());
            Assert.assertTrue(context.registeredReaders().containsKey(0));
            Assert.assertTrue(enumerator.getHandledSourceEvent().isEmpty());
            CoordinatorTestUtils.verifyAssignment(
                    Arrays.asList("0", "3"),
                    (Collection) splitSplitAssignmentTracker.uncheckpointedAssignments().get(0));
        });
    }

    /** A wrapped source event sent by an operator must be handed to the enumerator. */
    @Test
    public void testHandleSourceEvent() throws Exception {
        sourceCoordinator.start();

        SourceEvent sourceEvent = new SourceEvent() {
        };
        sourceCoordinator.handleEventFromOperator(0, new SourceEventWrapper(sourceEvent));

        check(() -> {
            Assert.assertEquals(1L, enumerator.getHandledSourceEvent().size());
            Assert.assertEquals(sourceEvent, enumerator.getHandledSourceEvent().get(0));
        });
    }

    /** A fresh coordinator reset to a checkpoint must see the checkpointed enumerator and reader state. */
    @Test
    public void testCheckpointCoordinatorAndRestore() throws Exception {
        sourceCoordinator.start();
        sourceCoordinator.handleEventFromOperator(0, new ReaderRegistrationEvent(0, "location_0"));

        // Take checkpoint 100 and wait for the serialized coordinator state.
        CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        sourceCoordinator.checkpointCoordinator(100L, checkpointFuture);
        byte[] checkpointBytes = checkpointFuture.get();

        // Restore the state into a new coordinator and inspect what came back.
        SourceCoordinator restoredCoordinator = getNewSourceCoordinator();
        restoredCoordinator.resetToCheckpoint(checkpointBytes);
        MockSplitEnumerator restoredEnumerator = restoredCoordinator.getEnumerator();
        SourceCoordinatorContext restoredContext = restoredCoordinator.getContext();

        Assert.assertEquals(
                "2 splits should have been assigned to reader 0",
                4L,
                restoredEnumerator.getUnassignedSplits().size());
        Assert.assertTrue(restoredEnumerator.getHandledSourceEvent().isEmpty());
        Assert.assertEquals(1L, restoredContext.registeredReaders().size());
        Assert.assertTrue(restoredContext.registeredReaders().containsKey(0));
    }

    /** A subtask failure must revert the assignments recorded in checkpoints that have not completed. */
    @Test
    public void testSubtaskFailedAndRevertUncompletedAssignments() throws Exception {
        sourceCoordinator.start();

        // Register reader 0, then take checkpoint 100.
        sourceCoordinator.handleEventFromOperator(0, new ReaderRegistrationEvent(0, "location_0"));
        CompletableFuture<byte[]> firstCheckpoint = new CompletableFuture<>();
        sourceCoordinator.checkpointCoordinator(100L, firstCheckpoint);
        firstCheckpoint.get();

        // Add split 6, then take checkpoint 101.
        enumerator.addNewSplits(Collections.singletonList(new MockSourceSplit(6)));
        CompletableFuture<byte[]> secondCheckpoint = new CompletableFuture<>();
        sourceCoordinator.checkpointCoordinator(101L, secondCheckpoint);
        secondCheckpoint.get();

        // State before the failure: both checkpoints hold assignments for reader 0.
        check(() -> {
            Assert.assertEquals(4L, enumerator.getUnassignedSplits().size());
            CoordinatorTestUtils.verifyAssignment(
                    Arrays.asList("0", "3"),
                    (Collection) ((Map) splitSplitAssignmentTracker.assignmentsByCheckpointId().get(100L)).get(0));
            Assert.assertTrue(splitSplitAssignmentTracker.uncheckpointedAssignments().isEmpty());
            CoordinatorTestUtils.verifyAssignment(
                    Arrays.asList("0", "3"),
                    (Collection) splitSplitAssignmentTracker.assignmentsByCheckpointId(100L).get(0));
            CoordinatorTestUtils.verifyAssignment(
                    Arrays.asList("6"),
                    (Collection) splitSplitAssignmentTracker.assignmentsByCheckpointId(101L).get(0));

            List<OperatorEvent> eventsToReader = operatorCoordinatorContext.getEventsToOperator().get(0);
            Assert.assertEquals(2L, eventsToReader.size());
            try {
                // NOTE(review): splits(...) is invoked directly on OperatorEvent elements; a cast
                // to the concrete split-assignment event type may be missing here - confirm
                // against the event classes before relying on this compiling.
                CoordinatorTestUtils.verifyAssignment(
                        Arrays.asList("0", "3"),
                        eventsToReader.get(0).splits(new MockSourceSplitSerializer()));
                CoordinatorTestUtils.verifyAssignment(
                        Arrays.asList("6"),
                        eventsToReader.get(1).splits(new MockSourceSplitSerializer()));
            } catch (IOException e) {
                // Keep the cause so the deserialization problem shows up in the test report.
                throw new AssertionError("Failed to deserialize splits.", e);
            }
        });

        sourceCoordinator.subtaskFailed(0, (Throwable) null);

        // State after the failure: reader 0 is gone and all 7 splits are unassigned again.
        check(() -> {
            Assert.assertFalse(
                    "Reader 0 should have been unregistered.",
                    context.registeredReaders().containsKey(0));
            for (Object assignment : splitSplitAssignmentTracker.assignmentsByCheckpointId().values()) {
                Assert.assertFalse(
                        "Assignment in uncompleted checkpoint should have been reverted.",
                        ((Map<?, ?>) assignment).containsKey(0));
            }
            Assert.assertFalse(splitSplitAssignmentTracker.uncheckpointedAssignments().containsKey(0));
            Assert.assertEquals(7L, enumerator.getUnassignedSplits().size());
        });
    }

    /** A subtask failure must not give back splits whose checkpoint has already completed. */
    @Test
    public void testFailedSubtaskDoNotRevertCompletedCheckpoint() throws Exception {
        sourceCoordinator.start();

        // Register reader 0, take checkpoint 100 and mark it complete before failing the subtask.
        sourceCoordinator.handleEventFromOperator(0, new ReaderRegistrationEvent(0, "location_0"));
        CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        sourceCoordinator.checkpointCoordinator(100L, checkpointFuture);
        checkpointFuture.get();
        sourceCoordinator.checkpointComplete(100L);
        sourceCoordinator.subtaskFailed(0, (Throwable) null);

        check(() -> {
            Assert.assertFalse(context.registeredReaders().containsKey(0));
            Assert.assertEquals(4L, enumerator.getUnassignedSplits().size());
            Assert.assertFalse(splitSplitAssignmentTracker.uncheckpointedAssignments().containsKey(0));
            Assert.assertTrue(splitSplitAssignmentTracker.assignmentsByCheckpointId().isEmpty());
        });
    }

    /**
     * Submits {@code runnable} to {@code coordinatorExecutor} and blocks until it has finished.
     *
     * <p>If the failure wraps an {@link AssertionError}, that error is rethrown as is so the
     * test report points at the assertion that failed. Any other failure is reported as an
     * {@link AssertionError} that carries the original exception as its cause.
     *
     * @param runnable the assertions to execute
     */
    private void check(Runnable runnable) {
        try {
            coordinatorExecutor.submit(runnable).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag for whoever runs this test thread.
                Thread.currentThread().interrupt();
            }
            Throwable cause = e.getCause();
            if (cause instanceof AssertionError) {
                throw (AssertionError) cause;
            }
            throw new AssertionError("Test failed due to " + e, e);
        }
    }
}
