package org.apache.flink.runtime.source.coordinator;

import java.net.URL;
import java.net.URLClassLoader;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorCheckpointSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.class */
class SourceCoordinatorTest extends SourceCoordinatorTestBase {

    /* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest$ClassLoaderTestEnumerator.class */
    private static final class ClassLoaderTestEnumerator implements SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> {
        final CompletableFuture<ClassLoader> threadClassLoader = new CompletableFuture<>();
        final ClassLoader constructorClassLoader = Thread.currentThread().getContextClassLoader();

        public void start() {
            this.threadClassLoader.complete(Thread.currentThread().getContextClassLoader());
        }

        public void handleSplitRequest(int i, @Nullable String str) {
            throw new UnsupportedOperationException();
        }

        public void addSplitsBack(List<MockSourceSplit> list, int i) {
            throw new UnsupportedOperationException();
        }

        public void addReader(int i) {
            throw new UnsupportedOperationException();
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public Set<MockSourceSplit> m694snapshotState(long j) throws Exception {
            throw new UnsupportedOperationException();
        }

        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest$EnumeratorCreatingSource.class */
    /**
     * Source whose enumerators come from a supplied factory.
     *
     * <p>Each enumerator obtained from the factory is also published through
     * {@link #createEnumeratorFuture} or {@link #restoreEnumeratorFuture}, depending on which
     * method produced it. Creating a reader is not supported.
     *
     * @param <T> element type of the source
     * @param <EnumT> concrete enumerator type produced by the factory
     */
    private static final class EnumeratorCreatingSource<T, EnumT extends SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>>> implements Source<T, MockSourceSplit, Set<MockSourceSplit>> {
        /** Completed with the enumerator handed out by {@link #createEnumerator}. */
        final CompletableFuture<EnumT> createEnumeratorFuture = new CompletableFuture<>();
        /** Completed with the enumerator handed out by {@link #restoreEnumerator}. */
        final CompletableFuture<EnumT> restoreEnumeratorFuture = new CompletableFuture<>();
        private final Supplier<EnumT> enumeratorFactory;

        /**
         * @param supplier factory invoked once per create/restore call
         */
        public EnumeratorCreatingSource(Supplier<EnumT> supplier) {
            this.enumeratorFactory = supplier;
        }

        @Override
        public Boundedness getBoundedness() {
            return Boundedness.CONTINUOUS_UNBOUNDED;
        }

        @Override
        public SourceReader<T, MockSourceSplit> createReader(SourceReaderContext sourceReaderContext) {
            throw new UnsupportedOperationException();
        }

        @Override
        public SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> createEnumerator(SplitEnumeratorContext<MockSourceSplit> splitEnumeratorContext) {
            final EnumT enumerator = this.enumeratorFactory.get();
            this.createEnumeratorFuture.complete(enumerator);
            return enumerator;
        }

        // The decompiler-emitted synthetic bridge overload (SplitEnumeratorContext, Object) was
        // dropped: the compiler generates it, and declaring it by hand clashes by erasure.
        @Override
        public SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> restoreEnumerator(SplitEnumeratorContext<MockSourceSplit> splitEnumeratorContext, Set<MockSourceSplit> set) {
            final EnumT enumerator = this.enumeratorFactory.get();
            this.restoreEnumeratorFuture.complete(enumerator);
            return enumerator;
        }

        @Override
        public SimpleVersionedSerializer<MockSourceSplit> getSplitSerializer() {
            return new MockSourceSplitSerializer();
        }

        @Override
        public SimpleVersionedSerializer<Set<MockSourceSplit>> getEnumeratorCheckpointSerializer() {
            return new MockSplitEnumeratorCheckpointSerializer();
        }
    }

    /** Package-private no-argument constructor; performs no setup of its own. */
    SourceCoordinatorTest() {
        super();
    }

    /**
     * Invokes four coordinator methods without calling {@code start()} first and hands each call
     * to {@code CoordinatorTestUtils.verifyException} with the same failure message and expected
     * error text.
     */
    @Test
    void testThrowExceptionWhenNotStarted() {
        final String failureMessage = "Call should fail when source coordinator has not started yet.";
        final String expectedError = "The coordinator has not started yet.";

        CoordinatorTestUtils.verifyException(
                () -> this.sourceCoordinator.notifyCheckpointComplete(100L),
                failureMessage,
                expectedError);
        CoordinatorTestUtils.verifyException(
                () -> this.sourceCoordinator.handleEventFromOperator(0, 0, (OperatorEvent) null),
                failureMessage,
                expectedError);
        CoordinatorTestUtils.verifyException(
                () -> this.sourceCoordinator.executionAttemptFailed(0, 0, (Throwable) null),
                failureMessage,
                expectedError);
        CoordinatorTestUtils.verifyException(
                () -> this.sourceCoordinator.checkpointCoordinator(100L, new CompletableFuture<>()),
                failureMessage,
                expectedError);
    }

    /**
     * Starts the coordinator, then passes a {@code resetToCheckpoint} call to
     * {@code CoordinatorTestUtils.verifyException} together with the expected error text.
     */
    @Test
    void testRestCheckpointAfterCoordinatorStarted() throws Exception {
        final String failureMessage = "Reset to checkpoint should fail after the coordinator has started";
        final String expectedError = "The coordinator can only be reset if it was not yet started";

        this.sourceCoordinator.start();

        CoordinatorTestUtils.verifyException(
                () -> this.sourceCoordinator.resetToCheckpoint(0L, (byte[]) null),
                failureMessage,
                expectedError);
    }

    /** Starts the coordinator and asserts that the enumerator reports itself as started. */
    @Test
    void testStart() throws Exception {
        this.sourceCoordinator.start();
        waitForCoordinatorToProcessActions();

        final boolean enumeratorStarted = getEnumerator().isStarted();
        Assertions.assertThat(enumeratorStarted).isTrue();
    }

    /** Starts and closes the coordinator, then asserts that the enumerator reports closed. */
    @Test
    void testClosed() throws Exception {
        this.sourceCoordinator.start();
        this.sourceCoordinator.close();

        final boolean enumeratorClosed = getEnumerator().isClosed();
        Assertions.assertThat(enumeratorClosed).isTrue();
    }

    /**
     * Sends one wrapped {@link SourceEvent} to the coordinator and asserts that the enumerator
     * recorded exactly that event.
     */
    @Test
    void testHandleSourceEvent() throws Exception {
        sourceReady();

        // Anonymous marker event with no payload.
        final SourceEvent event = new SourceEvent() {};
        final SourceEventWrapper wrappedEvent = new SourceEventWrapper(event);
        this.sourceCoordinator.handleEventFromOperator(0, 0, wrappedEvent);
        waitForCoordinatorToProcessActions();

        Assertions.assertThat(getEnumerator().getHandledSourceEvent()).hasSize(1);
        Assertions.assertThat(getEnumerator().getHandledSourceEvent().get(0)).isEqualTo(event);
    }

    /**
     * Assigns two of six splits to reader 0, takes checkpoint 100, and resets a fresh coordinator
     * to that checkpoint. Asserts that the restored enumerator has four unassigned splits and
     * that neither its context nor the restored coordinator context has registered readers.
     */
    @Test
    void testCheckpointCoordinatorAndRestore() throws Exception {
        sourceReady();
        addTestingSplitSet(6);
        registerReader(0);
        for (int assigned = 0; assigned < 2; assigned++) {
            getEnumerator().executeAssignOneSplit(0);
        }

        final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        this.sourceCoordinator.checkpointCoordinator(100L, checkpointFuture);
        final byte[] checkpointBytes = checkpointFuture.get();

        final SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> restoredCoordinator = getNewSourceCoordinator();
        restoredCoordinator.resetToCheckpoint(100L, checkpointBytes);
        final TestingSplitEnumerator restoredEnumerator = (TestingSplitEnumerator) restoredCoordinator.getEnumerator();
        final SourceCoordinatorContext restoredContext = restoredCoordinator.getContext();

        Assertions.assertThat(restoredEnumerator.getUnassignedSplits())
                .as("2 splits should have been assigned to reader 0")
                .hasSize(4);
        Assertions.assertThat(restoredEnumerator.getContext().registeredReaders()).isEmpty();
        Assertions.assertThat(restoredContext.registeredReaders())
                .as("Registered readers should not be recovered by restoring")
                .isEmpty();
    }

    /**
     * Assigns splits to reader 0 across checkpoints 100 and 101 (neither is notified as
     * complete), verifies the per-checkpoint assignments, then fails and resets subtask 0.
     * Afterwards asserts that reader 0 is unregistered, that no tracked assignment still refers
     * to it, and that all seven splits are unassigned again.
     */
    @Test
    void testSubtaskFailedAndRevertUncompletedAssignments() throws Exception {
        sourceReady();
        addTestingSplitSet(6);
        registerReader(0);

        // Two assignments captured by checkpoint 100.
        for (int assigned = 0; assigned < 2; assigned++) {
            getEnumerator().executeAssignOneSplit(0);
        }
        this.sourceCoordinator.checkpointCoordinator(100L, new CompletableFuture<>());

        // One more split and one more assignment captured by checkpoint 101.
        getEnumerator().addNewSplits(new MockSourceSplit(6));
        getEnumerator().executeAssignOneSplit(0);
        this.sourceCoordinator.checkpointCoordinator(101L, new CompletableFuture<>());
        waitForCoordinatorToProcessActions();

        Assertions.assertThat(getEnumerator().getUnassignedSplits()).hasSize(4);
        Assertions.assertThat(this.splitSplitAssignmentTracker.uncheckpointedAssignments()).isEmpty();

        final Map assignmentsAt100 = (Map) this.splitSplitAssignmentTracker.assignmentsByCheckpointId().get(100L);
        CoordinatorTestUtils.verifyAssignment(Arrays.asList("0", "1"), (Collection) assignmentsAt100.get(0));
        final Collection readerZeroAt101 = (Collection) this.splitSplitAssignmentTracker.assignmentsByCheckpointId(101L).get(0);
        CoordinatorTestUtils.verifyAssignment(Collections.singletonList("2"), readerZeroAt101);

        this.sourceCoordinator.executionAttemptFailed(0, 0, (Throwable) null);
        this.sourceCoordinator.subtaskReset(0, 99L);
        waitForCoordinatorToProcessActions();

        Assertions.assertThat(this.context.registeredReaders())
                .as("Reader 0 should have been unregistered.")
                .doesNotContainKey(0);
        for (Object checkpointAssignments : this.splitSplitAssignmentTracker.assignmentsByCheckpointId().values()) {
            Assertions.assertThat((Map) checkpointAssignments)
                    .as("Assignment in uncompleted checkpoint should have been reverted.")
                    .doesNotContainKey(0);
        }
        Assertions.assertThat(this.splitSplitAssignmentTracker.uncheckpointedAssignments()).doesNotContainKey(0);
        Assertions.assertThat(getEnumerator().getUnassignedSplits()).hasSize(7);
    }

    /**
     * Assigns two of six splits to reader 0, takes checkpoint 100 and notifies its completion,
     * then fails the subtask. Asserts that checkpoint 100 was recorded as successful, reader 0 is
     * unregistered, four splits remain unassigned, and the tracker holds no assignments.
     */
    @Test
    void testFailedSubtaskDoNotRevertCompletedCheckpoint() throws Exception {
        sourceReady();
        addTestingSplitSet(6);
        registerReader(0);
        for (int assigned = 0; assigned < 2; assigned++) {
            getEnumerator().executeAssignOneSplit(0);
        }

        this.sourceCoordinator.checkpointCoordinator(100L, new CompletableFuture<>());
        this.sourceCoordinator.notifyCheckpointComplete(100L);
        this.sourceCoordinator.executionAttemptFailed(0, 0, (Throwable) null);
        waitForCoordinatorToProcessActions();

        Assertions.assertThat(getEnumerator().getSuccessfulCheckpoints().get(0)).isEqualTo(100L);
        Assertions.assertThat(this.context.registeredReaders()).doesNotContainKey(0);
        Assertions.assertThat(getEnumerator().getUnassignedSplits()).hasSize(4);
        Assertions.assertThat(this.splitSplitAssignmentTracker.uncheckpointedAssignments()).doesNotContainKey(0);
        Assertions.assertThat(this.splitSplitAssignmentTracker.assignmentsByCheckpointId()).isEmpty();
    }

    /* JADX WARN: Failed to calculate best type for var: r13v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r13v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r14v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r14v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 13, insn: 0x0101: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r13 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:68:0x0101 */
    /* JADX WARN: Not initialized variable reg: 14, insn: 0x0106: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r14 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:70:0x0106 */
    /* JADX WARN: Type inference failed for: r13v0, types: [org.apache.flink.api.connector.source.SplitEnumerator] */
    /* JADX WARN: Type inference failed for: r14v0, types: [java.lang.Throwable] */
    @Test
    void testFailJobWhenExceptionThrownFromStart() throws Exception {
        ?? r13;
        ?? r14;
        final RuntimeException runtimeException = new RuntimeException("Artificial Exception");
        MockSplitEnumeratorContext mockSplitEnumeratorContext = new MockSplitEnumeratorContext(1);
        Throwable th = null;
        try {
            try {
                MockSplitEnumerator mockSplitEnumerator = new MockSplitEnumerator(1, mockSplitEnumeratorContext) { // from class: org.apache.flink.runtime.source.coordinator.SourceCoordinatorTest.2
                    @Override // org.apache.flink.api.connector.source.mocks.MockSplitEnumerator
                    public void start() {
                        throw runtimeException;
                    }
                };
                Throwable th2 = null;
                SourceCoordinator sourceCoordinator = new SourceCoordinator("TestOperator", new EnumeratorCreatingSource(() -> {
                    return mockSplitEnumerator;
                }), this.context, new CoordinatorStoreImpl(), WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED, (String) null);
                Throwable th3 = null;
                try {
                    try {
                        sourceCoordinator.start();
                        CommonTestUtils.waitUtil(() -> {
                            return Boolean.valueOf(this.operatorCoordinatorContext.isJobFailed());
                        }, Duration.ofSeconds(10L), "The job should have failed due to the artificial exception.");
                        Assertions.assertThat(this.operatorCoordinatorContext.getJobFailureReason()).isEqualTo(runtimeException);
                        if (sourceCoordinator != null) {
                            if (0 != 0) {
                                try {
                                    sourceCoordinator.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                sourceCoordinator.close();
                            }
                        }
                        if (mockSplitEnumerator != null) {
                            if (0 != 0) {
                                try {
                                    mockSplitEnumerator.close();
                                } catch (Throwable th5) {
                                    th2.addSuppressed(th5);
                                }
                            } else {
                                mockSplitEnumerator.close();
                            }
                        }
                        if (mockSplitEnumeratorContext != null) {
                            if (0 == 0) {
                                mockSplitEnumeratorContext.close();
                                return;
                            }
                            try {
                                mockSplitEnumeratorContext.close();
                            } catch (Throwable th6) {
                                th.addSuppressed(th6);
                            }
                        }
                    } catch (Throwable th7) {
                        th3 = th7;
                        throw th7;
                    }
                } catch (Throwable th8) {
                    if (sourceCoordinator != null) {
                        if (th3 != null) {
                            try {
                                sourceCoordinator.close();
                            } catch (Throwable th9) {
                                th3.addSuppressed(th9);
                            }
                        } else {
                            sourceCoordinator.close();
                        }
                    }
                    throw th8;
                }
            } catch (Throwable th10) {
                if (r13 != 0) {
                    if (r14 != 0) {
                        try {
                            r13.close();
                        } catch (Throwable th11) {
                            r14.addSuppressed(th11);
                        }
                    } else {
                        r13.close();
                    }
                }
                throw th10;
            }
        } catch (Throwable th12) {
            if (mockSplitEnumeratorContext != null) {
                if (0 != 0) {
                    try {
                        mockSplitEnumeratorContext.close();
                    } catch (Throwable th13) {
                        th.addSuppressed(th13);
                    }
                } else {
                    mockSplitEnumeratorContext.close();
                }
            }
            throw th12;
        }
    }

    /**
     * Starts a coordinator whose enumerator factory throws, then asserts that the job is marked
     * failed with that same exception as the failure reason.
     */
    @Test
    void testFailJobWhenExceptionThrownFromEnumeratorCreation() throws Exception {
        final RuntimeException failureReason = new RuntimeException("Artificial Exception");

        final SourceCoordinator<?, ?> coordinator =
                new SourceCoordinator<>(
                        "TestOperator",
                        new EnumeratorCreatingSource<>(
                                () -> {
                                    throw failureReason;
                                }),
                        this.context,
                        new CoordinatorStoreImpl(),
                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
                        (String) null);
        coordinator.start();

        Assertions.assertThat(this.operatorCoordinatorContext.isJobFailed()).isTrue();
        Assertions.assertThat(this.operatorCoordinatorContext.getJobFailureReason()).isEqualTo(failureReason);
    }

    /* JADX WARN: Failed to calculate best type for var: r13v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r13v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r14v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r14v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 13, insn: 0x0117: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r13 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:65:0x0117 */
    /* JADX WARN: Not initialized variable reg: 14, insn: 0x011c: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r14 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:67:0x011c */
    /* JADX WARN: Type inference failed for: r13v1, types: [org.apache.flink.api.connector.source.SplitEnumerator] */
    /* JADX WARN: Type inference failed for: r14v0, types: [java.lang.Throwable] */
    @Test
    void testErrorThrownFromSplitEnumerator() throws Exception {
        ?? r13;
        ?? r14;
        final Error error = new Error("Test Error");
        MockSplitEnumeratorContext mockSplitEnumeratorContext = new MockSplitEnumeratorContext(1);
        Throwable th = null;
        try {
            try {
                MockSplitEnumerator mockSplitEnumerator = new MockSplitEnumerator(1, mockSplitEnumeratorContext) { // from class: org.apache.flink.runtime.source.coordinator.SourceCoordinatorTest.3
                    @Override // org.apache.flink.api.connector.source.mocks.MockSplitEnumerator
                    public void handleSourceEvent(int i, SourceEvent sourceEvent) {
                        throw error;
                    }
                };
                Throwable th2 = null;
                SourceCoordinator sourceCoordinator = new SourceCoordinator("TestOperator", new EnumeratorCreatingSource(() -> {
                    return mockSplitEnumerator;
                }), this.context, new CoordinatorStoreImpl(), WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED, (String) null);
                Throwable th3 = null;
                try {
                    try {
                        sourceCoordinator.start();
                        sourceCoordinator.handleEventFromOperator(1, 0, new SourceEventWrapper(new SourceEvent() { // from class: org.apache.flink.runtime.source.coordinator.SourceCoordinatorTest.4
                        }));
                        CommonTestUtils.waitUtil(() -> {
                            return Boolean.valueOf(this.operatorCoordinatorContext.isJobFailed());
                        }, Duration.ofSeconds(10L), "The job should have failed due to the artificial exception.");
                        Assertions.assertThat(this.operatorCoordinatorContext.getJobFailureReason()).isEqualTo(error);
                        if (sourceCoordinator != null) {
                            if (0 != 0) {
                                try {
                                    sourceCoordinator.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                sourceCoordinator.close();
                            }
                        }
                        if (mockSplitEnumerator != null) {
                            if (0 != 0) {
                                try {
                                    mockSplitEnumerator.close();
                                } catch (Throwable th5) {
                                    th2.addSuppressed(th5);
                                }
                            } else {
                                mockSplitEnumerator.close();
                            }
                        }
                        if (mockSplitEnumeratorContext != null) {
                            if (0 == 0) {
                                mockSplitEnumeratorContext.close();
                                return;
                            }
                            try {
                                mockSplitEnumeratorContext.close();
                            } catch (Throwable th6) {
                                th.addSuppressed(th6);
                            }
                        }
                    } catch (Throwable th7) {
                        th3 = th7;
                        throw th7;
                    }
                } catch (Throwable th8) {
                    if (sourceCoordinator != null) {
                        if (th3 != null) {
                            try {
                                sourceCoordinator.close();
                            } catch (Throwable th9) {
                                th3.addSuppressed(th9);
                            }
                        } else {
                            sourceCoordinator.close();
                        }
                    }
                    throw th8;
                }
            } catch (Throwable th10) {
                if (r13 != 0) {
                    if (r14 != 0) {
                        try {
                            r13.close();
                        } catch (Throwable th11) {
                            r14.addSuppressed(th11);
                        }
                    } else {
                        r13.close();
                    }
                }
                throw th10;
            }
        } catch (Throwable th12) {
            if (mockSplitEnumeratorContext != null) {
                if (0 != 0) {
                    try {
                        mockSplitEnumeratorContext.close();
                    } catch (Throwable th13) {
                        th.addSuppressed(th13);
                    }
                } else {
                    mockSplitEnumeratorContext.close();
                }
            }
            throw th12;
        }
    }

    /* JADX WARN: Failed to calculate best type for var: r13v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r13v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r14v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r14v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 13, insn: 0x012e: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r13 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:68:0x012e */
    /* JADX WARN: Not initialized variable reg: 14, insn: 0x0133: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r14 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:70:0x0133 */
    /* JADX WARN: Type inference failed for: r13v0, types: [org.apache.flink.api.connector.source.mocks.MockSplitEnumerator] */
    /* JADX WARN: Type inference failed for: r14v0, types: [java.lang.Throwable] */
    @Test
    void testBlockOnClose() throws Exception {
        ?? r13;
        ?? r14;
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        MockSplitEnumeratorContext mockSplitEnumeratorContext = new MockSplitEnumeratorContext(1);
        Throwable th = null;
        try {
            try {
                MockSplitEnumerator mockSplitEnumerator = new MockSplitEnumerator(1, mockSplitEnumeratorContext) { // from class: org.apache.flink.runtime.source.coordinator.SourceCoordinatorTest.5
                    @Override // org.apache.flink.api.connector.source.mocks.MockSplitEnumerator
                    public void handleSourceEvent(int i, SourceEvent sourceEvent) {
                        SourceCoordinatorContext<MockSourceSplit> sourceCoordinatorContext = SourceCoordinatorTest.this.context;
                        Callable callable = () -> {
                            return 1L;
                        };
                        CountDownLatch countDownLatch2 = countDownLatch;
                        sourceCoordinatorContext.callAsync(callable, (l, th2) -> {
                            countDownLatch2.countDown();
                            try {
                                Thread.sleep(Long.MAX_VALUE);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        });
                    }
                };
                Throwable th2 = null;
                SourceCoordinator sourceCoordinator = new SourceCoordinator("TestOperator", new EnumeratorCreatingSource(() -> {
                    return mockSplitEnumerator;
                }), this.context, new CoordinatorStoreImpl());
                Throwable th3 = null;
                try {
                    try {
                        sourceCoordinator.start();
                        sourceCoordinator.handleEventFromOperator(1, 0, new SourceEventWrapper(new SourceEvent() { // from class: org.apache.flink.runtime.source.coordinator.SourceCoordinatorTest.6
                        }));
                        countDownLatch.await();
                        sourceCoordinator.getClass();
                        ComponentClosingUtils.closeAsyncWithTimeout("testBlockOnClose", sourceCoordinator::close, Duration.ofMillis(1L)).exceptionally(th4 -> {
                            Assertions.assertThat(th4).isInstanceOf(TimeoutException.class);
                            return null;
                        }).get();
                        mockSplitEnumerator.getClass();
                        CommonTestUtils.waitUtil(mockSplitEnumerator::closed, Duration.ofSeconds(5L), "Split enumerator was not closed in 5 seconds.");
                        if (sourceCoordinator != null) {
                            if (0 != 0) {
                                try {
                                    sourceCoordinator.close();
                                } catch (Throwable th5) {
                                    th3.addSuppressed(th5);
                                }
                            } else {
                                sourceCoordinator.close();
                            }
                        }
                        if (mockSplitEnumerator != null) {
                            if (0 != 0) {
                                try {
                                    mockSplitEnumerator.close();
                                } catch (Throwable th6) {
                                    th2.addSuppressed(th6);
                                }
                            } else {
                                mockSplitEnumerator.close();
                            }
                        }
                        if (mockSplitEnumeratorContext != null) {
                            if (0 == 0) {
                                mockSplitEnumeratorContext.close();
                                return;
                            }
                            try {
                                mockSplitEnumeratorContext.close();
                            } catch (Throwable th7) {
                                th.addSuppressed(th7);
                            }
                        }
                    } catch (Throwable th8) {
                        th3 = th8;
                        throw th8;
                    }
                } catch (Throwable th9) {
                    if (sourceCoordinator != null) {
                        if (th3 != null) {
                            try {
                                sourceCoordinator.close();
                            } catch (Throwable th10) {
                                th3.addSuppressed(th10);
                            }
                        } else {
                            sourceCoordinator.close();
                        }
                    }
                    throw th9;
                }
            } catch (Throwable th11) {
                if (r13 != 0) {
                    if (r14 != 0) {
                        try {
                            r13.close();
                        } catch (Throwable th12) {
                            r14.addSuppressed(th12);
                        }
                    } else {
                        r13.close();
                    }
                }
                throw th11;
            }
        } catch (Throwable th13) {
            if (mockSplitEnumeratorContext != null) {
                if (0 != 0) {
                    try {
                        mockSplitEnumeratorContext.close();
                    } catch (Throwable th14) {
                        th.addSuppressed(th14);
                    }
                } else {
                    mockSplitEnumeratorContext.close();
                }
            }
            throw th13;
        }
    }

    /**
     * Verifies that a freshly created enumerator sees the user class loader supplied through the
     * coordinator context, both as recorded in {@code constructorClassLoader} and as recorded in
     * {@code threadClassLoader}.
     *
     * @throws Exception if starting or closing the coordinator, or waiting for the enumerator,
     *     fails
     */
    @Test
    void testUserClassLoaderWhenCreatingNewEnumerator() throws Exception {
        final URLClassLoader userClassLoader = new URLClassLoader(new URL[0]);
        final MockOperatorCoordinatorContext coordinatorContext =
                new MockOperatorCoordinatorContext(new OperatorID(), userClassLoader);
        final EnumeratorCreatingSource source =
                new EnumeratorCreatingSource(ClassLoaderTestEnumerator::new);
        final OperatorCoordinator coordinator =
                new SourceCoordinatorProvider(
                                "testOperator",
                                coordinatorContext.getOperatorId(),
                                source,
                                1,
                                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
                                (String) null)
                        .getCoordinator(coordinatorContext);

        try {
            coordinator.start();

            final ClassLoaderTestEnumerator enumerator =
                    (ClassLoaderTestEnumerator) source.createEnumeratorFuture.get();
            Assertions.assertThat(enumerator.constructorClassLoader).isSameAs(userClassLoader);
            Assertions.assertThat(enumerator.threadClassLoader.get()).isSameAs(userClassLoader);
        } finally {
            // Close even when an assertion fails so the coordinator is not left running.
            coordinator.close();
        }
    }

    /**
     * Verifies that an enumerator restored from a checkpoint sees the user class loader supplied
     * through the coordinator context, both as recorded in {@code constructorClassLoader} and as
     * recorded in {@code threadClassLoader}.
     *
     * @throws Exception if restoring, starting or closing the coordinator, or waiting for the
     *     enumerator, fails
     */
    @Test
    void testUserClassLoaderWhenRestoringEnumerator() throws Exception {
        final URLClassLoader userClassLoader = new URLClassLoader(new URL[0]);
        final MockOperatorCoordinatorContext coordinatorContext =
                new MockOperatorCoordinatorContext(new OperatorID(), userClassLoader);
        final EnumeratorCreatingSource source =
                new EnumeratorCreatingSource(ClassLoaderTestEnumerator::new);
        final OperatorCoordinator coordinator =
                new SourceCoordinatorProvider(
                                "testOperator",
                                coordinatorContext.getOperatorId(),
                                source,
                                1,
                                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
                                (String) null)
                        .getCoordinator(coordinatorContext);

        try {
            // Reset before start() so the enumerator is obtained through the restore path.
            coordinator.resetToCheckpoint(1L, createEmptyCheckpoint());
            coordinator.start();

            final ClassLoaderTestEnumerator enumerator =
                    (ClassLoaderTestEnumerator) source.restoreEnumeratorFuture.get();
            Assertions.assertThat(enumerator.constructorClassLoader).isSameAs(userClassLoader);
            Assertions.assertThat(enumerator.threadClassLoader.get()).isSameAs(userClassLoader);
        } finally {
            // Close even when an assertion fails so the coordinator is not left running.
            coordinator.close();
        }
    }

    /**
     * Verifies that checkpoint bytes produced by {@link #createCheckpointDataWithSerdeV0(Set)} can
     * be restored by a new coordinator: the restored enumerator holds the snapshotted splits as
     * unassigned, has handled no source event, and the restored context has no registered
     * readers.
     *
     * @throws Exception if snapshotting, serializing or restoring fails
     */
    @Test
    void testSerdeBackwardCompatibility() throws Exception {
        sourceReady();
        addTestingSplitSet(6);

        // Take the snapshot inside the enumerator thread, as the helper name requires.
        final TestingSplitEnumerator<MockSourceSplit> enumerator = getEnumerator();
        final Set<MockSourceSplit> splits = new HashSet<>();
        enumerator.runInEnumThreadAndSync(
                () -> {
                    splits.addAll(enumerator.snapshotState(1L));
                });

        final byte[] checkpointDataForV0Serde = createCheckpointDataWithSerdeV0(splits);

        // Restore the old-format bytes into a fresh coordinator.
        final SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> restoredCoordinator =
                getNewSourceCoordinator();
        restoredCoordinator.resetToCheckpoint(15213L, checkpointDataForV0Serde);

        final TestingSplitEnumerator<MockSourceSplit> restoredEnumerator =
                (TestingSplitEnumerator<MockSourceSplit>) restoredCoordinator.getEnumerator();
        final SourceCoordinatorContext<MockSourceSplit> restoredContext =
                restoredCoordinator.getContext();

        Assertions.assertThat(restoredEnumerator.getUnassignedSplits()).isEqualTo(splits);
        Assertions.assertThat(restoredEnumerator.getHandledSourceEvent()).isEmpty();
        Assertions.assertThat(restoredContext.registeredReaders()).isEmpty();
    }

    /**
     * Verifies that after an execution attempt of subtask 0 fails and the subtask is reset, the
     * two splits become unassigned again and are handed out to the next attempt in the same
     * order, each round being terminated by a {@code NoMoreSplitsEvent}.
     *
     * @throws Exception if driving the coordinator or waiting for the expected events fails
     */
    @Test
    public void testSubtaskRestartAndRequestSplitsAgain() throws Exception {
        final int subtask = 0;
        final int firstAttempt = 0;
        final int secondAttempt = firstAttempt + 1;

        this.sourceCoordinator.start();

        final List<MockSourceSplit> splits = new ArrayList<>();
        for (int splitId = 0; splitId < 2; splitId++) {
            splits.add(new MockSourceSplit(splitId));
        }
        getEnumerator().addNewSplits(splits);

        // First attempt: two requests are answered with a split each.
        setReaderTaskReady(this.sourceCoordinator, subtask, firstAttempt);
        registerReader(subtask, firstAttempt);
        this.sourceCoordinator.handleEventFromOperator(
                subtask, firstAttempt, new RequestSplitEvent());
        waitForSentEvents(1);
        this.sourceCoordinator.handleEventFromOperator(
                subtask, firstAttempt, new RequestSplitEvent());
        waitForSentEvents(2);

        // A checkpoint is triggered before the third request of the first attempt.
        this.sourceCoordinator.checkpointCoordinator(100L, new CompletableFuture<>());
        this.sourceCoordinator.handleEventFromOperator(
                subtask, firstAttempt, new RequestSplitEvent());
        waitForSentEvents(3);
        Assertions.assertThat(getEnumerator().getUnassignedSplits()).isEmpty();

        // Fail the attempt and reset the subtask; both splits must become unassigned again.
        this.sourceCoordinator.executionAttemptFailed(subtask, firstAttempt, (Throwable) null);
        this.sourceCoordinator.subtaskReset(subtask, 99L);
        waitUtilNumberReached(() -> getEnumerator().getUnassignedSplits().size(), 2);

        // Second attempt: the same three requests are answered again.
        setReaderTaskReady(this.sourceCoordinator, subtask, secondAttempt);
        registerReader(subtask, secondAttempt);
        this.sourceCoordinator.handleEventFromOperator(
                subtask, secondAttempt, new RequestSplitEvent());
        waitForSentEvents(4);
        this.sourceCoordinator.handleEventFromOperator(
                subtask, secondAttempt, new RequestSplitEvent());
        waitForSentEvents(5);
        this.sourceCoordinator.handleEventFromOperator(
                subtask, secondAttempt, new RequestSplitEvent());
        waitForSentEvents(6);
        Assertions.assertThat(getEnumerator().getUnassignedSplits()).isEmpty();

        // Events 0-2 belong to the first attempt, events 3-5 to the second.
        final List<OperatorEvent> sentEvents = this.receivingTasks.getSentEventsForSubtask(subtask);
        assertAddSplitEvent(sentEvents.get(0), Collections.singletonList(splits.get(0)));
        assertAddSplitEvent(sentEvents.get(1), Collections.singletonList(splits.get(1)));
        assertAddSplitEvent(sentEvents.get(3), Collections.singletonList(splits.get(0)));
        assertAddSplitEvent(sentEvents.get(4), Collections.singletonList(splits.get(1)));
        Assertions.assertThat(sentEvents.get(2)).isInstanceOf(NoMoreSplitsEvent.class);
        Assertions.assertThat(sentEvents.get(5)).isInstanceOf(NoMoreSplitsEvent.class);
    }

    /**
     * Checks that, once a coordinator constructed with the ID {@code "testListeningID"} has been
     * started, the coordinator store returns that very coordinator instance for the ID.
     *
     * @throws Exception if starting the coordinator fails
     */
    @Test
    public void testListeningEventsFromOtherCoordinators() throws Exception {
        final String listeningId = "testListeningID";
        final CoordinatorStoreImpl store = new CoordinatorStoreImpl();

        final SourceCoordinator<?, ?> coordinator =
                new SourceCoordinator<>(
                        "TestOperator",
                        createMockSource(),
                        this.context,
                        store,
                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
                        listeningId);
        coordinator.start();

        Assertions.assertThat(store.get(listeningId)).isNotNull().isSameAs(coordinator);
    }

    /**
     * Hand-writes checkpoint bytes for the given splits: an int {@code 0}, the enumerator
     * checkpoint serializer version, the length-prefixed serialized splits, and three trailing
     * int {@code 0} values.
     *
     * @param set the splits to embed as the enumerator checkpoint
     * @return a copy of the written bytes
     * @throws Exception if serializing the splits or writing to the buffer fails
     */
    private byte[] createCheckpointDataWithSerdeV0(Set<MockSourceSplit> set) throws Exception {
        final MockSplitEnumeratorCheckpointSerializer checkpointSerializer =
                new MockSplitEnumeratorCheckpointSerializer();
        final byte[] serializedCheckpoint = checkpointSerializer.serialize(set);

        final DataOutputSerializer out = new DataOutputSerializer(32);
        // NOTE(review): presumably the leading 0 is the "V0" serde version marker - confirm
        // against the coordinator's serde utilities.
        out.writeInt(0);
        out.writeInt(checkpointSerializer.getVersion());
        out.writeInt(serializedCheckpoint.length);
        out.write(serializedCheckpoint);

        // NOTE(review): three trailing zero ints close the payload; presumably legacy fields
        // of the old format - confirm their meaning before changing the count.
        for (int trailing = 0; trailing < 3; trailing++) {
            out.writeInt(0);
        }
        return out.getCopyOfBuffer();
    }

    /**
     * Builds checkpoint bytes for an empty set of splits via {@code
     * SourceCoordinator.writeCheckpointBytes}.
     *
     * @return the bytes returned by {@code writeCheckpointBytes} for an empty set
     * @throws Exception if writing the checkpoint bytes fails
     */
    private static byte[] createEmptyCheckpoint() throws Exception {
        final MockSplitEnumeratorCheckpointSerializer checkpointSerializer =
                new MockSplitEnumeratorCheckpointSerializer();
        final Set<MockSourceSplit> noSplits = Collections.emptySet();
        return SourceCoordinator.writeCheckpointBytes(noSplits, checkpointSerializer);
    }
}
