/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.AbstractRocksDBState;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;

/**
 * {@link ReducingState} implementation that keeps a single reduced value per
 * key/namespace pair in a RocksDB column family.
 *
 * <p>Every call to {@link #add(Object)} reads the currently stored value (if any),
 * combines it with the new element using the {@link ReduceFunction} taken from the
 * {@link ReducingStateDescriptor}, and writes the result back under the same
 * RocksDB key. The RocksDB key is whatever {@code writeKeyAndNamespace} of the
 * superclass serializes.
 *
 * @param <K> first type argument forwarded to {@link AbstractRocksDBState}
 *            (presumably the key type; confirm against the superclass)
 * @param <N> type handled by the namespace serializer
 * @param <V> type of the values that are reduced and stored
 */
public class RocksDBReducingState<K, N, V>
extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>>
implements ReducingState<V> {
    /** Serializer used to turn values into the bytes stored in RocksDB and back. */
    private final TypeSerializer<V> valueSerializer;

    /** Descriptor this state was created from; supplies the serializer and the reduce function. */
    private final ReducingStateDescriptor<V> stateDesc;

    /** User function that combines the stored value with a newly added one. */
    private final ReduceFunction<V> reduceFunction;

    /** Write options used for every put; configured with the write-ahead log disabled. */
    private final WriteOptions writeOptions;

    /**
     * Creates a new {@code RocksDBReducingState}.
     *
     * @param columnFamily column family handle passed on to the superclass
     * @param namespaceSerializer serializer for the namespace, passed on to the superclass
     * @param stateDesc descriptor providing the value serializer and the reduce function;
     *                  must not be {@code null}
     * @param backend backend passed on to the superclass; its {@code db} is used for all
     *                reads and writes
     * @throws NullPointerException if {@code stateDesc} is {@code null}
     */
    public RocksDBReducingState(ColumnFamilyHandle columnFamily, TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<V> stateDesc, RocksDBStateBackend backend) {
        super(columnFamily, namespaceSerializer, backend);
        this.stateDesc = Objects.requireNonNull(stateDesc);
        this.valueSerializer = stateDesc.getSerializer();
        this.reduceFunction = stateDesc.getReduceFunction();
        this.writeOptions = new WriteOptions();
        // Puts issued with these options skip RocksDB's write-ahead log.
        this.writeOptions.setDisableWAL(true);
    }

    /**
     * Returns the value currently stored for the key/namespace written by
     * {@code writeKeyAndNamespace}.
     *
     * @return the deserialized stored value, or {@code null} if RocksDB holds no entry
     *         for the key
     * @throws RuntimeException wrapping any {@link IOException} or {@link RocksDBException}
     *                          raised while building the key, reading or deserializing
     */
    public V get() {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
        try {
            this.writeKeyAndNamespace(out);
            byte[] key = baos.toByteArray();
            byte[] valueBytes = this.backend.db.get(this.columnFamily, key);
            if (valueBytes == null) {
                return null;
            }
            return this.deserializeValue(valueBytes);
        }
        catch (IOException | RocksDBException e) {
            throw new RuntimeException("Error while retrieving data from RocksDB", e);
        }
    }

    /**
     * Adds {@code value} to the state. If no value is stored yet, {@code value} is stored
     * as is; otherwise the stored value and {@code value} are combined with the reduce
     * function (stored value first) and the result replaces the stored value.
     *
     * <p>Although the signature declares {@link IOException}, every exception raised
     * inside the method (including those thrown by the reduce function) is caught and
     * rethrown wrapped in a {@link RuntimeException}.
     *
     * @param value the element to add
     * @throws RuntimeException wrapping any exception raised while reading, reducing,
     *                          serializing or writing
     */
    public void add(V value) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
        try {
            this.writeKeyAndNamespace(out);
            byte[] key = baos.toByteArray();
            byte[] valueBytes = this.backend.db.get(this.columnFamily, key);

            // First element for this key is stored unchanged; later ones are reduced
            // into the existing value.
            V newValue;
            if (valueBytes == null) {
                newValue = value;
            } else {
                V oldValue = this.deserializeValue(valueBytes);
                newValue = this.reduceFunction.reduce(oldValue, value);
            }

            // Reuse the buffer that held the key bytes for the serialized value.
            baos.reset();
            this.valueSerializer.serialize(newValue, out);
            this.backend.db.put(this.columnFamily, this.writeOptions, key, baos.toByteArray());
        }
        catch (Exception e) {
            throw new RuntimeException("Error while adding data to RocksDB", e);
        }
    }

    /** Deserializes a value from the raw bytes read from RocksDB using the value serializer. */
    private V deserializeValue(byte[] valueBytes) throws IOException {
        return this.valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
    }
}

