package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/CoRecordReader.class */
/**
 * Reader that pulls records from two input gates, each carrying its own record type.
 *
 * <p>The reader registers itself as a listener on both gates. Every notification enqueues the
 * index of the notifying gate (1 or 2); {@link #getNextRecord} drains that queue to decide which
 * input is read next. Each input is read through its own {@link CoBarrierBuffer}, and the two
 * barrier buffers know about each other.
 */
public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> extends AbstractReader implements EventListener<InputGate>, StreamingReader {
    /** Gate of the first input (index 1). */
    private final InputGate bufferReader1;

    /** Gate of the second input (index 2). */
    private final InputGate bufferReader2;

    /** Indices (1 or 2) of the inputs that signalled availability, in notification order. */
    private final LinkedBlockingDeque<Integer> availableRecordReaders = new LinkedBlockingDeque<>();

    /**
     * Indices that were handed out without being taken from the queue; the next matching queue
     * entry is dropped instead of being served again.
     */
    private LinkedList<Integer> processed = new LinkedList<>();

    /** One deserializer per input channel of the first gate. */
    private AdaptiveSpanningRecordDeserializer[] reader1RecordDeserializers;

    /** Deserializer of the first input that currently holds a buffer, or {@code null}. */
    private RecordDeserializer<T1> reader1currentRecordDeserializer;

    /** One deserializer per input channel of the second gate. */
    private AdaptiveSpanningRecordDeserializer[] reader2RecordDeserializers;

    /** Deserializer of the second input that currently holds a buffer, or {@code null}. */
    private RecordDeserializer<T2> reader2currentRecordDeserializer;

    /** 0 = no input selected, 1 = first input, 2 = second input. */
    private int currentReaderIndex;

    /** Set once both gates have been asked for their partitions. */
    private boolean hasRequestedPartitions;

    protected CoBarrierBuffer barrierBuffer1;
    protected CoBarrierBuffer barrierBuffer2;

    /**
     * Barrier buffer of one input that holds a reference to the barrier buffer of the other input.
     */
    public class CoBarrierBuffer extends BarrierBuffer {
        /** Barrier buffer of the opposite input; set via {@link #setOtherBarrierBuffer}. */
        private CoBarrierBuffer otherBuffer;

        public CoBarrierBuffer(InputGate inputGate, AbstractReader abstractReader) {
            super(inputGate, abstractReader);
        }

        /** Stores the barrier buffer of the opposite input. */
        public void setOtherBarrierBuffer(CoBarrierBuffer coBarrierBuffer) {
            otherBuffer = coBarrierBuffer;
        }

        /**
         * Delegates to the parent implementation and releases the blocks of the other buffer,
         * but only if the other buffer reports that it is all blocked as well; otherwise
         * does nothing.
         */
        @Override // org.apache.flink.streaming.runtime.io.BarrierBuffer
        public void actOnAllBlocked() {
            if (!otherBuffer.isAllBlocked()) {
                return;
            }
            super.actOnAllBlocked();
            otherBuffer.releaseBlocks();
        }
    }

    /**
     * Creates a reader over the two given gates.
     *
     * <p>Allocates one deserializer per input channel of each gate, registers this reader as a
     * listener on both gates and wires up the two barrier buffers with each other.
     *
     * @param inputGate gate of the first input
     * @param inputGate2 gate of the second input
     */
    public CoRecordReader(InputGate inputGate, InputGate inputGate2) {
        super(new UnionInputGate(new InputGate[]{inputGate, inputGate2}));
        bufferReader1 = inputGate;
        bufferReader2 = inputGate2;

        // Both arrays are sized first and populated afterwards.
        reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
        reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputGate2.getNumberOfInputChannels()];
        fillWithNewDeserializers(reader1RecordDeserializers);
        fillWithNewDeserializers(reader2RecordDeserializers);

        inputGate.registerListener(this);
        inputGate2.registerListener(this);

        barrierBuffer1 = new CoBarrierBuffer(inputGate, this);
        barrierBuffer2 = new CoBarrierBuffer(inputGate2, this);
        barrierBuffer1.setOtherBarrierBuffer(barrierBuffer2);
        barrierBuffer2.setOtherBarrierBuffer(barrierBuffer1);
    }

    /** Puts a fresh deserializer into every slot of the given array. */
    private static void fillWithNewDeserializers(AdaptiveSpanningRecordDeserializer[] slots) {
        for (int channel = 0; channel < slots.length; channel++) {
            slots[channel] = new AdaptiveSpanningRecordDeserializer();
        }
    }

    /**
     * Requests the partitions of both gates on the first call; later calls return immediately.
     *
     * @throws IOException propagated from the gates
     * @throws InterruptedException propagated from the gates
     */
    public void requestPartitionsOnce() throws IOException, InterruptedException {
        if (!hasRequestedPartitions) {
            bufferReader1.requestPartitions();
            bufferReader2.requestPartitions();
            hasRequestedPartitions = true;
        }
    }

    /**
     * Reads the next record from whichever input is selected, selecting a new input whenever
     * none is currently selected.
     *
     * @param t1 target that receives a record of the first input
     * @param t2 target that receives a record of the second input
     * @return 1 if a full record was read into {@code t1}, 2 if a full record was read into
     *     {@code t2}, 0 if no input is selected and both gates report being finished
     * @throws IllegalStateException if the selected input index is neither 1 nor 2
     */
    public int getNextRecord(T1 t1, T2 t2) throws IOException, InterruptedException {
        requestPartitionsOnce();
        for (;;) {
            if (currentReaderIndex == 0) {
                if (bufferReader1.isFinished() && bufferReader2.isFinished()) {
                    return 0;
                }
                currentReaderIndex = getNextReaderIndexBlocking();
            }

            if (currentReaderIndex == 1) {
                if (readFromFirstInput(t1)) {
                    return 1;
                }
            } else if (currentReaderIndex == 2) {
                if (readFromSecondInput(t2)) {
                    return 2;
                }
            } else {
                throw new IllegalStateException("Bug: unexpected current reader index.");
            }
            // An event ended the read attempt; go round again and pick an input anew.
        }
    }

    /**
     * Reads from the first input until a full record is available or an event ends the attempt.
     *
     * @return {@code true} if a full record was deserialized into {@code target}; {@code false}
     *     if a superstep event or a handled event interrupted reading (the selected input index
     *     has then been reset to 0)
     */
    @SuppressWarnings("unchecked")
    private boolean readFromFirstInput(T1 target) throws IOException, InterruptedException {
        for (;;) {
            if (reader1currentRecordDeserializer != null) {
                RecordDeserializer.DeserializationResult outcome = reader1currentRecordDeserializer.getNextRecord(target);
                if (outcome.isBufferConsumed()) {
                    reader1currentRecordDeserializer.getCurrentBuffer().recycle();
                    reader1currentRecordDeserializer = null;
                    currentReaderIndex = 0;
                }
                if (outcome.isFullRecord()) {
                    return true;
                }
                continue;
            }

            BufferOrEvent incoming = barrierBuffer1.getNextNonBlocked();
            if (incoming.isBuffer()) {
                reader1currentRecordDeserializer = reader1RecordDeserializers[incoming.getChannelIndex()];
                reader1currentRecordDeserializer.setNextBuffer(incoming.getBuffer());
            } else if (incoming.getEvent() instanceof StreamingSuperstep) {
                barrierBuffer1.processSuperstep(incoming);
                currentReaderIndex = 0;
                return false;
            } else if (handleEvent(incoming.getEvent())) {
                currentReaderIndex = 0;
                return false;
            }
        }
    }

    /**
     * Reads from the second input until a full record is available or an event ends the attempt.
     *
     * @return {@code true} if a full record was deserialized into {@code target}; {@code false}
     *     if a superstep event or a handled event interrupted reading (the selected input index
     *     has then been reset to 0)
     */
    @SuppressWarnings("unchecked")
    private boolean readFromSecondInput(T2 target) throws IOException, InterruptedException {
        for (;;) {
            if (reader2currentRecordDeserializer != null) {
                RecordDeserializer.DeserializationResult outcome = reader2currentRecordDeserializer.getNextRecord(target);
                if (outcome.isBufferConsumed()) {
                    reader2currentRecordDeserializer.getCurrentBuffer().recycle();
                    reader2currentRecordDeserializer = null;
                    currentReaderIndex = 0;
                }
                if (outcome.isFullRecord()) {
                    return true;
                }
                continue;
            }

            BufferOrEvent incoming = barrierBuffer2.getNextNonBlocked();
            if (incoming.isBuffer()) {
                reader2currentRecordDeserializer = reader2RecordDeserializers[incoming.getChannelIndex()];
                reader2currentRecordDeserializer.setNextBuffer(incoming.getBuffer());
            } else if (incoming.getEvent() instanceof StreamingSuperstep) {
                barrierBuffer2.processSuperstep(incoming);
                currentReaderIndex = 0;
                return false;
            } else if (handleEvent(incoming.getEvent())) {
                currentReaderIndex = 0;
                return false;
            }
        }
    }

    /**
     * Blocks until an input index is available and returns the index to read from.
     *
     * <p>Queue entries whose index is recorded in {@code processed} are discarded (and the
     * record is cleared). If the barrier buffer of the taken input is all blocked, the taken
     * index is pushed back to the front of the queue, the opposite index is recorded in
     * {@code processed} and returned instead.
     *
     * @return 1 or 2
     * @throws InterruptedException if interrupted while waiting on the queue
     */
    protected int getNextReaderIndexBlocking() throws InterruptedException {
        // Must stay an Integer so that LinkedList.remove(Object) is chosen, not remove(int).
        Integer candidate = availableRecordReaders.take();
        while (processed.contains(candidate)) {
            processed.remove(candidate);
            candidate = availableRecordReaders.take();
        }

        // Any value other than 1 is treated as the second input.
        final boolean firstTaken = candidate.intValue() == 1;
        final int takenIndex = firstTaken ? 1 : 2;
        final int oppositeIndex = firstTaken ? 2 : 1;
        final CoBarrierBuffer takenBuffer = firstTaken ? barrierBuffer1 : barrierBuffer2;

        if (!takenBuffer.isAllBlocked()) {
            return takenIndex;
        }
        availableRecordReaders.addFirst(takenIndex);
        processed.add(oppositeIndex);
        return oppositeIndex;
    }

    /** Listener callback: marks the notifying gate as available. */
    public void onEvent(InputGate inputGate) {
        addToAvailable(inputGate);
    }

    /**
     * Appends index 1 or 2 to the availability queue depending on which of the two gates was
     * passed in; any other gate is ignored.
     */
    protected void addToAvailable(InputGate inputGate) {
        if (inputGate == bufferReader1) {
            availableRecordReaders.add(1);
            return;
        }
        if (inputGate == bufferReader2) {
            availableRecordReaders.add(2);
        }
    }

    /** Recycles the current buffer of every deserializer of both inputs, if not yet recycled. */
    public void clearBuffers() {
        recycleCurrentBuffers(reader1RecordDeserializers);
        recycleCurrentBuffers(reader2RecordDeserializers);
    }

    /** Recycles each deserializer's current buffer unless it is absent or already recycled. */
    @SuppressWarnings("rawtypes")
    private static void recycleCurrentBuffers(RecordDeserializer[] deserializers) {
        for (RecordDeserializer deserializer : deserializers) {
            Buffer held = deserializer.getCurrentBuffer();
            if (held == null || held.isRecycled()) {
                continue;
            }
            held.recycle();
        }
    }

    /**
     * Cleans up both barrier buffers; the second one is cleaned up even if the first one throws.
     */
    @Override // org.apache.flink.streaming.runtime.io.StreamingReader
    public void cleanup() throws IOException {
        try {
            barrierBuffer1.cleanup();
        } finally {
            barrierBuffer2.cleanup();
        }
    }
}
