/*
 * Decompiled with CFR 0.152.
 */
package io.warp10.spark;

import io.warp10.script.WarpScriptException;
import io.warp10.spark.common.SparkUtils;
import io.warp10.spark.common.WarpScriptAbstractFunction;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapPartitionsFunction;

/**
 * Spark partition-level function backed by a WarpScript program.
 *
 * <p>The class implements both {@link MapPartitionsFunction} and
 * {@link FlatMapFunction}. The WarpScript source given at construction time is
 * handed to the superclass constructor; the {@code executor} used below is not
 * declared in this file (presumably it is inherited from
 * {@link WarpScriptAbstractFunction} - confirm there).
 *
 * <p>Calling protocol visible in this class: for every element of the partition
 * the executor receives the list {@code [false, SparkUtils.fromSpark(element)]};
 * once the input is exhausted it receives one final call with {@code [true]}.
 * The list returned by each call is walked from its last index down to index 0
 * and every entry is converted with {@code SparkUtils.toSpark} before being
 * emitted.
 *
 * @param <T> element type of the incoming partition
 * @param <U> element type of the produced iterator
 */
public class WarpScriptMapPartitionsFunction<T, U>
extends WarpScriptAbstractFunction
implements MapPartitionsFunction<T, U>,
FlatMapFunction<T, U> {
    /**
     * Creates the function from WarpScript source.
     *
     * @param code WarpScript source, passed unchanged to the superclass constructor
     * @throws WarpScriptException propagated from the superclass constructor
     */
    public WarpScriptMapPartitionsFunction(String code) throws WarpScriptException {
        super(code);
    }

    /**
     * {@link FlatMapFunction} entry point. The argument is expected to be an
     * {@link Iterator} itself and is forwarded to {@link #call(Iterator)}.
     *
     * @param t the partition, which must be an {@code Iterator}
     * @return iterator over the converted script results
     * @throws ClassCastException if {@code t} is not an {@code Iterator}
     * @throws Exception declared by the Spark interface
     */
    public Iterator<U> call(T t) throws Exception {
        // Only the Iterator part of this cast is checked at runtime; the element
        // type cannot be verified because of erasure.
        @SuppressWarnings("unchecked")
        Iterator<T> partition = (Iterator<T>) t;
        return this.call(partition);
    }

    /**
     * {@link MapPartitionsFunction} entry point. Returns an iterator that runs
     * the script over {@code t} the first time an element is requested.
     *
     * <p>Failures of the script surface from {@code hasNext()}/{@code next()} of
     * the returned iterator as a {@link RuntimeException} whose cause is the
     * {@link WarpScriptException}.
     *
     * @param t iterator over the elements of the partition
     * @return iterator over the converted script results
     * @throws Exception declared by the Spark interface
     */
    public Iterator<U> call(final Iterator<T> t) throws Exception {
        // NOTE(review): this monitor is held only while the iterator object is
        // constructed. Script execution happens later, from fillBuffer(), outside
        // of the lock, so it does not serialize access to the executor.
        synchronized (this) {
            return new Iterator<U>() {
                /** Converted results not yet handed out. */
                private final List<Object> records = new ArrayList<>();
                /** Index in {@link #records} of the next element to return. */
                private int recordidx = 0;
                /** Set once the input is exhausted and the final {@code [true]} call was issued. */
                private boolean done = false;
                private final Iterator<T> iter = t;

                @Override
                public boolean hasNext() {
                    // Refill until something is buffered or the input is finished.
                    while (this.recordidx == this.records.size() && !this.done) {
                        this.fillBuffer();
                    }
                    return this.recordidx != this.records.size();
                }

                @Override
                public U next() {
                    if (!this.hasNext()) {
                        throw new NoSuchElementException();
                    }
                    Object record = this.records.get(this.recordidx++);
                    if (this.recordidx == this.records.size()) {
                        // Everything buffered has been consumed: recycle the list.
                        this.records.clear();
                        this.recordidx = 0;
                    }
                    // Unchecked: the element type produced by SparkUtils.toSpark
                    // cannot be verified against U at runtime.
                    @SuppressWarnings("unchecked")
                    U converted = (U) record;
                    return converted;
                }

                /**
                 * Runs the script over all remaining input elements and then issues
                 * the final {@code [true]} call, appending every result to
                 * {@link #records}.
                 *
                 * <p>NOTE(review): the loop only stops once {@code done} is set, so
                 * the output of the whole partition is buffered before the first
                 * element is returned. Stopping as soon as {@code records} is
                 * non-empty would bound memory, but it would also change when the
                 * script runs (and whether the final call happens if the consumer
                 * stops early), so the existing behaviour is kept here.
                 */
                private void fillBuffer() throws RuntimeException {
                    while (!this.done) {
                        List<Object> input = new ArrayList<>();
                        if (this.iter.hasNext()) {
                            // done is necessarily false inside this loop.
                            input.add(this.done);
                            input.add(SparkUtils.fromSpark(this.iter.next()));
                        } else {
                            // Flag first, so a failing final call is not retried.
                            this.done = true;
                            input.add(true);
                        }
                        this.execAndCollect(input);
                    }
                }

                /**
                 * Executes the script with {@code input} and appends the converted
                 * results to {@link #records}, last index first.
                 */
                private void execAndCollect(List<Object> input) {
                    try {
                        List<?> results = WarpScriptMapPartitionsFunction.this.executor.exec(input);
                        for (int i = results.size() - 1; i >= 0; --i) {
                            this.records.add(SparkUtils.toSpark(results.get(i)));
                        }
                    } catch (WarpScriptException wse) {
                        // Iterator methods cannot throw checked exceptions.
                        throw new RuntimeException(wse);
                    }
                }
            };
        }
    }

    /**
     * Static factory equivalent to calling the constructor.
     *
     * <p>The raw return type is kept on purpose so that existing callers keep
     * compiling unchanged.
     *
     * @param code WarpScript source
     * @return a new function instance
     * @throws WarpScriptException propagated from the constructor
     */
    @SuppressWarnings("rawtypes")
    public static WarpScriptMapPartitionsFunction getInstance(String code) throws WarpScriptException {
        return new WarpScriptMapPartitionsFunction(code);
    }
}

