package org.apache.accumulo.manager.tableOps.bulkVer1;

import com.google.common.annotations.VisibleForTesting;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.master.thrift.BulkImportState;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.manager.tableOps.Utils;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.tablets.UniqueNameAllocator;
import org.apache.accumulo.server.util.MetadataTableUtil;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/accumulo/manager/tableOps/bulkVer1/BulkImport.class */
/**
 * Manager repo step that starts a bulk import into a table.
 *
 * <p>{@link #isReady} takes the table read lock and reserves the source and error directories,
 * {@link #call} validates the error directory, moves the source files into a freshly created
 * bulk directory and hands over to {@link LoadFiles}, and {@link #undo} releases what
 * {@link #isReady} and {@link #call} acquired.
 */
public class BulkImport extends ManagerRepo {
    public static final String FAILURES_TXT = "failures.txt";
    private static final long serialVersionUID = 1L;
    private static final Logger log = LoggerFactory.getLogger(BulkImport.class);

    /** Arbitrator type under which the bulk transaction is started in {@link #call} and cleaned up in {@link #undo}. */
    private static final String BULK_ARBITRATOR_TYPE = "bulkTx";

    /** Extension assumed for source entries whose name contains no '.'. */
    private static final String MAP_FILE_EXTENSION = "map";

    /** Child entry that is looked up inside a directory treated as a map file. */
    private static final String MAP_DATA_FILE_NAME = "data";

    private TableId tableId;
    private String sourceDir;
    private String errorDir;
    private boolean setTime;

    /**
     * @param tableId id of the table to import into
     * @param sourceDir directory holding the files to import
     * @param errorDir directory that must exist and be empty when {@link #call} runs
     * @param setTime flag passed through unchanged to {@link LoadFiles}
     */
    public BulkImport(TableId tableId, String sourceDir, String errorDir, boolean setTime) {
        this.tableId = tableId;
        this.sourceDir = sourceDir;
        this.errorDir = errorDir;
        this.setTime = setTime;
    }

    /**
     * Tries to take the table read lock and to reserve the source and error directories.
     *
     * <p>NOTE(review): the returned value is presumably a retry delay where 0 means "ready" -
     * confirm against the ManagerRepo contract.
     *
     * @return 100 when the read lock could not be taken; otherwise the result of reserving the
     *         source directory if that is non-zero, else the result of reserving the error
     *         directory
     * @throws AcceptableThriftTableOperationException with type {@code OFFLINE} when the table is
     *         not online
     */
    @Override
    public long isReady(long tid, Manager manager) throws Exception {
        if (!Utils.getReadLock(manager, this.tableId, tid).tryLock()) {
            return 100L;
        }
        // Refresh the cached table list before looking at the table state.
        manager.getContext().clearTableListCache();
        if (manager.getContext().getTableState(this.tableId) != TableState.ONLINE) {
            throw new AcceptableThriftTableOperationException(this.tableId.canonical(), (String) null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OFFLINE, (String) null);
        }
        long sourceReservation = Utils.reserveHdfsDirectory(manager, this.sourceDir, tid);
        if (sourceReservation != 0) {
            return sourceReservation;
        }
        // The error directory is only reserved once the source directory reservation returned 0.
        return Utils.reserveHdfsDirectory(manager, this.errorDir, tid);
    }

    /**
     * Validates the error directory, starts the bulk transaction, moves the source files into a
     * new bulk directory and returns the next step.
     *
     * @return a {@link LoadFiles} step for the prepared bulk directory
     * @throws AcceptableThriftTableOperationException with type {@code BULK_BAD_ERROR_DIRECTORY}
     *         when the error directory is missing, is not a directory or is not empty, and with
     *         type {@code BULK_BAD_INPUT_DIRECTORY} when preparing the bulk directory fails with
     *         an {@link IOException}
     */
    @Override
    public Repo<Manager> call(long tid, Manager manager) throws Exception {
        String formattedTid = FateTxId.formatTid(tid);
        log.debug(" {} sourceDir {}", formattedTid, this.sourceDir);
        Utils.getReadLock(manager, this.tableId, tid).lock();

        VolumeManager volumeManager = manager.getVolumeManager();
        Path errorPath = new Path(this.errorDir);
        FileStatus errorStatus = null;
        try {
            errorStatus = volumeManager.getFileStatus(errorPath);
        } catch (FileNotFoundException ignored) {
            // Deliberately ignored: a missing directory leaves errorStatus null and is reported below.
        }
        if (errorStatus == null) {
            throw badErrorDirectory(" does not exist");
        }
        if (!errorStatus.isDirectory()) {
            throw badErrorDirectory(" is not a directory");
        }
        if (volumeManager.listStatus(errorPath).length != 0) {
            throw badErrorDirectory(" is not empty");
        }

        TransactionWatcher.ZooArbitrator.start(manager.getContext(), BULK_ARBITRATOR_TYPE, tid);
        manager.updateBulkImportStatus(this.sourceDir, BulkImportState.MOVING);
        try {
            String bulkDir = prepareBulkImport(manager.getContext(), volumeManager, this.sourceDir, this.tableId, tid);
            // Log the transaction id in the same formatted form as the sourceDir message above.
            log.debug(" {} bulkDir {}", formattedTid, bulkDir);
            return new LoadFiles(this.tableId, this.sourceDir, bulkDir, this.errorDir, this.setTime);
        } catch (IOException e) {
            log.error("error preparing the bulk import directory", e);
            throw new AcceptableThriftTableOperationException(this.tableId.canonical(), (String) null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_INPUT_DIRECTORY, this.sourceDir + ": " + e);
        }
    }

    /** Builds the exception reported for an unusable error directory; {@code reason} is appended to the directory name. */
    private AcceptableThriftTableOperationException badErrorDirectory(String reason) {
        return new AcceptableThriftTableOperationException(this.tableId.canonical(), (String) null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, this.errorDir + reason);
    }

    /**
     * Creates a new, uniquely named {@code b-*} directory below the table's directory on the
     * volume that matches {@code sourceDir}.
     *
     * <p>When {@code mkdirs} reports failure the method logs a warning, sleeps three seconds and
     * retries with a new name; it does not give up on its own.
     *
     * @throws IOException when no configured tables directory matches the file system of
     *         {@code sourceDir}, or when a generated directory name already exists
     */
    private static Path createNewBulkDir(ServerContext serverContext, VolumeManager volumeManager, String sourceDir, TableId tableId) throws IOException {
        Path tablesDir = volumeManager.matchingFileSystem(new Path(sourceDir), serverContext.getTablesDirs());
        if (tablesDir == null) {
            throw new IOException(sourceDir + " is not in the same file system as any volume configured for Accumulo");
        }
        Path tableDir = new Path(tablesDir, tableId.canonical());
        volumeManager.mkdirs(tableDir);

        UniqueNameAllocator namer = serverContext.getUniqueNameAllocator();
        while (true) {
            Path candidate = new Path(tableDir, "b-" + namer.getNextName());
            if (volumeManager.exists(candidate)) {
                throw new IOException("Dir exist when it should not " + candidate);
            }
            if (volumeManager.mkdirs(candidate)) {
                return candidate;
            }
            log.warn("Failed to create {} for unknown reason", candidate);
            UtilWaitThread.sleepUninterruptibly(3L, TimeUnit.SECONDS);
        }
    }

    /**
     * Creates a new bulk directory for the table, records a bulk-load-in-progress flag for it and
     * moves every acceptable entry of {@code dir} into it using a pool of rename threads.
     *
     * @param serverContext source of the name allocator, configuration and tables directories
     * @param volumeManager file system access used for listing and renaming
     * @param dir directory whose entries are to be imported
     * @param tableId id of the table to import into
     * @param tid transaction id recorded with the in-progress flag
     * @return the string form of the new bulk directory path
     * @throws Exception the first exception returned by any of the move tasks, or any failure
     *         while creating the directory, listing {@code dir} or waiting for the tasks
     */
    @VisibleForTesting
    public static String prepareBulkImport(ServerContext serverContext, VolumeManager volumeManager, String dir, TableId tableId, long tid) throws Exception {
        final Path bulkDir = createNewBulkDir(serverContext, volumeManager, dir, tableId);
        MetadataTableUtil.addBulkLoadInProgressFlag(serverContext, "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName(), tid);

        FileStatus[] sourceEntries = volumeManager.listStatus(new Path(dir));
        final UniqueNameAllocator namer = serverContext.getUniqueNameAllocator();
        AccumuloConfiguration configuration = serverContext.getConfiguration();
        ThreadPoolExecutor workers = ThreadPools.getServerThreadPools().createExecutorService(configuration, configuration.resolve(Property.MANAGER_RENAME_THREADS, new Property[]{Property.MANAGER_BULK_RENAME_THREADS}), false);

        ArrayList<Future<Exception>> results = new ArrayList<>(sourceEntries.length);
        try {
            for (FileStatus entry : sourceEntries) {
                results.add(workers.submit(() -> moveIntoBulkDir(volumeManager, namer, bulkDir, entry)));
            }
        } finally {
            // Always shut the pool down, even if a submission fails, so its threads are not leaked.
            workers.shutdown();
        }
        while (!workers.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
            // Keep waiting until every submitted move has finished.
        }

        for (Future<Exception> result : results) {
            Exception failure = result.get();
            if (failure != null) {
                throw failure;
            }
        }
        return bulkDir.toString();
    }

    /**
     * Moves one entry of the source directory into {@code bulkDir} under a newly allocated
     * {@code I<name>.<extension>} name, skipping entries that are not importable.
     *
     * <p>An entry is skipped (with a log message) when its extension is not one of
     * {@link FileOperations#getValidExtensions()}, or when it is treated as a map file but is not
     * a directory, is named {@code _logs}, or has no plain {@code data} child.
     *
     * @return {@code null} when the entry was moved, skipped, or the rename failed with an
     *         {@link IOException}; otherwise the exception that interrupted processing
     */
    private static Exception moveIntoBulkDir(VolumeManager volumeManager, UniqueNameAllocator namer, Path bulkDir, FileStatus entry) {
        try {
            String[] nameParts = entry.getPath().getName().split("\\.");
            String extension;
            if (nameParts.length > 1) {
                extension = nameParts[nameParts.length - 1];
                if (!FileOperations.getValidExtensions().contains(extension)) {
                    log.warn("{} does not have a valid extension, ignoring", entry.getPath());
                    return null;
                }
            } else {
                // A name without any '.' is assumed to be a map file.
                extension = MAP_FILE_EXTENSION;
            }

            if (extension.equals(MAP_FILE_EXTENSION)) {
                if (!entry.isDirectory()) {
                    log.warn("{} is not a map file, ignoring", entry.getPath());
                    return null;
                }
                if (entry.getPath().getName().equals("_logs")) {
                    log.info("{} is probably a log directory from a map/reduce task, skipping", entry.getPath());
                    return null;
                }
                try {
                    if (volumeManager.getFileStatus(new Path(entry.getPath(), MAP_DATA_FILE_NAME)).isDirectory()) {
                        log.warn("{} is not a map file, ignoring", entry.getPath());
                        return null;
                    }
                } catch (FileNotFoundException e) {
                    log.warn("{} is not a map file, ignoring", entry.getPath());
                    return null;
                }
            }

            Path destination = new Path(bulkDir, "I" + namer.getNextName() + "." + extension);
            try {
                volumeManager.rename(entry.getPath(), destination);
                log.debug("Moved {} to {}", entry.getPath(), destination);
            } catch (IOException e) {
                // Best effort: a failed move is logged and does not fail the whole import.
                // NOTE(review): any status returned by rename is not inspected here - confirm that is intended.
                log.error("Could not move: {} {}", entry.getPath(), e.getMessage());
            }
            return null;
        } catch (Exception e) {
            // Handed back to prepareBulkImport, which rethrows the first non-null result.
            return e;
        }
    }

    /**
     * Releases the directory reservations and the table read lock, then cleans up the bulk
     * transaction registered in {@link #call}.
     */
    @Override
    public void undo(long tid, Manager manager) throws Exception {
        Utils.unreserveHdfsDirectory(manager, this.sourceDir, tid);
        Utils.unreserveHdfsDirectory(manager, this.errorDir, tid);
        Utils.getReadLock(manager, this.tableId, tid).unlock();
        TransactionWatcher.ZooArbitrator.cleanup(manager.getContext(), BULK_ARBITRATOR_TYPE, tid);
    }
}
