package io.kgraph.rest.server.graph;

import io.kgraph.GraphSerialized;
import io.kgraph.KGraph;
import io.kgraph.library.GraphAlgorithmType;
import io.kgraph.library.MultipleSourceShortestPaths;
import io.kgraph.library.PageRank;
import io.kgraph.library.cf.Svdpp;
import io.kgraph.pregel.ComputeFunction;
import io.kgraph.pregel.PregelGraphAlgorithm;
import io.kgraph.pregel.ZKUtils;
import io.kgraph.rest.server.KafkaGraphsProperties;
import io.kgraph.tools.importer.GraphImporter;
import io.kgraph.utils.ClientUtils;
import io.kgraph.utils.GraphUtils;
import io.kgraph.utils.KryoSerde;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.nodes.GroupMember;
import org.apache.curator.utils.ZKPaths;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.propertyeditors.StringArrayPropertyEditor;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.reactive.context.ReactiveWebServerInitializedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.codec.multipart.FormFieldPart;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@EnableConfigurationProperties({KafkaGraphsProperties.class})
@Component
/* loaded from: input_file:BOOT-INF/classes/io/kgraph/rest/server/graph/GraphAlgorithmHandler.class */
public class GraphAlgorithmHandler<EV> implements ApplicationListener<ReactiveWebServerInitializedEvent> {
    private static final String X_KGRAPH_APPID = "X-KGraph-AppId";
    private final KafkaGraphsProperties props;
    private final CuratorFramework curator;
    private int port;
    private GroupMember group;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) GraphAlgorithmHandler.class);
    private static final Flux<Long> INTERVAL = Flux.interval(Duration.ofMillis(100), Duration.ofSeconds(2));
    private final ConcurrentMap<String, PregelGraphAlgorithm<?, ?, ?, ?>> algorithms = new ConcurrentHashMap();
    private final String host = getHostAddress();

    public GraphAlgorithmHandler(KafkaGraphsProperties kafkaGraphsProperties, CuratorFramework curatorFramework) {
        this.props = kafkaGraphsProperties;
        this.curator = curatorFramework;
    }

    @Override // org.springframework.context.ApplicationListener
    public void onApplicationEvent(ReactiveWebServerInitializedEvent reactiveWebServerInitializedEvent) {
        this.port = reactiveWebServerInitializedEvent.getWebServer().getPort();
        this.group = new GroupMember(this.curator, ZKPaths.makePath(ZKUtils.GRAPHS_PATH, ZKUtils.GROUP), getHostAndPort());
        this.group.start();
    }

    public Mono<ServerResponse> importGraph(ServerRequest serverRequest) {
        return ((Mono) serverRequest.body(BodyExtractors.toMultipartData())).flatMap(multiValueMap -> {
            try {
                Map singleValueMap = multiValueMap.toSingleValueMap();
                String str = null;
                FormFieldPart formFieldPart = (FormFieldPart) singleValueMap.get("verticesTopic");
                if (formFieldPart != null) {
                    str = formFieldPart.value();
                }
                String str2 = null;
                FormFieldPart formFieldPart2 = (FormFieldPart) singleValueMap.get("edgesTopic");
                if (formFieldPart2 != null) {
                    str2 = formFieldPart2.value();
                }
                File file = null;
                FilePart filePart = (FilePart) singleValueMap.get("vertexFile");
                if (filePart != null) {
                    file = new File(ClientUtils.tempDirectory(), filePart.filename());
                    filePart.transferTo(file);
                }
                File file2 = null;
                FilePart filePart2 = (FilePart) singleValueMap.get("edgeFile");
                if (filePart2 != null) {
                    file2 = new File(ClientUtils.tempDirectory(), filePart2.filename());
                    filePart2.transferTo(file2);
                }
                String str3 = null;
                FormFieldPart formFieldPart3 = (FormFieldPart) singleValueMap.get("vertexParser");
                if (formFieldPart3 != null) {
                    str3 = formFieldPart3.value();
                }
                String str4 = null;
                FormFieldPart formFieldPart4 = (FormFieldPart) singleValueMap.get("edgeParser");
                if (formFieldPart4 != null) {
                    str4 = formFieldPart4.value();
                }
                String str5 = null;
                FormFieldPart formFieldPart5 = (FormFieldPart) singleValueMap.get("keySerializer");
                if (formFieldPart5 != null) {
                    str5 = formFieldPart5.value();
                }
                String str6 = null;
                FormFieldPart formFieldPart6 = (FormFieldPart) singleValueMap.get("vertexValueSerializer");
                if (formFieldPart6 != null) {
                    str6 = formFieldPart6.value();
                }
                String str7 = null;
                FormFieldPart formFieldPart7 = (FormFieldPart) singleValueMap.get("edgeValueSerializer");
                if (formFieldPart7 != null) {
                    str7 = formFieldPart7.value();
                }
                int i = 50;
                FormFieldPart formFieldPart8 = (FormFieldPart) singleValueMap.get("numPartitions");
                if (formFieldPart8 != null) {
                    i = Integer.parseInt(formFieldPart8.value());
                }
                short s = 1;
                FormFieldPart formFieldPart9 = (FormFieldPart) singleValueMap.get("replicationFactor");
                if (formFieldPart9 != null) {
                    s = Short.parseShort(formFieldPart9.value());
                }
                new GraphImporter(this.props.getBootstrapServers(), str, str2, file, file2, str3, str4, str5, str6, str7, i, s).call();
                return ServerResponse.ok().build();
            } catch (NumberFormatException e) {
                throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid number", e);
            } catch (Exception e2) {
                throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR);
            }
        });
    }

    public Mono<ServerResponse> prepareGraph(ServerRequest serverRequest) {
        String generateRandomHexString = ClientUtils.generateRandomHexString(8);
        return serverRequest.bodyToMono(GroupEdgesBySourceRequest.class).doOnNext(groupEdgesBySourceRequest -> {
            try {
                GraphAlgorithmType algorithm = groupEdgesBySourceRequest.getAlgorithm();
                StreamsBuilder streamsBuilder = new StreamsBuilder();
                GraphSerialized graphSerialized = GraphAlgorithmType.graphSerialized(algorithm, groupEdgesBySourceRequest.isValuesOfTypeDouble());
                Properties streamsConfig = streamsConfig(generateRandomHexString, this.props.getBootstrapServers(), graphSerialized.keySerde(), graphSerialized.vertexValueSerde());
                KTable table = streamsBuilder.table(groupEdgesBySourceRequest.getInitialEdgesTopic(), Consumed.with(new KryoSerde(), graphSerialized.edgeValueSerde()), Materialized.with(new KryoSerde(), graphSerialized.edgeValueSerde()));
                CompletableFuture<Map<TopicPartition, Long>> groupEdgesBySourceAndRepartition = GraphUtils.groupEdgesBySourceAndRepartition(streamsBuilder, streamsConfig, groupEdgesBySourceRequest.getInitialVerticesTopic() != null ? new KGraph(streamsBuilder.table(groupEdgesBySourceRequest.getInitialVerticesTopic(), Consumed.with(graphSerialized.keySerde(), graphSerialized.vertexValueSerde()), Materialized.with(new KryoSerde(), graphSerialized.edgeValueSerde())), table, graphSerialized) : KGraph.fromEdges(table, GraphAlgorithmType.initialVertexValueMapper(algorithm), graphSerialized), groupEdgesBySourceRequest.getVerticesTopic(), groupEdgesBySourceRequest.getEdgesGroupedBySourceTopic(), groupEdgesBySourceRequest.getNumPartitions(), groupEdgesBySourceRequest.getReplicationFactor());
                if (!groupEdgesBySourceRequest.isAsync()) {
                    groupEdgesBySourceAndRepartition.get();
                }
            } catch (Exception e) {
                throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR);
            }
        }).then(ServerResponse.ok().build());
    }

    public Mono<ServerResponse> configure(ServerRequest serverRequest) {
        List<String> header = serverRequest.headers().header(X_KGRAPH_APPID);
        String generateRandomHexString = header.isEmpty() ? ClientUtils.generateRandomHexString(8) : header.iterator().next();
        return serverRequest.bodyToMono(GraphAlgorithmCreateRequest.class).doOnNext(graphAlgorithmCreateRequest -> {
            PregelGraphAlgorithm<?, ?, ?, ?> algorithm = getAlgorithm(generateRandomHexString, graphAlgorithmCreateRequest);
            algorithm.configure(new StreamsBuilder(), streamsConfig(generateRandomHexString, this.props.getBootstrapServers(), algorithm.serialized().keySerde(), algorithm.serialized().vertexValueSerde()));
            this.algorithms.put(generateRandomHexString, algorithm);
        }).flatMapMany(graphAlgorithmCreateRequest2 -> {
            return proxyConfigure(header.isEmpty() ? this.group.getCurrentMembers().keySet() : Collections.emptySet(), generateRandomHexString, graphAlgorithmCreateRequest2);
        }).then(ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body((ServerResponse.BodyBuilder) Mono.just(new GraphAlgorithmId(generateRandomHexString)), GraphAlgorithmId.class));
    }

    private PregelGraphAlgorithm<?, ?, ?, ?> getAlgorithm(String str, GraphAlgorithmCreateRequest graphAlgorithmCreateRequest) {
        try {
            GraphAlgorithmType algorithm = graphAlgorithmCreateRequest.getAlgorithm();
            ComputeFunction computeFunction = GraphAlgorithmType.computeFunction(algorithm);
            HashMap hashMap = new HashMap();
            Optional empty = Optional.empty();
            GraphSerialized graphSerialized = GraphAlgorithmType.graphSerialized(algorithm, graphAlgorithmCreateRequest.isValuesOfTypeDouble());
            switch (algorithm) {
                case bfs:
                    hashMap.put("srcVertexId", Long.valueOf(Long.parseLong(getConfig(graphAlgorithmCreateRequest.getConfigs(), "srcVertexId", true))));
                    break;
                case wcc:
                    break;
                case lcc:
                    break;
                case lp:
                    break;
                case mssp:
                    hashMap.put(MultipleSourceShortestPaths.LANDMARK_VERTEX_IDS, (Set) Arrays.stream(getConfig(graphAlgorithmCreateRequest.getConfigs(), MultipleSourceShortestPaths.LANDMARK_VERTEX_IDS, true).split(StringArrayPropertyEditor.DEFAULT_SEPARATOR)).map(Long::parseLong).collect(Collectors.toSet()));
                    break;
                case pagerank:
                    double parseDouble = Double.parseDouble(getConfig(graphAlgorithmCreateRequest.getConfigs(), "tolerance", true));
                    double parseDouble2 = Double.parseDouble(getConfig(graphAlgorithmCreateRequest.getConfigs(), PageRank.RESET_PROBABILITY, true));
                    String config = getConfig(graphAlgorithmCreateRequest.getConfigs(), "srcVertexId", false);
                    hashMap.put("tolerance", Double.valueOf(parseDouble));
                    hashMap.put(PageRank.RESET_PROBABILITY, Double.valueOf(parseDouble2));
                    if (config == null) {
                        empty = Optional.of(Double.valueOf(parseDouble2 / (1.0d - parseDouble2)));
                        break;
                    } else {
                        hashMap.put("srcVertexId", Long.valueOf(Long.parseLong(config)));
                        empty = Optional.of(Double.valueOf(0.0d));
                        break;
                    }
                case sssp:
                    hashMap.put("srcVertexId", Long.valueOf(Long.parseLong(getConfig(graphAlgorithmCreateRequest.getConfigs(), "srcVertexId", true))));
                    break;
                case svdpp:
                    String config2 = getConfig(graphAlgorithmCreateRequest.getConfigs(), Svdpp.BIAS_LAMBDA, false);
                    float parseFloat = config2 != null ? Float.parseFloat(config2) : 0.005f;
                    String config3 = getConfig(graphAlgorithmCreateRequest.getConfigs(), Svdpp.BIAS_GAMMA, false);
                    float parseFloat2 = config3 != null ? Float.parseFloat(config3) : 0.01f;
                    String config4 = getConfig(graphAlgorithmCreateRequest.getConfigs(), Svdpp.FACTOR_LAMBDA, false);
                    float parseFloat3 = config4 != null ? Float.parseFloat(config4) : 0.005f;
                    String config5 = getConfig(graphAlgorithmCreateRequest.getConfigs(), Svdpp.FACTOR_GAMMA, false);
                    float parseFloat4 = config5 != null ? Float.parseFloat(config5) : 0.01f;
                    String config6 = getConfig(graphAlgorithmCreateRequest.getConfigs(), "min.rating", false);
                    float parseFloat5 = config6 != null ? Float.parseFloat(config6) : 0.0f;
                    String config7 = getConfig(graphAlgorithmCreateRequest.getConfigs(), "max.rating", false);
                    float parseFloat6 = config7 != null ? Float.parseFloat(config7) : 5.0f;
                    String config8 = getConfig(graphAlgorithmCreateRequest.getConfigs(), "dim", false);
                    int parseInt = config8 != null ? Integer.parseInt(config8) : 2;
                    String config9 = getConfig(graphAlgorithmCreateRequest.getConfigs(), "random.seed", false);
                    Long valueOf = config9 != null ? Long.valueOf(Long.parseLong(config9)) : null;
                    String config10 = getConfig(graphAlgorithmCreateRequest.getConfigs(), "iterations", false);
                    int parseInt2 = config10 != null ? Integer.parseInt(config10) : Integer.MAX_VALUE;
                    hashMap.put(Svdpp.BIAS_LAMBDA, Float.valueOf(parseFloat));
                    hashMap.put(Svdpp.BIAS_GAMMA, Float.valueOf(parseFloat2));
                    hashMap.put(Svdpp.FACTOR_LAMBDA, Float.valueOf(parseFloat3));
                    hashMap.put(Svdpp.FACTOR_GAMMA, Float.valueOf(parseFloat4));
                    hashMap.put("min.rating", Float.valueOf(parseFloat5));
                    hashMap.put("max.rating", Float.valueOf(parseFloat6));
                    hashMap.put("dim", Integer.valueOf(parseInt));
                    hashMap.put("random.seed", valueOf);
                    hashMap.put("iterations", Integer.valueOf(parseInt2));
                    break;
                default:
                    throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid algorithm: " + algorithm);
            }
            return new PregelGraphAlgorithm<>(getHostAndPort(), str, this.props.getBootstrapServers(), this.curator, graphAlgorithmCreateRequest.getVerticesTopic(), graphAlgorithmCreateRequest.getEdgesGroupedBySourceTopic(), Collections.emptyMap(), graphSerialized, graphAlgorithmCreateRequest.getNumPartitions(), graphAlgorithmCreateRequest.getReplicationFactor(), hashMap, empty, computeFunction);
        } catch (NumberFormatException e) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid number", e);
        }
    }

    private String getConfig(Map<String, String> map, String str, boolean z) {
        String str2 = map != null ? map.get(str) : null;
        if (z && str2 == null) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Missing param: " + str);
        }
        return str2;
    }

    private Flux<GraphAlgorithmId> proxyConfigure(Set<String> set, String str, GraphAlgorithmCreateRequest graphAlgorithmCreateRequest) {
        return Flux.fromIterable(set).filter(str2 -> {
            return !str2.equals(getHostAndPort());
        }).flatMap(str3 -> {
            log.debug("proxy configure to {}", str3);
            return ((WebClient.RequestBodySpec) WebClient.create("http://" + str3).post().uri("/pregel", new Object[0])).accept2(MediaType.APPLICATION_JSON).header2(X_KGRAPH_APPID, str).body((WebClient.RequestBodySpec) Mono.just(graphAlgorithmCreateRequest), GraphAlgorithmCreateRequest.class).retrieve().bodyToMono(GraphAlgorithmId.class);
        });
    }

    public Mono<ServerResponse> state(ServerRequest serverRequest) {
        PregelGraphAlgorithm<?, ?, ?, ?> pregelGraphAlgorithm = this.algorithms.get(serverRequest.pathVariable("id"));
        return pregelGraphAlgorithm == null ? ServerResponse.notFound().build() : ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body((ServerResponse.BodyBuilder) Mono.just(new GraphAlgorithmStatus(pregelGraphAlgorithm.state())), GraphAlgorithmStatus.class);
    }

    public Mono<ServerResponse> run(ServerRequest serverRequest) {
        List<String> header = serverRequest.headers().header(X_KGRAPH_APPID);
        String pathVariable = serverRequest.pathVariable("id");
        return serverRequest.bodyToMono(GraphAlgorithmRunRequest.class).flatMapMany(graphAlgorithmRunRequest -> {
            log.debug("num iterations: {}", Integer.valueOf(graphAlgorithmRunRequest.getNumIterations()));
            return Mono.just(new GraphAlgorithmStatus(this.algorithms.get(pathVariable).run(graphAlgorithmRunRequest.getNumIterations()))).mergeWith(proxyRun(header.isEmpty() ? this.group.getCurrentMembers().keySet() : Collections.emptySet(), pathVariable, graphAlgorithmRunRequest));
        }).onErrorMap(RuntimeException.class, runtimeException -> {
            return new ResponseStatusException(HttpStatus.NOT_FOUND);
        }).reduce((graphAlgorithmStatus, graphAlgorithmStatus2) -> {
            return graphAlgorithmStatus;
        }).flatMap(graphAlgorithmStatus3 -> {
            return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body((ServerResponse.BodyBuilder) Mono.just(graphAlgorithmStatus3), GraphAlgorithmStatus.class);
        });
    }

    private Flux<GraphAlgorithmStatus> proxyRun(Set<String> set, String str, GraphAlgorithmRunRequest graphAlgorithmRunRequest) {
        return Flux.fromIterable(set).filter(str2 -> {
            return !str2.equals(getHostAndPort());
        }).flatMap(str3 -> {
            log.debug("proxy run to {}", str3);
            return ((WebClient.RequestBodySpec) WebClient.create("http://" + str3).post().uri("/pregel/{id}", str)).accept2(MediaType.APPLICATION_JSON).header2(X_KGRAPH_APPID, str).body((WebClient.RequestBodySpec) Mono.just(graphAlgorithmRunRequest), GraphAlgorithmRunRequest.class).retrieve().bodyToMono(GraphAlgorithmStatus.class);
        });
    }

    public Mono<ServerResponse> configs(ServerRequest serverRequest) {
        PregelGraphAlgorithm<?, ?, ?, ?> pregelGraphAlgorithm = this.algorithms.get(serverRequest.pathVariable("id"));
        return pregelGraphAlgorithm == null ? ServerResponse.notFound().build() : ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body((ServerResponse.BodyBuilder) Mono.just(pregelGraphAlgorithm.configs()), Map.class);
    }

    public Mono<ServerResponse> result(ServerRequest serverRequest) {
        List<String> header = serverRequest.headers().header(X_KGRAPH_APPID);
        String pathVariable = serverRequest.pathVariable("id");
        PregelGraphAlgorithm<?, ?, ?, ?> pregelGraphAlgorithm = this.algorithms.get(pathVariable);
        if (pregelGraphAlgorithm == null) {
            return ServerResponse.notFound().build();
        }
        return ServerResponse.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(BodyInserters.fromPublisher(proxyResult(header.isEmpty() ? this.group.getCurrentMembers().keySet() : Collections.emptySet(), pathVariable, Flux.fromIterable(pregelGraphAlgorithm.result()).map(keyValue -> {
            log.trace("result: ({}, {})", keyValue.key, keyValue.value);
            return new KeyValue(keyValue.key.toString(), keyValue.value.toString());
        })), KeyValue.class));
    }

    private Flux<KeyValue> proxyResult(Set<String> set, String str, Flux<KeyValue> flux) {
        return (Flux) set.stream().filter(str2 -> {
            return !str2.equals(getHostAndPort());
        }).map(str3 -> {
            log.debug("proxy result to {}", str3);
            return WebClient.create("http://" + str3).get().uri("/pregel/{id}/result", str).accept2(MediaType.TEXT_EVENT_STREAM).header2(X_KGRAPH_APPID, str).retrieve().bodyToFlux(KeyValue.class);
        }).reduce(flux, (v0, v1) -> {
            return v0.mergeWith(v1);
        });
    }

    public Mono<ServerResponse> filterResult(ServerRequest serverRequest) {
        List<String> header = serverRequest.headers().header(X_KGRAPH_APPID);
        String pathVariable = serverRequest.pathVariable("id");
        PregelGraphAlgorithm<?, ?, ?, ?> pregelGraphAlgorithm = this.algorithms.get(pathVariable);
        if (pregelGraphAlgorithm == null) {
            return ServerResponse.notFound().build();
        }
        return ServerResponse.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(BodyInserters.fromPublisher(serverRequest.bodyToMono(GraphAlgorithmResultRequest.class).flatMapMany(graphAlgorithmResultRequest -> {
            return proxyFilterResult(header.isEmpty() ? this.group.getCurrentMembers().keySet() : Collections.emptySet(), pathVariable, graphAlgorithmResultRequest, Flux.fromIterable(pregelGraphAlgorithm.result()).filter(keyValue -> {
                return keyValue.key.toString().equals(graphAlgorithmResultRequest.getKey());
            }).map(keyValue2 -> {
                log.trace("result: ({}, {})", keyValue2.key, keyValue2.value);
                return new KeyValue(keyValue2.key.toString(), keyValue2.value.toString());
            }));
        }), KeyValue.class));
    }

    private Flux<KeyValue> proxyFilterResult(Set<String> set, String str, GraphAlgorithmResultRequest graphAlgorithmResultRequest, Flux<KeyValue> flux) {
        return (Flux) set.stream().filter(str2 -> {
            return !str2.equals(getHostAndPort());
        }).map(str3 -> {
            log.debug("proxy result to {}", str3);
            return ((WebClient.RequestBodySpec) WebClient.create("http://" + str3).post().uri("/pregel/{id}/result", str)).accept2(MediaType.TEXT_EVENT_STREAM).header2(X_KGRAPH_APPID, str).body((WebClient.RequestBodySpec) Mono.just(graphAlgorithmResultRequest), GraphAlgorithmResultRequest.class).retrieve().bodyToFlux(KeyValue.class);
        }).reduce(flux, (v0, v1) -> {
            return v0.mergeWith(v1);
        });
    }

    public Mono<ServerResponse> delete(ServerRequest serverRequest) {
        List<String> header = serverRequest.headers().header(X_KGRAPH_APPID);
        String pathVariable = serverRequest.pathVariable("id");
        this.algorithms.remove(pathVariable).close();
        return proxyDelete(header.isEmpty() ? this.group.getCurrentMembers().keySet() : Collections.emptySet(), pathVariable).then(ServerResponse.noContent().build());
    }

    private Flux<Void> proxyDelete(Set<String> set, String str) {
        return Flux.fromIterable(set).filter(str2 -> {
            return !str2.equals(getHostAndPort());
        }).flatMap(str3 -> {
            log.debug("proxy delete to {}", str3);
            return WebClient.create("http://" + str3).delete().uri("/pregel/{id}", str).accept2(MediaType.APPLICATION_JSON).header2(X_KGRAPH_APPID, str).retrieve().bodyToMono(Void.class);
        });
    }

    public static Properties streamsConfig(String str, String str2, Serde<?> serde, Serde<?> serde2) {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, str);
        properties.put("client.id", str + "-client");
        properties.put("bootstrap.servers", str2);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, serde.getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serde2.getClass().getName());
        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
        properties.put(StreamsConfig.STATE_DIR_CONFIG, ClientUtils.tempDirectory().getAbsolutePath());
        return properties;
    }

    public String getHostAndPort() {
        return this.host + ":" + this.port;
    }

    public String getHostAddress() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }

    public int getPort() {
        return this.port;
    }
}
