package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.util.ArrayDeque;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.DeserializationUtils;
import org.apache.flink.runtime.operators.shipping.OutputEmitter;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.testutils.serialization.types.SerializationTestType;
import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.testutils.serialization.types.Util;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.XORShiftRandom;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.class */
public class RecordWriterTest {
    private final boolean isBroadcastWriter;

    @Rule
    public TemporaryFolder tempFolder;

    /* loaded from: input_file:org/apache/flink/runtime/io/network/api/writer/RecordWriterTest$ByteArrayIO.class */
    private static class ByteArrayIO implements IOReadableWritable {
        private final byte[] bytes;

        public ByteArrayIO(byte[] bArr) {
            this.bytes = bArr;
        }

        public void write(DataOutputView dataOutputView) throws IOException {
            dataOutputView.write(this.bytes);
        }

        public void read(DataInputView dataInputView) throws IOException {
            dataInputView.readFully(this.bytes);
        }
    }

    public RecordWriterTest() {
        this(false);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RecordWriterTest(boolean z) {
        this.tempFolder = new TemporaryFolder();
        this.isBroadcastWriter = z;
    }

    @Test
    public void testBroadcastEventNoRecords() throws Exception {
        ResultPartition createResultPartition = createResultPartition(32, 4);
        RecordWriter createRecordWriter = createRecordWriter(createResultPartition);
        CheckpointBarrier checkpointBarrier = new CheckpointBarrier(2148402839L, 2166311875L, CheckpointOptions.forCheckpointWithDefaultLocation());
        createRecordWriter.broadcastEvent(checkpointBarrier);
        Assert.assertEquals(0L, createResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
        for (int i = 0; i < 4; i++) {
            Assert.assertEquals(1L, createResultPartition.getNumberOfQueuedBuffers(i));
            ResultSubpartitionView createSubpartitionView = createResultPartition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
            BufferOrEvent parseBuffer = parseBuffer(createSubpartitionView.getNextBuffer().buffer(), i);
            Assert.assertTrue(parseBuffer.isEvent());
            Assert.assertEquals(checkpointBarrier, parseBuffer.getEvent());
            Assert.assertFalse(createSubpartitionView.isAvailable(Integer.MAX_VALUE));
        }
    }

    @Test
    public void testBroadcastEventMixedRecords() throws Exception {
        XORShiftRandom xORShiftRandom = new XORShiftRandom();
        ResultPartition createResultPartition = createResultPartition(32, 4);
        RecordWriter createRecordWriter = createRecordWriter(createResultPartition);
        CheckpointBarrier checkpointBarrier = new CheckpointBarrier(2147484939L, 2147483846L, CheckpointOptions.forCheckpointWithDefaultLocation());
        byte[] bArr = new byte[32 / 2];
        xORShiftRandom.nextBytes(bArr);
        createRecordWriter.emit(new ByteArrayIO(bArr));
        byte[] bArr2 = new byte[32 + 1];
        xORShiftRandom.nextBytes(bArr2);
        createRecordWriter.emit(new ByteArrayIO(bArr2));
        byte[] bArr3 = new byte[32 - 4];
        xORShiftRandom.nextBytes(bArr3);
        createRecordWriter.emit(new ByteArrayIO(bArr3));
        createRecordWriter.broadcastEvent(checkpointBarrier);
        if (this.isBroadcastWriter) {
            Assert.assertEquals(3L, createResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
            for (int i = 0; i < 4; i++) {
                Assert.assertEquals(4L, createResultPartition.getNumberOfQueuedBuffers(i));
                ResultSubpartitionView createSubpartitionView = createResultPartition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
                for (int i2 = 0; i2 < 3; i2++) {
                    Assert.assertTrue(parseBuffer(createSubpartitionView.getNextBuffer().buffer(), 0).isBuffer());
                }
                BufferOrEvent parseBuffer = parseBuffer(createSubpartitionView.getNextBuffer().buffer(), i);
                Assert.assertTrue(parseBuffer.isEvent());
                Assert.assertEquals(checkpointBarrier, parseBuffer.getEvent());
            }
            return;
        }
        Assert.assertEquals(4L, createResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
        Assert.assertEquals(2L, createResultPartition.getNumberOfQueuedBuffers(0));
        Assert.assertTrue(parseBuffer(r0[0].getNextBuffer().buffer(), 0).isBuffer());
        Assert.assertEquals(3L, createResultPartition.getNumberOfQueuedBuffers(1));
        Assert.assertTrue(parseBuffer(r0[1].getNextBuffer().buffer(), 1).isBuffer());
        Assert.assertTrue(parseBuffer(r0[1].getNextBuffer().buffer(), 1).isBuffer());
        Assert.assertEquals(2L, createResultPartition.getNumberOfQueuedBuffers(2));
        Assert.assertTrue(parseBuffer(r0[2].getNextBuffer().buffer(), 2).isBuffer());
        ResultSubpartitionView[] resultSubpartitionViewArr = {createResultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener()), createResultPartition.createSubpartitionView(1, new NoOpBufferAvailablityListener()), createResultPartition.createSubpartitionView(2, new NoOpBufferAvailablityListener()), createResultPartition.createSubpartitionView(3, new NoOpBufferAvailablityListener())};
        Assert.assertEquals(1L, createResultPartition.getNumberOfQueuedBuffers(3));
        for (int i3 = 0; i3 < 4; i3++) {
            BufferOrEvent parseBuffer2 = parseBuffer(resultSubpartitionViewArr[i3].getNextBuffer().buffer(), i3);
            Assert.assertTrue(parseBuffer2.isEvent());
            Assert.assertEquals(checkpointBarrier, parseBuffer2.getEvent());
        }
    }

    @Test
    public void testBroadcastEventBufferReferenceCounting() throws Exception {
        ResultPartition createResultPartition = createResultPartition(32768, 2);
        createRecordWriter(createResultPartition).broadcastEvent(EndOfPartitionEvent.INSTANCE);
        Buffer[] bufferArr = new Buffer[2];
        for (int i = 0; i < 2; i++) {
            Assert.assertEquals(1L, createResultPartition.getNumberOfQueuedBuffers(i));
            bufferArr[i] = createResultPartition.createSubpartitionView(i, new NoOpBufferAvailablityListener()).getNextBuffer().buffer();
            Assert.assertTrue(parseBuffer(bufferArr[i], i).isEvent());
        }
        for (int i2 = 0; i2 < 2; i2++) {
            Assert.assertTrue(bufferArr[i2].isRecycled());
        }
    }

    @Test
    public void testBroadcastEventBufferIndependence() throws Exception {
        verifyBroadcastBufferOrEventIndependence(true);
    }

    @Test
    public void testBroadcastEmitBufferIndependence() throws Exception {
        verifyBroadcastBufferOrEventIndependence(false);
    }

    @Test
    public void testBroadcastEmitRecord() throws Exception {
        ResultPartition createResultPartition = createResultPartition(32, 4);
        RecordWriter createRecordWriter = createRecordWriter(createResultPartition);
        SpillingAdaptiveSpanningRecordDeserializer spillingAdaptiveSpanningRecordDeserializer = new SpillingAdaptiveSpanningRecordDeserializer(new String[]{this.tempFolder.getRoot().getAbsolutePath()});
        ArrayDeque arrayDeque = new ArrayDeque();
        for (SerializationTestType serializationTestType : Util.randomRecords(8, SerializationTestTypeFactory.INT)) {
            arrayDeque.add(serializationTestType);
            createRecordWriter.broadcastEmit(serializationTestType);
        }
        if (this.isBroadcastWriter) {
            Assert.assertEquals(2L, createResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
        } else {
            Assert.assertEquals(8L, createResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
        }
        for (int i = 0; i < 4; i++) {
            Assert.assertEquals(2L, createResultPartition.getNumberOfQueuedBuffers(i));
            verifyDeserializationResults(createResultPartition.createSubpartitionView(i, new NoOpBufferAvailablityListener()), spillingAdaptiveSpanningRecordDeserializer, arrayDeque.clone(), 2, 8);
        }
    }

    @Test
    public void testIsAvailableOrNot() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 128);
        BufferPool createBufferPool = networkBufferPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE);
        ResultPartition build = new ResultPartitionBuilder().setBufferPoolFactory(() -> {
            return createBufferPool;
        }).build();
        build.setup();
        RecordWriter createRecordWriter = createRecordWriter(build);
        try {
            Assert.assertTrue(createRecordWriter.getAvailableFuture().isDone());
            BufferBuilder requestBufferBuilder = createBufferPool.requestBufferBuilder(0);
            Assert.assertNotNull(requestBufferBuilder);
            Assert.assertFalse(createRecordWriter.getAvailableFuture().isDone());
            BufferBuilderTestUtils.buildSingleBuffer(requestBufferBuilder).recycleBuffer();
            Assert.assertTrue(createRecordWriter.getAvailableFuture().isDone());
            Assert.assertEquals(RecordWriter.AVAILABLE, createRecordWriter.getAvailableFuture());
            createBufferPool.lazyDestroy();
            networkBufferPool.destroy();
        } catch (Throwable th) {
            createBufferPool.lazyDestroy();
            networkBufferPool.destroy();
            throw th;
        }
    }

    private void verifyBroadcastBufferOrEventIndependence(boolean z) throws Exception {
        ResultPartition createResultPartition = createResultPartition(4096, 2);
        RecordWriter createRecordWriter = createRecordWriter(createResultPartition);
        if (z) {
            createRecordWriter.broadcastEvent(EndOfPartitionEvent.INSTANCE);
        } else {
            createRecordWriter.broadcastEmit(new IntValue(0));
        }
        Assert.assertEquals(1L, createResultPartition.getNumberOfQueuedBuffers(0));
        Assert.assertEquals(1L, createResultPartition.getNumberOfQueuedBuffers(1));
        ResultSubpartitionView createSubpartitionView = createResultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
        ResultSubpartitionView createSubpartitionView2 = createResultPartition.createSubpartitionView(1, new NoOpBufferAvailablityListener());
        Buffer buffer = createSubpartitionView.getNextBuffer().buffer();
        Buffer buffer2 = createSubpartitionView2.getNextBuffer().buffer();
        Assert.assertEquals(0L, buffer.getReaderIndex());
        Assert.assertEquals(0L, buffer2.getReaderIndex());
        buffer.setReaderIndex(1);
        Assert.assertEquals("Buffer 2 shares the same reader index as buffer 1", 0L, buffer2.getReaderIndex());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void verifyDeserializationResults(ResultSubpartitionView resultSubpartitionView, RecordDeserializer<SerializationTestType> recordDeserializer, ArrayDeque<SerializationTestType> arrayDeque, int i, int i2) throws Exception {
        int i3 = 0;
        for (int i4 = 0; i4 < i; i4++) {
            recordDeserializer.setNextBuffer(resultSubpartitionView.getNextBuffer().buffer());
            i3 += DeserializationUtils.deserializeRecords(arrayDeque, recordDeserializer);
        }
        Assert.assertFalse(resultSubpartitionView.isAvailable(Integer.MAX_VALUE));
        Assert.assertEquals(i2, i3);
    }

    private RecordWriter createRecordWriter(ResultPartitionWriter resultPartitionWriter) {
        return this.isBroadcastWriter ? new RecordWriterBuilder().setChannelSelector(new OutputEmitter(ShipStrategyType.BROADCAST, 0)).build(resultPartitionWriter) : new RecordWriterBuilder().build(resultPartitionWriter);
    }

    public static ResultPartition createResultPartition(int i, int i2) throws IOException {
        ResultPartition createPartition = PartitionTestUtils.createPartition(new NettyShuffleEnvironmentBuilder().setBufferSize(i).build(), ResultPartitionType.PIPELINED, i2);
        createPartition.setup();
        return createPartition;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static BufferOrEvent parseBuffer(Buffer buffer, int i) throws IOException {
        if (buffer.isBuffer()) {
            return new BufferOrEvent(buffer, new InputChannelInfo(0, i));
        }
        AbstractEvent fromBuffer = EventSerializer.fromBuffer(buffer, RecordWriterTest.class.getClassLoader());
        buffer.recycleBuffer();
        return new BufferOrEvent(fromBuffer, new InputChannelInfo(0, i));
    }
}
