package io.milvus.bulkwriter;

import com.google.common.collect.Lists;
import com.google.gson.JsonObject;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/milvus/bulkwriter/LocalBulkWriter.class */
public class LocalBulkWriter extends BulkWriter {
    private static final Logger logger = LoggerFactory.getLogger(LocalBulkWriter.class);
    private Map<String, Thread> workingThread;
    private ReentrantLock workingThreadLock;
    private List<List<String>> localFiles;

    public LocalBulkWriter(LocalBulkWriterParam localBulkWriterParam) throws IOException {
        super(localBulkWriterParam.getCollectionSchema(), localBulkWriterParam.getChunkSize(), localBulkWriterParam.getFileType(), localBulkWriterParam.getLocalPath(), localBulkWriterParam.getConfig());
        this.workingThreadLock = new ReentrantLock();
        this.workingThread = new HashMap();
        this.localFiles = Lists.newArrayList();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public LocalBulkWriter(CreateCollectionReq.CollectionSchema collectionSchema, long j, BulkFileType bulkFileType, String str, Map<String, Object> map) throws IOException {
        super(collectionSchema, j, bulkFileType, str, map);
        this.workingThreadLock = new ReentrantLock();
        this.workingThread = new HashMap();
        this.localFiles = Lists.newArrayList();
    }

    @Override // io.milvus.bulkwriter.BulkWriter
    public void appendRow(JsonObject jsonObject) throws IOException, InterruptedException {
        super.appendRow(jsonObject);
    }

    @Override // io.milvus.bulkwriter.BulkWriter
    protected void callBackIfCommitReady(List<String> list) throws InterruptedException {
        this.workingThreadLock.lock();
        callBack(true, list);
        this.workingThreadLock.unlock();
    }

    public void commit(boolean z) throws InterruptedException {
        callBack(z, commitIfFileReady(false));
    }

    @Override // io.milvus.bulkwriter.BulkWriter
    protected List<String> commitIfFileReady(boolean z) {
        if (super.getTotalRowCount().longValue() <= 0) {
            logger.info("current_file_total_row_count less than 0, no need to generator a file");
            return null;
        }
        String filePath = super.getFileWriter().getFilePath();
        logger.info(String.format("Prepare to commit file:%s, current_file_total_row_count: %s, current_file_total_size:%s, create_new_file:%s", filePath, super.getTotalRowCount(), super.getTotalSize(), Boolean.valueOf(z)));
        ArrayList newArrayList = Lists.newArrayList(new String[]{filePath});
        try {
            (z ? newFileWriter() : super.getFileWriter()).close();
            this.localFiles.add(newArrayList);
            super.commit();
        } catch (IOException e) {
            logger.error(e.getMessage());
        }
        return newArrayList;
    }

    private void callBack(boolean z, List<String> list) throws InterruptedException {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        while (!this.workingThread.isEmpty()) {
            logger.info(String.format("Previous callBack action is not finished, %s is waiting...", Thread.currentThread().getName()));
            TimeUnit.SECONDS.sleep(5L);
        }
        logger.info(String.format("Prepare to callBack, async:%s, fileList:%s", Boolean.valueOf(z), list));
        Thread thread = new Thread(() -> {
            commitIfFileReady((List<String>) list);
        });
        logger.info("CallBack thread begin, name: {}", thread.getName());
        this.workingThread.put(thread.getName(), thread);
        thread.start();
        if (!z) {
            logger.info("Wait callBack to finish");
            thread.join();
        }
        logger.info("CallBack done with async={}", Boolean.valueOf(z));
    }

    private void commitIfFileReady(List<String> list) {
        if (CollectionUtils.isNotEmpty(list)) {
            callBack(list);
        }
        this.workingThread.remove(Thread.currentThread().getName());
        logger.info(String.format("Flush thread done, name: %s", Thread.currentThread().getName()));
    }

    protected void callBack(List<String> list) {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.milvus.bulkwriter.BulkWriter
    public String getDataPath() {
        return this.localPath;
    }

    public List<List<String>> getBatchFiles() {
        return this.localFiles;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void exit() throws InterruptedException {
        this.workingThreadLock.lock();
        callBack(true, commitIfFileReady(false));
        this.workingThreadLock.unlock();
        if (!this.workingThread.isEmpty()) {
            for (String str : this.workingThread.keySet()) {
                logger.info("Wait flush thread '{}' to finish", str);
                Thread thread = this.workingThread.get(str);
                if (thread != null) {
                    thread.join();
                }
            }
        }
        rmDir();
    }

    private void rmDir() {
        try {
            Path path = Paths.get(this.localPath, new String[0]);
            if (Files.exists(path, new LinkOption[0]) && isDirectoryEmpty(path)) {
                Files.delete(path);
                logger.info("Delete local directory {}", this.localPath);
            }
        } catch (IOException e) {
            logger.error("Error while deleting directory: " + e.getMessage());
        }
    }

    private boolean isDirectoryEmpty(Path path) throws IOException {
        DirectoryStream<Path> newDirectoryStream = Files.newDirectoryStream(path);
        Throwable th = null;
        try {
            try {
                boolean z = !newDirectoryStream.iterator().hasNext();
                if (newDirectoryStream != null) {
                    if (0 != 0) {
                        try {
                            newDirectoryStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        newDirectoryStream.close();
                    }
                }
                return z;
            } finally {
            }
        } catch (Throwable th3) {
            if (newDirectoryStream != null) {
                if (th != null) {
                    try {
                        newDirectoryStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newDirectoryStream.close();
                }
            }
            throw th3;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getUUID() {
        return this.uuid;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        logger.info("execute remaining actions to prevent loss of memory data or residual empty directories.");
        exit();
        logger.info(String.format("LocalBulkWriter done! output local files: %s", getBatchFiles()));
    }
}
