/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.junit.Assert;
import org.junit.Test;

public class StreamTaskTimerTest {
    @Test
    public void testOpenCloseAndTimestamps() throws Exception {
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        StreamMap mapOperator = new StreamMap(new DummyMapFunction());
        streamConfig.setStreamOperator((StreamOperator)mapOperator);
        streamConfig.setOperatorID(new OperatorID());
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        OneInputStreamTask mapTask = testHarness.getTask();
        mapTask.getProcessingTimeService().registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback(){

            public void onProcessingTime(long timestamp) {
            }
        });
        Assert.assertEquals((long)1L, (long)StreamTask.TRIGGER_THREAD_GROUP.activeCount());
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        long deadline = System.currentTimeMillis() + 4000L;
        while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((String)"Trigger timer thread did not properly shut down", (long)0L, (long)StreamTask.TRIGGER_THREAD_GROUP.activeCount());
    }

    @Test
    public void checkScheduledTimestampe() throws Exception {
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        StreamMap mapOperator = new StreamMap(new DummyMapFunction());
        streamConfig.setStreamOperator((StreamOperator)mapOperator);
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        OneInputStreamTask mapTask = testHarness.getTask();
        AtomicReference errorRef = new AtomicReference();
        long t1 = System.currentTimeMillis();
        long t2 = System.currentTimeMillis() - 200L;
        long t3 = System.currentTimeMillis() + 100L;
        long t4 = System.currentTimeMillis() + 200L;
        ProcessingTimeService timeService = mapTask.getProcessingTimeService();
        timeService.registerTimer(t1, (ProcessingTimeCallback)new ValidatingProcessingTimeCallback(errorRef, t1, 0));
        timeService.registerTimer(t2, (ProcessingTimeCallback)new ValidatingProcessingTimeCallback(errorRef, t2, 1));
        timeService.registerTimer(t3, (ProcessingTimeCallback)new ValidatingProcessingTimeCallback(errorRef, t3, 2));
        timeService.registerTimer(t4, (ProcessingTimeCallback)new ValidatingProcessingTimeCallback(errorRef, t4, 3));
        long deadline = System.currentTimeMillis() + 20000L;
        while (errorRef.get() == null && ValidatingProcessingTimeCallback.numInSequence < 4 && System.currentTimeMillis() < deadline) {
            Thread.sleep(100L);
        }
        if (errorRef.get() != null) {
            ((Throwable)errorRef.get()).printStackTrace();
            Assert.fail((String)((Throwable)errorRef.get()).getMessage());
        }
        Assert.assertEquals((long)4L, (long)ValidatingProcessingTimeCallback.numInSequence);
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        deadline = System.currentTimeMillis() + 4000L;
        while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((String)"Trigger timer thread did not properly shut down", (long)0L, (long)StreamTask.TRIGGER_THREAD_GROUP.activeCount());
    }

    public static class DummyMapFunction<T>
    implements MapFunction<T, T> {
        public T map(T value) {
            return value;
        }
    }

    private static class ValidatingProcessingTimeCallback
    implements ProcessingTimeCallback {
        static int numInSequence;
        private final AtomicReference<Throwable> errorRef;
        private final long expectedTimestamp;
        private final int expectedInSequence;

        private ValidatingProcessingTimeCallback(AtomicReference<Throwable> errorRef, long expectedTimestamp, int expectedInSequence) {
            this.errorRef = errorRef;
            this.expectedTimestamp = expectedTimestamp;
            this.expectedInSequence = expectedInSequence;
        }

        public void onProcessingTime(long timestamp) {
            try {
                Assert.assertEquals((long)this.expectedTimestamp, (long)timestamp);
                Assert.assertEquals((long)this.expectedInSequence, (long)numInSequence);
                ++numInSequence;
            }
            catch (Throwable t) {
                this.errorRef.compareAndSet(null, t);
            }
        }
    }
}

