package org.apache.flink.contrib.streaming.state;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.rocksdb.Options;
import org.rocksdb.StringAppendOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBStateBackend.class */
public class RocksDBStateBackend extends AbstractStateBackend {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);
    private final Path checkpointDirectory;
    private final AbstractStateBackend nonPartitionedStateBackend;
    private String operatorIdentifier;
    private JobID jobId;
    private Path[] configuredDbBasePaths;
    private File[] initializedDbBasePaths;
    private int nextDirectory;
    private PredefinedOptions predefinedOptions;
    private OptionsFactory optionsFactory;
    private transient Options rocksDbOptions;

    public RocksDBStateBackend(String str) throws IOException {
        this(new Path(str).toUri());
    }

    public RocksDBStateBackend(URI uri) throws IOException {
        this.predefinedOptions = PredefinedOptions.DEFAULT;
        FsStateBackend fsStateBackend = new FsStateBackend(uri);
        this.nonPartitionedStateBackend = fsStateBackend;
        this.checkpointDirectory = fsStateBackend.getBasePath();
    }

    public RocksDBStateBackend(String str, AbstractStateBackend abstractStateBackend) throws IOException {
        this(new Path(str).toUri(), abstractStateBackend);
    }

    public RocksDBStateBackend(URI uri, AbstractStateBackend abstractStateBackend) throws IOException {
        this.predefinedOptions = PredefinedOptions.DEFAULT;
        this.nonPartitionedStateBackend = (AbstractStateBackend) Objects.requireNonNull(abstractStateBackend);
        this.checkpointDirectory = FsStateBackend.validateAndNormalizeUri(uri);
    }

    public void initializeForJob(Environment environment, String str, TypeSerializer<?> typeSerializer) throws Exception {
        super.initializeForJob(environment, str, typeSerializer);
        this.nonPartitionedStateBackend.initializeForJob(environment, str, typeSerializer);
        this.operatorIdentifier = str.replace(" ", "");
        this.jobId = environment.getJobID();
        if (this.configuredDbBasePaths == null) {
            this.initializedDbBasePaths = environment.getIOManager().getSpillingDirectories();
        } else {
            ArrayList arrayList = new ArrayList(this.configuredDbBasePaths.length);
            String str2 = "";
            for (Path path : this.configuredDbBasePaths) {
                File file = new File(path.toUri().getPath());
                File file2 = new File(file, UUID.randomUUID().toString());
                if (file2.mkdirs()) {
                    arrayList.add(file);
                } else {
                    String str3 = "Local DB files directory '" + path + "' does not exist and cannot be created. ";
                    LOG.error(str3);
                    str2 = str2 + str3;
                }
                file2.delete();
            }
            if (arrayList.isEmpty()) {
                throw new Exception("No local storage directories available. " + str2);
            }
            this.initializedDbBasePaths = (File[]) arrayList.toArray(new File[arrayList.size()]);
        }
        this.nextDirectory = new Random().nextInt(this.initializedDbBasePaths.length);
    }

    public void disposeAllStateForCurrentJob() throws Exception {
        this.nonPartitionedStateBackend.disposeAllStateForCurrentJob();
    }

    public void close() throws Exception {
        this.nonPartitionedStateBackend.close();
        Options options = this.rocksDbOptions;
        if (options != null) {
            options.dispose();
            this.rocksDbOptions = null;
        }
    }

    File getDbPath(String str) {
        return new File(new File(new File(getNextStoragePath(), this.jobId.toString()), this.operatorIdentifier), str);
    }

    String getCheckpointPath(String str) {
        return this.checkpointDirectory + "/" + this.jobId.toString() + "/" + this.operatorIdentifier + "/" + str;
    }

    File[] getStoragePaths() {
        return this.initializedDbBasePaths;
    }

    File getNextStoragePath() {
        int i = this.nextDirectory + 1;
        int i2 = i >= this.initializedDbBasePaths.length ? 0 : i;
        this.nextDirectory = i2;
        return this.initializedDbBasePaths[i2];
    }

    protected <N, T> ValueState<T> createValueState(TypeSerializer<N> typeSerializer, ValueStateDescriptor<T> valueStateDescriptor) throws Exception {
        return new RocksDBValueState(this.keySerializer, typeSerializer, valueStateDescriptor, getDbPath(valueStateDescriptor.getName()), getCheckpointPath(valueStateDescriptor.getName()), getRocksDBOptions());
    }

    protected <N, T> ListState<T> createListState(TypeSerializer<N> typeSerializer, ListStateDescriptor<T> listStateDescriptor) throws Exception {
        return new RocksDBListState(this.keySerializer, typeSerializer, listStateDescriptor, getDbPath(listStateDescriptor.getName()), getCheckpointPath(listStateDescriptor.getName()), getRocksDBOptions());
    }

    protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> typeSerializer, ReducingStateDescriptor<T> reducingStateDescriptor) throws Exception {
        return new RocksDBReducingState(this.keySerializer, typeSerializer, reducingStateDescriptor, getDbPath(reducingStateDescriptor.getName()), getCheckpointPath(reducingStateDescriptor.getName()), getRocksDBOptions());
    }

    protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> typeSerializer, FoldingStateDescriptor<T, ACC> foldingStateDescriptor) throws Exception {
        return new RocksDBFoldingState(this.keySerializer, typeSerializer, foldingStateDescriptor, getDbPath(foldingStateDescriptor.getName()), getCheckpointPath(foldingStateDescriptor.getName()), getRocksDBOptions());
    }

    public AbstractStateBackend.CheckpointStateOutputStream createCheckpointStateOutputStream(long j, long j2) throws Exception {
        return this.nonPartitionedStateBackend.createCheckpointStateOutputStream(j, j2);
    }

    public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S s, long j, long j2) throws Exception {
        return this.nonPartitionedStateBackend.checkpointStateSerializable(s, j, j2);
    }

    public void setDbStoragePath(String str) {
        setDbStoragePaths(str == null ? null : new String[]{str});
    }

    public void setDbStoragePaths(String... strArr) {
        if (strArr == null) {
            this.configuredDbBasePaths = null;
            return;
        }
        if (strArr.length == 0) {
            throw new IllegalArgumentException("empty paths");
        }
        Path[] pathArr = new Path[strArr.length];
        for (int i = 0; i < strArr.length; i++) {
            if (strArr[i] == null) {
                throw new IllegalArgumentException("null path");
            }
            pathArr[i] = new Path(strArr[i]);
            String scheme = pathArr[i].toUri().getScheme();
            if (scheme != null && !scheme.equalsIgnoreCase("file")) {
                throw new IllegalArgumentException("Path " + strArr[i] + " has a non local scheme");
            }
        }
        this.configuredDbBasePaths = pathArr;
    }

    public String[] getDbStoragePaths() {
        if (this.configuredDbBasePaths == null) {
            return null;
        }
        String[] strArr = new String[this.configuredDbBasePaths.length];
        for (int i = 0; i < strArr.length; i++) {
            strArr[i] = this.configuredDbBasePaths[i].toString();
        }
        return strArr;
    }

    public void setPredefinedOptions(PredefinedOptions predefinedOptions) {
        this.predefinedOptions = (PredefinedOptions) Objects.requireNonNull(predefinedOptions);
    }

    public PredefinedOptions getPredefinedOptions() {
        return this.predefinedOptions;
    }

    public void setOptions(OptionsFactory optionsFactory) {
        this.optionsFactory = optionsFactory;
    }

    public OptionsFactory getOptions() {
        return this.optionsFactory;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Options getRocksDBOptions() {
        if (this.rocksDbOptions == null) {
            Options createOptions = this.predefinedOptions.createOptions();
            if (this.optionsFactory != null) {
                createOptions = this.optionsFactory.createOptions(createOptions);
            }
            this.rocksDbOptions = createOptions.setCreateIfMissing(true).setMergeOperator(new StringAppendOperator());
        }
        return this.rocksDbOptions;
    }
}
