package org.apache.hugegraph.loader.flink;

import io.debezium.data.Envelope;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hugegraph.driver.GraphManager;
import org.apache.hugegraph.loader.builder.EdgeBuilder;
import org.apache.hugegraph.loader.builder.ElementBuilder;
import org.apache.hugegraph.loader.builder.VertexBuilder;
import org.apache.hugegraph.loader.constant.Constants;
import org.apache.hugegraph.loader.exception.LoadException;
import org.apache.hugegraph.loader.executor.LoadContext;
import org.apache.hugegraph.loader.executor.LoadOptions;
import org.apache.hugegraph.loader.mapping.EdgeMapping;
import org.apache.hugegraph.loader.mapping.ElementMapping;
import org.apache.hugegraph.loader.mapping.InputStruct;
import org.apache.hugegraph.loader.mapping.VertexMapping;
import org.apache.hugegraph.structure.GraphElement;
import org.apache.hugegraph.structure.graph.BatchEdgeRequest;
import org.apache.hugegraph.structure.graph.BatchVertexRequest;
import org.apache.hugegraph.structure.graph.UpdateStrategy;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/loader/flink/HugeGraphOutputFormat.class */
public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {
    private static final Logger LOG = Log.logger(HugeGraphOutputFormat.class);
    private static final long serialVersionUID = -4514164348993670086L;
    private LoadContext loadContext;
    private transient ScheduledExecutorService scheduler;
    private transient ScheduledFuture<?> scheduledFuture;
    private volatile transient boolean closed = false;
    private final LoadOptions loadOptions;
    private final InputStruct struct;
    private Map<ElementBuilder, List<String>> builders;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hugegraph.loader.flink.HugeGraphOutputFormat$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hugegraph/loader/flink/HugeGraphOutputFormat$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$debezium$data$Envelope$Operation = new int[Envelope.Operation.values().length];

        static {
            try {
                $SwitchMap$io$debezium$data$Envelope$Operation[Envelope.Operation.READ.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$debezium$data$Envelope$Operation[Envelope.Operation.CREATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$debezium$data$Envelope$Operation[Envelope.Operation.UPDATE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$debezium$data$Envelope$Operation[Envelope.Operation.DELETE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public HugeGraphOutputFormat(InputStruct inputStruct, String[] strArr) {
        this.struct = inputStruct;
        this.loadOptions = LoadOptions.parseOptions(strArr);
    }

    private Map<ElementBuilder, List<String>> initBuilders() {
        LoadContext loadContext = new LoadContext(this.loadOptions);
        HashMap hashMap = new HashMap();
        Iterator<VertexMapping> it = this.struct.vertices().iterator();
        while (it.hasNext()) {
            hashMap.put(new VertexBuilder(loadContext, this.struct, it.next()), new ArrayList());
        }
        Iterator<EdgeMapping> it2 = this.struct.edges().iterator();
        while (it2.hasNext()) {
            hashMap.put(new EdgeBuilder(loadContext, this.struct, it2.next()), new ArrayList());
        }
        loadContext.updateSchemaCache();
        return hashMap;
    }

    public void configure(Configuration configuration) {
    }

    public void open(int i, int i2) {
        this.builders = initBuilders();
        this.loadContext = new LoadContext(this.loadOptions);
        int i3 = this.loadOptions.flushIntervalMs;
        if (i3 > 0) {
            this.scheduler = new ScheduledThreadPoolExecutor(1, (ThreadFactory) new ExecutorThreadFactory("hugegraph-streamload-outputformat"));
            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(this::flushAll, i3, i3, TimeUnit.MILLISECONDS);
        }
    }

    private synchronized void flushAll() {
        if (this.closed) {
            return;
        }
        try {
            for (Map.Entry<ElementBuilder, List<String>> entry : this.builders.entrySet()) {
                List<String> value = entry.getValue();
                if (value.size() > 0) {
                    flush(entry.getKey(), value);
                }
            }
        } catch (Exception e) {
            throw new LoadException("Failed to flush all data.", e);
        }
    }

    public synchronized void writeRecord(T t) {
        for (Map.Entry<ElementBuilder, List<String>> entry : this.builders.entrySet()) {
            ElementMapping mapping = entry.getKey().mapping();
            if (!mapping.skip()) {
                entry.getValue().add(t.toString());
                if (r0.size() >= mapping.batchSize()) {
                    flush(entry.getKey(), entry.getValue());
                }
            }
        }
    }

    private Tuple2<String, List<GraphElement>> buildGraphData(ElementBuilder elementBuilder, String str) {
        try {
            JsonNode readTree = new ObjectMapper().readTree(str);
            JsonNode jsonNode = readTree.get(Constants.CDC_DATA);
            String asText = readTree.get(Constants.CDC_OP).asText();
            String[] header = this.struct.input().header();
            String[] strArr = new String[jsonNode.size()];
            for (int i = 0; i < header.length; i++) {
                strArr[i] = jsonNode.get(header[i]).asText();
            }
            return Tuple2.of(asText, elementBuilder.build(header, strArr));
        } catch (JsonProcessingException e) {
            throw new ParseException(str, e);
        }
    }

    private void flush(ElementBuilder<GraphElement> elementBuilder, List<String> list) {
        GraphManager graph = this.loadContext.client().graph();
        ElementMapping mapping = elementBuilder.mapping();
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            Tuple2<String, List<GraphElement>> buildGraphData = buildGraphData(elementBuilder, it.next());
            List list2 = (List) buildGraphData.f1;
            boolean isVertex = elementBuilder.mapping().type().isVertex();
            switch (AnonymousClass1.$SwitchMap$io$debezium$data$Envelope$Operation[Envelope.Operation.forCode((String) buildGraphData.f0).ordinal()]) {
                case 1:
                case 2:
                    if (!isVertex) {
                        graph.addEdges(list2);
                        break;
                    } else {
                        graph.addVertices(list2);
                        break;
                    }
                case 3:
                    Map<String, UpdateStrategy> updateStrategies = mapping.updateStrategies();
                    if (!isVertex) {
                        BatchEdgeRequest.Builder builder = new BatchEdgeRequest.Builder();
                        builder.edges(list2).updatingStrategies(updateStrategies).checkVertex(this.loadOptions.checkVertex).createIfNotExist(true);
                        graph.updateEdges(builder.build());
                        break;
                    } else {
                        BatchVertexRequest.Builder builder2 = new BatchVertexRequest.Builder();
                        builder2.vertices(list2).updatingStrategies(updateStrategies).createIfNotExist(true);
                        graph.updateVertices(builder2.build());
                        break;
                    }
                case 4:
                    String obj = ((GraphElement) list2.get(0)).id().toString();
                    if (!isVertex) {
                        graph.removeEdge(obj);
                        break;
                    } else {
                        graph.removeVertex(obj);
                        break;
                    }
                default:
                    throw new IllegalArgumentException("The type of `op` should be 'c' 'r' 'u' 'd' only");
            }
        }
        list.clear();
    }

    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            this.scheduler.shutdown();
        }
    }
}
