package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.class */
public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
    private final int numInputChannels;
    private final TestInputChannel[] inputChannels;
    private final int bufferSize;
    private TypeSerializer<T> serializer;
    private ConcurrentLinkedQueue<InputValue<Object>>[] inputQueues;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate$InputValue.class */
    public static class InputValue<T> {
        private Object elementOrEvent;
        private boolean isStreamEnd;
        private boolean isStreamRecord;
        private boolean isEvent;

        private InputValue(Object obj, boolean z, boolean z2, boolean z3) {
            this.elementOrEvent = obj;
            this.isStreamEnd = z;
            this.isStreamRecord = z3;
            this.isEvent = z2;
        }

        public static <X> InputValue<X> element(Object obj) {
            return new InputValue<>(obj, false, false, true);
        }

        public static <X> InputValue<X> streamEnd() {
            return new InputValue<>(null, true, false, false);
        }

        public static <X> InputValue<X> event(AbstractEvent abstractEvent) {
            return new InputValue<>(abstractEvent, false, true, false);
        }

        public Object getStreamRecord() {
            return this.elementOrEvent;
        }

        public AbstractEvent getEvent() {
            return (AbstractEvent) this.elementOrEvent;
        }

        public boolean isStreamEnd() {
            return this.isStreamEnd;
        }

        public boolean isStreamRecord() {
            return this.isStreamRecord;
        }

        public boolean isEvent() {
            return this.isEvent;
        }
    }

    public StreamTestSingleInputGate(int i, int i2, TypeSerializer<T> typeSerializer) throws IOException, InterruptedException {
        super(i, false);
        this.bufferSize = i2;
        this.serializer = typeSerializer;
        this.numInputChannels = i;
        this.inputChannels = new TestInputChannel[i];
        this.inputQueues = new ConcurrentLinkedQueue[i];
        setupInputChannels();
        ((SingleInputGate) Mockito.doReturn(Integer.valueOf(i2)).when(this.inputGate)).getPageSize();
    }

    private void setupInputChannels() throws IOException, InterruptedException {
        for (int i = 0; i < this.numInputChannels; i++) {
            final int i2 = i;
            final SpanningRecordSerializer spanningRecordSerializer = new SpanningRecordSerializer();
            final SerializationDelegate serializationDelegate = new SerializationDelegate(new StreamElementSerializer(this.serializer));
            this.inputQueues[i2] = new ConcurrentLinkedQueue<>();
            this.inputChannels[i2] = new TestInputChannel(this.inputGate, i);
            Mockito.when(this.inputChannels[i2].getInputChannel().getNextBuffer()).thenAnswer(new Answer<InputChannel.BufferAndAvailability>() { // from class: org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate.1
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public InputChannel.BufferAndAvailability m0answer(InvocationOnMock invocationOnMock) throws Throwable {
                    InputChannel.BufferAndAvailability m0answer;
                    InputValue inputValue = (InputValue) StreamTestSingleInputGate.this.inputQueues[i2].poll();
                    if (inputValue != null && inputValue.isStreamEnd()) {
                        Mockito.when(Boolean.valueOf(StreamTestSingleInputGate.this.inputChannels[i2].getInputChannel().isReleased())).thenReturn(true);
                        return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false);
                    }
                    if (inputValue != null && inputValue.isStreamRecord()) {
                        Object streamRecord = inputValue.getStreamRecord();
                        spanningRecordSerializer.setNextBuffer(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(StreamTestSingleInputGate.this.bufferSize), (BufferRecycler) Mockito.mock(BufferRecycler.class)));
                        serializationDelegate.setInstance(streamRecord);
                        spanningRecordSerializer.addRecord(serializationDelegate);
                        return new InputChannel.BufferAndAvailability(spanningRecordSerializer.getCurrentBuffer(), false);
                    }
                    if (inputValue != null && inputValue.isEvent()) {
                        return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(inputValue.getEvent()), false);
                    }
                    synchronized (StreamTestSingleInputGate.this.inputQueues[i2]) {
                        StreamTestSingleInputGate.this.inputQueues[i2].wait();
                        m0answer = m0answer(invocationOnMock);
                    }
                    return m0answer;
                }
            });
            this.inputGate.setInputChannel(new IntermediateResultPartitionID(), this.inputChannels[i2].getInputChannel());
        }
    }

    public void sendElement(Object obj, int i) {
        synchronized (this.inputQueues[i]) {
            this.inputQueues[i].add(InputValue.element(obj));
            this.inputQueues[i].notifyAll();
        }
        this.inputGate.notifyChannelNonEmpty(this.inputChannels[i].getInputChannel());
    }

    public void sendEvent(AbstractEvent abstractEvent, int i) {
        synchronized (this.inputQueues[i]) {
            this.inputQueues[i].add(InputValue.event(abstractEvent));
            this.inputQueues[i].notifyAll();
        }
        this.inputGate.notifyChannelNonEmpty(this.inputChannels[i].getInputChannel());
    }

    public void endInput() {
        for (int i = 0; i < this.numInputChannels; i++) {
            synchronized (this.inputQueues[i]) {
                this.inputQueues[i].add(InputValue.streamEnd());
                this.inputQueues[i].notifyAll();
            }
            this.inputGate.notifyChannelNonEmpty(this.inputChannels[i].getInputChannel());
        }
    }

    public boolean allQueuesEmpty() {
        for (int i = 0; i < this.numInputChannels; i++) {
            if (this.inputQueues[i].size() > 0) {
                return false;
            }
        }
        return true;
    }
}
