/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyCheckpointInvokable;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierTracker;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierUnaligner;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.MockInputGate;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link StreamTaskNetworkInput}.
 *
 * <p>The tests feed buffers and events into a {@link StreamTaskNetworkInput} through either a
 * {@link MockInputGate}, a {@link StreamTestSingleInputGate} or real {@link SingleInputGate}s and
 * then verify what is emitted, which per-channel deserializers are released, and what a
 * {@link RecordingChannelStateWriter} records when a snapshot is prepared.
 */
public class StreamTaskNetworkInputTest {
    /** Capacity passed to the buffer builder that backs {@link #createDataBuffer()}. */
    private static final int PAGE_SIZE = 1000;

    /** I/O manager shared by all tests; supplies the spilling directories for the deserializers. */
    private final IOManager ioManager = new IOManagerAsync();

    /** Closes the shared {@link IOManager} after every test. */
    @After
    public void tearDown() throws Exception {
        this.ioManager.close();
    }

    /**
     * A single data buffer holding two records must yield two successive elements: before each
     * {@code emitNext} the availability future is done, each call reports
     * {@link InputStatus#MORE_AVAILABLE}, and two records are emitted in total.
     */
    @Test
    public void testIsAvailableWithBufferedDataInDeserializer() throws Exception {
        List<BufferOrEvent> buffers = Collections.singletonList(this.createDataBuffer());
        VerifyRecordsDataOutput<Long> output = new VerifyRecordsDataOutput<>();
        StreamTaskNetworkInput<Long> input = this.createStreamTaskNetworkInput(buffers, output);

        assertHasNextElement(input, output);
        assertHasNextElement(input, output);
        Assert.assertEquals(2L, (long) output.getNumberOfEmittedRecords());
    }

    /**
     * When a checkpoint barrier is queued in front of a data buffer, the first {@code emitNext}
     * call must not emit any record.
     */
    @Test
    public void testNoDataProcessedAfterCheckpointBarrier() throws Exception {
        CheckpointBarrier barrier =
                new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation());
        List<BufferOrEvent> buffers = new ArrayList<>(2);
        buffers.add(new BufferOrEvent(barrier, new InputChannelInfo(0, 0)));
        buffers.add(this.createDataBuffer());

        VerifyRecordsDataOutput<Long> output = new VerifyRecordsDataOutput<>();
        StreamTaskNetworkInput<Long> input = this.createStreamTaskNetworkInput(buffers, output);

        assertHasNextElement(input, output);
        Assert.assertEquals(0L, (long) output.getNumberOfEmittedRecords());
    }

    /**
     * Two single-channel input gates share one {@link CheckpointBarrierUnaligner}. Channel 1
     * receives the barrier first and one buffer afterwards; channel 2 receives two buffers
     * (sizes 2 and 3) and its barrier only after both snapshots were prepared. The test expects
     * the channel state writer to have recorded nothing for channel 1 and exactly the two
     * buffers of sizes 2 and 3 for channel 2.
     */
    @Test
    public void testSnapshotWithTwoInputGates() throws Exception {
        SingleInputGate inputGate1 = new SingleInputGateBuilder().setSingleInputGateIndex(0).build();
        RemoteInputChannel channel1 = InputChannelBuilder.newBuilder().buildRemoteChannel(inputGate1);
        inputGate1.setInputChannels(new InputChannel[] {channel1});

        SingleInputGate inputGate2 = new SingleInputGateBuilder().setSingleInputGateIndex(1).build();
        RemoteInputChannel channel2 = InputChannelBuilder.newBuilder().buildRemoteChannel(inputGate2);
        inputGate2.setInputChannels(new InputChannel[] {channel2});

        CheckpointBarrierUnaligner unaligner =
                new CheckpointBarrierUnaligner(
                        TestSubtaskCheckpointCoordinator.INSTANCE,
                        "test",
                        new DummyCheckpointInvokable(),
                        new InputGate[] {inputGate1, inputGate2});
        inputGate1.registerBufferReceivedListener(unaligner.getBufferReceivedListener().get());
        inputGate2.registerBufferReceivedListener(unaligner.getBufferReceivedListener().get());

        StreamTaskNetworkInput<Long> input1 = this.createInput(unaligner, inputGate1);
        StreamTaskNetworkInput<Long> input2 = this.createInput(unaligner, inputGate2);

        CheckpointBarrier barrier =
                new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation());
        // Channel 1: barrier first, then one buffer of size 1.
        channel1.onBuffer(EventSerializer.toBuffer(barrier), 0, 0);
        channel1.onBuffer(BufferBuilderTestUtils.buildSomeBuffer(1), 1, 0);
        // Channel 2: two buffers (sizes 2 and 3) arrive before its barrier.
        channel2.onBuffer(BufferBuilderTestUtils.buildSomeBuffer(2), 0, 0);
        channel2.onBuffer(BufferBuilderTestUtils.buildSomeBuffer(3), 1, 0);

        RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
        channelStateWriter.start(0L, CheckpointOptions.forCheckpointWithDefaultLocation());
        CompletableFuture<Void> completableFuture1 = input1.prepareSnapshot(channelStateWriter, 0L);
        CompletableFuture<Void> completableFuture2 = input2.prepareSnapshot(channelStateWriter, 0L);

        // The barrier for channel 2 is delivered only after both snapshots were requested.
        channel2.onBuffer(EventSerializer.toBuffer(barrier), 2, 0);
        completableFuture1.join();
        completableFuture2.join();

        // Expected value goes first so that a failure message reads correctly.
        Assert.assertEquals(
                Collections.emptyList(),
                channelStateWriter.getAddedInput().get(channel1.getChannelInfo()));
        List<Buffer> storedBuffers = channelStateWriter.getAddedInput().get(channel2.getChannelInfo());
        Assert.assertEquals(
                Arrays.asList(2, 3),
                storedBuffers.stream().map(Buffer::getSize).collect(Collectors.toList()));
    }

    /**
     * Builds a {@link StreamTaskNetworkInput} over the given gate, using the given barrier
     * handler, a no-op output and one {@link TestRecordDeserializer} per input channel.
     *
     * @param handler barrier handler wrapped into the {@link CheckpointedInputGate}
     * @param inputGate gate to read from; its channel count and gate index are used
     * @return the network input under test
     */
    private StreamTaskNetworkInput<Long> createInput(CheckpointBarrierHandler handler, SingleInputGate inputGate) {
        return new StreamTaskNetworkInput<>(
                new CheckpointedInputGate(inputGate, handler),
                LongSerializer.INSTANCE,
                new StatusWatermarkValve(inputGate.getNumberOfInputChannels(), new NoOpDataOutput<>()),
                inputGate.getGateIndex(),
                this.createDeserializers(inputGate.getNumberOfInputChannels()));
    }

    /**
     * Creates one fresh {@link TestRecordDeserializer} per channel, each configured with the
     * spilling directories of the shared {@link IOManager}.
     *
     * @param numberOfInputChannels length of the returned array
     * @return array of newly created deserializers
     */
    private TestRecordDeserializer[] createDeserializers(int numberOfInputChannels) {
        return IntStream.range(0, numberOfInputChannels)
                .mapToObj(index -> new TestRecordDeserializer(this.ioManager.getSpillingDirectoriesPaths()))
                .toArray(TestRecordDeserializer[]::new);
    }

    /**
     * Sends a barrier and one record, consumes both, then sends an end-of-partition event. After
     * that event is consumed the channel's deserializer slot must be {@code null}, and preparing
     * a snapshot must still complete.
     */
    @Test
    public void testSnapshotAfterEndOfPartition() throws Exception {
        int numInputChannels = 1;
        int channelId = 0;
        int checkpointId = 0;

        VerifyRecordsDataOutput<Long> output = new VerifyRecordsDataOutput<>();
        LongSerializer inSerializer = LongSerializer.INSTANCE;
        StreamTestSingleInputGate<Long> inputGate =
                new StreamTestSingleInputGate<>(numInputChannels, 0, inSerializer, 1024);
        TestRecordDeserializer[] deserializers = this.createDeserializers(numInputChannels);
        StreamTaskNetworkInput<Long> input =
                new StreamTaskNetworkInput<>(
                        new CheckpointedInputGate(
                                inputGate.getInputGate(),
                                new CheckpointBarrierUnaligner(
                                        TestSubtaskCheckpointCoordinator.INSTANCE,
                                        "test",
                                        new DummyCheckpointInvokable(),
                                        new InputGate[] {inputGate.getInputGate()})),
                        inSerializer,
                        new StatusWatermarkValve(numInputChannels, output),
                        0,
                        deserializers);

        inputGate.sendEvent(
                new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()),
                channelId);
        inputGate.sendElement(new StreamRecord<>(42L), channelId);

        assertHasNextElement(input, output);
        assertHasNextElement(input, output);
        Assert.assertEquals(1L, (long) output.getNumberOfEmittedRecords());

        // Consuming the end-of-partition event is expected to null out the deserializer slot.
        inputGate.sendEvent(EndOfPartitionEvent.INSTANCE, channelId);
        input.emitNext(output);
        Assert.assertNull(deserializers[channelId]);

        // Preparing a snapshot afterwards must still complete.
        CompletableFuture<Void> completableFuture = input.prepareSnapshot(ChannelStateWriter.NO_OP, checkpointId);
        completableFuture.join();
    }

    /**
     * For each of two channels: the deserializer slot is non-null before the end-of-partition
     * event, and once the event has been consumed the slot is {@code null} and the deserializer
     * that used to occupy it has had {@code clear()} called.
     */
    @Test
    public void testReleasingDeserializerTimely() throws Exception {
        int numInputChannels = 2;
        LongSerializer inSerializer = LongSerializer.INSTANCE;
        StreamTestSingleInputGate<Long> inputGate =
                new StreamTestSingleInputGate<>(numInputChannels, 0, inSerializer, 1024);

        TestRecordDeserializer[] deserializers = this.createDeserializers(numInputChannels);
        // Keep references to the instances: the input nulls out slots of the array it was given.
        TestRecordDeserializer[] copiedDeserializers = Arrays.copyOf(deserializers, deserializers.length);

        NoOpDataOutput<Long> output = new NoOpDataOutput<>();
        // NOTE(review): tracker and valve are sized for 1 channel although the gate has 2 —
        // presumably irrelevant for end-of-partition handling; confirm before changing.
        StreamTaskNetworkInput<Long> input =
                new StreamTaskNetworkInput<>(
                        new CheckpointedInputGate(
                                inputGate.getInputGate(),
                                new CheckpointBarrierTracker(1, new DummyCheckpointInvokable())),
                        inSerializer,
                        new StatusWatermarkValve(1, output),
                        0,
                        deserializers);

        for (int channel = 0; channel < numInputChannels; ++channel) {
            Assert.assertNotNull(deserializers[channel]);
            inputGate.sendEvent(EndOfPartitionEvent.INSTANCE, channel);
            input.emitNext(output);
            Assert.assertNull(deserializers[channel]);
            Assert.assertTrue(copiedDeserializers[channel].isCleared());
        }
    }

    /**
     * Builds a data buffer for channel (0, 0) that contains two serialized records with the
     * values 42 and 44.
     *
     * @return the buffer wrapped in a {@link BufferOrEvent}
     * @throws IOException if serializing a record fails
     */
    private BufferOrEvent createDataBuffer() throws IOException {
        BufferBuilder bufferBuilder = BufferBuilderTestUtils.createEmptyBufferBuilder(PAGE_SIZE);
        BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
        this.serializeRecord(42L, bufferBuilder);
        this.serializeRecord(44L, bufferBuilder);
        return new BufferOrEvent(bufferConsumer.build(), new InputChannelInfo(0, 0), false);
    }

    /**
     * Builds a {@link StreamTaskNetworkInput} that reads the given buffers from a single-channel
     * {@link MockInputGate} guarded by a {@link CheckpointBarrierTracker}.
     *
     * @param buffers buffers and events the mock gate hands out
     * @param output output passed to the {@link StatusWatermarkValve}
     * @return the network input under test
     */
    private StreamTaskNetworkInput<Long> createStreamTaskNetworkInput(
            List<BufferOrEvent> buffers, PushingAsyncDataInput.DataOutput<Long> output) {
        return new StreamTaskNetworkInput<>(
                new CheckpointedInputGate(
                        new MockInputGate(1, buffers, false),
                        new CheckpointBarrierTracker(1, new DummyCheckpointInvokable())),
                LongSerializer.INSTANCE,
                this.ioManager,
                new StatusWatermarkValve(1, output),
                0);
    }

    /**
     * Serializes {@code value} as a {@link StreamRecord} and copies it into the buffer builder,
     * asserting that the copy did not report a full buffer.
     *
     * @param value record payload
     * @param bufferBuilder destination of the serialized record
     * @throws IOException if serialization fails
     */
    private void serializeRecord(long value, BufferBuilder bufferBuilder) throws IOException {
        SpanningRecordSerializer<SerializationDelegate<StreamElement>> serializer =
                new SpanningRecordSerializer<>();
        SerializationDelegate<StreamElement> serializationDelegate =
                new SerializationDelegate<>(new StreamElementSerializer<>(LongSerializer.INSTANCE));
        serializationDelegate.setInstance(new StreamRecord<>(value));
        serializer.serializeRecord(serializationDelegate);
        Assert.assertFalse(serializer.copyToBufferBuilder(bufferBuilder).isFullBuffer());
    }

    /**
     * Asserts that the input's availability future is already done and that the next
     * {@code emitNext} call returns {@link InputStatus#MORE_AVAILABLE}.
     *
     * @param input input to poll
     * @param output output handed to {@code emitNext}
     */
    private static <T> void assertHasNextElement(
            StreamTaskNetworkInput<T> input, PushingAsyncDataInput.DataOutput<T> output) throws Exception {
        Assert.assertTrue(input.getAvailableFuture().isDone());
        InputStatus status = input.emitNext(output);
        Assert.assertThat(status, Matchers.is(InputStatus.MORE_AVAILABLE));
    }

    /** A {@link NoOpDataOutput} that additionally counts how many records were emitted. */
    private static class VerifyRecordsDataOutput<T> extends NoOpDataOutput<T> {
        private int numberOfEmittedRecords;

        @Override
        public void emitRecord(StreamRecord<T> record) {
            ++this.numberOfEmittedRecords;
        }

        /** Returns the number of {@link #emitRecord} calls seen so far. */
        int getNumberOfEmittedRecords() {
            return this.numberOfEmittedRecords;
        }
    }

    /** A {@link PushingAsyncDataInput.DataOutput} that ignores everything it is given. */
    private static class NoOpDataOutput<T> implements PushingAsyncDataInput.DataOutput<T> {
        public void emitRecord(StreamRecord<T> record) {
        }

        public void emitWatermark(Watermark watermark) {
        }

        public void emitStreamStatus(StreamStatus streamStatus) {
        }

        public void emitLatencyMarker(LatencyMarker latencyMarker) {
        }
    }

    /**
     * Deserializer that remembers whether {@link #clear()} was called. The {@code clear()}
     * implementation only sets the flag; it does not delegate to the superclass.
     */
    private static class TestRecordDeserializer
            extends SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>> {
        private boolean cleared = false;

        public TestRecordDeserializer(String[] tmpDirectories) {
            super(tmpDirectories);
        }

        public void clear() {
            this.cleared = true;
        }

        /** Returns {@code true} once {@link #clear()} has been invoked. */
        public boolean isCleared() {
            return this.cleared;
        }
    }
}

