package org.apache.flink.runtime.source.coordinator;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.class */
/**
 * Unit tests for {@link SourceCoordinatorContext}.
 *
 * <p>The tests use fixtures that are not declared in this class ({@code context},
 * {@code coordinatorExecutor}, {@code operatorCoordinatorContext},
 * {@code splitSplitAssignmentTracker} and {@code TEST_OPERATOR_ID}); they are expected to be
 * provided by {@link SourceCoordinatorTestBase}.
 */
public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {

    /** Registered readers must be retrievable from the context by their subtask id. */
    @Test
    public void testRegisterReader() {
        List<ReaderInfo> readerInfos = registerReaders();

        Assert.assertTrue(context.registeredReaders().containsKey(0));
        Assert.assertTrue(context.registeredReaders().containsKey(1));
        Assert.assertEquals(readerInfos.get(0), context.registeredReaders().get(0));
        Assert.assertEquals(readerInfos.get(1), context.registeredReaders().get(1));
    }

    /** Unregistering reader 0 must remove only that reader and keep readers 1 and 2. */
    @Test
    public void testUnregisterReader() {
        List<ReaderInfo> readerInfos = registerReaders();
        Assert.assertEquals(readerInfos.get(0), context.registeredReaders().get(0));

        context.unregisterSourceReader(0);

        Assert.assertEquals(
                "Only readers 1 and 2 should be registered.",
                2,
                context.registeredReaders().size());
        Assert.assertNull(context.registeredReaders().get(0));
        Assert.assertEquals(readerInfos.get(1), context.registeredReaders().get(1));
        Assert.assertEquals(readerInfos.get(2), context.registeredReaders().get(2));
    }

    /** Unregistering a reader that was never registered must not throw. */
    @Test
    public void testUnregisterUnregisteredReader() {
        context.unregisterSourceReader(0);
    }

    @Test
    public void testAssignSplitsFromCoordinatorExecutor() throws Exception {
        testAssignSplits(true);
    }

    @Test
    public void testAssignSplitsFromOtherThread() throws Exception {
        testAssignSplits(false);
    }

    /**
     * Assigns splits to registered readers and verifies both the tracked assignment and the
     * events sent to the subtasks.
     *
     * @param fromCoordinatorExecutor if {@code true} the assignment is submitted to the
     *     coordinator executor; otherwise it is invoked directly from the calling thread
     */
    private void testAssignSplits(boolean fromCoordinatorExecutor) throws Exception {
        registerReaders();

        SplitsAssignment<MockSourceSplit> splitsAssignment =
                CoordinatorTestUtils.getSplitsAssignment(2, 0);
        if (fromCoordinatorExecutor) {
            coordinatorExecutor.submit(() -> {
                context.assignSplits(splitsAssignment);
            }).get();
        } else {
            context.assignSplits(splitsAssignment);
        }

        // The assignment must be recorded as not yet checkpointed for each reader.
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("0"),
                splitSplitAssignmentTracker.uncheckpointedAssignments().get(0));
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("1", "2"),
                splitSplitAssignmentTracker.uncheckpointedAssignments().get(1));

        // One event per reader that received splits.
        Assert.assertEquals(
                "There should be two events sent to the subtasks.",
                2,
                operatorCoordinatorContext.getEventsToOperator().size());

        List<OperatorEvent> eventsToReader0 =
                operatorCoordinatorContext.getEventsToOperatorBySubtaskId(0);
        Assert.assertEquals(1, eventsToReader0.size());

        // Check the runtime type on the OperatorEvent BEFORE downcasting, so that a wrong event
        // type surfaces as an assertion failure rather than a ClassCastException.
        OperatorEvent event = eventsToReader0.get(0);
        Assert.assertTrue(event instanceof AddSplitEvent);
        @SuppressWarnings("unchecked") // guarded by the instanceof assertion above
        AddSplitEvent<MockSourceSplit> addSplitEvent = (AddSplitEvent<MockSourceSplit>) event;
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("0"),
                addSplitEvent.splits(new MockSourceSplitSerializer()));
    }

    @Test
    public void testAssignSplitToUnregisteredReaderFromCoordinatorExecutor() {
        testAssignSplitToUnregisterdReader(true);
    }

    @Test
    public void testAssignSplitToUnregisteredReaderFromOtherThread() {
        testAssignSplitToUnregisterdReader(false);
    }

    /**
     * Verifies that assigning splits without any registered reader fails with a message
     * containing "Cannot assign splits".
     *
     * @param fromCoordinatorExecutor if {@code true} the assignment is submitted to the
     *     coordinator executor; otherwise it is invoked directly from the calling thread
     */
    private void testAssignSplitToUnregisterdReader(boolean fromCoordinatorExecutor) {
        // Deliberately no registerReaders() call here.
        SplitsAssignment<MockSourceSplit> splitsAssignment =
                CoordinatorTestUtils.getSplitsAssignment(2, 0);
        CoordinatorTestUtils.verifyException(
                () -> {
                    if (fromCoordinatorExecutor) {
                        coordinatorExecutor.submit(() -> {
                            context.assignSplits(splitsAssignment);
                        }).get();
                    } else {
                        context.assignSplits(splitsAssignment);
                    }
                },
                "assignSplits() should fail to assign the splits to a reader that is not registered.",
                "Cannot assign splits");
    }

    /**
     * Snapshots the context after an assignment, restores the snapshot into a fresh context with
     * a fresh tracker, and verifies that the restored state equals the original state.
     */
    @Test
    public void testSnapshotAndRestore() throws Exception {
        registerReaders();

        // Assign splits to the readers, then take a snapshot.
        SplitsAssignment<MockSourceSplit> splitsAssignment =
                CoordinatorTestUtils.getSplitsAssignment(2, 0);
        coordinatorExecutor.submit(() -> {
            context.assignSplits(splitsAssignment);
        }).get();
        byte[] snapshotBytes = takeSnapshot(context, 100L);

        SplitAssignmentTracker<MockSourceSplit> restoredTracker = new SplitAssignmentTracker<>();
        SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory =
                new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
                        TEST_OPERATOR_ID.toHexString(), getClass().getClassLoader());

        SourceCoordinatorContext<MockSourceSplit> restoredContext;
        try (ByteArrayInputStream bais = new ByteArrayInputStream(snapshotBytes);
                DataInputStream in = new DataInputStream(bais)) {
            restoredContext =
                    new SourceCoordinatorContext<>(
                            coordinatorExecutor,
                            coordinatorThreadFactory,
                            1,
                            operatorCoordinatorContext,
                            new MockSourceSplitSerializer(),
                            restoredTracker);
            restoredContext.restoreState(new MockSourceSplitSerializer(), in);
        }

        Assert.assertEquals(context.registeredReaders(), restoredContext.registeredReaders());
        Assert.assertEquals(
                splitSplitAssignmentTracker.uncheckpointedAssignments(),
                restoredTracker.uncheckpointedAssignments());
        Assert.assertEquals(
                splitSplitAssignmentTracker.assignmentsByCheckpointId(),
                restoredTracker.assignmentsByCheckpointId());
    }

    // ------------------------------------------------------------------------
    //  Helpers
    // ------------------------------------------------------------------------

    /**
     * Registers three readers (subtask ids 0, 1 and 2) with the context under test.
     *
     * @return the registered reader infos, indexed by subtask id
     */
    private List<ReaderInfo> registerReaders() {
        ReaderInfo readerInfo0 = new ReaderInfo(0, "subtask_0_location");
        ReaderInfo readerInfo1 = new ReaderInfo(1, "subtask_1_location");
        ReaderInfo readerInfo2 = new ReaderInfo(2, "subtask_1_location");
        context.registerSourceReader(readerInfo0);
        context.registerSourceReader(readerInfo1);
        context.registerSourceReader(readerInfo2);
        return Arrays.asList(readerInfo0, readerInfo1, readerInfo2);
    }

    /**
     * Serializes the state of the given context into a byte array.
     *
     * @param sourceCoordinatorContext the context to snapshot
     * @param checkpointId the checkpoint id passed to {@code snapshotState}
     * @return the serialized snapshot
     */
    private byte[] takeSnapshot(
            SourceCoordinatorContext<MockSourceSplit> sourceCoordinatorContext,
            long checkpointId) throws Exception {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos)) {
            sourceCoordinatorContext.snapshotState(
                    checkpointId, new MockSourceSplitSerializer(), out);
            // Flush before reading the bytes so nothing is left in the wrapper.
            out.flush();
            return baos.toByteArray();
        }
    }
}
