/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(value=PowerMockRunner.class)
@PrepareForTest(value={ResultPartitionWriter.class})
@PowerMockIgnore(value={"javax.management.*", "com.sun.jndi.*"})
public class StreamTaskTimerTest {
    @Test
    public void testCustomTimeServiceProvider() throws Throwable {
        TestTimeServiceProvider tp = new TestTimeServiceProvider();
        OneInputStreamTask mapTask = new OneInputStreamTask();
        mapTask.setTimeService((TimeServiceProvider)tp);
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        StreamConfig streamConfig = testHarness.getStreamConfig();
        StreamMap mapOperator = new StreamMap(new DummyMapFunction());
        streamConfig.setStreamOperator((StreamOperator)mapOperator);
        testHarness.invoke();
        Assert.assertTrue((testHarness.getCurrentProcessingTime() == 0L ? 1 : 0) != 0);
        tp.setCurrentTime(11L);
        Assert.assertTrue((testHarness.getCurrentProcessingTime() == 11L ? 1 : 0) != 0);
        tp.setCurrentTime(15L);
        tp.setCurrentTime(16L);
        Assert.assertTrue((testHarness.getCurrentProcessingTime() == 16L ? 1 : 0) != 0);
        mapTask.registerTimer(30L, new Triggerable(){

            public void trigger(long timestamp) {
            }
        });
        mapTask.registerTimer(40L, new Triggerable(){

            public void trigger(long timestamp) {
            }
        });
        Assert.assertEquals((long)2L, (long)tp.getNoOfRegisteredTimers());
        tp.setCurrentTime(35L);
        Assert.assertEquals((long)1L, (long)tp.getNoOfRegisteredTimers());
        tp.setCurrentTime(40L);
        Assert.assertEquals((long)0L, (long)tp.getNoOfRegisteredTimers());
        tp.shutdownService();
    }

    @Test
    public void testOpenCloseAndTimestamps() throws Exception {
        OneInputStreamTask mapTask = new OneInputStreamTask();
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        StreamConfig streamConfig = testHarness.getStreamConfig();
        StreamMap mapOperator = new StreamMap(new DummyMapFunction());
        streamConfig.setStreamOperator((StreamOperator)mapOperator);
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        mapTask.registerTimer(System.currentTimeMillis(), new Triggerable(){

            public void trigger(long timestamp) {
            }
        });
        Assert.assertEquals((long)1L, (long)StreamTask.TRIGGER_THREAD_GROUP.activeCount());
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        long deadline = System.currentTimeMillis() + 4000L;
        while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((String)"Trigger timer thread did not properly shut down", (long)0L, (long)StreamTask.TRIGGER_THREAD_GROUP.activeCount());
    }

    @Test
    public void checkScheduledTimestampe() {
        try {
            OneInputStreamTask mapTask = new OneInputStreamTask();
            OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
            StreamConfig streamConfig = testHarness.getStreamConfig();
            StreamMap mapOperator = new StreamMap(new DummyMapFunction());
            streamConfig.setStreamOperator((StreamOperator)mapOperator);
            testHarness.invoke();
            testHarness.waitForTaskRunning();
            AtomicReference errorRef = new AtomicReference();
            long t1 = System.currentTimeMillis();
            long t2 = System.currentTimeMillis() - 200L;
            long t3 = System.currentTimeMillis() + 100L;
            long t4 = System.currentTimeMillis() + 200L;
            mapTask.registerTimer(t1, (Triggerable)new ValidatingTriggerable(errorRef, t1, 0));
            mapTask.registerTimer(t2, (Triggerable)new ValidatingTriggerable(errorRef, t2, 1));
            mapTask.registerTimer(t3, (Triggerable)new ValidatingTriggerable(errorRef, t3, 2));
            mapTask.registerTimer(t4, (Triggerable)new ValidatingTriggerable(errorRef, t4, 3));
            long deadline = System.currentTimeMillis() + 20000L;
            while (errorRef.get() == null && ValidatingTriggerable.numInSequence < 4 && System.currentTimeMillis() < deadline) {
                Thread.sleep(100L);
            }
            if (errorRef.get() != null) {
                ((Throwable)errorRef.get()).printStackTrace();
                Assert.fail((String)((Throwable)errorRef.get()).getMessage());
            }
            Assert.assertEquals((long)4L, (long)ValidatingTriggerable.numInSequence);
            testHarness.endInput();
            testHarness.waitForTaskCompletion();
            deadline = System.currentTimeMillis() + 4000L;
            while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
                Thread.sleep(10L);
            }
            Assert.assertEquals((String)"Trigger timer thread did not properly shut down", (long)0L, (long)StreamTask.TRIGGER_THREAD_GROUP.activeCount());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    public static class DummyMapFunction<T>
    implements MapFunction<T, T> {
        public T map(T value) {
            return value;
        }
    }

    private static class ValidatingTriggerable
    implements Triggerable {
        static int numInSequence;
        private final AtomicReference<Throwable> errorRef;
        private final long expectedTimestamp;
        private final int expectedInSequence;

        private ValidatingTriggerable(AtomicReference<Throwable> errorRef, long expectedTimestamp, int expectedInSequence) {
            this.errorRef = errorRef;
            this.expectedTimestamp = expectedTimestamp;
            this.expectedInSequence = expectedInSequence;
        }

        public void trigger(long timestamp) {
            try {
                Assert.assertEquals((long)this.expectedTimestamp, (long)timestamp);
                Assert.assertEquals((long)this.expectedInSequence, (long)numInSequence);
                ++numInSequence;
            }
            catch (Throwable t) {
                this.errorRef.compareAndSet(null, t);
            }
        }
    }
}

