package org.apache.storm.redis.state;

import com.google.common.collect.Maps;
import com.google.common.primitives.UnsignedBytes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.storm.redis.common.commands.RedisCommands;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.container.RedisCommandsContainerBuilder;
import org.apache.storm.redis.common.container.RedisCommandsInstanceContainer;
import org.apache.storm.state.DefaultStateEncoder;
import org.apache.storm.state.DefaultStateSerializer;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.util.SafeEncoder;

/* loaded from: input_file:org/apache/storm/redis/state/RedisKeyValueState.class */
public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
    public static final int ITERATOR_CHUNK_SIZE = 100;
    public static final NavigableMap<byte[], byte[]> EMPTY_PENDING_COMMIT_MAP = Maps.unmodifiableNavigableMap(new TreeMap(UnsignedBytes.lexicographicalComparator()));
    private static final Logger LOG = LoggerFactory.getLogger(RedisKeyValueState.class);
    private static final String COMMIT_TXID_KEY = "commit";
    private static final String PREPARE_TXID_KEY = "prepare";
    private final byte[] namespace;
    private final byte[] prepareNamespace;
    private final String txidNamespace;
    private final DefaultStateEncoder<K, V> encoder;
    private final RedisCommandsInstanceContainer container;
    private ConcurrentNavigableMap<byte[], byte[]> pendingPrepare;
    private NavigableMap<byte[], byte[]> pendingCommit;
    private Map<String, String> txIds;

    public RedisKeyValueState(String str) {
        this(str, new JedisPoolConfig.Builder().build());
    }

    public RedisKeyValueState(String str, JedisPoolConfig jedisPoolConfig) {
        this(str, jedisPoolConfig, (Serializer) new DefaultStateSerializer(), (Serializer) new DefaultStateSerializer());
    }

    public RedisKeyValueState(String str, JedisPoolConfig jedisPoolConfig, Serializer<K> serializer, Serializer<V> serializer2) {
        this(str, RedisCommandsContainerBuilder.build(jedisPoolConfig), serializer, serializer2);
    }

    public RedisKeyValueState(String str, JedisClusterConfig jedisClusterConfig, Serializer<K> serializer, Serializer<V> serializer2) {
        this(str, RedisCommandsContainerBuilder.build(jedisClusterConfig), serializer, serializer2);
    }

    public RedisKeyValueState(String str, RedisCommandsInstanceContainer redisCommandsInstanceContainer, Serializer<K> serializer, Serializer<V> serializer2) {
        this.namespace = SafeEncoder.encode(str);
        this.prepareNamespace = SafeEncoder.encode(str + "$prepare");
        this.txidNamespace = str + "$txid";
        this.encoder = new DefaultStateEncoder<>(serializer, serializer2);
        this.container = redisCommandsInstanceContainer;
        this.pendingPrepare = createPendingPrepareMap();
        initTxids();
        initPendingCommit();
    }

    private void initTxids() {
        RedisCommands redisCommands = null;
        try {
            redisCommands = this.container.getInstance();
            if (redisCommands.exists(this.txidNamespace)) {
                this.txIds = redisCommands.hgetAll(this.txidNamespace);
            } else {
                this.txIds = new HashMap();
            }
            LOG.debug("initTxids, txIds {}", this.txIds);
            this.container.returnInstance(redisCommands);
        } catch (Throwable th) {
            this.container.returnInstance(redisCommands);
            throw th;
        }
    }

    private void initPendingCommit() {
        try {
            RedisCommands redisCommandsInstanceContainer = this.container.getInstance();
            if (redisCommandsInstanceContainer.exists(this.prepareNamespace).booleanValue()) {
                LOG.debug("Loading previously prepared commit from {}", this.prepareNamespace);
                TreeMap treeMap = new TreeMap(UnsignedBytes.lexicographicalComparator());
                treeMap.putAll(redisCommandsInstanceContainer.hgetAll(this.prepareNamespace));
                this.pendingCommit = Maps.unmodifiableNavigableMap(treeMap);
            } else {
                LOG.debug("No previously prepared commits.");
                this.pendingCommit = EMPTY_PENDING_COMMIT_MAP;
            }
            this.container.returnInstance(redisCommandsInstanceContainer);
        } catch (Throwable th) {
            this.container.returnInstance(null);
            throw th;
        }
    }

    public void put(K k, V v) {
        LOG.debug("put key '{}', value '{}'", k, v);
        this.pendingPrepare.put(this.encoder.encodeKey(k), this.encoder.encodeValue(v));
    }

    public V get(K k) {
        byte[] hget;
        LOG.debug("get key '{}'", k);
        byte[] encodeKey = this.encoder.encodeKey(k);
        if (this.pendingPrepare.containsKey(encodeKey)) {
            hget = (byte[]) this.pendingPrepare.get(encodeKey);
        } else if (this.pendingCommit.containsKey(encodeKey)) {
            hget = (byte[]) this.pendingCommit.get(encodeKey);
        } else {
            RedisCommands redisCommands = null;
            try {
                redisCommands = this.container.getInstance();
                hget = redisCommands.hget(this.namespace, encodeKey);
                this.container.returnInstance(redisCommands);
            } catch (Throwable th) {
                this.container.returnInstance(redisCommands);
                throw th;
            }
        }
        Object obj = null;
        if (hget != null) {
            obj = this.encoder.decodeValue(hget);
        }
        LOG.debug("Value for key '{}' is '{}'", k, obj);
        return (V) obj;
    }

    public V get(K k, V v) {
        V v2 = get(k);
        return v2 != null ? v2 : v;
    }

    public V delete(K k) {
        LOG.debug("delete key '{}'", k);
        byte[] encodeKey = this.encoder.encodeKey(k);
        V v = get(k);
        this.pendingPrepare.put(encodeKey, this.encoder.getTombstoneValue());
        return v;
    }

    public Iterator<Map.Entry<K, V>> iterator() {
        return (Iterator<Map.Entry<K, V>>) new RedisKeyValueStateIterator(this.namespace, this.container, this.pendingPrepare.entrySet().iterator(), this.pendingCommit.entrySet().iterator(), 100, this.encoder.getKeySerializer(), this.encoder.getValueSerializer());
    }

    public void prepareCommit(long j) {
        LOG.debug("prepareCommit txid {}", Long.valueOf(j));
        validatePrepareTxid(j);
        RedisCommands redisCommands = null;
        try {
            ConcurrentNavigableMap<byte[], byte[]> concurrentNavigableMap = this.pendingPrepare;
            this.pendingPrepare = createPendingPrepareMap();
            redisCommands = this.container.getInstance();
            if (redisCommands.exists(this.prepareNamespace).booleanValue()) {
                LOG.debug("Prepared txn already exists, will merge", Long.valueOf(j));
                for (Map.Entry<byte[], byte[]> entry : this.pendingCommit.entrySet()) {
                    if (!concurrentNavigableMap.containsKey(entry.getKey())) {
                        concurrentNavigableMap.put(entry.getKey(), entry.getValue());
                    }
                }
            }
            if (concurrentNavigableMap.isEmpty()) {
                LOG.debug("Nothing to save for prepareCommit, txid {}.", Long.valueOf(j));
            } else {
                redisCommands.hmset(this.prepareNamespace, concurrentNavigableMap);
            }
            this.txIds.put(PREPARE_TXID_KEY, String.valueOf(j));
            redisCommands.hmset(this.txidNamespace, this.txIds);
            this.pendingCommit = Maps.unmodifiableNavigableMap(concurrentNavigableMap);
            this.container.returnInstance(redisCommands);
        } catch (Throwable th) {
            this.container.returnInstance(redisCommands);
            throw th;
        }
    }

    public void commit(long j) {
        LOG.debug("commit txid {}", Long.valueOf(j));
        validateCommitTxid(j);
        RedisCommands redisCommands = null;
        try {
            redisCommands = this.container.getInstance();
            if (this.pendingCommit.isEmpty()) {
                LOG.debug("Nothing to save for commit, txid {}.", Long.valueOf(j));
            } else {
                ArrayList arrayList = new ArrayList();
                HashMap hashMap = new HashMap();
                for (Map.Entry<byte[], byte[]> entry : this.pendingCommit.entrySet()) {
                    byte[] key = entry.getKey();
                    byte[] value = entry.getValue();
                    if (Arrays.equals(this.encoder.getTombstoneValue(), value)) {
                        arrayList.add(key);
                    } else {
                        hashMap.put(key, value);
                    }
                }
                if (!hashMap.isEmpty()) {
                    redisCommands.hmset(this.namespace, hashMap);
                }
                if (!arrayList.isEmpty()) {
                    redisCommands.hdel(this.namespace, (byte[][]) arrayList.toArray((Object[]) new byte[0]));
                }
            }
            this.txIds.put(COMMIT_TXID_KEY, String.valueOf(j));
            redisCommands.hmset(this.txidNamespace, this.txIds);
            redisCommands.del(this.prepareNamespace);
            this.pendingCommit = EMPTY_PENDING_COMMIT_MAP;
            this.container.returnInstance(redisCommands);
        } catch (Throwable th) {
            this.container.returnInstance(redisCommands);
            throw th;
        }
    }

    public void commit() {
        RedisCommands redisCommands = null;
        try {
            redisCommands = this.container.getInstance();
            if (this.pendingPrepare.isEmpty()) {
                LOG.debug("Nothing to save for commit");
            } else {
                redisCommands.hmset(this.namespace, this.pendingPrepare);
            }
            this.pendingPrepare = createPendingPrepareMap();
            this.container.returnInstance(redisCommands);
        } catch (Throwable th) {
            this.container.returnInstance(redisCommands);
            throw th;
        }
    }

    public void rollback() {
        LOG.debug("rollback");
        RedisCommands redisCommands = null;
        try {
            redisCommands = this.container.getInstance();
            if (redisCommands.exists(this.prepareNamespace).booleanValue()) {
                redisCommands.del(this.prepareNamespace);
            } else {
                LOG.debug("Nothing to rollback, prepared data is empty");
            }
            Long lastCommittedTxid = lastCommittedTxid();
            if (lastCommittedTxid != null) {
                this.txIds.put(PREPARE_TXID_KEY, String.valueOf(lastCommittedTxid));
            } else {
                this.txIds.remove(PREPARE_TXID_KEY);
            }
            if (!this.txIds.isEmpty()) {
                LOG.debug("hmset txidNamespace {}, txIds {}", this.txidNamespace, this.txIds);
                redisCommands.hmset(this.txidNamespace, this.txIds);
            }
            this.pendingCommit = EMPTY_PENDING_COMMIT_MAP;
            this.pendingPrepare = createPendingPrepareMap();
            this.container.returnInstance(redisCommands);
        } catch (Throwable th) {
            this.container.returnInstance(redisCommands);
            throw th;
        }
    }

    private void validatePrepareTxid(long j) {
        Long lastCommittedTxid = lastCommittedTxid();
        if (lastCommittedTxid != null && j <= lastCommittedTxid.longValue()) {
            throw new RuntimeException("Invalid txid '" + j + "' for prepare. Txid '" + lastCommittedTxid + "' is already committed");
        }
    }

    private void validateCommitTxid(long j) {
        Long lastCommittedTxid = lastCommittedTxid();
        if (lastCommittedTxid != null && j < lastCommittedTxid.longValue()) {
            throw new RuntimeException("Invalid txid '" + j + "' txid '" + lastCommittedTxid + "' is already committed");
        }
        Long lastPreparedTxid = lastPreparedTxid();
        if (lastPreparedTxid != null && j != lastPreparedTxid.longValue()) {
            throw new RuntimeException("Invalid txid '" + j + "' not same as prepared txid '" + lastPreparedTxid + "'");
        }
    }

    private Long lastCommittedTxid() {
        return lastId(COMMIT_TXID_KEY);
    }

    private Long lastPreparedTxid() {
        return lastId(PREPARE_TXID_KEY);
    }

    private Long lastId(String str) {
        Long l = null;
        String str2 = this.txIds.get(str);
        if (str2 != null) {
            l = Long.valueOf(str2);
        }
        return l;
    }

    private ConcurrentNavigableMap<byte[], byte[]> createPendingPrepareMap() {
        return new ConcurrentSkipListMap(UnsignedBytes.lexicographicalComparator());
    }
}
