package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for {@link State} implementations whose entries live in a RocksDB column family.
 *
 * <p>Every entry is addressed by a composite byte key which is assembled, in this order, from
 * the key-group prefix, the serialized key and the serialized namespace (see
 * {@code writeKeyWithGroupAndNamespace}). When both the key serializer and the namespace
 * serializer report a negative length, the byte length of each part is appended right after
 * that part.
 *
 * <p>The members {@link #keySerializationStream} and {@link #keySerializationDataOutputView}
 * form one shared, reusable buffer that is overwritten on every call to
 * {@link #writeCurrentKeyWithGroupAndNamespace()}.
 *
 * @param <K> type of the key
 * @param <N> type of the namespace
 * @param <S> type of the state
 * @param <SD> type of the state descriptor
 * @param <V> second type argument of the state descriptor (presumably the value type)
 */
public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, V>, V> implements KvState<N>, State {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class);

    /** Serializer used to write the namespace part of the composite key. */
    private final TypeSerializer<N> namespaceSerializer;

    /** Namespace used by operations that act on the "current" key; set via {@link #setCurrentNamespace}. */
    private N currentNamespace;

    /** Backend that provides the RocksDB instance, the key serializer and the current key. */
    protected RocksDBKeyedStateBackend<K> backend;

    /** Column family in which this state keeps its entries. */
    protected ColumnFamilyHandle columnFamily;

    /** Descriptor of this state; never {@code null}. */
    protected final SD stateDesc;

    /**
     * Write options passed to RocksDB by this class; configured with {@code setDisableWAL(true)}
     * in the constructor.
     * NOTE(review): this object is never released by this class - confirm whether the RocksDB
     * version in use requires an explicit dispose/close.
     */
    private final WriteOptions writeOptions = new WriteOptions();

    /** Reusable buffer holding the most recently serialized composite key. */
    protected final ByteArrayOutputStreamWithPos keySerializationStream;

    /** Data output view writing into {@link #keySerializationStream}. */
    protected final DataOutputView keySerializationDataOutputView;

    /**
     * {@code true} if both the key serializer and the namespace serializer report a negative
     * length; in that case explicit lengths are appended to the key and namespace bytes.
     */
    private final boolean ambiguousKeyPossible;

    /**
     * Creates a new RocksDB-backed state.
     *
     * @param columnFamily column family in which the entries of this state are stored
     * @param namespaceSerializer serializer for the namespace
     * @param stateDesc descriptor of this state, must not be {@code null}
     * @param backend backend that owns the RocksDB instance and the key context
     */
    public AbstractRocksDBState(ColumnFamilyHandle columnFamily, TypeSerializer<N> namespaceSerializer, SD stateDesc, RocksDBKeyedStateBackend<K> backend) {
        this.namespaceSerializer = namespaceSerializer;
        this.backend = backend;
        this.columnFamily = columnFamily;
        this.writeOptions.setDisableWAL(true);

        // Check first, then assign: avoids relying on a cast of the helper's return value.
        Preconditions.checkNotNull(stateDesc, "State Descriptor");
        this.stateDesc = stateDesc;

        this.keySerializationStream = new ByteArrayOutputStreamWithPos(128);
        this.keySerializationDataOutputView = new DataOutputViewStreamWrapper(this.keySerializationStream);

        // The namespace serializer is only consulted if the key serializer reports a negative length.
        this.ambiguousKeyPossible = backend.getKeySerializer().getLength() < 0 && namespaceSerializer.getLength() < 0;
    }

    /**
     * Removes the entry stored for the backend's current key and the current namespace.
     *
     * @throws RuntimeException if building the composite key or the RocksDB removal fails; the
     *     original {@link IOException} or {@link RocksDBException} is kept as the cause
     */
    public void clear() {
        try {
            writeCurrentKeyWithGroupAndNamespace();
            byte[] compositeKey = this.keySerializationStream.toByteArray();
            this.backend.db.remove(this.columnFamily, this.writeOptions, compositeKey);
        } catch (IOException | RocksDBException e) {
            throw new RuntimeException("Error while removing entry from RocksDB", e);
        }
    }

    /**
     * Sets the namespace used by subsequent operations on the current key.
     *
     * @param namespace the namespace, must not be {@code null}
     */
    public void setCurrentNamespace(N namespace) {
        Preconditions.checkNotNull(namespace, "Namespace");
        this.currentNamespace = namespace;
    }

    /**
     * Looks up the stored bytes for a serialized (key, namespace) pair.
     *
     * <p>The pair is deserialized, its key group is computed from the key and the backend's
     * number of key groups, and the resulting composite key is read from this state's column
     * family. The value is returned exactly as delivered by the RocksDB {@code get} call
     * (presumably {@code null} when no entry exists - confirm against the RocksDB API).
     *
     * @param serializedKeyAndNamespace serialized key and namespace, must not be {@code null}
     * @return the bytes RocksDB returns for the composite key
     * @throws Exception if deserialization, key serialization or the RocksDB lookup fails
     */
    public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
        Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");

        // Parameterized (not raw) tuple so that f0/f1 keep their K/N types below.
        Tuple2<K, N> keyAndNamespace = KvStateRequestSerializer.deserializeKeyAndNamespace(
                serializedKeyAndNamespace, this.backend.getKeySerializer(), this.namespaceSerializer);

        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, this.backend.getNumberOfKeyGroups());

        // A fresh buffer is used here instead of the shared keySerializationStream member, so
        // this lookup never overwrites the key prepared by writeCurrentKeyWithGroupAndNamespace().
        ByteArrayOutputStreamWithPos lookupKeyStream = new ByteArrayOutputStreamWithPos(128);
        DataOutputViewStreamWrapper lookupKeyView = new DataOutputViewStreamWrapper(lookupKeyStream);
        writeKeyWithGroupAndNamespace(keyGroup, keyAndNamespace.f0, keyAndNamespace.f1, lookupKeyStream, lookupKeyView);

        return this.backend.db.get(this.columnFamily, lookupKeyStream.toByteArray());
    }

    /**
     * Serializes the composite key for the backend's current key group, the backend's current
     * key and the current namespace into {@link #keySerializationStream}, replacing its
     * previous content.
     *
     * @throws IOException if serialization fails
     */
    public void writeCurrentKeyWithGroupAndNamespace() throws IOException {
        writeKeyWithGroupAndNamespace(
                this.backend.getCurrentKeyGroupIndex(),
                this.backend.getCurrentKey(),
                this.currentNamespace,
                this.keySerializationStream,
                this.keySerializationDataOutputView);
    }

    /**
     * Resets the given stream and writes the composite key into it: key-group prefix, then key,
     * then namespace.
     *
     * @param keyGroup key group index to encode as prefix
     * @param key key to serialize
     * @param namespace namespace to serialize
     * @param keyStream stream that receives the bytes; it is reset before writing
     * @param keyView output view used for writing; callers in this class pass a view that wraps {@code keyStream}
     * @throws IOException if serialization fails
     */
    protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace, ByteArrayOutputStreamWithPos keyStream, DataOutputView keyView) throws IOException {
        keyStream.reset();
        writeKeyGroup(keyGroup, keyView);
        writeKey(key, keyStream, keyView);
        writeNameSpace(namespace, keyStream, keyView);
    }

    /**
     * Writes the key group index using as many bytes as the backend's key-group prefix size,
     * starting with the highest-order of those bytes.
     */
    private void writeKeyGroup(int keyGroup, DataOutputView keyView) throws IOException {
        for (int byteIndex = this.backend.getKeyGroupPrefixBytes() - 1; byteIndex >= 0; byteIndex--) {
            // byteIndex << 3 == byteIndex * 8: shift the wanted byte into the lowest position.
            keyView.writeByte(keyGroup >>> (byteIndex << 3));
        }
    }

    /**
     * Serializes the key and, if ambiguous keys are possible, appends the number of bytes the
     * key occupied.
     */
    private void writeKey(K key, ByteArrayOutputStreamWithPos keyStream, DataOutputView keyView) throws IOException {
        int startPosition = keyStream.getPosition();
        this.backend.getKeySerializer().serialize(key, keyView);
        if (this.ambiguousKeyPossible) {
            writeLengthFrom(startPosition, keyStream, keyView);
        }
    }

    /**
     * Serializes the namespace and, if ambiguous keys are possible, appends the number of bytes
     * the namespace occupied.
     */
    private void writeNameSpace(N namespace, ByteArrayOutputStreamWithPos keyStream, DataOutputView keyView) throws IOException {
        int startPosition = keyStream.getPosition();
        this.namespaceSerializer.serialize(namespace, keyView);
        if (this.ambiguousKeyPossible) {
            writeLengthFrom(startPosition, keyStream, keyView);
        }
    }

    /**
     * Writes the number of bytes added to the stream since {@code startPosition}, using the
     * variable-length encoding of {@link #writeVariableIntBytes}.
     */
    private static void writeLengthFrom(int startPosition, ByteArrayOutputStreamWithPos keyStream, DataOutputView keyView) throws IOException {
        int length = keyStream.getPosition() - startPosition;
        writeVariableIntBytes(length, keyView);
    }

    /**
     * Writes {@code value} one byte at a time, lowest-order byte first, and stops as soon as the
     * remaining higher-order bits are all zero. At least one byte is always written.
     */
    private static void writeVariableIntBytes(int value, DataOutputView keyView) throws IOException {
        int remaining = value;
        do {
            keyView.writeByte(remaining);
            remaining >>>= 8;
        } while (remaining != 0);
    }
}
