/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.recipes.spark;

import java.io.IOException;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.mapreduce.FluoEntryInputFormat;
import org.apache.fluo.mapreduce.FluoKeyValue;
import org.apache.fluo.mapreduce.FluoKeyValueGenerator;
import org.apache.fluo.recipes.spark.AccumuloRangePartitioner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Helper methods for moving data between Spark RDDs and Fluo / Accumulo tables.
 *
 * <p>An instance holds a Fluo configuration, a Hadoop configuration, a base directory under which
 * temporary bulk-import directories are created, a Hadoop {@link FileSystem} client and a default
 * Accumulo {@link Connector} built from the Fluo configuration.
 */
public class FluoSparkHelper {
    private static final Logger log = LoggerFactory.getLogger(FluoSparkHelper.class);

    /** JVM-wide counter used to generate candidate temp directory names under the base dir. */
    private static final AtomicInteger tempDirCounter = new AtomicInteger(0);

    private final FluoConfiguration fluoConfig;
    private final Configuration hadoopConfig;
    private final Path tempBaseDir;
    private final FileSystem hdfs;
    private final Connector defaultConn;

    /**
     * Creates a helper, connecting to Accumulo and obtaining a Hadoop file system client.
     *
     * @param fluoConfig Fluo configuration; supplies the Accumulo instance, zookeepers, user,
     *        password and table
     * @param hadoopConfig Hadoop configuration used to obtain the file system and to create jobs
     * @param tempBaseDir base directory under which temp directories are generated when the
     *        caller does not supply one via {@link BulkImportOptions#setTempDir(Path)}
     * @throws IllegalStateException if the Accumulo connector or the file system client cannot be
     *         created
     */
    public FluoSparkHelper(FluoConfiguration fluoConfig, Configuration hadoopConfig, Path tempBaseDir) {
        this.fluoConfig = fluoConfig;
        this.hadoopConfig = hadoopConfig;
        this.tempBaseDir = tempBaseDir;
        this.defaultConn = getAccumuloConnector(fluoConfig);
        try {
            this.hdfs = FileSystem.get(hadoopConfig);
        } catch (IOException e) {
            throw new IllegalStateException("Unable to get HDFS client from hadoop config", e);
        }
    }

    /**
     * Converts an RDD of {@link RowColumnValue} into a pair RDD keyed by {@link RowColumn}.
     *
     * @param rcvRDD RDD to convert
     * @return pair RDD whose key is the row/column and whose value is the cell value
     */
    public static JavaPairRDD<RowColumn, Bytes> toPairRDD(JavaRDD<RowColumnValue> rcvRDD) {
        return rcvRDD.mapToPair(rcv -> new Tuple2<>(rcv.getRowColumn(), rcv.getValue()));
    }

    /**
     * Converts a pair RDD keyed by {@link RowColumn} into an RDD of {@link RowColumnValue}.
     *
     * @param pairRDD pair RDD to convert
     * @return RDD with one {@link RowColumnValue} per input pair
     */
    public static JavaRDD<RowColumnValue> toRcvRDD(JavaPairRDD<RowColumn, Bytes> pairRDD) {
        return pairRDD.map(t -> new RowColumnValue(t._1().getRow(), t._1().getColumn(), t._2()));
    }

    /** Builds an Accumulo {@link Instance} from the instance name and zookeepers in the Fluo config. */
    private static Instance getInstance(FluoConfiguration config) {
        // NOTE(review): the Fluo timeout is divided by 1000 before being handed to Accumulo,
        // presumably converting milliseconds to seconds - confirm against both APIs.
        ClientConfiguration clientConfig = new ClientConfiguration()
                .withInstance(config.getAccumuloInstance())
                .withZkHosts(config.getAccumuloZookeepers())
                .withZkTimeout(config.getZookeeperTimeout() / 1000);
        return new ZooKeeperInstance(clientConfig);
    }

    /**
     * Creates an Accumulo connector using the user and password from the Fluo config.
     *
     * @throws IllegalStateException wrapping any Accumulo failure
     */
    private static Connector getAccumuloConnector(FluoConfiguration config) {
        try {
            return getInstance(config).getConnector(config.getAccumuloUser(),
                    new PasswordToken(config.getAccumuloPassword()));
        } catch (AccumuloException | AccumuloSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Reads the Fluo table into a pair RDD using {@link FluoEntryInputFormat}.
     *
     * @param ctx Spark context used to create the RDD
     * @return RDD of row/column keys and their values
     * @throws IllegalStateException if the Hadoop job cannot be created
     */
    public JavaPairRDD<RowColumn, Bytes> readFromFluo(JavaSparkContext ctx) {
        Job job;
        try {
            job = Job.getInstance(hadoopConfig);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        FluoEntryInputFormat.configure(job, fluoConfig);
        return ctx.newAPIHadoopRDD(job.getConfiguration(), FluoEntryInputFormat.class,
                RowColumn.class, Bytes.class);
    }

    /**
     * Bulk imports row/column/value data into the Fluo table named in the Fluo config.
     *
     * <p>The data is first partitioned and sorted to match the table's splits, then every entry is
     * expanded into the Accumulo key/values produced by {@link FluoKeyValueGenerator}.
     *
     * @param data row/column keys and values to import
     * @param opts bulk import options (connector and temp directory overrides)
     * @throws IllegalStateException if partitioning or the bulk import fails
     */
    public void bulkImportRcvToFluo(JavaPairRDD<RowColumn, Bytes> data, BulkImportOptions opts) {
        data = partitionForAccumulo(data, fluoConfig.getAccumuloTable(), opts);
        JavaPairRDD<Key, Value> kvData = data.flatMapToPair(tuple -> {
            LinkedList<Tuple2<Key, Value>> output = new LinkedList<>();
            RowColumn rc = tuple._1();
            FluoKeyValueGenerator fkvg = new FluoKeyValueGenerator();
            fkvg.setRow(rc.getRow()).setColumn(rc.getColumn()).setValue(tuple._2().toArray());
            for (FluoKeyValue kv : fkvg.getKeyValues()) {
                output.add(new Tuple2<>(kv.getKey(), kv.getValue()));
            }
            return output;
        });
        bulkImportKvToAccumulo(kvData, fluoConfig.getAccumuloTable(), opts);
    }

    /**
     * Bulk imports Accumulo key/values into the Fluo table named in the Fluo config.
     *
     * <p>The data is passed through unchanged; no partitioning or sorting is applied here.
     *
     * @param data key/values to import
     * @param opts bulk import options (connector and temp directory overrides)
     * @throws IllegalStateException if the bulk import fails
     */
    public void bulkImportKvToFluo(JavaPairRDD<Key, Value> data, BulkImportOptions opts) {
        bulkImportKvToAccumulo(data, fluoConfig.getAccumuloTable(), opts);
    }

    /**
     * Bulk imports row/column/value data into an arbitrary Accumulo table.
     *
     * <p>The data is partitioned and sorted to match the table's splits, then each entry becomes a
     * single Accumulo key (row, column family, column qualifier, fourth constructor argument
     * {@code 0L}) with the entry's bytes as value.
     *
     * @param data row/column keys and values to import
     * @param accumuloTable destination table
     * @param opts bulk import options (connector and temp directory overrides)
     * @throws IllegalStateException if partitioning or the bulk import fails
     */
    public void bulkImportRcvToAccumulo(JavaPairRDD<RowColumn, Bytes> data, String accumuloTable,
            BulkImportOptions opts) {
        data = partitionForAccumulo(data, accumuloTable, opts);
        JavaPairRDD<Key, Value> kvData = data.mapToPair(tuple -> {
            RowColumn rc = tuple._1();
            byte[] row = rc.getRow().toArray();
            byte[] cf = rc.getColumn().getFamily().toArray();
            byte[] cq = rc.getColumn().getQualifier().toArray();
            byte[] val = tuple._2().toArray();
            return new Tuple2<>(new Key(new Text(row), new Text(cf), new Text(cq), 0L),
                    new Value(val));
        });
        bulkImportKvToAccumulo(kvData, accumuloTable, opts);
    }

    /**
     * Writes key/values to a temp directory as Accumulo files and bulk imports them into a table.
     *
     * <p>Under the temp directory a {@code data} directory receives the files and a {@code fail}
     * directory receives failures. On success the whole temp directory is deleted. If any step
     * throws, the temp directory is left in place.
     *
     * @param data key/values to import
     * @param accumuloTable destination table
     * @param opts bulk import options (connector and temp directory overrides)
     * @throws IllegalArgumentException if the temp directory already exists
     * @throws IllegalStateException if files are found in the failures directory after the import,
     *         or wrapping any I/O or Accumulo failure
     */
    public void bulkImportKvToAccumulo(JavaPairRDD<Key, Value> data, String accumuloTable,
            BulkImportOptions opts) {
        Path tempDir = getTempDir(opts);
        Connector conn = chooseConnector(opts);
        try {
            if (hdfs.exists(tempDir)) {
                throw new IllegalArgumentException("HDFS temp dir already exists: " + tempDir.toString());
            }
            hdfs.mkdirs(tempDir);
            Path dataDir = new Path(tempDir.toString() + "/data");
            Path failDir = new Path(tempDir.toString() + "/fail");
            hdfs.mkdirs(failDir);

            // Save the data as Accumulo files under dataDir.
            Job job = Job.getInstance(hadoopConfig);
            AccumuloFileOutputFormat.setOutputPath(job, dataDir);
            data.saveAsNewAPIHadoopFile(dataDir.toString(), Key.class, Value.class,
                    AccumuloFileOutputFormat.class, job.getConfiguration());
            log.info("Wrote data for bulk import to HDFS temp directory: {}", dataDir);

            conn.tableOperations().importDirectory(accumuloTable, dataDir.toString(),
                    failDir.toString(), false);

            // Any file left in the failures directory means part of the import did not succeed.
            if (hdfs.listFiles(failDir, true).hasNext()) {
                throw new IllegalStateException("Bulk import failed!  Found files that failed to import in failures directory: " + failDir);
            }
            log.info("Successfully bulk imported data in {} to '{}' Accumulo table", dataDir, accumuloTable);

            hdfs.delete(tempDir, true);
            log.info("Deleted HDFS temp directory created for bulk import: {}", tempDir);
        } catch (IOException | AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns the next candidate temp directory: the base dir plus the next counter value. */
    private Path getPossibleTempDir() {
        return new Path(tempBaseDir.toString() + "/" + tempDirCounter.getAndIncrement());
    }

    /**
     * Returns the temp directory from the options, or generates a path under the base directory
     * that does not currently exist.
     *
     * @throws IllegalStateException wrapping any I/O failure while probing for existing paths
     */
    private Path getTempDir(BulkImportOptions opts) {
        if (opts.tempDir != null) {
            return opts.tempDir;
        }
        try {
            Path tempDir = getPossibleTempDir();
            // Skip names that are already taken on the file system.
            while (hdfs.exists(tempDir)) {
                tempDir = getPossibleTempDir();
            }
            return tempDir;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Repartitions the data with an {@link AccumuloRangePartitioner} built from the table's splits
     * and sorts it within each partition.
     *
     * @throws IllegalStateException wrapping any Accumulo failure while listing splits
     */
    private JavaPairRDD<RowColumn, Bytes> partitionForAccumulo(JavaPairRDD<RowColumn, Bytes> data,
            String accumuloTable, BulkImportOptions opts) {
        AccumuloRangePartitioner accumuloPartitioner;
        try {
            accumuloPartitioner = new AccumuloRangePartitioner(
                    chooseConnector(opts).tableOperations().listSplits(accumuloTable));
        } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
            throw new IllegalStateException(e);
        }
        return data.repartitionAndSortWithinPartitions(accumuloPartitioner);
    }

    /** Returns the connector from the options if one was set, otherwise the default connector. */
    private Connector chooseConnector(BulkImportOptions opts) {
        return opts.conn == null ? defaultConn : opts.conn;
    }

    /**
     * Optional settings for bulk imports. Setters return {@code this} so calls can be chained.
     */
    public static class BulkImportOptions {
        /**
         * Shared instance with no overrides set. It is an ordinary mutable object, so calling a
         * setter on it changes it for every user; create a new instance to customize options.
         */
        public static BulkImportOptions DEFAULT = new BulkImportOptions();

        /** Connector override; {@code null} means use the helper's default connector. */
        Connector conn = null;

        /** Temp directory override; {@code null} means generate one under the helper's base dir. */
        Path tempDir = null;

        /**
         * Sets the Accumulo connector to use instead of the helper's default.
         *
         * @param conn connector, must not be {@code null}
         * @return this options object
         * @throws NullPointerException if {@code conn} is {@code null}
         */
        public BulkImportOptions setAccumuloConnector(Connector conn) {
            Objects.requireNonNull(conn);
            this.conn = conn;
            return this;
        }

        /**
         * Sets the temp directory to use for the bulk import instead of a generated one.
         *
         * @param tempDir temp directory, must not be {@code null}
         * @return this options object
         * @throws NullPointerException if {@code tempDir} is {@code null}
         */
        public BulkImportOptions setTempDir(Path tempDir) {
            Objects.requireNonNull(tempDir);
            this.tempDir = tempDir;
            return this;
        }
    }
}

