/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TestBoundedOneInputStreamOperator;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;

public class SourceStreamTaskTest {
    @Test
    public void testOpenClose() throws Exception {
        StreamTaskTestHarness testHarness = new StreamTaskTestHarness(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        StreamSource sourceOperator = new StreamSource((SourceFunction)new OpenCloseTestSource());
        streamConfig.setStreamOperator((StreamOperator)sourceOperator);
        streamConfig.setOperatorID(new OperatorID());
        testHarness.invoke();
        testHarness.waitForTaskCompletion();
        Assert.assertTrue((String)"RichFunction methods where not called.", (boolean)OpenCloseTestSource.closeCalled);
        List resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
        Assert.assertEquals((long)10L, (long)resultElements.size());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCheckpointing() throws Exception {
        int numElements = 100;
        int numCheckpoints = 100;
        boolean numCheckpointers = true;
        int checkpointInterval = 5;
        int sourceCheckpointDelay = 1000;
        boolean sourceReadDelay = true;
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            int i;
            TupleTypeInfo typeInfo = new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO});
            StreamTaskTestHarness testHarness = new StreamTaskTestHarness(SourceStreamTask::new, typeInfo);
            testHarness.setupOutputForSingletonOperatorChain();
            StreamConfig streamConfig = testHarness.getStreamConfig();
            StreamSource sourceOperator = new StreamSource((SourceFunction)new MockSource(100, 1000, 1));
            streamConfig.setStreamOperator((StreamOperator)sourceOperator);
            streamConfig.setOperatorID(new OperatorID());
            Future[] checkpointerResults = new Future[1];
            testHarness.invoke();
            testHarness.waitForTaskRunning();
            StreamTask sourceTask = testHarness.getTask();
            for (i = 0; i < 1; ++i) {
                checkpointerResults[i] = executor.submit(new Checkpointer(100, 5, sourceTask));
            }
            testHarness.waitForTaskCompletion();
            for (i = 0; i < 1; ++i) {
                if (!checkpointerResults[i].isDone()) {
                    checkpointerResults[i].cancel(true);
                }
                if (checkpointerResults[i].isCancelled()) continue;
                checkpointerResults[i].get();
            }
            List resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
            Assert.assertEquals((long)100L, (long)resultElements.size());
        }
        finally {
            executor.shutdown();
        }
    }

    @Test
    public void testMarkingEndOfInput() throws Exception {
        StreamTaskTestHarness testHarness = new StreamTaskTestHarness(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOperatorChain(new OperatorID(), (StreamOperator<?>)new StreamSource((SourceFunction)new FromElementsFunction(BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), (Object[])new String[]{"Hello"}))).chain(new OperatorID(), new TestBoundedOneInputStreamOperator("Operator1"), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
        testHarness.invoke();
        testHarness.waitForTaskCompletion();
        expectedOutput.add(new StreamRecord((Object)"Hello"));
        expectedOutput.add(new StreamRecord((Object)"[Operator1]: Bye"));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testNotMarkingEndOfInputWhenTaskCancelled() throws Exception {
        StreamTaskTestHarness testHarness = new StreamTaskTestHarness(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOperatorChain(new OperatorID(), (StreamOperator<?>)new StreamSource((SourceFunction)new CancelTestSource((TypeSerializer<String>)BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), "Hello"))).chain(new OperatorID(), new TestBoundedOneInputStreamOperator("Operator1"), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
        testHarness.invoke();
        CancelTestSource.getDataProcessing().get();
        testHarness.getTask().cancel();
        try {
            testHarness.waitForTaskCompletion();
        }
        catch (Throwable t) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)t, CancelTaskException.class).isPresent());
        }
        expectedOutput.add(new StreamRecord((Object)"Hello"));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testCancellationWithSourceBlockedOnLock() throws Exception {
        this.testCancellationWithSourceBlockedOnLock(false);
    }

    @Test
    public void testCancellationWithSourceBlockedOnLockAndThrowingOnError() throws Exception {
        this.testCancellationWithSourceBlockedOnLock(true);
    }

    public void testCancellationWithSourceBlockedOnLock(boolean throwInCancel) throws Exception {
        block4: {
            StreamTaskTestHarness testHarness = new StreamTaskTestHarness(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
            CancelLockingSource.reset();
            testHarness.setupOperatorChain(new OperatorID(), (StreamOperator<?>)new StreamSource((SourceFunction)new CancelLockingSource(throwInCancel))).chain(new OperatorID(), new TestBoundedOneInputStreamOperator("Operator1"), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish();
            StreamConfig streamConfig = testHarness.getStreamConfig();
            streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
            testHarness.invoke();
            CancelLockingSource.awaitRunning();
            try {
                testHarness.getTask().cancel();
            }
            catch (ExpectedTestException e) {
                Preconditions.checkState((boolean)throwInCancel);
            }
            try {
                testHarness.waitForTaskCompletion();
            }
            catch (Throwable t) {
                if (ExceptionUtils.findThrowable((Throwable)t, CancelTaskException.class).isPresent()) break block4;
                throw t;
            }
        }
    }

    @Test
    public void testInterruptedNotSwallowed() throws Exception {
        block2: {
            StreamTaskTestHarness testHarness = new StreamTaskTestHarness(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
            CancelLockingSource.reset();
            testHarness.setupOperatorChain(new OperatorID(), (StreamOperator<?>)new StreamSource((SourceFunction)new InterruptedSource())).chain(new OperatorID(), new TestBoundedOneInputStreamOperator("Operator1"), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish();
            StreamConfig streamConfig = testHarness.getStreamConfig();
            streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
            testHarness.invoke();
            try {
                testHarness.waitForTaskCompletion();
            }
            catch (Exception e) {
                if (ExceptionUtils.findThrowable((Throwable)e, InterruptedException.class).isPresent()) break block2;
                throw e;
            }
        }
    }

    @Test
    public void cancellingForwardsExceptions() throws Exception {
        block2: {
            StreamTaskTestHarness testHarness = new StreamTaskTestHarness(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
            CompletableFuture<Void> operatorRunningWaitingFuture = new CompletableFuture<Void>();
            ExceptionThrowingSource.setIsInRunLoopFuture(operatorRunningWaitingFuture);
            testHarness.setupOutputForSingletonOperatorChain();
            StreamConfig streamConfig = testHarness.getStreamConfig();
            streamConfig.setStreamOperator((StreamOperator)new StreamSource((SourceFunction)new ExceptionThrowingSource()));
            streamConfig.setOperatorID(new OperatorID());
            testHarness.invoke();
            operatorRunningWaitingFuture.get();
            testHarness.getTask().cancel();
            Optional testException = Optional.empty();
            try {
                testHarness.waitForTaskCompletion();
            }
            catch (Throwable t) {
                if (ExceptionUtils.findThrowable((Throwable)t, CancelTaskException.class).isPresent()) break block2;
                throw t;
            }
        }
    }

    @Test
    public void finishingIgnoresExceptions() throws Exception {
        StreamTaskTestHarness testHarness = new StreamTaskTestHarness(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        CompletableFuture<Void> operatorRunningWaitingFuture = new CompletableFuture<Void>();
        ExceptionThrowingSource.setIsInRunLoopFuture(operatorRunningWaitingFuture);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        streamConfig.setStreamOperator((StreamOperator)new StreamSource((SourceFunction)new ExceptionThrowingSource()));
        streamConfig.setOperatorID(new OperatorID());
        testHarness.invoke();
        operatorRunningWaitingFuture.get();
        testHarness.getTask().finishTask();
        testHarness.waitForTaskCompletion();
    }

    private static class ExceptionThrowingSource
    implements SourceFunction<String> {
        private static volatile CompletableFuture<Void> isInRunLoop;
        private volatile boolean running = true;

        private ExceptionThrowingSource() {
        }

        public static void setIsInRunLoopFuture(@Nonnull CompletableFuture<Void> waitingLatch) {
            isInRunLoop = waitingLatch;
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws TestException {
            Preconditions.checkState((isInRunLoop != null && !isInRunLoop.isDone() ? 1 : 0) != 0);
            while (this.running) {
                if (!isInRunLoop.isDone()) {
                    isInRunLoop.complete(null);
                }
                ctx.collect((Object)"hello");
            }
            throw new TestException("Oh no, we're failing.");
        }

        public void cancel() {
            this.running = false;
        }

        public static class TestException
        extends RuntimeException {
            public TestException(String message) {
                super(message);
            }
        }
    }

    private static class CancelTestSource
    extends FromElementsFunction<String> {
        private static final long serialVersionUID = 8713065281092996067L;
        private static CompletableFuture<Void> dataProcessing = new CompletableFuture();
        private static CompletableFuture<Void> cancellationWaiting = new CompletableFuture();

        public CancelTestSource(TypeSerializer<String> serializer, String ... elements) throws IOException {
            super(serializer, (Object[])elements);
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            super.run(ctx);
            dataProcessing.complete(null);
            cancellationWaiting.get();
        }

        public void cancel() {
            super.cancel();
            cancellationWaiting.complete(null);
        }

        public static CompletableFuture<Void> getDataProcessing() {
            return dataProcessing;
        }
    }

    private static class OpenCloseTestSource
    extends RichSourceFunction<String> {
        private static final long serialVersionUID = 1L;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        OpenCloseTestSource() {
            openCalled = false;
            closeCalled = false;
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            if (closeCalled) {
                Assert.fail((String)"Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail((String)"Open was not called before close.");
            }
            closeCalled = true;
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            if (!openCalled) {
                Assert.fail((String)"Open was not called before run.");
            }
            for (int i = 0; i < 10; ++i) {
                ctx.collect((Object)("Hello" + i));
            }
        }

        public void cancel() {
        }
    }

    private static class Checkpointer
    implements Callable<Boolean> {
        private final int numCheckpoints;
        private final int checkpointInterval;
        private final AtomicLong checkpointId;
        private final StreamTask<Tuple2<Long, Integer>, ?> sourceTask;

        public Checkpointer(int numCheckpoints, int checkpointInterval, StreamTask<Tuple2<Long, Integer>, ?> task) {
            this.numCheckpoints = numCheckpoints;
            this.checkpointId = new AtomicLong(0L);
            this.sourceTask = task;
            this.checkpointInterval = checkpointInterval;
        }

        @Override
        public Boolean call() throws Exception {
            for (int i = 0; i < this.numCheckpoints; ++i) {
                long currentCheckpointId = this.checkpointId.getAndIncrement();
                CheckpointMetaData checkpointMetaData = new CheckpointMetaData(currentCheckpointId, 0L);
                this.sourceTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false);
                Thread.sleep(this.checkpointInterval);
            }
            return true;
        }
    }

    private static class MockSource
    implements SourceFunction<Tuple2<Long, Integer>>,
    ListCheckpointed<Serializable> {
        private static final long serialVersionUID = 1L;
        private int maxElements;
        private int checkpointDelay;
        private int readDelay;
        private volatile int count;
        private volatile long lastCheckpointId = -1L;
        private Semaphore semaphore;
        private volatile boolean isRunning = true;

        public MockSource(int maxElements, int checkpointDelay, int readDelay) {
            this.maxElements = maxElements;
            this.checkpointDelay = checkpointDelay;
            this.readDelay = readDelay;
            this.count = 0;
            this.semaphore = new Semaphore(1);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Tuple2<Long, Integer>> ctx) {
            Object lockObject = ctx.getCheckpointLock();
            while (this.isRunning && this.count < this.maxElements) {
                try {
                    Thread.sleep(this.readDelay);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                Object object = lockObject;
                synchronized (object) {
                    ctx.collect((Object)new Tuple2((Object)this.lastCheckpointId, (Object)this.count));
                    ++this.count;
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        public List<Serializable> snapshotState(long checkpointId, long timestamp) throws Exception {
            if (!this.semaphore.tryAcquire()) {
                Assert.fail((String)"Concurrent invocation of snapshotState.");
            }
            int startCount = this.count;
            this.lastCheckpointId = checkpointId;
            long sum = 0L;
            for (int i = 0; i < this.checkpointDelay; ++i) {
                sum += new Random().nextLong();
            }
            if (startCount != this.count) {
                this.semaphore.release();
                Assert.fail((String)"Count is different at start end end of snapshot.");
            }
            this.semaphore.release();
            return Collections.singletonList(sum);
        }

        public void restoreState(List<Serializable> state) throws Exception {
        }
    }

    public static class InterruptedSource
    implements SourceFunction<String> {
        private static final long serialVersionUID = 8713065281092996042L;
        private static CompletableFuture<Void> isRunning = new CompletableFuture();

        public static void reset() {
            isRunning = new CompletableFuture();
        }

        public static void awaitRunning() throws ExecutionException, InterruptedException {
            isRunning.get();
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                isRunning.complete(null);
                Thread.currentThread().interrupt();
                throw new InterruptedException();
            }
        }

        public void cancel() {
        }
    }

    public static class CancelLockingSource
    implements SourceFunction<String> {
        private static final long serialVersionUID = 8713065281092996042L;
        private static CompletableFuture<Void> isRunning = new CompletableFuture();
        private final boolean throwOnCancel;
        private volatile boolean cancelled = false;

        public CancelLockingSource(boolean throwOnCancel) {
            this.throwOnCancel = throwOnCancel;
        }

        public static void reset() {
            isRunning = new CompletableFuture();
        }

        public static void awaitRunning() throws ExecutionException, InterruptedException {
            isRunning.get();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                while (!this.cancelled) {
                    isRunning.complete(null);
                    if (this.throwOnCancel) {
                        Thread.sleep(1000000000L);
                        continue;
                    }
                    try {
                        Thread.sleep(1000000000L);
                    }
                    catch (InterruptedException interruptedException) {}
                }
            }
        }

        public void cancel() {
            if (this.throwOnCancel) {
                throw new ExpectedTestException();
            }
            this.cancelled = true;
        }
    }
}

