package io.simplesource.kafka.internal.client;

import avro.shaded.com.google.common.collect.Lists;
import io.simplesource.data.FutureResult;
import io.simplesource.kafka.dsl.KafkaConfig;
import io.simplesource.kafka.internal.client.RequestPublisher;
import io.simplesource.kafka.spec.TopicSpec;
import io.simplesource.kafka.spec.WindowSpec;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/simplesource/kafka/internal/client/KafkaRequestAPI.class */
public final class KafkaRequestAPI<K, I, O> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaRequestAPI.class);
    private final RequestAPIContext<K, I, O> ctx;
    private final ResponseSubscription responseSubscription;
    private final ExpiringMap<UUID, ResponseHandler<O>> responseHandlers;
    private final RequestPublisher<K, I> requestSender;
    private final RequestPublisher<UUID, String> responseTopicMapSender;

    /* loaded from: input_file:io/simplesource/kafka/internal/client/KafkaRequestAPI$RequestAPIContext.class */
    /**
     * Immutable value object carrying everything a {@link KafkaRequestAPI} needs to be wired up:
     * the Kafka configuration, the names of the request / response-topic-map / private response
     * topics, the serdes for request and response records, the response retention window and the
     * spec used when the private response topic has to be created.
     *
     * <p>Instances are created through {@link #builder()}. No field is validated or defaulted, so
     * any value that was never set on the builder is {@code null}.
     *
     * @param <K> request key type
     * @param <I> request value type
     * @param <O> response value type
     */
    public static final class RequestAPIContext<K, I, O> {
        private final KafkaConfig kafkaConfig;
        private final String requestTopic;
        private final String responseTopicMapTopic;
        private final String privateResponseTopic;
        private final Serde<K> requestKeySerde;
        private final Serde<I> requestValueSerde;
        private final Serde<UUID> responseKeySerde;
        private final Serde<O> responseValueSerde;
        private final WindowSpec responseWindowSpec;
        private final TopicSpec outputTopicConfig;

        /**
         * Fluent, mutable builder for {@link RequestAPIContext}. Each setter stores its argument
         * and returns the same builder so calls can be chained.
         */
        public static class RequestAPIContextBuilder<K, I, O> {
            private KafkaConfig kafkaConfig;
            private String requestTopic;
            private String responseTopicMapTopic;
            private String privateResponseTopic;
            private Serde<K> requestKeySerde;
            private Serde<I> requestValueSerde;
            private Serde<UUID> responseKeySerde;
            private Serde<O> responseValueSerde;
            private WindowSpec responseWindowSpec;
            private TopicSpec outputTopicConfig;

            RequestAPIContextBuilder() {
                // Obtained via RequestAPIContext.builder().
            }

            /** Sets the Kafka configuration. */
            public RequestAPIContextBuilder<K, I, O> kafkaConfig(KafkaConfig kafkaConfig) {
                this.kafkaConfig = kafkaConfig;
                return this;
            }

            /** Sets the name of the topic requests are published to. */
            public RequestAPIContextBuilder<K, I, O> requestTopic(String str) {
                this.requestTopic = str;
                return this;
            }

            /** Sets the name of the topic that records which response topic a request id maps to. */
            public RequestAPIContextBuilder<K, I, O> responseTopicMapTopic(String str) {
                this.responseTopicMapTopic = str;
                return this;
            }

            /** Sets the name of this client's private response topic. */
            public RequestAPIContextBuilder<K, I, O> privateResponseTopic(String str) {
                this.privateResponseTopic = str;
                return this;
            }

            /** Sets the serde for request keys. */
            public RequestAPIContextBuilder<K, I, O> requestKeySerde(Serde<K> serde) {
                this.requestKeySerde = serde;
                return this;
            }

            /** Sets the serde for request values. */
            public RequestAPIContextBuilder<K, I, O> requestValueSerde(Serde<I> serde) {
                this.requestValueSerde = serde;
                return this;
            }

            /** Sets the serde for response keys (request ids). */
            public RequestAPIContextBuilder<K, I, O> responseKeySerde(Serde<UUID> serde) {
                this.responseKeySerde = serde;
                return this;
            }

            /** Sets the serde for response values. */
            public RequestAPIContextBuilder<K, I, O> responseValueSerde(Serde<O> serde) {
                this.responseValueSerde = serde;
                return this;
            }

            /** Sets the window spec whose retention bounds how long response handlers are kept. */
            public RequestAPIContextBuilder<K, I, O> responseWindowSpec(WindowSpec windowSpec) {
                this.responseWindowSpec = windowSpec;
                return this;
            }

            /** Sets the partition/replica spec used when creating the private response topic. */
            public RequestAPIContextBuilder<K, I, O> outputTopicConfig(TopicSpec topicSpec) {
                this.outputTopicConfig = topicSpec;
                return this;
            }

            /** Creates the context from the values collected so far. */
            public RequestAPIContext<K, I, O> build() {
                return new RequestAPIContext<>(
                        kafkaConfig,
                        requestTopic,
                        responseTopicMapTopic,
                        privateResponseTopic,
                        requestKeySerde,
                        requestValueSerde,
                        responseKeySerde,
                        responseValueSerde,
                        responseWindowSpec,
                        outputTopicConfig);
            }

            @Override
            public String toString() {
                return "KafkaRequestAPI.RequestAPIContext.RequestAPIContextBuilder(kafkaConfig=" + kafkaConfig
                        + ", requestTopic=" + requestTopic
                        + ", responseTopicMapTopic=" + responseTopicMapTopic
                        + ", privateResponseTopic=" + privateResponseTopic
                        + ", requestKeySerde=" + requestKeySerde
                        + ", requestValueSerde=" + requestValueSerde
                        + ", responseKeySerde=" + responseKeySerde
                        + ", responseValueSerde=" + responseValueSerde
                        + ", responseWindowSpec=" + responseWindowSpec
                        + ", outputTopicConfig=" + outputTopicConfig
                        + ")";
            }
        }

        /** All-args constructor; the arguments follow the field declaration order. */
        RequestAPIContext(KafkaConfig kafkaConfig, String str, String str2, String str3, Serde<K> serde, Serde<I> serde2, Serde<UUID> serde3, Serde<O> serde4, WindowSpec windowSpec, TopicSpec topicSpec) {
            this.kafkaConfig = kafkaConfig;
            this.requestTopic = str;
            this.responseTopicMapTopic = str2;
            this.privateResponseTopic = str3;
            this.requestKeySerde = serde;
            this.requestValueSerde = serde2;
            this.responseKeySerde = serde3;
            this.responseValueSerde = serde4;
            this.responseWindowSpec = windowSpec;
            this.outputTopicConfig = topicSpec;
        }

        /** Returns a new, empty builder. */
        public static <K, I, O> RequestAPIContextBuilder<K, I, O> builder() {
            return new RequestAPIContextBuilder<>();
        }

        /** Null-safe equality: two nulls are equal, otherwise delegates to {@code left.equals(right)}. */
        private static boolean sameValue(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** One step of the hash accumulation: multiplier 59, with 43 standing in for {@code null}. */
        private static int mix(int accumulated, Object value) {
            return (accumulated * 59) + (value == null ? 43 : value.hashCode());
        }

        public KafkaConfig kafkaConfig() {
            return kafkaConfig;
        }

        public String requestTopic() {
            return requestTopic;
        }

        public String responseTopicMapTopic() {
            return responseTopicMapTopic;
        }

        public String privateResponseTopic() {
            return privateResponseTopic;
        }

        public Serde<K> requestKeySerde() {
            return requestKeySerde;
        }

        public Serde<I> requestValueSerde() {
            return requestValueSerde;
        }

        public Serde<UUID> responseKeySerde() {
            return responseKeySerde;
        }

        public Serde<O> responseValueSerde() {
            return responseValueSerde;
        }

        public WindowSpec responseWindowSpec() {
            return responseWindowSpec;
        }

        public TopicSpec outputTopicConfig() {
            return outputTopicConfig;
        }

        /** Field-by-field equality, compared in declaration order. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof RequestAPIContext)) {
                return false;
            }
            final RequestAPIContext<?, ?, ?> that = (RequestAPIContext<?, ?, ?>) obj;
            return sameValue(kafkaConfig, that.kafkaConfig)
                    && sameValue(requestTopic, that.requestTopic)
                    && sameValue(responseTopicMapTopic, that.responseTopicMapTopic)
                    && sameValue(privateResponseTopic, that.privateResponseTopic)
                    && sameValue(requestKeySerde, that.requestKeySerde)
                    && sameValue(requestValueSerde, that.requestValueSerde)
                    && sameValue(responseKeySerde, that.responseKeySerde)
                    && sameValue(responseValueSerde, that.responseValueSerde)
                    && sameValue(responseWindowSpec, that.responseWindowSpec)
                    && sameValue(outputTopicConfig, that.outputTopicConfig);
        }

        /** Hash over all fields in declaration order, consistent with {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            int result = 1;
            result = mix(result, kafkaConfig);
            result = mix(result, requestTopic);
            result = mix(result, responseTopicMapTopic);
            result = mix(result, privateResponseTopic);
            result = mix(result, requestKeySerde);
            result = mix(result, requestValueSerde);
            result = mix(result, responseKeySerde);
            result = mix(result, responseValueSerde);
            result = mix(result, responseWindowSpec);
            result = mix(result, outputTopicConfig);
            return result;
        }

        @Override
        public String toString() {
            return "KafkaRequestAPI.RequestAPIContext(kafkaConfig=" + kafkaConfig
                    + ", requestTopic=" + requestTopic
                    + ", responseTopicMapTopic=" + responseTopicMapTopic
                    + ", privateResponseTopic=" + privateResponseTopic
                    + ", requestKeySerde=" + requestKeySerde
                    + ", requestValueSerde=" + requestValueSerde
                    + ", responseKeySerde=" + responseKeySerde
                    + ", responseValueSerde=" + responseValueSerde
                    + ", responseWindowSpec=" + responseWindowSpec
                    + ", outputTopicConfig=" + outputTopicConfig
                    + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/simplesource/kafka/internal/client/KafkaRequestAPI$ResponseHandler.class */
    public static final class ResponseHandler<O> {
        private final List<CompletableFuture<O>> responseFutures;
        private final Optional<O> response;

        /* JADX INFO: Access modifiers changed from: package-private */
        public static <O> ResponseHandler<O> initialise(Optional<O> optional) {
            return new ResponseHandler<>(Lists.newArrayList(), optional);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void forEachFuture(Consumer<CompletableFuture<O>> consumer) {
            this.responseFutures.forEach(consumer);
        }

        public ResponseHandler(List<CompletableFuture<O>> list, Optional<O> optional) {
            this.responseFutures = list;
            this.response = optional;
        }

        public List<CompletableFuture<O>> responseFutures() {
            return this.responseFutures;
        }

        public Optional<O> response() {
            return this.response;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof ResponseHandler)) {
                return false;
            }
            ResponseHandler responseHandler = (ResponseHandler) obj;
            List<CompletableFuture<O>> responseFutures = responseFutures();
            List<CompletableFuture<O>> responseFutures2 = responseHandler.responseFutures();
            if (responseFutures == null) {
                if (responseFutures2 != null) {
                    return false;
                }
            } else if (!responseFutures.equals(responseFutures2)) {
                return false;
            }
            Optional<O> response = response();
            Optional<O> response2 = responseHandler.response();
            return response == null ? response2 == null : response.equals(response2);
        }

        public int hashCode() {
            List<CompletableFuture<O>> responseFutures = responseFutures();
            int hashCode = (1 * 59) + (responseFutures == null ? 43 : responseFutures.hashCode());
            Optional<O> response = response();
            return (hashCode * 59) + (response == null ? 43 : response.hashCode());
        }

        public String toString() {
            return "KafkaRequestAPI.ResponseHandler(responseFutures=" + responseFutures() + ", response=" + response() + ")";
        }
    }

    /* loaded from: input_file:io/simplesource/kafka/internal/client/KafkaRequestAPI$ResponseReceiver.class */
    /**
     * Glue between an incoming (key, value) pair and an {@link ExpiringMap}: when an entry exists
     * for the key, the entry is replaced by the result of applying {@code mapModifier} to the
     * current entry and the received value.
     *
     * @param <K> map key type
     * @param <M> map entry type
     * @param <V> received value type
     */
    static final class ResponseReceiver<K, M, V> {
        private final ExpiringMap<K, M> expiringMap;
        private final BiFunction<M, V, M> mapModifier;

        /**
         * @param expiringMap map holding the entries to update
         * @param biFunction  computes the replacement entry from the current entry and the value
         */
        public ResponseReceiver(ExpiringMap<K, M> expiringMap, BiFunction<M, V, M> biFunction) {
            this.expiringMap = expiringMap;
            this.mapModifier = biFunction;
        }

        /**
         * Feeds a received value to the entry registered under {@code k}; delegates to
         * {@code ExpiringMap.computeIfPresent}, so what happens for an unknown key is up to the map.
         */
        void receive(K k, V v) {
            expiringMap.computeIfPresent(k, current -> mapModifier.apply(current, v));
        }

        public ExpiringMap<K, M> expiringMap() {
            return expiringMap;
        }

        public BiFunction<M, V, M> mapModifier() {
            return mapModifier;
        }

        /** Null-safe equality: two nulls are equal, otherwise delegates to {@code left.equals(right)}. */
        private static boolean sameValue(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash contribution of one field; 43 stands in for {@code null}. */
        private static int hashOf(Object value) {
            return value == null ? 43 : value.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof ResponseReceiver)) {
                return false;
            }
            final ResponseReceiver<?, ?, ?> that = (ResponseReceiver<?, ?, ?>) obj;
            return sameValue(expiringMap, that.expiringMap) && sameValue(mapModifier, that.mapModifier);
        }

        @Override
        public int hashCode() {
            final int afterMap = (1 * 59) + hashOf(expiringMap);
            return (afterMap * 59) + hashOf(mapModifier);
        }

        @Override
        public String toString() {
            return "KafkaRequestAPI.ResponseReceiver(expiringMap=" + expiringMap
                    + ", mapModifier=" + mapModifier
                    + ")";
        }
    }

    /**
     * Builds a {@link RequestPublisher} that sends each key/value pair to a fixed topic through a
     * dedicated {@link KafkaProducer}.
     *
     * <p>The producer and record are fully parameterised (the previous raw types erased the
     * record type, so the metadata callback could not be type-checked).
     *
     * <p>NOTE(review): the producer created here is never closed; the returned publisher exposes
     * no way to release it, so it lives until the JVM exits.
     *
     * @param kafkaConfig source of the producer configuration
     * @param str         name of the topic every record is sent to
     * @param serde       serde supplying the key serializer
     * @param serde2      serde supplying the value serializer
     * @return a publisher whose result carries the timestamp from the record metadata returned by
     *         the send; a failed send is surfaced as the exception itself
     */
    private static <K, V> RequestPublisher<K, V> kakfaProducerSender(KafkaConfig kafkaConfig, String str, Serde<K> serde, Serde<V> serde2) {
        final KafkaProducer<K, V> producer =
                new KafkaProducer<>(kafkaConfig.producerConfig(), serde.serializer(), serde2.serializer());
        return (key, value) -> {
            final ProducerRecord<K, V> record = new ProducerRecord<>(str, key, value);
            return FutureResult.ofFuture(producer.send(record), error -> error)
                    .map(metadata -> new RequestPublisher.PublishResult(metadata.timestamp()));
        };
    }

    /**
     * Creates a fully Kafka-backed request API: one producer for the request topic, one for the
     * response-topic-map topic, a consumer on the private response topic, and topic creation
     * enabled.
     *
     * <p>Context values are read through the typed accessors rather than through raw-type casts,
     * so the key/value types of both publishers are checked by the compiler.
     *
     * @param requestAPIContext topics, serdes and Kafka configuration to use
     */
    public KafkaRequestAPI(RequestAPIContext<K, I, O> requestAPIContext) {
        this(
                requestAPIContext,
                kakfaProducerSender(
                        requestAPIContext.kafkaConfig(),
                        requestAPIContext.requestTopic(),
                        requestAPIContext.requestKeySerde(),
                        requestAPIContext.requestValueSerde()),
                kakfaProducerSender(
                        requestAPIContext.kafkaConfig(),
                        requestAPIContext.responseTopicMapTopic(),
                        requestAPIContext.responseKeySerde(),
                        Serdes.String()),
                biConsumer -> KafkaConsumerRunner.run(
                        requestAPIContext.kafkaConfig().consumerConfig(),
                        requestAPIContext.privateResponseTopic(),
                        requestAPIContext.responseValueSerde(),
                        biConsumer),
                true);
    }

    /**
     * Creates a request API with injectable publishers and response subscription.
     *
     * <p>Steps, in order: optionally create the private response topic, set up the expiring map
     * of response handlers (retention taken from the context's response window spec), subscribe
     * to responses, and register a JVM shutdown hook that closes the subscription.
     *
     * <p>When a response arrives for a known request id, every future waiting on it is completed
     * with the response and the handler is replaced by a fresh one that remembers the response.
     *
     * @param requestAPIContext topics, serdes and Kafka configuration
     * @param requestPublisher  publishes requests
     * @param requestPublisher2 publishes the request-id to private-response-topic mapping
     * @param function          given the callback for received (request id, response) pairs,
     *                          starts the subscription and returns its handle
     * @param z                 when {@code true}, the private response topic is created if it
     *                          does not already exist
     * @throws RuntimeException if listing or creating topics fails
     */
    public KafkaRequestAPI(RequestAPIContext<K, I, O> requestAPIContext, RequestPublisher<K, I> requestPublisher, RequestPublisher<UUID, String> requestPublisher2, Function<BiConsumer<UUID, O>, ResponseSubscription> function, boolean z) {
        final KafkaConfig kafkaConfig = requestAPIContext.kafkaConfig();
        this.ctx = requestAPIContext;
        final long retentionInSeconds = requestAPIContext.responseWindowSpec().retentionInSeconds();
        this.requestSender = requestPublisher;
        this.responseTopicMapSender = requestPublisher2;
        if (z) {
            createPrivateResponseTopicIfMissing(kafkaConfig, requestAPIContext);
        }
        this.responseHandlers = new ExpiringMap<>(retentionInSeconds, Clock.systemUTC());
        final ResponseReceiver<UUID, ResponseHandler<O>, O> receiver =
                new ResponseReceiver<UUID, ResponseHandler<O>, O>(this.responseHandlers, (handler, response) -> {
                    handler.forEachFuture(future -> future.complete(response));
                    // Replace the handler so later queries are answered straight from the stored response.
                    return ResponseHandler.initialise(Optional.of(response));
                });
        this.responseSubscription = function.apply(receiver::receive);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("CommandAPI shutting down");
            this.responseSubscription.close();
        }));
    }

    /**
     * Creates the context's private response topic, using its output topic spec for partition
     * and replica counts, unless a topic of that name already exists. The admin client is closed
     * once the check (and creation, if any) has finished.
     */
    private static void createPrivateResponseTopicIfMissing(KafkaConfig kafkaConfig, RequestAPIContext<?, ?, ?> context) {
        // Created outside the try so a failure to construct the client propagates unchanged.
        final AdminClient createdClient = AdminClient.create(kafkaConfig.adminClientConfig());
        try (AdminClient adminClient = createdClient) {
            final Set<String> existingTopics = adminClient.listTopics().names().get();
            final String topicName = context.privateResponseTopic();
            if (!existingTopics.contains(topicName)) {
                final TopicSpec topicSpec = context.outputTopicConfig();
                final NewTopic newTopic = new NewTopic(topicName, topicSpec.partitionCount(), topicSpec.replicaCount());
                adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException("Unable to create required topics.", e);
        }
    }

    /**
     * Publishes a request and registers a handler so its response can later be queried.
     *
     * <p>The mapping from {@code uuid} to this client's private response topic is published
     * first; once that succeeds the request itself is published; once that succeeds an empty
     * response handler is registered under {@code uuid} if none exists yet. Independently of the
     * outcome, each call also asks the handler map to evict stale entries asynchronously, failing
     * their waiting futures with "Request timed out.".
     *
     * @param k    request key
     * @param uuid id under which the response will be looked up
     * @param i    request payload
     * @return the result of publishing the request, or the first failure encountered
     */
    public FutureResult<Exception, RequestPublisher.PublishResult> publishRequest(K k, UUID uuid, I i) {
        final String replyTopic = this.ctx.privateResponseTopic();
        final FutureResult<Exception, RequestPublisher.PublishResult> outcome = this.responseTopicMapSender
                .publish(uuid, replyTopic)
                .flatMap(mappingPublished -> this.requestSender.publish(k, i))
                .map(requestPublished -> {
                    this.responseHandlers.insertIfAbsent(uuid, () -> ResponseHandler.initialise(Optional.empty()));
                    return requestPublished;
                });
        this.responseHandlers.removeStaleAsync(staleHandler ->
                staleHandler.forEachFuture(waiting ->
                        waiting.completeExceptionally(new Exception("Request timed out."))));
        return outcome;
    }

    /**
     * Returns a future for the response to a previously published request.
     *
     * <p>If the response is already known the future is completed immediately; if the request is
     * registered but unanswered the future is parked on its handler until a response arrives (or
     * the handler is evicted or the API is closed); if no handler exists for {@code uuid} the
     * future fails with "Invalid commandId.".
     *
     * <p>The stored response is read as {@code Optional<O>} rather than a raw {@code Optional},
     * so the value passed to the future is type-checked.
     *
     * @param uuid     id the request was published under
     * @param duration currently unused: this method applies no timeout of its own
     * @return a future that yields the response or fails as described above
     */
    public CompletableFuture<O> queryResponse(UUID uuid, Duration duration) {
        final CompletableFuture<O> result = new CompletableFuture<>();
        if (this.responseHandlers.computeIfPresent(uuid, handler -> {
            final Optional<O> known = handler.response();
            if (known.isPresent()) {
                result.complete(known.get());
            } else {
                // Not answered yet: park the future so the receiver completes it later.
                handler.responseFutures().add(result);
            }
            return handler;
        }) == null) {
            result.completeExceptionally(new Exception("Invalid commandId."));
        }
        return result;
    }

    /**
     * Shuts the API down: removes every response handler, failing each future still waiting on
     * one with "Consumer closed before future.", and then closes the response subscription.
     */
    public void close() {
        this.responseHandlers.removeAll(handler ->
                handler.forEachFuture(waiting ->
                        waiting.completeExceptionally(new Exception("Consumer closed before future."))));
        this.responseSubscription.close();
    }
}
