package org.apache.flink.streaming.io;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/io/BarrierBufferTest.class */
public class BarrierBufferTest {

    /* loaded from: input_file:org/apache/flink/streaming/io/BarrierBufferTest$MockInputGate.class */
    protected static class MockInputGate implements InputGate {
        private int numChannels;
        private Queue<BufferOrEvent> boes;

        public MockInputGate(int i, List<BufferOrEvent> list) {
            this.numChannels = i;
            this.boes = new LinkedList(list);
        }

        public int getNumberOfInputChannels() {
            return this.numChannels;
        }

        public boolean isFinished() {
            return this.boes.isEmpty();
        }

        public void requestPartitions() throws IOException, InterruptedException {
        }

        public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
            return this.boes.remove();
        }

        public void sendTaskEvent(TaskEvent taskEvent) throws IOException {
        }

        public void registerListener(EventListener<InputGate> eventListener) {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/io/BarrierBufferTest$MockReader.class */
    /**
     * Concrete {@link AbstractReader} for the tests. It adds no behaviour of its
     * own and only forwards the input gate to the superclass constructor.
     */
    protected static class MockReader extends AbstractReader {

        /**
         * @param inputGate gate handed straight to the {@link AbstractReader} constructor
         */
        public MockReader(InputGate inputGate) {
            super(inputGate);
        }
    }

    /**
     * Feeds three buffers on a single channel, with no barriers in between, and
     * expects the barrier buffer to hand them back in their original order.
     */
    @Test
    public void testWithoutBarriers() throws IOException, InterruptedException {
        // Typed list instead of a raw LinkedList: elements stay BufferOrEvent without casts.
        List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
        input.add(createBuffer(0));
        input.add(createBuffer(0));
        input.add(createBuffer(0));

        MockInputGate mockInputGate = new MockInputGate(1, input);
        BarrierBuffer barrierBuffer = new BarrierBuffer(mockInputGate, new MockReader(mockInputGate));

        // The gate consumes its own copy, so iterating the original list here is safe.
        for (BufferOrEvent expected : input) {
            Assert.assertEquals(expected, barrierBuffer.getNextNonBlocked());
        }

        barrierBuffer.cleanup();
    }

    /**
     * Interleaves buffers with two barriers (ids 1 and 2) on a single channel.
     *
     * <p>The test expects every element back in input order. Each barrier is
     * passed to {@code processSuperstep} right after it has been returned.
     */
    @Test
    public void testOneChannelBarrier() throws IOException, InterruptedException {
        // Typed list instead of a raw LinkedList: no Object-typed temporaries are needed below.
        List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
        input.add(createBuffer(0));
        input.add(createBuffer(0));
        input.add(createSuperstep(1L, 0));
        input.add(createBuffer(0));
        input.add(createBuffer(0));
        input.add(createSuperstep(2L, 0));
        input.add(createBuffer(0));

        MockInputGate mockInputGate = new MockInputGate(1, input);
        BarrierBuffer barrierBuffer = new BarrierBuffer(mockInputGate, new MockReader(mockInputGate));

        Assert.assertEquals(input.get(0), barrierBuffer.getNextNonBlocked());
        Assert.assertEquals(input.get(1), barrierBuffer.getNextNonBlocked());

        // Barrier 1.
        BufferOrEvent barrier = barrierBuffer.getNextNonBlocked();
        Assert.assertEquals(input.get(2), barrier);
        barrierBuffer.processSuperstep(barrier);

        Assert.assertEquals(input.get(3), barrierBuffer.getNextNonBlocked());
        Assert.assertEquals(input.get(4), barrierBuffer.getNextNonBlocked());

        // Barrier 2.
        barrier = barrierBuffer.getNextNonBlocked();
        Assert.assertEquals(input.get(5), barrier);
        barrierBuffer.processSuperstep(barrier);

        Assert.assertEquals(input.get(6), barrierBuffer.getNextNonBlocked());

        barrierBuffer.cleanup();
    }

    /**
     * Runs two channels whose barriers arrive out of step: channel 0 delivers
     * barriers 1 to 3 before channel 1 delivers its first one.
     *
     * <p>The expected output order below shows that the test expects elements
     * which follow a barrier on channel 0 to be held back until channel 1 has
     * delivered the same barrier. Every barrier is passed to
     * {@code processSuperstep} right after it has been returned.
     */
    @Test
    public void testMultiChannelBarrier() throws IOException, InterruptedException {
        // Typed list instead of a raw LinkedList: removes the cast on every element access.
        List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
        input.add(createBuffer(0));                                 // 0
        input.add(createBuffer(1));                                 // 1
        input.add(createSuperstep(1L, 0));                          // 2
        input.add(createSuperstep(2L, 0));                          // 3
        input.add(createBuffer(0));                                 // 4
        input.add(createSuperstep(3L, 0));                          // 5
        input.add(createBuffer(0));                                 // 6
        input.add(createBuffer(1));                                 // 7
        input.add(createSuperstep(1L, 1));                          // 8
        input.add(createBuffer(0));                                 // 9
        input.add(createBuffer(1));                                 // 10
        input.add(createSuperstep(2L, 1));                          // 11
        input.add(createSuperstep(3L, 1));                          // 12
        input.add(createSuperstep(4L, 0));                          // 13
        input.add(createBuffer(0));                                 // 14
        input.add(new BufferOrEvent(new EndOfPartitionEvent(), 1)); // 15

        MockInputGate mockInputGate = new MockInputGate(2, input);
        BarrierBuffer barrierBuffer = new BarrierBuffer(mockInputGate, new MockReader(mockInputGate));

        // Nothing is blocked before the first barrier.
        check(input.get(0), barrierBuffer.getNextNonBlocked());
        check(input.get(1), barrierBuffer.getNextNonBlocked());

        // Barrier 1 on channel 0.
        BufferOrEvent barrier = barrierBuffer.getNextNonBlocked();
        check(input.get(2), barrier);
        barrierBuffer.processSuperstep(barrier);

        // Channel 1 element, followed by barrier 1 on channel 1.
        check(input.get(7), barrierBuffer.getNextNonBlocked());
        barrier = barrierBuffer.getNextNonBlocked();
        check(input.get(8), barrier);
        barrierBuffer.processSuperstep(barrier);

        // Barrier 2 on channel 0, which arrived earlier in the input.
        barrier = barrierBuffer.getNextNonBlocked();
        check(input.get(3), barrier);
        barrierBuffer.processSuperstep(barrier);

        // Channel 1 element, followed by barrier 2 on channel 1.
        check(input.get(10), barrierBuffer.getNextNonBlocked());
        barrier = barrierBuffer.getNextNonBlocked();
        check(input.get(11), barrier);
        barrierBuffer.processSuperstep(barrier);

        // Channel 0 buffer and barrier 3 on channel 0, both from earlier in the input.
        check(input.get(4), barrierBuffer.getNextNonBlocked());
        barrier = barrierBuffer.getNextNonBlocked();
        check(input.get(5), barrier);
        barrierBuffer.processSuperstep(barrier);

        // Barrier 3 on channel 1.
        barrier = barrierBuffer.getNextNonBlocked();
        check(input.get(12), barrier);
        barrierBuffer.processSuperstep(barrier);

        // Remaining channel 0 buffers from earlier in the input.
        check(input.get(6), barrierBuffer.getNextNonBlocked());
        check(input.get(9), barrierBuffer.getNextNonBlocked());

        // Barrier 4 on channel 0.
        barrier = barrierBuffer.getNextNonBlocked();
        check(input.get(13), barrier);
        barrierBuffer.processSuperstep(barrier);

        // Last channel 0 buffer, then the end-of-partition event on channel 1.
        check(input.get(14), barrierBuffer.getNextNonBlocked());
        check(input.get(15), barrierBuffer.getNextNonBlocked());

        barrierBuffer.cleanup();
    }

    /**
     * Asserts that two {@link BufferOrEvent}s agree on whether they carry a buffer
     * and on their channel index. When the expected element is an event, the
     * events themselves must also be equal. Buffer contents are not compared.
     *
     * @param expected element taken from the test input
     * @param actual   element returned by the code under test
     */
    private static void check(BufferOrEvent expected, BufferOrEvent actual) {
        Assert.assertEquals(expected.isBuffer(), actual.isBuffer());
        Assert.assertEquals(expected.getChannelIndex(), actual.getChannelIndex());
        if (!expected.isEvent()) {
            return;
        }
        Assert.assertEquals(expected.getEvent(), actual.getEvent());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Wraps a new {@link StreamingSuperstep} in a {@link BufferOrEvent}.
     *
     * @param j value passed to the {@link StreamingSuperstep} constructor
     * @param i second {@link BufferOrEvent} constructor argument; the tests in this
     *          class use it as the input channel index
     * @return the wrapped superstep event
     */
    public static BufferOrEvent createSuperstep(long j, int i) {
        StreamingSuperstep superstep = new StreamingSuperstep(j);
        return new BufferOrEvent(superstep, i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a {@link BufferOrEvent} holding a {@link Buffer} backed by a one-byte
     * memory segment, paired with a recycler that does nothing.
     *
     * @param i second {@link BufferOrEvent} constructor argument; the tests in this
     *          class use it as the input channel index
     * @return the wrapped buffer
     */
    public static BufferOrEvent createBuffer(int i) {
        MemorySegment segment = new MemorySegment(new byte[]{1});
        BufferRecycler noOpRecycler = new BufferRecycler() {
            @Override
            public void recycle(MemorySegment memorySegment) {
                // Intentionally empty: recycling is a no-op for these buffers.
            }
        };
        return new BufferOrEvent(new Buffer(segment, noOpRecycler), i);
    }
}
