package org.apache.flink.contrib.streaming.state;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.function.CheckedSupplier;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBStateUploader.class */
public class RocksDBStateUploader extends RocksDBStateDataTransfer {
    private static final int READ_BUFFER_SIZE = 16384;

    public RocksDBStateUploader(int i) {
        super(i);
    }

    public Map<StateHandleID, StreamStateHandle> uploadFilesToCheckpointFs(@Nonnull Map<StateHandleID, Path> map, CheckpointStreamFactory checkpointStreamFactory, CloseableRegistry closeableRegistry) throws Exception {
        HashMap hashMap = new HashMap();
        Map<StateHandleID, CompletableFuture<StreamStateHandle>> createUploadFutures = createUploadFutures(map, checkpointStreamFactory, closeableRegistry);
        try {
            FutureUtils.waitForAll(createUploadFutures.values()).get();
            for (Map.Entry<StateHandleID, CompletableFuture<StreamStateHandle>> entry : createUploadFutures.entrySet()) {
                hashMap.put(entry.getKey(), entry.getValue().get());
            }
            return hashMap;
        } catch (ExecutionException e) {
            Throwable stripException = ExceptionUtils.stripException(ExceptionUtils.stripExecutionException(e), RuntimeException.class);
            if (stripException instanceof IOException) {
                throw ((IOException) stripException);
            }
            throw new FlinkRuntimeException("Failed to upload data for state handles.", e);
        }
    }

    private Map<StateHandleID, CompletableFuture<StreamStateHandle>> createUploadFutures(Map<StateHandleID, Path> map, CheckpointStreamFactory checkpointStreamFactory, CloseableRegistry closeableRegistry) {
        HashMap hashMap = new HashMap(map.size());
        for (Map.Entry<StateHandleID, Path> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), CompletableFuture.supplyAsync(CheckedSupplier.unchecked(() -> {
                return uploadLocalFileToCheckpointFs((Path) entry.getValue(), checkpointStreamFactory, closeableRegistry);
            }), this.executorService));
        }
        return hashMap;
    }

    private StreamStateHandle uploadLocalFileToCheckpointFs(Path path, CheckpointStreamFactory checkpointStreamFactory, CloseableRegistry closeableRegistry) throws IOException {
        InputStream inputStream = null;
        Closeable closeable = null;
        try {
            byte[] bArr = new byte[16384];
            inputStream = Files.newInputStream(path, new OpenOption[0]);
            closeableRegistry.registerCloseable(inputStream);
            closeable = checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
            closeableRegistry.registerCloseable(closeable);
            while (true) {
                int read = inputStream.read(bArr);
                if (read == -1) {
                    break;
                }
                closeable.write(bArr, 0, read);
            }
            StreamStateHandle streamStateHandle = null;
            if (closeableRegistry.unregisterCloseable(closeable)) {
                streamStateHandle = closeable.closeAndGetHandle();
                closeable = null;
            }
            StreamStateHandle streamStateHandle2 = streamStateHandle;
            if (closeableRegistry.unregisterCloseable(inputStream)) {
                IOUtils.closeQuietly(inputStream);
            }
            if (closeableRegistry.unregisterCloseable(closeable)) {
                IOUtils.closeQuietly(closeable);
            }
            return streamStateHandle2;
        } catch (Throwable th) {
            if (closeableRegistry.unregisterCloseable(inputStream)) {
                IOUtils.closeQuietly(inputStream);
            }
            if (closeableRegistry.unregisterCloseable(closeable)) {
                IOUtils.closeQuietly(closeable);
            }
            throw th;
        }
    }

    @Override // org.apache.flink.contrib.streaming.state.RocksDBStateDataTransfer, java.io.Closeable, java.lang.AutoCloseable
    public /* bridge */ /* synthetic */ void close() {
        super.close();
    }
}
