package dev.responsive.kafka.api.async.internals.stores;

import dev.responsive.kafka.api.async.internals.stores.StreamThreadFlushListeners;
import java.util.List;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/api/async/internals/stores/AsyncFlushingKeyValueStore.class */
public class AsyncFlushingKeyValueStore extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, byte[], byte[]> implements KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {
    private final Logger log;
    private final StreamThreadFlushListeners flushListeners;
    private int partition;
    private StreamThreadFlushListeners.AsyncFlushListener flushAsyncProcessor;

    public AsyncFlushingKeyValueStore(KeyValueStore<Bytes, byte[]> keyValueStore, StreamThreadFlushListeners streamThreadFlushListeners) {
        super(keyValueStore);
        this.flushListeners = streamThreadFlushListeners;
        this.log = new LogContext(String.format("stream-thread [%s] %s: ", streamThreadFlushListeners.streamThreadName(), keyValueStore.name())).logger(AsyncFlushingKeyValueStore.class);
    }

    public void init(StateStoreContext stateStoreContext, StateStore stateStore) {
        this.partition = stateStoreContext.taskId().partition();
        this.flushListeners.registerStoreConnectorForPartition(this.partition, asyncFlushListener -> {
            this.flushAsyncProcessor = asyncFlushListener;
        });
        try {
            super.init(stateStoreContext, stateStore);
        } catch (RuntimeException e) {
            this.log.error("failed to initialize the wrapped store. Deregistering the store connector as its likely that this store was not registered with streams and close will not be called");
            this.flushListeners.unregisterListenerForPartition(this.partition);
            throw e;
        }
    }

    public void flushCache() {
        if (this.flushAsyncProcessor != null) {
            this.flushAsyncProcessor.flushBuffers();
        } else {
            this.log.warn("A flush was triggered on the async state store but the flush listener was not yet initialized. This can happen when a task is closed before it can be initialized.");
        }
        super.flushCache();
    }

    public void clearCache() {
        throw new IllegalStateException("Attempted to clear cache of async store, this implies the task is transitioning to standby which should not happen");
    }

    public void close() {
        this.flushListeners.unregisterListenerForPartition(this.partition);
        super.close();
    }

    public void put(Bytes bytes, byte[] bArr) {
        wrapped().put(bytes, bArr);
    }

    public byte[] putIfAbsent(Bytes bytes, byte[] bArr) {
        return (byte[]) wrapped().putIfAbsent(bytes, bArr);
    }

    public void putAll(List<KeyValue<Bytes, byte[]>> list) {
        wrapped().putAll(list);
    }

    public byte[] delete(Bytes bytes) {
        return (byte[]) wrapped().delete(bytes);
    }

    public byte[] get(Bytes bytes) {
        return (byte[]) wrapped().get(bytes);
    }

    public KeyValueIterator<Bytes, byte[]> range(Bytes bytes, Bytes bytes2) {
        return wrapped().range(bytes, bytes2);
    }

    public KeyValueIterator<Bytes, byte[]> all() {
        return wrapped().all();
    }

    public long approximateNumEntries() {
        return wrapped().approximateNumEntries();
    }

    public boolean setFlushListener(CacheFlushListener<byte[], byte[]> cacheFlushListener, boolean z) {
        return super.setFlushListener(cacheFlushListener, z);
    }
}
