package org.apache.beam.runners.spark;

import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.RegexMatcher;
import org.hamcrest.core.IsEqual;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/beam/runners/spark/GlobalWatermarkHolderTest.class */
/**
 * Tests for {@link GlobalWatermarkHolder}: queuing per-source {@link
 * GlobalWatermarkHolder.SparkWatermarks} with {@code add}, publishing them with {@code advance},
 * and reading them back with {@code get}.
 */
public class GlobalWatermarkHolderTest {

    private static final SparkPipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);

    /** Regex for the textual form of an instant as it appears in the expected error message. */
    private static final String INSTANT_PATTERN = "[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3}Z";

    // NOTE(review): presumably resets the watermark state between tests - confirm against the rule.
    @Rule
    public ClearWatermarksRule clearWatermarksRule = new ClearWatermarksRule();

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    // NOTE(review): presumably makes the tests share a single Spark context - confirm against the rule.
    @Rule
    public ReuseSparkContextRule reuseContext = ReuseSparkContextRule.yes();

    /**
     * Adds and advances watermarks twice for source 1 and checks that the values read back are
     * those of the second update; then submits an update whose low watermark is later than its
     * high watermark and expects an {@link IllegalStateException}.
     */
    @Test
    public void testLowHighWatermarksAdvance() {
        SparkContextFactory.getSparkContext(options);
        Instant instant = new Instant(0L);

        // First update: low == high, synchronized processing time at the base instant.
        Instant five = instant.plus(Duration.millis(5L));
        GlobalWatermarkHolder.add(1, new GlobalWatermarkHolder.SparkWatermarks(five, five, instant));
        GlobalWatermarkHolder.advance();

        // Second update: every component moves forward.
        Instant expectedLow = instant.plus(Duration.millis(10L));
        Instant expectedHigh = instant.plus(Duration.millis(15L));
        Instant expectedSyncTime = instant.plus(Duration.millis(100L));
        GlobalWatermarkHolder.add(
                1, new GlobalWatermarkHolder.SparkWatermarks(expectedLow, expectedHigh, expectedSyncTime));
        GlobalWatermarkHolder.advance();

        GlobalWatermarkHolder.SparkWatermarks current =
                (GlobalWatermarkHolder.SparkWatermarks) GlobalWatermarkHolder.get(0L).get(1);
        Assert.assertThat(current.getLowWatermark(), IsEqual.equalTo(expectedLow));
        Assert.assertThat(current.getHighWatermark(), IsEqual.equalTo(expectedHigh));
        Assert.assertThat(current.getSynchronizedProcessingTime(), IsEqual.equalTo(expectedSyncTime));

        // Third update is invalid: low (25ms) is later than high (20ms).
        // The message wording (including "then") is kept verbatim because the regex has to
        // match the text of the exception raised by the code under test.
        thrown.expect(IllegalStateException.class);
        thrown.expectMessage(
                RegexMatcher.matches(
                        "Low watermark "
                                + INSTANT_PATTERN
                                + " cannot be later then high watermark "
                                + INSTANT_PATTERN));
        GlobalWatermarkHolder.add(
                1,
                new GlobalWatermarkHolder.SparkWatermarks(
                        instant.plus(Duration.millis(25L)),
                        instant.plus(Duration.millis(20L)),
                        instant.plus(Duration.millis(200L))));
        GlobalWatermarkHolder.advance();
    }

    /**
     * Submits two updates for source 1 carrying the same synchronized processing time and expects
     * the second one to fail with an {@link IllegalStateException}.
     */
    @Test
    public void testSynchronizedTimeMonotonic() {
        SparkContextFactory.getSparkContext(options);
        Instant instant = new Instant(0L);
        Instant low = instant.plus(Duration.millis(5L));
        Instant high = instant.plus(Duration.millis(10L));

        GlobalWatermarkHolder.add(1, new GlobalWatermarkHolder.SparkWatermarks(low, high, instant));
        GlobalWatermarkHolder.advance();

        thrown.expect(IllegalStateException.class);
        thrown.expectMessage("Synchronized processing time must advance.");
        // Same synchronized processing time as the first update, so it has not advanced.
        GlobalWatermarkHolder.add(1, new GlobalWatermarkHolder.SparkWatermarks(low, high, instant));
        GlobalWatermarkHolder.advance();
    }

    /**
     * Adds watermarks for two source ids before a single advance and checks that each source id
     * reads back its own low and high watermark.
     */
    @Test
    public void testMultiSource() {
        SparkContextFactory.getSparkContext(options);
        Instant instant = new Instant(0L);

        Instant firstLow = instant.plus(Duration.millis(5L));
        Instant firstHigh = instant.plus(Duration.millis(10L));
        Instant secondLow = instant.plus(Duration.millis(3L));
        Instant secondHigh = instant.plus(Duration.millis(6L));

        GlobalWatermarkHolder.add(1, new GlobalWatermarkHolder.SparkWatermarks(firstLow, firstHigh, instant));
        GlobalWatermarkHolder.add(2, new GlobalWatermarkHolder.SparkWatermarks(secondLow, secondHigh, instant));
        GlobalWatermarkHolder.advance();

        GlobalWatermarkHolder.SparkWatermarks first =
                (GlobalWatermarkHolder.SparkWatermarks) GlobalWatermarkHolder.get(0L).get(1);
        Assert.assertThat(first.getLowWatermark(), IsEqual.equalTo(firstLow));
        Assert.assertThat(first.getHighWatermark(), IsEqual.equalTo(firstHigh));

        GlobalWatermarkHolder.SparkWatermarks second =
                (GlobalWatermarkHolder.SparkWatermarks) GlobalWatermarkHolder.get(0L).get(2);
        Assert.assertThat(second.getLowWatermark(), IsEqual.equalTo(secondLow));
        Assert.assertThat(second.getHighWatermark(), IsEqual.equalTo(secondHigh));
    }
}
