package io.simplesource.kafka.internal.streams.topology;

import io.simplesource.api.CommandError;
import io.simplesource.data.NonEmptyList;
import io.simplesource.data.Result;
import io.simplesource.data.Sequence;
import io.simplesource.kafka.api.AggregateResources;
import io.simplesource.kafka.model.AggregateUpdate;
import io.simplesource.kafka.model.CommandRequest;
import io.simplesource.kafka.model.ValueWithSequence;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/simplesource/kafka/internal/streams/topology/CommandRequestTransformer.class */
/**
 * Turns an incoming {@link CommandRequest} into a {@link CommandEvents} value.
 *
 * <p>For each request the transformer looks up the current {@link AggregateUpdate} for the key in
 * the {@code aggregate_update} state store, checks the request's read sequence against the stored
 * sequence, runs the aggregate's command handler, and numbers each resulting event with the
 * sequence following the previous one.
 *
 * @param <K> aggregate key type
 * @param <C> command type
 * @param <E> event type
 * @param <A> aggregate type
 */
public final class CommandRequestTransformer<K, C, E, A> implements ValueTransformerWithKey<K, CommandRequest<K, C>, CommandEvents<E, A>> {
    private static final Logger logger = LoggerFactory.getLogger(CommandRequestTransformer.class);

    private final TopologyContext<K, C, E, A> ctx;
    // Assigned in init(), so it cannot be final.
    private ReadOnlyKeyValueStore<K, AggregateUpdate<A>> stateStore;

    /**
     * @param topologyContext supplies the state store name, the initial aggregate value and the
     *                        aggregate spec (sequence handler, command handler, aggregate name)
     */
    public CommandRequestTransformer(TopologyContext<K, C, E, A> topologyContext) {
        this.ctx = topologyContext;
    }

    /**
     * Resolves the {@code aggregate_update} state store from the processor context.
     *
     * @param processorContext context used to look up the state store by name
     */
    @Override
    @SuppressWarnings("unchecked") // the store API is untyped; key/value types are fixed by the store name
    public void init(ProcessorContext processorContext) {
        this.stateStore = (ReadOnlyKeyValueStore<K, AggregateUpdate<A>>) processorContext.getStateStore(
                this.ctx.stateStoreName(AggregateResources.StateStoreEntity.aggregate_update));
    }

    /**
     * Applies the command in {@code commandRequest} to the current aggregate for key {@code k}.
     *
     * <p>Any exception raised by the sequence check or the command handler is logged and
     * converted into a {@code CommandHandlerFailed} failure result rather than propagated.
     *
     * @param k              aggregate key
     * @param commandRequest request carrying the command, its id and the sequence the caller read
     * @return the command id, read sequence and current aggregate, together with either the
     *         command errors or the produced events paired with their sequences
     */
    @Override
    public CommandEvents<E, A> transform(K k, CommandRequest<K, C> commandRequest) {
        final AggregateUpdate<A> current = currentUpdate(k);

        Result<CommandError, NonEmptyList<E>> commandResult;
        try {
            commandResult = rejectionFor(k, current, commandRequest)
                    .<Result<CommandError, NonEmptyList<E>>>map(error -> Result.failure(error))
                    .orElseGet(() -> this.ctx.aggregateSpec().generation().commandHandler()
                            .interpretCommand(k, current.aggregate(), commandRequest.command()));
        } catch (Exception e) {
            // The trailing throwable is logged as the exception, not substituted into the message.
            logger.warn("[{} aggregate] Failed to apply command handler on key {} to request {}",
                    this.ctx.aggregateSpec().aggregateName(), k, commandRequest, e);
            commandResult = Result.failure(CommandError.of(CommandError.Reason.CommandHandlerFailed, e));
        }

        return new CommandEvents<>(
                commandRequest.commandId(),
                commandRequest.readSequence(),
                current.aggregate(),
                commandResult.map(events -> assignSequences(events, current.sequence())));
    }

    /** No resources to release. */
    @Override
    public void close() {
    }

    /**
     * Returns the stored update for {@code key}, or one built from the initial aggregate value
     * when nothing is stored or the store cannot be read.
     */
    private AggregateUpdate<A> currentUpdate(K key) {
        try {
            final AggregateUpdate<A> stored = this.stateStore.get(key);
            if (stored != null) {
                return stored;
            }
        } catch (Exception e) {
            // Best-effort: a failed read is treated like "no prior state", but it is logged
            // rather than silently swallowed so the fallback is visible to operators.
            logger.warn("[{} aggregate] Failed to read current state for key {}, using initial value",
                    this.ctx.aggregateSpec().aggregateName(), key, e);
        }
        return AggregateUpdate.of(this.ctx.initialValue().empty(key));
    }

    /**
     * Returns the error to reject the request with, if any. The invalid-sequence handler is only
     * consulted when the request's read sequence differs from the current sequence.
     */
    private Optional<CommandError> rejectionFor(K key, AggregateUpdate<A> current, CommandRequest<K, C> request) {
        if (Objects.equals(request.readSequence(), current.sequence())) {
            return Optional.empty();
        }
        return this.ctx.aggregateSpec().generation().invalidSequenceHandler().shouldReject(
                key, current.sequence(), request.readSequence(), current.aggregate(), request.command());
    }

    /** Pairs each event with the next sequence, starting from the one following {@code start}. */
    private NonEmptyList<ValueWithSequence<E>> assignSequences(NonEmptyList<E> events, Sequence start) {
        // One-element array: lambdas may only capture effectively final locals.
        final Sequence[] last = {start};
        return events.map(event -> {
            last[0] = last[0].next();
            return new ValueWithSequence<>(event, last[0]);
        });
    }
}
