package dev.responsive.kafka.internal.stores;

import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.internal.utils.TableName;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.StoreAccessorUtil;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/internal/stores/ResponsiveKeyValueStore.class */
/**
 * A {@link KeyValueStore} over raw {@link Bytes} keys and {@code byte[]} values that
 * forwards every read and write to a {@link KeyValueOperations} instance.
 *
 * <p>The operations object is created lazily in {@link #init(StateStoreContext, StateStore)}
 * through the configured {@link KVOperationsProvider}; until then the data-access methods
 * of this class must not be called because there is nothing to delegate to.
 *
 * <p>Successful writes and deletes also advance the store's {@link Position} via
 * {@link StoreQueryUtils#updatePosition(Position, StateStoreContext)}.
 */
public class ResponsiveKeyValueStore implements KeyValueStore<Bytes, byte[]> {
    private final ResponsiveKeyValueParams params;
    private final TableName name;
    private final boolean isTimestamped;
    private final KVOperationsProvider opsProvider;
    private Position position;
    private boolean open;
    private Logger log;
    private KeyValueOperations operations;
    private StateStoreContext context;

    /**
     * Creates a store that builds its operations with the default provider
     * ({@link #provideOperations}).
     *
     * @param responsiveKeyValueParams store parameters (name, ttl provider, ...)
     * @param isTimestamped            flag recorded on the store; not read anywhere in this class
     */
    public ResponsiveKeyValueStore(ResponsiveKeyValueParams responsiveKeyValueParams, boolean isTimestamped) {
        this(responsiveKeyValueParams, isTimestamped, ResponsiveKeyValueStore::provideOperations);
    }

    /**
     * Creates a store with an explicit operations provider.
     *
     * @param responsiveKeyValueParams store parameters (name, ttl provider, ...)
     * @param isTimestamped            flag recorded on the store; not read anywhere in this class
     * @param kVOperationsProvider     factory invoked during {@code init} to build the
     *                                 {@link KeyValueOperations} this store delegates to
     */
    public ResponsiveKeyValueStore(
            ResponsiveKeyValueParams responsiveKeyValueParams,
            boolean isTimestamped,
            KVOperationsProvider kVOperationsProvider) {
        this.params = responsiveKeyValueParams;
        this.name = responsiveKeyValueParams.name();
        this.isTimestamped = isTimestamped;
        this.position = Position.emptyPosition();
        this.opsProvider = kVOperationsProvider;
        // Provisional logger; init() replaces it once the task type is known.
        this.log = new LogContext(String.format("store [%s] ", this.name.kafkaName()))
                .logger(ResponsiveKeyValueStore.class);
    }

    /** Returns the Kafka-facing name of this store. */
    @Override
    public String name() {
        return this.name.kafkaName();
    }

    /**
     * Legacy initialization entry point. Only contexts that are also a
     * {@link StateStoreContext} are accepted; they are forwarded to
     * {@link #init(StateStoreContext, StateStore)}.
     *
     * @throws UnsupportedOperationException if {@code processorContext} is not a
     *                                       {@link StateStoreContext}
     * @deprecated use {@link #init(StateStoreContext, StateStore)}
     */
    @Override
    @Deprecated
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        if (processorContext instanceof StateStoreContext) {
            init((StateStoreContext) processorContext, stateStore);
            return;
        }
        throw new UnsupportedOperationException("Use ResponsiveStore#init(StateStoreContext, StateStore) instead.");
    }

    /**
     * Initializes the store: builds the task-aware logger, creates the backing
     * {@link KeyValueOperations}, marks the store open and registers it with the context.
     *
     * @param stateStoreContext context of the task that owns this store
     * @param stateStore        the root store, used for serde extraction and registration
     * @throws ProcessorStateException if creating the operations is interrupted or times out
     */
    @Override
    public void init(StateStoreContext stateStoreContext, StateStore stateStore) {
        try {
            final Task.TaskType taskType =
                    ProcessorContextUtils.asInternalProcessorContext(stateStoreContext).taskType();
            final String logPrefix = taskType == Task.TaskType.GLOBAL ? "global-" : "";
            this.log = new LogContext(String.format("%sstore [%s] ", logPrefix, this.name.kafkaName()))
                    .logger(ResponsiveKeyValueStore.class);
            this.log.info("Initializing state store");
            this.context = stateStoreContext;
            if (taskType == Task.TaskType.STANDBY) {
                this.log.warn("Unexpected standby task created, should transition to active shortly");
            }
            final Optional<TtlResolver<?, ?>> ttlResolver = TtlResolver.fromTtlProviderAndStateSerdes(
                    StoreAccessorUtil.extractKeyValueStoreSerdes(stateStore),
                    this.params.ttlProvider());
            this.operations = this.opsProvider.provide(this.params, ttlResolver, stateStoreContext, taskType);
            this.log.info("Completed initializing state store");
            this.open = true;
            stateStoreContext.register(stateStore, this.operations);
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new ProcessorStateException("Failed to initialize store.", e);
        } catch (TimeoutException e) {
            throw new ProcessorStateException("Failed to initialize store.", e);
        }
    }

    /**
     * Default {@link KVOperationsProvider}: global tasks get {@link GlobalOperations},
     * every other task type gets {@link PartitionedOperations}.
     */
    private static KeyValueOperations provideOperations(
            ResponsiveKeyValueParams responsiveKeyValueParams,
            Optional<TtlResolver<?, ?>> ttlResolver,
            StateStoreContext stateStoreContext,
            Task.TaskType taskType) throws InterruptedException, TimeoutException {
        if (taskType == Task.TaskType.GLOBAL) {
            return GlobalOperations.create(stateStoreContext, responsiveKeyValueParams, ttlResolver);
        }
        return PartitionedOperations.create(
                responsiveKeyValueParams.name(), ttlResolver, stateStoreContext, responsiveKeyValueParams);
    }

    /** No-op: this class performs no flushing work of its own. */
    @Override
    public void flush() {
    }

    /** Returns {@code true} between a successful {@code init} and the next {@link #close()}. */
    @Override
    public boolean isOpen() {
        return this.open;
    }

    /** Always {@code false}. NOTE(review): presumably because no local on-disk state is kept; confirm. */
    @Override
    public boolean persistent() {
        return false;
    }

    /**
     * Writes {@code value} under {@code key}; a {@code null} value is treated as a delete.
     * The store position is advanced afterwards.
     */
    @Override
    public void put(Bytes key, byte[] value) {
        if (value != null) {
            this.operations.put(key, value);
        } else {
            this.operations.delete(key);
        }
        StoreQueryUtils.updatePosition(this.position, this.context);
    }

    /**
     * Writes {@code value} only if {@code key} currently has no value. This is a
     * read followed by a write, not a single atomic operation.
     *
     * @return the value that was present before the call, or {@code null} if there was none
     */
    @Override
    public byte[] putIfAbsent(Bytes key, byte[] value) {
        final byte[] existing = get(key);
        if (existing == null && value != null) {
            put(key, value);
        }
        return existing;
    }

    /** Applies {@link #put(Bytes, byte[])} to each entry, in list order. */
    @Override
    public void putAll(List<KeyValue<Bytes, byte[]>> entries) {
        entries.forEach(entry -> put(entry.key, entry.value));
    }

    /**
     * Deletes {@code key} and advances the store position.
     *
     * @return whatever the underlying operations report for the delete
     */
    @Override
    public byte[] delete(Bytes key) {
        final byte[] removed = this.operations.delete(key);
        StoreQueryUtils.updatePosition(this.position, this.context);
        return removed;
    }

    /** Returns the value stored under {@code key} as reported by the underlying operations. */
    @Override
    public byte[] get(Bytes key) {
        return this.operations.get(key);
    }

    /** Delegates a forward range scan between {@code from} and {@code to} to the underlying operations. */
    @Override
    public KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to) {
        return this.operations.range(from, to);
    }

    /** Delegates a forward scan over all entries to the underlying operations. */
    @Override
    public KeyValueIterator<Bytes, byte[]> all() {
        return this.operations.all();
    }

    /** Returns the position object this store updates on writes and deletes. */
    @Override
    public Position getPosition() {
        return this.position;
    }

    /** Returns the entry-count estimate reported by the underlying operations. */
    @Override
    public long approximateNumEntries() {
        return this.operations.approximateNumEntries();
    }

    /**
     * Marks the store as closed and closes the underlying operations if they were
     * ever created. Safe to call on a store that was never initialized.
     */
    @Override
    public void close() {
        // Clear the flag first so isOpen() reports closed even if closing the operations throws.
        this.open = false;
        if (this.operations != null) {
            this.operations.close();
        }
    }

    /** Delegates a reverse range scan between {@code from} and {@code to} to the underlying operations. */
    @Override
    public KeyValueIterator<Bytes, byte[]> reverseRange(Bytes from, Bytes to) {
        return this.operations.reverseRange(from, to);
    }

    /** Delegates a reverse scan over all entries to the underlying operations. */
    @Override
    public KeyValueIterator<Bytes, byte[]> reverseAll() {
        return this.operations.reverseAll();
    }
}
