package org.apache.flink.runtime.io.network.api.writer;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterTest;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.class */
/**
 * Tests for the {@link RecordWriterDelegate} implementations {@link SingleRecordWriter} and
 * {@link MultipleRecordWriters}.
 *
 * <p>Two aspects are covered for each implementation: the availability reported by the delegate
 * while a buffer is requested and recycled, and the delivery of a broadcast event to every
 * collecting queue.
 */
public class RecordWriterDelegateTest extends TestLogger {
    /** First constructor argument of the shared {@link NetworkBufferPool}. */
    private static final int NUMBER_OF_BUFFERS = 10;

    /** Second constructor argument of the shared {@link NetworkBufferPool}. */
    private static final int MEMORY_SEGMENT_SIZE = 128;

    /** Third constructor argument of the shared {@link NetworkBufferPool}. */
    private static final int NUMBER_OF_SEGMENTS_TO_REQUEST = 2;

    /** Number of writers wrapped by a {@link MultipleRecordWriters} in these tests. */
    private static final int NUMBER_OF_RECORD_WRITERS = 2;

    /** Global pool that is created before and destroyed after every test. */
    private NetworkBufferPool globalPool;

    /** Creates a fresh global buffer pool for the test about to run. */
    @Before
    public void setup() {
        this.globalPool =
                new NetworkBufferPool(
                        NUMBER_OF_BUFFERS, MEMORY_SEGMENT_SIZE, NUMBER_OF_SEGMENTS_TO_REQUEST);
    }

    /** Destroys all local pools created from the global pool, then the global pool itself. */
    @After
    public void teardown() {
        this.globalPool.destroyAllBufferPools();
        this.globalPool.destroy();
    }

    /**
     * Checks that a {@link SingleRecordWriter} returns the wrapped writer at index 0 and reports
     * availability as expected by {@link #verifyAvailability(RecordWriterDelegate)}.
     */
    @Test
    public void testSingleRecordWriterAvailability() throws Exception {
        RecordWriter recordWriter = createRecordWriter(this.globalPool);
        SingleRecordWriter writerDelegate = new SingleRecordWriter(recordWriter);

        Assert.assertEquals(recordWriter, writerDelegate.getRecordWriter(0));
        verifyAvailability(writerDelegate);
    }

    /**
     * Checks that a {@link MultipleRecordWriters} returns each wrapped writer at its list index
     * and reports availability as expected by {@link #verifyAvailability(RecordWriterDelegate)}.
     */
    @Test
    public void testMultipleRecordWritersAvailability() throws Exception {
        ArrayList<RecordWriter> recordWriters = new ArrayList<>(NUMBER_OF_RECORD_WRITERS);
        for (int i = 0; i < NUMBER_OF_RECORD_WRITERS; i++) {
            recordWriters.add(createRecordWriter(this.globalPool));
        }

        MultipleRecordWriters writerDelegate = new MultipleRecordWriters(recordWriters);
        for (int i = 0; i < NUMBER_OF_RECORD_WRITERS; i++) {
            Assert.assertEquals(recordWriters.get(i), writerDelegate.getRecordWriter(i));
        }
        verifyAvailability(writerDelegate);
    }

    /** Checks that a {@link SingleRecordWriter} puts exactly one event into every queue. */
    @Test
    public void testSingleRecordWriterBroadcastEvent() throws Exception {
        ArrayDeque<BufferConsumer>[] queues = createQueues();
        RecordWriter recordWriter = createRecordWriter(queues);

        verifyBroadcastEvent(new SingleRecordWriter(recordWriter), queues, 1);
    }

    /**
     * Checks that a {@link MultipleRecordWriters} whose writers all share the same queues puts
     * one event per wrapped writer into every queue.
     */
    @Test
    public void testMultipleRecordWritersBroadcastEvent() throws Exception {
        ArrayDeque<BufferConsumer>[] queues = createQueues();
        ArrayList<RecordWriter> recordWriters = new ArrayList<>(NUMBER_OF_RECORD_WRITERS);
        for (int i = 0; i < NUMBER_OF_RECORD_WRITERS; i++) {
            recordWriters.add(createRecordWriter(queues));
        }

        verifyBroadcastEvent(
                new MultipleRecordWriters(recordWriters), queues, NUMBER_OF_RECORD_WRITERS);
    }

    /**
     * Creates the two empty queues into which the collecting partition writer stores buffers.
     *
     * <p>Java forbids creating an array with a parameterized component type, so the array is
     * created raw and converted; the conversion is safe because only freshly created
     * {@code ArrayDeque<BufferConsumer>} instances are stored in it.
     *
     * @return an array holding two empty queues
     */
    @SuppressWarnings("unchecked")
    private static ArrayDeque<BufferConsumer>[] createQueues() {
        return new ArrayDeque[] {
            new ArrayDeque<BufferConsumer>(), new ArrayDeque<BufferConsumer>()
        };
    }

    /**
     * Builds a record writer on top of a result partition that is set up with a local buffer
     * pool obtained through {@code createBufferPool(1, 1)}.
     *
     * @param networkBufferPool the global pool the local buffer pool is created from
     * @return a record writer targeting the newly set-up partition
     * @throws Exception if creating the pool or setting up the partition fails
     */
    private RecordWriter createRecordWriter(NetworkBufferPool networkBufferPool) throws Exception {
        BufferPool localPool = networkBufferPool.createBufferPool(1, 1);
        ResultPartition partition =
                new ResultPartitionBuilder()
                        .setBufferPoolFactory(bufferPoolOwner -> localPool)
                        .build();
        partition.setup();

        return new RecordWriterBuilder().build(partition);
    }

    /**
     * Builds a record writer on top of a {@link RecordWriterTest.CollectingPartitionWriter} that
     * is given the queues and a {@code TestPooledBufferProvider(1)}.
     *
     * @param arrayDequeArr the queues handed to the collecting partition writer
     * @return a record writer targeting the collecting partition writer
     */
    private RecordWriter createRecordWriter(ArrayDeque<BufferConsumer>[] arrayDequeArr) {
        RecordWriterTest.CollectingPartitionWriter partitionWriter =
                new RecordWriterTest.CollectingPartitionWriter(
                        arrayDequeArr, new TestPooledBufferProvider(1));

        return new RecordWriterBuilder().build(partitionWriter);
    }

    /**
     * Walks the delegate through an available, unavailable, available cycle and asserts the
     * reported state at every step.
     *
     * @param writerDelegate the delegate under test; its writer at index 0 is used
     * @throws Exception if requesting, reading or recycling the buffer fails
     */
    private void verifyAvailability(RecordWriterDelegate writerDelegate) throws Exception {
        // Before any buffer is requested the delegate must report itself as available.
        Assert.assertTrue(writerDelegate.isAvailable());
        Assert.assertTrue(writerDelegate.getAvailableFuture().isDone());

        // Requesting a buffer builder from the first writer is expected to make the delegate
        // unavailable.
        RecordWriter recordWriter = writerDelegate.getRecordWriter(0);
        BufferBuilder bufferBuilder = Preconditions.checkNotNull(recordWriter.getBufferBuilder(0));
        Assert.assertFalse(writerDelegate.isAvailable());
        CompletableFuture<?> availableFuture = writerDelegate.getAvailableFuture();
        Assert.assertFalse(availableFuture.isDone());

        // Finish the buffer, read it back through a subpartition view and recycle it.
        BufferBuilderTestUtils.fillBufferBuilder(bufferBuilder, 1).finish();
        recordWriter
                .getTargetPartition()
                .createSubpartitionView(0, new NoOpBufferAvailablityListener())
                .getNextBuffer()
                .buffer()
                .recycleBuffer();

        // The future obtained while unavailable must now be complete, and the delegate must
        // report itself as available again.
        Assert.assertTrue(availableFuture.isDone());
        Assert.assertTrue(writerDelegate.isAvailable());
        Assert.assertTrue(writerDelegate.getAvailableFuture().isDone());
    }

    /**
     * Broadcasts a {@link CancelCheckpointMarker} through the delegate and asserts that every
     * queue holds exactly the expected number of buffers, each of which parses to that event.
     *
     * @param writerDelegate the delegate under test
     * @param queues the queues the underlying writers collect their output in
     * @param expectedEventsPerQueue number of events every queue must contain after the broadcast
     * @throws Exception if broadcasting or parsing a buffer fails
     */
    private void verifyBroadcastEvent(
            RecordWriterDelegate writerDelegate,
            ArrayDeque<BufferConsumer>[] queues,
            int expectedEventsPerQueue)
            throws Exception {
        CancelCheckpointMarker message = new CancelCheckpointMarker(1L);
        writerDelegate.broadcastEvent(message);

        for (int queueIndex = 0; queueIndex < queues.length; queueIndex++) {
            ArrayDeque<BufferConsumer> queue = queues[queueIndex];
            Assert.assertEquals(expectedEventsPerQueue, queue.size());

            for (int eventIndex = 0; eventIndex < expectedEventsPerQueue; eventIndex++) {
                BufferOrEvent bufferOrEvent = RecordWriterTest.parseBuffer(queue.remove(), queueIndex);
                Assert.assertTrue(bufferOrEvent.isEvent());
                Assert.assertEquals(message, bufferOrEvent.getEvent());
            }
        }
    }
}
