/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.watermarkstatus;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.junit.Assert;
import org.junit.Test;

/**
 * Unit tests for {@link StatusWatermarkValve}.
 *
 * <p>Every test drives a valve with a fixed number of input channels through
 * {@code inputWatermark} / {@code inputWatermarkStatus} and then inspects, element by element,
 * what was forwarded to a recording {@code StatusWatermarkOutput}. A {@code null} returned by
 * {@code popLastSeenOutput()} means that nothing (more) was emitted.
 */
public class StatusWatermarkValveTest {

    /**
     * Feeds two increasing watermarks into a single-channel valve and expects each of them to be
     * forwarded exactly once.
     */
    @Test
    public void testSingleInputIncreasingWatermarks() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(1);

        valve.inputWatermark(new Watermark(0L), 0, valveOutput);
        Assert.assertEquals(new Watermark(0L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());

        valve.inputWatermark(new Watermark(25L), 0, valveOutput);
        Assert.assertEquals(new Watermark(25L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());
    }

    /**
     * A watermark lower than the one already forwarded is expected to yield no output on a
     * single-channel valve; a later, higher watermark is forwarded again.
     */
    @Test
    public void testSingleInputDecreasingWatermarksYieldsNoOutput() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(1);

        valve.inputWatermark(new Watermark(25L), 0, valveOutput);
        Assert.assertEquals(new Watermark(25L), valveOutput.popLastSeenOutput());

        // 18 < 25: nothing may be emitted.
        valve.inputWatermark(new Watermark(18L), 0, valveOutput);
        Assert.assertNull(valveOutput.popLastSeenOutput());

        valve.inputWatermark(new Watermark(42L), 0, valveOutput);
        Assert.assertEquals(new Watermark(42L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());
    }

    /**
     * Toggles the watermark status of a single-channel valve and expects an output only when the
     * status actually changes.
     */
    @Test
    public void testSingleInputWatermarkStatusToggling() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(1);

        // ACTIVE on a fresh valve is expected to be a no-op.
        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, valveOutput);
        Assert.assertNull(valveOutput.popLastSeenOutput());

        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput);
        Assert.assertEquals(WatermarkStatus.IDLE, valveOutput.popLastSeenOutput());

        // Repeating IDLE must not emit a second IDLE.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput);
        Assert.assertNull(valveOutput.popLastSeenOutput());

        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, valveOutput);
        Assert.assertEquals(WatermarkStatus.ACTIVE, valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());
    }

    /**
     * While the only channel is idle, an incoming watermark is expected to be dropped: no output
     * is produced and the channel's recorded watermark stays unchanged. Once the channel is active
     * again the same watermark is forwarded.
     */
    @Test
    public void testSingleInputWatermarksIntactDuringIdleness() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(1);

        valve.inputWatermark(new Watermark(25L), 0, valveOutput);
        Assert.assertEquals(new Watermark(25L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());

        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput);
        Assert.assertEquals(WatermarkStatus.IDLE, valveOutput.popLastSeenOutput());

        // Watermark received during idleness: no output, channel state still at 25.
        valve.inputWatermark(new Watermark(50L), 0, valveOutput);
        Assert.assertNull(valveOutput.popLastSeenOutput());
        Assert.assertEquals(25L, valve.getInputChannelStatus(0).watermark);

        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, valveOutput);
        Assert.assertEquals(WatermarkStatus.ACTIVE, valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());

        valve.inputWatermark(new Watermark(50L), 0, valveOutput);
        Assert.assertEquals(new Watermark(50L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());
    }

    /**
     * With three channels, a watermark is expected to be emitted only after every channel has
     * delivered one.
     */
    @Test
    public void testMultipleInputYieldsWatermarkOnlyWhenAllChannelsReceivesWatermarks()
            throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(3);

        // Channel 2 has not reported yet: no output expected.
        valve.inputWatermark(new Watermark(0L), 0, valveOutput);
        valve.inputWatermark(new Watermark(0L), 1, valveOutput);
        Assert.assertNull(valveOutput.popLastSeenOutput());

        valve.inputWatermark(new Watermark(0L), 2, valveOutput);
        Assert.assertEquals(new Watermark(0L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());
    }

    /**
     * With three channels advancing at different speeds, the emitted watermark is expected to be
     * the minimum across all channels, and only when that minimum increases.
     */
    @Test
    public void testMultipleInputIncreasingWatermarks() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(3);

        valve.inputWatermark(new Watermark(0L), 0, valveOutput);
        valve.inputWatermark(new Watermark(0L), 1, valveOutput);
        valve.inputWatermark(new Watermark(0L), 2, valveOutput);
        Assert.assertEquals(new Watermark(0L), valveOutput.popLastSeenOutput());

        // Channel 1 is still at 0, so the minimum has not moved.
        valve.inputWatermark(new Watermark(12L), 0, valveOutput);
        valve.inputWatermark(new Watermark(8L), 2, valveOutput);
        valve.inputWatermark(new Watermark(10L), 2, valveOutput);
        Assert.assertNull(valveOutput.popLastSeenOutput());

        // Channels are at 12 / 15 / 10: the minimum is channel 2 at 10.
        valve.inputWatermark(new Watermark(15L), 1, valveOutput);
        Assert.assertEquals(new Watermark(10L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());

        // Channels are at 12 / 15 / 17: the minimum is channel 0 at 12.
        valve.inputWatermark(new Watermark(17L), 2, valveOutput);
        Assert.assertEquals(new Watermark(12L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());

        // Channels are at 20 / 15 / 17: the minimum is channel 1 at 15.
        valve.inputWatermark(new Watermark(20L), 0, valveOutput);
        Assert.assertEquals(new Watermark(15L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());
    }

    /**
     * After a first round of watermarks on three channels, lower watermarks on every channel are
     * expected to yield no output.
     */
    @Test
    public void testMultipleInputDecreasingWatermarksYieldsNoOutput() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(3);

        valve.inputWatermark(new Watermark(25L), 0, valveOutput);
        valve.inputWatermark(new Watermark(10L), 1, valveOutput);
        valve.inputWatermark(new Watermark(17L), 2, valveOutput);
        Assert.assertEquals(new Watermark(10L), valveOutput.popLastSeenOutput());

        // Each channel now receives a watermark below its previous one.
        valve.inputWatermark(new Watermark(12L), 0, valveOutput);
        valve.inputWatermark(new Watermark(8L), 1, valveOutput);
        valve.inputWatermark(new Watermark(15L), 2, valveOutput);
        Assert.assertNull(valveOutput.popLastSeenOutput());
    }

    /**
     * With two channels, IDLE is expected to be emitted only once both channels are idle, and
     * ACTIVE as soon as the first channel becomes active again.
     */
    @Test
    public void testMultipleInputWatermarkStatusToggling() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(2);

        // ACTIVE on a fresh valve is expected to be a no-op for both channels.
        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, valveOutput);
        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 1, valveOutput);
        Assert.assertNull(valveOutput.popLastSeenOutput());

        // Only one of two channels idle: no status output yet.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, valveOutput);
        Assert.assertNull(valveOutput.popLastSeenOutput());

        // Both channels idle: IDLE is forwarded.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput);
        Assert.assertEquals(WatermarkStatus.IDLE, valveOutput.popLastSeenOutput());

        // Repeated IDLE inputs must not produce anything.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput);
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, valveOutput);
        Assert.assertNull(valveOutput.popLastSeenOutput());

        // First channel to resume flips the output back to ACTIVE ...
        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 1, valveOutput);
        Assert.assertEquals(WatermarkStatus.ACTIVE, valveOutput.popLastSeenOutput());

        // ... and the second one resuming adds nothing.
        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, valveOutput);
        Assert.assertNull(valveOutput.popLastSeenOutput());
    }

    /**
     * When one of three channels is idle, the emitted watermark is expected to be the minimum of
     * the remaining channels only.
     */
    @Test
    public void testMultipleInputWatermarkAdvancingWithPartiallyIdleChannels() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(3);

        // Channel 2 has not reported anything yet: no output expected.
        valve.inputWatermark(new Watermark(15L), 0, valveOutput);
        valve.inputWatermark(new Watermark(10L), 1, valveOutput);
        Assert.assertNull(valveOutput.popLastSeenOutput());

        // Channel 2 goes idle: the minimum of channels 0 and 1 (10) is expected.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, valveOutput);
        Assert.assertEquals(new Watermark(10L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());

        // Channels 0 / 1 are at 15 / 18: the minimum is 15.
        valve.inputWatermark(new Watermark(18L), 1, valveOutput);
        Assert.assertEquals(new Watermark(15L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());

        // Channels 0 / 1 are at 20 / 18: the minimum is 18.
        valve.inputWatermark(new Watermark(20L), 0, valveOutput);
        Assert.assertEquals(new Watermark(18L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());
    }

    /**
     * As channels become idle one after another, the emitted watermark is expected to advance to
     * the minimum of the channels that are still active.
     */
    @Test
    public void testMultipleInputWatermarkAdvancingAsChannelsIndividuallyBecomeIdle()
            throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(3);

        valve.inputWatermark(new Watermark(25L), 0, valveOutput);
        valve.inputWatermark(new Watermark(10L), 1, valveOutput);
        valve.inputWatermark(new Watermark(17L), 2, valveOutput);
        Assert.assertEquals(new Watermark(10L), valveOutput.popLastSeenOutput());

        // Channel 1 (10) drops out: the minimum of channels 0 and 2 is 17.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, valveOutput);
        Assert.assertEquals(new Watermark(17L), valveOutput.popLastSeenOutput());

        // Channel 2 (17) drops out: only channel 0 at 25 remains.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, valveOutput);
        Assert.assertEquals(new Watermark(25L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());
    }

    /**
     * When the last active channel becomes idle, the test expects the maximum watermark across
     * all channels to be emitted first, followed by the IDLE status.
     */
    @Test
    public void testMultipleInputFlushMaxWatermarkAndWatermarkStatusOnceAllInputsBecomeIdle()
            throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(3);

        valve.inputWatermark(new Watermark(10L), 0, valveOutput);
        valve.inputWatermark(new Watermark(5L), 1, valveOutput);
        valve.inputWatermark(new Watermark(3L), 2, valveOutput);
        Assert.assertEquals(new Watermark(3L), valveOutput.popLastSeenOutput());

        // Channel 2, which holds the minimum (3), is still active: nothing changes.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput);
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, valveOutput);
        Assert.assertNull(valveOutput.popLastSeenOutput());

        // Last channel goes idle: max watermark (10) first, then the IDLE status.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, valveOutput);
        Assert.assertEquals(new Watermark(10L), valveOutput.popLastSeenOutput());
        Assert.assertEquals(WatermarkStatus.IDLE, valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());
    }

    /**
     * A channel that resumes from idleness behind the last emitted watermark is expected to be
     * flagged as unaligned, to keep recording its own watermarks without producing output, and
     * to take part in watermark advancement again once it has caught up.
     */
    @Test
    public void testMultipleInputWatermarkRealignmentAfterResumeActive() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(3);

        valve.inputWatermark(new Watermark(10L), 0, valveOutput);
        valve.inputWatermark(new Watermark(7L), 1, valveOutput);
        valve.inputWatermark(new Watermark(3L), 2, valveOutput);
        Assert.assertEquals(new Watermark(3L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());

        // Channel 2 goes idle: the output advances to the minimum of channels 0 and 1 (7).
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, valveOutput);
        Assert.assertEquals(new Watermark(7L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());

        // Channel 2 resumes at 3 while the output is already at 7: it must be unaligned.
        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 2, valveOutput);
        Assert.assertFalse(valve.getInputChannelStatus(2).isWatermarkAligned);

        // While catching up, the channel's watermark is recorded but nothing is emitted.
        valve.inputWatermark(new Watermark(5L), 2, valveOutput);
        Assert.assertEquals(5L, valve.getInputChannelStatus(2).watermark);
        Assert.assertNull(valveOutput.popLastSeenOutput());

        // 9 >= 7: channel 2 is aligned again; the minimum (channel 1 at 7) is unchanged.
        valve.inputWatermark(new Watermark(9L), 2, valveOutput);
        Assert.assertTrue(valve.getInputChannelStatus(2).isWatermarkAligned);
        Assert.assertNull(valveOutput.popLastSeenOutput());

        // Channels are at 10 / 12 / 9: the realigned channel 2 now defines the minimum.
        valve.inputWatermark(new Watermark(12L), 1, valveOutput);
        Assert.assertEquals(new Watermark(9L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());
    }

    /**
     * When the only remaining active channel has just resumed from idleness and received no
     * watermark since, the test expects no watermark and no status to be emitted.
     */
    @Test
    public void testNoOutputWhenAllActiveChannelsAreUnaligned() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(3);

        valve.inputWatermark(new Watermark(10L), 0, valveOutput);
        valve.inputWatermark(new Watermark(7L), 1, valveOutput);

        // Channel 2 goes idle without ever reporting: the minimum of channels 0 and 1 is 7.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, valveOutput);
        Assert.assertEquals(new Watermark(7L), valveOutput.popLastSeenOutput());
        Assert.assertNull(valveOutput.popLastSeenOutput());

        // Channel 2 resumes; it has no watermark of its own yet.
        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 2, valveOutput);
        Assert.assertNull(valveOutput.popLastSeenOutput());

        // Channels 0 and 1 go idle; channel 2 is the only active channel left.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput);
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, valveOutput);
        Assert.assertNull(valveOutput.popLastSeenOutput());
    }

    /**
     * Recording output handed to the valve under test: emitted watermarks and watermark statuses
     * are queued in emission order so that tests can pop and inspect them one at a time.
     *
     * <p>NOTE(review): the {@code emit*} methods presumably implement the callbacks of
     * {@code PushingAsyncDataInput.DataOutput}; that interface is not visible from this file, so
     * they are left without {@code @Override} - confirm and annotate.
     */
    private static class StatusWatermarkOutput implements PushingAsyncDataInput.DataOutput {

        /** Everything emitted so far and not yet popped, oldest element first. */
        private final BlockingQueue<StreamElement> allOutputs = new LinkedBlockingQueue<>();

        /** Records the emitted watermark. */
        public void emitWatermark(Watermark watermark) {
            allOutputs.add(watermark);
        }

        /** Records the emitted watermark status. */
        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
            allOutputs.add(watermarkStatus);
        }

        /**
         * Not supported by this recording output.
         *
         * @throws UnsupportedOperationException always
         */
        public void emitRecord(StreamRecord record) {
            throw new UnsupportedOperationException();
        }

        /**
         * Not supported by this recording output.
         *
         * @throws UnsupportedOperationException always
         */
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            throw new UnsupportedOperationException();
        }

        /**
         * Removes and returns the oldest recorded element that has not been popped yet.
         *
         * @return the next recorded watermark or watermark status, or {@code null} if nothing
         *     is queued
         */
        public StreamElement popLastSeenOutput() {
            return allOutputs.poll();
        }
    }
}

