/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StoppableStreamSource;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class StreamSourceOperatorTest {
    @Test
    public void testEmitMaxWatermarkForFiniteSource() throws Exception {
        StreamSource operator = new StreamSource(new FiniteSource());
        ArrayList output = new ArrayList();
        StreamSourceOperatorTest.setupSourceOperator(operator, TimeCharacteristic.EventTime, 0L, null);
        operator.run(new Object(), new CollectorOutput(output));
        Assert.assertEquals((long)1L, (long)output.size());
        Assert.assertEquals((Object)Watermark.MAX_WATERMARK, output.get(0));
    }

    @Test
    public void testNoMaxWatermarkOnImmediateCancel() throws Exception {
        ArrayList output = new ArrayList();
        StreamSource operator = new StreamSource(new InfiniteSource());
        StreamSourceOperatorTest.setupSourceOperator(operator, TimeCharacteristic.EventTime, 0L, null);
        operator.cancel();
        operator.run(new Object(), new CollectorOutput(output));
        Assert.assertTrue((boolean)output.isEmpty());
    }

    @Test
    public void testNoMaxWatermarkOnAsyncCancel() throws Exception {
        ArrayList output = new ArrayList();
        final Thread runner = Thread.currentThread();
        final StreamSource operator = new StreamSource(new InfiniteSource());
        StreamSourceOperatorTest.setupSourceOperator(operator, TimeCharacteristic.EventTime, 0L, null);
        new Thread("canceler"){

            @Override
            public void run() {
                try {
                    Thread.sleep(200L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                operator.cancel();
                runner.interrupt();
            }
        }.start();
        try {
            operator.run(new Object(), new CollectorOutput(output));
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        Assert.assertTrue((boolean)output.isEmpty());
    }

    @Test
    public void testNoMaxWatermarkOnImmediateStop() throws Exception {
        ArrayList output = new ArrayList();
        StoppableStreamSource operator = new StoppableStreamSource(new InfiniteSource());
        StreamSourceOperatorTest.setupSourceOperator(operator, TimeCharacteristic.EventTime, 0L, null);
        operator.stop();
        operator.run(new Object(), new CollectorOutput(output));
        Assert.assertTrue((boolean)output.isEmpty());
    }

    @Test
    public void testNoMaxWatermarkOnAsyncStop() throws Exception {
        ArrayList output = new ArrayList();
        final StoppableStreamSource operator = new StoppableStreamSource(new InfiniteSource());
        StreamSourceOperatorTest.setupSourceOperator(operator, TimeCharacteristic.EventTime, 0L, null);
        new Thread("canceler"){

            @Override
            public void run() {
                try {
                    Thread.sleep(200L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                operator.stop();
            }
        }.start();
        operator.run(new Object(), new CollectorOutput(output));
        Assert.assertTrue((boolean)output.isEmpty());
    }

    @Test
    public void testAutomaticWatermarkContext() throws Exception {
        StoppableStreamSource operator = new StoppableStreamSource(new InfiniteSource());
        long watermarkInterval = 10L;
        TestTimeServiceProvider timeProvider = new TestTimeServiceProvider();
        StreamSourceOperatorTest.setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, (TimeServiceProvider)timeProvider);
        ArrayList output = new ArrayList();
        StreamSource.AutomaticWatermarkContext ctx = new StreamSource.AutomaticWatermarkContext((StreamSource)operator, operator.getContainingTask().getCheckpointLock(), new CollectorOutput(output), operator.getExecutionConfig().getAutoWatermarkInterval());
        for (long i = 1L; i < 100L; i += watermarkInterval) {
            timeProvider.setCurrentTime(i);
        }
        Assert.assertTrue((output.size() == 9 ? 1 : 0) != 0);
        long nextWatermark = 0L;
        for (StreamElement el : output) {
            Watermark wm = (Watermark)el;
            Assert.assertTrue((wm.getTimestamp() == (nextWatermark += watermarkInterval) ? 1 : 0) != 0);
        }
    }

    private static <T> void setupSourceOperator(StreamSource<T, ?> operator, TimeCharacteristic timeChar, long watermarkInterval, final TimeServiceProvider timeProvider) {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setAutoWatermarkInterval(watermarkInterval);
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setTimeCharacteristic(timeChar);
        DummyEnvironment env = new DummyEnvironment("MockTwoInputTask", 1, 0);
        StreamTask mockTask = (StreamTask)Mockito.mock(StreamTask.class);
        Mockito.when((Object)mockTask.getName()).thenReturn((Object)"Mock Task");
        Mockito.when((Object)mockTask.getCheckpointLock()).thenReturn(new Object());
        Mockito.when((Object)mockTask.getConfiguration()).thenReturn((Object)cfg);
        Mockito.when((Object)mockTask.getEnvironment()).thenReturn((Object)env);
        Mockito.when((Object)mockTask.getExecutionConfig()).thenReturn((Object)executionConfig);
        Mockito.when((Object)mockTask.getAccumulatorMap()).thenReturn(Collections.emptyMap());
        ((StreamTask)Mockito.doAnswer((Answer)new Answer<ScheduledFuture>(){

            public ScheduledFuture answer(InvocationOnMock invocation) throws Throwable {
                final long execTime = (Long)invocation.getArguments()[0];
                final Triggerable target = (Triggerable)invocation.getArguments()[1];
                if (timeProvider == null) {
                    throw new RuntimeException("The time provider is null");
                }
                timeProvider.registerTimer(execTime, new Runnable(){

                    @Override
                    public void run() {
                        try {
                            target.trigger(execTime);
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
                return null;
            }
        }).when((Object)mockTask)).registerTimer(Matchers.anyLong(), (Triggerable)Matchers.any(Triggerable.class));
        ((StreamTask)Mockito.doAnswer((Answer)new Answer<Long>(){

            public Long answer(InvocationOnMock invocation) throws Throwable {
                if (timeProvider == null) {
                    throw new RuntimeException("The time provider is null");
                }
                return timeProvider.getCurrentProcessingTime();
            }
        }).when((Object)mockTask)).getCurrentProcessingTime();
        operator.setup(mockTask, cfg, (Output)Mockito.mock(Output.class));
    }

    private static class CollectorOutput<T>
    implements Output<StreamRecord<T>> {
        private final List<StreamElement> list;

        private CollectorOutput(List<StreamElement> list) {
            this.list = list;
        }

        public void emitWatermark(Watermark mark) {
            this.list.add((StreamElement)mark);
        }

        public void collect(StreamRecord<T> record) {
            this.list.add((StreamElement)record);
        }

        public void close() {
        }
    }

    private static final class InfiniteSource<T>
    implements SourceFunction<T>,
    StoppableFunction {
        private volatile boolean running = true;

        private InfiniteSource() {
        }

        public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
            while (this.running) {
                Thread.sleep(20L);
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void stop() {
            this.running = false;
        }
    }

    private static final class FiniteSource<T>
    implements SourceFunction<T>,
    StoppableFunction {
        private FiniteSource() {
        }

        public void run(SourceFunction.SourceContext<T> ctx) {
        }

        public void cancel() {
        }

        public void stop() {
        }
    }
}

