package org.apache.flink.streaming.runtime.io;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.streaming.runtime.io.BufferSpiller;
import org.apache.flink.util.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

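/**
 * Tests for {@link BufferSpiller.SpilledBufferOrEventSequence}: records (buffers and events) are
 * written to a temporary file channel and then read back through the sequence.
 *
 * <p>Loaded from: input_file:org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.class
 */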
public class SpilledBufferOrEventSequenceTest {
    /** Direct, little-endian buffer of 131072 bytes that is handed to every sequence under test. */
    private final ByteBuffer buffer = ByteBuffer.allocateDirect(131072).order(ByteOrder.LITTLE_ENDIAN);
    /** Page size in bytes; equal to the value 32768 given to the sequences under test. */
    private final int pageSize = 32768;
    /** Temporary file created before each test and deleted again afterwards. */
    private File tempFile;
    /** Read/write channel onto {@code tempFile}; the tests write records here and read them back. */
    private FileChannel fileChannel;

    /**
     * Creates a fresh temporary file and opens a read/write channel on it before each test.
     *
     * <p>If either step fails, whatever was already created is released and the failure is
     * reported with its cause. Swallowing it would let the tests run against {@code null}
     * fields and fail later with an unrelated {@link NullPointerException}.
     */
    @Before
    public void initTempChannel() {
        try {
            this.tempFile = File.createTempFile("testdata", "tmp");
            this.fileChannel = new RandomAccessFile(this.tempFile, "rw").getChannel();
        } catch (Exception e) {
            // release a possibly half-initialized fixture before reporting the problem
            cleanup();
            throw new AssertionError("Could not set up the temporary file channel for the test", e);
        }
    }

    /**
     * Releases the per-test fixture: closes the channel if one was opened and deletes the
     * temporary file if one was created. Both steps are best-effort and never fail the test.
     */
    @After
    public void cleanup() {
        final FileChannel channel = this.fileChannel;
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException ignored) {
                // best-effort: a failing close must not mask the actual test outcome
            }
        }

        final File file = this.tempFile;
        if (file != null) {
            // result intentionally ignored; deletion is best-effort
            file.delete();
        }
    }

    /** A sequence over an empty file must report its end immediately by returning {@code null}. */
    @Test
    public void testEmptyChannel() {
        try {
            final BufferSpiller.SpilledBufferOrEventSequence sequence =
                    new BufferSpiller.SpilledBufferOrEventSequence(
                            this.tempFile, this.fileChannel, this.buffer, this.pageSize);
            sequence.open();

            final BufferOrEvent first = sequence.getNext();
            Assert.assertNull(first);
        } catch (Exception unexpected) {
            unexpected.printStackTrace();
            Assert.fail(unexpected.getMessage());
        }
    }

    /**
     * A file holding only 7 bytes (fewer than the 9 header bytes the test helpers write per
     * record) must make {@code getNext()} fail with an {@link IOException}.
     */
    @Test
    public void testIncompleteHeaderOnFirstElement() {
        try {
            // 7 zero bytes: too short to form a complete record header
            final ByteBuffer truncatedHeader = ByteBuffer.allocate(7);
            truncatedHeader.order(ByteOrder.LITTLE_ENDIAN);
            FileUtils.writeCompletely(this.fileChannel, truncatedHeader);
            this.fileChannel.position(0L);

            final BufferSpiller.SpilledBufferOrEventSequence sequence =
                    new BufferSpiller.SpilledBufferOrEventSequence(
                            this.tempFile, this.fileChannel, this.buffer, this.pageSize);
            sequence.open();

            try {
                sequence.getNext();
            } catch (IOException expected) {
                // this is the outcome the test is looking for
                return;
            }
            Assert.fail("should fail with an exception");
        } catch (Exception unexpected) {
            unexpected.printStackTrace();
            Assert.fail(unexpected.getMessage());
        }
    }

    /**
     * Writes a run of buffers with pseudo-random sizes and channel indexes, then reads them back
     * and checks each one in order; afterwards the sequence must be exhausted.
     */
    @Test
    public void testBufferSequence() {
        try {
            final int numBuffers = 325;
            final int numChannels = 671;

            final Random rnd = new Random();
            final long seed = rnd.nextLong();

            // write phase: size is drawn first, channel second
            rnd.setSeed(seed);
            int written = 0;
            while (written < numBuffers) {
                final int size = rnd.nextInt(this.pageSize) + 1;
                final int channel = rnd.nextInt(numChannels);
                writeBuffer(this.fileChannel, size, channel);
                written++;
            }

            // rewind both the file and the random source so the read phase replays the same values
            this.fileChannel.position(0L);
            rnd.setSeed(seed);

            final BufferSpiller.SpilledBufferOrEventSequence sequence =
                    new BufferSpiller.SpilledBufferOrEventSequence(
                            this.tempFile, this.fileChannel, this.buffer, this.pageSize);
            sequence.open();

            int validated = 0;
            while (validated < numBuffers) {
                final BufferOrEvent next = sequence.getNext();
                final int expectedSize = rnd.nextInt(this.pageSize) + 1;
                final int expectedChannel = rnd.nextInt(numChannels);
                validateBuffer(next, expectedSize, expectedChannel);
                validated++;
            }

            // nothing may follow the last buffer
            Assert.assertNull(sequence.getNext());
        } catch (Exception unexpected) {
            unexpected.printStackTrace();
            Assert.fail(unexpected.getMessage());
        }
    }

    /**
     * Writes one complete buffer followed by a record whose header announces 999 payload bytes
     * but of which only 312 bytes in total reach the file. The first record must read back
     * normally; the second {@code getNext()} must fail with an {@link IOException}.
     */
    @Test
    public void testBufferSequenceWithIncompleteBuffer() {
        try {
            writeBuffer(this.fileChannel, 1672, 7);

            // header: channel 2, length 999, flag 0 (the flag value writeBuffer uses for buffers)
            final ByteBuffer truncatedRecord = ByteBuffer.allocate(615);
            truncatedRecord.order(ByteOrder.LITTLE_ENDIAN);
            truncatedRecord.putInt(2).putInt(999).put((byte) 0);
            // only the first 312 bytes are written, so the record is cut short
            truncatedRecord.position(0);
            truncatedRecord.limit(312);
            FileUtils.writeCompletely(this.fileChannel, truncatedRecord);

            this.fileChannel.position(0L);

            final BufferSpiller.SpilledBufferOrEventSequence sequence =
                    new BufferSpiller.SpilledBufferOrEventSequence(
                            this.tempFile, this.fileChannel, this.buffer, this.pageSize);
            sequence.open();

            final BufferOrEvent complete = sequence.getNext();
            validateBuffer(complete, 1672, 7);

            try {
                sequence.getNext();
            } catch (IOException expected) {
                // this is the outcome the test is looking for
                return;
            }
            Assert.fail("should fail with an exception");
        } catch (Exception unexpected) {
            unexpected.printStackTrace();
            Assert.fail(unexpected.getMessage());
        }
    }

    /**
     * Writes 3000 random events, reads them back, and checks that each one matches the event
     * that was written at the same position (payload and channel index) and that exactly 3000
     * elements are returned.
     */
    @Test
    public void testEventSequence() {
        try {
            final int numEvents = 3000;
            final int numChannels = 1656;

            final Random rnd = new Random();
            final ArrayList<BufferOrEvent> written = new ArrayList<BufferOrEvent>(numEvents);
            for (int n = 0; n < numEvents; n++) {
                written.add(generateAndWriteEvent(this.fileChannel, rnd, numChannels));
            }

            this.fileChannel.position(0L);

            final BufferSpiller.SpilledBufferOrEventSequence sequence =
                    new BufferSpiller.SpilledBufferOrEventSequence(
                            this.tempFile, this.fileChannel, this.buffer, this.pageSize);
            sequence.open();

            int readCount = 0;
            BufferOrEvent actual;
            while ((actual = sequence.getNext()) != null) {
                final BufferOrEvent expected = written.get(readCount);
                Assert.assertTrue(actual.isEvent());
                Assert.assertEquals(expected.getEvent(), actual.getEvent());
                Assert.assertEquals(expected.getChannelIndex(), actual.getChannelIndex());
                readCount++;
            }

            Assert.assertEquals(3000L, readCount);
        } catch (Exception unexpected) {
            unexpected.printStackTrace();
            Assert.fail(unexpected.getMessage());
        }
    }

    /**
     * Writes 3000 records, each an event with probability 0.05 and a buffer otherwise, then
     * reads them back and validates every record against what was written.
     *
     * <p>Two random sources are used: {@code rnd} drives the event/buffer choice and the event
     * contents, while {@code bufferRnd} is seeded explicitly so the buffer sizes and channel
     * indexes can be replayed during validation.
     */
    @Test
    public void testMixedSequence() {
        try {
            final int numEventsAndBuffers = 3000;
            final int numChannels = 1656;

            final Random rnd = new Random();
            final Random bufferRnd = new Random();
            final long bufferSeed = rnd.nextLong();
            bufferRnd.setSeed(bufferSeed);

            final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);

            // generate the sequence
            for (int i = 0; i < numEventsAndBuffers; i++) {
                if (rnd.nextDouble() < 0.05d) {
                    events.add(generateAndWriteEvent(this.fileChannel, rnd, numChannels));
                } else {
                    writeBuffer(this.fileChannel, bufferRnd.nextInt(this.pageSize) + 1, bufferRnd.nextInt(numChannels));
                }
            }

            // rewind the file and replay the buffer parameters from the same seed
            this.fileChannel.position(0L);
            bufferRnd.setSeed(bufferSeed);

            final BufferSpiller.SpilledBufferOrEventSequence sequence =
                    new BufferSpiller.SpilledBufferOrEventSequence(
                            this.tempFile, this.fileChannel, this.buffer, this.pageSize);
            sequence.open();

            // read and validate the sequence
            int numEvent = 0;
            for (int i = 0; i < numEventsAndBuffers; i++) {
                final BufferOrEvent next = sequence.getNext();
                Assert.assertNotNull("sequence ended early at element " + i, next);
                if (next.isEvent()) {
                    final BufferOrEvent expected = events.get(numEvent++);
                    Assert.assertEquals(expected.getEvent(), next.getEvent());
                    Assert.assertEquals(expected.getChannelIndex(), next.getChannelIndex());
                } else {
                    validateBuffer(next, bufferRnd.nextInt(this.pageSize) + 1, bufferRnd.nextInt(numChannels));
                }
            }

            // no further element, and every written event was seen
            Assert.assertNull(sequence.getNext());
            Assert.assertEquals(events.size(), numEvent);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Writes two independent mixed sequences (272 and 151 records) into two different files and
     * reads them back one after the other through two sequence objects sharing one read buffer.
     *
     * <p>The second file and channel are local to this test and are always released in the
     * {@code finally} block; the first pair is the regular per-test fixture.
     */
    @Test
    public void testMultipleSequences() {
        File secondFile = null;
        FileChannel secondChannel = null;
        try {
            // create the second file channel
            secondFile = File.createTempFile("testdata", "tmp");
            secondChannel = new RandomAccessFile(secondFile, "rw").getChannel();

            final int numEventsAndBuffers1 = 272;
            final int numEventsAndBuffers2 = 151;
            final int numChannels = 1656;

            final Random rnd = new Random();
            final Random bufferRnd = new Random();
            final long bufferSeed = rnd.nextLong();
            bufferRnd.setSeed(bufferSeed);

            final ArrayList<BufferOrEvent> events1 = new ArrayList<BufferOrEvent>(128);
            final ArrayList<BufferOrEvent> events2 = new ArrayList<BufferOrEvent>(128);

            // generate both sequences; bufferRnd continues across them, and so does the replay below
            writeMixedSequence(this.fileChannel, rnd, bufferRnd, numEventsAndBuffers1, this.pageSize, numChannels, events1);
            writeMixedSequence(secondChannel, rnd, bufferRnd, numEventsAndBuffers2, this.pageSize, numChannels, events2);

            // rewind the files and the buffer parameter source
            this.fileChannel.position(0L);
            secondChannel.position(0L);
            bufferRnd.setSeed(bufferSeed);

            final BufferSpiller.SpilledBufferOrEventSequence seq1 =
                    new BufferSpiller.SpilledBufferOrEventSequence(
                            this.tempFile, this.fileChannel, this.buffer, this.pageSize);
            final BufferSpiller.SpilledBufferOrEventSequence seq2 =
                    new BufferSpiller.SpilledBufferOrEventSequence(
                            secondFile, secondChannel, this.buffer, this.pageSize);

            // the second sequence is opened only after the first has been read completely
            seq1.open();
            verifyMixedSequence(seq1, events1, bufferRnd, numEventsAndBuffers1, this.pageSize, numChannels);

            seq2.open();
            verifyMixedSequence(seq2, events2, bufferRnd, numEventsAndBuffers2, this.pageSize, numChannels);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            if (secondChannel != null) {
                try {
                    secondChannel.close();
                } catch (IOException ignored) {
                    // best-effort: a failing close must not mask the actual test outcome
                }
            }
            if (secondFile != null) {
                // result intentionally ignored; deletion is best-effort
                secondFile.delete();
            }
        }
    }

    /** Writes {@code count} records to {@code channel}, each an event with probability 0.05 and a buffer otherwise; written events are appended to {@code events}. */
    private static void writeMixedSequence(
            FileChannel channel,
            Random rnd,
            Random bufferRnd,
            int count,
            int maxBufferSize,
            int numChannels,
            ArrayList<BufferOrEvent> events) throws IOException {
        for (int i = 0; i < count; i++) {
            if (rnd.nextDouble() < 0.05d) {
                events.add(generateAndWriteEvent(channel, rnd, numChannels));
            } else {
                writeBuffer(channel, bufferRnd.nextInt(maxBufferSize) + 1, bufferRnd.nextInt(numChannels));
            }
        }
    }

    /** Reads {@code count} records from {@code sequence}, validates each against the expected events and replayed buffer parameters, and checks that the sequence then ends. */
    private static void verifyMixedSequence(
            BufferSpiller.SpilledBufferOrEventSequence sequence,
            ArrayList<BufferOrEvent> events,
            Random bufferRnd,
            int count,
            int maxBufferSize,
            int numChannels) throws Exception {
        int numEvent = 0;
        for (int i = 0; i < count; i++) {
            final BufferOrEvent next = sequence.getNext();
            Assert.assertNotNull("sequence ended early at element " + i, next);
            if (next.isEvent()) {
                final BufferOrEvent expected = events.get(numEvent++);
                Assert.assertEquals(expected.getEvent(), next.getEvent());
                Assert.assertEquals(expected.getChannelIndex(), next.getChannelIndex());
            } else {
                validateBuffer(next, bufferRnd.nextInt(maxBufferSize) + 1, bufferRnd.nextInt(numChannels));
            }
        }
        Assert.assertNull(sequence.getNext());
        Assert.assertEquals(events.size(), numEvent);
    }

    /**
     * After {@code cleanup()} on an opened sequence, the channel must be closed and the backing
     * file must no longer exist.
     */
    @Test
    public void testCleanup() {
        try {
            // put some bytes into the file and leave the channel positioned mid-file
            final ByteBuffer filler = ByteBuffer.allocate(157);
            filler.order(ByteOrder.LITTLE_ENDIAN);
            FileUtils.writeCompletely(this.fileChannel, filler);
            this.fileChannel.position(54L);

            final BufferSpiller.SpilledBufferOrEventSequence sequence =
                    new BufferSpiller.SpilledBufferOrEventSequence(
                            this.tempFile, this.fileChannel, this.buffer, this.pageSize);
            sequence.open();
            sequence.cleanup();

            final boolean channelStillOpen = this.fileChannel.isOpen();
            Assert.assertFalse(channelStillOpen);
            final boolean fileStillThere = this.tempFile.exists();
            Assert.assertFalse(fileStillThere);
        } catch (Exception unexpected) {
            unexpected.printStackTrace();
            Assert.fail(unexpected.getMessage());
        }
    }

    /**
     * Creates a random {@code TestEvent}, writes it to the channel as one record, and returns
     * the element a reader is expected to produce for it.
     *
     * <p>The record is a 9-byte little-endian header (channel index, serialized length, flag
     * byte {@code 1}) followed by the serialized event.
     *
     * @param fileChannel channel the record is appended to
     * @param random source of the event's magic number, payload and channel index
     * @param i exclusive upper bound for the randomly chosen channel index
     * @return the written event paired with its channel index
     * @throws IOException if writing to the channel fails
     */
    private static BufferOrEvent generateAndWriteEvent(FileChannel fileChannel, Random random, int i) throws IOException {
        // draw order matters for reproducibility: magic number, payload length, payload, channel
        final long magicNumber = random.nextLong();
        final byte[] payload = new byte[random.nextInt(1000)];
        random.nextBytes(payload);
        final TestEvent event = new TestEvent(magicNumber, payload);
        final int channelIndex = random.nextInt(i);

        final ByteBuffer serialized = EventSerializer.toSerializedEvent(event);

        final ByteBuffer header = ByteBuffer.allocate(9);
        header.order(ByteOrder.LITTLE_ENDIAN);
        header.putInt(channelIndex).putInt(serialized.remaining()).put((byte) 1);
        header.flip();

        FileUtils.writeCompletely(fileChannel, header);
        FileUtils.writeCompletely(fileChannel, serialized);

        return new BufferOrEvent(event, channelIndex);
    }

    /**
     * Writes one buffer record to the channel: a 9-byte little-endian header (channel index,
     * payload length, flag byte {@code 0}) followed by {@code i} payload bytes, where the byte
     * at position {@code p} has the value {@code (byte) p}.
     *
     * @param fileChannel channel the record is appended to
     * @param i number of payload bytes
     * @param i2 channel index stored in the header
     * @throws IOException if writing to the channel fails
     */
    private static void writeBuffer(FileChannel fileChannel, int i, int i2) throws IOException {
        final ByteBuffer record = ByteBuffer.allocate(i + 9);
        record.order(ByteOrder.LITTLE_ENDIAN);

        // header
        record.putInt(i2).putInt(i).put((byte) 0);

        // payload: a repeating 0..255 pattern that validateBuffer can recompute
        int pos = 0;
        while (pos < i) {
            record.put((byte) pos);
            pos++;
        }

        record.flip();
        FileUtils.writeCompletely(fileChannel, record);
    }

    /**
     * Asserts that the element is a buffer on the expected channel, has the expected size, and
     * contains the pattern in which the byte at position {@code p} equals {@code (byte) p}.
     *
     * @param bufferOrEvent element read from the sequence
     * @param i expected buffer size in bytes
     * @param i2 expected channel index
     */
    private static void validateBuffer(BufferOrEvent bufferOrEvent, int i, int i2) {
        Assert.assertEquals("wrong channel index", i2, bufferOrEvent.getChannelIndex());
        Assert.assertTrue("is not buffer", bufferOrEvent.isBuffer());

        final Buffer payload = bufferOrEvent.getBuffer();
        Assert.assertEquals("wrong buffer size", i, payload.getSize());

        final MemorySegment segment = payload.getMemorySegment();
        int pos = 0;
        while (pos < i) {
            Assert.assertEquals("wrong buffer contents", (byte) pos, segment.get(pos));
            pos++;
        }
    }
}
