package org.apache.flink.streaming.python.api.functions;

import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.python.util.InterpreterUtils;
import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
import org.python.core.PyException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.class */
public class PythonIteratorFunction extends RichSourceFunction<Object> {
    private static final long serialVersionUID = 6741748297048588334L;
    private static final Logger LOG = LoggerFactory.getLogger(PythonIteratorFunction.class);
    private final byte[] serFun;
    private transient Iterator<Object> fun;
    private volatile transient boolean isRunning;

    public PythonIteratorFunction(Iterator<Object> it) throws IOException {
        this.serFun = SerializationUtils.serializeObject(it);
    }

    public void open(Configuration configuration) throws Exception {
        this.fun = (Iterator) InterpreterUtils.deserializeFunction(getRuntimeContext(), this.serFun);
        if (this.fun instanceof RichFunction) {
            try {
                RichFunction richFunction = this.fun;
                richFunction.setRuntimeContext(getRuntimeContext());
                richFunction.open(configuration);
            } catch (PyException e) {
                throw AbstractPythonUDF.createAndLogException(e, LOG);
            }
        }
    }

    public void run(SourceFunction.SourceContext<Object> sourceContext) throws Exception {
        while (this.isRunning && this.fun.hasNext()) {
            try {
                sourceContext.collect(this.fun.next());
            } catch (PyException e) {
                throw AbstractPythonUDF.createAndLogException(e, LOG);
            }
        }
    }

    public void close() throws Exception {
        if (this.fun instanceof RichFunction) {
            try {
                this.fun.close();
            } catch (PyException e) {
                throw AbstractPythonUDF.createAndLogException(e, LOG);
            }
        }
    }

    public void cancel() {
        this.isRunning = false;
    }
}
