package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Optional;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/BufferStorageTestBase.class */
/**
 * Shared test cases for {@link BufferStorage} implementations.
 *
 * <p>Concrete subclasses provide the storage under test through {@link #createBufferStorage()};
 * every test method obtains its storage from that factory exactly once.
 */
public abstract class BufferStorageTestBase {
    /** Size in bytes of the memory segment that backs every generated buffer. */
    protected static final int PAGE_SIZE = 4096;

    /**
     * Creates the storage instance exercised by a test case.
     *
     * @return the {@link BufferStorage} to run the test against
     */
    abstract BufferStorage createBufferStorage();

    /**
     * Rolls over a storage that never received any element and checks that nothing can be polled
     * after each of three consecutive roll-overs.
     *
     * @throws IOException if the storage under test fails
     */
    @Test
    public void testRollOverEmptySequences() throws IOException {
        final BufferStorage bufferStorage = createBufferStorage();
        for (int rollOver = 0; rollOver < 3; rollOver++) {
            bufferStorage.rollOver();
            Assert.assertFalse(bufferStorage.pollNext().isPresent());
        }
    }

    /**
     * Runs five rounds of "add a random sequence, roll over, read everything back" and verifies
     * that the elements come back in insertion order with unchanged contents.
     *
     * <p>Buffers are not remembered individually: their size and channel are drawn from a second
     * random generator which is re-seeded before reading, so the same values can be replayed.
     * Events are kept in a list because their payload is drawn from the primary generator.
     *
     * @throws IOException if the storage under test fails
     */
    @Test
    public void testSpillAndRollOverSimple() throws IOException {
        final int maxNumEventsAndBuffers = 3000;
        final int maxNumChannels = 1656;

        final Random rnd = new Random();
        final Random bufferRnd = new Random();
        final BufferStorage bufferStorage = createBufferStorage();

        for (int round = 0; round < 5; round++) {
            final long bufferSeed = rnd.nextLong();
            bufferRnd.setSeed(bufferSeed);

            final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1;
            final int numberOfChannels = rnd.nextInt(maxNumChannels) + 1;
            final ArrayList<BufferOrEvent> events = new ArrayList<>(128);

            // Write phase: roughly 5% events, the rest buffers.
            for (int i = 0; i < numEventsAndBuffers; i++) {
                final BufferOrEvent element;
                if (rnd.nextDouble() < 0.05d) {
                    element = generateRandomEvent(rnd, numberOfChannels);
                    events.add(element);
                } else {
                    element = generateRandomBuffer(
                            bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numberOfChannels));
                }
                bufferStorage.add(element);
            }

            // Rewind the buffer generator so the read phase replays the same sizes and channels.
            bufferRnd.setSeed(bufferSeed);
            bufferStorage.rollOver();

            // Read phase: every element must come back, in order.
            int numEvents = 0;
            for (int i = 0; i < numEventsAndBuffers; i++) {
                Assert.assertFalse(bufferStorage.isEmpty());

                final Optional<BufferOrEvent> next = bufferStorage.pollNext();
                Assert.assertTrue(next.isPresent());
                final BufferOrEvent actual = next.get();

                if (actual.isEvent()) {
                    final BufferOrEvent expected = events.get(numEvents++);
                    Assert.assertEquals(expected.getEvent(), actual.getEvent());
                    Assert.assertEquals(expected.getChannelIndex(), actual.getChannelIndex());
                } else {
                    validateBuffer(
                            actual,
                            bufferRnd.nextInt(PAGE_SIZE) + 1,
                            bufferRnd.nextInt(numberOfChannels));
                }
            }

            // Nothing may be left over, and every generated event must have been seen.
            Assert.assertFalse(bufferStorage.pollNext().isPresent());
            Assert.assertTrue(bufferStorage.isEmpty());
            Assert.assertEquals(events.size(), numEvents);
        }
    }

    /**
     * Interleaves adding elements with rolling over and reading, then drains what remains.
     *
     * <p>The test mirrors the storage with a deque of expected sequences: each roll-over pushes
     * the pending sequence to the head of that deque and starts a new pending one. Even rounds
     * generate elements (randomly mixed with roll-over-and-read steps); odd rounds only roll over,
     * which records an empty sequence.
     *
     * @throws IOException if the storage under test fails
     */
    @Test
    public void testSpillWhileReading() throws IOException {
        final int maxNumEventsAndBuffers = 300;
        final int maxNumChannels = 1656;

        final Random rnd = new Random();
        final ArrayDeque<ArrayDeque<BufferOrEvent>> expectedRolledSequences = new ArrayDeque<>();
        ArrayDeque<BufferOrEvent> expectedPendingSequence = new ArrayDeque<>();
        final BufferStorage bufferStorage = createBufferStorage();

        for (int round = 0; round < 20; round++) {
            if (round % 2 == 0) {
                final Random bufferRnd = new Random(rnd.nextLong());
                final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1;
                final int numberOfChannels = rnd.nextInt(maxNumChannels) + 1;

                int generated = 0;
                while (generated < numEventsAndBuffers) {
                    if (rnd.nextDouble() < 0.5d) {
                        // Add one element (about 5% events) to the pending sequence.
                        final BufferOrEvent element;
                        if (rnd.nextDouble() < 0.05d) {
                            element = generateRandomEvent(rnd, numberOfChannels);
                        } else {
                            element = generateRandomBuffer(
                                    bufferRnd.nextInt(PAGE_SIZE) + 1,
                                    bufferRnd.nextInt(numberOfChannels));
                        }
                        bufferStorage.add(element);
                        expectedPendingSequence.addLast(element);
                        generated++;
                    } else {
                        // Roll over mid-sequence and consume one element.
                        bufferStorage.rollOver();
                        expectedRolledSequences.addFirst(expectedPendingSequence);
                        expectedPendingSequence = new ArrayDeque<>();
                        assertNextBufferOrEvent(expectedRolledSequences, bufferStorage);
                    }
                }
            }

            // Every round ends with a roll-over; in odd rounds this is the only action.
            bufferStorage.rollOver();
            expectedRolledSequences.addFirst(expectedPendingSequence);
            expectedPendingSequence = new ArrayDeque<>();
        }

        // Consume whatever is still expected.
        while (!expectedRolledSequences.isEmpty()) {
            assertNextBufferOrEvent(expectedRolledSequences, bufferStorage);
        }
    }

    /**
     * Polls one element from the storage and compares it with the next expected element.
     *
     * <p>Exhausted sequences at the head of {@code expectedRolledSequences} are discarded first.
     * If no expectation remains, the storage must return nothing. Otherwise the storage is polled
     * until it yields an element or reports itself empty, and that element must match the head of
     * the first expected sequence, which is removed.
     *
     * @param expectedRolledSequences expected sequences, most recently rolled first; modified
     * @param bufferStorage storage to poll
     * @throws IOException if the storage fails
     */
    private static void assertNextBufferOrEvent(
            ArrayDeque<ArrayDeque<BufferOrEvent>> expectedRolledSequences,
            BufferStorage bufferStorage) throws IOException {
        while (!expectedRolledSequences.isEmpty()
                && expectedRolledSequences.peekFirst().isEmpty()) {
            expectedRolledSequences.pollFirst();
        }

        Optional<BufferOrEvent> next = bufferStorage.pollNext();
        if (expectedRolledSequences.isEmpty()) {
            Assert.assertFalse(next.isPresent());
            return;
        }

        // An empty poll result is retried for as long as the storage is not empty.
        while (!next.isPresent() && !bufferStorage.isEmpty()) {
            next = bufferStorage.pollNext();
        }
        Assert.assertTrue(next.isPresent());

        final BufferOrEvent actual = next.get();
        final BufferOrEvent expected = expectedRolledSequences.peekFirst().pollFirst();
        if (expected.isEvent()) {
            Assert.assertEquals(expected.getChannelIndex(), actual.getChannelIndex());
            Assert.assertEquals(expected.getEvent(), actual.getEvent());
        } else {
            validateBuffer(actual, expected.getSize(), expected.getChannelIndex());
        }
    }

    /**
     * Builds a {@link BufferOrEvent} carrying a {@code TestEvent} with a random magic number and a
     * random payload of fewer than 1000 bytes.
     *
     * @param rnd source of randomness
     * @param numberOfChannels exclusive upper bound for the random channel index
     * @return the generated event element
     */
    private static BufferOrEvent generateRandomEvent(Random rnd, int numberOfChannels) {
        final long magicNumber = rnd.nextLong();
        final byte[] data = new byte[rnd.nextInt(1000)];
        rnd.nextBytes(data);
        // The channel index is drawn last, after the payload.
        final int channelIndex = rnd.nextInt(numberOfChannels);
        return new BufferOrEvent(new TestEvent(magicNumber, data), channelIndex);
    }

    /**
     * Builds a buffer element of the given size on channel 0.
     *
     * @param size number of bytes to fill and report as the buffer size
     * @return the generated buffer element
     * @see #generateRandomBuffer(int, int)
     */
    public static BufferOrEvent generateRandomBuffer(int size) {
        return generateRandomBuffer(size, 0);
    }

    /**
     * Builds a buffer element whose byte at position {@code p} is {@code (byte) p}.
     *
     * <p>The backing segment always has {@link #PAGE_SIZE} bytes; only the first {@code size}
     * bytes are written and the buffer size is set to {@code size}.
     *
     * @param size number of bytes to fill and report as the buffer size
     * @param channelIndex channel index attached to the element
     * @return the generated buffer element
     */
    public static BufferOrEvent generateRandomBuffer(int size, int channelIndex) {
        final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
        for (int pos = 0; pos < size; pos++) {
            segment.put(pos, (byte) pos);
        }

        final NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
        buffer.setSize(size);
        return new BufferOrEvent(buffer, channelIndex);
    }

    /**
     * Asserts that an element is a buffer with the expected channel, size and the byte pattern
     * written by {@link #generateRandomBuffer(int, int)}.
     *
     * @param boe element to check
     * @param expectedSize expected buffer size in bytes
     * @param expectedChannelIndex expected channel index
     */
    private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) {
        Assert.assertEquals("wrong channel index", expectedChannelIndex, boe.getChannelIndex());
        Assert.assertTrue("is not buffer", boe.isBuffer());

        final Buffer buffer = boe.getBuffer();
        Assert.assertEquals("wrong buffer size", expectedSize, buffer.getSize());

        final MemorySegment segment = buffer.getMemorySegment();
        for (int pos = 0; pos < expectedSize; pos++) {
            final byte expected = (byte) pos;
            if (expected != segment.get(pos)) {
                Assert.fail(String.format(
                        "wrong buffer contents at position %s : expected=%d , found=%d",
                        pos, expected, segment.get(pos)));
            }
        }
    }
}
