/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Optional;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.streaming.runtime.io.BufferStorage;
import org.apache.flink.streaming.runtime.io.TestEvent;
import org.junit.Assert;
import org.junit.Test;

public abstract class BufferStorageTestBase {
    protected static final int PAGE_SIZE = 4096;

    abstract BufferStorage createBufferStorage();

    @Test
    public void testRollOverEmptySequences() throws IOException {
        BufferStorage bufferStorage = this.createBufferStorage();
        bufferStorage.rollOver();
        Assert.assertFalse((boolean)bufferStorage.pollNext().isPresent());
        bufferStorage.rollOver();
        Assert.assertFalse((boolean)bufferStorage.pollNext().isPresent());
        bufferStorage.rollOver();
        Assert.assertFalse((boolean)bufferStorage.pollNext().isPresent());
    }

    @Test
    public void testSpillAndRollOverSimple() throws IOException {
        Random rnd = new Random();
        Random bufferRnd = new Random();
        int maxNumEventsAndBuffers = 3000;
        int maxNumChannels = 1656;
        BufferStorage bufferStorage = this.createBufferStorage();
        for (int round = 0; round < 5; ++round) {
            long bufferSeed = rnd.nextLong();
            bufferRnd.setSeed(bufferSeed);
            int numEventsAndBuffers = rnd.nextInt(3000) + 1;
            int numberOfChannels = rnd.nextInt(1656) + 1;
            ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
            for (int i = 0; i < numEventsAndBuffers; ++i) {
                BufferOrEvent evt;
                boolean isEvent;
                boolean bl = isEvent = rnd.nextDouble() < 0.05;
                if (isEvent) {
                    evt = BufferStorageTestBase.generateRandomEvent(rnd, numberOfChannels);
                    events.add(evt);
                } else {
                    evt = BufferStorageTestBase.generateRandomBuffer(bufferRnd.nextInt(4096) + 1, bufferRnd.nextInt(numberOfChannels));
                }
                bufferStorage.add(evt);
            }
            bufferRnd.setSeed(bufferSeed);
            bufferStorage.rollOver();
            int numEvent = 0;
            for (int i = 0; i < numEventsAndBuffers; ++i) {
                Assert.assertFalse((boolean)bufferStorage.isEmpty());
                Optional next = bufferStorage.pollNext();
                Assert.assertTrue((boolean)next.isPresent());
                BufferOrEvent bufferOrEvent = (BufferOrEvent)next.get();
                if (bufferOrEvent.isEvent()) {
                    BufferOrEvent expected = (BufferOrEvent)events.get(numEvent++);
                    Assert.assertEquals((Object)expected.getEvent(), (Object)bufferOrEvent.getEvent());
                    Assert.assertEquals((long)expected.getChannelIndex(), (long)bufferOrEvent.getChannelIndex());
                    continue;
                }
                BufferStorageTestBase.validateBuffer(bufferOrEvent, bufferRnd.nextInt(4096) + 1, bufferRnd.nextInt(numberOfChannels));
            }
            Assert.assertFalse((boolean)bufferStorage.pollNext().isPresent());
            Assert.assertTrue((boolean)bufferStorage.isEmpty());
            Assert.assertEquals((long)events.size(), (long)numEvent);
        }
    }

    @Test
    public void testSpillWhileReading() throws IOException {
        int sequences = 10;
        Random rnd = new Random();
        int maxNumEventsAndBuffers = 300;
        int maxNumChannels = 1656;
        ArrayDeque<ArrayDeque<BufferOrEvent>> expectedRolledSequences = new ArrayDeque<ArrayDeque<BufferOrEvent>>();
        ArrayDeque<BufferOrEvent> expectedPendingSequence = new ArrayDeque<BufferOrEvent>();
        BufferStorage bufferStorage = this.createBufferStorage();
        for (int round = 0; round < 20; ++round) {
            if (round % 2 == 1) {
                bufferStorage.rollOver();
                expectedRolledSequences.addFirst(expectedPendingSequence);
                expectedPendingSequence = new ArrayDeque();
                continue;
            }
            long bufferSeed = rnd.nextLong();
            Random bufferRnd = new Random(bufferSeed);
            int numEventsAndBuffers = rnd.nextInt(300) + 1;
            int numberOfChannels = rnd.nextInt(1656) + 1;
            ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
            int generated = 0;
            while (generated < numEventsAndBuffers) {
                if (rnd.nextDouble() < 0.5) {
                    BufferOrEvent evt;
                    boolean isEvent;
                    boolean bl = isEvent = rnd.nextDouble() < 0.05;
                    if (isEvent) {
                        evt = BufferStorageTestBase.generateRandomEvent(rnd, numberOfChannels);
                        events.add(evt);
                    } else {
                        evt = BufferStorageTestBase.generateRandomBuffer(bufferRnd.nextInt(4096) + 1, bufferRnd.nextInt(numberOfChannels));
                    }
                    bufferStorage.add(evt);
                    expectedPendingSequence.addLast(evt);
                    ++generated;
                    continue;
                }
                bufferStorage.rollOver();
                expectedRolledSequences.addFirst(expectedPendingSequence);
                expectedPendingSequence = new ArrayDeque();
                BufferStorageTestBase.assertNextBufferOrEvent(expectedRolledSequences, bufferStorage);
            }
            bufferStorage.rollOver();
            expectedRolledSequences.addFirst(expectedPendingSequence);
            expectedPendingSequence = new ArrayDeque();
        }
        while (!expectedRolledSequences.isEmpty()) {
            BufferStorageTestBase.assertNextBufferOrEvent(expectedRolledSequences, bufferStorage);
        }
    }

    private static void assertNextBufferOrEvent(ArrayDeque<ArrayDeque<BufferOrEvent>> expectedRolledSequence, BufferStorage bufferStorage) throws IOException {
        while (!expectedRolledSequence.isEmpty() && expectedRolledSequence.peekFirst().isEmpty()) {
            expectedRolledSequence.pollFirst();
        }
        Optional next = bufferStorage.pollNext();
        if (expectedRolledSequence.isEmpty()) {
            Assert.assertFalse((boolean)next.isPresent());
            return;
        }
        while (!next.isPresent() && !bufferStorage.isEmpty()) {
            next = bufferStorage.pollNext();
        }
        Assert.assertTrue((boolean)next.isPresent());
        BufferOrEvent actualBufferOrEvent = (BufferOrEvent)next.get();
        BufferOrEvent expectedBufferOrEvent = expectedRolledSequence.peekFirst().pollFirst();
        if (expectedBufferOrEvent.isEvent()) {
            Assert.assertEquals((long)expectedBufferOrEvent.getChannelIndex(), (long)actualBufferOrEvent.getChannelIndex());
            Assert.assertEquals((Object)expectedBufferOrEvent.getEvent(), (Object)actualBufferOrEvent.getEvent());
        } else {
            BufferStorageTestBase.validateBuffer(actualBufferOrEvent, expectedBufferOrEvent.getSize(), expectedBufferOrEvent.getChannelIndex());
        }
    }

    private static BufferOrEvent generateRandomEvent(Random rnd, int numberOfChannels) {
        long magicNumber = rnd.nextLong();
        byte[] data = new byte[rnd.nextInt(1000)];
        rnd.nextBytes(data);
        TestEvent evt = new TestEvent(magicNumber, data);
        int channelIndex = rnd.nextInt(numberOfChannels);
        return new BufferOrEvent((AbstractEvent)evt, channelIndex);
    }

    public static BufferOrEvent generateRandomBuffer(int size) {
        return BufferStorageTestBase.generateRandomBuffer(size, 0);
    }

    public static BufferOrEvent generateRandomBuffer(int size, int channelIndex) {
        MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment((int)4096);
        for (int i = 0; i < size; ++i) {
            seg.put(i, (byte)i);
        }
        NetworkBuffer buf = new NetworkBuffer(seg, FreeingBufferRecycler.INSTANCE);
        buf.setSize(size);
        return new BufferOrEvent((Buffer)buf, channelIndex);
    }

    private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) {
        Assert.assertEquals((String)"wrong channel index", (long)expectedChannelIndex, (long)boe.getChannelIndex());
        Assert.assertTrue((String)"is not buffer", (boolean)boe.isBuffer());
        Buffer buf = boe.getBuffer();
        Assert.assertEquals((String)"wrong buffer size", (long)expectedSize, (long)buf.getSize());
        MemorySegment seg = buf.getMemorySegment();
        for (int i = 0; i < expectedSize; ++i) {
            byte expected = (byte)i;
            if (expected == seg.get(i)) continue;
            Assert.fail((String)String.format("wrong buffer contents at position %s : expected=%d , found=%d", i, expected, seg.get(i)));
        }
    }
}

