package io.simplesource.kafka.internal.client;

import io.simplesource.api.CommandAPI;
import io.simplesource.api.CommandError;
import io.simplesource.data.FutureResult;
import io.simplesource.data.Sequence;
import io.simplesource.kafka.api.AggregateResources;
import io.simplesource.kafka.api.CommandSerdes;
import io.simplesource.kafka.api.ResourceNamingStrategy;
import io.simplesource.kafka.dsl.KafkaConfig;
import io.simplesource.kafka.internal.client.KafkaRequestAPI;
import io.simplesource.kafka.model.CommandRequest;
import io.simplesource.kafka.model.CommandResponse;
import io.simplesource.kafka.spec.CommandSpec;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/simplesource/kafka/internal/client/KafkaCommandAPI.class */
/**
 * {@link CommandAPI} implementation that delegates publishing of commands and querying of
 * command results to a {@link KafkaRequestAPI}.
 *
 * @param <K> the key type carried by each command request
 * @param <C> the command type
 */
public final class KafkaCommandAPI<K, C> implements CommandAPI<K, C> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaCommandAPI.class);

    /** Request/response transport that every operation of this class delegates to. */
    private final KafkaRequestAPI<K, CommandRequest<K, C>, CommandResponse> requestApi;

    /**
     * Creates a command API whose {@link KafkaRequestAPI} is built solely from the context
     * returned by {@link #getRequestAPIContext(CommandSpec, KafkaConfig)}.
     *
     * @param commandSpec supplies topic naming, serdes and client settings
     * @param kafkaConfig Kafka configuration passed through to the request API context
     */
    public KafkaCommandAPI(CommandSpec<K, C> commandSpec, KafkaConfig kafkaConfig) {
        this.requestApi = new KafkaRequestAPI<>(getRequestAPIContext(commandSpec, kafkaConfig));
    }

    /**
     * Creates a command API with explicitly supplied publishers and response subscription
     * factory instead of the ones {@link KafkaRequestAPI} would otherwise set up itself.
     *
     * @param commandSpec       supplies topic naming, serdes and client settings
     * @param kafkaConfig       Kafka configuration passed through to the request API context
     * @param requestPublisher  publisher for command requests, keyed by {@code K}
     * @param requestPublisher2 publisher of {@code UUID -> String} records
     *                          (NOTE(review): presumably the command-id to response-topic
     *                          mapping; confirm against {@code KafkaRequestAPI})
     * @param function          given a {@code (UUID, CommandResponse)} consumer, returns the
     *                          {@link ResponseSubscription} that feeds it
     */
    public KafkaCommandAPI(CommandSpec<K, C> commandSpec, KafkaConfig kafkaConfig, RequestPublisher<K, CommandRequest<K, C>> requestPublisher, RequestPublisher<UUID, String> requestPublisher2, Function<BiConsumer<UUID, CommandResponse>, ResponseSubscription> function) {
        // NOTE(review): the meaning of the trailing boolean flag is defined by KafkaRequestAPI
        // and is not visible from this class; the value is kept as it was.
        this.requestApi = new KafkaRequestAPI<>(
                getRequestAPIContext(commandSpec, kafkaConfig),
                requestPublisher,
                requestPublisher2,
                function,
                false);
    }

    /**
     * Publishes a command through the underlying request API.
     *
     * @param request the command together with its key, read sequence and command id
     * @return a result holding {@code request.commandId()} when publishing succeeds, or a
     *         {@link CommandError} with reason {@code CommandPublishError} otherwise
     */
    public FutureResult<CommandError, UUID> publishCommand(CommandAPI.Request<K, C> request) {
        // Diamond instead of a raw type, so the publishRequest call below stays fully type-checked.
        CommandRequest<K, C> commandRequest = new CommandRequest<>(
                request.key(),
                request.command(),
                request.readSequence(),
                request.commandId());

        return FutureResult.ofFuture(
                this.requestApi
                        .publishRequest(request.key(), request.commandId(), commandRequest)
                        .fold(
                                // Only the first reported failure is surfaced. The explicit cast is
                                // kept because the element type of the list is not visible here.
                                failures -> failedPublish((Throwable) failures.head()),
                                // The publish result itself is not needed; callers get the command id.
                                published -> succeededPublish(request.commandId())),
                exc -> {
                    logger.debug("Error in publishing command", exc);
                    // Prefer the underlying cause when the exception is only a wrapper.
                    return publishError(Optional.ofNullable(exc.getCause()).orElse(exc));
                })
                // Both fold branches already produce a FutureResult; flatten the nesting.
                .flatMap(inner -> inner);
    }

    /**
     * Queries the response for a previously published command.
     *
     * @param uuid     id of the command whose result is requested
     * @param duration passed unchanged to {@code KafkaRequestAPI.queryResponse}
     *                 (presumably the maximum time to wait; confirm there)
     * @return the {@code sequenceResult()} of the command response
     */
    public FutureResult<CommandError, Sequence> queryCommandResult(UUID uuid, Duration duration) {
        return FutureResult.ofCompletableFuture(
                this.requestApi
                        .queryResponse(uuid, duration)
                        .thenApply(response -> response.sequenceResult()));
    }

    /** Closes the underlying request API. */
    public void close() {
        this.requestApi.close();
    }

    /**
     * Builds the {@link KafkaRequestAPI} context for a command spec: request, response-map
     * and private response topic names, the four serdes, the response window spec and the
     * output topic configuration.
     *
     * @param commandSpec source of the naming strategy, aggregate name, client id, serdes,
     *                    response window spec and output topic config
     * @param kafkaConfig Kafka configuration stored in the context as-is
     * @param <K>         the key type carried by each command request
     * @param <C>         the command type
     * @return the fully populated request API context
     */
    public static <K, C> KafkaRequestAPI.RequestAPIContext<K, CommandRequest<K, C>, CommandResponse> getRequestAPIContext(CommandSpec<K, C> commandSpec, KafkaConfig kafkaConfig) {
        ResourceNamingStrategy namingStrategy = commandSpec.resourceNamingStrategy();
        CommandSerdes<K, C> serdes = commandSpec.serdes();

        // The private response topic is the shared command-response topic name suffixed with
        // this client's id.
        String privateResponseTopic = String.format(
                "%s_%s",
                namingStrategy.topicName(
                        commandSpec.aggregateName(),
                        AggregateResources.TopicEntity.command_response.name()),
                commandSpec.clientId());

        // Explicit type arguments: a chained builder call cannot infer them from the return type.
        return KafkaRequestAPI.RequestAPIContext.<K, CommandRequest<K, C>, CommandResponse>builder()
                .kafkaConfig(kafkaConfig)
                .requestTopic(namingStrategy.topicName(
                        commandSpec.aggregateName(),
                        AggregateResources.TopicEntity.command_request.name()))
                .responseTopicMapTopic(namingStrategy.topicName(
                        commandSpec.aggregateName(),
                        AggregateResources.TopicEntity.command_response_topic_map.name()))
                .privateResponseTopic(privateResponseTopic)
                .requestKeySerde(serdes.aggregateKey())
                .requestValueSerde(serdes.commandRequest())
                .responseKeySerde(serdes.commandResponseKey())
                .responseValueSerde(serdes.commandResponse())
                .responseWindowSpec(commandSpec.commandResponseWindowSpec())
                .outputTopicConfig(commandSpec.outputTopicConfig())
                .build();
    }

    /** Wraps {@code cause} in a {@link CommandError} with reason {@code CommandPublishError}. */
    private static CommandError publishError(Throwable cause) {
        return CommandError.of(CommandError.Reason.CommandPublishError, cause);
    }

    /** Failed result carrying a single publish error built from {@code cause}. */
    private static FutureResult<CommandError, UUID> failedPublish(Throwable cause) {
        // The empty trailing array means no additional errors accompany the first one.
        return FutureResult.fail(publishError(cause), new CommandError[0]);
    }

    /** Successful result carrying the id of the published command. */
    private static FutureResult<CommandError, UUID> succeededPublish(UUID commandId) {
        return FutureResult.of(commandId);
    }
}
