package org.apache.flink.streaming.runtime.streamstatus;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.class */
public class StatusWatermarkValveTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest$BufferedValveOutputHandler.class */
    private class BufferedValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {
        private BlockingQueue<Watermark> outputWatermarks;
        private BlockingQueue<StreamStatus> outputStreamStatuses;

        private BufferedValveOutputHandler() {
            this.outputWatermarks = new LinkedBlockingQueue();
            this.outputStreamStatuses = new LinkedBlockingQueue();
        }

        public void handleWatermark(Watermark watermark) {
            this.outputWatermarks.add(watermark);
        }

        public void handleStreamStatus(StreamStatus streamStatus) {
            this.outputStreamStatuses.add(streamStatus);
        }

        public Watermark popLastOutputWatermark() {
            return this.outputWatermarks.poll();
        }

        public StreamStatus popLastOutputStreamStatus() {
            return this.outputStreamStatuses.poll();
        }

        public boolean hasNoOutputWatermarks() {
            return this.outputWatermarks.size() == 0;
        }

        public boolean hasNoOutputStreamStatuses() {
            return this.outputStreamStatuses.size() == 0;
        }
    }

    @Test
    public void testAllInputChannelsStartAsActive() {
        BufferedValveOutputHandler bufferedValveOutputHandler = new BufferedValveOutputHandler();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(4, bufferedValveOutputHandler);
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 3);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 0);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 1);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 2);
        Assert.assertEquals(StreamStatus.IDLE, bufferedValveOutputHandler.popLastOutputStreamStatus());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
    }

    @Test
    public void testOneInputValve() {
        BufferedValveOutputHandler bufferedValveOutputHandler = new BufferedValveOutputHandler();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(1, bufferedValveOutputHandler);
        statusWatermarkValve.inputStreamStatus(StreamStatus.ACTIVE, 0);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(0L), 0);
        Assert.assertEquals(new Watermark(0L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(25L), 0);
        Assert.assertEquals(new Watermark(25L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(18L), 0);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(42L), 0);
        Assert.assertEquals(new Watermark(42L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 0);
        Assert.assertEquals(StreamStatus.IDLE, bufferedValveOutputHandler.popLastOutputStreamStatus());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(52L), 0);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(60L), 0);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 0);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputStreamStatus(StreamStatus.ACTIVE, 0);
        Assert.assertEquals(StreamStatus.ACTIVE, bufferedValveOutputHandler.popLastOutputStreamStatus());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(40L), 0);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(68L), 0);
        Assert.assertEquals(new Watermark(68L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(72L), 0);
        Assert.assertEquals(new Watermark(72L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
    }

    @Test
    public void testMultipleInputValve() {
        BufferedValveOutputHandler bufferedValveOutputHandler = new BufferedValveOutputHandler();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3, bufferedValveOutputHandler);
        statusWatermarkValve.inputWatermark(new Watermark(0L), 0);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(0L), 1);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(0L), 2);
        Assert.assertEquals(new Watermark(0L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(12L), 0);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(8L), 2);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(10L), 2);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(15L), 1);
        Assert.assertEquals(new Watermark(10L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(6L), 0);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 2);
        Assert.assertEquals(new Watermark(12L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(17L), 0);
        Assert.assertEquals(new Watermark(15L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(25L), 0);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(20L), 1);
        Assert.assertEquals(new Watermark(20L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputStreamStatus(StreamStatus.ACTIVE, 2);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(18L), 2);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(22L), 1);
        Assert.assertEquals(new Watermark(22L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(28L), 0);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(33L), 1);
        Assert.assertEquals(new Watermark(28L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(30L), 2);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(31L), 0);
        Assert.assertEquals(new Watermark(30L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(34L), 2);
        Assert.assertEquals(new Watermark(31L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 0);
        Assert.assertEquals(new Watermark(33L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 2);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 1);
        Assert.assertEquals(StreamStatus.IDLE, bufferedValveOutputHandler.popLastOutputStreamStatus());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputStreamStatus(StreamStatus.ACTIVE, 0);
        Assert.assertEquals(StreamStatus.ACTIVE, bufferedValveOutputHandler.popLastOutputStreamStatus());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(36L), 0);
        Assert.assertEquals(new Watermark(36L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputStreamStatus(StreamStatus.ACTIVE, 1);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(35L), 1);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(37L), 0);
        Assert.assertEquals(new Watermark(37L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(38L), 1);
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
        statusWatermarkValve.inputWatermark(new Watermark(40L), 0);
        Assert.assertEquals(new Watermark(38L), bufferedValveOutputHandler.popLastOutputWatermark());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputWatermarks());
        Assert.assertTrue(bufferedValveOutputHandler.hasNoOutputStreamStatuses());
    }
}
