/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
import org.apache.kafka.streams.state.internals.ExceptionUtils;
import org.apache.kafka.streams.state.internals.KeyValueIterators;
import org.apache.kafka.streams.state.internals.LRUCacheEntry;
import org.apache.kafka.streams.state.internals.MergedSortedCacheKeyValueBytesStoreIterator;
import org.apache.kafka.streams.state.internals.PeekingKeyValueIterator;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CachingKeyValueStore
extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, byte[], byte[]>
implements KeyValueStore<Bytes, byte[]>,
CachedStateStore<byte[], byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(CachingKeyValueStore.class);
    private CacheFlushListener<byte[], byte[]> flushListener;
    private boolean sendOldValues;
    private String cacheName;
    private InternalProcessorContext context;
    private Thread streamThread;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    CachingKeyValueStore(KeyValueStore<Bytes, byte[]> underlying) {
        super(underlying);
    }

    @Override
    @Deprecated
    public void init(ProcessorContext context, StateStore root) {
        this.initInternal(ProcessorContextUtils.asInternalProcessorContext(context));
        super.init(context, root);
        this.streamThread = Thread.currentThread();
    }

    @Override
    public void init(StateStoreContext context, StateStore root) {
        this.initInternal(ProcessorContextUtils.asInternalProcessorContext(context));
        super.init(context, root);
        this.streamThread = Thread.currentThread();
    }

    private void initInternal(InternalProcessorContext context) {
        this.context = context;
        this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), this.name());
        this.context.registerCacheFlushListener(this.cacheName, entries -> {
            for (ThreadCache.DirtyEntry entry : entries) {
                this.putAndMaybeForward(entry, context);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void putAndMaybeForward(ThreadCache.DirtyEntry entry, InternalProcessorContext context) {
        if (this.flushListener != null) {
            byte[] rawOldValue;
            byte[] rawNewValue = entry.newValue();
            byte[] byArray = rawOldValue = rawNewValue == null || this.sendOldValues ? (byte[])((KeyValueStore)this.wrapped()).get(entry.key()) : null;
            if (rawNewValue != null || rawOldValue != null) {
                ((KeyValueStore)this.wrapped()).put(entry.key(), entry.newValue());
                ProcessorRecordContext current = context.recordContext();
                context.setRecordContext(entry.entry().context());
                try {
                    this.flushListener.apply(entry.key().get(), rawNewValue, (byte[])(this.sendOldValues ? rawOldValue : null), entry.entry().context().timestamp());
                }
                finally {
                    context.setRecordContext(current);
                }
            }
        } else {
            ((KeyValueStore)this.wrapped()).put(entry.key(), entry.newValue());
        }
    }

    @Override
    public boolean setFlushListener(CacheFlushListener<byte[], byte[]> flushListener, boolean sendOldValues) {
        this.flushListener = flushListener;
        this.sendOldValues = sendOldValues;
        return true;
    }

    @Override
    public void put(Bytes key, byte[] value) {
        Objects.requireNonNull(key, "key cannot be null");
        this.validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            this.validateStoreOpen();
            this.putInternal(key, value);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    private void putInternal(Bytes key, byte[] value) {
        this.context.cache().put(this.cacheName, key, new LRUCacheEntry(value, this.context.headers(), true, this.context.offset(), this.context.timestamp(), this.context.partition(), this.context.topic()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public byte[] putIfAbsent(Bytes key, byte[] value) {
        Objects.requireNonNull(key, "key cannot be null");
        this.validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            this.validateStoreOpen();
            byte[] v = this.getInternal(key);
            if (v == null) {
                this.putInternal(key, value);
            }
            byte[] byArray = v;
            return byArray;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void putAll(List<KeyValue<Bytes, byte[]>> entries) {
        this.validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            this.validateStoreOpen();
            for (KeyValue<Bytes, byte[]> entry : entries) {
                Objects.requireNonNull(entry.key, "key cannot be null");
                this.put((Bytes)entry.key, (byte[])entry.value);
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override
    public byte[] delete(Bytes key) {
        Objects.requireNonNull(key, "key cannot be null");
        this.validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            this.validateStoreOpen();
            byte[] byArray = this.deleteInternal(key);
            return byArray;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    private byte[] deleteInternal(Bytes key) {
        byte[] v = this.getInternal(key);
        this.putInternal(key, null);
        return v;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public byte[] get(Bytes key) {
        Objects.requireNonNull(key, "key cannot be null");
        this.validateStoreOpen();
        Lock theLock = Thread.currentThread().equals(this.streamThread) ? this.lock.writeLock() : this.lock.readLock();
        theLock.lock();
        try {
            this.validateStoreOpen();
            byte[] byArray = this.getInternal(key);
            return byArray;
        }
        finally {
            theLock.unlock();
        }
    }

    private byte[] getInternal(Bytes key) {
        LRUCacheEntry entry = null;
        if (this.context.cache() != null) {
            entry = this.context.cache().get(this.cacheName, key);
        }
        if (entry == null) {
            byte[] rawValue = (byte[])((KeyValueStore)this.wrapped()).get(key);
            if (rawValue == null) {
                return null;
            }
            if (Thread.currentThread().equals(this.streamThread)) {
                this.context.cache().put(this.cacheName, key, new LRUCacheEntry(rawValue));
            }
            return rawValue;
        }
        return entry.value();
    }

    @Override
    public KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to) {
        if (from.compareTo(to) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }
        this.validateStoreOpen();
        KeyValueIterator<Bytes, byte[]> storeIterator = ((KeyValueStore)this.wrapped()).range(from, to);
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = this.context.cache().range(this.cacheName, from, to);
        return new MergedSortedCacheKeyValueBytesStoreIterator((PeekingKeyValueIterator<Bytes, LRUCacheEntry>)cacheIterator, storeIterator, true);
    }

    @Override
    public KeyValueIterator<Bytes, byte[]> reverseRange(Bytes from, Bytes to) {
        if (from.compareTo(to) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }
        this.validateStoreOpen();
        KeyValueIterator<Bytes, byte[]> storeIterator = ((KeyValueStore)this.wrapped()).reverseRange(from, to);
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = this.context.cache().reverseRange(this.cacheName, from, to);
        return new MergedSortedCacheKeyValueBytesStoreIterator((PeekingKeyValueIterator<Bytes, LRUCacheEntry>)cacheIterator, storeIterator, false);
    }

    @Override
    public KeyValueIterator<Bytes, byte[]> all() {
        this.validateStoreOpen();
        DelegatingPeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<Bytes, byte[]>(this.name(), ((KeyValueStore)this.wrapped()).all());
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = this.context.cache().all(this.cacheName);
        return new MergedSortedCacheKeyValueBytesStoreIterator((PeekingKeyValueIterator<Bytes, LRUCacheEntry>)cacheIterator, (KeyValueIterator<Bytes, byte[]>)storeIterator, true);
    }

    @Override
    public KeyValueIterator<Bytes, byte[]> reverseAll() {
        this.validateStoreOpen();
        DelegatingPeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<Bytes, byte[]>(this.name(), ((KeyValueStore)this.wrapped()).reverseAll());
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = this.context.cache().reverseAll(this.cacheName);
        return new MergedSortedCacheKeyValueBytesStoreIterator((PeekingKeyValueIterator<Bytes, LRUCacheEntry>)cacheIterator, (KeyValueIterator<Bytes, byte[]>)storeIterator, false);
    }

    @Override
    public long approximateNumEntries() {
        this.validateStoreOpen();
        this.lock.readLock().lock();
        try {
            this.validateStoreOpen();
            long l = ((KeyValueStore)this.wrapped()).approximateNumEntries();
            return l;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    @Override
    public void flush() {
        this.validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            this.validateStoreOpen();
            this.context.cache().flush(this.cacheName);
            ((KeyValueStore)this.wrapped()).flush();
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override
    public void flushCache() {
        this.validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            this.validateStoreOpen();
            this.context.cache().flush(this.cacheName);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override
    public void close() {
        this.lock.writeLock().lock();
        try {
            Runnable[] runnableArray = new Runnable[3];
            runnableArray[0] = () -> this.context.cache().flush(this.cacheName);
            runnableArray[1] = () -> this.context.cache().close(this.cacheName);
            runnableArray[2] = ((KeyValueStore)this.wrapped())::close;
            LinkedList<RuntimeException> suppressed = ExceptionUtils.executeAll(runnableArray);
            if (!suppressed.isEmpty()) {
                ExceptionUtils.throwSuppressed("Caught an exception while closing caching key value store for store " + this.name(), suppressed);
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }
}

