/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state;

import java.io.File;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;

public class KeyValueStoreTestDriver<K, V> {
    private final Properties props;
    private final Map<K, V> flushedEntries = new HashMap();
    private final Set<K> flushedRemovals = new HashSet<K>();
    private final List<KeyValue<K, V>> restorableEntries = new LinkedList<KeyValue<K, V>>();
    private final MockProcessorContext context;
    private final Map<String, StateStore> storeMap = new HashMap<String, StateStore>();
    private static final long DEFAULT_CACHE_SIZE_BYTES = 0x100000L;
    private final ThreadCache cache = new ThreadCache(0x100000L);
    private final StreamsMetrics metrics = new StreamsMetrics(){

        public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String ... tags) {
            return null;
        }

        public void recordLatency(Sensor sensor, long startNs, long endNs) {
        }
    };
    private final RecordCollector recordCollector;
    private File stateDir = null;

    public static <K, V> KeyValueStoreTestDriver<K, V> create(Class<K> keyClass, Class<V> valueClass) {
        StateSerdes serdes = StateSerdes.withBuiltinTypes((String)"unexpected", keyClass, valueClass);
        return new KeyValueStoreTestDriver<K, V>(serdes);
    }

    public static <K, V> KeyValueStoreTestDriver<K, V> create(Serializer<K> keySerializer, Deserializer<K> keyDeserializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) {
        StateSerdes serdes = new StateSerdes("unexpected", Serdes.serdeFrom(keySerializer, keyDeserializer), Serdes.serdeFrom(valueSerializer, valueDeserializer));
        return new KeyValueStoreTestDriver<K, V>(serdes);
    }

    protected KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
        ByteArraySerializer rawSerializer = new ByteArraySerializer();
        MockProducer producer = new MockProducer(true, (Serializer)rawSerializer, (Serializer)rawSerializer);
        this.recordCollector = new RecordCollector((Producer)producer, "KeyValueStoreTestDriver"){

            public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                Object key = serdes.keyFrom(keySerializer.serialize(record.topic(), record.key()));
                Object value = serdes.valueFrom(valueSerializer.serialize(record.topic(), record.value()));
                KeyValueStoreTestDriver.this.recordFlushed(key, value);
            }

            public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer, StreamPartitioner<K1, V1> partitioner) {
                this.send(record, keySerializer, valueSerializer);
            }
        };
        this.stateDir = TestUtils.tempDirectory();
        this.stateDir.mkdirs();
        this.props = new Properties();
        this.props.put("application.id", "applicationId");
        this.props.put("bootstrap.servers", "localhost:9092");
        this.props.put("timestamp.extractor", MockTimestampExtractor.class);
        this.props.put("key.serde", serdes.keySerde().getClass());
        this.props.put("value.serde", serdes.valueSerde().getClass());
        this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerde(), serdes.valueSerde(), this.recordCollector, null){

            @Override
            public TaskId taskId() {
                return new TaskId(0, 1);
            }

            public <K1, V1> void forward(K1 key, V1 value, int childIndex) {
                this.forward(key, value);
            }

            @Override
            public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback func) {
                KeyValueStoreTestDriver.this.storeMap.put(store.name(), store);
                KeyValueStoreTestDriver.this.restoreEntries(func, serdes);
            }

            @Override
            public StateStore getStateStore(String name) {
                return (StateStore)KeyValueStoreTestDriver.this.storeMap.get(name);
            }

            @Override
            public StreamsMetrics metrics() {
                return KeyValueStoreTestDriver.this.metrics;
            }

            @Override
            public File stateDir() {
                return KeyValueStoreTestDriver.this.stateDir;
            }

            @Override
            public Map<String, Object> appConfigs() {
                return new StreamsConfig((Map)KeyValueStoreTestDriver.this.props).originals();
            }

            @Override
            public Map<String, Object> appConfigsWithPrefix(String prefix) {
                return new StreamsConfig((Map)KeyValueStoreTestDriver.this.props).originalsWithPrefix(prefix);
            }

            @Override
            public ThreadCache getCache() {
                return KeyValueStoreTestDriver.this.cache;
            }
        };
    }

    protected void recordFlushed(K key, V value) {
        if (value == null) {
            this.flushedRemovals.add(key);
            this.flushedEntries.remove(key);
        } else {
            this.flushedEntries.put(key, value);
            this.flushedRemovals.remove(key);
        }
    }

    private void restoreEntries(StateRestoreCallback func, StateSerdes<K, V> serdes) {
        for (KeyValue<K, V> entry : this.restorableEntries) {
            if (entry == null) continue;
            byte[] rawKey = serdes.rawKey(entry.key);
            byte[] rawValue = serdes.rawValue(entry.value);
            func.restore(rawKey, rawValue);
        }
    }

    public void addEntryToRestoreLog(K key, V value) {
        this.restorableEntries.add(new KeyValue(key, value));
    }

    public ProcessorContext context() {
        return this.context;
    }

    public Iterable<KeyValue<K, V>> restoredEntries() {
        return this.restorableEntries;
    }

    public int checkForRestoredEntries(KeyValueStore<K, V> store) {
        int missing = 0;
        for (KeyValue<K, V> kv : this.restorableEntries) {
            Object value;
            if (kv == null || Objects.equals(value = store.get(kv.key), kv.value)) continue;
            ++missing;
        }
        return missing;
    }

    public int sizeOf(KeyValueStore<K, V> store) {
        int size = 0;
        try (KeyValueIterator iterator = store.all();){
            while (iterator.hasNext()) {
                iterator.next();
                ++size;
            }
        }
        return size;
    }

    public V flushedEntryStored(K key) {
        return this.flushedEntries.get(key);
    }

    public boolean flushedEntryRemoved(K key) {
        return this.flushedRemovals.contains(key);
    }

    public int numFlushedEntryRemoved() {
        return this.flushedRemovals.size();
    }

    public void clear() {
        this.restorableEntries.clear();
        this.flushedEntries.clear();
        this.flushedRemovals.clear();
    }

    public void setConfig(String configName, Object configValue) {
        this.props.put(configName, configValue);
    }
}

