package io.simplesource.kafka.internal.streams.statestore;

import io.simplesource.kafka.api.AggregateResources;
import io.simplesource.kafka.api.AggregateSerdes;
import io.simplesource.kafka.model.AggregateUpdate;
import io.simplesource.kafka.spec.AggregateSpec;
import java.util.Optional;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

/* loaded from: input_file:io/simplesource/kafka/internal/streams/statestore/KafkaStreamAggregateStoreBridge.class */
public final class KafkaStreamAggregateStoreBridge<K, A> implements AggregateStoreBridge<K, A> {
    private final KafkaStreams kafkaStreams;
    private final AggregateSerdes<K, ?, ?, ?> aggregateSerdes;
    private final String aggregateStoreName;

    public KafkaStreamAggregateStoreBridge(AggregateSpec<K, ?, ?, ?> aggregateSpec, KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
        this.aggregateSerdes = aggregateSpec.serialization().serdes();
        this.aggregateStoreName = aggregateSpec.serialization().resourceNamingStrategy().storeName(aggregateSpec.aggregateName(), AggregateResources.StateStoreEntity.aggregate_update.name());
    }

    @Override // io.simplesource.kafka.internal.streams.statestore.AggregateStoreBridge
    public ReadOnlyKeyValueStore<K, AggregateUpdate<A>> getAggregateStateStore() {
        return (ReadOnlyKeyValueStore) this.kafkaStreams.store(this.aggregateStoreName, QueryableStoreTypes.keyValueStore());
    }

    @Override // io.simplesource.kafka.internal.streams.statestore.AggregateStoreBridge
    public Optional<HostInfo> hostInfoForAggregateStoreKey(K k) {
        return Optional.ofNullable(this.kafkaStreams.metadataForKey(this.aggregateStoreName, k, this.aggregateSerdes.aggregateKey().serializer())).map((v0) -> {
            return v0.hostInfo();
        });
    }
}
