/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

public class StreamTaskCancellationTest
extends TestLogger {
    /**
     * Builds a single-operator {@link OneInputStreamTask} harness around a
     * {@link TestInterruptInCloseOperator} and closes it again right away.
     *
     * <p>This method asserts nothing itself; the assertion sits in the operator's
     * {@code close()} method. NOTE(review): this relies on closing the harness driving the
     * operator's {@code close()} - confirm against the harness implementation.
     *
     * @throws Exception if building or closing the harness fails
     */
    @Test
    public void testDoNotInterruptWhileClosing() throws Exception {
        TestInterruptInCloseOperator testOperator = new TestInterruptInCloseOperator();
        // The body is intentionally empty: the only thing exercised is harness.close().
        try (StreamTaskMailboxTestHarness<String> harness =
                new StreamTaskMailboxTestHarnessBuilder<>(
                                OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain(testOperator)
                        .build()) {
            // nothing to do
        }
    }

    /**
     * Starts a {@link CancelFailingTask}, waits until it signals through its shared latch,
     * cancels it and checks that it finishes in {@link ExecutionState#CANCELED} even though
     * the task's {@code cancelTask()} throws.
     *
     * @throws Exception if the task cannot be created or the test thread is interrupted
     */
    @Test
    public void testCanceleablesCanceledOnCancelTaskError() throws Exception {
        // Fresh latch per run; the task triggers it from inside processInput().
        CancelFailingTask.syncLatch = new OneShotLatch();
        StreamConfig streamConfig = new StreamConfig(new Configuration());

        try (NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().build()) {
            Task task =
                    StreamTaskTest.createTask(
                            CancelFailingTask.class,
                            environment,
                            streamConfig,
                            new Configuration());
            task.startTaskThread();

            // Do not cancel before the task has reached its blocking point.
            CancelFailingTask.syncLatch.await();
            task.cancelExecution();

            Thread taskThread = task.getExecutingThread();
            taskThread.join();

            ExecutionState finalState = task.getExecutionState();
            Assert.assertEquals(ExecutionState.CANCELED, finalState);
        }
    }

    /**
     * Runs a {@link CancelThrowingTask}, whose input processing throws a
     * {@link CancelTaskException}, and checks that the task ends in
     * {@link ExecutionState#CANCELED}.
     *
     * @throws Exception if the task cannot be created or the test thread is interrupted
     */
    @Test
    public void testCancelTaskExceptionHandling() throws Exception {
        StreamConfig streamConfig = new StreamConfig(new Configuration());

        try (NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().build()) {
            Task task =
                    StreamTaskTest.createTask(
                            CancelThrowingTask.class,
                            environment,
                            streamConfig,
                            new Configuration());
            task.startTaskThread();

            // No explicit cancel call here: the task terminates on its own exception.
            Thread taskThread = task.getExecutingThread();
            taskThread.join();

            ExecutionState finalState = task.getExecutionState();
            Assert.assertEquals(ExecutionState.CANCELED, finalState);
        }
    }

    public static class CancelThrowingTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        public CancelThrowingTask(Environment env) throws Exception {
            super(env);
        }

        protected void init() {
        }

        protected void processInput(MailboxDefaultAction.Controller controller) {
            throw new CancelTaskException();
        }
    }

    public static class CancelFailingTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        private static OneShotLatch syncLatch;

        public CancelFailingTask(Environment env) throws Exception {
            super(env);
        }

        protected void init() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            OneShotLatch latch = new OneShotLatch();
            Object lock = new Object();
            holder.start();
            try (LockHolder holder = new LockHolder(lock, latch);){
                this.getCancelables().registerCloseable((AutoCloseable)holder);
                latch.await();
                syncLatch.trigger();
                Object object = lock;
                synchronized (object) {
                }
            }
            controller.suspendDefaultAction();
            this.mailboxProcessor.suspend();
        }

        protected void cleanUpInternal() {
        }

        protected void cancelTask() throws Exception {
            throw new Exception("test exception");
        }

        private static final class LockHolder
        extends Thread
        implements Closeable {
            private final OneShotLatch trigger;
            private final Object lock;
            private volatile boolean canceled;

            private LockHolder(Object lock, OneShotLatch trigger) {
                this.lock = lock;
                this.trigger = trigger;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                Object object = this.lock;
                synchronized (object) {
                    while (!this.canceled) {
                        this.trigger.trigger();
                        try {
                            Thread.sleep(1000000000L);
                        }
                        catch (InterruptedException interruptedException) {}
                    }
                }
            }

            public void cancel() {
                this.canceled = true;
            }

            @Override
            public void close() {
                this.canceled = true;
                this.interrupt();
            }
        }
    }

    private static class TestInterruptInCloseOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        private TestInterruptInCloseOperator() {
        }

        public void close() throws Exception {
            super.close();
            AtomicBoolean running = new AtomicBoolean(true);
            Thread thread = new Thread(() -> {
                while (running.get()) {
                }
            });
            thread.start();
            try {
                this.getContainingTask().maybeInterruptOnCancel(thread, null, null);
                Assert.assertFalse((boolean)thread.isInterrupted());
            }
            finally {
                running.set(false);
            }
        }

        public void processElement(StreamRecord<String> element) throws Exception {
        }
    }
}

