package org.apache.flink.api.common.eventtime;

import java.util.UUID;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.class */
/**
 * Tests for {@link WatermarkOutputMultiplexer}.
 *
 * <p>Every test wires a multiplexer to a {@link TestingWatermarkOutput} and then checks what that
 * downstream output observed (last watermark and idleness) after driving one or more
 * per-split outputs obtained from the multiplexer.
 */
class WatermarkOutputMultiplexerTest {

    // ------------------------------------------------------------------------
    //  Immediate outputs
    // ------------------------------------------------------------------------

    /** A watermark emitted on a single immediate output reaches the downstream output directly. */
    @Test
    void singleImmediateWatermark() {
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        WatermarkOutput output = createImmediateOutput(multiplexer);

        output.emitWatermark(new Watermark(0L));

        Assertions.assertThat(downstream.lastWatermark()).isEqualTo(new Watermark(0L));
        Assertions.assertThat(downstream.isIdle()).isFalse();
    }

    /** Marking the only immediate output idle makes the downstream output idle, with no watermark. */
    @Test
    void singleImmediateIdleness() {
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        WatermarkOutput output = createImmediateOutput(multiplexer);

        output.markIdle();

        Assertions.assertThat(downstream.lastWatermark()).isNull();
        Assertions.assertThat(downstream.isIdle()).isTrue();
    }

    /** Emitting a watermark on an idle immediate output forwards it and clears idleness. */
    @Test
    void singleImmediateWatermarkAfterIdleness() {
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        WatermarkOutput output = createImmediateOutput(multiplexer);

        output.markIdle();
        Assertions.assertThat(downstream.isIdle()).isTrue();

        output.emitWatermark(new Watermark(0L));
        Assertions.assertThat(downstream.lastWatermark()).isEqualTo(new Watermark(0L));
        Assertions.assertThat(downstream.isIdle()).isFalse();
    }

    /**
     * With immediate outputs at 2 and 5 plus a third idle one, the downstream output sees 2 and is
     * not idle.
     */
    @Test
    void multipleImmediateWatermark() {
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        WatermarkOutput first = createImmediateOutput(multiplexer);
        WatermarkOutput second = createImmediateOutput(multiplexer);
        WatermarkOutput third = createImmediateOutput(multiplexer);

        first.emitWatermark(new Watermark(2L));
        second.emitWatermark(new Watermark(5L));
        third.markIdle();

        Assertions.assertThat(downstream.lastWatermark()).isEqualTo(new Watermark(2L));
        Assertions.assertThat(downstream.isIdle()).isFalse();
    }

    /** Once the output holding the watermark at 2 goes idle, the downstream watermark moves to 5. */
    @Test
    void whenImmediateOutputBecomesIdleWatermarkAdvances() {
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        WatermarkOutput slower = createImmediateOutput(multiplexer);
        WatermarkOutput faster = createImmediateOutput(multiplexer);

        slower.emitWatermark(new Watermark(2L));
        faster.emitWatermark(new Watermark(5L));
        Assertions.assertThat(downstream.lastWatermark()).isEqualTo(new Watermark(2L));

        slower.markIdle();
        Assertions.assertThat(downstream.lastWatermark()).isEqualTo(new Watermark(5L));
    }

    /**
     * After the downstream watermark reached 5, a previously idle output emitting 3 must not pull
     * the downstream watermark back.
     */
    @Test
    void combinedWatermarkDoesNotRegressWhenIdleOutputRegresses() {
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        WatermarkOutput slower = createImmediateOutput(multiplexer);
        WatermarkOutput faster = createImmediateOutput(multiplexer);

        slower.emitWatermark(new Watermark(2L));
        faster.emitWatermark(new Watermark(5L));
        slower.markIdle();
        Assertions.assertThat(downstream.lastWatermark()).isEqualTo(new Watermark(5L));

        slower.emitWatermark(new Watermark(3L));
        Assertions.assertThat(downstream.lastWatermark()).isEqualTo(new Watermark(5L));
    }

    // ------------------------------------------------------------------------
    //  Deferred outputs
    // ------------------------------------------------------------------------

    /** A periodic emit with no registered outputs produces neither a watermark nor idleness. */
    @Test
    void noCombinedDeferredUpdateWhenWeHaveZeroOutputs() {
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);

        multiplexer.onPeriodicEmit();

        Assertions.assertThat(downstream.lastWatermark()).isNull();
        Assertions.assertThat(downstream.isIdle()).isFalse();
    }

    /** Deferred emissions stay invisible downstream until {@code onPeriodicEmit()} is called. */
    @Test
    void deferredOutputDoesNotImmediatelyAdvanceWatermark() {
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        WatermarkOutput first = createDeferredOutput(multiplexer);
        WatermarkOutput second = createDeferredOutput(multiplexer);

        first.emitWatermark(new Watermark(0L));
        second.emitWatermark(new Watermark(1L));
        Assertions.assertThat(downstream.lastWatermark()).isNull();

        multiplexer.onPeriodicEmit();
        Assertions.assertThat(downstream.lastWatermark()).isEqualTo(new Watermark(0L));
    }

    /** A watermark on a single deferred output is forwarded by the next periodic emit. */
    @Test
    void singleDeferredWatermark() {
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        WatermarkOutput output = createDeferredOutput(multiplexer);

        output.emitWatermark(new Watermark(0L));
        multiplexer.onPeriodicEmit();

        Assertions.assertThat(downstream.lastWatermark()).isEqualTo(new Watermark(0L));
        Assertions.assertThat(downstream.isIdle()).isFalse();
    }

    /** Idleness of a single deferred output is visible downstream after the periodic emit. */
    @Test
    void singleDeferredIdleness() {
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        WatermarkOutput output = createDeferredOutput(multiplexer);

        output.markIdle();
        multiplexer.onPeriodicEmit();

        Assertions.assertThat(downstream.lastWatermark()).isNull();
        Assertions.assertThat(downstream.isIdle()).isTrue();
    }

    /**
     * A deferred output that was idle and then emits a watermark is forwarded, and idleness is
     * cleared, on the following periodic emit.
     */
    @Test
    void singleDeferredWatermarkAfterIdleness() {
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        WatermarkOutput output = createDeferredOutput(multiplexer);

        output.markIdle();
        multiplexer.onPeriodicEmit();
        Assertions.assertThat(downstream.isIdle()).isTrue();

        output.emitWatermark(new Watermark(0L));
        multiplexer.onPeriodicEmit();
        Assertions.assertThat(downstream.lastWatermark()).isEqualTo(new Watermark(0L));
        Assertions.assertThat(downstream.isIdle()).isFalse();
    }

    /**
     * With deferred outputs at 2 and 5 plus a third idle one, the periodic emit forwards 2 and
     * the downstream output is not idle.
     */
    @Test
    void multipleDeferredWatermark() {
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        WatermarkOutput first = createDeferredOutput(multiplexer);
        WatermarkOutput second = createDeferredOutput(multiplexer);
        WatermarkOutput third = createDeferredOutput(multiplexer);

        first.emitWatermark(new Watermark(2L));
        second.emitWatermark(new Watermark(5L));
        third.markIdle();
        multiplexer.onPeriodicEmit();

        Assertions.assertThat(downstream.lastWatermark()).isEqualTo(new Watermark(2L));
        Assertions.assertThat(downstream.isIdle()).isFalse();
    }

    // ------------------------------------------------------------------------
    //  Mixing immediate and deferred outputs
    // ------------------------------------------------------------------------

    /**
     * A deferred emission of 5 alone shows nothing downstream; a subsequent immediate emission of
     * 2 on another output makes 2 visible downstream.
     */
    @Test
    void immediateUpdatesTakeDeferredUpdatesIntoAccount() {
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        WatermarkOutput immediate = createImmediateOutput(multiplexer);
        WatermarkOutput deferred = createDeferredOutput(multiplexer);

        deferred.emitWatermark(new Watermark(5L));
        Assertions.assertThat(downstream.lastWatermark()).isNull();

        immediate.emitWatermark(new Watermark(2L));
        Assertions.assertThat(downstream.lastWatermark()).isEqualTo(new Watermark(2L));
    }

    /**
     * For one output id used through both handles: after 5 was emitted downstream, an immediate
     * emission of 2 leaves the downstream watermark at 5, also after another periodic emit.
     */
    @Test
    void immediateUpdateOnSameOutputAsDeferredUpdateDoesNotRegress() {
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        registerNoOpOutput(multiplexer, "test-id");
        WatermarkOutput immediate = multiplexer.getImmediateOutput("test-id");
        WatermarkOutput deferred = multiplexer.getDeferredOutput("test-id");

        deferred.emitWatermark(new Watermark(5L));
        multiplexer.onPeriodicEmit();
        Assertions.assertThat(downstream.lastWatermark()).isEqualTo(new Watermark(5L));

        immediate.emitWatermark(new Watermark(2L));
        Assertions.assertThat(downstream.lastWatermark()).isEqualTo(new Watermark(5L));

        multiplexer.onPeriodicEmit();
        Assertions.assertThat(downstream.lastWatermark()).isEqualTo(new Watermark(5L));
    }

    /**
     * For one output id used through both handles: a pending deferred 5 followed by an immediate
     * 2 results in nothing being emitted downstream.
     */
    @Test
    void lowerImmediateUpdateOnSameOutputDoesNotEmitCombinedUpdate() {
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        registerNoOpOutput(multiplexer, "1234-test");
        WatermarkOutput immediate = multiplexer.getImmediateOutput("1234-test");
        WatermarkOutput deferred = multiplexer.getDeferredOutput("1234-test");

        deferred.emitWatermark(new Watermark(5L));
        immediate.emitWatermark(new Watermark(2L));

        Assertions.assertThat(downstream.lastWatermark()).isNull();
    }

    // ------------------------------------------------------------------------
    //  Unregistering outputs
    // ------------------------------------------------------------------------

    /** After the lower output is unregistered, the higher output's watermark gets through. */
    @Test
    void testRemoveUnblocksWatermarks() {
        final long lowTimestamp = 156765L;
        final long highTimestamp = 156775L;
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        registerNoOpOutput(multiplexer, "lower");
        registerNoOpOutput(multiplexer, "higher");

        multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
        multiplexer.unregisterOutput("lower");
        multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));

        Assertions.assertThat(downstream.lastWatermark().getTimestamp()).isEqualTo(highTimestamp);
    }

    /** Unregistering the lowest output does not by itself move the downstream watermark. */
    @Test
    void testRemoveOfLowestDoesNotImmediatelyAdvanceWatermark() {
        final long lowTimestamp = -4343L;
        final long highTimestamp = -4333L;
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        registerNoOpOutput(multiplexer, "lower");
        registerNoOpOutput(multiplexer, "higher");

        multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
        multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
        multiplexer.unregisterOutput("lower");

        Assertions.assertThat(downstream.lastWatermark().getTimestamp()).isEqualTo(lowTimestamp);
    }

    /**
     * After an output that emitted 2 is unregistered, a newly registered output emitting 1 leaves
     * the downstream watermark at 2.
     */
    @Test
    void testRemoveOfHighestDoesNotRetractWatermark() {
        final long lowTimestamp = 1L;
        final long highTimestamp = 2L;
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);

        registerNoOpOutput(multiplexer, "higher");
        multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
        multiplexer.unregisterOutput("higher");

        registerNoOpOutput(multiplexer, "lower");
        multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));

        Assertions.assertThat(downstream.lastWatermark().getTimestamp()).isEqualTo(highTimestamp);
    }

    /** {@code unregisterOutput} returns {@code true} for an id that was registered. */
    @Test
    void testRemoveRegisteredReturnValue() {
        WatermarkOutputMultiplexer multiplexer =
                new WatermarkOutputMultiplexer(createTestingWatermarkOutput());
        registerNoOpOutput(multiplexer, "does-exist");

        boolean removed = multiplexer.unregisterOutput("does-exist");

        Assertions.assertThat(removed).isTrue();
    }

    /** {@code unregisterOutput} returns {@code false} for an id that was never registered. */
    @Test
    void testRemoveNotRegisteredReturnValue() {
        WatermarkOutputMultiplexer multiplexer =
                new WatermarkOutputMultiplexer(createTestingWatermarkOutput());

        boolean removed = multiplexer.unregisterOutput("does-not-exist");

        Assertions.assertThat(removed).isFalse();
    }

    /**
     * After the only output is unregistered, a periodic emit keeps the last watermark and does
     * not mark the downstream output idle.
     */
    @Test
    void testNotEmittingIdleAfterAllSplitsRemoved() {
        TestingWatermarkOutput downstream = createTestingWatermarkOutput();
        WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(downstream);
        Watermark emittedWatermark = new Watermark(1L);
        String outputId = UUID.randomUUID().toString();
        registerNoOpOutput(multiplexer, outputId);

        multiplexer.getImmediateOutput(outputId).emitWatermark(emittedWatermark);
        multiplexer.unregisterOutput(outputId);
        multiplexer.onPeriodicEmit();

        Assertions.assertThat(downstream.lastWatermark()).isEqualTo(emittedWatermark);
        Assertions.assertThat(downstream.isIdle()).isFalse();
    }

    // ------------------------------------------------------------------------
    //  Test utilities
    // ------------------------------------------------------------------------

    /** Registers {@code outputId} on the multiplexer with a listener that ignores its argument. */
    private static void registerNoOpOutput(WatermarkOutputMultiplexer multiplexer, String outputId) {
        multiplexer.registerNewOutput(outputId, ignoredUpdate -> {
        });
    }

    /** Registers a new output under a random UUID and returns its immediate output handle. */
    private static WatermarkOutput createImmediateOutput(WatermarkOutputMultiplexer watermarkOutputMultiplexer) {
        String outputId = UUID.randomUUID().toString();
        registerNoOpOutput(watermarkOutputMultiplexer, outputId);
        return watermarkOutputMultiplexer.getImmediateOutput(outputId);
    }

    /** Registers a new output under a random UUID and returns its deferred output handle. */
    private static WatermarkOutput createDeferredOutput(WatermarkOutputMultiplexer watermarkOutputMultiplexer) {
        String outputId = UUID.randomUUID().toString();
        registerNoOpOutput(watermarkOutputMultiplexer, outputId);
        return watermarkOutputMultiplexer.getDeferredOutput(outputId);
    }

    /** Creates the recording downstream output that the tests inspect. */
    private static TestingWatermarkOutput createTestingWatermarkOutput() {
        return new TestingWatermarkOutput();
    }
}
