package org.apache.flink.streaming.api.function.source;

import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/streaming/api/function/source/FileSourceFunction.class */
/**
 * Parallel source that emits the {@code String} records produced by an {@link InputFormat}.
 *
 * <p>Input splits are pulled one at a time from the {@link InputSplitProvider} obtained from the
 * runtime context in {@link #open(Configuration)}. Each split is opened, read until the format
 * reports its end (or the source is cancelled), and closed again before the next split is
 * requested.
 *
 * <p>{@link #cancel()} may be called from a different thread than {@link #run(Collector)}; the
 * two communicate only through the {@code volatile} flag {@code isRunning}.
 */
public class FileSourceFunction extends RichParallelSourceFunction<String> {
    private static final long serialVersionUID = 1L;

    /** Source of input splits; assigned in {@link #open(Configuration)}. */
    private InputSplitProvider provider;

    /** Format used to read the records of every split. */
    private final InputFormat<String, ?> inputFormat;

    /** Type information used to create the serializer that supplies the reuse instance. */
    private final TypeInformation<String> typeInfo;

    /** Set to {@code true} while {@link #run(Collector)} should keep reading. */
    private volatile boolean isRunning;

    /**
     * Creates a source reading from the given format.
     *
     * @param inputFormat format that opens the splits and produces the records
     * @param typeInformation type information of the produced records
     */
    public FileSourceFunction(InputFormat<String, ?> inputFormat, TypeInformation<String> typeInformation) {
        this.inputFormat = inputFormat;
        this.typeInfo = typeInformation;
    }

    /**
     * Fetches the split provider from the runtime context and configures the input format with
     * the task stub parameters.
     *
     * @param configuration not used by this implementation
     * @throws Exception if the runtime context or the input format fails
     */
    public void open(Configuration configuration) throws Exception {
        // The split provider and the stub parameters are only exposed by the streaming context.
        StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
        this.provider = runtimeContext.getInputSplitProvider();
        this.inputFormat.configure(runtimeContext.getTaskStubParameters());
    }

    /**
     * Reads every assigned split and forwards each non-null record to {@code collector}.
     *
     * <p>The loop stops when the splits are exhausted or {@link #cancel()} has been called; in
     * both cases the collector is closed afterwards. If reading fails, the exception is
     * propagated after the current split has been closed and the collector is left open.
     *
     * @param collector receives the records that were read
     * @throws Exception if opening, reading or closing a split fails
     */
    @Override // org.apache.flink.streaming.api.function.source.SourceFunction
    public void run(Collector<String> collector) throws Exception {
        this.isRunning = true;
        try {
            final TypeSerializer<String> serializer =
                    this.typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
            final Iterator<InputSplit> splits = getInputSplits();
            // The split type of the format is unknown here; the provider is trusted to hand out
            // splits that match the format it was created for.
            @SuppressWarnings("unchecked")
            final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.inputFormat;

            while (this.isRunning && splits.hasNext()) {
                final InputSplit split = splits.next();
                String reuse = serializer.createInstance();
                try {
                    format.open(split);
                    // Re-check the flag per record so that cancel() takes effect mid-split.
                    while (this.isRunning && !format.reachedEnd()) {
                        final String record = format.nextRecord(reuse);
                        if (record != null) {
                            collector.collect(record);
                            // Only a real record replaces the reuse object, so the format is
                            // never handed null on the next call.
                            reuse = record;
                        }
                    }
                } finally {
                    // Release the split before opening the next one, also on failure or cancel.
                    format.close();
                }
            }
            collector.close();
        } finally {
            this.isRunning = false;
        }
    }

    /**
     * Returns an iterator that lazily requests splits from the provider.
     *
     * <p>A split is fetched on demand and buffered until {@code next()} hands it out. Once the
     * provider returns {@code null} the iterator is exhausted and does not query it again.
     */
    private Iterator<InputSplit> getInputSplits() {
        return new Iterator<InputSplit>() {
            /** Split fetched by {@code hasNext()} but not yet returned by {@code next()}. */
            private InputSplit nextSplit;

            /** Becomes {@code true} once the provider has returned {@code null}. */
            private boolean exhausted;

            @Override // java.util.Iterator
            public boolean hasNext() {
                if (this.exhausted) {
                    return false;
                }
                if (this.nextSplit != null) {
                    return true;
                }
                final InputSplit fetched = FileSourceFunction.this.provider.getNextInputSplit();
                if (fetched == null) {
                    this.exhausted = true;
                    return false;
                }
                this.nextSplit = fetched;
                return true;
            }

            @Override // java.util.Iterator
            public InputSplit next() {
                if (this.nextSplit == null && !hasNext()) {
                    throw new NoSuchElementException();
                }
                final InputSplit result = this.nextSplit;
                this.nextSplit = null;
                return result;
            }

            @Override // java.util.Iterator
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    /**
     * Requests {@link #run(Collector)} to stop; the read loop observes the flag before the next
     * record and before the next split.
     */
    @Override // org.apache.flink.streaming.api.function.source.SourceFunction
    public void cancel() {
        this.isRunning = false;
    }
}
