package org.apache.flink.runtime.state.filesystem;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.UUID;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.shaded.org.apache.curator.utils.ZKPaths;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/filesystem/FsStateBackend.class */
public class FsStateBackend extends AbstractStateBackend {
    private static final long serialVersionUID = -8191916350224044011L;
    private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class);
    /** File state threshold in bytes; equals the value the single-URI constructor passes on. */
    public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;
    /** Upper bound for the file state threshold; the constructor rejects larger values. */
    public static final int MAX_FILE_STATE_THRESHOLD = 1048576;
    /** Minimum write buffer size in bytes; equals the lower bound used when streams are created. */
    private static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
    /** Validated and normalized base directory, set once in the constructor. */
    private final Path basePath;
    /** State up to this many bytes is returned as an in-memory handle instead of a file. */
    private final int fileStateThreshold;
    /** Job-specific directory below the base path; assigned in initializeForJob, cleared on dispose. */
    private transient Path checkpointDirectory;
    /** File system of the base path; assigned in initializeForJob, cleared on dispose. */
    private transient FileSystem filesystem;
    /** Streams that are currently open; every access in this file synchronizes on the set itself. */
    private transient HashSet<FsCheckpointStateOutputStream> openStreams;
    /** Set by close(); stream creation fails with an IOException once this is true. */
    private volatile transient boolean closed;

    /* loaded from: input_file:org/apache/flink/runtime/state/filesystem/FsStateBackend$FsCheckpointStateOutputStream.class */
    public static final class FsCheckpointStateOutputStream extends AbstractStateBackend.CheckpointStateOutputStream {
        /** In-memory buffer that collects written bytes before they are flushed to the file. */
        private final byte[] writeBuffer;
        /** Number of buffered bytes; pinned to the buffer length on close so later writes hit flush(). */
        private int pos;
        /** File output stream; stays null until the first flush creates the state file. */
        private FSDataOutputStream outStream;
        /** State up to this many bytes that was never flushed is returned as an in-memory handle. */
        private final int localStateThreshold;
        /** Directory in which the state file is created. */
        private final Path basePath;
        /** File system used to create and delete the state file. */
        private final FileSystem fs;
        /** Shared registry of open streams; this stream adds itself on creation and removes itself on close. */
        private final HashSet<FsCheckpointStateOutputStream> openStreams;
        /** Path of the state file; assigned in flush() right before the file is created. */
        private Path statePath;
        /** True once the stream has been closed, either discarded or turned into a handle/path. */
        private volatile boolean closed;

        public FsCheckpointStateOutputStream(Path path, FileSystem fileSystem, HashSet<FsCheckpointStateOutputStream> hashSet, int i, int i2) {
            if (i < i2) {
                throw new IllegalArgumentException();
            }
            this.basePath = path;
            this.fs = fileSystem;
            this.openStreams = (HashSet) Preconditions.checkNotNull(hashSet);
            this.writeBuffer = new byte[i];
            this.localStateThreshold = i2;
            synchronized (hashSet) {
                hashSet.add(this);
            }
        }

        /**
         * Buffers a single byte, flushing the buffer first when it is full.
         *
         * @param i the byte to write (only the low eight bits are stored)
         * @throws IOException if the flush fails, which includes the stream being closed
         */
        @Override
        public void write(int i) throws IOException {
            // A closed stream has its position pinned to the buffer end, so it also lands in flush().
            if (this.pos >= this.writeBuffer.length) {
                flush();
            }
            this.writeBuffer[this.pos++] = (byte) i;
        }

        /**
         * Writes a range of bytes. Requests smaller than half the buffer are copied into the
         * buffer; larger requests flush the buffer and go straight to the file stream.
         *
         * @param bArr source array
         * @param i offset of the first byte to write
         * @param i2 number of bytes to write
         * @throws IOException if flushing or writing to the file stream fails
         */
        @Override
        public void write(byte[] bArr, int i, int i2) throws IOException {
            int offset = i;
            int remaining = i2;

            if (remaining < this.writeBuffer.length / 2) {
                final int free = this.writeBuffer.length - this.pos;
                if (remaining > free) {
                    // Fill the buffer completely, then empty it to make room for the rest.
                    System.arraycopy(bArr, offset, this.writeBuffer, this.pos, free);
                    offset += free;
                    remaining -= free;
                    this.pos += free;
                    flush();
                }
                System.arraycopy(bArr, offset, this.writeBuffer, this.pos, remaining);
                this.pos += remaining;
            } else {
                // Large write: push out what is buffered, then bypass the buffer.
                flush();
                this.outStream.write(bArr, offset, remaining);
            }
        }

        /**
         * Writes the buffered bytes to the state file, creating the file on first use.
         * File creation is attempted up to ten times, each time under a fresh random name.
         *
         * @throws IOException if the stream is closed, the file cannot be created, or the write fails
         */
        @Override
        public void flush() throws IOException {
            if (this.closed) {
                throw new IOException("stream is closed");
            }

            if (this.outStream == null) {
                this.fs.mkdirs(this.basePath);

                Exception lastFailure = null;
                boolean created = false;
                for (int attempt = 0; attempt < 10 && !created; attempt++) {
                    try {
                        this.statePath = new Path(this.basePath, UUID.randomUUID().toString());
                        this.outStream = this.fs.create(this.statePath, false);
                        created = true;
                    } catch (Exception e) {
                        // Remember the failure and retry with another file name.
                        lastFailure = e;
                    }
                }

                if (this.outStream == null) {
                    throw new IOException("Could not open output stream for state backend", lastFailure);
                }
            }

            if (this.pos > 0) {
                this.outStream.write(this.writeBuffer, 0, this.pos);
                this.pos = 0;
            }
        }

        /**
         * Closes and discards the stream. The stream is removed from the registry of open
         * streams and, if a state file was created, that file is closed and deleted. The parent
         * directory is removed as well when it is empty.
         *
         * <p>Clean-up is best effort: failures are logged and never thrown. Calling this method
         * on a stream that is already closed does nothing.
         */
        @Override
        public void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;

            // Pin the position to the buffer end so later writes reach flush() and fail there.
            this.pos = this.writeBuffer.length;

            synchronized (this.openStreams) {
                this.openStreams.remove(this);
            }

            if (this.outStream == null) {
                // No file stream was ever opened, so there is nothing on the file system to clean up.
                return;
            }

            try {
                this.outStream.close();
            } catch (Throwable closeFailure) {
                FsStateBackend.LOG.warn("Could not close the state stream for {}.", this.statePath, closeFailure);
            } finally {
                // Delete the discarded file exactly once, whether or not closing succeeded.
                try {
                    this.fs.delete(this.statePath, false);
                    try {
                        FileUtils.deletePathIfEmpty(this.fs, this.basePath);
                    } catch (Throwable parentFailure) {
                        FsStateBackend.LOG.debug("Could not delete parent directory for path {}.", this.basePath, parentFailure);
                    }
                } catch (Throwable deleteFailure) {
                    FsStateBackend.LOG.warn("Could not delete stream file for {}.", this.statePath, deleteFailure);
                }
            }
        }

        /**
         * Closes the stream and returns a handle to the written state. State that was never
         * flushed and does not exceed the local threshold is returned as an in-memory handle;
         * otherwise the data is flushed to the state file and a file handle is returned.
         *
         * @return handle to the state bytes
         * @throws IOException if the stream is already closed, or flushing/closing the file fails
         *         (in which case deleting the file is attempted)
         */
        @Override
        public StreamStateHandle closeAndGetHandle() throws IOException {
            synchronized (this) {
                if (this.closed) {
                    throw new IOException("Stream has already been closed and discarded.");
                }

                synchronized (this.openStreams) {
                    this.openStreams.remove(this);
                }

                final boolean keepInMemory = this.outStream == null && this.pos <= this.localStateThreshold;
                if (keepInMemory) {
                    this.closed = true;
                    final byte[] bytes = Arrays.copyOf(this.writeBuffer, this.pos);
                    this.pos = this.writeBuffer.length;
                    return new ByteStreamStateHandle(bytes);
                }

                try {
                    flush();
                    this.outStream.close();
                    return new FileStreamStateHandle(this.statePath);
                } catch (Exception failure) {
                    FsStateBackend.LOG.warn("Could not close the file system output stream. Trying to delete the underlying file.");
                    try {
                        this.fs.delete(this.statePath, false);
                        try {
                            FileUtils.deletePathIfEmpty(this.fs, this.basePath);
                        } catch (Throwable parentFailure) {
                            FsStateBackend.LOG.debug("Could not delete parent directory for path {}.", this.basePath, parentFailure);
                        }
                    } catch (Throwable deleteFailure) {
                        FsStateBackend.LOG.warn("Could not delete close and discarded state stream for {}.", this.statePath, deleteFailure);
                    }
                    throw new IOException("Could not close the file system output stream.", failure);
                } finally {
                    // Whatever happened, the stream is unusable from here on.
                    this.closed = true;
                    this.pos = this.writeBuffer.length;
                }
            }
        }

        /**
         * Flushes all data to the state file, closes the stream, and returns the file's path.
         * Unlike {@link #closeAndGetHandle()}, this always writes a file.
         *
         * @return path of the written state file
         * @throws IOException if the stream is already closed, or flushing/closing the file fails
         *         (in which case deleting the file is attempted)
         */
        public Path closeAndGetPath() throws IOException {
            synchronized (this) {
                if (this.closed) {
                    throw new IOException("Stream has already been closed and discarded.");
                }

                synchronized (this.openStreams) {
                    this.openStreams.remove(this);
                }

                try {
                    try {
                        flush();
                        this.outStream.close();
                        return this.statePath;
                    } finally {
                        // Mark the stream unusable before any clean-up of a failed close runs.
                        this.closed = true;
                        this.pos = this.writeBuffer.length;
                    }
                } catch (Exception failure) {
                    FsStateBackend.LOG.warn("Could not close the file system output stream. Trying to delete the underlying file.");
                    try {
                        this.fs.delete(this.statePath, false);
                        try {
                            FileUtils.deletePathIfEmpty(this.fs, this.basePath);
                        } catch (Throwable parentFailure) {
                            FsStateBackend.LOG.debug("Could not delete parent directory for path {}.", this.basePath, parentFailure);
                        }
                    } catch (Throwable deleteFailure) {
                        FsStateBackend.LOG.warn("Could not delete close and discarded state stream for {}.", this.statePath, deleteFailure);
                    }
                    throw new IOException("Could not close the file system output stream.", failure);
                }
            }
        }

        /**
         * Reports whether this stream has been closed.
         *
         * @return true once the stream was discarded or converted into a handle or path
         */
        public boolean isClosed() {
            return closed;
        }

        /** Returns the superclass hash code; declared final so subclasses cannot change it. */
        @Override
        public final int hashCode() {
            final int inherited = super.hashCode();
            return inherited;
        }

        /**
         * Compares by reference only: a stream equals nothing but itself.
         *
         * @param obj object to compare with
         * @return true only if {@code obj} is this very instance
         */
        @Override
        public final boolean equals(Object obj) {
            return obj == this;
        }
    }

    /**
     * Creates a backend from a string form of the checkpoint base directory, delegating to
     * the {@link Path}-based constructor.
     *
     * @param str base directory for checkpoint data, as accepted by the Path constructor
     * @throws IOException declared by the delegated constructors
     */
    public FsStateBackend(String str) throws IOException {
        this(new Path(str));
    }

    /**
     * Creates a backend from the checkpoint base directory, delegating to the URI-based
     * constructor with the path's URI.
     *
     * @param path base directory for checkpoint data
     * @throws IOException declared by the delegated constructors
     */
    public FsStateBackend(Path path) throws IOException {
        this(path.toUri());
    }

    /**
     * Creates a backend for the given base URI using the default file state threshold.
     *
     * @param uri base directory for checkpoint data
     * @throws IOException declared by the delegated constructor
     */
    public FsStateBackend(URI uri) throws IOException {
        this(uri, DEFAULT_FILE_STATE_THRESHOLD);
    }

    public FsStateBackend(URI uri, int i) throws IOException {
        if (i < 0) {
            throw new IllegalArgumentException("The threshold for file state size must be zero or larger.");
        }
        if (i > 1048576) {
            throw new IllegalArgumentException("The threshold for file state size cannot be larger than 1048576");
        }
        this.fileStateThreshold = i;
        this.basePath = validateAndNormalizeUri(uri);
    }

    /**
     * Returns the normalized base directory given at construction time.
     *
     * @return the base path below which job checkpoint directories are created
     */
    public Path getBasePath() {
        return basePath;
    }

    /**
     * Returns the job-specific checkpoint directory.
     *
     * @return the directory, or null while the backend is not initialized for a job
     */
    public Path getCheckpointDirectory() {
        return checkpointDirectory;
    }

    /**
     * Returns the configured file state threshold.
     *
     * @return state size in bytes up to which state is kept in memory instead of in a file
     */
    public int getFileStateSizeThreshold() {
        return fileStateThreshold;
    }

    /**
     * Reports whether the backend currently holds a file system and a checkpoint directory.
     *
     * @return true if both the file system and the checkpoint directory are set
     */
    public boolean isInitialized() {
        return this.filesystem != null && this.checkpointDirectory != null;
    }

    /**
     * Returns the file system that the backend writes checkpoint data to.
     *
     * @return the file system, never null
     * @throws IllegalStateException if no file system is set
     */
    public FileSystem getFileSystem() {
        final FileSystem fs = this.filesystem;
        if (fs == null) {
            throw new IllegalStateException("State backend has not been initialized.");
        }
        return fs;
    }

    /**
     * Prepares the backend for a job: resolves the file system of the base path, creates the
     * job directory (base path plus job ID) and starts with an empty set of open streams.
     *
     * @param environment environment that supplies the job ID
     * @param str passed through to the superclass initialization
     * @param typeSerializer passed through to the superclass initialization
     * @throws Exception if the superclass initialization or a file system operation fails
     */
    @Override
    public void initializeForJob(Environment environment, String str, TypeSerializer<?> typeSerializer) throws Exception {
        super.initializeForJob(environment, str, typeSerializer);

        final String jobId = environment.getJobID().toString();
        final Path jobDirectory = new Path(this.basePath, jobId);
        LOG.info("Initializing file state backend to URI " + jobDirectory);

        final FileSystem fs = this.basePath.getFileSystem();
        this.filesystem = fs;
        fs.mkdirs(jobDirectory);

        this.checkpointDirectory = jobDirectory;
        this.openStreams = new HashSet<>();
    }

    /**
     * Recursively deletes the job's checkpoint directory and resets the backend to the
     * uninitialized state.
     *
     * @throws IllegalStateException if the backend is not initialized
     * @throws Exception if deleting the directory fails
     */
    @Override
    public void disposeAllStateForCurrentJob() throws Exception {
        final FileSystem fs = this.filesystem;
        final Path jobDirectory = this.checkpointDirectory;

        if (fs != null && jobDirectory != null) {
            // Clear the references first so the backend counts as uninitialized even if the delete fails.
            this.filesystem = null;
            this.checkpointDirectory = null;
            fs.delete(jobDirectory, true);
            return;
        }

        throw new IllegalStateException("state backend has not been initialized");
    }

    /**
     * Marks the backend as closed and closes every stream that is still open. All streams are
     * attempted; the first failure is rethrown afterwards with later ones attached as suppressed.
     *
     * @throws IOException if closing any of the streams failed
     */
    @Override
    public void close() throws IOException {
        this.closed = true;

        final HashSet<FsCheckpointStateOutputStream> registry = this.openStreams;
        if (registry == null) {
            return;
        }

        // Snapshot under the lock, then close outside it: each stream's close() locks the set too.
        final ArrayList<FsCheckpointStateOutputStream> snapshot;
        synchronized (registry) {
            snapshot = new ArrayList<>(registry);
            registry.clear();
        }

        Throwable firstFailure = null;
        for (FsCheckpointStateOutputStream stream : snapshot) {
            try {
                stream.close();
            } catch (Throwable t) {
                if (firstFailure != null) {
                    firstFailure.addSuppressed(t);
                } else {
                    firstFailure = t;
                }
            }
        }

        if (firstFailure != null) {
            ExceptionUtils.rethrowIOException(firstFailure);
        }
    }

    /**
     * Creates a value state as an {@code FsValueState} bound to this backend.
     *
     * @param typeSerializer serializer for the namespace type {@code N}
     * @param valueStateDescriptor descriptor of the state to create
     * @return the new value state
     * @throws Exception declared by the overridden method
     */
    @Override // org.apache.flink.runtime.state.AbstractStateBackend
    public <N, V> ValueState<V> createValueState(TypeSerializer<N> typeSerializer, ValueStateDescriptor<V> valueStateDescriptor) throws Exception {
        // NOTE(review): keySerializer is not declared in this class; presumably inherited from AbstractStateBackend.
        return new FsValueState(this, this.keySerializer, typeSerializer, valueStateDescriptor);
    }

    /**
     * Creates a list state as an {@code FsListState} bound to this backend.
     *
     * @param typeSerializer serializer for the namespace type {@code N}
     * @param listStateDescriptor descriptor of the state to create
     * @return the new list state
     * @throws Exception declared by the overridden method
     */
    @Override // org.apache.flink.runtime.state.AbstractStateBackend
    public <N, T> ListState<T> createListState(TypeSerializer<N> typeSerializer, ListStateDescriptor<T> listStateDescriptor) throws Exception {
        // NOTE(review): keySerializer is not declared in this class; presumably inherited from AbstractStateBackend.
        return new FsListState(this, this.keySerializer, typeSerializer, listStateDescriptor);
    }

    /**
     * Creates a reducing state as an {@code FsReducingState} bound to this backend.
     *
     * @param typeSerializer serializer for the namespace type {@code N}
     * @param reducingStateDescriptor descriptor of the state to create
     * @return the new reducing state
     * @throws Exception declared by the overridden method
     */
    @Override // org.apache.flink.runtime.state.AbstractStateBackend
    public <N, T> ReducingState<T> createReducingState(TypeSerializer<N> typeSerializer, ReducingStateDescriptor<T> reducingStateDescriptor) throws Exception {
        // NOTE(review): keySerializer is not declared in this class; presumably inherited from AbstractStateBackend.
        return new FsReducingState(this, this.keySerializer, typeSerializer, reducingStateDescriptor);
    }

    /**
     * Creates a folding state as an {@code FsFoldingState} bound to this backend.
     *
     * @param typeSerializer serializer for the namespace type {@code N}
     * @param foldingStateDescriptor descriptor of the state to create
     * @return the new folding state
     * @throws Exception declared by the overridden method
     */
    @Override // org.apache.flink.runtime.state.AbstractStateBackend
    protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> typeSerializer, FoldingStateDescriptor<T, ACC> foldingStateDescriptor) throws Exception {
        // NOTE(review): keySerializer is not declared in this class; presumably inherited from AbstractStateBackend.
        return new FsFoldingState(this, this.keySerializer, typeSerializer, foldingStateDescriptor);
    }

    /**
     * Serializes the given object with Java serialization into a checkpoint stream and
     * returns a handle to the result.
     *
     * <p>The stream is managed by try-with-resources. On success it has already been closed
     * by {@code closeAndGetHandle()}, so the implicit {@code close()} does nothing. On any
     * failure the implicit {@code close()} discards the stream, which unregisters it and
     * deletes a partially written file.
     *
     * @param s state object to serialize
     * @param j checkpoint ID; selects the {@code chk-<id>} directory the state is written to
     * @param j2 not used by this implementation
     * @return handle to the serialized state
     * @throws IllegalStateException if the backend is not initialized
     * @throws IOException if the backend is closed or the state cannot be written
     * @throws Exception declared by the overridden method
     */
    @Override
    public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S s, long j, long j2) throws Exception {
        checkFileSystemInitialized();

        final Path checkpointDir = createCheckpointDirPath(j);
        final int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, this.fileStateThreshold);

        try (FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
                checkpointDir, this.filesystem, this.openStreams, bufferSize, this.fileStateThreshold)) {
            // Checked after the stream registered itself, so a concurrent close() cannot miss it.
            if (this.closed) {
                throw new IOException("closed");
            }

            final ObjectOutputStream objectOutputStream = new ObjectOutputStream(stream);
            objectOutputStream.writeObject(s);
            // Push the serializer's internal buffer into the checkpoint stream before sealing it.
            objectOutputStream.flush();

            return stream.closeAndGetHandle().toSerializableHandle();
        } catch (IOException e) {
            throw new IOException("Could not serialize state.", e);
        }
    }

    /**
     * Opens a new stream for writing checkpoint state into the directory of the given checkpoint.
     *
     * @param j checkpoint ID; selects the {@code chk-<id>} directory
     * @param j2 not used by this implementation
     * @return the newly opened stream
     * @throws IllegalStateException if the backend is not initialized
     * @throws IOException if the backend has been closed
     * @throws Exception declared by the overridden method
     */
    @Override
    public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long j, long j2) throws Exception {
        checkFileSystemInitialized();

        final Path checkpointDir = createCheckpointDirPath(j);
        final int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, this.fileStateThreshold);
        final FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
                checkpointDir, this.filesystem, this.openStreams, bufferSize, this.fileStateThreshold);

        // Checked after the stream registered itself; if the backend closed meanwhile, discard it.
        if (this.closed) {
            stream.close();
            throw new IOException("closed");
        }
        return stream;
    }

    /**
     * Verifies that the file system and the checkpoint directory are both set.
     *
     * @throws IllegalStateException if either of them is missing
     */
    private void checkFileSystemInitialized() throws IllegalStateException {
        final boolean ready = this.filesystem != null && this.checkpointDirectory != null;
        if (!ready) {
            throw new IllegalStateException("filesystem has not been re-initialized after deserialization");
        }
    }

    /**
     * Builds the directory path for one checkpoint: {@code chk-<id>} below the checkpoint directory.
     *
     * @param j checkpoint ID
     * @return path of the checkpoint's directory
     */
    private Path createCheckpointDirPath(long j) {
        final String directoryName = "chk-" + j;
        return new Path(this.checkpointDirectory, directoryName);
    }

    /**
     * Describes the backend by its checkpoint directory when one is set, else by its base path.
     *
     * @return human-readable description of this backend
     */
    @Override
    public String toString() {
        if (this.checkpointDirectory != null) {
            return "File State Backend (initialized) @ " + this.checkpointDirectory;
        }
        return "File State Backend @ " + this.basePath;
    }

    /**
     * Validates the checkpoint directory URI and, where possible, rewrites it with the scheme
     * and authority reported by the matching file system.
     *
     * <p>If the scheme is not supported by Flink, no file system is found, or the normalized
     * URI cannot be built, the URI is returned unchanged as a path; the latter two cases are
     * logged as warnings.
     *
     * @param uri URI of the checkpoint directory
     * @return path for the validated, possibly normalized, URI
     * @throws IllegalArgumentException if the scheme or path is null, or the path is empty or
     *         the root directory
     * @throws IOException declared for the file system lookup
     */
    public static Path validateAndNormalizeUri(URI uri) throws IOException {
        final String scheme = uri.getScheme();
        final String path = uri.getPath();

        if (scheme == null) {
            throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. Please specify the file system scheme explicitly in the URI.");
        }
        if (path == null) {
            throw new IllegalArgumentException("The path to store the checkpoint data in is null. Please specify a directory path for the checkpoint data.");
        }
        // Plain file-path separator; a ZooKeeper path constant is not the right vocabulary here.
        if (path.isEmpty() || path.equals("/")) {
            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
        }

        if (!FileSystem.isFlinkSupportedScheme(scheme)) {
            // No verification is attempted for schemes Flink does not support itself.
            return new Path(uri);
        }

        final FileSystem fileSystem = FileSystem.get(uri);
        if (fileSystem == null) {
            LOG.warn("Could not verify checkpoint path. This might be caused by a genuine problem or by the fact that the file system is not accessible from the client. Reason: {}", "Could not find a file system for the given scheme in the available configurations.");
            return new Path(uri);
        }

        final URI fileSystemUri = fileSystem.getUri();
        try {
            return new Path(new URI(fileSystemUri.getScheme(), fileSystemUri.getAuthority(), path, null, null));
        } catch (URISyntaxException e) {
            // The exception is passed as a format argument: its text may contain '%' and must
            // never be part of the format pattern itself.
            final String reason = String.format("Cannot create file system URI for checkpointDataUri %s and filesystem URI %s: %s", uri, fileSystemUri, e);
            LOG.warn("Could not verify checkpoint path. This might be caused by a genuine problem or by the fact that the file system is not accessible from the client. Reason: {}", reason);
            return new Path(uri);
        }
    }
}
