/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.filesystem;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.UUID;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsFoldingState;
import org.apache.flink.runtime.state.filesystem.FsListState;
import org.apache.flink.runtime.state.filesystem.FsReducingState;
import org.apache.flink.runtime.state.filesystem.FsValueState;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * State backend that stores checkpoint data as files below a base path on a
 * {@link FileSystem}.
 *
 * <p>Every job gets its own directory ({@code <basePath>/<jobID>}), created in
 * {@link #initializeForJob}. Every checkpoint gets a sub-directory
 * ({@code chk-<checkpointID>}) inside it. State whose serialized size does not
 * exceed the configured file state threshold is not written to a file at all;
 * it is handed back as an in-memory {@link ByteStreamStateHandle} instead.
 *
 * <p>The {@code filesystem} and {@code checkpointDirectory} fields are
 * {@code transient}: after Java deserialization they are {@code null} until
 * {@link #initializeForJob} is called again.
 */
public class FsStateBackend
extends AbstractStateBackend {
    private static final long serialVersionUID = -8191916350224044011L;
    private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class);

    /** Threshold (in bytes) used by the constructors that do not take an explicit threshold. */
    public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;

    /** Largest threshold (in bytes) accepted by the constructor: 1 MiB. */
    public static final int MAX_FILE_STATE_THRESHOLD = 0x100000;

    /** Lower bound (in bytes) for the write buffer of checkpoint output streams. */
    private static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;

    /** Normalized root path below which the per-job directories are created. */
    private final Path basePath;

    /** State up to this many bytes is kept in the handle instead of being written to a file. */
    private final int fileStateThreshold;

    /** Directory of the current job; {@code null} until {@link #initializeForJob} ran. */
    private transient Path checkpointDirectory;

    /** File system of {@link #basePath}; {@code null} after deserialization or disposal. */
    private transient FileSystem filesystem;

    /**
     * Creates a backend for the given URI string using {@link #DEFAULT_FILE_STATE_THRESHOLD}.
     *
     * @param checkpointDataUri base location for checkpoint data, including the scheme
     * @throws IOException if the file system for the URI cannot be obtained
     */
    public FsStateBackend(String checkpointDataUri) throws IOException {
        this(new Path(checkpointDataUri));
    }

    /**
     * Creates a backend for the given path using {@link #DEFAULT_FILE_STATE_THRESHOLD}.
     *
     * @param checkpointDataUri base location for checkpoint data, including the scheme
     * @throws IOException if the file system for the path cannot be obtained
     */
    public FsStateBackend(Path checkpointDataUri) throws IOException {
        this(checkpointDataUri.toUri());
    }

    /**
     * Creates a backend for the given URI using {@link #DEFAULT_FILE_STATE_THRESHOLD}.
     *
     * @param checkpointDataUri base location for checkpoint data, including the scheme
     * @throws IOException if the file system for the URI cannot be obtained
     */
    public FsStateBackend(URI checkpointDataUri) throws IOException {
        this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD);
    }

    /**
     * Creates a backend for the given URI and file state threshold.
     *
     * @param checkpointDataUri base location for checkpoint data; must have a scheme and a
     *        non-root path (see {@link #validateAndNormalizeUri(URI)})
     * @param fileStateSizeThreshold state up to this many bytes is returned in the state handle
     *        rather than written to a file; must be in {@code [0, MAX_FILE_STATE_THRESHOLD]}
     * @throws IllegalArgumentException if the threshold is out of range or the URI is rejected
     * @throws IOException if the file system for the URI cannot be obtained
     */
    public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException {
        if (fileStateSizeThreshold < 0) {
            throw new IllegalArgumentException("The threshold for file state size must be zero or larger.");
        }
        if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) {
            throw new IllegalArgumentException(
                    "The threshold for file state size cannot be larger than " + MAX_FILE_STATE_THRESHOLD);
        }
        this.fileStateThreshold = fileStateSizeThreshold;
        this.basePath = FsStateBackend.validateAndNormalizeUri(checkpointDataUri);
        this.filesystem = this.basePath.getFileSystem();
    }

    /** Returns the normalized base path below which per-job directories are created. */
    public Path getBasePath() {
        return this.basePath;
    }

    /** Returns the directory of the current job, or {@code null} if the backend is not initialized. */
    public Path getCheckpointDirectory() {
        return this.checkpointDirectory;
    }

    /** Returns the size threshold (in bytes) up to which state is not written to a file. */
    public int getFileStateSizeThreshold() {
        return this.fileStateThreshold;
    }

    /** Returns {@code true} if both the file system and the job's checkpoint directory are set. */
    public boolean isInitialized() {
        return this.filesystem != null && this.checkpointDirectory != null;
    }

    /**
     * Returns the file system used by this backend.
     *
     * @throws IllegalStateException if the file system reference is currently {@code null}
     */
    public FileSystem getFileSystem() {
        if (this.filesystem == null) {
            throw new IllegalStateException("State backend has not been initialized.");
        }
        return this.filesystem;
    }

    /**
     * Initializes the backend for a job: resolves the file system of the base path, creates the
     * directory {@code <basePath>/<jobID>} and remembers it as the checkpoint directory.
     */
    @Override
    public void initializeForJob(Environment env, String operatorIdentifier, TypeSerializer<?> keySerializer) throws Exception {
        super.initializeForJob(env, operatorIdentifier, keySerializer);
        Path dir = new Path(this.basePath, env.getJobID().toString());
        LOG.info("Initializing file state backend to URI {}", dir);
        this.filesystem = this.basePath.getFileSystem();
        this.filesystem.mkdirs(dir);
        this.checkpointDirectory = dir;
    }

    /**
     * Recursively deletes the current job's checkpoint directory and resets this backend to the
     * uninitialized state.
     *
     * @throws IllegalStateException if the backend is not initialized
     */
    @Override
    public void disposeAllStateForCurrentJob() throws Exception {
        FileSystem fs = this.filesystem;
        Path dir = this.checkpointDirectory;
        if (fs == null || dir == null) {
            throw new IllegalStateException("state backend has not been initialized");
        }
        // Reset the fields first so the backend reads as uninitialized even if the delete fails.
        this.filesystem = null;
        this.checkpointDirectory = null;
        fs.delete(dir, true);
    }

    /** No-op: this backend holds no resources that need closing. */
    @Override
    public void close() throws Exception {
    }

    /** Creates a file-system backed {@link ValueState} for the given namespace serializer and descriptor. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // FsValueState is instantiated as a raw type
    public <N, V> ValueState<V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception {
        return new FsValueState(this, this.keySerializer, namespaceSerializer, stateDesc);
    }

    /** Creates a file-system backed {@link ListState} for the given namespace serializer and descriptor. */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"}) // FsListState is instantiated as a raw type
    public <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
        return new FsListState(this, this.keySerializer, namespaceSerializer, stateDesc);
    }

    /** Creates a file-system backed {@link ReducingState} for the given namespace serializer and descriptor. */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"}) // FsReducingState is instantiated as a raw type
    public <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
        return new FsReducingState(this, this.keySerializer, namespaceSerializer, stateDesc);
    }

    /** Creates a file-system backed {@link FoldingState} for the given namespace serializer and descriptor. */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"}) // FsFoldingState is instantiated as a raw type
    protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
        return new FsFoldingState(this, this.keySerializer, namespaceSerializer, stateDesc);
    }

    /**
     * Serializes {@code state} with Java serialization into a checkpoint output stream and returns
     * a handle to the result.
     *
     * @param state the object to serialize
     * @param checkpointID determines the {@code chk-<id>} directory a state file would be placed in
     * @param timestamp not used by this implementation
     * @return a handle to the serialized state
     * @throws IllegalStateException if the backend is not initialized
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"}) // the serializable handle is obtained as a raw type
    public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state, long checkpointID, long timestamp) throws Exception {
        this.checkFileSystemInitialized();
        Path checkpointDir = this.createCheckpointDirPath(checkpointID);
        int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, this.fileStateThreshold);
        FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(checkpointDir, this.filesystem, bufferSize, this.fileStateThreshold);
        // If writeObject fails, closing 'os' also closes 'stream', which discards any file
        // created so far. On success the stream is already closed by closeAndGetHandle().
        // Deliberately no os.flush() here: it would propagate to stream.flush() and force a
        // file to be created even for state below the threshold.
        try (ObjectOutputStream os = new ObjectOutputStream(stream)) {
            os.writeObject(state);
            StateHandle stateHandle = stream.closeAndGetHandle().toSerializableHandle();
            return stateHandle;
        }
    }

    /**
     * Creates an output stream that writes checkpoint state below the {@code chk-<checkpointID>}
     * directory of the current job.
     *
     * @param checkpointID determines the checkpoint directory
     * @param timestamp not used by this implementation
     * @throws IllegalStateException if the backend is not initialized
     */
    @Override
    public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
        this.checkFileSystemInitialized();
        Path checkpointDir = this.createCheckpointDirPath(checkpointID);
        // The stream requires bufferSize >= threshold so that "small" state always fits the buffer.
        int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, this.fileStateThreshold);
        return new FsCheckpointStateOutputStream(checkpointDir, this.filesystem, bufferSize, this.fileStateThreshold);
    }

    /** Throws {@link IllegalStateException} unless file system and checkpoint directory are set. */
    private void checkFileSystemInitialized() throws IllegalStateException {
        if (this.filesystem == null || this.checkpointDirectory == null) {
            throw new IllegalStateException("filesystem has not been re-initialized after deserialization");
        }
    }

    /** Returns {@code <checkpointDirectory>/chk-<checkpointID>}. */
    private Path createCheckpointDirPath(long checkpointID) {
        return new Path(this.checkpointDirectory, "chk-" + checkpointID);
    }

    @Override
    public String toString() {
        return this.checkpointDirectory == null
                ? "File State Backend @ " + this.basePath
                : "File State Backend (initialized) @ " + this.checkpointDirectory;
    }

    /**
     * Validates the checkpoint URI and rebuilds it from the scheme and authority reported by the
     * resolved file system plus the path of the given URI.
     *
     * @param checkpointDataUri the URI to check; needs a scheme and a path other than {@code ""}
     *        or {@code "/"}
     * @return the normalized path
     * @throws IllegalArgumentException if scheme or path is missing, or the path is the root
     * @throws IOException if no file system is found or the normalized URI cannot be built
     */
    public static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOException {
        String scheme = checkpointDataUri.getScheme();
        String path = checkpointDataUri.getPath();
        if (scheme == null) {
            throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. Please specify the file system scheme explicitly in the URI.");
        }
        if (path == null) {
            throw new IllegalArgumentException("The path to store the checkpoint data in is null. Please specify a directory path for the checkpoint data.");
        }
        if (path.isEmpty() || path.equals("/")) {
            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
        }
        FileSystem filesystem = FileSystem.get(checkpointDataUri);
        if (filesystem == null) {
            throw new IOException("Could not find a file system for the given scheme in the available configurations.");
        }
        URI fsURI = filesystem.getUri();
        try {
            URI baseURI = new URI(fsURI.getScheme(), fsURI.getAuthority(), path, null, null);
            return new Path(baseURI);
        }
        catch (URISyntaxException e) {
            throw new IOException(String.format("Cannot create file system URI for checkpointDataUri %s and filesystem URI %s", checkpointDataUri, fsURI), e);
        }
    }

    /**
     * Buffering output stream for checkpoint state.
     *
     * <p>Bytes are collected in an in-memory buffer. The target file (a randomly named file in
     * the checkpoint directory) is only created on the first {@link #flush()}. If the stream is
     * finished via {@link #closeAndGetHandle()} before any flush happened and the buffered data
     * does not exceed the local state threshold, no file is created and the bytes are returned in
     * a {@link ByteStreamStateHandle}.
     *
     * <p>{@link #close()} discards the stream: a file that was already created is deleted.
     * Only {@link #closeAndGetHandle()} and {@link #closeAndGetPath()} synchronize on the stream.
     */
    public static final class FsCheckpointStateOutputStream
    extends AbstractStateBackend.CheckpointStateOutputStream {
        /** Number of attempts to create a uniquely named state file before giving up. */
        private static final int MAX_CREATE_ATTEMPTS = 10;

        private final byte[] writeBuffer;
        /** Number of valid bytes currently held in {@link #writeBuffer}. */
        private int pos;
        /** Stream to the state file; {@code null} until the first flush. */
        private FSDataOutputStream outStream;
        private final int localStateThreshold;
        /** Directory in which the state file is created. */
        private final Path basePath;
        private final FileSystem fs;
        /** Path of the state file; set together with {@link #outStream}. */
        private Path statePath;
        private boolean closed;

        /**
         * @param basePath directory in which the state file is created if one is needed
         * @param fs file system used to create and delete files
         * @param bufferSize size of the in-memory write buffer in bytes
         * @param localStateThreshold state up to this many bytes is returned as a byte handle
         * @throws IllegalArgumentException if {@code bufferSize < localStateThreshold}
         */
        public FsCheckpointStateOutputStream(Path basePath, FileSystem fs, int bufferSize, int localStateThreshold) {
            if (bufferSize < localStateThreshold) {
                throw new IllegalArgumentException("The write buffer size (" + bufferSize
                        + ") must not be smaller than the local state threshold (" + localStateThreshold + ").");
            }
            this.basePath = basePath;
            this.fs = fs;
            this.writeBuffer = new byte[bufferSize];
            this.localStateThreshold = localStateThreshold;
        }

        /** Buffers a single byte, flushing the buffer to the file first if it is full. */
        @Override
        public void write(int b) throws IOException {
            if (this.pos >= this.writeBuffer.length) {
                this.flush();
            }
            this.writeBuffer[this.pos++] = (byte)b;
        }

        /**
         * Writes {@code len} bytes. Chunks shorter than half the buffer go through the buffer;
         * larger chunks are written directly to the file after flushing what is buffered.
         */
        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            if (len < this.writeBuffer.length / 2) {
                int remaining = this.writeBuffer.length - this.pos;
                if (len > remaining) {
                    // Fill the buffer completely, flush it, then buffer the rest below.
                    System.arraycopy(b, off, this.writeBuffer, this.pos, remaining);
                    off += remaining;
                    len -= remaining;
                    this.pos += remaining;
                    this.flush();
                }
                System.arraycopy(b, off, this.writeBuffer, this.pos, len);
                this.pos += len;
            } else {
                // Flush first so that the bytes reach the file in write order.
                this.flush();
                this.outStream.write(b, off, len);
            }
        }

        /**
         * Writes the buffered bytes to the state file, creating the file on first use.
         * Does nothing once the stream has been closed.
         *
         * @throws IOException if the file cannot be created after {@value #MAX_CREATE_ATTEMPTS}
         *         attempts, or if writing fails
         */
        @Override
        public void flush() throws IOException {
            if (this.closed) {
                return;
            }
            if (this.outStream == null) {
                this.openStateFile();
            }
            if (this.pos > 0) {
                this.outStream.write(this.writeBuffer, 0, this.pos);
                this.pos = 0;
            }
        }

        /** Creates the checkpoint directory and opens a new, randomly named state file in it. */
        private void openStateFile() throws IOException {
            this.fs.mkdirs(this.basePath);
            Exception latestException = null;
            for (int attempt = 0; attempt < MAX_CREATE_ATTEMPTS && this.outStream == null; ++attempt) {
                try {
                    this.statePath = new Path(this.basePath, UUID.randomUUID().toString());
                    // overwrite == false: never clobber an existing file, retry with a new name
                    this.outStream = this.fs.create(this.statePath, false);
                }
                catch (Exception e) {
                    latestException = e;
                }
            }
            if (this.outStream == null) {
                throw new IOException("Could not open output stream for state backend", latestException);
            }
        }

        /**
         * Closes and discards the stream. If a state file was already created it is closed and
         * deleted; failures are logged, not thrown. Calling this on a closed stream is a no-op.
         */
        @Override
        public void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.outStream == null) {
                return;
            }
            try {
                this.outStream.close();
                this.fs.delete(this.statePath, false);
                try {
                    this.fs.delete(this.basePath, false);
                }
                catch (IOException ignored) {
                    // Best effort, non-recursive delete of the checkpoint directory; presumably
                    // it fails while the directory still holds other files, which is fine.
                }
            }
            catch (Exception e) {
                LOG.warn("Cannot delete closed and discarded state stream for " + this.statePath, e);
            }
        }

        /**
         * Finishes the stream and returns a handle to the written state.
         *
         * <p>If nothing has been flushed to a file yet and the buffered data does not exceed the
         * local state threshold, the bytes are returned directly in a
         * {@link ByteStreamStateHandle}. Otherwise the remaining bytes are flushed, the file is
         * closed and a {@link FileStreamStateHandle} for it is returned.
         *
         * @throws IOException if the stream was already closed, or if flushing/closing fails
         */
        @Override
        public StreamStateHandle closeAndGetHandle() throws IOException {
            synchronized (this) {
                if (this.closed) {
                    throw new IOException("Stream has already been closed and discarded.");
                }
                if (this.outStream == null && this.pos <= this.localStateThreshold) {
                    this.closed = true;
                    byte[] bytes = Arrays.copyOf(this.writeBuffer, this.pos);
                    return new ByteStreamStateHandle(bytes);
                }
                this.flush();
                this.outStream.close();
                this.closed = true;
                return new FileStreamStateHandle(this.statePath);
            }
        }

        /**
         * Finishes the stream, always materializing the state as a file, and returns its path.
         *
         * @return the path of the state file
         * @throws IOException if the stream was already closed, or if flushing/closing fails
         */
        public Path closeAndGetPath() throws IOException {
            synchronized (this) {
                if (this.closed) {
                    throw new IOException("Stream has already been closed and discarded.");
                }
                // flush() is a no-op on a closed stream, so the stream must only be marked as
                // closed AFTER the buffered bytes were written. Marking it first would drop the
                // buffered data and leave outStream null if no file had been created yet.
                this.flush();
                this.outStream.close();
                this.closed = true;
                return this.statePath;
            }
        }
    }
}

