package org.apache.accumulo.tserver.log;

import com.google.common.annotations.VisibleForTesting;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.master.thrift.RecoveryStatus;
import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.log.SortedLogState;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.tserver.log.DfsLogger;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/accumulo/tserver/log/LogSorter.class */
public class LogSorter {
    private static final Logger log = LoggerFactory.getLogger(LogSorter.class);

    /**
     * Configuration produced by {@link #extractSortedLogConfig}; read for the sort buffer size,
     * handed to the sorted-file writer as its table configuration, and passed to the work queue.
     */
    AccumuloConfiguration sortedLogConf;

    /**
     * Sorts currently in progress, keyed by the file name of the log being sorted. Callers
     * that iterate or do check-then-put lock on the map itself.
     */
    private final Map<String, LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<>());

    /**
     * Pool created in the constructor and replaced by the pool supplied to
     * {@link #startWatchingForRecoveryLogs(ThreadPoolExecutor)}.
     */
    ThreadPoolExecutor threadPool;

    private final ServerContext context;

    /** Value of {@code DfsLogger.getWalBlockSize}; used as the divisor when reporting progress. */
    private final double walBlockSize;

    /** Crypto service used both to decrypt the log being read and to write the sorted files. */
    private final CryptoService cryptoService;

    /* loaded from: input_file:org/apache/accumulo/tserver/log/LogSorter$LogProcessor.class */
    class LogProcessor implements DistributedWorkQueue.Processor {
        // Raw stream over the log being sorted; opened in sort() and reset to null by close().
        private FSDataInputStream input;
        // Stream returned by DfsLogger.getDecryptingStream for 'input'; log entries are read from it.
        private DataInputStream decryptingInput;
        // Position of 'input' captured by close(); -1 until close() has run on an open input.
        private long bytesCopied = -1;
        // Wall-clock millis when process() started this sort; 0 means not started.
        private long sortStart = 0;
        // Wall-clock millis when process() finished this sort; -1 means not finished.
        private long sortStop = -1;

        /** Creates an idle processor; all state is populated by {@code process}. */
        LogProcessor() {
        }

        /**
         * Returns a fresh, independent processor so each unit of work tracks its own streams
         * and timing.
         */
        @Override
        public DistributedWorkQueue.Processor newProcessor() {
            return new LogProcessor();
        }

        /**
         * Sorts one write-ahead log described by a work-queue entry.
         *
         * <p>The payload is a {@code source|destination} pair. The name of the source path is used
         * as the sort id; if a sort with that id is already registered this call returns
         * immediately. Any {@link Exception} thrown by the sort is logged and a failed-marker file
         * is created under the destination. The thread name, open streams, stop time and the
         * registration in {@code currentWork} are always restored/cleared, whatever the outcome.
         *
         * @param str work id supplied by the queue (not used)
         * @param bArr UTF-8 encoded {@code source|destination} payload
         */
        @Override
        public void process(String str, byte[] bArr) {
            String[] parts = new String(bArr, StandardCharsets.UTF_8).split("\\|");
            String src = parts[0];
            String dest = parts[1];
            String sortId = new Path(src).getName();
            LogSorter.log.debug("Sorting {} to {} using sortId {}", src, dest, sortId);

            // Hold the map lock only for the check-and-register step. Keeping it for the whole
            // sort would serialize every sort and block getLogSorts() until the sort finished.
            synchronized (LogSorter.this.currentWork) {
                if (LogSorter.this.currentWork.containsKey(sortId)) {
                    return;
                }
                LogSorter.this.currentWork.put(sortId, this);
            }

            synchronized (this) {
                this.sortStart = System.currentTimeMillis();
            }

            VolumeManager volumeManager = LogSorter.this.context.getVolumeManager();
            // sort() renames the thread while it runs; remember the name so it can be restored.
            String formerThreadName = Thread.currentThread().getName();
            try {
                sort(volumeManager, sortId, new Path(src), dest);
            } catch (Exception sortFailure) {
                try {
                    // The destination directory may not exist yet, so create it before the marker.
                    volumeManager.mkdirs(new Path(dest));
                    volumeManager.create(SortedLogState.getFailedMarkerPath(dest)).close();
                } catch (IOException markerFailure) {
                    LogSorter.log.error("Error creating failed flag file " + sortId, markerFailure);
                }
                LogSorter.log.error("Caught exception", sortFailure);
            } finally {
                Thread.currentThread().setName(formerThreadName);
                try {
                    close();
                } catch (Exception closeFailure) {
                    LogSorter.log.error("Error during cleanup sort/copy " + sortId, closeFailure);
                }
                synchronized (this) {
                    this.sortStop = System.currentTimeMillis();
                }
                LogSorter.this.currentWork.remove(sortId);
            }
        }

        /**
         * Reads the write-ahead log at {@code path} in buffer-sized chunks and writes each chunk
         * as a sorted part file under {@code str2}, then creates the finished marker.
         *
         * <p>Returns without doing anything when the finished marker already exists. Otherwise the
         * destination is deleted first and rebuilt. If the log header cannot be read, a single
         * empty part and the finished marker are written instead.
         *
         * @param volumeManager file system access
         * @param str sort id, used in the thread name and log messages
         * @param path write-ahead log to read
         * @param str2 destination directory for the sorted parts
         * @throws IOException if reading the log or writing a part fails
         */
        public void sort(VolumeManager volumeManager, String str, Path path, String str2) throws IOException {
            if (volumeManager.exists(SortedLogState.getFinishedMarkerPath(str2))) {
                LogSorter.log.debug("Sorting already finished at {}", str2);
                return;
            }

            LogSorter.log.info("Copying {} to {}", path, str2);
            // Start from a clean destination so parts from an earlier attempt are not mixed in.
            volumeManager.deleteRecursively(new Path(str2));

            this.input = volumeManager.open(path);
            try {
                this.input.setDropBehind(Boolean.TRUE);
            } catch (IOException e) {
                LogSorter.log.debug("IOException setting drop behind for file: {}, msg: {}", this.input, e.getMessage());
            } catch (UnsupportedOperationException e) {
                LogSorter.log.debug("setDropBehind reads not enabled for wal file: {}", this.input);
            }

            try {
                this.decryptingInput = DfsLogger.getDecryptingStream(this.input, LogSorter.this.cryptoService);
            } catch (DfsLogger.LogHeaderIncompleteException e) {
                LogSorter.log.warn("Could not read header from write-ahead log {}. Not sorting.", path);
                // Still publish an (empty) part plus the finished marker for this log.
                volumeManager.mkdirs(new Path(str2));
                LogSorter.this.writeBuffer(str2, Collections.emptyList(), 0);
                volumeManager.create(SortedLogState.getFinishedMarkerPath(str2)).close();
                return;
            }

            final long bufferSize = LogSorter.this.sortedLogConf.getAsBytes(
                    LogSorter.this.sortedLogConf.resolve(Property.TSERV_WAL_SORT_BUFFER_SIZE, Property.TSERV_SORT_BUFFER_SIZE));
            Thread.currentThread().setName("Sorting " + str + " for recovery");

            int part = 0;
            while (true) {
                List<Pair<LogFileKey, LogFileValue>> buffer = new ArrayList<>();
                try {
                    long chunkStart = this.input.getPos();
                    // Fill the buffer until roughly bufferSize bytes of the raw input were consumed.
                    while (this.input.getPos() - chunkStart < bufferSize) {
                        LogFileKey key = new LogFileKey();
                        LogFileValue value = new LogFileValue();
                        key.readFields(this.decryptingInput);
                        value.readFields(this.decryptingInput);
                        buffer.add(new Pair<>(key, value));
                    }
                    LogSorter.this.writeBuffer(str2, buffer, part++);
                } catch (EOFException endOfLog) {
                    // End of the log: flush whatever was read for the final (possibly empty) part.
                    LogSorter.this.writeBuffer(str2, buffer, part++);
                    break;
                }
            }

            // NOTE(review): the marker name is spelled out here while the other paths use
            // SortedLogState.getFinishedMarkerPath — presumably the same path; confirm and unify.
            volumeManager.create(new Path(str2, "finished")).close();
            LogSorter.log.info("Finished log sort {} {} bytes {} parts in {}ms", str, getBytesCopied(), part, getSortTime());
        }

        /**
         * Records how far the input was read and releases both streams.
         *
         * <p>Does nothing when no input is open. Both streams are closed and the fields cleared
         * even if reading the position or closing one of the streams fails.
         *
         * @throws IOException if reading the position or closing a stream fails
         */
        synchronized void close() throws IOException {
            if (this.input == null) {
                return;
            }
            // Resources are closed in reverse declaration order, so the raw input is closed
            // first and the decrypting stream second; a null resource is simply skipped.
            try (DataInputStream decrypting = this.decryptingInput; FSDataInputStream raw = this.input) {
                this.bytesCopied = raw.getPos();
            } finally {
                this.input = null;
                this.decryptingInput = null;
            }
        }

        /**
         * Returns the elapsed sort time in milliseconds: zero before the sort has started, the
         * running time while it is in progress, and the total time once it has stopped.
         */
        public synchronized long getSortTime() {
            if (this.sortStart <= 0) {
                return 0L;
            }
            long end;
            if (this.sortStop > 0) {
                end = this.sortStop;
            } else {
                end = System.currentTimeMillis();
            }
            return end - this.sortStart;
        }

        /**
         * Returns the current position of the open input, or the position recorded by
         * {@code close()} when no input is open.
         *
         * @throws IOException if the position of the open input cannot be read
         */
        synchronized long getBytesCopied() throws IOException {
            if (this.input != null) {
                return this.input.getPos();
            }
            return this.bytesCopied;
        }
    }

    public LogSorter(ServerContext serverContext, AccumuloConfiguration accumuloConfiguration) {
        this.context = serverContext;
        this.sortedLogConf = extractSortedLogConfig(accumuloConfiguration);
        this.threadPool = ThreadPools.getServerThreadPools().createFixedThreadPool(accumuloConfiguration.getCount(accumuloConfiguration.resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT, new Property[]{Property.TSERV_RECOVERY_MAX_CONCURRENT})), getClass().getName(), true);
        this.walBlockSize = DfsLogger.getWalBlockSize(accumuloConfiguration);
        this.cryptoService = serverContext.getCryptoFactory().getService(new CryptoEnvironmentImpl(CryptoEnvironment.Scope.RECOVERY), accumuloConfiguration.getAllCryptoProperties());
    }

    /**
     * Returns a copy of the given configuration in which every property found under
     * {@code Property.TSERV_WAL_SORT_FILE_PREFIX} has been re-keyed with the
     * {@code table.file.} prefix.
     *
     * @param accumuloConfiguration configuration to copy and read the sort-file properties from
     * @return the copied configuration with the re-keyed properties set
     * @throws IllegalArgumentException if a re-keyed property is not a valid property or not a
     *         valid table property key
     */
    private AccumuloConfiguration extractSortedLogConfig(AccumuloConfiguration accumuloConfiguration) {
        Map<String, String> sortFileProperties =
                accumuloConfiguration.getAllPropertiesWithPrefixStripped(Property.TSERV_WAL_SORT_FILE_PREFIX);
        ConfigurationCopy configurationCopy = new ConfigurationCopy(accumuloConfiguration);
        sortFileProperties.forEach((name, value) -> {
            // The lambda parameter and the derived key need distinct names; reusing one name
            // for both is a redeclaration and does not compile.
            String tableProperty = "table.file." + name;
            if (!Property.isValidProperty(tableProperty, value) || !Property.isValidTablePropertyKey(tableProperty)) {
                throw new IllegalArgumentException("Invalid sort file property " + tableProperty + "=" + value);
            }
            log.debug("Using property for writing sorted files: {}={}", tableProperty, value);
            configurationCopy.set(tableProperty, value);
        });
        return configurationCopy;
    }

    /**
     * Writes one buffer of log entries as a sorted part file named {@code part-r-NNNNN.rf}
     * under the given directory.
     *
     * <p>Entries are grouped by their key in sorted order; the mutations of entries that share a
     * key are concatenated in encounter order and written as a single value.
     *
     * @param str destination directory
     * @param list log entries to write; may be empty
     * @param i part number used in the file name
     * @throws IOException if the part file cannot be written or closed
     */
    @VisibleForTesting
    void writeBuffer(String str, List<Pair<LogFileKey, LogFileValue>> list, int i) throws IOException {
        Path partPath = new Path(str, String.format("part-r-%05d.rf", i));
        FileSystem fs = this.context.getVolumeManager().getFileSystemByPath(partPath);
        Path qualifiedPath = fs.makeQualified(partPath);

        // Group mutations per key. Appending into one list per key avoids re-copying the
        // accumulated list every time a key repeats, and never touches the callers' lists.
        Map<Key, List<Mutation>> mutationsByKey = new TreeMap<>();
        for (Pair<LogFileKey, LogFileValue> entry : list) {
            Key key = entry.getFirst().toKey();
            mutationsByKey.computeIfAbsent(key, k -> new ArrayList<>()).addAll(entry.getSecond().mutations);
        }

        // try-with-resources closes the writer on every path and records a close failure as
        // suppressed when the body already failed.
        try (FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder()
                .forFile(qualifiedPath.toString(), fs, fs.getConf(), this.cryptoService)
                .withTableConfiguration(this.sortedLogConf).build()) {
            writer.startDefaultLocalityGroup();
            for (Map.Entry<Key, List<Mutation>> grouped : mutationsByKey.entrySet()) {
                LogFileValue value = new LogFileValue();
                value.mutations = grouped.getValue();
                writer.append(grouped.getKey(), value.toValue());
            }
        }
    }

    /**
     * Adopts the given pool and starts processing entries of the {@code /recovery} work queue
     * under the ZooKeeper root with a new {@code LogProcessor}.
     *
     * @param threadPoolExecutor pool that replaces the one created by the constructor
     * @throws KeeperException propagated from starting the work queue
     * @throws InterruptedException propagated from starting the work queue
     */
    public void startWatchingForRecoveryLogs(ThreadPoolExecutor threadPoolExecutor) throws KeeperException, InterruptedException {
        this.threadPool = threadPoolExecutor;
        String recoveryQueuePath = this.context.getZooKeeperRoot() + "/recovery";
        DistributedWorkQueue recoveryQueue =
                new DistributedWorkQueue(recoveryQueuePath, this.sortedLogConf, this.context);
        recoveryQueue.startProcessing(new LogProcessor(), this.threadPool);
    }

    /**
     * Returns a status snapshot for every sort currently registered.
     *
     * <p>For each sort the name is its id, the runtime is the elapsed sort time truncated to an
     * int, and the progress is bytes read divided by the WAL block size, capped at 99.9. If the
     * bytes read cannot be determined the progress is left at its default and a warning is
     * logged.
     *
     * @return a new list with one entry per sort in progress; never {@code null}
     */
    public List<RecoveryStatus> getLogSorts() {
        List<RecoveryStatus> statuses = new ArrayList<>();
        // Iterating a synchronized map requires holding its lock for the whole traversal.
        synchronized (this.currentWork) {
            for (Map.Entry<String, LogProcessor> entry : this.currentWork.entrySet()) {
                LogProcessor processor = entry.getValue();
                RecoveryStatus status = new RecoveryStatus();
                status.name = entry.getKey();
                try {
                    status.progress = Math.min(processor.getBytesCopied() / this.walBlockSize, 99.9d);
                } catch (IOException e) {
                    // Keep the cause so the failure can be diagnosed from the log.
                    log.warn("Error getting bytes read", e);
                }
                status.runtime = (int) processor.getSortTime();
                statuses.add(status);
            }
        }
        return statuses;
    }
}
