package org.apache.flink.streaming.api.streamvertex;

import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.network.api.reader.MutableReader;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.IndexedMutableReader;
import org.apache.flink.streaming.io.IndexedReaderIterator;
import org.apache.flink.streaming.io.InputGateFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/streamvertex/InputHandler.class */
/**
 * Sets up and exposes the record input of a single-input {@link StreamVertex}.
 *
 * <p>On construction the handler wraps the vertex's task configuration in a
 * {@link StreamConfig}, loads the serializer for the first input and, if the
 * configuration reports at least one input, creates the reader and the record
 * iterator on top of the vertex's input gates.
 *
 * @param <IN> type of the values carried by the incoming {@link StreamRecord}s
 */
public class InputHandler<IN> {
    private final StreamVertex<IN, ?> streamVertex;
    private final StreamConfig configuration;

    // Populated by setConfigInputs(). The reader and the iterator stay null
    // when the configuration reports no inputs.
    private StreamRecordSerializer<IN> inputSerializer;
    private IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>> inputs;
    private IndexedReaderIterator<StreamRecord<IN>> inputIter;

    /**
     * Creates the handler and immediately registers the inputs of the given vertex.
     *
     * <p>Note for subclasses: {@link #setConfigInputs()} is invoked from this
     * constructor, i.e. before any subclass field initializer or constructor body
     * has run. An override must therefore not rely on subclass state.
     *
     * @param streamVertex the vertex whose inputs are handled
     * @throws StreamVertexException if anything goes wrong while setting up the
     *         inputs; the original failure is attached as the cause
     */
    public InputHandler(StreamVertex<IN, ?> streamVertex) {
        this.streamVertex = streamVertex;
        this.configuration = new StreamConfig(streamVertex.getTaskConfiguration());
        try {
            setConfigInputs();
        } catch (Exception e) {
            throw new StreamVertexException("Cannot register inputs for " + getClass().getSimpleName(), e);
        }
    }

    /**
     * Loads the input serializer and, when the configuration reports at least one
     * input, builds the reader and the iterator.
     *
     * <p>The serializer is always read from the configuration using the vertex's
     * user class loader. The reader is created over a gate built from all input
     * gates of the vertex's environment, and the vertex's superstep listener is
     * registered on it for {@code StreamingSuperstep} events.
     *
     * @throws StreamVertexException if the inputs cannot be set up
     */
    protected void setConfigInputs() throws StreamVertexException {
        inputSerializer = configuration.getTypeSerializerIn1(streamVertex.userClassLoader);

        if (configuration.getNumberOfInputs() > 0) {
            inputs = new IndexedMutableReader<>(
                    InputGateFactory.createInputGate(streamVertex.getEnvironment().getAllInputGates()));
            inputs.registerTaskEventListener(streamVertex.getSuperstepListener(), StreamingSuperstep.class);
            inputIter = new IndexedReaderIterator<>(inputs, inputSerializer);
        }
    }

    /**
     * Creates a record iterator over an already existing reader.
     *
     * @param mutableReader the reader to iterate over; it must be an
     *        {@link IndexedMutableReader}
     * @param typeSerializer serializer handed to the iterator
     * @param <T> type of the values carried by the stream records
     * @return a new iterator reading from {@code mutableReader}
     * @throws ClassCastException if {@code mutableReader} is not an
     *         {@link IndexedMutableReader}
     */
    protected static <T> IndexedReaderIterator<StreamRecord<T>> staticCreateInputIterator(MutableReader<?> mutableReader, TypeSerializer<StreamRecord<T>> typeSerializer) {
        // Only the reader class is checked at runtime; its element type is erased,
        // so the caller is responsible for passing a matching serializer.
        @SuppressWarnings("unchecked")
        IndexedMutableReader<DeserializationDelegate<StreamRecord<T>>> indexedReader =
                (IndexedMutableReader<DeserializationDelegate<StreamRecord<T>>>) mutableReader;
        return new IndexedReaderIterator<>(indexedReader, typeSerializer);
    }

    /**
     * @return the serializer read from the configuration for the first input
     */
    public StreamRecordSerializer<IN> getInputSerializer() {
        return inputSerializer;
    }

    /**
     * @return the iterator over the incoming records, or {@code null} if the
     *         configuration reported no inputs
     */
    public IndexedReaderIterator<StreamRecord<IN>> getInputIter() {
        return inputIter;
    }

    /**
     * Clears the buffers of the reader and then cleans it up. Does nothing when no
     * reader was created.
     *
     * @throws IOException if clearing the buffers or the cleanup fails
     */
    public void clearReaders() throws IOException {
        if (inputs == null) {
            return;
        }
        try {
            inputs.clearBuffers();
        } finally {
            // Run the cleanup even if clearing the buffers failed, so that a
            // failure in the first step does not skip the second one.
            inputs.cleanup();
        }
    }
}
