package org.apache.flink.contrib.streaming.state;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBReducingState.class */
public class RocksDBReducingState<K, N, V> extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, V> implements ReducingState<V> {
    private final TypeSerializer<V> valueSerializer;
    private final ReduceFunction<V> reduceFunction;
    private final WriteOptions writeOptions;

    public RocksDBReducingState(ColumnFamilyHandle columnFamilyHandle, TypeSerializer<N> typeSerializer, ReducingStateDescriptor<V> reducingStateDescriptor, RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
        super(columnFamilyHandle, typeSerializer, reducingStateDescriptor, rocksDBKeyedStateBackend);
        this.valueSerializer = reducingStateDescriptor.getSerializer();
        this.reduceFunction = reducingStateDescriptor.getReduceFunction();
        this.writeOptions = new WriteOptions();
        this.writeOptions.setDisableWAL(true);
    }

    public V get() {
        try {
            writeCurrentKeyWithGroupAndNamespace();
            byte[] bArr = this.backend.db.get(this.columnFamily, this.keySerializationStream.toByteArray());
            if (bArr == null) {
                return null;
            }
            return (V) this.valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(bArr)));
        } catch (IOException | RocksDBException e) {
            throw new RuntimeException("Error while retrieving data from RocksDB", e);
        }
    }

    public void add(V v) throws IOException {
        try {
            writeCurrentKeyWithGroupAndNamespace();
            byte[] byteArray = this.keySerializationStream.toByteArray();
            byte[] bArr = this.backend.db.get(this.columnFamily, byteArray);
            DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(this.keySerializationStream);
            if (bArr == null) {
                this.keySerializationStream.reset();
                this.valueSerializer.serialize(v, dataOutputViewStreamWrapper);
                this.backend.db.put(this.columnFamily, this.writeOptions, byteArray, this.keySerializationStream.toByteArray());
            } else {
                Object reduce = this.reduceFunction.reduce(this.valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(bArr))), v);
                this.keySerializationStream.reset();
                this.valueSerializer.serialize(reduce, dataOutputViewStreamWrapper);
                this.backend.db.put(this.columnFamily, this.writeOptions, byteArray, this.keySerializationStream.toByteArray());
            }
        } catch (Exception e) {
            throw new RuntimeException("Error while adding data to RocksDB", e);
        }
    }
}
