package org.apache.kafka.streams.state.internals;

import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.3.0.jar:org/apache/kafka/streams/state/internals/CachingKeyValueStore.class */
/**
 * A {@link KeyValueStore} decorator that routes reads and writes through a per-store namespace of a
 * {@link ThreadCache} before they reach the wrapped store.
 *
 * <p>Writes ({@link #put}, {@link #putIfAbsent}, {@link #putAll}, {@link #delete}) only touch the cache.
 * The wrapped store is updated by the flush listener registered in {@code initInternal}, which hands every
 * flushed {@code ThreadCache.DirtyEntry} to {@code putAndMaybeForward}. When a {@link CacheFlushListener}
 * has been installed via {@link #setFlushListener}, each flushed entry is additionally forwarded to it.
 *
 * <p>Point operations are guarded by a {@link ReentrantReadWriteLock}. {@link #range} and {@link #all} do
 * not take that lock.
 */
public class CachingKeyValueStore extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, byte[], byte[]> implements KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(CachingKeyValueStore.class);

    /** Guards cache access for the point operations of this store. */
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    /** Optional downstream listener; {@code null} until {@link #setFlushListener} is called. */
    private CacheFlushListener<byte[], byte[]> flushListener;
    /** Whether the previous value is looked up and passed to {@link #flushListener} on flush. */
    private boolean sendOldValues;
    /** Namespace of this store inside {@link #cache}; assigned in {@code initInternal}. */
    private String cacheName;
    /** Cache obtained from the processor context; assigned in {@code initInternal}. */
    private ThreadCache cache;
    /** Processor context captured at init time; supplies record metadata for new cache entries. */
    private InternalProcessorContext context;
    /** Thread that called {@link #init}; only this thread populates the cache on read misses. */
    private Thread streamThread;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a caching layer on top of the given store.
     *
     * @param keyValueStore the store that ultimately holds the data
     */
    public CachingKeyValueStore(KeyValueStore<Bytes, byte[]> keyValueStore) {
        super(keyValueStore);
    }

    /**
     * Sets up the cache namespace and flush listener, initializes the wrapped store, and records the
     * calling thread as the stream thread.
     *
     * @param processorContext context of the owning processor; must be an {@link InternalProcessorContext}
     * @param stateStore       root store passed through to the wrapped store's {@code init}
     */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        initInternal(processorContext);
        super.init(processorContext, stateStore);
        this.streamThread = Thread.currentThread();
    }

    /**
     * Captures the context and its cache, derives this store's cache namespace from the task id and the
     * store name, and registers the listener that writes flushed entries to the wrapped store.
     */
    private void initInternal(ProcessorContext processorContext) {
        final InternalProcessorContext internalContext = (InternalProcessorContext) processorContext;
        this.context = internalContext;
        this.cache = internalContext.getCache();
        this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(processorContext.taskId().toString(), name());
        this.cache.addDirtyEntryFlushListener(this.cacheName, dirtyEntries -> {
            for (final ThreadCache.DirtyEntry dirtyEntry : dirtyEntries) {
                putAndMaybeForward(dirtyEntry, internalContext);
            }
        });
    }

    /**
     * Writes one flushed cache entry to the wrapped store and, if a flush listener is installed, forwards
     * the change to it.
     *
     * <p>While the listener runs, the processor's record context is replaced by the context stored with the
     * cache entry; the previous record context is restored afterwards, even if the listener throws.
     *
     * @param entry            the flushed cache entry
     * @param processorContext context whose record context is temporarily swapped
     */
    private void putAndMaybeForward(ThreadCache.DirtyEntry entry, InternalProcessorContext processorContext) {
        if (this.flushListener == null) {
            // Nobody to notify: plain write-through.
            wrapped().put(entry.key(), entry.newValue());
            return;
        }
        final byte[] newValue = entry.newValue();
        // The old value is needed either to forward it, or to tell whether a delete changes anything.
        final byte[] oldValue = (newValue == null || this.sendOldValues) ? wrapped().get(entry.key()) : null;
        if (newValue == null && oldValue == null) {
            // Delete of a key the wrapped store does not hold: nothing to write and nothing to forward.
            return;
        }
        wrapped().put(entry.key(), entry.newValue());
        final ProcessorRecordContext previousContext = processorContext.recordContext();
        processorContext.setRecordContext(entry.entry().context());
        try {
            this.flushListener.apply(
                entry.key().get(),
                newValue,
                this.sendOldValues ? oldValue : null,
                entry.entry().context().timestamp());
        } finally {
            processorContext.setRecordContext(previousContext);
        }
    }

    /**
     * Installs the listener that is notified about every flushed entry.
     *
     * @param cacheFlushListener listener to notify; {@code null} disables forwarding
     * @param z                  whether the previous value should be passed to the listener
     * @return always {@code true}
     */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.state.internals.CachedStateStore
    public boolean setFlushListener(CacheFlushListener<byte[], byte[]> cacheFlushListener, boolean z) {
        this.flushListener = cacheFlushListener;
        this.sendOldValues = z;
        return true;
    }

    /**
     * Stores the value for the key in the cache while holding the write lock.
     *
     * @param bytes the key; must not be {@code null}
     * @param bArr  the value to associate with the key
     * @throws NullPointerException if the key is {@code null}
     */
    @Override // org.apache.kafka.streams.state.KeyValueStore
    public void put(Bytes bytes, byte[] bArr) {
        Objects.requireNonNull(bytes, "key cannot be null");
        validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            putInternal(bytes, bArr);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Puts a new cache entry for the key, tagged with the headers, offset, timestamp, partition and topic
     * currently reported by the processor context. Callers are expected to hold the write lock.
     */
    private void putInternal(Bytes key, byte[] value) {
        // NOTE(review): the literal `true` is presumably the entry's dirty flag - confirm against LRUCacheEntry.
        final LRUCacheEntry cacheEntry = new LRUCacheEntry(
            value,
            this.context.headers(),
            true,
            this.context.offset(),
            this.context.timestamp(),
            this.context.partition(),
            this.context.topic());
        this.cache.put(this.cacheName, key, cacheEntry);
    }

    /**
     * Stores the value only if no value is currently visible for the key (in the cache or the wrapped store).
     *
     * @param bytes the key; must not be {@code null}
     * @param bArr  the value to store when the key is absent
     * @return the value that was already present, or {@code null} if the new value was stored
     * @throws NullPointerException if the key is {@code null}
     */
    @Override // org.apache.kafka.streams.state.KeyValueStore
    public byte[] putIfAbsent(Bytes bytes, byte[] bArr) {
        Objects.requireNonNull(bytes, "key cannot be null");
        validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            final byte[] existing = getInternal(bytes);
            if (existing == null) {
                putInternal(bytes, bArr);
            }
            return existing;
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Stores every entry of the list by delegating to {@link #put}, holding the write lock for the whole
     * batch. The nested lock acquisition in {@code put} relies on the lock being reentrant.
     *
     * @param list entries to store; every key must be non-{@code null}
     * @throws NullPointerException if an entry has a {@code null} key (earlier entries stay stored)
     */
    @Override // org.apache.kafka.streams.state.KeyValueStore
    public void putAll(List<KeyValue<Bytes, byte[]>> list) {
        validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            for (final KeyValue<Bytes, byte[]> entry : list) {
                Objects.requireNonNull(entry.key, "key cannot be null");
                put(entry.key, entry.value);
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Removes the key by caching a {@code null} value for it.
     *
     * @param bytes the key; must not be {@code null}
     * @return the value that was visible for the key before the delete, or {@code null}
     * @throws NullPointerException if the key is {@code null}
     */
    @Override // org.apache.kafka.streams.state.KeyValueStore
    public byte[] delete(Bytes bytes) {
        Objects.requireNonNull(bytes, "key cannot be null");
        validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            return deleteInternal(bytes);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /** Reads the current value, then caches a {@code null} value for the key; returns the value read. */
    private byte[] deleteInternal(Bytes key) {
        final byte[] previous = getInternal(key);
        putInternal(key, null);
        return previous;
    }

    /**
     * Returns the value for the key, consulting the cache first and the wrapped store second.
     *
     * <p>The stream thread takes the write lock because its lookup may insert into the cache on a miss
     * (see {@code getInternal}); every other thread takes the read lock.
     *
     * @param bytes the key; must not be {@code null}
     * @return the value, or {@code null} if none is found
     * @throws NullPointerException if the key is {@code null}
     */
    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public byte[] get(Bytes bytes) {
        Objects.requireNonNull(bytes, "key cannot be null");
        validateStoreOpen();
        final Lock accessLock = Thread.currentThread().equals(this.streamThread)
            ? this.lock.writeLock()
            : this.lock.readLock();
        accessLock.lock();
        try {
            return getInternal(bytes);
        } finally {
            accessLock.unlock();
        }
    }

    /**
     * Looks the key up in the cache and falls back to the wrapped store on a miss. A value found in the
     * wrapped store is copied into the cache only when the caller is the stream thread.
     */
    private byte[] getInternal(Bytes key) {
        LRUCacheEntry cached = null;
        if (this.cache != null) {
            cached = this.cache.get(this.cacheName, key);
        }
        if (cached != null) {
            return cached.value();
        }
        final byte[] stored = wrapped().get(key);
        if (stored == null) {
            return null;
        }
        if (Thread.currentThread().equals(this.streamThread)) {
            this.cache.put(this.cacheName, key, new LRUCacheEntry(stored));
        }
        return stored;
    }

    /**
     * Returns an iterator over the given key range that merges the cache's view with the wrapped store's.
     * If {@code bytes} compares greater than {@code bytes2}, a warning is logged and an empty iterator is
     * returned. This method does not acquire the store's lock.
     *
     * @param bytes  lower bound of the range
     * @param bytes2 upper bound of the range
     * @return the merged iterator, or an empty iterator for an inverted range
     */
    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public KeyValueIterator<Bytes, byte[]> range(Bytes bytes, Bytes bytes2) {
        if (bytes.compareTo(bytes2) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }
        validateStoreOpen();
        return new MergedSortedCacheKeyValueBytesStoreIterator(this.cache.range(this.cacheName, bytes, bytes2), wrapped().range(bytes, bytes2));
    }

    /**
     * Returns an iterator over all entries that merges the cache's view with the wrapped store's.
     * This method does not acquire the store's lock.
     *
     * @return the merged iterator
     */
    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public KeyValueIterator<Bytes, byte[]> all() {
        validateStoreOpen();
        return new MergedSortedCacheKeyValueBytesStoreIterator(this.cache.all(this.cacheName), new DelegatingPeekingKeyValueIterator(name(), wrapped().all()));
    }

    /**
     * Returns the wrapped store's approximate entry count, read under the read lock. Entries that exist
     * only in the cache are not added to the figure by this method.
     *
     * @return the wrapped store's approximate number of entries
     */
    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public long approximateNumEntries() {
        validateStoreOpen();
        this.lock.readLock().lock();
        try {
            return wrapped().approximateNumEntries();
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Flushes this store's cache namespace and then the wrapped store, holding the write lock throughout.
     */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void flush() {
        this.lock.writeLock().lock();
        try {
            this.cache.flush(this.cacheName);
            super.flush();
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Flushes pending entries, closes the wrapped store, and closes this store's cache namespace.
     *
     * <p>Each step runs exactly once and later steps still run when an earlier one throws, so a failing
     * flush or a failing wrapped-store close cannot cause the wrapped store to be closed twice or the
     * cache namespace to be left open.
     */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void close() {
        try {
            flush();
        } finally {
            try {
                super.close();
            } finally {
                // Closes the namespace that initInternal registered under cacheName.
                this.cache.close(this.cacheName);
            }
        }
    }
}
