package org.apache.rocketmq.streams.window.storage.rocksdb;

import com.alibaba.fastjson.JSONArray;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.utils.Base64Utils;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.SerializeUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.state.kv.rocksdb.RocksDBOperator;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.state.WindowBaseValue;
import org.apache.rocketmq.streams.window.storage.AbstractWindowStorage;
import org.apache.rocketmq.streams.window.storage.WindowStorage;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

/* loaded from: input_file:org/apache/rocketmq/streams/window/storage/rocksdb/RocksdbStorage.class */
public class RocksdbStorage<T extends WindowBaseValue> extends AbstractWindowStorage<T> {
    protected static String DB_PATH = "/tmp/rocksdb";
    protected static String UTF8 = "UTF8";
    protected static AtomicBoolean hasCreate = new AtomicBoolean(false);
    protected static RocksDB rocksDB = new RocksDBOperator().getInstance();
    protected WriteOptions writeOptions = new WriteOptions();

    /* loaded from: input_file:org/apache/rocketmq/streams/window/storage/rocksdb/RocksdbStorage$LocalIterator.class */
    public static class LocalIterator<T extends WindowBaseValue> extends WindowStorage.WindowBaseValueIterator<T> {
        protected volatile boolean hasNext = true;
        protected AtomicBoolean hasInit = new AtomicBoolean(false);
        ReadOptions readOptions = new ReadOptions();
        private RocksIterator iter;
        protected String keyPrefix;
        protected Class<? extends T> clazz;
        protected boolean needKey;

        public LocalIterator(String str, Class<? extends T> cls, boolean z) {
            this.readOptions.setPrefixSameAsStart(true).setTotalOrderSeek(true);
            this.iter = RocksdbStorage.rocksDB.newIterator(this.readOptions);
            this.keyPrefix = str;
            this.clazz = cls;
            this.needKey = z;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            if (this.hasInit.compareAndSet(false, true)) {
                this.iter.seek(this.keyPrefix.getBytes());
            }
            return this.iter.isValid() && this.hasNext;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v7, types: [org.apache.rocketmq.streams.window.state.WindowBaseValue] */
        @Override // java.util.Iterator
        public T next() {
            String str = new String(this.iter.key());
            if (!str.startsWith(this.keyPrefix)) {
                this.hasNext = false;
                return null;
            }
            T t = (WindowBaseValue) SerializeUtil.deserialize(this.iter.value());
            if (this.needKey) {
                t.setMsgKey(str);
            }
            while (t.getPartitionNum() < this.partitionNum) {
                this.iter.next();
                t = next();
                if (t == null) {
                    this.hasNext = false;
                    return null;
                }
            }
            this.iter.next();
            return t;
        }
    }

    @Override // org.apache.rocketmq.streams.window.storage.ICommonStorage
    public void removeKeys(Collection<String> collection) {
        for (String str : collection) {
            try {
                rocksDB.delete(getKeyBytes(str));
            } catch (RocksDBException e) {
                throw new RuntimeException("delete error " + str);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.rocketmq.streams.window.storage.IWindowStorage
    public WindowStorage.WindowBaseValueIterator<T> loadWindowInstanceSplitData(String str, String str2, String str3, String str4, Class<T> cls) {
        String createKey = MapKeyUtil.createKey(new String[]{str2, str3, str4});
        if (StringUtil.isNotEmpty(str)) {
            createKey = str + createKey;
        }
        return getByKeyPrefix(createKey, cls, false);
    }

    @Override // org.apache.rocketmq.streams.window.storage.IWindowStorage
    public Long getMaxSplitNum(WindowInstance windowInstance, Class<T> cls) {
        throw new RuntimeException("can not support this method");
    }

    @Override // org.apache.rocketmq.streams.window.storage.ICommonStorage
    public void multiPut(Map<String, T> map) {
        if (map == null) {
            return;
        }
        try {
            WriteBatch writeBatch = new WriteBatch();
            for (Map.Entry<String, T> entry : map.entrySet()) {
                String key = entry.getKey();
                writeBatch.put(key.getBytes(UTF8), SerializeUtil.serialize(entry.getValue()));
            }
            WriteOptions writeOptions = new WriteOptions();
            writeOptions.setSync(false);
            writeOptions.setDisableWAL(true);
            rocksDB.write(writeOptions, writeBatch);
            writeBatch.close();
            writeOptions.close();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("put data to rocksdb error", e);
        }
    }

    @Override // org.apache.rocketmq.streams.window.storage.ICommonStorage
    public Map<String, T> multiGet(Class<T> cls, List<String> list) {
        if (list == null || list.size() == 0) {
            return new HashMap();
        }
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (String str : list) {
            arrayList.add(getKeyBytes(str));
            arrayList2.add(str);
        }
        try {
            HashMap hashMap = new HashMap();
            for (Map.Entry entry : rocksDB.multiGet(arrayList).entrySet()) {
                hashMap.put(getValueFromByte((byte[]) entry.getKey()), (WindowBaseValue) SerializeUtil.deserialize((byte[]) entry.getValue()));
            }
            return hashMap;
        } catch (RocksDBException e) {
            throw new RuntimeException("can not get value from rocksdb ", e);
        }
    }

    @Override // org.apache.rocketmq.streams.window.storage.ICommonStorage
    public void multiPutList(Map<String, List<T>> map) {
        if (CollectionUtil.isEmpty(map)) {
            return;
        }
        try {
            WriteBatch writeBatch = new WriteBatch();
            for (Map.Entry<String, List<T>> entry : map.entrySet()) {
                String key = entry.getKey();
                List<T> value = entry.getValue();
                JSONArray jSONArray = new JSONArray();
                Iterator<T> it = value.iterator();
                while (it.hasNext()) {
                    jSONArray.add(Base64Utils.encode(SerializeUtil.serialize(it.next())));
                }
                writeBatch.put(key.getBytes(UTF8), jSONArray.toJSONString().getBytes(UTF8));
            }
            WriteOptions writeOptions = new WriteOptions();
            writeOptions.setSync(false);
            writeOptions.setDisableWAL(true);
            rocksDB.write(writeOptions, writeBatch);
            writeBatch.close();
            writeOptions.close();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("put data to rocksdb error", e);
        }
    }

    @Override // org.apache.rocketmq.streams.window.storage.ICommonStorage
    public Map<String, List<T>> multiGetList(Class<T> cls, List<String> list) {
        if (CollectionUtil.isEmpty(list)) {
            return new HashMap(4);
        }
        ArrayList arrayList = new ArrayList();
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(getKeyBytes(it.next()));
        }
        try {
            HashMap hashMap = new HashMap();
            for (Map.Entry entry : rocksDB.multiGet(arrayList).entrySet()) {
                String valueFromByte = getValueFromByte((byte[]) entry.getKey());
                JSONArray parseArray = JSONArray.parseArray(getValueFromByte((byte[]) entry.getValue()));
                ArrayList arrayList2 = new ArrayList();
                for (int i = 0; i < parseArray.size(); i++) {
                    arrayList2.add((WindowBaseValue) SerializeUtil.deserialize(Base64Utils.decode(parseArray.getString(i))));
                }
                hashMap.put(valueFromByte, arrayList2);
            }
            return hashMap;
        } catch (RocksDBException e) {
            e.printStackTrace();
            throw new RuntimeException("can not get multi value from rocksdb! ", e);
        }
    }

    @Override // org.apache.rocketmq.streams.window.storage.IWindowStorage
    public void clearCache(ISplit iSplit, Class<T> cls) {
        deleteRange(iSplit.getQueueId(), cls);
    }

    @Override // org.apache.rocketmq.streams.window.storage.IWindowStorage
    public void delete(String str, String str2, Class<T> cls) {
        deleteRange(MapKeyUtil.createKey(new String[]{str2, str}), cls);
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected void deleteRange(String str, Class<T> cls) {
        try {
            WindowStorage.WindowBaseValueIterator<T> byKeyPrefix = getByKeyPrefix(str, cls, true);
            HashSet hashSet = new HashSet();
            while (byKeyPrefix.hasNext()) {
                WindowBaseValue windowBaseValue = (WindowBaseValue) byKeyPrefix.next();
                if (windowBaseValue != null) {
                    hashSet.add(windowBaseValue.getMsgKey());
                    if (hashSet.size() >= 1000) {
                        removeKeys(hashSet);
                        hashSet = new HashSet();
                    }
                }
            }
            if (hashSet.size() > 0) {
                removeKeys(hashSet);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    protected WindowStorage.WindowBaseValueIterator<T> getByKeyPrefix(String str, Class<? extends T> cls, boolean z) {
        return new LocalIterator(str, cls, z);
    }

    protected byte[] getKeyBytes(String str) {
        try {
            if (StringUtil.isEmpty(str)) {
                return null;
            }
            return str.getBytes(UTF8);
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException("get bytes error ", e);
        }
    }

    protected static String getValueFromByte(byte[] bArr) {
        try {
            return new String(bArr, UTF8);
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] strArr) {
        System.out.println("2012-01-03 00:03:09".substring("2012-01-03 00:03:09".length() - 2, "2012-01-03 00:03:09".length()));
    }
}
