package org.apache.flink.streaming.runtime.operators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StoppableStreamSource;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.CollectorOutput;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.class */
public class StreamSourceOperatorWatermarksTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest$FiniteSource.class */
    private static final class FiniteSource<T> implements SourceFunction<T>, StoppableFunction {
        private FiniteSource() {
        }

        public void run(SourceFunction.SourceContext<T> sourceContext) {
        }

        public void cancel() {
        }

        public void stop() {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest$InfiniteSource.class */
    private static final class InfiniteSource<T> implements SourceFunction<T>, StoppableFunction {
        private volatile boolean running;

        private InfiniteSource() {
            this.running = true;
        }

        public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
            while (this.running) {
                Thread.sleep(20L);
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void stop() {
            this.running = false;
        }
    }

    @Test
    public void testEmitMaxWatermarkForFiniteSource() throws Exception {
        StreamSource streamSource = new StreamSource(new FiniteSource());
        ArrayList arrayList = new ArrayList();
        setupSourceOperator(streamSource, TimeCharacteristic.EventTime, 0L);
        streamSource.run(new Object(), (StreamStatusMaintainer) Mockito.mock(StreamStatusMaintainer.class), new CollectorOutput(arrayList));
        Assert.assertEquals(1L, arrayList.size());
        Assert.assertEquals(Watermark.MAX_WATERMARK, arrayList.get(0));
    }

    @Test
    public void testNoMaxWatermarkOnImmediateCancel() throws Exception {
        ArrayList arrayList = new ArrayList();
        StreamSource streamSource = new StreamSource(new InfiniteSource());
        setupSourceOperator(streamSource, TimeCharacteristic.EventTime, 0L);
        streamSource.cancel();
        streamSource.run(new Object(), (StreamStatusMaintainer) Mockito.mock(StreamStatusMaintainer.class), new CollectorOutput(arrayList));
        Assert.assertTrue(arrayList.isEmpty());
    }

    /* JADX WARN: Type inference failed for: r0v4, types: [org.apache.flink.streaming.runtime.operators.StreamSourceOperatorWatermarksTest$1] */
    @Test
    public void testNoMaxWatermarkOnAsyncCancel() throws Exception {
        ArrayList arrayList = new ArrayList();
        final Thread currentThread = Thread.currentThread();
        final StreamSource streamSource = new StreamSource(new InfiniteSource());
        setupSourceOperator(streamSource, TimeCharacteristic.EventTime, 0L);
        new Thread("canceler") { // from class: org.apache.flink.streaming.runtime.operators.StreamSourceOperatorWatermarksTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    Thread.sleep(200L);
                } catch (InterruptedException e) {
                }
                streamSource.cancel();
                currentThread.interrupt();
            }
        }.start();
        try {
            streamSource.run(new Object(), (StreamStatusMaintainer) Mockito.mock(StreamStatusMaintainer.class), new CollectorOutput(arrayList));
        } catch (InterruptedException e) {
        }
        Assert.assertTrue(arrayList.isEmpty());
    }

    @Test
    public void testNoMaxWatermarkOnImmediateStop() throws Exception {
        ArrayList arrayList = new ArrayList();
        StoppableStreamSource stoppableStreamSource = new StoppableStreamSource(new InfiniteSource());
        setupSourceOperator(stoppableStreamSource, TimeCharacteristic.EventTime, 0L);
        stoppableStreamSource.stop();
        stoppableStreamSource.run(new Object(), (StreamStatusMaintainer) Mockito.mock(StreamStatusMaintainer.class), new CollectorOutput(arrayList));
        Assert.assertTrue(arrayList.isEmpty());
    }

    /* JADX WARN: Type inference failed for: r0v3, types: [org.apache.flink.streaming.runtime.operators.StreamSourceOperatorWatermarksTest$2] */
    @Test
    public void testNoMaxWatermarkOnAsyncStop() throws Exception {
        ArrayList arrayList = new ArrayList();
        final StoppableStreamSource stoppableStreamSource = new StoppableStreamSource(new InfiniteSource());
        setupSourceOperator(stoppableStreamSource, TimeCharacteristic.EventTime, 0L);
        new Thread("canceler") { // from class: org.apache.flink.streaming.runtime.operators.StreamSourceOperatorWatermarksTest.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    Thread.sleep(200L);
                } catch (InterruptedException e) {
                }
                stoppableStreamSource.stop();
            }
        }.start();
        stoppableStreamSource.run(new Object(), (StreamStatusMaintainer) Mockito.mock(StreamStatusMaintainer.class), new CollectorOutput(arrayList));
        Assert.assertTrue(arrayList.isEmpty());
    }

    @Test
    public void testAutomaticWatermarkContext() throws Exception {
        StoppableStreamSource stoppableStreamSource = new StoppableStreamSource(new InfiniteSource());
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        testProcessingTimeService.setCurrentTime(0L);
        setupSourceOperator(stoppableStreamSource, TimeCharacteristic.IngestionTime, 10L, testProcessingTimeService);
        ArrayList arrayList = new ArrayList();
        StreamSourceContexts.getSourceContext(TimeCharacteristic.IngestionTime, stoppableStreamSource.getContainingTask().getProcessingTimeService(), stoppableStreamSource.getContainingTask().getCheckpointLock(), stoppableStreamSource.getContainingTask().getStreamStatusMaintainer(), new CollectorOutput(arrayList), stoppableStreamSource.getExecutionConfig().getAutoWatermarkInterval(), -1L);
        long j = 1;
        while (true) {
            long j2 = j;
            if (j2 >= 100) {
                break;
            }
            testProcessingTimeService.setCurrentTime(j2);
            j = j2 + 10;
        }
        Assert.assertTrue(arrayList.size() == 9);
        long j3 = 0;
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            j3 += 10;
            Assert.assertTrue(((StreamElement) it.next()).getTimestamp() == j3);
        }
    }

    private static <T> void setupSourceOperator(StreamSource<T, ?> streamSource, TimeCharacteristic timeCharacteristic, long j) {
        setupSourceOperator(streamSource, timeCharacteristic, j, new TestProcessingTimeService());
    }

    private static <T> void setupSourceOperator(StreamSource<T, ?> streamSource, TimeCharacteristic timeCharacteristic, long j, final ProcessingTimeService processingTimeService) {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setAutoWatermarkInterval(j);
        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setStateBackend(new MemoryStateBackend());
        streamConfig.setTimeCharacteristic(timeCharacteristic);
        streamConfig.setOperatorID(new OperatorID());
        DummyEnvironment dummyEnvironment = new DummyEnvironment("MockTwoInputTask", 1, 0);
        StreamStatusMaintainer streamStatusMaintainer = (StreamStatusMaintainer) Mockito.mock(StreamStatusMaintainer.class);
        Mockito.when(streamStatusMaintainer.getStreamStatus()).thenReturn(StreamStatus.ACTIVE);
        StreamTask streamTask = (StreamTask) Mockito.mock(StreamTask.class);
        Mockito.when(streamTask.getName()).thenReturn("Mock Task");
        Mockito.when(streamTask.getCheckpointLock()).thenReturn(new Object());
        Mockito.when(streamTask.getConfiguration()).thenReturn(streamConfig);
        Mockito.when(streamTask.getEnvironment()).thenReturn(dummyEnvironment);
        Mockito.when(streamTask.getExecutionConfig()).thenReturn(executionConfig);
        Mockito.when(streamTask.getAccumulatorMap()).thenReturn(Collections.emptyMap());
        Mockito.when(streamTask.getStreamStatusMaintainer()).thenReturn(streamStatusMaintainer);
        ((StreamTask) Mockito.doAnswer(new Answer<ProcessingTimeService>() { // from class: org.apache.flink.streaming.runtime.operators.StreamSourceOperatorWatermarksTest.3
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public ProcessingTimeService m55answer(InvocationOnMock invocationOnMock) throws Throwable {
                if (processingTimeService == null) {
                    throw new RuntimeException("The time provider is null.");
                }
                return processingTimeService;
            }
        }).when(streamTask)).getProcessingTimeService();
        streamSource.setup(streamTask, streamConfig, (Output) Mockito.mock(Output.class));
    }
}
