package io.kgraph.pregel;

import io.kgraph.EdgeWithValue;
import io.kgraph.GraphAlgorithmState;
import io.kgraph.GraphSerialized;
import io.kgraph.VertexWithValue;
import io.kgraph.pregel.ComputeFunction;
import io.kgraph.pregel.PregelState;
import io.kgraph.pregel.aggregators.Aggregator;
import io.kgraph.utils.ClientUtils;
import io.kgraph.utils.KryoSerde;
import io.kgraph.utils.KryoSerializer;
import io.kgraph.utils.KryoUtils;
import io.vavr.Tuple2;
import io.vavr.Tuple3;
import io.vavr.Tuple4;
import java.io.Closeable;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.nodes.GroupMember;
import org.apache.curator.framework.recipes.shared.SharedValue;
import org.apache.curator.utils.ZKPaths;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kafka-graphs-core-1.0.0.jar:io/kgraph/pregel/PregelComputation.class */
public class PregelComputation<K, VV, EV, Message> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PregelComputation.class);
    private final String hostAndPort;
    private final String applicationId;
    private final String bootstrapServers;
    private final CuratorFramework curator;
    private final String verticesTopic;
    private KTable<K, VV> vertices;
    private final String edgesGroupedBySourceTopic;
    private KTable<K, Map<K, EV>> edgesGroupedBySource;
    private final String solutionSetTopic;
    private final String solutionSetStore;
    private KTable<K, VV> solutionSet;
    private final String workSetTopic;
    private KStream<K, Tuple3<Integer, K, List<Message>>> workSet;
    private final int numPartitions;
    private final GraphSerialized<K, VV, EV> serialized;
    private final Map<String, ?> configs;
    private final Optional<Message> initialMessage;
    private final ComputeFunction<K, VV, EV, Message> computeFunction;
    private Properties streamsConfig;
    private volatile CompletableFuture<KTable<K, VV>> futureResult;
    private final String edgesStoreName;
    private final String verticesStoreName;
    private final String localworkSetStoreName;
    private final String localSolutionSetStoreName;
    private volatile int maxIterations = Integer.MAX_VALUE;
    private final Map<Integer, Map<Integer, Set<K>>> activeVertices = new ConcurrentHashMap();
    private final Map<Integer, Map<Integer, Boolean>> didPreSuperstep = new ConcurrentHashMap();
    private final Map<Integer, Map<Integer, Map<String, Aggregator<?>>>> aggregators = new ConcurrentHashMap();
    private final Map<Integer, Map<String, ?>> previousAggregates = new ConcurrentHashMap();
    private final Map<String, AggregatorWrapper<?>> registeredAggregators = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:BOOT-INF/lib/kafka-graphs-core-1.0.0.jar:io/kgraph/pregel/PregelComputation$AggregatorWrapper.class */
    /**
     * Immutable pairing of an {@link Aggregator} implementation class with a persistence flag.
     *
     * @param <T> type parameter of the wrapped aggregator
     */
    public static class AggregatorWrapper<T> {
        private final Class<? extends Aggregator<T>> aggregatorClass;
        private final boolean persistent;

        /**
         * @param aggregatorClass the aggregator implementation class to remember
         * @param persistent      the value later reported by {@link #isPersistent()}
         */
        public AggregatorWrapper(Class<? extends Aggregator<T>> aggregatorClass, boolean persistent) {
            this.persistent = persistent;
            this.aggregatorClass = aggregatorClass;
        }

        /** @return the aggregator class given at construction */
        public Class<? extends Aggregator<T>> getAggregatorClass() {
            return aggregatorClass;
        }

        /** @return the persistence flag given at construction */
        public boolean isPersistent() {
            return persistent;
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/kafka-graphs-core-1.0.0.jar:io/kgraph/pregel/PregelComputation$BarrierSync.class */
    private final class BarrierSync implements Transformer<K, Tuple3<Integer, K, List<Message>>, KeyValue<K, Tuple2<Integer, Map<K, List<Message>>>>> {
        private ProcessorContext context;
        private KeyValueStore<Integer, Map<K, Map<K, List<Message>>>> localworkSetStore;
        private Consumer<byte[], byte[]> internalConsumer;
        private LeaderLatch leaderLatch;
        private GroupMember group;
        private SharedValue sharedValue;
        private TreeCache aggregateCache;
        private TreeCache barrierCache;
        private PregelState pregelState;
        private final Map<Integer, Set<K>> forwardedVertices;

        /**
         * Starts with no forwarded vertices recorded and a local state of CREATED, superstep -1, stage SEND.
         */
        private BarrierSync() {
            this.forwardedVertices = new HashMap<>();
            this.pregelState = new PregelState(GraphAlgorithmState.State.CREATED, -1, PregelState.Stage.SEND);
        }

        /**
         * Wires this transformer into its task and starts the ZooKeeper-backed coordination primitives.
         *
         * <p>Stores the processor context, looks up the local work-set store, obtains the internal consumer,
         * registers this worker in the application's group, starts the leader latch and the shared
         * {@link PregelState} value, and schedules a wall-clock punctuator that fires every 500 ms.
         *
         * <p>Each punctuation re-reads the shared state and then:
         * <ul>
         *   <li>does nothing while the state is CREATED;</li>
         *   <li>on COMPLETED or CANCELLED, commits and completes {@code futureResult} if it is set and not yet done;</li>
         *   <li>if this worker holds leadership, tries to advance the barrier for the current stage, runs
         *       {@code masterCompute} after a SEND-to-next transition, and marks the run COMPLETED once the
         *       superstep exceeds {@code maxIterations};</li>
         *   <li>as a worker, either signs in to the RECEIVE barrier or, in a ready SEND stage, forwards pending
         *       vertices and drops bookkeeping for the previous superstep.</li>
         * </ul>
         *
         * @param processorContext the context handed in by Kafka Streams
         */
        @Override // org.apache.kafka.streams.kstream.Transformer
        public void init(ProcessorContext processorContext) {
            try {
                this.context = processorContext;
                this.localworkSetStore = (KeyValueStore) processorContext.getStateStore(PregelComputation.this.localworkSetStoreName);
                this.internalConsumer = PregelComputation.internalConsumer(processorContext);
                // Worker id: "<hostAndPort>#<threadId>", or "local:#<threadId>" when no host/port was configured.
                String valueOf = String.valueOf(Thread.currentThread().getId());
                String str = PregelComputation.this.hostAndPort != null ? PregelComputation.this.hostAndPort + "#" + valueOf : "local:#" + valueOf;
                PregelComputation.log.debug("Registering worker {} for application {}", str, PregelComputation.this.applicationId);
                // All coordination nodes live under ZKUtils.PREGEL_PATH + applicationId.
                this.group = new GroupMember(PregelComputation.this.curator, ZKPaths.makePath(ZKUtils.PREGEL_PATH + PregelComputation.this.applicationId, ZKUtils.GROUP), str);
                this.group.start();
                this.leaderLatch = new LeaderLatch(PregelComputation.this.curator, ZKPaths.makePath(ZKUtils.PREGEL_PATH + PregelComputation.this.applicationId, ZKUtils.LEADER));
                this.leaderLatch.start();
                // The shared value is seeded with this instance's initial state bytes.
                this.sharedValue = new SharedValue(PregelComputation.this.curator, ZKPaths.makePath(ZKUtils.PREGEL_PATH + PregelComputation.this.applicationId, ZKUtils.SUPERSTEP), this.pregelState.toBytes());
                this.sharedValue.start();
                // The punctuation timestamp (j) is not used; every tick works from the shared state alone.
                this.context.schedule(Duration.ofMillis(500L), PunctuationType.WALL_CLOCK_TIME, j -> {
                    try {
                        // Refresh the local view of the computation state from the shared value.
                        this.pregelState = PregelState.fromBytes(this.sharedValue.getValue());
                        GraphAlgorithmState.State state = this.pregelState.state();
                        if (state == GraphAlgorithmState.State.CREATED) {
                            return;
                        }
                        if (state == GraphAlgorithmState.State.COMPLETED || state == GraphAlgorithmState.State.CANCELLED) {
                            // Terminal state: complete the future at most once, and only if one has been set.
                            if (PregelComputation.this.futureResult == null || PregelComputation.this.futureResult.isDone()) {
                                return;
                            }
                            if (this.pregelState.superstep() > PregelComputation.this.maxIterations || state == GraphAlgorithmState.State.CANCELLED) {
                                PregelComputation.log.info("Pregel computation halted after {} iterations", Integer.valueOf(this.pregelState.superstep()));
                            } else {
                                PregelComputation.log.info("Pregel computation converged after {} iterations", Integer.valueOf(this.pregelState.superstep()));
                            }
                            this.context.commit();
                            PregelComputation.this.futureResult.complete(PregelComputation.this.result());
                            return;
                        }
                        // Leader-only section: drive the barrier forward on behalf of all workers.
                        if (this.leaderLatch.hasLeadership()) {
                            // The two caches are created lazily, the first time this instance acts as leader.
                            if (this.aggregateCache == null) {
                                this.aggregateCache = new TreeCache(PregelComputation.this.curator, ZKPaths.makePath(ZKUtils.PREGEL_PATH + PregelComputation.this.applicationId, ZKUtils.AGGREGATES));
                                this.aggregateCache.start();
                            }
                            if (this.barrierCache == null) {
                                this.barrierCache = new TreeCache(PregelComputation.this.curator, ZKPaths.makePath(ZKUtils.PREGEL_PATH + PregelComputation.this.applicationId, ZKUtils.BARRIERS));
                                this.barrierCache.start();
                            }
                            if (this.pregelState.stage() == PregelState.Stage.RECEIVE) {
                                // An unchanged return value means the transition was not made; otherwise publish it.
                                PregelState maybeCreateReadyToSendNode = ZKUtils.maybeCreateReadyToSendNode(PregelComputation.this.curator, PregelComputation.this.applicationId, this.pregelState, this.barrierCache, this.group.getCurrentMembers().size());
                                if (this.pregelState.equals(maybeCreateReadyToSendNode)) {
                                    PregelComputation.log.debug("Not ready to create snd: state {}", this.pregelState);
                                } else {
                                    this.pregelState = maybeCreateReadyToSendNode;
                                    this.sharedValue.setValue(this.pregelState.toBytes());
                                }
                            }
                            // Not an else-branch: a state just moved to SEND above is examined again here in the same tick.
                            if (this.pregelState.stage() == PregelState.Stage.SEND) {
                                PregelState maybeCreateReadyToReceiveNode = ZKUtils.maybeCreateReadyToReceiveNode(PregelComputation.this.curator, PregelComputation.this.applicationId, this.pregelState, this.barrierCache);
                                if (this.pregelState.equals(maybeCreateReadyToReceiveNode)) {
                                    PregelComputation.log.debug("Not ready to create rcv: state {}", this.pregelState);
                                } else {
                                    this.pregelState = maybeCreateReadyToReceiveNode;
                                    this.sharedValue.setValue(this.pregelState.toBytes());
                                    // masterCompute returning true is a halt request; publish CANCELLED.
                                    if (masterCompute(this.pregelState.superstep())) {
                                        this.pregelState = this.pregelState.state(GraphAlgorithmState.State.CANCELLED);
                                        this.sharedValue.setValue(this.pregelState.toBytes());
                                    }
                                }
                            }
                            // Iteration cap exceeded: publish COMPLETED and skip the worker section for this tick.
                            if (this.pregelState.superstep() > PregelComputation.this.maxIterations) {
                                this.pregelState = this.pregelState.state(GraphAlgorithmState.State.COMPLETED);
                                this.sharedValue.setValue(this.pregelState.toBytes());
                                return;
                            }
                        }
                        // Worker section: runs on every instance, including the leader.
                        if (this.pregelState.stage() == PregelState.Stage.RECEIVE) {
                            if (this.pregelState.superstep() == 0 && !ZKUtils.hasChild(PregelComputation.this.curator, PregelComputation.this.applicationId, this.pregelState, str)) {
                                Set localPartitions = PregelComputation.localPartitions(this.internalConsumer, PregelComputation.this.workSetTopic);
                                Set localPartitions2 = PregelComputation.localPartitions(this.internalConsumer, PregelComputation.this.solutionSetTopic);
                                // Superstep 0: work-set and solution-set partitions stay paused until the vertices
                                // and edges topics report as synced; then this worker signs in and rewinds them.
                                if (PregelComputation.isTopicSynced(this.internalConsumer, PregelComputation.this.verticesTopic, 0) && PregelComputation.isTopicSynced(this.internalConsumer, PregelComputation.this.edgesGroupedBySourceTopic, 0)) {
                                    ZKUtils.addChild(PregelComputation.this.curator, PregelComputation.this.applicationId, this.pregelState, str, CreateMode.EPHEMERAL);
                                    this.internalConsumer.seekToBeginning(localPartitions);
                                    this.internalConsumer.resume(localPartitions);
                                    this.internalConsumer.seekToBeginning(localPartitions2);
                                    this.internalConsumer.resume(localPartitions2);
                                } else {
                                    this.internalConsumer.pause(localPartitions);
                                    this.internalConsumer.pause(localPartitions2);
                                }
                            }
                            // Sign in to the RECEIVE barrier once it is ready, this worker is not yet a child,
                            // and the work-set topic reports as synced for this superstep.
                            if (ZKUtils.isReady(PregelComputation.this.curator, PregelComputation.this.applicationId, this.pregelState) && !ZKUtils.hasChild(PregelComputation.this.curator, PregelComputation.this.applicationId, this.pregelState, str) && PregelComputation.isTopicSynced(this.internalConsumer, PregelComputation.this.workSetTopic, this.pregelState.superstep())) {
                                ZKUtils.addChild(PregelComputation.this.curator, PregelComputation.this.applicationId, this.pregelState, str, CreateMode.EPHEMERAL);
                            }
                        } else if (this.pregelState.stage() == PregelState.Stage.SEND && ZKUtils.isReady(PregelComputation.this.curator, PregelComputation.this.applicationId, this.pregelState)) {
                            // Forward vertices buffered for this superstep that have not been forwarded yet.
                            Map<K, Map<K, List<Message>>> map = this.localworkSetStore.get(Integer.valueOf(this.pregelState.superstep()));
                            if (hasVerticesToForward(map) && PregelComputation.isTopicSynced(this.internalConsumer, PregelComputation.this.workSetTopic, this.pregelState.superstep())) {
                                forwardVertices(map);
                            }
                            // Drop all bookkeeping kept for the previous superstep; this repeats on every ready SEND tick.
                            int superstep = this.pregelState.superstep() - 1;
                            PregelComputation.this.activeVertices.remove(Integer.valueOf(superstep));
                            this.forwardedVertices.remove(Integer.valueOf(superstep));
                            PregelComputation.this.didPreSuperstep.remove(Integer.valueOf(superstep));
                            PregelComputation.this.aggregators.remove(Integer.valueOf(superstep));
                            PregelComputation.this.previousAggregates.remove(Integer.valueOf(superstep));
                            this.localworkSetStore.delete(Integer.valueOf(superstep));
                        }
                    } catch (Exception e) {
                        // Punctuators cannot throw checked exceptions; rethrow via the shared converter.
                        throw PregelComputation.toRuntimeException(e);
                    }
                });
            } catch (Exception e) {
                throw PregelComputation.toRuntimeException(e);
            }
        }

        /**
         * Invokes the compute function's master hook for superstep {@code i}.
         *
         * <p>The hook receives the aggregates reduced for superstep {@code i - 1}; after it returns, those
         * aggregates are saved back under the same superstep.
         *
         * @param i the superstep passed to the master hook
         * @return the callback's {@code haltComputation} flag after the hook has run
         * @throws Exception if saving the aggregates fails
         */
        private boolean masterCompute(int i) throws Exception {
            final int previousStep = i - 1;
            Map<String, Aggregator<?>> reduced = reduceAggregates(previousStep);
            ComputeFunction.MasterCallback callback = new ComputeFunction.MasterCallback(reduced);
            PregelComputation.this.computeFunction.masterCompute(i, callback);
            saveAggregates(previousStep, reduced);
            return callback.haltComputation;
        }

        /**
         * Merges the aggregator maps cached for superstep {@code i} into one map.
         *
         * <p>Starts from freshly created aggregators initialised with the previous aggregates, then merges in
         * the deserialized payload of every cached child under the superstep's aggregate path. The child whose
         * path ends with {@code StreamsConfig.OPTIMIZE} is skipped: that is the name {@code saveAggregates}
         * writes the combined result under, so it is not a partial contribution.
         *
         * @param i the superstep whose aggregates are reduced
         * @return the merged aggregators; just the initialised ones when nothing is cached for the path
         */
        private Map<String, Aggregator<?>> reduceAggregates(int i) {
            String aggregatePath = ZKUtils.aggregatePath(PregelComputation.this.applicationId, i);
            Map<String, Aggregator<?>> merged = PregelComputation.this.initAggregators(PregelComputation.this.newAggregators(), PregelComputation.this.previousAggregates(i));
            Map<String, ChildData> children = this.aggregateCache.getCurrentChildren(aggregatePath);
            if (children == null) {
                return merged;
            }
            for (ChildData child : children.values()) {
                if (child.getPath().endsWith(StreamsConfig.OPTIMIZE)) {
                    continue;
                }
                byte[] data = child.getData();
                // A cached node may carry no payload at all; treat null like an empty array instead of failing.
                if (data != null && data.length > 0) {
                    merged = PregelComputation.this.mergeAggregators(merged, (Map) KryoUtils.deserialize(data));
                }
            }
            return merged;
        }

        /**
         * Writes the current value of every aggregator for superstep {@code i} as one persistent child node.
         *
         * <p>The node is named {@code StreamsConfig.OPTIMIZE} under the superstep's aggregate path and holds
         * the serialized map of aggregator name to {@code getAggregate()} value.
         *
         * @param i   the superstep the aggregates belong to
         * @param map aggregators keyed by name
         * @throws Exception if the node cannot be written
         */
        private void saveAggregates(int i, Map<String, Aggregator<?>> map) throws Exception {
            String aggregatePath = ZKUtils.aggregatePath(PregelComputation.this.applicationId, i);
            Map<String, Object> aggregateValues = map.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, named -> named.getValue().getAggregate()));
            ZKUtils.addChild(PregelComputation.this.curator, aggregatePath, StreamsConfig.OPTIMIZE, CreateMode.PERSISTENT, KryoUtils.serialize(aggregateValues));
        }

        /**
         * Tells whether {@code map} contains a vertex not yet recorded as forwarded for the current superstep.
         *
         * @param map pending entries for the current superstep, keyed by vertex; may be {@code null}
         * @return {@code true} if at least one key of {@code map} is missing from the forwarded set
         */
        private boolean hasVerticesToForward(Map<K, Map<K, List<Message>>> map) {
            if (map == null || map.isEmpty()) {
                return false;
            }
            Set<K> alreadyForwarded = this.forwardedVertices.get(Integer.valueOf(this.pregelState.superstep()));
            if (alreadyForwarded == null) {
                // Nothing forwarded yet for this superstep, so any pending vertex qualifies.
                return true;
            }
            for (K vertex : map.keySet()) {
                if (!alreadyForwarded.contains(vertex)) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Forwards every entry of {@code map} not yet forwarded in the current superstep, then commits.
         *
         * <p>Each newly seen vertex is recorded as forwarded and marked active before anything is sent
         * downstream; the forwarding itself happens in a second pass over the selected entries.
         *
         * @param map pending entries for the current superstep, keyed by vertex
         */
        private void forwardVertices(Map<K, Map<K, List<Message>>> map) {
            List<Map.Entry<K, Map<K, List<Message>>>> toForward = new ArrayList<>();
            for (Map.Entry<K, Map<K, List<Message>>> candidate : map.entrySet()) {
                Set<K> forwarded = this.forwardedVertices.computeIfAbsent(Integer.valueOf(this.pregelState.superstep()), step -> new HashSet<>());
                // add() returns true only for a vertex that was not in the set before.
                if (forwarded.add(candidate.getKey())) {
                    activateVertex(candidate);
                    toForward.add(candidate);
                }
            }
            for (Map.Entry<K, Map<K, List<Message>>> selected : toForward) {
                this.context.forward(selected.getKey(), new Tuple2<>(Integer.valueOf(this.pregelState.superstep()), selected.getValue()));
            }
            this.context.commit();
        }

        /**
         * Records the entry's vertex as active for the current superstep, under the partition its key maps to.
         *
         * @param entry the pending entry whose key is the vertex to activate
         */
        private void activateVertex(Map.Entry<K, Map<K, List<Message>>> entry) {
            K vertex = entry.getKey();
            int partition = PregelComputation.vertexToPartition(vertex, PregelComputation.this.serialized.keySerde().serializer(), PregelComputation.this.numPartitions);
            int superstep = this.pregelState.superstep();
            Map<Integer, Set<K>> activeByPartition = PregelComputation.this.activeVertices.computeIfAbsent(Integer.valueOf(superstep), step -> new ConcurrentHashMap<>());
            Set<K> activeInPartition = activeByPartition.computeIfAbsent(Integer.valueOf(partition), p -> ConcurrentHashMap.newKeySet());
            activeInPartition.add(vertex);
            PregelComputation.log.debug("vertex {} for partition {} for step {} is active", vertex, Integer.valueOf(partition), Integer.valueOf(superstep));
        }

        /**
         * Buffers one incoming work-set record in the local store; never emits anything itself.
         *
         * <p>The record is filed under its superstep ({@code tuple3._1}) and target vertex {@code k}. When it
         * carries a message list ({@code tuple3._3}), the list is stored under {@code tuple3._2}. The target
         * vertex is also removed from that superstep's forwarded set, if such a set exists.
         *
         * @param k      the record key (target vertex)
         * @param tuple3 superstep, second key and message list of the record
         * @return always {@code null}
         */
        public KeyValue<K, Tuple2<Integer, Map<K, List<Message>>>> transform(K k, Tuple3<Integer, K, List<Message>> tuple3) {
            Integer superstep = tuple3._1;
            Map<K, Map<K, List<Message>>> pending = this.localworkSetStore.get(superstep);
            if (pending == null) {
                pending = new HashMap<>();
            }
            Map<K, List<Message>> messagesForVertex = pending.computeIfAbsent(k, key -> new HashMap<>());
            if (tuple3._3 != null) {
                messagesForVertex.put(tuple3._2, tuple3._3);
            }
            this.localworkSetStore.put(superstep, pending);

            Set<K> forwarded = this.forwardedVertices.get(superstep);
            if (forwarded != null) {
                forwarded.remove(k);
            }
            return null;
        }

        /**
         * Releases the coordination primitives created by this transformer.
         *
         * <p>Closing is best-effort: a failure to close the shared value or the leader latch is logged and
         * does not stop the remaining resources from being closed.
         */
        @Override // org.apache.kafka.streams.kstream.Transformer
        public void close() {
            if (this.aggregateCache != null) {
                this.aggregateCache.close();
            }
            if (this.barrierCache != null) {
                this.barrierCache.close();
            }
            if (this.sharedValue != null) {
                try {
                    this.sharedValue.close();
                } catch (Exception e) {
                    // Keep going so the latch and group are still released, but leave a trace of the failure.
                    PregelComputation.log.warn("Failed to close shared state for application {}", PregelComputation.this.applicationId, e);
                }
            }
            if (this.leaderLatch != null) {
                try {
                    this.leaderLatch.close();
                } catch (Exception e) {
                    PregelComputation.log.warn("Failed to close leader latch for application {}", PregelComputation.this.applicationId, e);
                }
            }
            if (this.group != null) {
                this.group.close();
            }
        }

        /**
         * Erased-signature entry point that delegates to the typed {@code transform(K, Tuple3)} overload.
         *
         * <p>The key is cast to {@code K}. The decompiled form cast it to {@code BarrierSync}, which would
         * fail with {@code ClassCastException} for any real key and could not bind to the typed overload.
         * NOTE(review): this is a decompiler-rendered bridge; javac normally generates it, so consider
         * dropping it if this file is rebuilt from source.
         */
        @Override // org.apache.kafka.streams.kstream.Transformer
        @SuppressWarnings("unchecked") // generic casts cannot be checked at runtime
        public /* bridge */ /* synthetic */ Object transform(Object obj, Object obj2) {
            return transform((K) obj, (Tuple3<Integer, K, List<Message>>) obj2);
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/kafka-graphs-core-1.0.0.jar:io/kgraph/pregel/PregelComputation$SendMessages.class */
    private final class SendMessages implements Processor<K, Tuple2<Integer, Map<K, List<Message>>>> {
        private Producer<K, Tuple3<Integer, K, List<Message>>> producer;

        // Nothing to set up at construction time; the producer is created in init().
        private SendMessages() {
        }

        /**
         * Creates the producer used to publish messages to the work-set topic.
         *
         * <p>The producer config is built from the bootstrap servers, the key serde's serializer class and
         * {@code KryoSerializer}, layered over the streams config passed to {@code prepare} (or empty
         * properties if none has been set).
         *
         * @param processorContext the context handed in by Kafka Streams; not used here
         */
        @Override // org.apache.kafka.streams.processor.Processor
        public void init(ProcessorContext processorContext) {
            Properties baseConfig = PregelComputation.this.streamsConfig;
            if (baseConfig == null) {
                baseConfig = new Properties();
            }
            this.producer = new KafkaProducer<>(ClientUtils.producerConfig(
                    PregelComputation.this.bootstrapServers,
                    PregelComputation.this.serialized.keySerde().serializer().getClass(),
                    KryoSerializer.class,
                    baseConfig));
        }

        /**
         * Publishes the outgoing messages of vertex {@code k} to the work-set topic and then deactivates it.
         *
         * <p>One record is sent per target vertex, keyed by the target and carrying
         * {@code (superstep, k, messages)}. When a send is acknowledged, the target's partition is registered
         * as a child of the SEND barrier for that superstep. A failed send is logged instead of being
         * silently dropped. After flushing the producer, {@code k} is deactivated for superstep
         * {@code tuple2._1 - 1}.
         *
         * @param k      the vertex whose messages are being sent
         * @param tuple2 the superstep the messages are addressed to, and the messages keyed by target vertex
         */
        public void process(K k, Tuple2<Integer, Map<K, List<Message>>> tuple2) {
            try {
                for (Map.Entry<K, List<Message>> entry : tuple2._2.entrySet()) {
                    ProducerRecord<K, Tuple3<Integer, K, List<Message>>> record = new ProducerRecord<>(PregelComputation.this.workSetTopic, entry.getKey(), new Tuple3<>(tuple2._1, k, entry.getValue()));
                    this.producer.send(record, (recordMetadata, exc) -> {
                        if (exc != null) {
                            // Without this the failure would vanish: the barrier child is simply never added.
                            PregelComputation.log.error("Failed to send messages for vertex {} to topic {}", entry.getKey(), PregelComputation.this.workSetTopic, exc);
                            return;
                        }
                        try {
                            int vertexToPartition = PregelComputation.vertexToPartition(entry.getKey(), PregelComputation.this.serialized.keySerde().serializer(), PregelComputation.this.numPartitions);
                            PregelComputation.log.debug("adding partition {} for vertex {}", Integer.valueOf(vertexToPartition), entry.getKey());
                            ZKUtils.addChild(PregelComputation.this.curator, PregelComputation.this.applicationId, new PregelState(GraphAlgorithmState.State.RUNNING, tuple2._1.intValue(), PregelState.Stage.SEND), "partition-" + vertexToPartition);
                        } catch (Exception e) {
                            throw PregelComputation.toRuntimeException(e);
                        }
                    });
                }
                // Flush so every callback above has run before the vertex is deactivated.
                this.producer.flush();
                deactivateVertex(tuple2._1.intValue() - 1, k);
            } catch (Exception e) {
                throw PregelComputation.toRuntimeException(e);
            }
        }

        /**
         * Removes vertex {@code k} from the active set of its partition for superstep {@code i}.
         *
         * <p>When that leaves the partition with no active vertices, the partition's child node is removed
         * from the SEND barrier of superstep {@code i}, the compute function's {@code postSuperstep} hook is
         * invoked for the partition, and the partition's aggregators are written out.
         *
         * @param i the superstep the vertex was active in
         * @param k the vertex to deactivate
         * @throws Exception if a ZooKeeper update fails
         */
        private void deactivateVertex(int i, K k) throws Exception {
            int partition = PregelComputation.vertexToPartition(k, PregelComputation.this.serialized.keySerde().serializer(), PregelComputation.this.numPartitions);
            Map<Integer, Set<K>> activeByPartition = PregelComputation.this.activeVertices.get(Integer.valueOf(i));
            Set<K> stillActive = activeByPartition.get(Integer.valueOf(partition));
            stillActive.remove(k);
            PregelComputation.log.debug("vertex {} for partition {} for step {} is NOT active", k, Integer.valueOf(partition), Integer.valueOf(i));
            if (!stillActive.isEmpty()) {
                return;
            }
            PregelComputation.log.debug("removing partition {} for last vertex {}", Integer.valueOf(partition), k);
            PregelState sendBarrier = new PregelState(GraphAlgorithmState.State.RUNNING, i, PregelState.Stage.SEND);
            ZKUtils.removeChild(PregelComputation.this.curator, PregelComputation.this.applicationId, sendBarrier, "partition-" + partition);
            ComputeFunction.Aggregators hookAggregators = new ComputeFunction.Aggregators(PregelComputation.this.previousAggregates(i), PregelComputation.this.aggregators(partition, i));
            PregelComputation.this.computeFunction.postSuperstep(i, hookAggregators);
            writeAggregate(i, partition);
        }

        /**
         * Writes the aggregators of one partition for one superstep as a persistent child node.
         *
         * <p>Does nothing when no aggregators are registered for the superstep or for the partition.
         *
         * @param i  the superstep
         * @param i2 the partition
         * @throws Exception if the node cannot be written
         */
        private void writeAggregate(int i, int i2) throws Exception {
            Map<Integer, Map<String, Aggregator<?>>> byPartition = PregelComputation.this.aggregators.get(Integer.valueOf(i));
            if (byPartition == null) {
                return;
            }
            Map<String, Aggregator<?>> partitionAggregators = byPartition.get(Integer.valueOf(i2));
            if (partitionAggregators == null) {
                return;
            }
            ZKUtils.addChild(PregelComputation.this.curator, ZKUtils.aggregatePath(PregelComputation.this.applicationId, i), "partition-" + i2, CreateMode.PERSISTENT, KryoUtils.serialize(partitionAggregators));
        }

        /**
         * Closes the producer created in {@code init}.
         *
         * <p>Safe to call when {@code init} never completed: a missing producer is skipped, not dereferenced.
         */
        @Override // org.apache.kafka.streams.processor.Processor
        public void close() {
            if (this.producer != null) {
                this.producer.close();
            }
        }

        /**
         * Erased-signature entry point that delegates to the typed {@code process(K, Tuple2)} overload.
         *
         * <p>The key is cast to {@code K}. The decompiled form cast it to {@code SendMessages}, which would
         * fail with {@code ClassCastException} for any real key and could not bind to the typed overload.
         * NOTE(review): this is a decompiler-rendered bridge; javac normally generates it, so consider
         * dropping it if this file is rebuilt from source.
         */
        @Override // org.apache.kafka.streams.processor.Processor
        @SuppressWarnings("unchecked") // generic casts cannot be checked at runtime
        public /* bridge */ /* synthetic */ void process(Object obj, Object obj2) {
            process((K) obj, (Tuple2<Integer, Map<K, List<Message>>>) obj2);
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/kafka-graphs-core-1.0.0.jar:io/kgraph/pregel/PregelComputation$VertexComputeUdf.class */
    private final class VertexComputeUdf implements ValueTransformerWithKey<K, Tuple2<Integer, Map<K, List<Message>>>, Tuple3<Integer, Tuple4<Integer, VV, Integer, VV>, Map<K, List<Message>>>> {
        private KeyValueStore<K, Tuple4<Integer, VV, Integer, VV>> localSolutionSetStore;
        private ReadOnlyKeyValueStore<K, VV> verticesStore;
        private KeyValueStore<K, Map<K, EV>> edgesStore;

        // Nothing to set up at construction time; the state stores are looked up in init().
        private VertexComputeUdf() {
        }

        /**
         * Looks up the three state stores this transformer works with.
         *
         * <p>The local solution-set store is found by its configured name; the vertices and edges stores
         * are found by the queryable store names of the corresponding tables.
         *
         * @param processorContext the context handed in by Kafka Streams
         */
        @Override // org.apache.kafka.streams.kstream.ValueTransformerWithKey
        @SuppressWarnings("unchecked") // store types are fixed by how the stores were registered
        public void init(ProcessorContext processorContext) {
            this.localSolutionSetStore = (KeyValueStore<K, Tuple4<Integer, VV, Integer, VV>>) processorContext.getStateStore(PregelComputation.this.localSolutionSetStoreName);

            String verticesStoreId = PregelComputation.this.vertices.queryableStoreName();
            this.verticesStore = (ReadOnlyKeyValueStore<K, VV>) processorContext.getStateStore(verticesStoreId);

            String edgesStoreId = PregelComputation.this.edgesGroupedBySource.queryableStoreName();
            this.edgesStore = (KeyValueStore<K, Map<K, EV>>) processorContext.getStateStore(edgesStoreId);
        }

        /**
         * Runs the vertex computation for {@code k} and saves the resulting solution-set entry, if any.
         *
         * <p>When the vertex has no entry in the local solution set yet, one is built from the vertices
         * store as {@code (-1, value, 0, value)}; a missing vertex value is logged as a warning and the
         * computation proceeds with {@code null}.
         *
         * @param k      the vertex being computed
         * @param tuple2 the superstep and the incoming messages for the vertex
         * @return the tuple produced by {@code apply}: next superstep, new solution-set entry (may be
         *         {@code null}) and outgoing messages
         */
        public Tuple3<Integer, Tuple4<Integer, VV, Integer, VV>, Map<K, List<Message>>> transform(K k, Tuple2<Integer, Map<K, List<Message>>> tuple2) {
            int superstep = tuple2._1.intValue();
            Tuple4<Integer, VV, Integer, VV> current = this.localSolutionSetStore.get(k);
            if (current == null) {
                VV initialValue = this.verticesStore.get(k);
                if (initialValue == null) {
                    PregelComputation.log.warn("No vertex value for {}", k);
                }
                current = new Tuple4<>(-1, initialValue, 0, initialValue);
            }
            Tuple3<Integer, Tuple4<Integer, VV, Integer, VV>, Map<K, List<Message>>> result = apply(superstep, k, current, tuple2._2);
            Tuple4<Integer, VV, Integer, VV> updated = result._2;
            if (updated != null) {
                this.localSolutionSetStore.put(k, updated);
            }
            return result;
        }

        /**
         * Invokes the user compute function for one vertex in one superstep.
         *
         * <p>The vertex value handed to the function is {@code tuple4._4} when {@code tuple4._3 <= i},
         * otherwise {@code tuple4._2}. Before the first vertex of a partition is computed in a superstep,
         * the {@code preSuperstep} hook is run once for that partition.
         *
         * @param i      the current superstep
         * @param k      the vertex being computed
         * @param tuple4 the vertex's solution-set entry
         * @param map    incoming messages, grouped by the key they were stored under
         * @return {@code (i + 1, newEntry, outgoing)}: {@code newEntry} is {@code (i, value, i + 1, newValue)}
         *         when the callback set a new vertex value and {@code null} otherwise; {@code outgoing} gets
         *         an empty self-addressed list when the vertex did not vote to halt and had none
         */
        private Tuple3<Integer, Tuple4<Integer, VV, Integer, VV>, Map<K, List<Message>>> apply(int i, K k, Tuple4<Integer, VV, Integer, VV> tuple4, Map<K, List<Message>> map) {
            VV vertexValue;
            if (tuple4._3.intValue() <= i) {
                vertexValue = tuple4._4;
            } else {
                vertexValue = tuple4._2;
            }
            int partition = PregelComputation.vertexToPartition(k, PregelComputation.this.serialized.keySerde().serializer(), PregelComputation.this.numPartitions);

            // Run the pre-superstep hook once per (superstep, partition).
            Map<Integer, Boolean> preSuperstepDone = PregelComputation.this.didPreSuperstep.computeIfAbsent(Integer.valueOf(i), step -> new ConcurrentHashMap<>());
            if (!Boolean.TRUE.equals(preSuperstepDone.get(Integer.valueOf(partition)))) {
                PregelComputation.this.computeFunction.preSuperstep(i, new ComputeFunction.Aggregators(PregelComputation.this.previousAggregates(i), PregelComputation.this.aggregators(partition, i)));
                preSuperstepDone.put(Integer.valueOf(partition), Boolean.TRUE);
            }

            ComputeFunction.Callback<K, VV, EV, Message> callback = new ComputeFunction.Callback<>(k, this.edgesStore, PregelComputation.this.previousAggregates(i), PregelComputation.this.aggregators(partition, i));
            PregelComputation.this.computeFunction.compute(
                    i,
                    new VertexWithValue<>(k, vertexValue),
                    // Incoming messages, flattened across all groups.
                    () -> map.values().stream().flatMap(List::stream).iterator(),
                    // Outgoing edges of the vertex; none when the edges store has no entry for it.
                    () -> {
                        Map<K, EV> stored = this.edgesStore.get(k);
                        Map<K, EV> outEdges = stored != null ? stored : Collections.emptyMap();
                        return outEdges.entrySet().stream()
                                .map(edge -> new EdgeWithValue<>(k, edge.getKey(), edge.getValue()))
                                .iterator();
                    },
                    callback);

            Tuple4<Integer, VV, Integer, VV> newEntry = null;
            if (callback.newVertexValue != null) {
                newEntry = new Tuple4<>(Integer.valueOf(i), vertexValue, Integer.valueOf(i + 1), callback.newVertexValue);
            }
            Map<K, List<Message>> outgoing = callback.outgoingMessages;
            if (!callback.voteToHalt) {
                outgoing.computeIfAbsent(k, self -> new ArrayList<>());
            }
            return new Tuple3<>(Integer.valueOf(i + 1), newEntry, outgoing);
        }

        // Intentionally empty: this transformer only holds references to state stores obtained from the context.
        @Override // org.apache.kafka.streams.kstream.ValueTransformerWithKey
        public void close() {
        }

        /**
         * Erased-signature entry point that delegates to the typed {@code transform(K, Tuple2)} overload.
         *
         * <p>The key is cast to {@code K}. The decompiled form cast it to {@code VertexComputeUdf}, which
         * would fail with {@code ClassCastException} for any real key and could not bind to the typed
         * overload. NOTE(review): this is a decompiler-rendered bridge; javac normally generates it, so
         * consider dropping it if this file is rebuilt from source.
         */
        @Override // org.apache.kafka.streams.kstream.ValueTransformerWithKey
        @SuppressWarnings("unchecked") // generic casts cannot be checked at runtime
        public /* bridge */ /* synthetic */ Object transform(Object obj, Object obj2) {
            return transform((K) obj, (Tuple2<Integer, Map<K, List<Message>>>) obj2);
        }
    }

    /**
     * Stores the supplied configuration, derives the names of the four state stores from the
     * application id, and finally lets the compute function initialise itself.
     *
     * @param str              stored as {@code hostAndPort}
     * @param str2             stored as {@code applicationId}; also used as the suffix of every store name
     * @param str3             stored as {@code bootstrapServers}
     * @param curatorFramework stored as {@code curator}
     * @param str4             stored as {@code verticesTopic}
     * @param str5             stored as {@code edgesGroupedBySourceTopic}
     * @param graphSerialized  stored as {@code serialized}
     * @param str6             stored as {@code solutionSetTopic}
     * @param str7             stored as {@code solutionSetStore}
     * @param str8             stored as {@code workSetTopic}
     * @param i                stored as {@code numPartitions}
     * @param map              stored as {@code configs} and handed to {@code computeFunction.init}
     * @param optional         stored as {@code initialMessage}
     * @param computeFunction  stored as {@code computeFunction}; its {@code init} is invoked last
     */
    public PregelComputation(String str, String str2, String str3, CuratorFramework curatorFramework, String str4, String str5, GraphSerialized<K, VV, EV> graphSerialized, String str6, String str7, String str8, int i, Map<String, ?> map, Optional<Message> optional, ComputeFunction<K, VV, EV, Message> computeFunction) {
        // Connection and identity.
        this.hostAndPort = str;
        this.applicationId = str2;
        this.bootstrapServers = str3;
        this.curator = curatorFramework;

        // Topics and the externally named solution-set store.
        this.verticesTopic = str4;
        this.edgesGroupedBySourceTopic = str5;
        this.solutionSetTopic = str6;
        this.solutionSetStore = str7;
        this.workSetTopic = str8;

        // Graph shape, serialization and algorithm inputs.
        this.numPartitions = i;
        this.serialized = graphSerialized;
        this.configs = map;
        this.initialMessage = optional;
        this.computeFunction = computeFunction;

        // Store names are made unique per application by appending the application id.
        this.edgesStoreName = "edgesStore-" + this.applicationId;
        this.verticesStoreName = "verticesStore-" + this.applicationId;
        this.localworkSetStoreName = "localworkSetStore-" + this.applicationId;
        this.localSolutionSetStoreName = "localSolutionSetStore-" + this.applicationId;

        // Must run after the fields above are set; the callback wraps registeredAggregators.
        this.computeFunction.init(this.configs, new ComputeFunction.InitCallback(this.registeredAggregators));
    }

    /**
     * Returns the vertices table held in the {@code vertices} field, which is assigned by
     * {@link #prepare(StreamsBuilder, Properties)}.
     *
     * @return the current value of the {@code vertices} field
     */
    public KTable<K, VV> vertices() {
        return this.vertices;
    }

    /**
     * Returns the table held in the {@code edgesGroupedBySource} field, which is assigned by
     * {@link #prepare(StreamsBuilder, Properties)}.
     *
     * @return the current value of the {@code edgesGroupedBySource} field
     */
    public KTable<K, Map<K, EV>> edgesGroupedBySource() {
        return this.edgesGroupedBySource;
    }

    /**
     * Returns the solution-set table held in the {@code solutionSet} field, which is assigned by
     * {@link #prepare(StreamsBuilder, Properties)}.
     *
     * @return the current value of the {@code solutionSet} field
     */
    public KTable<K, VV> result() {
        return this.solutionSet;
    }

    /**
     * Returns the future held in the {@code futureResult} field, which is assigned by
     * {@link #run(int, CompletableFuture)}.
     *
     * @return the current value of the {@code futureResult} field
     */
    public CompletableFuture<KTable<K, VV>> futureResult() {
        return this.futureResult;
    }

    /**
     * Wires the computation's topology into the given builder.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>registers the local work-set and local solution-set state stores;</li>
     *   <li>materializes the vertices, edges-grouped-by-source and solution-set tables;</li>
     *   <li>seeds the solution-set topic with a {@code Tuple4(-1, value, 0, value)} per vertex;</li>
     *   <li>registers each vertex's partition in ZooKeeper and seeds the work-set topic with a
     *       {@code Tuple3(0, key, initial messages)} per vertex;</li>
     *   <li>reads the work-set topic back, drops records whose first component exceeds
     *       {@code maxIterations}, and runs them through {@code BarrierSync} and
     *       {@code VertexComputeUdf};</li>
     *   <li>routes the non-null second component of each result to the solution-set topic and
     *       the (first, third) components to the {@code SendMessages} processor.</li>
     * </ol>
     *
     * @param streamsBuilder the builder the stores, tables and streams are added to
     * @param properties     stored as {@code streamsConfig}
     * @throws RuntimeException if registering a partition in ZooKeeper fails while the topology
     *                          is running (checked failures are wrapped, unchecked ones rethrown)
     */
    public void prepare(StreamsBuilder streamsBuilder, Properties properties) {
        this.streamsConfig = properties;

        // Local stores used by the BarrierSync and VertexComputeUdf transformers below.
        streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(this.localworkSetStoreName), Serdes.Integer(), new KryoSerde()));
        streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(this.localSolutionSetStoreName), this.serialized.keySerde(), new KryoSerde()));

        // Input tables.
        this.vertices = streamsBuilder.table(this.verticesTopic, Materialized.as(this.verticesStoreName).withKeySerde(this.serialized.keySerde()).withValueSerde(this.serialized.vertexValueSerde()));
        this.edgesGroupedBySource = streamsBuilder.table(this.edgesGroupedBySourceTopic, Materialized.as(this.edgesStoreName).withKeySerde(this.serialized.keySerde()).withValueSerde(new KryoSerde()));

        // The solution set exposes only the fourth tuple component of each solution-set record.
        this.solutionSet = streamsBuilder.table(this.solutionSetTopic, Consumed.with(this.serialized.keySerde(), new KryoSerde())).mapValues(tuple4 -> {
            return tuple4._4;
        }, Materialized.as(this.solutionSetStore));

        // Seed the solution-set topic from the vertices table.
        this.vertices.toStream().mapValues(obj -> {
            return new Tuple4(-1, obj, 0, obj);
        }).to(this.solutionSetTopic, Produced.with(this.serialized.keySerde(), new KryoSerde()));

        // Seed the work-set topic and register every vertex's partition in ZooKeeper.
        this.vertices.toStream().peek((obj2, obj3) -> {
            try {
                ZKUtils.addChild(this.curator, this.applicationId, new PregelState(GraphAlgorithmState.State.CREATED, 0, PregelState.Stage.SEND), "partition-" + vertexToPartition(obj2, this.serialized.keySerde().serializer(), this.numPartitions));
            } catch (Exception e) {
                // Surface every failure: unchecked exceptions are rethrown as-is, checked ones wrapped.
                throw toRuntimeException(e);
            }
        }).mapValues((obj4, obj5) -> {
            return new Tuple3(0, obj4, this.initialMessage.map(Collections::singletonList).orElse(Collections.emptyList()));
        }).peek((obj6, tuple3) -> {
            log.trace("workset 0 before topic: (" + obj6 + ", " + tuple3 + ")");
        }).to(this.workSetTopic, Produced.with(this.serialized.keySerde(), new KryoSerde()));

        // Read the work set back and discard records past the iteration limit.
        this.workSet = streamsBuilder.stream(this.workSetTopic, Consumed.with(this.serialized.keySerde(), new KryoSerde())).peek((obj7, tuple32) -> {
            log.trace("workset 1 after topic: (" + obj7 + ", " + tuple32 + ")");
        }).filter((obj8, tuple33) -> {
            return ((Integer) tuple33._1).intValue() <= this.maxIterations;
        });

        // Barrier synchronisation followed by the per-vertex compute step.
        KStream transformValues = this.workSet.transform(() -> {
            return new BarrierSync();
        }, this.localworkSetStoreName).peek((obj9, tuple2) -> {
            log.trace("workset 2 after join: (" + obj9 + ", " + tuple2 + ")");
        }).transformValues(() -> {
            return new VertexComputeUdf();
        }, this.localSolutionSetStoreName, this.vertices.queryableStoreName(), this.edgesGroupedBySource.queryableStoreName());

        // Only results that carry a new vertex value (non-null second component) update the solution set.
        transformValues.flatMapValues(tuple34 -> {
            return tuple34._2 != null ? Collections.singletonList(tuple34._2) : Collections.emptyList();
        }).peek((obj10, tuple42) -> {
            log.trace("solution set: (" + obj10 + ", " + tuple42 + ")");
        }).to(this.solutionSetTopic, Produced.with(this.serialized.keySerde(), new KryoSerde()));

        // The remaining components are handed to the SendMessages processor.
        transformValues.mapValues(tuple35 -> {
            return new Tuple2(tuple35._1, tuple35._3);
        }).peek((obj11, tuple22) -> {
            log.trace("workset new: (" + obj11 + ", " + tuple22 + ")");
        }).process(() -> {
            return new SendMessages();
        }, new String[0]);
    }

    /**
     * Records the iteration limit and result future, then writes a fresh
     * {@code RUNNING / -1 / SEND} state to the shared ZooKeeper value under this
     * application's superstep path.
     *
     * <p>The shared value is managed with try-with-resources so that it is closed even when
     * {@code start()} or {@code setValue(...)} fails.
     *
     * @param i                 stored as {@code maxIterations}
     * @param completableFuture stored as {@code futureResult}
     * @return the state that was written
     * @throws RuntimeException if the shared value cannot be started, written or closed
     *                          (checked failures are wrapped, unchecked ones rethrown)
     */
    public PregelState run(int i, CompletableFuture<KTable<K, VV>> completableFuture) {
        this.maxIterations = i;
        this.futureResult = completableFuture;
        PregelState pregelState = new PregelState(GraphAlgorithmState.State.RUNNING, -1, PregelState.Stage.SEND);
        try (SharedValue sharedValue = new SharedValue(this.curator, ZKPaths.makePath(ZKUtils.PREGEL_PATH + this.applicationId, ZKUtils.SUPERSTEP), pregelState.toBytes())) {
            sharedValue.start();
            sharedValue.setValue(pregelState.toBytes());
            return pregelState;
        } catch (Exception e) {
            throw toRuntimeException(e);
        }
    }

    /**
     * Reads the current state from the shared ZooKeeper value under this application's
     * superstep path.
     *
     * <p>The shared value is constructed with a {@code RUNNING / -1 / SEND} seed value and is
     * managed with try-with-resources so that it is closed even when {@code start()} or
     * decoding fails.
     *
     * @return the state decoded from the shared value's bytes
     * @throws RuntimeException if the shared value cannot be started, read, decoded or closed
     *                          (checked failures are wrapped, unchecked ones rethrown)
     */
    public PregelState state() {
        try (SharedValue sharedValue = new SharedValue(this.curator, ZKPaths.makePath(ZKUtils.PREGEL_PATH + this.applicationId, ZKUtils.SUPERSTEP), new PregelState(GraphAlgorithmState.State.RUNNING, -1, PregelState.Stage.SEND).toBytes())) {
            sharedValue.start();
            return PregelState.fromBytes(sharedValue.getValue());
        } catch (Exception e) {
            throw toRuntimeException(e);
        }
    }

    /**
     * Creates one fresh aggregator instance per registered aggregator name.
     *
     * <p>Each instance is obtained from {@code ClientUtils.getConfiguredInstance} using the
     * registered wrapper's aggregator class and this computation's {@code configs}.
     *
     * @return a new concurrent map from aggregator name to a newly created aggregator
     * @throws RuntimeException if an aggregator cannot be instantiated
     */
    protected Map<String, Aggregator<?>> newAggregators() {
        Map<String, Aggregator<?>> fresh = new ConcurrentHashMap<>();
        this.registeredAggregators.forEach((name, wrapper) -> {
            try {
                fresh.put(name, (Aggregator) ClientUtils.getConfiguredInstance(((AggregatorWrapper) wrapper).getAggregatorClass(), this.configs));
            } catch (Exception e) {
                throw toRuntimeException(e);
            }
        });
        return fresh;
    }

    /**
     * Feeds previously stored aggregate values into the given aggregators.
     *
     * <p>An aggregator receives the value stored under its name in {@code map2} only when that
     * value is non-null and the aggregator is registered as persistent; all other aggregators
     * are passed through untouched.
     *
     * @param map  aggregators keyed by name; the instances themselves are updated in place
     * @param map2 stored aggregate values keyed by aggregator name
     * @return a new map containing the same aggregator instances under the same names
     */
    protected Map<String, Aggregator<?>> initAggregators(Map<String, Aggregator<?>> map, Map<String, ?> map2) {
        return (Map) map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
            Aggregator target = (Aggregator) entry.getValue();
            Object stored = map2.get(entry.getKey());
            if (stored == null) {
                return target;
            }
            // The registration is only consulted once a stored value is known to exist.
            if (!this.registeredAggregators.get(entry.getKey()).isPersistent()) {
                return target;
            }
            target.aggregate(stored);
            return target;
        }));
    }

    /**
     * Combines two aggregator maps into one.
     *
     * <p>Names present in only one map keep their aggregator. When a name occurs in both, the
     * aggregator from {@code map} absorbs the aggregate of the one from {@code map2} and is the
     * instance kept in the result.
     *
     * @param map  first aggregator map; its aggregators may be updated in place
     * @param map2 second aggregator map
     * @return a new map holding the merged aggregators by name
     */
    protected Map<String, Aggregator<?>> mergeAggregators(Map<String, Aggregator<?>> map, Map<String, Aggregator<?>> map2) {
        // Entries of the first map are visited before those of the second.
        Stream<Map.Entry<String, Aggregator<?>>> entries = Stream.of(map, map2)
                .flatMap(source -> source.entrySet().stream());
        return entries.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (kept, incoming) -> {
            ((Aggregator) kept).aggregate(incoming.getAggregate());
            return kept;
        }));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the aggregate values stored in ZooKeeper for step {@code i - 1}, caching the
     * result per {@code i} in the {@code previousAggregates} map.
     *
     * <p>An empty map is produced when the ZooKeeper node does not exist or holds no bytes.
     *
     * NOTE(review): the child node name is taken from {@code StreamsConfig.OPTIMIZE}; this
     * looks like a decompiler-substituted string constant — confirm the intended name.
     *
     * @param i the step whose predecessor's aggregates are requested
     * @return the cached or freshly loaded aggregate values
     * @throws RuntimeException if ZooKeeper access or deserialization fails
     */
    public Map<String, ?> previousAggregates(int i) {
        return this.previousAggregates.computeIfAbsent(i, step -> {
            try {
                String aggregatePath = ZKPaths.makePath(ZKUtils.aggregatePath(this.applicationId, i - 1), StreamsConfig.OPTIMIZE);
                if (this.curator.checkExists().forPath(aggregatePath) != null) {
                    byte[] payload = this.curator.getData().forPath(aggregatePath);
                    if (payload.length > 0) {
                        return (Map) KryoUtils.deserialize(payload);
                    }
                }
                // Missing node or empty payload: nothing was aggregated.
                return new HashMap();
            } catch (Exception e) {
                throw toRuntimeException(e);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the aggregators cached under the outer key {@code i2} and inner key {@code i},
     * creating the inner map and a fresh aggregator set on first access.
     *
     * <p>The visible caller in {@code VertexComputeUdf} passes the vertex's partition as
     * {@code i} and the current step as {@code i2}.
     *
     * @param i  inner cache key
     * @param i2 outer cache key
     * @return the aggregators for that key pair
     */
    public Map<String, Aggregator<?>> aggregators(int i, int i2) {
        Map<Integer, Map<String, Aggregator<?>>> byInnerKey =
                this.aggregators.computeIfAbsent(i2, outerKey -> new ConcurrentHashMap<>());
        return byInnerKey.computeIfAbsent(i, innerKey -> newAggregators());
    }

    /**
     * Removes this application's root from ZooKeeper on a best-effort basis.
     *
     * <p>Failures never propagate to the caller, but they are logged so that leftover
     * ZooKeeper state can be diagnosed.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            ZKUtils.removeRoot(this.curator, this.applicationId);
        } catch (Exception e) {
            // Best-effort cleanup: report the failure instead of hiding it, but do not rethrow.
            log.warn("Failed to remove ZooKeeper root for application {}", this.applicationId, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Maps a vertex key to a partition index: the key is serialized (with a null topic), hashed
     * with murmur2, made non-negative and reduced modulo {@code i}.
     *
     * @param k          the vertex key
     * @param serializer serializer used to obtain the key's bytes
     * @param i          the modulus, i.e. the number of partitions
     * @return a value in {@code [0, i)}
     */
    public static <K> int vertexToPartition(K k, Serializer<K> serializer, int i) {
        byte[] keyBytes = serializer.serialize(null, k);
        int nonNegativeHash = Utils.toPositive(Utils.murmur2(keyBytes));
        return nonNegativeHash % i;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reflectively extracts the consumer held by the stream task behind a processor context.
     *
     * <p>Reads the {@code "task"} field declared on {@code ProcessorContextImpl}, then reads
     * from that task the {@code AbstractTask} field whose name is
     * {@code ConsumerProtocol.PROTOCOL_TYPE}.
     *
     * NOTE(review): using {@code ConsumerProtocol.PROTOCOL_TYPE} as a field name looks like a
     * decompiler-substituted string constant — confirm the intended field name.
     *
     * @param processorContext the context to inspect; read as a {@code ProcessorContextImpl}
     * @return the consumer stored on the context's task
     * @throws NoSuchFieldException   if either field is not declared
     * @throws IllegalAccessException if either field cannot be read
     */
    public static Consumer<byte[], byte[]> internalConsumer(ProcessorContext processorContext) throws NoSuchFieldException, IllegalAccessException {
        // Step 1: context -> task.
        Field taskField = ProcessorContextImpl.class.getDeclaredField("task");
        taskField.setAccessible(true);
        StreamTask task = (StreamTask) taskField.get(processorContext);

        // Step 2: task -> consumer.
        Field consumerField = AbstractTask.class.getDeclaredField(ConsumerProtocol.PROTOCOL_TYPE);
        consumerField.setAccessible(true);
        return (Consumer) consumerField.get(task);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reports whether the consumer's positions on its assigned partitions of the given topic
     * equal those partitions' end offsets, logging at debug level when they do.
     *
     * @param consumer the consumer whose assignment, positions and end offsets are queried
     * @param str      the topic name
     * @param i        the step number, used only in the log message
     * @return {@code true} when the end offsets equal the current positions
     */
    public static boolean isTopicSynced(Consumer<byte[], byte[]> consumer, String str, int i) {
        Set<TopicPartition> assigned = localPartitions(consumer, str);
        Map<TopicPartition, Long> current = positions(consumer, assigned);
        Map<TopicPartition, Long> ends = consumer.endOffsets(assigned);
        if (!ends.equals(current)) {
            return false;
        }
        log.debug("Synced topic {}, step {}, offsets {}", str, Integer.valueOf(i), current);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Collects the partitions currently assigned to the consumer that belong to the given topic.
     *
     * @param consumer the consumer whose assignment is inspected
     * @param str      the topic name to match
     * @return a new {@code HashSet} of the matching partitions; empty when none match
     */
    public static Set<TopicPartition> localPartitions(Consumer<byte[], byte[]> consumer, String str) {
        return consumer.assignment().stream()
                .filter(assigned -> assigned.topic().equals(str))
                .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Looks up the consumer's current position for each of the given partitions.
     *
     * @param consumer the consumer to query
     * @param set      the partitions to look up
     * @return a new {@code HashMap} from partition to the consumer's position on it
     */
    private static Map<TopicPartition, Long> positions(Consumer<byte[], byte[]> consumer, Set<TopicPartition> set) {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        set.forEach(partition -> offsets.put(partition, consumer.position(partition)));
        return offsets;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Converts any exception into an unchecked one without double-wrapping.
     *
     * @param exc the exception to convert
     * @return {@code exc} itself when it already is a {@code RuntimeException}; otherwise a new
     *         {@code RuntimeException} with {@code exc} as its cause
     */
    public static RuntimeException toRuntimeException(Exception exc) {
        if (exc instanceof RuntimeException) {
            return (RuntimeException) exc;
        }
        return new RuntimeException(exc);
    }
}
