/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.operators.StreamTaskTimerTest;
import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.junit.Assert;
import org.junit.Test;

public class TestProcessingTimeServiceTest {
    @Test
    public void testCustomTimeServiceProvider() throws Throwable {
        TestProcessingTimeService tp = new TestProcessingTimeService();
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(env -> new OneInputStreamTask(env, (ProcessingTimeService)tp), BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        StreamMap mapOperator = new StreamMap(new StreamTaskTimerTest.DummyMapFunction());
        streamConfig.setStreamOperator((StreamOperator)mapOperator);
        streamConfig.setOperatorID(new OperatorID());
        testHarness.invoke();
        OneInputStreamTask mapTask = testHarness.getTask();
        Assert.assertEquals((long)Long.MIN_VALUE, (long)testHarness.getProcessingTimeService().getCurrentProcessingTime());
        tp.setCurrentTime(11L);
        Assert.assertEquals((long)testHarness.getProcessingTimeService().getCurrentProcessingTime(), (long)11L);
        tp.setCurrentTime(15L);
        tp.setCurrentTime(16L);
        Assert.assertEquals((long)testHarness.getProcessingTimeService().getCurrentProcessingTime(), (long)16L);
        mapTask.getProcessingTimeService().registerTimer(30L, new ProcessingTimeCallback(){

            public void onProcessingTime(long timestamp) {
            }
        });
        mapTask.getProcessingTimeService().registerTimer(40L, new ProcessingTimeCallback(){

            public void onProcessingTime(long timestamp) {
            }
        });
        Assert.assertEquals((long)2L, (long)tp.getNumActiveTimers());
        tp.setCurrentTime(35L);
        Assert.assertEquals((long)1L, (long)tp.getNumActiveTimers());
        tp.setCurrentTime(40L);
        Assert.assertEquals((long)0L, (long)tp.getNumActiveTimers());
        tp.shutdownService();
    }

    public static class ReferenceSettingExceptionHandler
    implements AsyncExceptionHandler {
        private final AtomicReference<Throwable> errorReference;

        public ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) {
            this.errorReference = errorReference;
        }

        public void handleAsyncException(String message, Throwable exception) {
            this.errorReference.compareAndSet(null, exception);
        }
    }
}

