/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.Serializable;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(value=PowerMockRunner.class)
@PrepareForTest(value={ResultPartitionWriter.class})
@PowerMockIgnore(value={"javax.management.*", "com.sun.jndi.*"})
public class SourceStreamTaskTest {
    @Test
    public void testOpenClose() throws Exception {
        SourceStreamTask sourceTask = new SourceStreamTask();
        StreamTaskTestHarness testHarness = new StreamTaskTestHarness((AbstractInvokable)sourceTask, BasicTypeInfo.STRING_TYPE_INFO);
        StreamConfig streamConfig = testHarness.getStreamConfig();
        StreamSource sourceOperator = new StreamSource((SourceFunction)new OpenCloseTestSource());
        streamConfig.setStreamOperator((StreamOperator)sourceOperator);
        testHarness.invoke();
        testHarness.waitForTaskCompletion();
        Assert.assertTrue((String)"RichFunction methods where not called.", (boolean)OpenCloseTestSource.closeCalled);
        List resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
        Assert.assertEquals((long)10L, (long)resultElements.size());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCheckpointing() throws Exception {
        int NUM_ELEMENTS = 100;
        int NUM_CHECKPOINTS = 100;
        boolean NUM_CHECKPOINTERS = true;
        int CHECKPOINT_INTERVAL = 5;
        int SOURCE_CHECKPOINT_DELAY = 1000;
        boolean SOURCE_READ_DELAY = true;
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            int i;
            TupleTypeInfo typeInfo = new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO});
            SourceStreamTask sourceTask = new SourceStreamTask();
            StreamTaskTestHarness testHarness = new StreamTaskTestHarness((AbstractInvokable)sourceTask, typeInfo);
            StreamConfig streamConfig = testHarness.getStreamConfig();
            StreamSource sourceOperator = new StreamSource((SourceFunction)new MockSource(100, 1000, 1));
            streamConfig.setStreamOperator((StreamOperator)sourceOperator);
            Future[] checkpointerResults = new Future[1];
            testHarness.invoke();
            for (i = 0; i < 1; ++i) {
                checkpointerResults[i] = executor.submit(new Checkpointer(100, 5, (StreamTask<Tuple2<Long, Integer>, ?>)sourceTask));
            }
            testHarness.waitForTaskCompletion();
            for (i = 0; i < 1; ++i) {
                if (!checkpointerResults[i].isDone()) {
                    checkpointerResults[i].cancel(true);
                }
                if (checkpointerResults[i].isCancelled()) continue;
                checkpointerResults[i].get();
            }
            List resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
            Assert.assertEquals((long)100L, (long)resultElements.size());
        }
        finally {
            executor.shutdown();
        }
    }

    public static class OpenCloseTestSource
    extends RichSourceFunction<String> {
        private static final long serialVersionUID = 1L;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            if (closeCalled) {
                Assert.fail((String)"Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail((String)"Open was not called before close.");
            }
            closeCalled = true;
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            if (!openCalled) {
                Assert.fail((String)"Open was not called before run.");
            }
            for (int i = 0; i < 10; ++i) {
                ctx.collect((Object)("Hello" + i));
            }
        }

        public void cancel() {
        }
    }

    private static class Checkpointer
    implements Callable<Boolean> {
        private final int numCheckpoints;
        private final int checkpointInterval;
        private final AtomicLong checkpointId;
        private final StreamTask<Tuple2<Long, Integer>, ?> sourceTask;

        public Checkpointer(int numCheckpoints, int checkpointInterval, StreamTask<Tuple2<Long, Integer>, ?> task) {
            this.numCheckpoints = numCheckpoints;
            this.checkpointId = new AtomicLong(0L);
            this.sourceTask = task;
            this.checkpointInterval = checkpointInterval;
        }

        @Override
        public Boolean call() throws Exception {
            for (int i = 0; i < this.numCheckpoints; ++i) {
                long currentCheckpointId = this.checkpointId.getAndIncrement();
                this.sourceTask.triggerCheckpoint(currentCheckpointId, 0L);
                Thread.sleep(this.checkpointInterval);
            }
            return true;
        }
    }

    private static class MockSource
    implements SourceFunction<Tuple2<Long, Integer>>,
    Checkpointed<Serializable> {
        private static final long serialVersionUID = 1L;
        private int maxElements;
        private int checkpointDelay;
        private int readDelay;
        private volatile int count;
        private volatile long lastCheckpointId = -1L;
        private Semaphore semaphore;
        private volatile boolean isRunning = true;

        public MockSource(int maxElements, int checkpointDelay, int readDelay) {
            this.maxElements = maxElements;
            this.checkpointDelay = checkpointDelay;
            this.readDelay = readDelay;
            this.count = 0;
            this.semaphore = new Semaphore(1);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Tuple2<Long, Integer>> ctx) {
            Object lockObject = ctx.getCheckpointLock();
            while (this.isRunning && this.count < this.maxElements) {
                try {
                    Thread.sleep(this.readDelay);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Object object = lockObject;
                synchronized (object) {
                    ctx.collect((Object)new Tuple2((Object)this.lastCheckpointId, (Object)this.count));
                    ++this.count;
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
            if (!this.semaphore.tryAcquire()) {
                Assert.fail((String)"Concurrent invocation of snapshotState.");
            }
            int startCount = this.count;
            this.lastCheckpointId = checkpointId;
            long sum = 0L;
            for (int i = 0; i < this.checkpointDelay; ++i) {
                sum += new Random().nextLong();
            }
            if (startCount != this.count) {
                this.semaphore.release();
                Assert.fail((String)"Count is different at start end end of snapshot.");
            }
            this.semaphore.release();
            return Long.valueOf(sum);
        }

        public void restoreState(Serializable state) {
        }
    }
}

