package io.kgraph.utils;

import ch.qos.logback.core.spi.AbstractComponentTracker;
import io.kgraph.Edge;
import io.kgraph.GraphSerialized;
import io.kgraph.KGraph;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kafka-graphs-core-1.1.1.jar:io/kgraph/utils/GraphUtils.class */
public class GraphUtils {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) GraphUtils.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/kafka-graphs-core-1.1.1.jar:io/kgraph/utils/GraphUtils$SendMessages.class */
    public static final class SendMessages<K, V> implements Processor<K, V> {
        private final String topic;
        private final Producer<K, V> producer;
        private final Map<TopicPartition, Long> lastWrittenOffsets;

        public SendMessages(String str, Producer<K, V> producer, Map<TopicPartition, Long> map) {
            this.topic = str;
            this.producer = producer;
            this.lastWrittenOffsets = map;
        }

        @Override // org.apache.kafka.streams.processor.Processor
        public void init(ProcessorContext processorContext) {
        }

        @Override // org.apache.kafka.streams.processor.Processor
        public void process(K k, V v) {
            try {
                this.producer.send(new ProducerRecord<>(this.topic, k, v), (recordMetadata, exc) -> {
                    if (exc == null) {
                        try {
                            this.lastWrittenOffsets.merge(new TopicPartition(recordMetadata.topic(), recordMetadata.partition()), Long.valueOf(recordMetadata.offset()), (v0, v1) -> {
                                return Math.max(v0, v1);
                            });
                        } catch (Exception e) {
                            throw GraphUtils.toRuntimeException(e);
                        }
                    }
                });
                this.producer.flush();
            } catch (Exception e) {
                throw GraphUtils.toRuntimeException(e);
            }
        }

        @Override // org.apache.kafka.streams.processor.Processor
        public void close() {
        }
    }

    public static <T extends Number> void verticesToTopic(InputStream inputStream, Function<String, T> function, Serializer<T> serializer, Properties properties, String str, int i, short s) throws IOException {
        ClientUtils.createTopic(str, i, s, properties);
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
        Throwable th = null;
        try {
            KafkaProducer kafkaProducer = new KafkaProducer(properties, (Serializer) new LongSerializer(), (Serializer) serializer);
            Throwable th2 = null;
            while (true) {
                try {
                    try {
                        String readLine = bufferedReader.readLine();
                        if (readLine == null) {
                            break;
                        }
                        String[] split = readLine.trim().split("\\s");
                        long parseLong = Long.parseLong(split[0]);
                        log.trace("read vertex: {}", Long.valueOf(parseLong));
                        kafkaProducer.send(new ProducerRecord(str, Long.valueOf(parseLong), split.length > 1 ? function.apply(split[1]) : null));
                    } catch (Throwable th3) {
                        th2 = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (kafkaProducer != null) {
                        if (th2 != null) {
                            try {
                                kafkaProducer.close();
                            } catch (Throwable th5) {
                                th2.addSuppressed(th5);
                            }
                        } else {
                            kafkaProducer.close();
                        }
                    }
                    throw th4;
                }
            }
            kafkaProducer.flush();
            if (kafkaProducer != null) {
                if (0 != 0) {
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th6) {
                        th2.addSuppressed(th6);
                    }
                } else {
                    kafkaProducer.close();
                }
            }
            if (bufferedReader != null) {
                if (0 == 0) {
                    bufferedReader.close();
                    return;
                }
                try {
                    bufferedReader.close();
                } catch (Throwable th7) {
                    th.addSuppressed(th7);
                }
            }
        } catch (Throwable th8) {
            if (bufferedReader != null) {
                if (0 != 0) {
                    try {
                        bufferedReader.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    bufferedReader.close();
                }
            }
            throw th8;
        }
    }

    public static <T extends Number> void edgesToTopic(InputStream inputStream, Function<String, T> function, Serializer<T> serializer, Properties properties, String str, int i, short s) throws IOException {
        ClientUtils.createTopic(str, i, s, properties);
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
        Throwable th = null;
        try {
            KafkaProducer kafkaProducer = new KafkaProducer(properties, (Serializer) new KryoSerializer(), (Serializer) serializer);
            Throwable th2 = null;
            while (true) {
                try {
                    String readLine = bufferedReader.readLine();
                    if (readLine == null) {
                        break;
                    }
                    String[] split = readLine.trim().split("\\s");
                    long parseLong = Long.parseLong(split[0]);
                    long parseLong2 = Long.parseLong(split[1]);
                    log.trace("read edge: ({}, {})", Long.valueOf(parseLong), Long.valueOf(parseLong2));
                    kafkaProducer.send(new ProducerRecord(str, new Edge(Long.valueOf(parseLong), Long.valueOf(parseLong2)), split.length > 2 ? function.apply(split[2]) : null));
                } catch (Throwable th3) {
                    if (kafkaProducer != null) {
                        if (0 != 0) {
                            try {
                                kafkaProducer.close();
                            } catch (Throwable th4) {
                                th2.addSuppressed(th4);
                            }
                        } else {
                            kafkaProducer.close();
                        }
                    }
                    throw th3;
                }
            }
            kafkaProducer.flush();
            if (kafkaProducer != null) {
                if (0 != 0) {
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th5) {
                        th2.addSuppressed(th5);
                    }
                } else {
                    kafkaProducer.close();
                }
            }
            if (bufferedReader != null) {
                if (0 == 0) {
                    bufferedReader.close();
                    return;
                }
                try {
                    bufferedReader.close();
                } catch (Throwable th6) {
                    th.addSuppressed(th6);
                }
            }
        } catch (Throwable th7) {
            if (bufferedReader != null) {
                if (0 != 0) {
                    try {
                        bufferedReader.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    bufferedReader.close();
                }
            }
            throw th7;
        }
    }

    public static <T extends Number> void verticesToFile(KTable<Long, T> kTable, String str) {
        kTable.toStream().print(Printed.toFile(str).withKeyValueMapper((l, number) -> {
            return String.format("%d %f", l, number);
        }));
    }

    public static <K, VV, EV> CompletableFuture<Map<TopicPartition, Long>> groupEdgesBySourceAndRepartition(StreamsBuilder streamsBuilder, Properties properties, String str, String str2, GraphSerialized<K, VV, EV> graphSerialized, String str3, String str4, int i, short s) {
        return groupEdgesBySourceAndRepartition(streamsBuilder, properties, new KGraph(streamsBuilder.table(str, Consumed.with(graphSerialized.keySerde(), graphSerialized.vertexValueSerde())), streamsBuilder.table(str2, Consumed.with(new KryoSerde(), graphSerialized.edgeValueSerde())), graphSerialized), str3, str4, i, s);
    }

    public static <K, VV, EV> CompletableFuture<Map<TopicPartition, Long>> groupEdgesBySourceAndRepartition(StreamsBuilder streamsBuilder, Properties properties, KGraph<K, VV, EV> kGraph, String str, String str2, int i, short s) {
        log.info("Started loading graph");
        ClientUtils.createTopic(str, i, s, properties);
        ClientUtils.createTopic(str2, i, s, properties);
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        AtomicLong atomicLong = new AtomicLong(0L);
        Properties producerConfig = ClientUtils.producerConfig(properties.getProperty("bootstrap.servers"), kGraph.keySerde().serializer().getClass(), kGraph.vertexValueSerde().serializer().getClass(), properties);
        producerConfig.setProperty("client.id", "pregel-vertex-producer");
        KafkaProducer kafkaProducer = new KafkaProducer(producerConfig);
        Properties producerConfig2 = ClientUtils.producerConfig(properties.getProperty("bootstrap.servers"), kGraph.keySerde().serializer().getClass(), KryoSerializer.class, properties);
        producerConfig2.setProperty("client.id", "pregel-edge-producer");
        KafkaProducer kafkaProducer2 = new KafkaProducer(producerConfig2);
        kGraph.vertices().toStream().peek((obj, obj2) -> {
            atomicInteger.incrementAndGet();
            atomicLong.set(System.currentTimeMillis());
        }).process(() -> {
            return new SendMessages(str, kafkaProducer, concurrentHashMap);
        }, new String[0]);
        kGraph.edgesGroupedBySource().toStream().peek((obj3, iterable) -> {
            atomicInteger2.incrementAndGet();
            atomicLong.set(System.currentTimeMillis());
        }).mapValues(iterable2 -> {
            return (Map) StreamSupport.stream(iterable2.spliterator(), false).collect(Collectors.toMap((v0) -> {
                return v0.target();
            }, (v0) -> {
                return v0.value();
            }));
        }).process(() -> {
            return new SendMessages(str2, kafkaProducer2, concurrentHashMap);
        }, new String[0]);
        Topology build = streamsBuilder.build();
        log.debug("Graph description {}", build.describe());
        KafkaStreams kafkaStreams = new KafkaStreams(build, properties);
        kafkaStreams.start();
        CompletableFuture completableFuture = new CompletableFuture();
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> scheduleWithFixedDelay = newSingleThreadScheduledExecutor.scheduleWithFixedDelay(() -> {
            long j = atomicLong.get();
            if (j <= 0 || System.currentTimeMillis() - j <= AbstractComponentTracker.LINGERING_TIMEOUT) {
                return;
            }
            try {
                kafkaProducer.close();
                kafkaProducer2.close();
                kafkaStreams.close();
                completableFuture.complete(concurrentHashMap);
                log.info("Finished loading graph: {} vertices, {} edges", Integer.valueOf(atomicInteger.get()), Integer.valueOf(atomicInteger2.get()));
            } catch (Throwable th) {
                completableFuture.complete(concurrentHashMap);
                throw th;
            }
        }, 0L, 1L, TimeUnit.SECONDS);
        return completableFuture.whenCompleteAsync((map, th) -> {
            scheduleWithFixedDelay.cancel(true);
            newSingleThreadScheduledExecutor.shutdown();
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static RuntimeException toRuntimeException(Exception exc) {
        return exc instanceof RuntimeException ? (RuntimeException) exc : new RuntimeException(exc);
    }
}
