package org.apache.fluo.recipes.spark;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.mapreduce.FluoEntryInputFormat;
import org.apache.fluo.mapreduce.FluoKeyValue;
import org.apache.fluo.mapreduce.FluoKeyValueGenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

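/* loaded from: input_file:org/apache/fluo/recipes/spark/FluoSparkHelper.class */
/**
 * Helper for using Fluo and Accumulo tables from Spark.
 *
 * <p>Offers conversions between {@code JavaRDD<RowColumnValue>} and
 * {@code JavaPairRDD<RowColumn, Bytes>}, reading a Fluo table into an RDD, and bulk importing
 * RDD contents into a Fluo or Accumulo table by way of a temporary HDFS directory.
 */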
public class FluoSparkHelper {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(FluoSparkHelper.class);
    /** Supplies the numeric suffix of generated temp directory names; static, so shared by all helpers in the JVM. */
    private static final AtomicInteger tempDirCounter = new AtomicInteger(0);
    /** Fluo configuration passed to the constructor; source of the Fluo table name and Accumulo connection settings. */
    private final FluoConfiguration fluoConfig;
    /** Hadoop configuration passed to the constructor; used to create jobs and the HDFS client. */
    private final Configuration hadoopConfig;
    /** Parent directory under which numbered temp directories are generated. */
    private final Path tempBaseDir;
    /** File system client obtained from {@link #hadoopConfig} in the constructor. */
    private final FileSystem hdfs;
    /** Connector built from {@link #fluoConfig}; used when the bulk import options do not supply one. */
    private final Connector defaultConn;

    /**
     * Optional overrides for a bulk import: the Accumulo connector to use and the temp directory
     * to stage files in. A {@code null} field means "no override".
     */
    public static class BulkImportOptions {
        /**
         * Shared instance carrying no overrides.
         *
         * <p>The reference is final so it cannot be swapped out, but the setters below mutate the
         * instance they are invoked on. Calling a setter on {@code DEFAULT} therefore changes it
         * for every user of this constant; create a new instance to customise options.
         */
        public static final BulkImportOptions DEFAULT = new BulkImportOptions();
        /** Connector override, or {@code null} when none was set. */
        Connector conn = null;
        /** Temp directory override, or {@code null} when none was set. */
        Path tempDir = null;

        /**
         * Sets the Accumulo connector to use for the bulk import.
         *
         * @param connector connector to use; must not be {@code null}
         * @return this instance, for chaining
         * @throws NullPointerException if {@code connector} is {@code null}
         */
        public BulkImportOptions setAccumuloConnector(Connector connector) {
            Objects.requireNonNull(connector, "connector");
            this.conn = connector;
            return this;
        }

        /**
         * Sets the temp directory to stage bulk import files in.
         *
         * @param path temp directory to use; must not be {@code null}
         * @return this instance, for chaining
         * @throws NullPointerException if {@code path} is {@code null}
         */
        public BulkImportOptions setTempDir(Path path) {
            Objects.requireNonNull(path, "path");
            this.tempDir = path;
            return this;
        }
    }

    public FluoSparkHelper(FluoConfiguration fluoConfiguration, Configuration configuration, Path path) {
        this.fluoConfig = fluoConfiguration;
        this.hadoopConfig = configuration;
        this.tempBaseDir = path;
        this.defaultConn = getAccumuloConnector(fluoConfiguration);
        try {
            this.hdfs = FileSystem.get(configuration);
        } catch (IOException e) {
            throw new IllegalStateException("Unable to get HDFS client from hadoop config", e);
        }
    }

    /**
     * Converts an RDD of {@link RowColumnValue} into a pair RDD keyed by {@link RowColumn}.
     *
     * @param javaRDD RDD of row/column/value triples
     * @return pair RDD mapping each element's row-column to its value
     */
    public static JavaPairRDD<RowColumn, Bytes> toPairRDD(JavaRDD<RowColumnValue> javaRDD) {
        // Diamond lets the compiler infer Tuple2<RowColumn, Bytes> rather than using a raw Tuple2.
        return javaRDD.mapToPair(rcv -> new Tuple2<>(rcv.getRowColumn(), rcv.getValue()));
    }

    /**
     * Converts a pair RDD keyed by {@link RowColumn} into an RDD of {@link RowColumnValue}.
     *
     * @param javaPairRDD pair RDD of row-column to value
     * @return RDD whose elements combine each key's row and column with the paired value
     */
    public static JavaRDD<RowColumnValue> toRcvRDD(JavaPairRDD<RowColumn, Bytes> javaPairRDD) {
        return javaPairRDD.map(entry -> {
            RowColumn rowColumn = entry._1();
            Bytes cellValue = entry._2();
            return new RowColumnValue(rowColumn.getRow(), rowColumn.getColumn(), cellValue);
        });
    }

    /**
     * Builds a {@link ZooKeeperInstance} from the Accumulo settings in a Fluo configuration.
     *
     * @param fluoConfiguration source of the Accumulo instance name, ZooKeeper hosts and timeout
     * @return an Accumulo instance handle configured from those settings
     */
    private static Instance getInstance(FluoConfiguration fluoConfiguration) {
        ClientConfiguration clientConfig = new ClientConfiguration(new org.apache.commons.configuration.Configuration[0]);
        clientConfig = clientConfig.withInstance(fluoConfiguration.getAccumuloInstance());
        clientConfig = clientConfig.withZkHosts(fluoConfiguration.getAccumuloZookeepers());
        // Integer division by 1000: presumably milliseconds -> seconds; confirm units against both APIs.
        clientConfig = clientConfig.withZkTimeout(fluoConfiguration.getZookeeperTimeout() / 1000);
        return new ZooKeeperInstance(clientConfig);
    }

    /**
     * Opens an Accumulo connector using the user and password from a Fluo configuration.
     *
     * @param fluoConfiguration source of the Accumulo instance settings and credentials
     * @return a connector authenticated with a password token
     * @throws IllegalStateException wrapping any Accumulo or Accumulo security exception
     */
    private static Connector getAccumuloConnector(FluoConfiguration fluoConfiguration) {
        try {
            Instance instance = getInstance(fluoConfiguration);
            String user = fluoConfiguration.getAccumuloUser();
            PasswordToken token = new PasswordToken(fluoConfiguration.getAccumuloPassword());
            return instance.getConnector(user, token);
        } catch (AccumuloException | AccumuloSecurityException connectFailure) {
            throw new IllegalStateException(connectFailure);
        }
    }

    /**
     * Creates a pair RDD over the Fluo table described by this helper's Fluo configuration.
     *
     * <p>A Hadoop {@link Job} is created from the helper's Hadoop configuration, configured via
     * {@link FluoEntryInputFormat}, and its configuration is handed to Spark.
     *
     * @param javaSparkContext Spark context used to create the RDD
     * @return pair RDD of row-column to value
     * @throws IllegalStateException wrapping any {@link IOException} raised while setting up the job
     */
    public JavaPairRDD<RowColumn, Bytes> readFromFluo(JavaSparkContext javaSparkContext) {
        try {
            Job inputJob = Job.getInstance(hadoopConfig);
            FluoEntryInputFormat.configure(inputJob, fluoConfig);
            Configuration inputConf = inputJob.getConfiguration();
            return javaSparkContext.newAPIHadoopRDD(inputConf, FluoEntryInputFormat.class, RowColumn.class, Bytes.class);
        } catch (IOException ioe) {
            throw new IllegalStateException(ioe);
        }
    }

    /**
     * Bulk imports row-column/value pairs into the Fluo table named in this helper's Fluo
     * configuration.
     *
     * <p>The data is first partitioned and sorted for the table, then each pair is expanded into
     * the Accumulo key/values produced by {@link FluoKeyValueGenerator}, and the result is bulk
     * imported.
     *
     * @param javaPairRDD data to import
     * @param bulkImportOptions connector / temp directory overrides; must not be {@code null}
     * @throws IllegalStateException if partitioning or the bulk import fails
     */
    public void bulkImportRcvToFluo(JavaPairRDD<RowColumn, Bytes> javaPairRDD, BulkImportOptions bulkImportOptions) {
        // Look the table name up once; it is needed for both partitioning and importing.
        String fluoTable = this.fluoConfig.getAccumuloTable();
        JavaPairRDD<RowColumn, Bytes> partitioned = partitionForAccumulo(javaPairRDD, fluoTable, bulkImportOptions);

        // The lambda must not touch instance state: it is shipped to executors by Spark.
        JavaPairRDD<Key, Value> kvData = partitioned.flatMapToPair(tuple -> {
            LinkedList<Tuple2<Key, Value>> output = new LinkedList<>();
            RowColumn rowColumn = tuple._1();
            FluoKeyValueGenerator generator = new FluoKeyValueGenerator();
            generator.setRow(rowColumn.getRow()).setColumn(rowColumn.getColumn()).setValue(tuple._2().toArray());
            for (FluoKeyValue keyValue : generator.getKeyValues()) {
                output.add(new Tuple2<>(keyValue.getKey(), keyValue.getValue()));
            }
            return output;
        });

        bulkImportKvToAccumulo(kvData, fluoTable, bulkImportOptions);
    }

    /**
     * Bulk imports Accumulo key/values into the Fluo table named in this helper's Fluo
     * configuration. Delegates to {@link #bulkImportKvToAccumulo}.
     *
     * @param javaPairRDD key/value data to import
     * @param bulkImportOptions connector / temp directory overrides; must not be {@code null}
     */
    public void bulkImportKvToFluo(JavaPairRDD<Key, Value> javaPairRDD, BulkImportOptions bulkImportOptions) {
        String fluoTable = fluoConfig.getAccumuloTable();
        bulkImportKvToAccumulo(javaPairRDD, fluoTable, bulkImportOptions);
    }

    /**
     * Bulk imports row-column/value pairs into the named Accumulo table.
     *
     * <p>The data is partitioned and sorted for the table, each pair is converted to an Accumulo
     * {@link Key}/{@link Value}, and the result is bulk imported.
     *
     * @param javaPairRDD data to import
     * @param str name of the destination Accumulo table
     * @param bulkImportOptions connector / temp directory overrides; must not be {@code null}
     * @throws IllegalStateException if partitioning or the bulk import fails
     */
    public void bulkImportRcvToAccumulo(JavaPairRDD<RowColumn, Bytes> javaPairRDD, String str, BulkImportOptions bulkImportOptions) {
        JavaPairRDD<RowColumn, Bytes> partitioned = partitionForAccumulo(javaPairRDD, str, bulkImportOptions);

        // The lambda must not touch instance state: it is shipped to executors by Spark.
        JavaPairRDD<Key, Value> kvData = partitioned.mapToPair(tuple -> {
            RowColumn rowColumn = tuple._1();
            Text row = new Text(rowColumn.getRow().toArray());
            Text family = new Text(rowColumn.getColumn().getFamily().toArray());
            Text qualifier = new Text(rowColumn.getColumn().getQualifier().toArray());
            // NOTE: only row, family and qualifier are copied; the column's visibility is not
            // carried into the Key, and every Key gets timestamp 0.
            Key key = new Key(row, family, qualifier, 0L);
            return new Tuple2<>(key, new Value(tuple._2().toArray()));
        });

        bulkImportKvToAccumulo(kvData, str, bulkImportOptions);
    }

    /**
     * Bulk imports Accumulo key/values into the named Accumulo table.
     *
     * <p>Steps: pick a temp directory (which must not exist yet), create it with a {@code fail}
     * sub-directory, write the RDD as Accumulo files under {@code data}, ask Accumulo to import
     * that directory, verify that nothing landed in {@code fail}, and finally delete the temp
     * directory. On any failure the temp directory is left in place.
     *
     * @param javaPairRDD key/value data to import
     * @param str name of the destination Accumulo table
     * @param bulkImportOptions connector / temp directory overrides; must not be {@code null}
     * @throws IllegalArgumentException if the temp directory already exists
     * @throws IllegalStateException if a directory cannot be created, if files failed to import,
     *         or wrapping any I/O or Accumulo exception
     */
    public void bulkImportKvToAccumulo(JavaPairRDD<Key, Value> javaPairRDD, String str, BulkImportOptions bulkImportOptions) {
        Path tempDir = getTempDir(bulkImportOptions);
        Connector conn = chooseConnector(bulkImportOptions);
        try {
            if (this.hdfs.exists(tempDir)) {
                throw new IllegalArgumentException("HDFS temp dir already exists: " + tempDir.toString());
            }
            // mkdirs reports failure through its return value as well as by throwing.
            if (!this.hdfs.mkdirs(tempDir)) {
                throw new IllegalStateException("Unable to create HDFS temp dir: " + tempDir);
            }
            Path dataDir = new Path(tempDir.toString() + "/data");
            Path failDir = new Path(tempDir.toString() + "/fail");
            if (!this.hdfs.mkdirs(failDir)) {
                throw new IllegalStateException("Unable to create HDFS failures dir: " + failDir);
            }

            Job job = Job.getInstance(this.hadoopConfig);
            AccumuloFileOutputFormat.setOutputPath(job, dataDir);
            javaPairRDD.saveAsNewAPIHadoopFile(dataDir.toString(), Key.class, Value.class, AccumuloFileOutputFormat.class, job.getConfiguration());
            log.info("Wrote data for bulk import to HDFS temp directory: {}", dataDir);

            // Last argument is passed as false; per the Accumulo API this is presumably the
            // "setTime" flag - confirm against TableOperations.importDirectory.
            conn.tableOperations().importDirectory(str, dataDir.toString(), failDir.toString(), false);
            if (this.hdfs.listFiles(failDir, true).hasNext()) {
                throw new IllegalStateException("Bulk import failed!  Found files that failed to import in failures directory: " + failDir);
            }
            log.info("Successfully bulk imported data in {} to '{}' Accumulo table", dataDir, str);

            // Only claim the directory was deleted when the file system says so.
            if (this.hdfs.delete(tempDir, true)) {
                log.info("Deleted HDFS temp directory created for bulk import: {}", tempDir);
            } else {
                log.warn("Unable to delete HDFS temp directory created for bulk import: {}", tempDir);
            }
        } catch (IOException | TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Produces the next candidate temp directory: the base directory followed by {@code /} and
     * the next value of the shared counter. Does not check whether the path exists.
     *
     * @return candidate temp directory path
     */
    private Path getPossibleTempDir() {
        String base = tempBaseDir.toString();
        int suffix = tempDirCounter.getAndIncrement();
        return new Path(base + "/" + suffix);
    }

    /**
     * Resolves the temp directory for a bulk import.
     *
     * @param bulkImportOptions options that may carry an explicit temp directory
     * @return the directory from the options when set; otherwise the first generated candidate
     *         that does not already exist on the file system
     * @throws IllegalStateException wrapping any {@link IOException} from the existence check
     */
    private Path getTempDir(BulkImportOptions bulkImportOptions) {
        Path explicitDir = bulkImportOptions.tempDir;
        if (explicitDir != null) {
            return explicitDir;
        }
        try {
            Path candidate;
            // Keep drawing candidates until one is free.
            do {
                candidate = getPossibleTempDir();
            } while (hdfs.exists(candidate));
            return candidate;
        } catch (IOException ioe) {
            throw new IllegalStateException(ioe);
        }
    }

    /**
     * Repartitions the data with an {@code AccumuloRangePartitioner} built from the table's
     * current split points, sorting within each partition.
     *
     * @param javaPairRDD data to partition
     * @param str name of the Accumulo table whose splits are listed
     * @param bulkImportOptions options that may carry the connector to use
     * @return the repartitioned, per-partition-sorted RDD
     * @throws IllegalStateException wrapping any Accumulo exception raised while listing splits
     */
    private JavaPairRDD<RowColumn, Bytes> partitionForAccumulo(JavaPairRDD<RowColumn, Bytes> javaPairRDD, String str, BulkImportOptions bulkImportOptions) {
        try {
            Connector conn = chooseConnector(bulkImportOptions);
            AccumuloRangePartitioner partitioner = new AccumuloRangePartitioner(conn.tableOperations().listSplits(str));
            return javaPairRDD.repartitionAndSortWithinPartitions(partitioner);
        } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException splitLookupFailure) {
            throw new IllegalStateException(splitLookupFailure);
        }
    }

    /**
     * Picks the connector for a bulk import.
     *
     * @param bulkImportOptions options that may carry a connector override
     * @return the connector from the options when set, otherwise this helper's default connector
     */
    private Connector chooseConnector(BulkImportOptions bulkImportOptions) {
        Connector override = bulkImportOptions.conn;
        if (override != null) {
            return override;
        }
        return defaultConn;
    }

    /**
     * Rebuilds one of this class's lambdas from its {@link SerializedLambda} form.
     *
     * <p>The implementation-method name selects one of four candidates (the lambdas used by
     * {@code bulkImportRcvToAccumulo}, {@code bulkImportRcvToFluo}, {@code toRcvRDD} and
     * {@code toPairRDD}). The method kind, functional interface, interface method, implementing
     * class and signatures are then verified before the matching lambda is returned.
     *
     * <p>NOTE(review): the method is marked synthetic and reads like decompiler output of a
     * compiler-generated method. The selector {@code z} is declared {@code boolean} yet is
     * assigned -1, 2 and 3, and {@code case true} appears three times, so this likely does not
     * compile as written. Confirm, and consider deleting it if this file is to be recompiled.
     *
     * @param serializedLambda serialized form of the lambda to rebuild
     * @return the lambda matching the serialized description
     * @throws IllegalArgumentException if no candidate matches
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First pass: map the implementation method name to a selector value, using the hash
        // code to narrow down and equals() to confirm.
        switch (implMethodName.hashCode()) {
            case -1784609574:
                if (implMethodName.equals("lambda$toRcvRDD$a3168098$1")) {
                    z = 2;
                    break;
                }
                break;
            case -6304409:
                if (implMethodName.equals("lambda$bulkImportRcvToFluo$c31694b$1")) {
                    z = true;
                    break;
                }
                break;
            case 1179501313:
                if (implMethodName.equals("lambda$toPairRDD$af46baf0$1")) {
                    z = 3;
                    break;
                }
                break;
            case 2006890491:
                if (implMethodName.equals("lambda$bulkImportRcvToAccumulo$c90a876e$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second pass: verify the serialized metadata and return the corresponding lambda.
        switch (z) {
            case false:
                // Body matches the bulkImportRcvToAccumulo lambda (RowColumn/Bytes -> Key/Value).
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/PairFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Lscala/Tuple2;") && serializedLambda.getImplClass().equals("org/apache/fluo/recipes/spark/FluoSparkHelper") && serializedLambda.getImplMethodSignature().equals("(Lscala/Tuple2;)Lscala/Tuple2;")) {
                    return tuple2 -> {
                        RowColumn rowColumn = (RowColumn) tuple2._1();
                        return new Tuple2(new Key(new Text(rowColumn.getRow().toArray()), new Text(rowColumn.getColumn().getFamily().toArray()), new Text(rowColumn.getColumn().getQualifier().toArray()), 0L), new Value(((Bytes) tuple2._2()).toArray()));
                    };
                }
                break;
            case true:
                // Body matches the bulkImportRcvToFluo lambda (expansion via FluoKeyValueGenerator).
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/PairFlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Iterable;") && serializedLambda.getImplClass().equals("org/apache/fluo/recipes/spark/FluoSparkHelper") && serializedLambda.getImplMethodSignature().equals("(Lscala/Tuple2;)Ljava/lang/Iterable;")) {
                    return tuple22 -> {
                        LinkedList linkedList = new LinkedList();
                        RowColumn rowColumn = (RowColumn) tuple22._1();
                        FluoKeyValueGenerator fluoKeyValueGenerator = new FluoKeyValueGenerator();
                        fluoKeyValueGenerator.setRow(rowColumn.getRow()).setColumn(rowColumn.getColumn()).setValue(((Bytes) tuple22._2()).toArray());
                        for (FluoKeyValue fluoKeyValue : fluoKeyValueGenerator.getKeyValues()) {
                            linkedList.add(new Tuple2(fluoKeyValue.getKey(), fluoKeyValue.getValue()));
                        }
                        return linkedList;
                    };
                }
                break;
            case true:
                // Body matches the toRcvRDD lambda; presumably the branch for selector value 2.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/fluo/recipes/spark/FluoSparkHelper") && serializedLambda.getImplMethodSignature().equals("(Lscala/Tuple2;)Lorg/apache/fluo/api/data/RowColumnValue;")) {
                    return tuple23 -> {
                        return new RowColumnValue(((RowColumn) tuple23._1()).getRow(), ((RowColumn) tuple23._1()).getColumn(), (Bytes) tuple23._2());
                    };
                }
                break;
            case true:
                // Body matches the toPairRDD lambda; presumably the branch for selector value 3.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/PairFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Lscala/Tuple2;") && serializedLambda.getImplClass().equals("org/apache/fluo/recipes/spark/FluoSparkHelper") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/fluo/api/data/RowColumnValue;)Lscala/Tuple2;")) {
                    return rowColumnValue -> {
                        return new Tuple2(rowColumnValue.getRowColumn(), rowColumnValue.getValue());
                    };
                }
                break;
        }
        // Nothing matched the serialized description.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
