/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.channels.spi.AbstractInterruptibleChannel;
import java.util.ArrayList;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.streaming.runtime.io.BufferSpiller;
import org.apache.flink.streaming.runtime.io.TestEvent;
import org.apache.flink.util.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link BufferSpiller.SpilledBufferOrEventSequence}.
 *
 * <p>Every test writes records directly into a temporary file and then reads them back through a
 * sequence created over that file. The helpers in this class write each record as a little-endian
 * header of 9 bytes (4-byte channel index, 4-byte payload length, 1-byte flag where {@code 0}
 * marks a buffer and {@code 1} marks an event) followed by the payload bytes.
 */
public class SpilledBufferOrEventSequenceTest {

    /** Page size handed to every sequence under test; also the maximum generated buffer size. */
    private static final int PAGE_SIZE = 32 * 1024;

    /** Length of the record header written by the helpers: channel (4) + length (4) + flag (1). */
    private static final int HEADER_LENGTH = 9;

    /** Fraction of elements that are events in the mixed buffer/event tests. */
    private static final double EVENT_PROBABILITY = 0.05;

    /** Read buffer shared by all sequences created in a test. */
    private final ByteBuffer buffer = ByteBuffer.allocateDirect(128 * 1024).order(ByteOrder.LITTLE_ENDIAN);

    private File tempFile;
    private FileChannel fileChannel;

    /**
     * Creates the temporary file and opens a read/write channel on it.
     *
     * <p>A failure here fails the test immediately. Silently continuing would leave the fields
     * {@code null} and make every test die with an unrelated {@link NullPointerException}.
     */
    @Before
    public void initTempChannel() {
        try {
            tempFile = File.createTempFile("testdata", "tmp");
            fileChannel = new RandomAccessFile(tempFile, "rw").getChannel();
        } catch (Exception e) {
            cleanup();
            e.printStackTrace();
            Assert.fail("Could not set up the temporary file channel: " + e);
        }
    }

    /** Closes the channel and deletes the temporary file, ignoring failures (best effort). */
    @After
    public void cleanup() {
        if (fileChannel != null) {
            try {
                fileChannel.close();
            } catch (IOException ignored) {
                // best-effort cleanup; nothing useful can be done if closing fails
            }
        }
        if (tempFile != null) {
            // the result is deliberately ignored: the file may already have been removed by the test
            tempFile.delete();
        }
    }

    /** A sequence over an empty file must yield no element. */
    @Test
    public void testEmptyChannel() {
        try {
            BufferSpiller.SpilledBufferOrEventSequence seq = createSequence(tempFile, fileChannel);
            seq.open();
            Assert.assertNull(seq.getNext());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /** A file holding fewer bytes than one header must make {@code getNext()} throw. */
    @Test
    public void testIncompleteHeaderOnFirstElement() {
        try {
            // 7 bytes: two bytes short of a complete header
            ByteBuffer buf = ByteBuffer.allocate(7);
            buf.order(ByteOrder.LITTLE_ENDIAN);
            FileUtils.writeCompletely(fileChannel, buf);
            fileChannel.position(0L);

            BufferSpiller.SpilledBufferOrEventSequence seq = createSequence(tempFile, fileChannel);
            seq.open();
            try {
                seq.getNext();
                Assert.fail("should fail with an exception");
            } catch (IOException expected) {
                // expected: the header is truncated
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /** Writes a run of randomly sized buffers and checks they are read back unchanged and in order. */
    @Test
    public void testBufferSequence() {
        try {
            final Random rnd = new Random();
            final long seed = rnd.nextLong();
            final int numBuffers = 325;
            final int numberOfChannels = 671;

            rnd.setSeed(seed);
            for (int i = 0; i < numBuffers; i++) {
                writeBuffer(fileChannel, rnd.nextInt(PAGE_SIZE) + 1, rnd.nextInt(numberOfChannels));
            }
            fileChannel.position(0L);

            // replay the same random stream to know what was written
            rnd.setSeed(seed);
            BufferSpiller.SpilledBufferOrEventSequence seq = createSequence(tempFile, fileChannel);
            seq.open();
            for (int i = 0; i < numBuffers; i++) {
                validateBuffer(seq.getNext(), rnd.nextInt(PAGE_SIZE) + 1, rnd.nextInt(numberOfChannels));
            }
            Assert.assertNull(seq.getNext());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Writes one complete buffer followed by a record whose header announces 999 payload bytes
     * while only 312 bytes of that record are written; reading the second record must throw.
     */
    @Test
    public void testBufferSequenceWithIncompleteBuffer() {
        try {
            writeBuffer(fileChannel, 1672, 7);

            ByteBuffer data = ByteBuffer.allocate(615);
            data.order(ByteOrder.LITTLE_ENDIAN);
            data.putInt(2);
            data.putInt(999);
            data.put((byte) 0);
            // truncate: only the first 312 bytes (header included) reach the file
            data.position(0);
            data.limit(312);
            FileUtils.writeCompletely(fileChannel, data);
            fileChannel.position(0L);

            BufferSpiller.SpilledBufferOrEventSequence seq = createSequence(tempFile, fileChannel);
            seq.open();
            validateBuffer(seq.getNext(), 1672, 7);
            try {
                seq.getNext();
                Assert.fail("should fail with an exception");
            } catch (IOException expected) {
                // expected: the second record is truncated
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /** Writes only events and checks each one is read back with the same payload and channel. */
    @Test
    public void testEventSequence() {
        try {
            final Random rnd = new Random();
            final int numEvents = 3000;
            final int numberOfChannels = 1656;

            final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(numEvents);
            for (int i = 0; i < numEvents; i++) {
                events.add(generateAndWriteEvent(fileChannel, rnd, numberOfChannels));
            }
            fileChannel.position(0L);

            BufferSpiller.SpilledBufferOrEventSequence seq = createSequence(tempFile, fileChannel);
            seq.open();

            int numRead = 0;
            BufferOrEvent boe;
            while ((boe = seq.getNext()) != null) {
                BufferOrEvent expected = events.get(numRead);
                Assert.assertTrue(boe.isEvent());
                assertSameEvent(expected, boe);
                numRead++;
            }
            Assert.assertEquals(numEvents, numRead);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /** Writes a random mix of buffers and events into one file and validates the read-back. */
    @Test
    public void testMixedSequence() {
        try {
            final Random rnd = new Random();
            final Random bufferRnd = new Random();
            final long bufferSeed = rnd.nextLong();
            bufferRnd.setSeed(bufferSeed);

            final int numEventsAndBuffers = 3000;
            final int numberOfChannels = 1656;

            final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
            writeMixedSequence(fileChannel, rnd, bufferRnd, numEventsAndBuffers, numberOfChannels, events);
            fileChannel.position(0L);

            // replay the buffer random stream to know the sizes/channels that were written
            bufferRnd.setSeed(bufferSeed);
            BufferSpiller.SpilledBufferOrEventSequence seq = createSequence(tempFile, fileChannel);
            seq.open();
            validateMixedSequence(seq, events, bufferRnd, numEventsAndBuffers, numberOfChannels);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Writes two independent mixed sequences into two files and reads them one after the other
     * through two sequence objects that share the same read buffer.
     */
    @Test
    public void testMultipleSequences() {
        File secondFile = null;
        FileChannel secondChannel = null;
        try {
            secondFile = File.createTempFile("testdata", "tmp");
            secondChannel = new RandomAccessFile(secondFile, "rw").getChannel();

            final Random rnd = new Random();
            final Random bufferRnd = new Random();
            final long bufferSeed = rnd.nextLong();
            bufferRnd.setSeed(bufferSeed);

            final int numEventsAndBuffers1 = 272;
            final int numEventsAndBuffers2 = 151;
            final int numberOfChannels = 1656;

            final ArrayList<BufferOrEvent> events1 = new ArrayList<BufferOrEvent>(128);
            final ArrayList<BufferOrEvent> events2 = new ArrayList<BufferOrEvent>(128);

            // both files draw from the same random streams, first file first
            writeMixedSequence(fileChannel, rnd, bufferRnd, numEventsAndBuffers1, numberOfChannels, events1);
            writeMixedSequence(secondChannel, rnd, bufferRnd, numEventsAndBuffers2, numberOfChannels, events2);

            fileChannel.position(0L);
            secondChannel.position(0L);
            bufferRnd.setSeed(bufferSeed);

            BufferSpiller.SpilledBufferOrEventSequence seq1 = createSequence(tempFile, fileChannel);
            BufferSpiller.SpilledBufferOrEventSequence seq2 = createSequence(secondFile, secondChannel);

            seq1.open();
            validateMixedSequence(seq1, events1, bufferRnd, numEventsAndBuffers1, numberOfChannels);

            seq2.open();
            validateMixedSequence(seq2, events2, bufferRnd, numEventsAndBuffers2, numberOfChannels);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            if (secondChannel != null) {
                try {
                    secondChannel.close();
                } catch (IOException ignored) {
                    // best-effort cleanup of the second channel
                }
            }
            if (secondFile != null) {
                secondFile.delete();
            }
        }
    }

    /** After {@code cleanup()} on the sequence, the channel must be closed and the file gone. */
    @Test
    public void testCleanup() {
        try {
            ByteBuffer data = ByteBuffer.allocate(157);
            data.order(ByteOrder.LITTLE_ENDIAN);
            FileUtils.writeCompletely(fileChannel, data);
            fileChannel.position(54L);

            BufferSpiller.SpilledBufferOrEventSequence seq = createSequence(tempFile, fileChannel);
            seq.open();
            seq.cleanup();

            Assert.assertFalse(fileChannel.isOpen());
            Assert.assertFalse(tempFile.exists());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    /** Creates (but does not open) a sequence over the given file using the shared read buffer. */
    private BufferSpiller.SpilledBufferOrEventSequence createSequence(File file, FileChannel channel) {
        return new BufferSpiller.SpilledBufferOrEventSequence(file, channel, buffer, PAGE_SIZE);
    }

    /**
     * Writes {@code numElements} records, each being an event with probability
     * {@link #EVENT_PROBABILITY} and a buffer otherwise.
     *
     * @param channel          file channel to append to
     * @param rnd              decides event vs. buffer and generates event contents
     * @param bufferRnd        generates buffer size (first draw) and channel index (second draw)
     * @param numElements      number of records to write
     * @param numberOfChannels exclusive upper bound for generated channel indexes
     * @param writtenEvents    receives every written event, in write order
     */
    private static void writeMixedSequence(
            FileChannel channel,
            Random rnd,
            Random bufferRnd,
            int numElements,
            int numberOfChannels,
            ArrayList<BufferOrEvent> writtenEvents) throws IOException {

        for (int i = 0; i < numElements; i++) {
            boolean isEvent = rnd.nextDouble() < EVENT_PROBABILITY;
            if (isEvent) {
                writtenEvents.add(generateAndWriteEvent(channel, rnd, numberOfChannels));
            } else {
                writeBuffer(channel, bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numberOfChannels));
            }
        }
    }

    /**
     * Reads {@code numElements} records from an opened sequence and checks them against what
     * {@link #writeMixedSequence} produced, then checks that the sequence is exhausted and that
     * every expected event was seen.
     *
     * @param seq              opened sequence to read from
     * @param expectedEvents   events in the order they were written
     * @param bufferRnd        random source re-seeded to replay the written buffer sizes/channels
     * @param numElements      number of records expected in the sequence
     * @param numberOfChannels exclusive upper bound used when the channel indexes were generated
     */
    private static void validateMixedSequence(
            BufferSpiller.SpilledBufferOrEventSequence seq,
            ArrayList<BufferOrEvent> expectedEvents,
            Random bufferRnd,
            int numElements,
            int numberOfChannels) throws Exception {

        int numEvent = 0;
        for (int i = 0; i < numElements; i++) {
            BufferOrEvent next = seq.getNext();
            Assert.assertNotNull("sequence ended prematurely at element " + i, next);
            if (next.isEvent()) {
                assertSameEvent(expectedEvents.get(numEvent++), next);
            } else {
                validateBuffer(next, bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numberOfChannels));
            }
        }
        Assert.assertNull(seq.getNext());
        Assert.assertEquals(expectedEvents.size(), numEvent);
    }

    /** Asserts that both elements carry equal events on the same channel index. */
    private static void assertSameEvent(BufferOrEvent expected, BufferOrEvent actual) {
        Assert.assertEquals(expected.getEvent(), actual.getEvent());
        Assert.assertEquals(expected.getChannelIndex(), actual.getChannelIndex());
    }

    /**
     * Generates a random {@link TestEvent}, writes it (header with flag {@code 1}, then the
     * serialized event) to the channel and returns the element a reader is expected to produce.
     *
     * @param fileChannel      channel to append the record to
     * @param rnd              source of the magic number, payload (0..999 bytes) and channel index
     * @param numberOfChannels exclusive upper bound for the generated channel index
     * @return the expected event together with its channel index
     * @throws IOException if writing to the channel fails
     */
    private static BufferOrEvent generateAndWriteEvent(FileChannel fileChannel, Random rnd, int numberOfChannels) throws IOException {
        long magicNumber = rnd.nextLong();
        byte[] data = new byte[rnd.nextInt(1000)];
        rnd.nextBytes(data);
        TestEvent evt = new TestEvent(magicNumber, data);

        int channelIndex = rnd.nextInt(numberOfChannels);

        ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
        ByteBuffer header = ByteBuffer.allocate(HEADER_LENGTH);
        header.order(ByteOrder.LITTLE_ENDIAN);
        header.putInt(channelIndex);
        header.putInt(serializedEvent.remaining());
        header.put((byte) 1);
        header.flip();

        FileUtils.writeCompletely(fileChannel, header);
        FileUtils.writeCompletely(fileChannel, serializedEvent);
        return new BufferOrEvent(evt, channelIndex);
    }

    /**
     * Writes one buffer record: header with flag {@code 0}, then {@code size} payload bytes where
     * byte {@code i} has the value {@code (byte) i}.
     *
     * @param fileChannel  channel to append the record to
     * @param size         number of payload bytes
     * @param channelIndex channel index stored in the header
     * @throws IOException if writing to the channel fails
     */
    private static void writeBuffer(FileChannel fileChannel, int size, int channelIndex) throws IOException {
        ByteBuffer data = ByteBuffer.allocate(size + HEADER_LENGTH);
        data.order(ByteOrder.LITTLE_ENDIAN);
        data.putInt(channelIndex);
        data.putInt(size);
        data.put((byte) 0);
        for (int i = 0; i < size; i++) {
            data.put((byte) i);
        }
        data.flip();
        FileUtils.writeCompletely(fileChannel, data);
    }

    /**
     * Asserts that the element is a buffer on the expected channel, with the expected size and
     * with the payload pattern produced by {@link #writeBuffer}.
     *
     * @param boe                  element read from the sequence; must not be {@code null}
     * @param expectedSize         expected number of payload bytes
     * @param expectedChannelIndex expected channel index
     */
    private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) {
        Assert.assertNotNull("expected a buffer but the sequence returned no element", boe);
        Assert.assertEquals("wrong channel index", expectedChannelIndex, boe.getChannelIndex());
        Assert.assertTrue("is not buffer", boe.isBuffer());

        Buffer buf = boe.getBuffer();
        Assert.assertEquals("wrong buffer size", expectedSize, buf.getSize());

        MemorySegment seg = buf.getMemorySegment();
        for (int i = 0; i < expectedSize; i++) {
            Assert.assertEquals("wrong buffer contents", (byte) i, seg.get(i));
        }
    }
}

