package org.apache.flink.streaming.api.operators.source;

import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.class */
public class SourceOperatorEventTimeTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest$InterpretingSourceReader.class */
    /**
     * A scripted {@link SourceReader} for tests. It is constructed with a fixed sequence of
     * actions; every call to {@link #pollNext(ReaderOutput)} applies the next action to the
     * supplied output, and once no actions remain it reports {@code END_OF_INPUT}.
     *
     * <p>Lifecycle and split-handling callbacks are no-ops, and snapshotting is not supported.
     */
    public static final class InterpretingSourceReader implements SourceReader<Integer, MockSourceSplit> {
        /** Scripted actions that have not been executed yet; one is consumed per poll. */
        private final Iterator<Consumer<ReaderOutput<Integer>>> pendingActions;

        @SafeVarargs
        private InterpretingSourceReader(Consumer<ReaderOutput<Integer>>... consumerArr) {
            List<Consumer<ReaderOutput<Integer>>> script = Arrays.asList(consumerArr);
            this.pendingActions = script.iterator();
        }

        /** No-op: nothing needs to be started. */
        public void start() {
        }

        /**
         * Runs the next scripted action against {@code readerOutput}.
         *
         * @return {@code MORE_AVAILABLE} after running an action, or {@code END_OF_INPUT} when
         *     the script is exhausted (in which case the output is not touched)
         */
        public InputStatus pollNext(ReaderOutput<Integer> readerOutput) {
            if (pendingActions.hasNext()) {
                Consumer<ReaderOutput<Integer>> nextAction = pendingActions.next();
                nextAction.accept(readerOutput);
                return InputStatus.MORE_AVAILABLE;
            }
            return InputStatus.END_OF_INPUT;
        }

        /**
         * Not supported by this reader.
         *
         * @throws UnsupportedOperationException always
         */
        public List<MockSourceSplit> snapshotState(long j) {
            throw new UnsupportedOperationException();
        }

        /** Always returns an already-completed future. */
        public CompletableFuture<Void> isAvailable() {
            return CompletableFuture.completedFuture(null);
        }

        /** No-op: the supplied splits are ignored. */
        public void addSplits(List<MockSourceSplit> list) {
        }

        /** No-op. */
        public void notifyNoMoreSplits() {
        }

        /** No-op: there is nothing to release. */
        public void close() {
        }
    }

    /**
     * Emits records with timestamps 100, 120 and 110 on the main output, using a strategy built
     * from {@code OnPeriodicTestWatermarkGenerator}, and expects exactly the watermarks 100 and
     * 120, in that order.
     */
    @Test
    public void testMainOutputPeriodicWatermarks() throws Exception {
        WatermarkStrategy<Integer> strategy =
                WatermarkStrategy.forGenerator(ctx -> new OnPeriodicTestWatermarkGenerator());

        List<Watermark> watermarks = testSequenceOfWatermarks(
                strategy,
                out -> out.collect(0, 100L),
                out -> out.collect(0, 120L),
                out -> out.collect(0, 110L));

        Assert.assertThat(watermarks, Matchers.contains(new Watermark(100L), new Watermark(120L)));
    }

    /**
     * Emits records with timestamps 100, 120 and 110 on the main output, using a strategy built
     * from {@code OnEventTestWatermarkGenerator}, and expects exactly the watermarks 100 and 120,
     * in that order.
     */
    @Test
    public void testMainOutputEventWatermarks() throws Exception {
        WatermarkStrategy<Integer> strategy =
                WatermarkStrategy.forGenerator(ctx -> new OnEventTestWatermarkGenerator());

        List<Watermark> watermarks = testSequenceOfWatermarks(
                strategy,
                out -> out.collect(0, 100L),
                out -> out.collect(0, 120L),
                out -> out.collect(0, 110L));

        Assert.assertThat(watermarks, Matchers.contains(new Watermark(100L), new Watermark(120L)));
    }

    /**
     * Drives two per-split outputs ("A" and "B") under a strategy built from
     * {@code OnPeriodicTestWatermarkGenerator}: both outputs are created, then A receives 100,
     * B receives 200, A receives 150, A is released, and B receives 200 again. The expected
     * watermark sequence is 100, 150, 200.
     */
    @Test
    public void testPerSplitOutputPeriodicWatermarks() throws Exception {
        WatermarkStrategy<Integer> strategy =
                WatermarkStrategy.forGenerator(ctx -> new OnPeriodicTestWatermarkGenerator());

        List<Watermark> watermarks = testSequenceOfWatermarks(
                strategy,
                out -> {
                    // Register both split outputs before any record is emitted.
                    out.createOutputForSplit("A");
                    out.createOutputForSplit("B");
                },
                out -> out.createOutputForSplit("A").collect(0, 100L),
                out -> out.createOutputForSplit("B").collect(0, 200L),
                out -> out.createOutputForSplit("A").collect(0, 150L),
                out -> out.releaseOutputForSplit("A"),
                out -> out.createOutputForSplit("B").collect(0, 200L));

        Assert.assertThat(
                watermarks,
                Matchers.contains(new Watermark(100L), new Watermark(150L), new Watermark(200L)));
    }

    /**
     * Drives two per-split outputs ("one" and "two") under a strategy built from
     * {@code OnEventTestWatermarkGenerator}: both outputs are created, then "one" receives 100,
     * "two" receives 200, "one" receives 150, "one" is released, and "two" receives 200 again.
     * The expected watermark sequence is 100, 150, 200.
     */
    @Test
    public void testPerSplitOutputEventWatermarks() throws Exception {
        WatermarkStrategy<Integer> strategy =
                WatermarkStrategy.forGenerator(ctx -> new OnEventTestWatermarkGenerator());

        List<Watermark> watermarks = testSequenceOfWatermarks(
                strategy,
                out -> {
                    // Register both split outputs before any record is emitted.
                    out.createOutputForSplit("one");
                    out.createOutputForSplit("two");
                },
                out -> out.createOutputForSplit("one").collect(0, 100L),
                out -> out.createOutputForSplit("two").collect(0, 200L),
                out -> out.createOutputForSplit("one").collect(0, 150L),
                out -> out.releaseOutputForSplit("one"),
                out -> out.createOutputForSplit("two").collect(0, 200L));

        Assert.assertThat(
                watermarks,
                Matchers.contains(new Watermark(100L), new Watermark(150L), new Watermark(200L)));
    }

    /**
     * Runs the given reader actions through a test operator and returns only the watermarks that
     * were emitted, in emission order.
     *
     * <p>The operator output contains runtime watermarks
     * ({@code org.apache.flink.streaming.api.watermark.Watermark}); each one is converted to an
     * event-time API {@link Watermark} carrying the same timestamp so results can be compared
     * against expected values. All other emitted events are dropped.
     *
     * @param watermarkStrategy strategy handed to the operator under test
     * @param consumerArr scripted actions, executed one per poll of the reader
     * @return the emitted watermarks, converted and in order
     * @throws Exception if creating or running the operator fails
     */
    @SafeVarargs
    private final List<Watermark> testSequenceOfWatermarks(WatermarkStrategy<Integer> watermarkStrategy, Consumer<ReaderOutput<Integer>>... consumerArr) throws Exception {
        // No raw List cast: the pipeline already yields List<Watermark>.
        return testSequenceOfEvents(watermarkStrategy, consumerArr).stream()
                .filter(org.apache.flink.streaming.api.watermark.Watermark.class::isInstance)
                .map(org.apache.flink.streaming.api.watermark.Watermark.class::cast)
                .map(emitted -> new Watermark(emitted.getTimestamp()))
                .collect(Collectors.toList());
    }

    /**
     * Builds a test operator around an {@link InterpretingSourceReader} scripted with the given
     * actions, pumps it until it reports {@code END_OF_INPUT}, and returns everything collected
     * by the output.
     *
     * <p>Processing time starts at {@code Integer.MAX_VALUE} and is advanced by 100 after every
     * {@code emitNext} call that does not report end of input.
     *
     * @param watermarkStrategy strategy handed to the operator under test
     * @param consumerArr scripted actions, executed one per poll of the reader
     * @return the events recorded by the collecting output
     * @throws Exception if creating or running the operator fails
     */
    @SafeVarargs
    private final List<Object> testSequenceOfEvents(WatermarkStrategy<Integer> watermarkStrategy, Consumer<ReaderOutput<Integer>>... consumerArr) throws Exception {
        // Raw types are kept on purpose: the generic signatures of these test helpers are not
        // visible from this file.
        CollectingDataOutput output = new CollectingDataOutput();

        TestProcessingTimeService clock = new TestProcessingTimeService();
        clock.setCurrentTime((long) Integer.MAX_VALUE);

        SourceReader<Integer, MockSourceSplit> reader = new InterpretingSourceReader(consumerArr);
        SourceOperator operator = createTestOperator(reader, watermarkStrategy, clock);

        while (operator.emitNext(output) != InputStatus.END_OF_INPUT) {
            long now = clock.getCurrentProcessingTime();
            clock.setCurrentTime(now + 100);
        }
        return output.events;
    }

    /**
     * Creates a {@code TestingSourceOperator} for the given reader, initializes its state from a
     * fresh {@link MemoryStateBackend}-based operator state backend (named "test-operator", with
     * no state handles to restore), and opens it.
     *
     * @param sourceReader reader the operator polls
     * @param watermarkStrategy watermark strategy passed to the operator
     * @param processingTimeService time service passed to the operator
     * @return the initialized and opened operator
     * @throws Exception if creating the state backend, initializing state, or opening fails
     */
    private static <T> SourceOperator<T, MockSourceSplit> createTestOperator(SourceReader<T, MockSourceSplit> sourceReader, WatermarkStrategy<T> watermarkStrategy, ProcessingTimeService processingTimeService) throws Exception {
        StateInitializationContextImpl initContext = new StateInitializationContextImpl(
                false,
                new MemoryStateBackend().createOperatorStateBackend(
                        new MockEnvironmentBuilder().build(),
                        "test-operator",
                        Collections.emptyList(),
                        new CloseableRegistry()),
                (KeyedStateStore) null,
                (Iterable) null,
                (Iterable) null);

        TestingSourceOperator operator =
                new TestingSourceOperator(sourceReader, watermarkStrategy, processingTimeService);
        operator.initializeState(initContext);
        operator.open();
        return operator;
    }

    /*
     * Lambda deserialization hook, marked synthetic. Given a SerializedLambda, it matches the
     * implementation method name against the four watermark-generator-supplier lambdas used by
     * the tests in this class and, after verifying the recorded functional-interface and
     * implementation signatures, returns an equivalent lambda.
     *
     * NOTE(review): this looks like decompiler output of a compiler-generated method rather than
     * hand-written code — confirm. As written it is not valid Java: the selector `z` is declared
     * boolean but is assigned -1, 2 and 3, and `case true` appears three times. If the file is
     * meant to be compiled from source, this method presumably should be dropped and left to the
     * compiler to regenerate — verify before changing.
     *
     * Throws IllegalArgumentException when the serialized lambda matches none of the known
     * lambdas or fails the signature checks.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // First stage: dispatch on the name's hash code, then confirm with equals().
        switch (implMethodName.hashCode()) {
            case -603050147:
                if (implMethodName.equals("lambda$testMainOutputEventWatermarks$400354f4$1")) {
                    z = false;
                    break;
                }
                break;
            case 1063988769:
                if (implMethodName.equals("lambda$testPerSplitOutputEventWatermarks$400354f4$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1799219722:
                if (implMethodName.equals("lambda$testMainOutputPeriodicWatermarks$400354f4$1")) {
                    z = true;
                    break;
                }
                break;
            case 1848722630:
                if (implMethodName.equals("lambda$testPerSplitOutputPeriodicWatermarks$400354f4$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        // Second stage: every branch performs the same checks (method kind 6 — presumably
        // MethodHandleInfo.REF_invokeStatic, confirm — plus interface class, interface method
        // name/signature, implementing class and implementation signature) and differs only in
        // which generator the returned lambda creates.
        switch (z) {
            case false:
                // Matched lambda$testMainOutputEventWatermarks.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("createWatermarkGenerator") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;")) {
                    return context -> {
                        return new OnEventTestWatermarkGenerator();
                    };
                }
                break;
            case true:
                // Matched lambda$testMainOutputPeriodicWatermarks.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("createWatermarkGenerator") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;")) {
                    return context2 -> {
                        return new OnPeriodicTestWatermarkGenerator();
                    };
                }
                break;
            case true:
                // NOTE(review): presumably selector value 2 (lambda$testPerSplitOutputPeriodicWatermarks)
                // — the label was garbled to `true`; confirm.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("createWatermarkGenerator") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;")) {
                    return context3 -> {
                        return new OnPeriodicTestWatermarkGenerator();
                    };
                }
                break;
            case true:
                // NOTE(review): presumably selector value 3 (lambda$testPerSplitOutputEventWatermarks)
                // — the label was garbled to `true`; confirm.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("createWatermarkGenerator") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;")) {
                    return context4 -> {
                        return new OnEventTestWatermarkGenerator();
                    };
                }
                break;
        }
        // Unknown lambda name, or a known name whose recorded signatures did not match.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
