package org.apache.hugegraph.loader.spark;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hugegraph.driver.GraphManager;
import org.apache.hugegraph.loader.builder.EdgeBuilder;
import org.apache.hugegraph.loader.builder.ElementBuilder;
import org.apache.hugegraph.loader.builder.VertexBuilder;
import org.apache.hugegraph.loader.constant.Constants;
import org.apache.hugegraph.loader.direct.loader.HBaseDirectLoader;
import org.apache.hugegraph.loader.executor.LoadContext;
import org.apache.hugegraph.loader.executor.LoadOptions;
import org.apache.hugegraph.loader.mapping.EdgeMapping;
import org.apache.hugegraph.loader.mapping.ElementMapping;
import org.apache.hugegraph.loader.mapping.InputStruct;
import org.apache.hugegraph.loader.mapping.LoadMapping;
import org.apache.hugegraph.loader.mapping.VertexMapping;
import org.apache.hugegraph.loader.metrics.LoadDistributeMetrics;
import org.apache.hugegraph.loader.source.InputSource;
import org.apache.hugegraph.loader.source.file.FileFormat;
import org.apache.hugegraph.loader.source.file.FileSource;
import org.apache.hugegraph.loader.source.jdbc.JDBCSource;
import org.apache.hugegraph.loader.util.Printer;
import org.apache.hugegraph.structure.GraphElement;
import org.apache.hugegraph.structure.graph.BatchEdgeRequest;
import org.apache.hugegraph.structure.graph.BatchVertexRequest;
import org.apache.hugegraph.structure.graph.UpdateStrategy;
import org.apache.hugegraph.util.Log;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import scala.collection.JavaConverters;

/* loaded from: input_file:org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.class */
/**
 * Spark entry point that reads the inputs described by a load mapping file and
 * writes the resulting graph elements either through the HugeGraph client
 * (row by row, batched per partition) or through {@link HBaseDirectLoader}
 * bulk loading, depending on {@code LoadOptions.sinkType}.
 *
 * <p>The instance is captured by the partition lambda in {@link #load()}, which
 * is why the class is {@link Serializable}. The compiler generates the
 * {@code $deserializeLambda$} support method for that lambda, so it must not be
 * declared by hand.
 */
public class HugeGraphSparkLoader implements Serializable {
    public static final Logger LOG = Log.logger(HugeGraphSparkLoader.class);

    private final LoadOptions loadOptions;

    /** Element builders of the current partition, each with its pending (unflushed) batch. */
    private final Map<ElementBuilder, List<GraphElement>> builders = new HashMap<>();

    /**
     * Command line entry point. Any failure is reported through {@link Printer}
     * instead of being propagated.
     *
     * @param strArr command line arguments, parsed by {@code LoadOptions.parseOptions}
     */
    public static void main(String[] strArr) {
        try {
            new HugeGraphSparkLoader(strArr).load();
        } catch (Throwable th) {
            Printer.printError("Failed to start loading", th);
        }
    }

    /**
     * Creates a loader configured from command line arguments.
     *
     * @param strArr command line arguments, parsed by {@code LoadOptions.parseOptions}
     */
    public HugeGraphSparkLoader(String[] strArr) {
        this.loadOptions = LoadOptions.parseOptions(strArr);
    }

    /**
     * Loads every input struct of the mapping file and logs the total number of
     * successfully inserted elements. The Spark session is always stopped, even
     * when loading fails.
     */
    public void load() {
        LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
        List<InputStruct> structs = mapping.structs();
        boolean sinkType = this.loadOptions.sinkType;
        if (!sinkType) {
            this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
        }

        SparkSession session = SparkSession.builder().config(buildSparkConf()).getOrCreate();
        try {
            SparkContext sparkContext = session.sparkContext();
            LongAccumulator totalInsertSuccess = sparkContext.longAccumulator("totalInsertSuccess");
            for (InputStruct inputStruct : structs) {
                LOG.info("\n Initializes the accumulator corresponding to the  {} ", sourcePath(inputStruct));
                LoadDistributeMetrics metrics = new LoadDistributeMetrics(inputStruct);
                metrics.init(sparkContext);
                LOG.info("\n  Start to load data, data info is: \t {} ", sourcePath(inputStruct));
                Dataset<Row> dataset = read(session, inputStruct);
                if (sinkType) {
                    LOG.info("\n  Start to load data using spark apis  \n");
                    // The explicit java.util.Iterator parameter type binds the lambda
                    // to the ForeachPartitionFunction overload.
                    dataset.foreachPartition((Iterator<Row> rows) -> {
                        LoadContext context = initPartition(this.loadOptions, inputStruct);
                        try {
                            rows.forEachRemaining(row -> loadRow(inputStruct, row, rows, context));
                        } finally {
                            // Release the context even when a row fails to load.
                            context.close();
                        }
                    });
                } else {
                    LOG.info("\n Start to load data using spark bulkload \n");
                    new HBaseDirectLoader(this.loadOptions, inputStruct, metrics).bulkload(dataset);
                }
                collectLoadMetrics(metrics, totalInsertSuccess);
                LOG.info("\n Finished  load {}  data ", sourcePath(inputStruct));
            }
            LOG.info("\n ------------The data load task is complete-------------------\n\n insertSuccessCnt:\t {} \n ---------------------------------------------\n", totalInsertSuccess.value());
        } finally {
            // Stopping the session also stops its underlying SparkContext.
            session.stop();
        }
    }

    /**
     * Builds the Spark configuration: Kryo serialization with mandatory class
     * registration. If one of the classes looked up by name is missing, the
     * failure is logged (with its cause) and none of the classes are registered.
     */
    private static SparkConf buildSparkConf() {
        SparkConf sparkConf = new SparkConf()
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.kryo.registrationRequired", "true");
        try {
            sparkConf.registerKryoClasses(new Class[]{
                    ImmutableBytesWritable.class,
                    KeyValue.class,
                    StructType.class,
                    StructField[].class,
                    StructField.class,
                    LongType$.class,
                    Metadata.class,
                    StringType$.class,
                    Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
                    Class.forName("scala.reflect.ClassTag$$anon$1"),
                    Class.forName("scala.collection.immutable.Set$EmptySet$"),
                    Class.forName("org.apache.spark.sql.types.DoubleType$")
            });
        } catch (ClassNotFoundException e) {
            LOG.error("spark kryo serialized registration failed", e);
        }
        return sparkConf;
    }

    /** Returns the path of the struct's input viewed as a file source; used for log messages only. */
    private static String sourcePath(InputStruct inputStruct) {
        return inputStruct.input().asFileSource().path();
    }

    /** Adds the edge and vertex insert-success counts of one input struct to the overall total. */
    private void collectLoadMetrics(LoadDistributeMetrics loadDistributeMetrics, LongAccumulator longAccumulator) {
        longAccumulator.add(loadDistributeMetrics.readEdgeInsertSuccess());
        longAccumulator.add(loadDistributeMetrics.readVertexInsertSuccess());
    }

    /**
     * Prepares the state needed to load one partition: creates a load context,
     * registers one builder (with an empty pending batch) per vertex and edge
     * mapping of the struct, and updates the context's schema cache.
     *
     * @return the context the caller must close once the partition is done
     */
    private LoadContext initPartition(LoadOptions loadOptions, InputStruct inputStruct) {
        LoadContext context = new LoadContext(loadOptions);
        for (VertexMapping vertexMapping : inputStruct.vertices()) {
            this.builders.put(new VertexBuilder(context, inputStruct, vertexMapping), new ArrayList<>());
        }
        for (EdgeMapping edgeMapping : inputStruct.edges()) {
            this.builders.put(new EdgeBuilder(context, inputStruct, edgeMapping), new ArrayList<>());
        }
        context.updateSchemaCache();
        return context;
    }

    /**
     * Feeds one row to every non-skipped builder and flushes a builder's batch
     * when it reaches the mapping's batch size, or when this was the last row of
     * the partition and the batch is not empty.
     *
     * @param it the partition iterator the row came from; only {@code hasNext()} is consulted
     */
    private void loadRow(InputStruct inputStruct, Row row, Iterator<Row> it, LoadContext loadContext) {
        for (Map.Entry<ElementBuilder, List<GraphElement>> entry : this.builders.entrySet()) {
            ElementMapping mapping = entry.getKey().mapping();
            if (mapping.skip()) {
                continue;
            }
            parse(row, entry, inputStruct);
            List<GraphElement> pending = entry.getValue();
            boolean batchFull = pending.size() >= mapping.batchSize();
            boolean lastRowWithData = !it.hasNext() && !pending.isEmpty();
            if (batchFull || lastRowWithData) {
                flush(entry, loadContext.client().graph(), this.loadOptions.checkVertex);
            }
        }
    }

    /**
     * Reads the input of the given struct as a Spark dataset.
     *
     * @throws AssertionError        if the input source type is not FILE, HDFS or JDBC
     * @throws IllegalStateException if a file source has an unsupported format
     */
    private Dataset<Row> read(SparkSession sparkSession, InputStruct inputStruct) {
        InputSource input = inputStruct.input();
        DataFrameReader reader = sparkSession.read();
        // Each case returns directly so that a file source can never fall
        // through into the JDBC branch.
        switch (input.type()) {
            case FILE:
            case HDFS:
                return readFile(reader, input.asFileSource());
            case JDBC:
                return readJdbc(reader, (JDBCSource) input);
            default:
                throw new AssertionError(String.format("Unsupported input source '%s'", input.type()));
        }
    }

    /** Reads a file or HDFS source using the reader method that matches its format. */
    private static Dataset<Row> readFile(DataFrameReader reader, FileSource source) {
        String path = source.path();
        FileFormat format = source.format();
        switch (format) {
            case TEXT:
                return reader.text(path);
            case JSON:
                return reader.json(path);
            case CSV:
                return reader.csv(path);
            default:
                throw new IllegalStateException("Unexpected format value: " + format);
        }
    }

    /** Reads a JDBC table; the connection URL is the source URL joined with the database name. */
    private static Dataset<Row> readJdbc(DataFrameReader reader, JDBCSource source) {
        String url = source.url() + "/" + source.database();
        Properties properties = new Properties();
        properties.setProperty("user", source.username());
        properties.setProperty("password", source.password());
        return reader.jdbc(url, source.table(), properties);
    }

    /**
     * Converts one row into graph elements with the entry's builder and appends
     * them to the entry's pending batch. Rows whose string form is empty are
     * ignored.
     *
     * @throws AssertionError if the input source type is not FILE, HDFS or JDBC
     */
    // The builder is held as a raw type in the map key, so its build result is unchecked.
    @SuppressWarnings("unchecked")
    private void parse(Row row, Map.Entry<ElementBuilder, List<GraphElement>> entry, InputStruct inputStruct) {
        if (Constants.EMPTY_STR.equals(row.mkString())) {
            return;
        }
        ElementBuilder builder = entry.getKey();
        List<GraphElement> pending = entry.getValue();
        List<? extends GraphElement> elements;
        switch (inputStruct.input().type()) {
            case FILE:
            case HDFS:
                FileSource fileSource = inputStruct.input().asFileSource();
                String delimiter = fileSource.delimiter();
                // String.split takes a regex: quote the delimiter so characters such as
                // '|' or '.' are matched literally, and use limit -1 so trailing empty
                // columns are kept and values stay aligned with the header.
                String[] columns = row.mkString(delimiter)
                                      .split(java.util.regex.Pattern.quote(delimiter), -1);
                elements = builder.build(fileSource.header(), columns);
                break;
            case JDBC:
                String[] names = row.schema().fieldNames();
                Object[] values = new Object[names.length];
                for (int i = 0; i < names.length; i++) {
                    values[i] = row.get(i);
                }
                elements = builder.build(names, values);
                break;
            default:
                throw new AssertionError(String.format("Unsupported input source '%s'", inputStruct.input().type()));
        }
        pending.addAll(elements);
    }

    /**
     * Sends the entry's pending batch to the graph and then clears it. Without
     * update strategies the elements are added; otherwise a batch update request
     * with {@code createIfNotExist(true)} is issued.
     *
     * @param z passed as {@code checkVertex} on edge update requests
     */
    // NOTE(review): the batch is passed as List<GraphElement>; confirm whether the
    // client APIs need an explicit cast to their vertex/edge list types.
    private void flush(Map.Entry<ElementBuilder, List<GraphElement>> entry, GraphManager graphManager, boolean z) {
        ElementBuilder builder = entry.getKey();
        ElementMapping mapping = builder.mapping();
        List<GraphElement> pending = entry.getValue();
        boolean isVertex = mapping.type().isVertex();
        Map<String, UpdateStrategy> updateStrategies = mapping.updateStrategies();
        if (updateStrategies.isEmpty()) {
            if (isVertex) {
                graphManager.addVertices(pending);
            } else {
                graphManager.addEdges(pending);
            }
        } else if (isVertex) {
            BatchVertexRequest.Builder request = new BatchVertexRequest.Builder();
            request.vertices(pending).updatingStrategies(updateStrategies).createIfNotExist(true);
            graphManager.updateVertices(request.build());
        } else {
            BatchEdgeRequest.Builder request = new BatchEdgeRequest.Builder();
            request.edges(pending).updatingStrategies(updateStrategies).checkVertex(z).createIfNotExist(true);
            graphManager.updateEdges(request.build());
        }
        pending.clear();
    }
}
