package io.kgraph.pregel;

import io.kgraph.GraphAlgorithm;
import io.kgraph.GraphAlgorithmState;
import io.kgraph.GraphSerialized;
import io.kgraph.pregel.aggregators.Aggregator;
import io.kgraph.utils.ClientUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import org.apache.curator.framework.CuratorFramework;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kgraph/pregel/PregelGraphAlgorithm.class */
/**
 * Base class for graph algorithms that are executed through a {@link PregelComputation}
 * on top of Kafka Streams.
 *
 * <p>Subclasses supply the per-vertex logic via {@link #computeFunction()}. The lifecycle is:
 * construct, optionally {@link #registerAggregator(String, Class) register aggregators},
 * {@link #configure(StreamsBuilder, Properties) configure} (which creates the topics and starts
 * the {@link KafkaStreams} instance), {@link #run(int) run}, read the {@link #result() result},
 * and finally {@link #close() close}.
 *
 * @param <K> vertex key type
 * @param <VV> vertex value type
 * @param <EV> edge value type
 * @param <Message> type of the messages handed to the computation
 */
public abstract class PregelGraphAlgorithm<K, VV, EV, Message> implements GraphAlgorithm<K, VV, EV, KTable<K, VV>> {
    private static final Logger log = LoggerFactory.getLogger(PregelGraphAlgorithm.class);
    protected final String hostAndPort;
    protected final String applicationId;
    protected final String bootstrapServers;
    /** Always {@code null}: every constructor ends up handing over a {@link CuratorFramework} instead. */
    protected final String zookeeperConnect;
    protected final CuratorFramework curator;
    protected final String verticesTopic;
    protected final String edgesGroupedBySourceTopic;
    protected final GraphSerialized<K, VV, EV> serialized;
    protected final String solutionSetTopic;
    protected final String solutionSetStore;
    protected final String workSetTopic;
    protected final int numPartitions;
    protected final short replicationFactor;
    protected final PregelComputation<K, VV, EV, Message> computation;
    protected final Map<String, Class<? extends Aggregator<?>>> registeredAggregators;
    /** Assigned by {@link #configure(StreamsBuilder, Properties)}; {@code null} until then. */
    protected KafkaStreams streams;

    /**
     * Creates an algorithm whose topic and store names are all derived from the application id
     * ({@code "vertices-"}, {@code "edgesGroupedBySource-"}, {@code "solutionSet-"},
     * {@code "solutionSetStore-"} and {@code "workSet-"} prefixes) and whose curator is obtained
     * from {@code ZKUtils.createCurator(str4)}.
     *
     * @param str stored as {@link #hostAndPort}
     * @param str2 stored as {@link #applicationId}; also the suffix of every derived name
     * @param str3 stored as {@link #bootstrapServers}
     * @param str4 argument for {@code ZKUtils.createCurator} (presumably the ZooKeeper connect string)
     * @param graphSerialized stored as {@link #serialized}
     * @param i stored as {@link #numPartitions}
     * @param s stored as {@link #replicationFactor}
     * @param optional forwarded unchanged to the {@link PregelComputation}
     *     (presumably an optional initial message -- confirm against {@code PregelComputation})
     */
    public PregelGraphAlgorithm(String str, String str2, String str3, String str4, GraphSerialized<K, VV, EV> graphSerialized, int i, short s, Optional<Message> optional) {
        this(str, str2, str3, ZKUtils.createCurator(str4), "vertices-" + str2, "edgesGroupedBySource-" + str2, graphSerialized, "solutionSet-" + str2, "solutionSetStore-" + str2, "workSet-" + str2, i, s, optional);
    }

    /**
     * Creates an algorithm with explicit input topics and an existing curator; the solution set
     * topic, solution set store and work set topic names are derived from the application id.
     *
     * @param str stored as {@link #hostAndPort}
     * @param str2 stored as {@link #applicationId}; also the suffix of the derived names
     * @param str3 stored as {@link #bootstrapServers}
     * @param curatorFramework stored as {@link #curator}
     * @param str4 stored as {@link #verticesTopic}
     * @param str5 stored as {@link #edgesGroupedBySourceTopic}
     * @param graphSerialized stored as {@link #serialized}
     * @param i stored as {@link #numPartitions}
     * @param s stored as {@link #replicationFactor}
     * @param optional forwarded unchanged to the {@link PregelComputation}
     */
    public PregelGraphAlgorithm(String str, String str2, String str3, CuratorFramework curatorFramework, String str4, String str5, GraphSerialized<K, VV, EV> graphSerialized, int i, short s, Optional<Message> optional) {
        this(str, str2, str3, curatorFramework, str4, str5, graphSerialized, "solutionSet-" + str2, "solutionSetStore-" + str2, "workSet-" + str2, i, s, optional);
    }

    /**
     * Creates an algorithm with every topic and store name given explicitly; the curator is
     * obtained from {@code ZKUtils.createCurator(str4)}.
     *
     * @param str stored as {@link #hostAndPort}
     * @param str2 stored as {@link #applicationId}
     * @param str3 stored as {@link #bootstrapServers}
     * @param str4 argument for {@code ZKUtils.createCurator} (presumably the ZooKeeper connect string)
     * @param str5 stored as {@link #verticesTopic}
     * @param str6 stored as {@link #edgesGroupedBySourceTopic}
     * @param graphSerialized stored as {@link #serialized}
     * @param str7 stored as {@link #solutionSetTopic}
     * @param str8 stored as {@link #solutionSetStore}
     * @param str9 stored as {@link #workSetTopic}
     * @param i stored as {@link #numPartitions}
     * @param s stored as {@link #replicationFactor}
     * @param optional forwarded unchanged to the {@link PregelComputation}
     */
    public PregelGraphAlgorithm(String str, String str2, String str3, String str4, String str5, String str6, GraphSerialized<K, VV, EV> graphSerialized, String str7, String str8, String str9, int i, short s, Optional<Message> optional) {
        this(str, str2, str3, ZKUtils.createCurator(str4), str5, str6, graphSerialized, str7, str8, str9, i, s, optional);
    }

    /**
     * Canonical constructor: stores every argument and builds the {@link PregelComputation}.
     *
     * <p>Note that {@link #computeFunction()} is invoked from here, i.e. before any subclass
     * field initializer or constructor body has run.
     *
     * @param str stored as {@link #hostAndPort}
     * @param str2 stored as {@link #applicationId}
     * @param str3 stored as {@link #bootstrapServers}
     * @param curatorFramework stored as {@link #curator}
     * @param str4 stored as {@link #verticesTopic}
     * @param str5 stored as {@link #edgesGroupedBySourceTopic}
     * @param graphSerialized stored as {@link #serialized}
     * @param str6 stored as {@link #solutionSetTopic}
     * @param str7 stored as {@link #solutionSetStore}
     * @param str8 stored as {@link #workSetTopic}
     * @param i stored as {@link #numPartitions}
     * @param s stored as {@link #replicationFactor}
     * @param optional forwarded unchanged to the {@link PregelComputation}
     */
    public PregelGraphAlgorithm(String str, String str2, String str3, CuratorFramework curatorFramework, String str4, String str5, GraphSerialized<K, VV, EV> graphSerialized, String str6, String str7, String str8, int i, short s, Optional<Message> optional) {
        this.registeredAggregators = new HashMap<>();
        this.hostAndPort = str;
        this.applicationId = str2;
        this.bootstrapServers = str3;
        // No connect string is retained; only the curator is kept.
        this.zookeeperConnect = null;
        this.curator = curatorFramework;
        this.verticesTopic = str4;
        this.edgesGroupedBySourceTopic = str5;
        this.serialized = graphSerialized;
        this.solutionSetTopic = str6;
        this.solutionSetStore = str7;
        this.workSetTopic = str8;
        this.numPartitions = i;
        this.replicationFactor = s;
        // The aggregator map is shared by reference with the computation, and it is still empty here.
        this.computation = new PregelComputation<>(str, str2, str3, curatorFramework, str4, str5, graphSerialized, str6, str7, str8, i, optional, computeFunction(), this.registeredAggregators);
    }

    /**
     * Adds an aggregator class to the map that was handed to the computation at construction time.
     *
     * @param str key under which the aggregator class is stored
     * @param cls aggregator implementation class
     */
    public void registerAggregator(String str, Class<? extends Aggregator<?>> cls) {
        this.registeredAggregators.put(str, cls);
    }

    /**
     * Creates the solution set and work set topics, prepares the computation on the given
     * builder, then builds and starts the {@link KafkaStreams} instance.
     *
     * @param streamsBuilder builder the computation adds its processing steps to
     * @param properties used for topic creation, computation preparation and the streams config
     * @return a state in {@code CREATED} with superstep {@code 0}, running time {@code 0} and no future
     */
    @Override
    public GraphAlgorithmState<Void> configure(StreamsBuilder streamsBuilder, Properties properties) {
        ClientUtils.createTopic(this.solutionSetTopic, this.numPartitions, this.replicationFactor, properties);
        ClientUtils.createTopic(this.workSetTopic, this.numPartitions, this.replicationFactor, properties);
        this.computation.prepare(streamsBuilder, properties);
        Topology topology = streamsBuilder.build();
        log.info("Topology description {}", topology.describe());
        this.streams = new KafkaStreams(topology, new StreamsConfig(properties), new PregelClientSupplier());
        this.streams.start();
        return new GraphAlgorithmState<>(this.streams, GraphAlgorithmState.State.CREATED, 0, 0L, null);
    }

    /**
     * Starts the computation and returns a snapshot of its state together with a future for the result.
     *
     * @param i forwarded to {@code PregelComputation.run} (presumably the iteration limit -- confirm)
     * @return the state reported by the computation, carrying a newly created future
     */
    @Override
    public GraphAlgorithmState<KTable<K, VV>> run(int i) {
        CompletableFuture<KTable<K, VV>> futureResult = new CompletableFuture<>();
        PregelState pregelState = this.computation.run(i, futureResult);
        return new GraphAlgorithmState<>(this.streams, pregelState.state(), pregelState.superstep(), pregelState.runningTime(), futureResult);
    }

    /**
     * Returns a snapshot of the computation's current state and its result future.
     *
     * @return state, superstep and running time as reported by the computation
     */
    @Override
    public GraphAlgorithmState<KTable<K, VV>> state() {
        PregelState pregelState = this.computation.state();
        return new GraphAlgorithmState<>(this.streams, pregelState.state(), pregelState.superstep(), pregelState.runningTime(), this.computation.futureResult());
    }

    /**
     * Returns a lazy view over the solution set store.
     *
     * <p>The store is looked up anew on every {@code iterator()} call, so
     * {@link #configure(StreamsBuilder, Properties)} must have been called before iterating.
     *
     * @return an iterable whose iterators come from the store's {@code all()} method
     */
    @Override
    public Iterable<KeyValue<K, VV>> result() {
        return () -> {
            // Explicit type witness keeps the store (and its iterator) fully typed instead of raw.
            ReadOnlyKeyValueStore<K, VV> store = this.streams.store(this.solutionSetStore, QueryableStoreTypes.<K, VV>keyValueStore());
            return store.all();
        };
    }

    /**
     * Supplies the per-vertex compute function handed to the {@link PregelComputation}.
     *
     * <p>Called from the superclass constructor, so implementations must not depend on
     * instance state of the subclass.
     *
     * @return the compute function for this algorithm
     */
    protected abstract ComputeFunction<K, VV, EV, Message> computeFunction();

    /**
     * Closes the {@link KafkaStreams} instance, if one was started.
     *
     * <p>Safe to call on an instance that was never configured.
     */
    @Override
    public void close() {
        // streams is only assigned in configure(); closing an unconfigured instance is a no-op.
        if (this.streams != null) {
            this.streams.close();
        }
    }
}
