package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.operators.testutils.DummyCheckpointInvokable;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.class */
public class StreamTaskNetworkInputTest {
    // Size handed to BufferBuilderTestUtils.createEmptyBufferBuilder when building test data buffers.
    private static final int PAGE_SIZE = 1000;
    // Shared by all tests: supplies the spilling directory paths for the test deserializers,
    // is passed to StreamTaskNetworkInput by createStreamTaskNetworkInput, and is closed in tearDown().
    private final IOManager ioManager = new IOManagerAsync();

    /**
     * {@link PushingAsyncDataInput.DataOutput} whose callbacks all do nothing.
     *
     * <p>Used directly when a test does not care about emitted elements, and as the base class
     * for outputs that only want to observe a single callback.
     *
     * @param <T> value type of the records accepted by {@link #emitRecord(StreamRecord)}
     */
    private static class NoOpDataOutput<T> implements PushingAsyncDataInput.DataOutput<T> {
        private NoOpDataOutput() {
            // No state to initialise.
        }

        public void emitRecord(StreamRecord<T> streamRecord) {
            // Intentionally empty: records are discarded.
        }

        public void emitWatermark(Watermark watermark) {
            // Intentionally empty: watermarks are discarded.
        }

        public void emitStreamStatus(StreamStatus streamStatus) {
            // Intentionally empty: stream status changes are discarded.
        }

        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            // Intentionally empty: latency markers are discarded.
        }
    }

    /**
     * Deserializer that remembers whether {@link #clear()} has been called, so a test can verify
     * that the deserializer was released.
     */
    public static class TestRecordDeserializer extends SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>> {
        /** Becomes {@code true} once {@link #clear()} has been called. */
        private boolean cleared = false;

        /**
         * @param strArr forwarded unchanged to the superclass constructor; the tests in this file
         *     pass the IOManager's spilling directory paths
         */
        public TestRecordDeserializer(String[] strArr) {
            super(strArr);
        }

        /** Only records the call; it does not delegate to the superclass implementation. */
        public void clear() {
            cleared = true;
        }

        /** @return whether {@link #clear()} has been called on this instance */
        public boolean isCleared() {
            return cleared;
        }
    }

    /**
     * Output that counts how many records were emitted to it; every other callback keeps the
     * inherited behaviour.
     *
     * @param <T> value type of the counted records
     */
    private static class VerifyRecordsDataOutput<T> extends NoOpDataOutput<T> {
        /** Number of {@link #emitRecord(StreamRecord)} invocations seen so far. */
        private int numberOfEmittedRecords;

        private VerifyRecordsDataOutput() {
        }

        @Override
        public void emitRecord(StreamRecord<T> streamRecord) {
            // The record content is irrelevant here; only the call is counted.
            numberOfEmittedRecords += 1;
        }

        /** @return how many records have been emitted to this output */
        int getNumberOfEmittedRecords() {
            return numberOfEmittedRecords;
        }
    }

    /** Closes the IOManager shared by the tests after each test method. */
    @After
    public void tearDown() throws Exception {
        ioManager.close();
    }

    /**
     * Feeds the input a single buffer holding two serialized records and expects two consecutive
     * emissions that both report {@code MORE_AVAILABLE}, for a total of two emitted records.
     */
    @Test
    public void testIsAvailableWithBufferedDataInDeserializer() throws Exception {
        StreamTaskNetworkInput input = createStreamTaskNetworkInput(Collections.singletonList(createDataBuffer()));
        VerifyRecordsDataOutput<Long> output = new VerifyRecordsDataOutput<>();

        // One emission per record contained in the single buffer.
        assertHasNextElement(input, output);
        assertHasNextElement(input, output);

        Assert.assertEquals(2L, output.getNumberOfEmittedRecords());
    }

    /**
     * Queues a checkpoint barrier followed by a data buffer and performs a single emission.
     *
     * <p>That emission is expected to report {@code MORE_AVAILABLE} while no record from the
     * buffer behind the barrier has been emitted yet.
     *
     * @throws Exception if building the data buffer or emitting from the input fails
     */
    @Test
    public void testNoDataProcessedAfterCheckpointBarrier() throws Exception {
        CheckpointBarrier checkpointBarrier = new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation());

        List<BufferOrEvent> buffers = new ArrayList<>(2);
        buffers.add(new BufferOrEvent(checkpointBarrier, new InputChannelInfo(0, 0)));
        buffers.add(createDataBuffer());

        // Keep a reference to the output so its record counter can be inspected after the emission.
        VerifyRecordsDataOutput<Long> output = new VerifyRecordsDataOutput<>();
        assertHasNextElement(createStreamTaskNetworkInput(buffers), output);

        Assert.assertEquals(0L, output.getNumberOfEmittedRecords());
    }

    /**
     * Builds {@code i} fresh {@link TestRecordDeserializer} instances, each configured with the
     * spilling directory paths of the shared IOManager.
     *
     * @param i number of deserializers to create
     * @return a new array holding the created deserializers
     */
    private TestRecordDeserializer[] createDeserializers(int i) {
        return IntStream.range(0, i)
                .mapToObj(ignored -> new TestRecordDeserializer(ioManager.getSpillingDirectoriesPaths()))
                .toArray(TestRecordDeserializer[]::new);
    }

    /**
     * Sends a checkpoint barrier, one record and finally an end-of-partition event through a
     * single-channel gate using the unaligned barrier handler.
     *
     * <p>Expects exactly one emitted record, expects the deserializer slot of the channel to be
     * {@code null} after the end-of-partition event was processed, and then checks that
     * {@code prepareSnapshot} can still be joined.
     */
    @Test
    public void testSnapshotAfterEndOfPartition() throws Exception {
        LongSerializer serializer = LongSerializer.INSTANCE;
        StreamTestSingleInputGate testGate = new StreamTestSingleInputGate(1, 0, serializer, StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE);
        TestRecordDeserializer[] deserializers = createDeserializers(1);

        CheckpointedInputGate checkpointedGate = new CheckpointedInputGate(
                testGate.getInputGate(),
                SingleCheckpointBarrierHandler.createUnalignedCheckpointBarrierHandler(
                        TestSubtaskCheckpointCoordinator.INSTANCE,
                        "test",
                        new DummyCheckpointInvokable(),
                        new CheckpointableInput[]{testGate.getInputGate()}),
                new SyncMailboxExecutor());
        StreamTaskNetworkInput input = new StreamTaskNetworkInput(checkpointedGate, serializer, new StatusWatermarkValve(1), 0, deserializers);
        VerifyRecordsDataOutput<Long> output = new VerifyRecordsDataOutput<>();

        // Barrier first, then a single record on channel 0.
        testGate.sendEvent(new CheckpointBarrier(0, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0);
        testGate.sendElement(new StreamRecord<>(42L), 0);

        // Two emissions for the two queued elements; only one of them is a record.
        assertHasNextElement(input, output);
        assertHasNextElement(input, output);
        Assert.assertEquals(1L, output.getNumberOfEmittedRecords());

        // After the end-of-partition event the array slot for the channel is expected to be cleared.
        testGate.sendEvent(EndOfPartitionEvent.INSTANCE, 0);
        input.emitNext(output);
        Assert.assertNull(deserializers[0]);

        // The snapshot must still be preparable once the channel has finished.
        input.prepareSnapshot(ChannelStateWriter.NO_OP, 0).join();
    }

    /**
     * Finishes the two channels of a gate one after the other and expects, after each
     * end-of-partition event, that the channel's slot in the deserializer array passed to the
     * input is {@code null} and that the deserializer originally stored there was cleared.
     */
    @Test
    public void testReleasingDeserializerTimely() throws Exception {
        final int numInputChannels = 2;
        LongSerializer serializer = LongSerializer.INSTANCE;
        StreamTestSingleInputGate testGate = new StreamTestSingleInputGate(numInputChannels, 0, serializer, StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE);

        TestRecordDeserializer[] deserializers = new TestRecordDeserializer[numInputChannels];
        Arrays.setAll(deserializers, channel -> new TestRecordDeserializer(ioManager.getSpillingDirectoriesPaths()));
        // Separate array with the same instances, so they stay reachable after their slots are nulled.
        TestRecordDeserializer[] originalDeserializers = deserializers.clone();

        NoOpDataOutput<Long> output = new NoOpDataOutput<>();
        CheckpointedInputGate checkpointedGate = new CheckpointedInputGate(
                testGate.getInputGate(),
                new CheckpointBarrierTracker(1, new DummyCheckpointInvokable()),
                new SyncMailboxExecutor());
        StreamTaskNetworkInput input = new StreamTaskNetworkInput(checkpointedGate, serializer, new StatusWatermarkValve(1), 0, deserializers);

        for (int channel = 0; channel < numInputChannels; channel++) {
            Assert.assertNotNull(deserializers[channel]);

            testGate.sendEvent(EndOfPartitionEvent.INSTANCE, channel);
            input.emitNext(output);

            Assert.assertNull(deserializers[channel]);
            Assert.assertTrue(originalDeserializers[channel].isCleared());
        }
    }

    /**
     * Creates a data buffer for channel (0, 0) that contains the serialized records 42 and 44,
     * in that order.
     *
     * @return the buffer wrapped in a {@link BufferOrEvent}
     * @throws IOException if serializing one of the records fails
     */
    private BufferOrEvent createDataBuffer() throws IOException {
        BufferBuilder builder = BufferBuilderTestUtils.createEmptyBufferBuilder(PAGE_SIZE);
        // The consumer is obtained before any record is appended to the builder.
        BufferConsumer consumer = builder.createBufferConsumer();

        for (long value : new long[] {42L, 44L}) {
            serializeRecord(value, builder);
        }

        InputChannelInfo channel = new InputChannelInfo(0, 0);
        return new BufferOrEvent(consumer.build(), channel);
    }

    /**
     * Wraps the given buffers and events in a single-channel {@link MockInputGate} tracked by a
     * {@link CheckpointBarrierTracker} and builds a {@link StreamTaskNetworkInput} on top of it.
     *
     * @param list buffers and events the mock gate is created with
     * @return a network input with input index 0 that uses the shared IOManager
     */
    private StreamTaskNetworkInput createStreamTaskNetworkInput(List<BufferOrEvent> list) {
        MockInputGate mockGate = new MockInputGate(1, list, false);
        CheckpointedInputGate checkpointedGate = new CheckpointedInputGate(
                mockGate,
                new CheckpointBarrierTracker(1, new DummyCheckpointInvokable()),
                new SyncMailboxExecutor());
        return new StreamTaskNetworkInput(checkpointedGate, LongSerializer.INSTANCE, ioManager, new StatusWatermarkValve(1), 0);
    }

    /**
     * Serializes {@code j} as a stream record, appends it to the given builder and asserts that
     * the builder is not full afterwards.
     *
     * @param j value to wrap in a {@link StreamRecord}
     * @param bufferBuilder builder that receives the serialized record
     * @throws IOException if serialization fails
     */
    private void serializeRecord(long j, BufferBuilder bufferBuilder) throws IOException {
        SerializationDelegate delegate = new SerializationDelegate(new StreamElementSerializer(LongSerializer.INSTANCE));
        delegate.setInstance(new StreamRecord<>(j));

        // Scratch serializer with an initial size of 128, used only for this one record.
        DataOutputSerializer scratch = new DataOutputSerializer(128);
        bufferBuilder.appendAndCommit(RecordWriter.serializeRecord(scratch, delegate));

        Assert.assertFalse(bufferBuilder.isFull());
    }

    /**
     * Asserts that the input's availability future is already done and that emitting the next
     * element into {@code dataOutput} reports {@link InputStatus#MORE_AVAILABLE}.
     *
     * @param streamTaskNetworkInput input to poll
     * @param dataOutput output that receives the emitted element
     * @throws Exception if emitting the next element fails
     */
    private static void assertHasNextElement(StreamTaskNetworkInput streamTaskNetworkInput, PushingAsyncDataInput.DataOutput dataOutput) throws Exception {
        boolean available = streamTaskNetworkInput.getAvailableFuture().isDone();
        Assert.assertTrue(available);

        InputStatus status = streamTaskNetworkInput.emitNext(dataOutput);
        Assert.assertThat(status, Matchers.is(InputStatus.MORE_AVAILABLE));
    }
}
