/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.python.api.functions;

import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.python.api.functions.AbstractPythonUDF;
import org.apache.flink.streaming.python.util.InterpreterUtils;
import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
import org.python.core.PyException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PythonIteratorFunction
extends RichSourceFunction<Object> {
    private static final long serialVersionUID = 6741748297048588334L;
    private static final Logger LOG = LoggerFactory.getLogger(PythonIteratorFunction.class);
    private final byte[] serFun;
    private transient Iterator<Object> fun;
    private volatile transient boolean isRunning;

    public PythonIteratorFunction(Iterator<Object> fun) throws IOException {
        this.serFun = SerializationUtils.serializeObject(fun);
    }

    public void open(Configuration parameters) throws Exception {
        this.fun = (Iterator)InterpreterUtils.deserializeFunction(this.getRuntimeContext(), this.serFun);
        if (this.fun instanceof RichFunction) {
            try {
                RichFunction winFun = (RichFunction)this.fun;
                winFun.setRuntimeContext(this.getRuntimeContext());
                winFun.open(parameters);
            }
            catch (PyException pe) {
                throw AbstractPythonUDF.createAndLogException(pe, LOG);
            }
        }
    }

    public void run(SourceFunction.SourceContext<Object> ctx) throws Exception {
        try {
            while (this.isRunning && this.fun.hasNext()) {
                ctx.collect(this.fun.next());
            }
        }
        catch (PyException pe) {
            throw AbstractPythonUDF.createAndLogException(pe, LOG);
        }
    }

    public void close() throws Exception {
        if (this.fun instanceof RichFunction) {
            try {
                ((RichFunction)this.fun).close();
            }
            catch (PyException pe) {
                throw AbstractPythonUDF.createAndLogException(pe, LOG);
            }
        }
    }

    public void cancel() {
        this.isRunning = false;
    }
}

