package io.milvus.bulkwriter;

import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.collect.Lists;
import com.google.gson.JsonObject;
import io.milvus.bulkwriter.connect.AzureConnectParam;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
import io.milvus.bulkwriter.storage.StorageClient;
import io.milvus.bulkwriter.storage.client.AzureStorageClient;
import io.milvus.bulkwriter.storage.client.MinioStorageClient;
import io.milvus.common.utils.ExceptionUtils;
import io.milvus.param.Constant;
import io.minio.errors.ErrorResponseException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/milvus/bulkwriter/RemoteBulkWriter.class */
/**
 * Bulk writer that stages data files locally through {@link LocalBulkWriter} and then
 * uploads every flushed {@code .parquet} file to a remote object store (S3/MinIO-compatible
 * or Azure Blob, chosen by the type of the supplied {@link StorageConnectParam}).
 *
 * <p>After a file has been uploaded the local copy is removed. The remote paths of all
 * uploaded files, grouped per flush, are available through {@link #getBatchFiles()}.
 */
public class RemoteBulkWriter extends LocalBulkWriter {
    private static final Logger logger = LoggerFactory.getLogger(RemoteBulkWriter.class);

    /** Only local files with this extension (leading dot included) are uploaded by {@link #callBack}. */
    private static final String PARQUET_EXTENSION = ".parquet";

    /** Remote directory (configured remote path + a per-writer UUID) that receives the uploads. */
    private final String remotePath;
    /** Connection settings; its concrete type selects the storage backend. */
    private final StorageConnectParam connectParam;
    /** Backend client, created once by {@link #getStorageClient()}; stays null for unknown param types. */
    private StorageClient storageClient;
    /** One inner list of remote object paths per {@link #callBack} invocation. */
    private final List<List<String>> remoteFiles;

    /**
     * Creates the writer, derives the remote target directory and initializes the storage client.
     *
     * @param remoteBulkWriterParam schema, chunk size, file type, remote path, connection
     *                              parameters and extra config for this writer
     * @throws IOException if the underlying {@link LocalBulkWriter} cannot be set up
     */
    public RemoteBulkWriter(RemoteBulkWriterParam remoteBulkWriterParam) throws IOException {
        super(remoteBulkWriterParam.getCollectionSchema(), remoteBulkWriterParam.getChunkSize(), remoteBulkWriterParam.getFileType(), generatorLocalPath(), remoteBulkWriterParam.getConfig());
        this.remotePath = Paths.get(remoteBulkWriterParam.getRemotePath()).resolve(getUUID()).toString();
        this.connectParam = remoteBulkWriterParam.getConnectParam();
        this.remoteFiles = Lists.newArrayList();
        getStorageClient();
        logger.info("Remote buffer writer initialized, target path: {}", this.remotePath);
    }

    /**
     * Appends one row; delegates unchanged to {@link LocalBulkWriter#appendRow(JsonObject)}.
     *
     * @param jsonObject the row to append
     */
    @Override // io.milvus.bulkwriter.LocalBulkWriter, io.milvus.bulkwriter.BulkWriter
    public void appendRow(JsonObject jsonObject) throws IOException, InterruptedException {
        super.appendRow(jsonObject);
    }

    /**
     * Commits buffered data; delegates unchanged to {@link LocalBulkWriter#commit(boolean)}.
     *
     * @param z forwarded as-is to the parent implementation
     */
    @Override // io.milvus.bulkwriter.LocalBulkWriter, io.milvus.bulkwriter.BulkWriter
    public void commit(boolean z) throws InterruptedException {
        super.commit(z);
    }

    /**
     * Returns the remote target directory rather than the local staging directory.
     *
     * @return the remote path computed in the constructor
     */
    @Override // io.milvus.bulkwriter.LocalBulkWriter, io.milvus.bulkwriter.BulkWriter
    public String getDataPath() {
        return this.remotePath;
    }

    /**
     * Returns the remote paths of all uploaded files.
     *
     * @return the live internal list; one inner list per completed upload batch
     */
    @Override // io.milvus.bulkwriter.LocalBulkWriter
    public List<List<String>> getBatchFiles() {
        return this.remoteFiles;
    }

    /**
     * Runs the parent's exit logic and then removes the parent directory of the local
     * staging path if it exists and is empty. Failure to remove it is logged, not thrown.
     */
    @Override // io.milvus.bulkwriter.LocalBulkWriter
    public void exit() throws InterruptedException {
        super.exit();
        Path parent = Paths.get(this.localPath).getParent();
        if (parent == null) {
            // A path without a parent (e.g. a bare relative name) leaves nothing to clean up.
            return;
        }
        if (parent.toFile().exists() && isEmptyDirectory(parent)) {
            try {
                Files.delete(parent);
                logger.info("Delete empty directory: {}", parent);
            } catch (IOException e) {
                logger.warn("Failed to delete empty directory: {}", parent, e);
            }
        }
    }

    /**
     * Tells whether {@code path} has no entries.
     *
     * @param path directory to inspect
     * @return true if the directory has no children; false if it has any or cannot be read
     */
    private static boolean isEmptyDirectory(Path path) {
        // Files.walk holds an open directory handle until the stream is closed.
        try (java.util.stream.Stream<Path> entries = Files.walk(path, 1, FileVisitOption.FOLLOW_LINKS)) {
            // The first element is the directory itself; anything after it is a child.
            return !entries.skip(1L).findFirst().isPresent();
        } catch (IOException e) {
            logger.warn("Failed to inspect directory: {}", path, e);
            return false;
        }
    }

    /**
     * Lazily creates {@link #storageClient} for the backend matching {@link #connectParam}.
     * Does nothing if a client already exists or the param type is neither S3 nor Azure.
     */
    private void getStorageClient() {
        if (this.storageClient != null) {
            return;
        }
        if (this.connectParam instanceof S3ConnectParam) {
            S3ConnectParam s3ConnectParam = (S3ConnectParam) this.connectParam;
            this.storageClient = MinioStorageClient.getStorageClient(s3ConnectParam.getCloudName(), s3ConnectParam.getEndpoint(), s3ConnectParam.getAccessKey(), s3ConnectParam.getSecretKey(), s3ConnectParam.getSessionToken(), s3ConnectParam.getRegion(), s3ConnectParam.getHttpClient());
        } else if (this.connectParam instanceof AzureConnectParam) {
            AzureConnectParam azureConnectParam = (AzureConnectParam) this.connectParam;
            this.storageClient = AzureStorageClient.getStorageClient(azureConnectParam.getConnStr(), azureConnectParam.getAccountUrl(), azureConnectParam.getCredential());
        }
    }

    /**
     * Best-effort removal of an uploaded local file and, unless it is the staging root
     * itself, of the file's parent directory. Failures are logged and never propagated.
     *
     * @param str path of the local file to remove
     */
    private void rmLocal(String str) {
        try {
            Path path = Paths.get(str);
            if (!path.toFile().delete()) {
                logger.warn("Failed to delete local file: {}", str);
            }
            Path parent = path.getParent();
            if (parent != null && !parent.toString().equals(this.localPath)) {
                try {
                    // Files.delete refuses non-empty directories, so data is never lost here.
                    Files.delete(parent);
                    logger.info("Delete empty directory: {}", parent);
                } catch (IOException e) {
                    logger.warn("Failed to delete empty directory: {}", parent, e);
                }
            }
        } catch (RuntimeException e) {
            // e.g. an invalid path string or a security restriction; cleanup stays best-effort.
            logger.warn("Failed to delete local file: {}", str, e);
        }
    }

    /**
     * Uploads every {@code .parquet} file of a flushed batch to the remote store, removes the
     * local copies and records the resulting remote paths as one entry of {@link #remoteFiles}.
     * Any failure is reported through {@link ExceptionUtils#throwUnExpectedException}.
     *
     * @param list local paths of the files produced by one flush
     */
    @Override // io.milvus.bulkwriter.LocalBulkWriter
    protected void callBack(List<String> list) {
        List<String> uploaded = new ArrayList<>();
        try {
            if (!bucketExists()) {
                ExceptionUtils.throwUnExpectedException("Blob storage bucket/container doesn't exist");
            }
            for (String localFile : list) {
                if (!PARQUET_EXTENSION.equals(getExtension(localFile))) {
                    continue;
                }
                // Strip the local staging prefix so the remote layout mirrors the local one.
                String relativePath = localFile.replace(super.getDataPath(), Constant.DEFAULT_INDEX_NAME);
                String minioFilePath = getMinioFilePath(this.remotePath, relativePath);
                if (objectExists(minioFilePath)) {
                    logger.info(String.format("Remote file %s already exists, will overwrite it", minioFilePath));
                }
                uploadObject(localFile, minioFilePath);
                uploaded.add(minioFilePath);
                rmLocal(localFile);
            }
        } catch (Exception e) {
            ExceptionUtils.throwUnExpectedException(String.format("Failed to upload files, error: %s", e));
        }
        logger.info("Successfully upload files: " + list);
        this.remoteFiles.add(uploaded);
    }

    /**
     * Flushes remaining data via {@link #exit()} and logs the uploaded remote files.
     */
    @Override // io.milvus.bulkwriter.LocalBulkWriter, java.lang.AutoCloseable
    public void close() throws Exception {
        logger.info("execute remaining actions to prevent loss of memory data or residual empty directories.");
        exit();
        logger.info(String.format("RemoteBulkWriter done! output remote files: %s", getBatchFiles()));
    }

    /**
     * Looks up the remote object in the configured bucket/container.
     *
     * @param str remote object name
     * @throws Exception whatever the storage client raises (e.g. when the object is missing),
     *                   or the "not initialized" error for an unsupported connect param type
     */
    private void getObjectEntity(String str) throws Exception {
        if (this.connectParam instanceof S3ConnectParam) {
            this.storageClient.getObjectEntity(((S3ConnectParam) this.connectParam).getBucketName(), str);
        } else if (this.connectParam instanceof AzureConnectParam) {
            this.storageClient.getObjectEntity(((AzureConnectParam) this.connectParam).getContainerName(), str);
        } else {
            // Only an unsupported param type is an error; a successful lookup must return normally.
            ExceptionUtils.throwUnExpectedException("Blob storage client is not initialized");
        }
    }

    /**
     * Checks whether a remote object already exists.
     *
     * @param str remote object name
     * @return true if the lookup succeeds, false if the backend reports "not found"
     * @throws Exception for any other storage error
     */
    private boolean objectExists(String str) throws Exception {
        try {
            getObjectEntity(str);
            return true;
        } catch (ErrorResponseException e) {
            if ("NoSuchKey".equals(e.errorResponse().code())) {
                return false;
            }
            ExceptionUtils.throwUnExpectedException(String.format("Failed to stat MinIO/S3 object %s, error: %s", str, e.errorResponse().message()));
            // NOTE(review): throwUnExpectedException presumably always throws; this return
            // only satisfies the compiler.
            return true;
        } catch (BlobStorageException e2) {
            if (BlobErrorCode.BLOB_NOT_FOUND == e2.getErrorCode()) {
                return false;
            }
            ExceptionUtils.throwUnExpectedException(String.format("Failed to stat Azure object %s, error: %s", str, e2.getServiceMessage()));
            return true;
        }
    }

    /**
     * Checks whether the configured bucket (S3) or container (Azure) exists.
     *
     * @return the storage client's answer for the configured bucket/container
     * @throws Exception on storage errors or for an unsupported connect param type
     */
    private boolean bucketExists() throws Exception {
        if (this.connectParam instanceof S3ConnectParam) {
            return this.storageClient.checkBucketExist(((S3ConnectParam) this.connectParam).getBucketName());
        }
        if (this.connectParam instanceof AzureConnectParam) {
            return this.storageClient.checkBucketExist(((AzureConnectParam) this.connectParam).getContainerName());
        }
        ExceptionUtils.throwUnExpectedException("Blob storage client is not initialized");
        return false;
    }

    /**
     * Streams a local file into the configured bucket/container.
     *
     * @param str  path of the local file to upload
     * @param str2 remote object name to write
     * @throws Exception if the file cannot be opened or the upload fails
     */
    private void uploadObject(String str, String str2) throws Exception {
        logger.info(String.format("Prepare to upload %s to %s", str, str2));
        File file = new File(str);
        // Close the stream deterministically: an open handle leaks a descriptor and can
        // prevent the subsequent local delete on some platforms.
        try (FileInputStream fileInputStream = new FileInputStream(file)) {
            if (this.connectParam instanceof S3ConnectParam) {
                this.storageClient.putObjectStream(fileInputStream, file.length(), ((S3ConnectParam) this.connectParam).getBucketName(), str2);
            } else if (this.connectParam instanceof AzureConnectParam) {
                this.storageClient.putObjectStream(fileInputStream, file.length(), ((AzureConnectParam) this.connectParam).getContainerName(), str2);
            } else {
                ExceptionUtils.throwUnExpectedException("Blob storage client is not initialized");
            }
        }
        logger.info(String.format("Upload file %s to %s", str, str2));
    }

    /**
     * Builds the local staging root: {@code <absolute form of Constant.DEFAULT_INDEX_NAME>/bulk_writer}.
     *
     * @return the staging root as a string
     */
    private static String generatorLocalPath() {
        // NOTE(review): Constant.DEFAULT_INDEX_NAME is presumably the empty string, which
        // would make this resolve against the working directory; confirm.
        return Paths.get(Constant.DEFAULT_INDEX_NAME).toAbsolutePath().resolve("bulk_writer").toString();
    }

    /**
     * Joins a remote directory and a relative file path, dropping one leading {@code /}
     * from each so the result is a relative object key.
     *
     * @param str  remote directory
     * @param str2 file path relative to the local staging directory
     * @return the joined path, rendered with the default file system's separator
     */
    private static String getMinioFilePath(String str, String str2) {
        String remoteDir = str.startsWith("/") ? str.substring(1) : str;
        String relativeFile = str2.startsWith("/") ? str2.substring(1) : str2;
        return Paths.get(remoteDir).resolve(Paths.get(relativeFile)).toString();
    }

    /**
     * Extracts the extension (leading dot included) of the last path element.
     *
     * @param str a file path
     * @return the extension, or {@code Constant.DEFAULT_INDEX_NAME} when the name has no
     *         dot, ends with a dot, or the path has no file name element
     */
    private static String getExtension(String str) {
        Path fileName = Paths.get(str).getFileName();
        if (fileName == null) {
            // Root-only paths have no file name element.
            return Constant.DEFAULT_INDEX_NAME;
        }
        String name = fileName.toString();
        int lastIndexOf = name.lastIndexOf('.');
        if (lastIndexOf == -1 || lastIndexOf == name.length() - 1) {
            return Constant.DEFAULT_INDEX_NAME;
        }
        return name.substring(lastIndexOf);
    }
}
