package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.CheckedSupplier;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.class */
public class SourceStreamTaskTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest$CancelLockingSource.class */
    /**
     * Source that takes the checkpoint lock and then sleeps for a very long time while holding
     * it, so the run loop can only be left through cancellation or interruption.
     *
     * <p>With {@code throwOnCancel} set, {@link #cancel()} throws an
     * {@link ExpectedTestException} instead of flagging the source as cancelled, and an
     * interruption of the sleep propagates out of {@code run}. Without it, interruptions are
     * swallowed and the loop ends once {@link #cancel()} has set the flag.
     */
    public static class CancelLockingSource implements SourceFunction<String> {
        private static final long serialVersionUID = 8713065281092996042L;

        /** Completed from inside the run loop; replaced with a fresh future by {@link #reset()}. */
        private static CompletableFuture<Void> isRunning = new CompletableFuture<>();

        private final boolean throwOnCancel;
        private volatile boolean cancelled = false;

        public CancelLockingSource(boolean z) {
            this.throwOnCancel = z;
        }

        /** Installs a new, uncompleted "is running" future. */
        public static void reset() {
            isRunning = new CompletableFuture<>();
        }

        /** Blocks until a source instance has entered its run loop. */
        public static void awaitRunning() throws ExecutionException, InterruptedException {
            isRunning.get();
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
            synchronized (sourceContext.getCheckpointLock()) {
                while (!cancelled) {
                    isRunning.complete(null);
                    try {
                        Thread.sleep(1000000000L);
                    } catch (InterruptedException interruption) {
                        // Only the throwing variant lets the interruption escape; the other
                        // variant goes back to the loop condition and keeps the lock.
                        if (throwOnCancel) {
                            throw interruption;
                        }
                    }
                }
            }
        }

        public void cancel() {
            if (!throwOnCancel) {
                cancelled = true;
                return;
            }
            throw new ExpectedTestException();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest$CancelTestSource.class */
    /**
     * {@link FromElementsFunction} variant whose {@code run} triggers the {@code dataProcessing}
     * latch after {@code super.run} has returned and then waits until {@link #cancel()} triggers
     * the {@code cancellationWaiting} latch.
     */
    private static class CancelTestSource extends FromElementsFunction<String> {
        private static final long serialVersionUID = 8713065281092996067L;

        /** Triggered by {@code run} once {@code super.run} has returned. */
        private static MultiShotLatch dataProcessing = new MultiShotLatch();

        /** Triggered by {@link #cancel()}; {@code run} awaits it before returning. */
        private static MultiShotLatch cancellationWaiting = new MultiShotLatch();

        public CancelTestSource(TypeSerializer<String> typeSerializer, String... strArr) throws IOException {
            super(typeSerializer, strArr);
        }

        /** Exposes the latch that tests await before cancelling the task. */
        public static MultiShotLatch getDataProcessing() {
            return CancelTestSource.dataProcessing;
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
            super.run(sourceContext);
            CancelTestSource.dataProcessing.trigger();
            CancelTestSource.cancellationWaiting.await();
        }

        public void cancel() {
            super.cancel();
            CancelTestSource.cancellationWaiting.trigger();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest$Checkpointer.class */
    /**
     * Callable that triggers {@code numCheckpoints} checkpoints on a source task, sleeping
     * {@code checkpointInterval} milliseconds after each trigger. Checkpoint ids count up from 0.
     */
    private static class Checkpointer implements Callable<Boolean> {
        private final int numCheckpoints;
        private final int checkpointInterval;
        private final AtomicLong checkpointId = new AtomicLong(0);
        private final StreamTask<Tuple2<Long, Integer>, ?> sourceTask;

        Checkpointer(int i, int i2, StreamTask<Tuple2<Long, Integer>, ?> streamTask) {
            this.numCheckpoints = i;
            this.checkpointInterval = i2;
            this.sourceTask = streamTask;
        }

        /**
         * @return {@code true} when every trigger was issued, {@code false} as soon as one of
         *     them is rejected with a {@link RejectedExecutionException}
         */
        @Override // java.util.concurrent.Callable
        public Boolean call() throws Exception {
            int issued = 0;
            try {
                while (issued < numCheckpoints) {
                    final CheckpointMetaData metaData = new CheckpointMetaData(checkpointId.getAndIncrement(), 0L);
                    sourceTask.triggerCheckpointAsync(metaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false);
                    Thread.sleep(checkpointInterval);
                    issued++;
                }
            } catch (RejectedExecutionException rejected) {
                return false;
            }
            return true;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest$ExceptionThrowingSource.class */
    /**
     * Source that emits {@code "hello"} until it is cancelled and then always fails with a
     * {@link TestException}. The first pass through the loop completes the future installed via
     * {@link #setIsInRunLoopFuture(CompletableFuture)}.
     */
    private static class ExceptionThrowingSource implements SourceFunction<String> {
        private static volatile CompletableFuture<Void> isInRunLoop;
        private volatile boolean running = true;

        /** Failure raised by {@code run} once its loop has been left. */
        public static class TestException extends RuntimeException {
            public TestException(String str) {
                super(str);
            }
        }

        private ExceptionThrowingSource() {
        }

        /** Installs the future that {@code run} completes when it enters its loop. */
        public static void setIsInRunLoopFuture(@Nonnull CompletableFuture<Void> completableFuture) {
            isInRunLoop = completableFuture;
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws TestException {
            // A fresh, not yet completed future must have been installed before the source runs.
            Preconditions.checkState(isInRunLoop != null && !isInRunLoop.isDone());

            while (running) {
                if (!isInRunLoop.isDone()) {
                    isInRunLoop.complete(null);
                }
                sourceContext.collect("hello");
            }

            throw new TestException("Oh no, we're failing.");
        }

        public void cancel() {
            running = false;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest$InterruptedSource.class */
    /**
     * Source that, while holding the checkpoint lock, sets the interrupt flag of its own thread
     * and then throws the exception produced by the configured {@link ExceptionGenerator}.
     */
    public static class InterruptedSource implements SourceFunction<String> {
        private static final long serialVersionUID = 8713065281092996042L;

        /** Serializable supplier of the exception that {@code run} throws. */
        public interface ExceptionGenerator extends CheckedSupplier<Exception>, Serializable {
        }

        private ExceptionGenerator exceptionGenerator;

        public InterruptedSource(ExceptionGenerator exceptionGenerator) {
            this.exceptionGenerator = exceptionGenerator;
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
            synchronized (sourceContext.getCheckpointLock()) {
                // The flag is set before the exception is even created.
                Thread.currentThread().interrupt();
                final Exception failure = (Exception) exceptionGenerator.get();
                throw failure;
            }
        }

        /** Nothing to stop: {@code run} terminates on its own by throwing. */
        public void cancel() {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest$MockSource.class */
    /**
     * Source that emits {@code (lastCheckpointId, count)} pairs under the checkpoint lock and
     * verifies in {@link #snapshotState(long, long)} that snapshots neither overlap each other
     * nor run while the element count changes.
     */
    private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, ListCheckpointed<Serializable> {
        private static final long serialVersionUID = 1;

        /** Number of elements to emit before {@code run} returns. */
        private final int maxElements;

        /** Number of random values summed up inside a snapshot to make it take a while. */
        private final int checkpointDelay;

        /** Sleep in milliseconds before each emitted element. */
        private final int readDelay;

        private volatile long lastCheckpointId = -1;
        private volatile boolean isRunning = true;
        private volatile int count = 0;

        /** Single permit, used to detect overlapping {@code snapshotState} calls. */
        private final Semaphore semaphore = new Semaphore(1);

        public MockSource(int i, int i2, int i3) {
            this.maxElements = i;
            this.checkpointDelay = i2;
            this.readDelay = i3;
        }

        public void run(SourceFunction.SourceContext<Tuple2<Long, Integer>> sourceContext) {
            while (this.isRunning && this.count < this.maxElements) {
                try {
                    Thread.sleep(this.readDelay);
                } catch (InterruptedException e) {
                    // Keep emitting, but leave the interrupt flag set for the caller.
                    Thread.currentThread().interrupt();
                }
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(new Tuple2<>(Long.valueOf(this.lastCheckpointId), Integer.valueOf(this.count)));
                    this.count++;
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        /**
         * Records the checkpoint id and burns some time summing random numbers.
         *
         * @param j the checkpoint id stored in {@code lastCheckpointId}
         * @param j2 unused
         * @return a single-element list holding the random sum
         * @throws AssertionError if another snapshot is in progress or {@code count} changed
         *     while this snapshot was running
         */
        public List<Serializable> snapshotState(long j, long j2) throws Exception {
            if (!this.semaphore.tryAcquire()) {
                Assert.fail("Concurrent invocation of snapshotState.");
            }
            // The permit is held from here on; hand it back on every exit path.
            try {
                final int countAtStart = this.count;
                this.lastCheckpointId = j;

                final Random random = new Random();
                long sum = 0;
                for (int round = 0; round < this.checkpointDelay; round++) {
                    sum += random.nextLong();
                }

                if (countAtStart != this.count) {
                    Assert.fail("Count is different at start end end of snapshot.");
                }
                return Collections.singletonList(Long.valueOf(sum));
            } finally {
                this.semaphore.release();
            }
        }

        public void restoreState(List<Serializable> list) throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest$NonStoppingSource.class */
    /**
     * Source that ignores both {@link #cancel()} and thread interruption. Its run loop only
     * ends after {@link #forceCancel()} has been called.
     */
    private static class NonStoppingSource implements SourceFunction<String> {
        private static final long serialVersionUID = 1;

        /**
         * Loop condition of {@code run}. Written by {@link #forceCancel()} from a different
         * thread than the one polling it, hence volatile so the write is guaranteed to be seen.
         */
        private static volatile boolean running = true;

        /** Completed as soon as {@code run} has been entered. */
        private static CompletableFuture<Void> startFuture = new CompletableFuture<>();

        private NonStoppingSource() {
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
            startFuture.complete(null);
            while (running) {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: this source must not react to interruption.
                }
            }
        }

        /** Deliberately a no-op: regular cancellation does not stop this source. */
        public void cancel() {
        }

        /** Lets the run loop terminate. */
        static void forceCancel() {
            running = false;
        }

        /** Blocks until {@code run} has been entered. */
        static void waitForStart() {
            startFuture.join();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest$OpenCloseTestSource.class */
    /**
     * Rich source that records in static flags whether {@code open} and {@code close} were
     * called, fails on an unexpected call order, and emits {@code "Hello0"} to {@code "Hello9"}.
     */
    private static class OpenCloseTestSource extends RichSourceFunction<String> {
        private static final long serialVersionUID = 1;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        /** Creating an instance clears both flags. */
        OpenCloseTestSource() {
            openCalled = false;
            closeCalled = false;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            Assert.assertFalse("Close called before open.", closeCalled);
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            Assert.assertTrue("Open was not called before close.", openCalled);
            closeCalled = true;
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
            Assert.assertTrue("Open was not called before run.", openCalled);
            int index = 0;
            while (index < 10) {
                sourceContext.collect("Hello" + index);
                index++;
            }
        }

        public void cancel() {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest$OutputRecordInCloseTestSource.class */
    /**
     * {@link StreamSource} that emits marker records prefixed with its name: one from
     * {@link #endInput()}, one from {@link #close()}, and one from a processing-time timer that
     * {@link #close()} registers for the current processing time.
     */
    private static final class OutputRecordInCloseTestSource<SRC extends SourceFunction<String>> extends StreamSource<String, SRC> implements BoundedOneInput {
        private final String name;

        public OutputRecordInCloseTestSource(String str, SRC src) {
            super(src);
            this.name = str;
        }

        public void endInput() {
            output("[" + this.name + "]: End of input");
        }

        public void close() throws Exception {
            final ProcessingTimeService timerService = getProcessingTimeService();
            final long now = timerService.getCurrentProcessingTime();
            // The timer is registered before the "Bye" record is emitted and the operator closed.
            timerService.registerTimer(now, timestamp -> output("[" + this.name + "]: Timer registered in close"));
            output("[" + this.name + "]: Bye");
            super.close();
        }

        /** Wraps the text in a {@link StreamRecord} and forwards it to the operator output. */
        private void output(String str) {
            this.output.collect(new StreamRecord<>(str));
        }
    }

    /**
     * Runs a {@link SourceStreamTask} around an {@link OpenCloseTestSource} to completion and
     * checks that the source's {@code close()} ran and that ten elements reached the output.
     */
    @Test
    public void testOpenClose() throws Exception {
        // The harness must be parameterised: against a raw harness the constructor reference
        // SourceStreamTask::new has an erased target type that no constructor matches.
        final StreamTaskTestHarness<String> harness =
                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();

        final StreamConfig config = harness.getStreamConfig();
        final StreamSource<String, ?> sourceOperator = new StreamSource<>(new OpenCloseTestSource());
        config.setStreamOperator(sourceOperator);
        config.setOperatorID(new OperatorID());

        harness.invoke();
        harness.waitForTaskCompletion();

        Assert.assertTrue("RichFunction methods where not called.", OpenCloseTestSource.closeCalled);
        final int emitted = TestHarnessUtil.getRawElementsFromOutput(harness.getOutput()).size();
        Assert.assertEquals(10L, emitted);
    }

    /**
     * Triggers a checkpoint, waits 42 ms before letting the task's mailbox process it, and
     * checks that the {@code checkpointStartDelayNanos} gauge reports at least that delay.
     */
    @Test(timeout = 60000)
    public void testStartDelayMetric() throws Exception {
        final long sleepMillis = 42L;

        // Typed builder: a raw builder leaves SourceStreamTask::new without a usable target type.
        final StreamTaskMailboxTestHarnessBuilder<String> builder =
                new StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        final ConcurrentHashMap<String, Metric> metrics = new ConcurrentHashMap<>();
        final StreamOperator<?> sourceOperator = new StreamSource<>(
                new CancelTestSource(BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), "Hello"));

        final StreamTaskMailboxTestHarness<String> harness = builder
                .setupOutputForSingletonOperatorChain(sourceOperator)
                .setTaskMetricGroup(new StreamTaskTestHarness.TestTaskMetricGroup(metrics))
                .build();

        final Future<?> trigger = harness.streamTask.triggerCheckpointAsync(
                new CheckpointMetaData(1L, System.currentTimeMillis()),
                CheckpointOptions.forCheckpointWithDefaultLocation(),
                false);
        Assert.assertFalse(trigger.isDone());

        // Let time pass before the mailbox gets a chance to run the trigger.
        Thread.sleep(sleepMillis);
        while (!trigger.isDone()) {
            harness.streamTask.runMailboxStep();
        }

        // The bound is a Long, so the gauge is read as Gauge<Long>; with a raw Gauge the value
        // is an Object and assertThat cannot be typed against a Matcher<Long>.
        @SuppressWarnings("unchecked")
        final Gauge<Long> startDelay = (Gauge<Long>) metrics.get("checkpointStartDelayNanos");
        Assert.assertThat(startDelay.getValue(), Matchers.greaterThanOrEqualTo(sleepMillis * 1000000L));
    }

    /**
     * Runs a {@link MockSource} while a {@link Checkpointer} keeps triggering checkpoints on the
     * task, then checks that every element was emitted. Assertion failures raised inside the
     * checkpointer surface through {@code Future.get()}.
     */
    @Test
    public void testCheckpointing() throws Exception {
        final int numElements = 100;
        final int numCheckpoints = 100;
        final int numCheckpointers = 1;
        final int checkpointIntervalMillis = 5;
        final int sourceCheckpointDelay = 1000; // random values summed per snapshot
        final int sourceReadDelayMillis = 1;

        final ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            final TupleTypeInfo<Tuple2<Long, Integer>> typeInfo =
                    new TupleTypeInfo<>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

            // Typed harness: a raw one leaves SourceStreamTask::new without a usable target type.
            final StreamTaskTestHarness<Tuple2<Long, Integer>> harness =
                    new StreamTaskTestHarness<>(SourceStreamTask::new, typeInfo);
            harness.setupOutputForSingletonOperatorChain();

            final StreamConfig config = harness.getStreamConfig();
            final StreamSource<Tuple2<Long, Integer>, ?> sourceOperator =
                    new StreamSource<>(new MockSource(numElements, sourceCheckpointDelay, sourceReadDelayMillis));
            config.setStreamOperator(sourceOperator);
            config.setOperatorID(new OperatorID());

            // Start the task first so it is running by the time checkpoints get triggered.
            harness.invoke();
            harness.waitForTaskRunning();

            final StreamTask<Tuple2<Long, Integer>, ?> sourceTask = harness.mo134getTask();
            final List<Future<Boolean>> checkpointerResults = new ArrayList<>(numCheckpointers);
            for (int i = 0; i < numCheckpointers; i++) {
                checkpointerResults.add(
                        executor.submit(new Checkpointer(numCheckpoints, checkpointIntervalMillis, sourceTask)));
            }

            harness.waitForTaskCompletion();

            for (Future<Boolean> result : checkpointerResults) {
                if (!result.isDone()) {
                    result.cancel(true);
                }
                if (!result.isCancelled()) {
                    // Rethrows whatever the checkpointer failed with.
                    result.get();
                }
            }

            final int emitted = TestHarnessUtil.getRawElementsFromOutput(harness.getOutput()).size();
            Assert.assertEquals(numElements, emitted);
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Chains an {@link OutputRecordInCloseTestSource} with a
     * {@code TestBoundedOneInputStreamOperator}, runs the task to completion and compares the
     * complete output with the expected sequence of records.
     */
    @Test
    public void testClosingAllOperatorsOnChainProperly() throws Exception {
        // Typed harness: a raw one leaves SourceStreamTask::new without a usable target type.
        final StreamTaskTestHarness<String> harness =
                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);

        final FromElementsFunction<String> elements = new FromElementsFunction<>(StringSerializer.INSTANCE, "Hello");
        final StreamOperator<?> sourceOperator = new OutputRecordInCloseTestSource<>("Source0", elements);
        harness
                .setupOperatorChain(new OperatorID(), sourceOperator)
                .chain(
                        new OperatorID(),
                        new TestBoundedOneInputStreamOperator("Operator1"),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                .finish();
        harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        harness.invoke();
        harness.waitForTaskCompletion();

        // Note that no record from the timer registered in close() is expected.
        final List<StreamRecord<String>> expected = new ArrayList<>();
        expected.add(new StreamRecord<>("Hello"));
        expected.add(new StreamRecord<>("[Source0]: End of input"));
        expected.add(new StreamRecord<>("[Source0]: Bye"));
        expected.add(new StreamRecord<>("[Operator1]: End of input"));
        expected.add(new StreamRecord<>("[Operator1]: Bye"));

        Assert.assertArrayEquals("Output was not correct.", expected.toArray(), harness.getOutput().toArray());
    }

    /**
     * Cancels a task once its {@link CancelTestSource} has signalled {@code dataProcessing} and
     * checks that the output then consists of the single record {@code "Hello"}, i.e. nothing
     * was emitted by the chained operator on behalf of an end of input.
     *
     * <p>NOTE(review): the decompiled form of this method had lost its exception handler; what
     * remained was a {@code finally} skeleton that read an uninitialised flag and added the
     * expected record twice. The handler below is a reconstruction using the
     * {@code ExceptionUtils.findThrowable(...).isPresent()} idiom found elsewhere in this class.
     * The tolerated type, {@code CancelTaskException}, should be confirmed against the project
     * history.
     */
    @Test
    public void testNotMarkingEndOfInputWhenTaskCancelled() throws Exception {
        // Typed harness: a raw one leaves SourceStreamTask::new without a usable target type.
        final StreamTaskTestHarness<String> harness =
                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);

        final StreamOperator<?> sourceOperator = new StreamSource<>(
                new CancelTestSource(BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), "Hello"));
        harness
                .setupOperatorChain(new OperatorID(), sourceOperator)
                .chain(
                        new OperatorID(),
                        new TestBoundedOneInputStreamOperator("Operator1"),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                .finish();
        harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        harness.invoke();
        CancelTestSource.getDataProcessing().await();
        harness.mo134getTask().cancel();

        try {
            harness.waitForTaskCompletion();
        } catch (Throwable t) {
            // A cancelled task is expected to end with a cancellation failure; anything else
            // is a genuine error and must fail the test.
            if (!ExceptionUtils.findThrowable(t, CancelTaskException.class).isPresent()) {
                throw t;
            }
        }

        expectedOutput.add(new StreamRecord<>("Hello"));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, harness.getOutput());
    }

    /** Cancellation of a lock-holding source: no pending mail, {@code cancel()} does not throw. */
    @Test
    public void testCancellationWithSourceBlockedOnLock() throws Exception {
        final boolean withPendingMail = false;
        final boolean throwInCancel = false;
        testCancellationWithSourceBlockedOnLock(withPendingMail, throwInCancel);
    }

    /** Cancellation of a lock-holding source: a mail is pending, {@code cancel()} does not throw. */
    @Test
    public void testCancellationWithSourceBlockedOnLockWithPendingMail() throws Exception {
        final boolean withPendingMail = true;
        final boolean throwInCancel = false;
        testCancellationWithSourceBlockedOnLock(withPendingMail, throwInCancel);
    }

    /** Cancellation of a lock-holding source: no pending mail, {@code cancel()} throws. */
    @Test
    public void testCancellationWithSourceBlockedOnLockAndThrowingOnError() throws Exception {
        final boolean withPendingMail = false;
        final boolean throwInCancel = true;
        testCancellationWithSourceBlockedOnLock(withPendingMail, throwInCancel);
    }

    /** Cancellation of a lock-holding source: a mail is pending and {@code cancel()} throws. */
    @Test
    public void testCancellationWithSourceBlockedOnLockWithPendingMailAndThrowingOnError() throws Exception {
        final boolean withPendingMail = true;
        final boolean throwInCancel = true;
        testCancellationWithSourceBlockedOnLock(withPendingMail, throwInCancel);
    }

    /**
     * Shared body of the "source blocked on lock" tests: starts a task around a
     * {@link CancelLockingSource}, waits until the source sits in its run loop holding the
     * checkpoint lock, cancels the task and waits for it to finish.
     *
     * <p>NOTE(review): the decompiled form of this method had lost its exception handler; what
     * remained was a {@code finally} skeleton reading two uninitialised flags. The handler below
     * is a reconstruction using the {@code ExceptionUtils.findThrowable(...).isPresent()} idiom
     * found elsewhere in this class. The two tolerated types ({@code InterruptedException} and
     * {@code CancelTaskException}) should be confirmed against the project history.
     *
     * @param z whether a mail is enqueued in the task's mailbox before cancelling
     * @param z2 whether the source's {@code cancel()} throws an {@link ExpectedTestException}
     */
    public void testCancellationWithSourceBlockedOnLock(boolean z, boolean z2) throws Exception {
        // Typed harness: a raw one leaves SourceStreamTask::new without a usable target type.
        final StreamTaskTestHarness<String> harness =
                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);

        CancelLockingSource.reset();
        final StreamOperator<?> sourceOperator = new StreamSource<>(new CancelLockingSource(z2));
        harness
                .setupOperatorChain(new OperatorID(), sourceOperator)
                .chain(
                        new OperatorID(),
                        new TestBoundedOneInputStreamOperator("Operator1"),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                .finish();
        harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        harness.invoke();
        CancelLockingSource.awaitRunning();

        if (z) {
            // If this mail is ever executed, the task must no longer be running.
            harness.mo134getTask().getMailboxExecutorFactory().createExecutor(0).execute(() -> {
                Assert.assertFalse("This should never execute before task cancelation", harness.mo134getTask().isRunning());
            }, "Test");
        }

        try {
            harness.mo134getTask().cancel();
        } catch (ExpectedTestException e) {
            // Only the throwing source variant may let this escape from cancel().
            Preconditions.checkState(z2);
        }

        try {
            harness.waitForTaskCompletion();
        } catch (Throwable t) {
            final boolean interrupted = ExceptionUtils.findThrowable(t, InterruptedException.class).isPresent();
            final boolean cancelled = ExceptionUtils.findThrowable(t, CancelTaskException.class).isPresent();
            if (!interrupted && !cancelled) {
                throw t;
            }
        }
    }

    /** Runs the interruption scenario with a source that throws a plain {@link InterruptedException}. */
    @Test
    public void testInterruptionExceptionNotSwallowed() throws Exception {
        final InterruptedSource.ExceptionGenerator plainInterruption = InterruptedException::new;
        testInterruptionExceptionNotSwallowed(plainInterruption);
    }

    /**
     * Runs the interruption scenario with a source whose {@link InterruptedException} is wrapped
     * in a {@link FlinkRuntimeException}, which is in turn wrapped in a {@link RuntimeException}.
     */
    @Test
    public void testWrappedInterruptionExceptionNotSwallowed() throws Exception {
        testInterruptionExceptionNotSwallowed(
                () -> new RuntimeException(new FlinkRuntimeException(new InterruptedException())));
    }

    /**
     * Runs a task around an {@link InterruptedSource} and waits for it to finish. A failure of
     * the task is accepted only if an {@link InterruptedException} is found in it via
     * {@code ExceptionUtils.findThrowable}; any other failure is rethrown.
     *
     * @param exceptionGenerator supplies the exception thrown by the source
     */
    private void testInterruptionExceptionNotSwallowed(InterruptedSource.ExceptionGenerator exceptionGenerator) throws Exception {
        // Typed harness: a raw one leaves SourceStreamTask::new without a usable target type.
        final StreamTaskTestHarness<String> harness =
                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);

        CancelLockingSource.reset();
        final StreamOperator<?> sourceOperator = new StreamSource<>(new InterruptedSource(exceptionGenerator));
        harness
                .setupOperatorChain(new OperatorID(), sourceOperator)
                .chain(
                        new OperatorID(),
                        new TestBoundedOneInputStreamOperator("Operator1"),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                .finish();
        harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        harness.invoke();
        try {
            harness.waitForTaskCompletion();
        } catch (Exception e) {
            if (!ExceptionUtils.findThrowable(e, InterruptedException.class).isPresent()) {
                throw e;
            }
        }
    }

    /**
     * Starts a task around an {@link ExceptionThrowingSource}, waits until the source is in its
     * run loop, finishes the task and expects it to complete without an error even though the
     * source throws once its loop ends.
     */
    @Test
    public void finishingIgnoresExceptions() throws Exception {
        // Typed harness: a raw one leaves SourceStreamTask::new without a usable target type.
        final StreamTaskTestHarness<String> harness =
                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);

        final CompletableFuture<Void> inRunLoop = new CompletableFuture<>();
        ExceptionThrowingSource.setIsInRunLoopFuture(inRunLoop);

        harness.setupOutputForSingletonOperatorChain();
        final StreamConfig config = harness.getStreamConfig();
        final StreamSource<String, ?> sourceOperator = new StreamSource<>(new ExceptionThrowingSource());
        config.setStreamOperator(sourceOperator);
        config.setOperatorID(new OperatorID());

        harness.invoke();
        inRunLoop.get();

        harness.mo134getTask().finishTask();
        harness.waitForTaskCompletion();
    }

    /**
     * Cancels a task whose {@link NonStoppingSource} ignores cancellation and checks that the
     * task thread is still alive after a 500 ms wait; the task is only awaited to the end after
     * the source has been force-cancelled.
     */
    @Test
    public void testWaitsForSourceThreadOnCancel() throws Exception {
        // Typed harness: a raw one leaves SourceStreamTask::new without a usable target type.
        final StreamTaskTestHarness<String> harness =
                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();
        final StreamSource<String, ?> sourceOperator = new StreamSource<>(new NonStoppingSource());
        harness.getStreamConfig().setStreamOperator(sourceOperator);

        harness.invoke();
        NonStoppingSource.waitForStart();

        harness.mo134getTask().cancel();
        // Bounded wait; the source has not been told to stop yet.
        harness.waitForTaskCompletion(500L, true);
        Assert.assertTrue(harness.taskThread.isAlive());

        NonStoppingSource.forceCancel();
        harness.waitForTaskCompletion(Long.MAX_VALUE, true);
    }

    /**
     * Recreates the serializable {@link InterruptedSource.ExceptionGenerator} lambdas of this
     * class from their serialized description.
     *
     * <p>Two shapes are recognised: the lambda declared in
     * {@code testWrappedInterruptionExceptionNotSwallowed} and the constructor reference
     * {@code InterruptedException::new}. Anything else is rejected.
     *
     * <p>NOTE(review): this has the shape of the hook the Java compiler generates on its own
     * for classes that contain serializable lambdas. A hand-written copy may clash with the
     * generated one when this file is recompiled; confirm whether it should stay in source.
     *
     * @param serializedLambda the serialized form to resolve
     * @return a new lambda equivalent to the serialized one
     * @throws IllegalArgumentException if the description matches neither known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        // Both lambdas implement ExceptionGenerator's inherited get() method.
        final boolean implementsGenerator =
                serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest$InterruptedSource$ExceptionGenerator")
                        && serializedLambda.getFunctionalInterfaceMethodName().equals("get")
                        && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;");

        switch (serializedLambda.getImplMethodName()) {
            case "lambda$testWrappedInterruptionExceptionNotSwallowed$eecdceac$1":
                // Method kind 6 is MethodHandleInfo.REF_invokeStatic.
                if (serializedLambda.getImplMethodKind() == 6
                        && implementsGenerator
                        && serializedLambda.getImplClass().equals("org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest")
                        && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Exception;")) {
                    // The cast gives the lambda a target type; a bare lambda cannot be
                    // returned as Object.
                    return (InterruptedSource.ExceptionGenerator)
                            () -> new RuntimeException(new FlinkRuntimeException(new InterruptedException()));
                }
                break;
            case "<init>":
                // Method kind 8 is MethodHandleInfo.REF_newInvokeSpecial.
                if (serializedLambda.getImplMethodKind() == 8
                        && implementsGenerator
                        && serializedLambda.getImplClass().equals("java/lang/InterruptedException")
                        && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return (InterruptedSource.ExceptionGenerator) InterruptedException::new;
                }
                break;
            default:
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
