/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nemo.compiler.frontend.spark.source;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.naming.OperationNotSupportedException;
import org.apache.nemo.common.ir.BoundedIteratorReadable;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.nemo.compiler.frontend.spark.source.SparkSourceUtil;
import org.apache.nemo.compiler.frontend.spark.sql.Dataset;
import org.apache.nemo.compiler.frontend.spark.sql.SparkSession;
import org.apache.spark.Partition;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.rdd.RDD;
import scala.collection.Iterator;
import scala.collection.JavaConverters;

public final class SparkDatasetBoundedSourceVertex<T>
extends SourceVertex<T> {
    private List<Readable<T>> readables = new ArrayList<Readable<T>>();
    private long estimatedByteSize;

    public SparkDatasetBoundedSourceVertex(SparkSession sparkSession, Dataset<T> dataset) {
        RDD<T> rdd = dataset.sparkRDD();
        Partition[] partitions = rdd.getPartitions();
        for (int i = 0; i < partitions.length; ++i) {
            this.readables.add((Readable<T>)new SparkDatasetBoundedSourceReadable(partitions[i], sparkSession.getDatasetCommandsList(), sparkSession.getInitialConf(), i));
        }
        this.estimatedByteSize = (Long)dataset.javaRDD().map((Function & Serializable)o -> o.toString().getBytes("UTF-8").length).reduce((Function2 & Serializable)(a, b) -> a + b);
    }

    private SparkDatasetBoundedSourceVertex(SparkDatasetBoundedSourceVertex<T> that) {
        super(that);
        that.readables.forEach(this.readables::add);
    }

    public SparkDatasetBoundedSourceVertex getClone() {
        return new SparkDatasetBoundedSourceVertex<T>(this);
    }

    public boolean isBounded() {
        return true;
    }

    public List<Readable<T>> getReadables(int desiredNumOfSplits) {
        return this.readables;
    }

    public long getEstimatedSizeBytes() {
        return this.estimatedByteSize;
    }

    public void clearInternalStates() {
        this.readables = null;
    }

    private final class SparkDatasetBoundedSourceReadable
    extends BoundedIteratorReadable<T> {
        private final LinkedHashMap<String, Object[]> commands;
        private final Map<String, String> sessionInitialConf;
        private final int partitionIndex;
        private final List<String> locations;

        private SparkDatasetBoundedSourceReadable(Partition partition, LinkedHashMap<String, Object[]> commands, Map<String, String> sessionInitialConf, int partitionIndex) {
            this.commands = commands;
            this.sessionInitialConf = sessionInitialConf;
            this.partitionIndex = partitionIndex;
            this.locations = SparkSourceUtil.getPartitionLocation(partition);
        }

        protected java.util.Iterator<T> initializeIterator() {
            Dataset dataset;
            SparkSession spark = SparkSession.builder().config(this.sessionInitialConf).getOrCreate();
            try {
                dataset = SparkSession.initializeDataset(spark, this.commands);
            }
            catch (OperationNotSupportedException e) {
                throw new IllegalStateException(e);
            }
            RDD rdd = dataset.sparkRDD();
            Iterable iterable = () -> (java.util.Iterator)JavaConverters.asJavaIteratorConverter((Iterator)rdd.iterator(rdd.getPartitions()[this.partitionIndex], (TaskContext)TaskContext$.MODULE$.empty())).asJava();
            return iterable.iterator();
        }

        public long readWatermark() {
            throw new UnsupportedOperationException("No watermark");
        }

        public List<String> getLocations() {
            if (this.locations.isEmpty()) {
                throw new UnsupportedOperationException();
            }
            return this.locations;
        }

        public void close() throws IOException {
        }
    }
}

