package org.apache.hugegraph.loader.direct.loader;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hugegraph.loader.builder.ElementBuilder;
import org.apache.hugegraph.loader.constant.Constants;
import org.apache.hugegraph.loader.direct.util.SinkToHBase;
import org.apache.hugegraph.loader.executor.LoadOptions;
import org.apache.hugegraph.loader.mapping.InputStruct;
import org.apache.hugegraph.loader.metrics.LoadDistributeMetrics;
import org.apache.hugegraph.loader.util.HugeClientHolder;
import org.apache.hugegraph.serializer.direct.HBaseSerializer;
import org.apache.hugegraph.structure.graph.Edge;
import org.apache.hugegraph.structure.graph.Vertex;
import org.apache.hugegraph.util.Log;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import scala.Tuple2;

/* loaded from: input_file:org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.class */
/**
 * {@link DirectLoader} implementation that turns the rows of a Spark {@link Dataset} into
 * HBase {@link KeyValue} cells, writes them out as HFiles and hands those HFiles to
 * {@link SinkToHBase} for loading.
 *
 * <p>The lambda in {@link #buildVertexAndEdge(Dataset)} captures {@code this}; the
 * compiler generates the matching {@code $deserializeLambda$} hook on its own, so it must
 * not be declared by hand in this class.
 */
public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, KeyValue> {
    public static final Logger LOG = Log.logger(HBaseDirectLoader.class);

    /** Manual recovery hint; placeholders are the HFile directory and the table name. */
    private static final String MANUAL_LOAD_HINT =
            "Please run command:hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles {} '{}'\n" +
            " to load generated HFiles into HBase table";

    private final SinkToHBase sinkToHBase;
    private final LoadDistributeMetrics loadDistributeMetrics;

    /**
     * @param loadOptions           loader options, also used to create the {@link SinkToHBase}
     * @param inputStruct           input structure passed on to the parent loader
     * @param loadDistributeMetrics counters updated for every vertex/edge that is serialized
     */
    public HBaseDirectLoader(LoadOptions loadOptions, InputStruct inputStruct, LoadDistributeMetrics loadDistributeMetrics) {
        super(loadOptions, inputStruct);
        this.loadDistributeMetrics = loadDistributeMetrics;
        this.sinkToHBase = new SinkToHBase(loadOptions);
    }

    /**
     * Picks the target table for the current input structure.
     *
     * @return the edge table name when the structure declares edges, otherwise the vertex
     *         table name when it declares vertices, otherwise {@code null}
     */
    public String getTableName() {
        if (hasEdges()) {
            return this.loadOptions.edgeTablename;
        }
        if (!this.struct.vertices().isEmpty()) {
            return this.loadOptions.vertexTablename;
        }
        return null;
    }

    /**
     * @return the edge partition count when the structure declares edges, otherwise the
     *         vertex partition count
     */
    public Integer getTablePartitions() {
        return Integer.valueOf(hasEdges() ? this.loadOptions.edgePartitions : this.loadOptions.vertexPartitions);
    }

    /** Tells whether the input structure declares at least one edge mapping. */
    private boolean hasEdges() {
        return !this.struct.edges().isEmpty();
    }

    /**
     * Builds and serializes the graph elements of every row, one Spark partition at a time.
     *
     * @param dataset rows read from the input source
     * @return pair RDD of row key and cell for every element that was built
     */
    @Override
    public JavaPairRDD<ImmutableBytesWritable, KeyValue> buildVertexAndEdge(Dataset<Row> dataset) {
        LOG.info("Start build vertexes and edges");
        return dataset.toJavaRDD().mapPartitionsToPair(rows -> {
            HBaseSerializer serializer = new HBaseSerializer(HugeClientHolder.create(this.loadOptions),
                                                             this.loadOptions.vertexPartitions,
                                                             this.loadOptions.edgePartitions);
            try {
                List<ElementBuilder> builders = getElementBuilders();
                List<Tuple2<ImmutableBytesWritable, KeyValue>> serialized = new LinkedList<>();
                while (rows.hasNext()) {
                    serialized.addAll(buildAndSer(serializer, rows.next(), builders));
                }
                return serialized.iterator();
            } finally {
                // Release the serializer even when building or serializing a row fails.
                serializer.close();
            }
        });
    }

    /**
     * Repartitions and sorts the serialized cells with the table's partitioner and writes
     * them as HFiles.
     *
     * @param javaPairRDD serialized cells produced by {@link #buildVertexAndEdge(Dataset)}
     * @return the directory the HFiles were written to, or {@link Constants#EMPTY_STR} when
     *         an {@link IOException} occurred (the error is logged)
     */
    @Override
    String generateFiles(JavaPairRDD<ImmutableBytesWritable, KeyValue> javaPairRDD) {
        LOG.info("Start to generate hfile");
        try {
            Tuple2<SinkToHBase.IntPartitioner, TableDescriptor> partitionerAndTable =
                    this.sinkToHBase.getPartitionerByTableName(getTablePartitions().intValue(), getTableName());
            Partitioner partitioner = partitionerAndTable._1();
            TableDescriptor tableDescriptor = partitionerAndTable._2();

            JavaPairRDD<ImmutableBytesWritable, KeyValue> sortedCells =
                    javaPairRDD.repartitionAndSortWithinPartitions(partitioner);

            Configuration configuration = this.sinkToHBase.getHBaseConfiguration().get();
            Job job = Job.getInstance(configuration);
            HFileOutputFormat2.configureIncrementalLoadMap(job, tableDescriptor);
            // NOTE(review): Hadoop documents that Job.getInstance(conf) copies the
            // configuration, so the settings applied by configureIncrementalLoadMap live on
            // job.getConfiguration() while the save below uses `configuration` - confirm
            // this is intended.
            configuration.set("hbase.mapreduce.hfileoutputformat.table.name",
                              tableDescriptor.getTableName().getNameAsString());

            String hFilePath = getHFilePath(job.getConfiguration());
            sortedCells.saveAsNewAPIHadoopFile(hFilePath, ImmutableBytesWritable.class, KeyValue.class,
                                               HFileOutputFormat2.class, configuration);
            LOG.info("Saved HFiles to: '{}'", hFilePath);
            flushPermission(configuration, hFilePath);
            return hFilePath;
        } catch (IOException e) {
            LOG.error("Failed to generate files", e);
            return Constants.EMPTY_STR;
        }
    }

    /**
     * Computes a timestamped {@code hfile-gen} directory below the file system's working
     * directory and deletes it recursively if it already exists.
     *
     * @param configuration configuration used to obtain the {@link FileSystem}
     * @return the directory path, ending with {@code /}
     * @throws IOException if the file system cannot be reached or the delete fails
     */
    public String getHFilePath(Configuration configuration) throws IOException {
        FileSystem fileSystem = FileSystem.get(configuration);
        String directory = fileSystem.getWorkingDirectory().toString() + "/hfile-gen/" + System.currentTimeMillis() + "/";
        Path path = new Path(directory);
        if (fileSystem.exists(path)) {
            LOG.info("\n Delete the path where the hfile is generated,path {} ", directory);
            fileSystem.delete(path, true);
        }
        return directory;
    }

    /**
     * Loads the HFiles under the given directory into the table returned by
     * {@link #getTableName()}. Failures are logged and not propagated.
     *
     * @param str directory that contains the generated HFiles
     */
    @Override
    public void loadFiles(String str) {
        try {
            this.sinkToHBase.loadHfiles(str, getTableName());
        } catch (Exception e) {
            LOG.error(" Failed to load hfiles", e);
        }
    }

    /**
     * Runs {@code -chmod -R 777} on the HFile directory through {@link FsShell}. This is
     * best effort: failures are logged together with the manual load command and are not
     * propagated.
     *
     * @param configuration configuration handed to the shell
     * @param str           HFile directory whose permissions are opened up
     */
    private void flushPermission(Configuration configuration, String str) {
        FsShell shell = new FsShell(configuration);
        try {
            LOG.info("Chmod hfile directory permission");
            int exitCode = shell.run(new String[]{"-chmod", "-R", "777", str});
            if (exitCode != 0) {
                // FsShell reports most failures through its exit code, not an exception.
                LOG.error("Couldn't change the file permissions (exit code {}). " + MANUAL_LOAD_HINT,
                          exitCode, str, getTableName());
            }
        } catch (Exception e) {
            // Trailing throwable argument keeps the stack trace in the log.
            LOG.error("Couldn't change the file permissions. " + MANUAL_LOAD_HINT, str, getTableName(), e);
        } finally {
            try {
                shell.close();
            } catch (IOException e) {
                LOG.warn("Failed to close FsShell", e);
            }
        }
    }

    /**
     * Builds the graph elements of one row with every non-skipped builder and serializes
     * them into HBase cells, bumping the insert-success metric for each element.
     *
     * @param hBaseSerializer serializer that produces row keys and values
     * @param row             input row; a row whose {@code mkString()} is empty yields no cells
     * @param list            element builders to apply to the row
     * @return the serialized cells, possibly empty
     * @throws AssertionError if the input source type is neither FILE nor HDFS
     */
    List<Tuple2<ImmutableBytesWritable, KeyValue>> buildAndSer(HBaseSerializer hBaseSerializer, Row row, List<ElementBuilder> list) {
        List<Tuple2<ImmutableBytesWritable, KeyValue>> cells = new LinkedList<>();
        // The row is the same for every builder, so the empty-row check is done once.
        if (Constants.EMPTY_STR.equals(row.mkString())) {
            return cells;
        }
        for (ElementBuilder builder : list) {
            if (builder.mapping().skip()) {
                continue;
            }
            switch (this.struct.input().type()) {
                case FILE:
                case HDFS:
                    break;
                default:
                    throw new AssertionError(String.format("Unsupported input source '%s'", this.struct.input().type()));
            }
            List<?> elements = builder.build(row);
            if (builder.mapping().type().isVertex()) {
                for (Object element : elements) {
                    Vertex vertex = (Vertex) element;
                    LOG.debug("vertex already build done {} ", vertex);
                    Tuple2<ImmutableBytesWritable, KeyValue> cell = vertexSerialize(hBaseSerializer, vertex);
                    this.loadDistributeMetrics.increaseDisVertexInsertSuccess(builder.mapping());
                    cells.add(cell);
                }
            } else {
                for (Object element : elements) {
                    Edge edge = (Edge) element;
                    LOG.debug("edge already build done {}", edge);
                    Tuple2<ImmutableBytesWritable, KeyValue> cell = edgeSerialize(hBaseSerializer, edge);
                    this.loadDistributeMetrics.increaseDisEdgeInsertSuccess(builder.mapping());
                    cells.add(cell);
                }
            }
        }
        return cells;
    }

    /** Serializes an edge into its row key and cell. */
    private Tuple2<ImmutableBytesWritable, KeyValue> edgeSerialize(HBaseSerializer hBaseSerializer, Edge edge) {
        LOG.debug("edge start serialize {}", edge);
        return toCell(hBaseSerializer.getKeyBytes(edge), hBaseSerializer.getValueBytes(edge));
    }

    /** Serializes a vertex into its row key and cell. */
    private Tuple2<ImmutableBytesWritable, KeyValue> vertexSerialize(HBaseSerializer hBaseSerializer, Vertex vertex) {
        LOG.debug("vertex start serialize {}", vertex);
        return toCell(hBaseSerializer.getKeyBytes(vertex), hBaseSerializer.getValueBytes(vertex));
    }

    /**
     * Wraps a row key and value into the pair written to the HFile: the cell uses column
     * family {@link Constants#HBASE_COL_FAMILY} and an empty qualifier.
     */
    private static Tuple2<ImmutableBytesWritable, KeyValue> toCell(byte[] keyBytes, byte[] valueBytes) {
        ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
        rowKey.set(keyBytes);
        KeyValue cell = new KeyValue(keyBytes, Bytes.toBytes(Constants.HBASE_COL_FAMILY),
                                     Bytes.toBytes(Constants.EMPTY_STR), valueBytes);
        return new Tuple2<>(rowKey, cell);
    }
}
