/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.source;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.api.operators.source.OnEventTestWatermarkGenerator;
import org.apache.flink.streaming.api.operators.source.OnPeriodicTestWatermarkGenerator;
import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class SourceOperatorEventTimeTest {
    private final boolean emitProgressiveWatermarks;

    @Parameterized.Parameters(name="Emit progressive watermarks: {0}")
    public static Collection<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    public SourceOperatorEventTimeTest(boolean emitProgressiveWatermarks) {
        this.emitProgressiveWatermarks = emitProgressiveWatermarks;
    }

    @Test
    public void testMainOutputPeriodicWatermarks() throws Exception {
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new OnPeriodicTestWatermarkGenerator());
        List<Watermark> result = this.testSequenceOfWatermarks(this.emitProgressiveWatermarks, (WatermarkStrategy<Integer>)watermarkStrategy, output -> output.collect((Object)0, 100L), output -> output.collect((Object)0, 120L), output -> output.collect((Object)0, 110L));
        this.assertWatermarksOrEmpty(result, new Watermark(100L), new Watermark(120L));
    }

    @Test
    public void testMainOutputEventWatermarks() throws Exception {
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new OnEventTestWatermarkGenerator());
        List<Watermark> result = this.testSequenceOfWatermarks(this.emitProgressiveWatermarks, (WatermarkStrategy<Integer>)watermarkStrategy, output -> output.collect((Object)0, 100L), output -> output.collect((Object)0, 120L), output -> output.collect((Object)0, 110L));
        this.assertWatermarksOrEmpty(result, new Watermark(100L), new Watermark(120L));
    }

    @Test
    public void testPerSplitOutputPeriodicWatermarks() throws Exception {
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new OnPeriodicTestWatermarkGenerator());
        List<Watermark> result = this.testSequenceOfWatermarks(this.emitProgressiveWatermarks, (WatermarkStrategy<Integer>)watermarkStrategy, output -> {
            output.createOutputForSplit("A");
            output.createOutputForSplit("B");
        }, output -> output.createOutputForSplit("A").collect((Object)0, 100L), output -> output.createOutputForSplit("B").collect((Object)0, 200L), output -> output.createOutputForSplit("A").collect((Object)0, 150L), output -> output.releaseOutputForSplit("A"), output -> output.createOutputForSplit("B").collect((Object)0, 200L));
        this.assertWatermarksOrEmpty(result, new Watermark(100L), new Watermark(150L), new Watermark(200L));
    }

    @Test
    public void testPerSplitOutputEventWatermarks() throws Exception {
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new OnEventTestWatermarkGenerator());
        List<Watermark> result = this.testSequenceOfWatermarks(this.emitProgressiveWatermarks, (WatermarkStrategy<Integer>)watermarkStrategy, output -> {
            output.createOutputForSplit("one");
            output.createOutputForSplit("two");
        }, output -> output.createOutputForSplit("one").collect((Object)0, 100L), output -> output.createOutputForSplit("two").collect((Object)0, 200L), output -> output.createOutputForSplit("one").collect((Object)0, 150L), output -> output.releaseOutputForSplit("one"), output -> output.createOutputForSplit("two").collect((Object)0, 200L));
        this.assertWatermarksOrEmpty(result, new Watermark(100L), new Watermark(150L), new Watermark(200L));
    }

    @Test
    public void testCreatingPerSplitOutputOnSplitAddition() throws Exception {
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new OnEventTestWatermarkGenerator());
        InterpretingSourceReader reader = new InterpretingSourceReader(new Consumer[]{output -> output.createOutputForSplit("1").collect((Object)0, 100L), output -> output.createOutputForSplit("1").collect((Object)0, 200L), output -> output.createOutputForSplit("1").collect((Object)0, 300L), output -> output.createOutputForSplit("2").collect((Object)0, 150L), output -> output.createOutputForSplit("2").collect((Object)0, 400L)});
        SourceOperator<Integer, MockSourceSplit> sourceOperator = TestingSourceOperator.createTestOperator(reader, watermarkStrategy, this.emitProgressiveWatermarks);
        sourceOperator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Arrays.asList(new MockSourceSplit(1), new MockSourceSplit(2)), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        List<Watermark> result = this.testSequenceOfWatermarks(sourceOperator);
        this.assertWatermarksOrEmpty(result, new Watermark(150L), new Watermark(300L));
    }

    private void assertWatermarksOrEmpty(List<Watermark> actualWatermarks, Watermark ... expectedWatermarks) {
        if (this.emitProgressiveWatermarks) {
            ArrayList watermarks = Lists.newArrayList((Object[])expectedWatermarks);
            Assert.assertThat(actualWatermarks, (Matcher)Matchers.contains((Object[])watermarks.toArray()));
        } else {
            Assert.assertThat(actualWatermarks, (Matcher)Matchers.hasSize((int)0));
        }
    }

    @SafeVarargs
    private final List<Watermark> testSequenceOfWatermarks(boolean emitProgressiveWatermarks, WatermarkStrategy<Integer> watermarkStrategy, Consumer<ReaderOutput<Integer>> ... actions) throws Exception {
        InterpretingSourceReader reader = new InterpretingSourceReader((Consumer[])actions);
        SourceOperator<Integer, MockSourceSplit> sourceOperator = TestingSourceOperator.createTestOperator(reader, watermarkStrategy, emitProgressiveWatermarks);
        return this.testSequenceOfWatermarks(sourceOperator);
    }

    private final List<Watermark> testSequenceOfWatermarks(SourceOperator<Integer, MockSourceSplit> sourceOperator) throws Exception {
        List<Object> allEvents = this.testSequenceOfEvents(sourceOperator);
        return allEvents.stream().filter(evt -> evt instanceof Watermark).map(evt -> (Watermark)evt).collect(Collectors.toList());
    }

    private final List<Object> testSequenceOfEvents(SourceOperator<Integer, MockSourceSplit> sourceOperator) throws Exception {
        CollectingDataOutput out = new CollectingDataOutput();
        TestProcessingTimeService timeService = (TestProcessingTimeService)sourceOperator.getProcessingTimeService();
        while (sourceOperator.emitNext(out) != DataInputStatus.END_OF_INPUT) {
            timeService.setCurrentTime(timeService.getCurrentProcessingTime() + 100L);
        }
        return out.events;
    }

    private static final class InterpretingSourceReader
    implements SourceReader<Integer, MockSourceSplit> {
        private final Iterator<Consumer<ReaderOutput<Integer>>> actions;

        @SafeVarargs
        private InterpretingSourceReader(Consumer<ReaderOutput<Integer>> ... actions) {
            this.actions = Arrays.asList(actions).iterator();
        }

        public void start() {
        }

        public InputStatus pollNext(ReaderOutput<Integer> output) {
            if (this.actions.hasNext()) {
                this.actions.next().accept(output);
                return InputStatus.MORE_AVAILABLE;
            }
            return InputStatus.END_OF_INPUT;
        }

        public List<MockSourceSplit> snapshotState(long checkpointId) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<Void> isAvailable() {
            return CompletableFuture.completedFuture(null);
        }

        public void addSplits(List<MockSourceSplit> splits) {
        }

        public void notifyNoMoreSplits() {
        }

        public void close() {
        }
    }
}

