package io.milvus.bulkwriter;

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.param.collection.CollectionSchemaParam;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/milvus/bulkwriter/LocalBulkWriter.class */
public class LocalBulkWriter extends BulkWriter implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(LocalBulkWriter.class);
    protected String localPath;
    private String uuid;
    private int flushCount;
    private Map<String, Thread> workingThread;
    private ReentrantLock workingThreadLock;
    private List<List<String>> localFiles;

    public LocalBulkWriter(LocalBulkWriterParam localBulkWriterParam) throws IOException {
        super(localBulkWriterParam.getCollectionSchema(), localBulkWriterParam.getChunkSize(), localBulkWriterParam.getFileType());
        this.localPath = localBulkWriterParam.getLocalPath();
        this.uuid = UUID.randomUUID().toString();
        this.workingThreadLock = new ReentrantLock();
        this.workingThread = new HashMap();
        this.localFiles = Lists.newArrayList();
        makeDir();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public LocalBulkWriter(CollectionSchemaParam collectionSchemaParam, int i, BulkFileType bulkFileType, String str) throws IOException {
        super(collectionSchemaParam, i, bulkFileType);
        this.localPath = str;
        this.uuid = UUID.randomUUID().toString();
        this.workingThreadLock = new ReentrantLock();
        this.workingThread = new HashMap();
        this.localFiles = Lists.newArrayList();
        makeDir();
    }

    @Override // io.milvus.bulkwriter.BulkWriter
    public void appendRow(JSONObject jSONObject) throws IOException, InterruptedException {
        super.appendRow(jSONObject);
        this.workingThreadLock.lock();
        if (super.getBufferSize().intValue() > super.getChunkSize().intValue()) {
            commit(true);
        }
        this.workingThreadLock.unlock();
    }

    @Override // io.milvus.bulkwriter.BulkWriter
    public void commit(boolean z) throws InterruptedException {
        while (this.workingThread.size() > 0) {
            logger.info(String.format("Previous flush action is not finished, %s is waiting...", Thread.currentThread().getName()));
            TimeUnit.SECONDS.sleep(5L);
        }
        logger.info(String.format("Prepare to flush buffer, row_count: %s, size: %s", super.getBufferRowCount(), super.getBufferSize()));
        int intValue = getBufferRowCount().intValue();
        int intValue2 = getBufferSize().intValue();
        Thread thread = new Thread(() -> {
            flush(Integer.valueOf(intValue2), Integer.valueOf(intValue));
        });
        logger.info("Flush thread begin, name: {}", thread.getName());
        this.workingThread.put(thread.getName(), thread);
        thread.start();
        if (!z) {
            logger.info("Wait flush to finish");
            thread.join();
        }
        super.commit(false);
        logger.info("Commit done with async={}", Boolean.valueOf(z));
    }

    private void flush(Integer num, Integer num2) {
        this.flushCount++;
        Path resolve = Paths.get(this.localPath, new String[0]).resolve(String.valueOf(this.flushCount));
        Buffer newBuffer = super.newBuffer();
        if (newBuffer.getRowCount().intValue() > 0) {
            List<String> persist = newBuffer.persist(resolve.toString(), num, num2);
            this.localFiles.add(persist);
            callBack(persist);
        }
        this.workingThread.remove(Thread.currentThread().getName());
        logger.info(String.format("Flush thread done, name: %s", Thread.currentThread().getName()));
    }

    protected void callBack(List<String> list) {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.milvus.bulkwriter.BulkWriter
    public String getDataPath() {
        return this.localPath;
    }

    public List<List<String>> getBatchFiles() {
        return this.localFiles;
    }

    private void makeDir() throws IOException {
        Path path = Paths.get(this.localPath, new String[0]);
        createDirIfNotExist(path);
        Path resolve = path.resolve(this.uuid);
        createDirIfNotExist(resolve);
        this.localPath = resolve.toString();
    }

    private void createDirIfNotExist(Path path) throws IOException {
        try {
            Files.createDirectories(path, new FileAttribute[0]);
            logger.info("Data path created: {}", path);
        } catch (IOException e) {
            logger.error("Data Path create failed: {}", path);
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void exit() throws InterruptedException {
        this.workingThreadLock.lock();
        if (getBufferSize() != null && getBufferSize().intValue() != 0) {
            commit(true);
        }
        this.workingThreadLock.unlock();
        if (this.workingThread.size() > 0) {
            for (String str : this.workingThread.keySet()) {
                logger.info("Wait flush thread '{}' to finish", str);
                this.workingThread.get(str).join();
            }
        }
        rmDir();
    }

    private void rmDir() {
        try {
            Path path = Paths.get(this.localPath, new String[0]);
            if (Files.exists(path, new LinkOption[0]) && isDirectoryEmpty(path)) {
                Files.delete(path);
                logger.info("Delete local directory {}", this.localPath);
            }
        } catch (IOException e) {
            logger.error("Error while deleting directory: " + e.getMessage());
        }
    }

    private boolean isDirectoryEmpty(Path path) throws IOException {
        DirectoryStream<Path> newDirectoryStream = Files.newDirectoryStream(path);
        try {
            boolean z = !newDirectoryStream.iterator().hasNext();
            if (newDirectoryStream != null) {
                newDirectoryStream.close();
            }
            return z;
        } catch (Throwable th) {
            if (newDirectoryStream != null) {
                try {
                    newDirectoryStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getUUID() {
        return this.uuid;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        logger.info("execute remaining actions to prevent loss of memory data or residual empty directories.");
        exit();
        logger.info(String.format("LocalBulkWriter done! output local files: %s", getBatchFiles()));
    }
}
