package org.apache.flink.connector.base.source.reader;

import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Optional;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.test.junit5.InjectMiniCluster;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.logging.LoggerAuditingExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.event.Level;

/* loaded from: input_file:org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.class */
public class AlignedWatermarksITCase {
    /** Operator name given to the source whose downstream map sleeps 10 ms per record. */
    public static final String SLOW_SOURCE_NAME = "SlowNumberSequenceSource";
    /**
     * Operator name given to the source whose downstream map sleeps 1 ms per record; the same text
     * prefixes the metric pattern queried in {@code testAlignment}.
     */
    public static final String FAST_SOURCE_NAME = "FastNumberSequenceSource";
    /**
     * NOTE(review): not referenced by name in this file. The literal 10 in the loop bound of
     * {@code testAlignment} and in {@code Duration.ofMillis(10L)} in {@code getJobGraph} looks like
     * this constant inlined by the compiler - confirm against the original source.
     */
    public static final int MAX_DRIFT = 10;

    // Per-instance JUnit extension, constructed with this test class and Level.INFO.
    // It is not referenced anywhere else in this file.
    @RegisterExtension
    LoggerAuditingExtension loggerAuditingExtension = new LoggerAuditingExtension(AlignedWatermarksITCase.class, Level.INFO);
    /** Third argument passed to {@code withWatermarkAlignment} for both sources (100 ms). */
    private static final Duration UPDATE_INTERVAL = Duration.ofMillis(100);
    // Metrics reporter added to the mini cluster configuration below and queried through
    // findMetric(...) in testAlignment. It must stay declared before MINI_CLUSTER_RESOURCE,
    // whose static initializer reads it.
    private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();

    // Mini cluster shared by the class: one task manager, configured with the reporter above.
    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setConfiguration(reporter.addToConfiguration(new Configuration())).build());

    /**
     * Watermark generator that emits one watermark per event, using the timestamp handed to
     * {@link #onEvent}, and emits nothing on the periodic callback.
     *
     * <p>Decompiler note: JADX reports that the access modifier of this class was changed from
     * {@code private}; it was loaded from {@code AlignedWatermarksITCase$PunctuatedGenerator.class}.
     */
    public static class PunctuatedGenerator implements WatermarkGenerator<Long> {
        private PunctuatedGenerator() {
            // Private constructor: instances are created only inside the enclosing test class.
        }

        /**
         * Emits a watermark carrying {@code eventTimestamp}.
         *
         * @param event the record itself; not inspected
         * @param eventTimestamp timestamp supplied for the record, forwarded as the watermark
         * @param output sink that receives the watermark
         */
        public void onEvent(Long event, long eventTimestamp, WatermarkOutput output) {
            final Watermark punctuation = new Watermark(eventTimestamp);
            output.emitWatermark(punctuation);
        }

        /**
         * Deliberately does nothing: watermarks are produced per event only.
         *
         * @param output unused
         */
        public void onPeriodicEmit(WatermarkOutput output) {
            // no-op
        }
    }

    /**
     * Submits the two-source job and polls the fast source's {@code watermarkAlignmentDrift} gauge
     * every 200 ms.
     *
     * <p>Each observed value must be less than or equal to the previously observed one; the test
     * finishes once a value below {@link #MAX_DRIFT} has been seen. The loop has no timeout of its
     * own, so a job whose drift never drops below the bound keeps this test polling.
     *
     * @param miniCluster cluster injected by the {@code MiniClusterExtension}
     * @throws Exception if submission fails, waiting for the tasks fails, or the thread is
     *     interrupted while sleeping
     */
    @Test
    public void testAlignment(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
        JobID jobID = miniCluster.submitJob(getJobGraph()).get().getJobID();
        CommonTestUtils.waitForAllTaskRunning(miniCluster, jobID, false);

        // Same text as the previously hard-coded "FastNumberSequenceSource.*watermarkAlignmentDrift".
        final String driftMetricPattern = FAST_SOURCE_NAME + ".*watermarkAlignmentDrift";

        long previousDrift = Long.MAX_VALUE;
        do {
            // Look the metric up first, then wait, then read the gauge - same order as before.
            Optional<?> driftMetric = reporter.findMetric(jobID, driftMetricPattern);
            Thread.sleep(200L);

            // The reporter only exposes a generic metric; the drift is expected to be a Long gauge.
            @SuppressWarnings("unchecked")
            Optional<Long> observedDrift =
                    driftMetric.map(metric -> ((Gauge<Long>) metric).getValue());

            if (observedDrift.isPresent()) {
                long currentDrift = observedDrift.get();
                Assertions.assertThat(currentDrift).isLessThanOrEqualTo(previousDrift);
                previousDrift = currentDrift;
            }
        } while (previousDrift >= MAX_DRIFT);
    }

    /**
     * Builds the job under test: two number-sequence sources in watermark alignment group
     * {@code "group-1"}, each followed by a sleeping map (10 ms per record for the slow source,
     * 1 ms for the fast one), unioned into a discarding sink.
     *
     * <p>The auto-watermark interval is set to 0 and parallelism to 1.
     *
     * @return the job graph of the assembled pipeline
     */
    private JobGraph getJobGraph() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setAutoWatermarkInterval(0L);
        env.setParallelism(1);

        // Slow source is created first, then the fast one, preserving the original build order.
        DataStream<Long> slowSource = createThrottledSource(env, SLOW_SOURCE_NAME, 10L);
        DataStream<Long> fastSource = createThrottledSource(env, FAST_SOURCE_NAME, 1L);

        slowSource.union(fastSource).sinkTo(new DiscardingSink<>());
        return env.getStreamGraph().getJobGraph();
    }

    /**
     * Creates one aligned source over {@code 0..Long.MAX_VALUE} followed by a map that sleeps
     * {@code delayMillis} for every record before passing it on unchanged.
     *
     * @param env environment the source is added to
     * @param sourceName operator name of the source
     * @param delayMillis per-record sleep of the downstream map, in milliseconds
     * @return the throttled stream
     */
    private static DataStream<Long> createThrottledSource(
            StreamExecutionEnvironment env, String sourceName, final long delayMillis) {
        WatermarkStrategy<Long> watermarkStrategy =
                WatermarkStrategy.<Long>forGenerator(context -> new PunctuatedGenerator())
                        .withWatermarkAlignment(
                                "group-1", Duration.ofMillis(MAX_DRIFT), UPDATE_INTERVAL)
                        // The record value doubles as its timestamp.
                        .withTimestampAssigner((element, recordTimestamp) -> element.longValue());

        return env.fromSource(
                        new NumberSequenceSource(0L, Long.MAX_VALUE), watermarkStrategy, sourceName)
                .map(
                        new RichMapFunction<Long, Long>() {
                            @Override
                            public Long map(Long value) throws Exception {
                                Thread.sleep(delayMillis);
                                return value;
                            }
                        });
    }

    /**
     * Decompiled form of the synthetic hook that re-creates this class's serializable lambdas from
     * a {@link SerializedLambda} descriptor.
     *
     * <p>The descriptor's implementation method name selects the lambda kind. The remaining
     * descriptor fields are then validated before a fresh, equivalent lambda is returned. The
     * decompiler output used a {@code boolean} selector holding the values -1..3 and repeated
     * {@code case true:} labels, which is not valid Java; the string switch below expresses the
     * same dispatch.
     *
     * <p>NOTE(review): this method is normally generated by the compiler for classes containing
     * serializable lambdas. A hand-maintained copy may clash with the generated one when the class
     * is recompiled - consider deleting it from source.
     *
     * @param serializedLambda descriptor of the lambda to restore
     * @return a lambda equivalent to the serialized one
     * @throws IllegalArgumentException if the descriptor matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        switch (serializedLambda.getImplMethodName()) {
            case "lambda$getJobGraph$bd7fe250$1":
            case "lambda$getJobGraph$b2d69d0b$1":
                if (describesLambda(
                        serializedLambda,
                        "org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier",
                        "createWatermarkGenerator",
                        "(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;",
                        "(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;")) {
                    // Fully qualified to give the lambda a target type without a new import.
                    return (org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier<Long>)
                            context -> new PunctuatedGenerator();
                }
                break;
            case "lambda$getJobGraph$6e54f6e9$1":
            case "lambda$getJobGraph$63abb1a4$1":
                if (describesLambda(
                        serializedLambda,
                        "org/apache/flink/api/common/eventtime/SerializableTimestampAssigner",
                        "extractTimestamp",
                        "(Ljava/lang/Object;J)J",
                        "(Ljava/lang/Long;J)J")) {
                    return (org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<Long>)
                            (element, recordTimestamp) -> element.longValue();
                }
                break;
            default:
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    /**
     * Checks that a descriptor refers to a static implementation method of this class with the
     * expected functional interface and signatures.
     *
     * @param lambda descriptor under inspection
     * @param functionalInterfaceClass expected internal name of the functional interface
     * @param functionalInterfaceMethod expected name of the interface's single method
     * @param functionalInterfaceSignature expected descriptor of that method
     * @param implMethodSignature expected descriptor of the implementation method
     * @return {@code true} when every field matches
     */
    private static boolean describesLambda(
            SerializedLambda lambda,
            String functionalInterfaceClass,
            String functionalInterfaceMethod,
            String functionalInterfaceSignature,
            String implMethodSignature) {
        // REF_invokeStatic has the value 6, the literal used in the decompiled checks.
        return lambda.getImplMethodKind() == java.lang.invoke.MethodHandleInfo.REF_invokeStatic
                && lambda.getFunctionalInterfaceClass().equals(functionalInterfaceClass)
                && lambda.getFunctionalInterfaceMethodName().equals(functionalInterfaceMethod)
                && lambda.getFunctionalInterfaceMethodSignature().equals(functionalInterfaceSignature)
                && lambda.getImplClass().equals("org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase")
                && lambda.getImplMethodSignature().equals(implMethodSignature);
    }
}
