package org.apache.accumulo.tserver.log;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.master.thrift.RecoveryStatus;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.log.SortedLogState;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.tserver.log.DfsLogger;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/accumulo/tserver/log/LogSorter.class */
/**
 * Copies write-ahead logs into sorted {@code part-r-NNNNN} MapFiles so they can be used for
 * recovery, and reports the progress of the sorts that are currently running.
 *
 * <p>Work items arrive through a {@link DistributedWorkQueue} rooted at
 * {@code <instance root>/recovery}; each item is handled by a {@link LogProcessor}.
 */
public class LogSorter {
    private static final Logger log = LoggerFactory.getLogger(LogSorter.class);

    VolumeManager fs;
    AccumuloConfiguration conf;

    /** Sorts currently running in this process, keyed by sort id (the source log's file name). */
    private final Map<String, LogProcessor> currentWork =
            Collections.synchronizedMap(new HashMap<String, LogProcessor>());

    ThreadPoolExecutor threadPool;
    private final Instance instance;

    /**
     * Work-queue processor that sorts a single write-ahead log. Each instance tracks the input
     * position and the start/stop time of its sort so that {@link LogSorter#getLogSorts()} can
     * report on it from another thread.
     */
    class LogProcessor implements DistributedWorkQueue.Processor {
        private FSDataInputStream input;
        private DataInputStream decryptingInput;
        private long bytesCopied = -1;
        private long sortStart = 0;
        private long sortStop = -1;

        LogProcessor() {
        }

        /** Returns a fresh processor bound to the same enclosing {@link LogSorter}. */
        public DistributedWorkQueue.Processor newProcessor() {
            return new LogProcessor();
        }

        /**
         * Handles one work item by sorting the log it names, unless a sort with the same id is
         * already running in this process.
         *
         * @param workID identifier of the work item (not used here)
         * @param data UTF-8 text of the form {@code source|destination}
         */
        public void process(String workID, byte[] data) {
            String[] paths = new String(data, StandardCharsets.UTF_8).split("\\|");
            String src = paths[0];
            String dest = paths[1];
            String sortId = new Path(src).getName();
            log.debug("Sorting " + src + " to " + dest + " using sortId " + sortId);

            // Hold the lock only for the check-and-register step. Keeping it for the duration
            // of the sort would serialize every sort and block getLogSorts() until the work
            // it is supposed to report on has already finished.
            synchronized (currentWork) {
                if (currentWork.containsKey(sortId)) {
                    return;
                }
                currentWork.put(sortId, this);
            }

            try {
                sort(sortId, new Path(src), dest);
            } finally {
                currentWork.remove(sortId);
            }
        }

        /**
         * Reads key/value records from {@code srcPath} in buffer-sized batches, and writes each
         * batch, sorted by key, as a separate part under {@code destPath}. A finished marker is
         * created on success; on any failure a failed marker is created and the error is logged
         * rather than propagated.
         *
         * @param name sort id, used in log messages and in the worker thread's name
         * @param srcPath the write-ahead log to read
         * @param destPath directory that receives the sorted parts and the marker file
         */
        public void sort(String name, Path srcPath, String destPath) {
            synchronized (this) {
                sortStart = System.currentTimeMillis();
            }

            final String formerThreadName = Thread.currentThread().getName();
            int part = 0;
            try {
                // Check for the finished marker first: the sort may already have been done.
                if (fs.exists(SortedLogState.getFinishedMarkerPath(destPath))) {
                    log.debug("Sorting already finished at {}", destPath);
                    return;
                }

                log.info("Copying " + srcPath + " to " + destPath);
                fs.deleteRecursively(new Path(destPath));

                try (FSDataInputStream fsinput = fs.open(srcPath)) {
                    DfsLogger.DFSLoggerInputStreams inputStreams;
                    try {
                        // Only the header read is guarded by this handler.
                        inputStreams = DfsLogger.readHeaderAndReturnStream(fsinput, conf);
                    } catch (DfsLogger.LogHeaderIncompleteException e) {
                        log.warn("Could not read header from write-ahead log " + srcPath + ". Not sorting.");
                        // Emit a single empty part plus the finished marker so the destination
                        // looks like a completed (empty) sort.
                        fs.mkdirs(new Path(destPath));
                        writeBuffer(destPath, Collections.<Pair<LogFileKey, LogFileValue>>emptyList(), part);
                        fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
                        return;
                    }

                    input = inputStreams.getOriginalInput();
                    decryptingInput = inputStreams.getDecryptingInputStream();

                    final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
                    Thread.currentThread().setName("Sorting " + name + " for recovery");
                    while (true) {
                        List<Pair<LogFileKey, LogFileValue>> buffer = new ArrayList<>();
                        try {
                            // Batch size is measured by how far the raw input has advanced.
                            long start = input.getPos();
                            while (input.getPos() - start < bufferSize) {
                                LogFileKey key = new LogFileKey();
                                LogFileValue value = new LogFileValue();
                                key.readFields(decryptingInput);
                                value.readFields(decryptingInput);
                                buffer.add(new Pair<LogFileKey, LogFileValue>(key, value));
                            }
                            writeBuffer(destPath, buffer, part++);
                            buffer.clear();
                        } catch (EOFException eof) {
                            // End of input: flush whatever was read into a final part.
                            writeBuffer(destPath, buffer, part++);
                            break;
                        }
                    }

                    // Write the marker through the same helper the "already finished" check
                    // above uses, so the path written is the path that is later tested.
                    fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
                    log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part
                            + " parts in " + getSortTime() + "ms");
                }
            } catch (Throwable t) {
                try {
                    // The destination directory may not exist yet.
                    fs.mkdirs(new Path(destPath));
                    fs.create(SortedLogState.getFailedMarkerPath(destPath)).close();
                } catch (IOException e) {
                    log.error("Error creating failed flag file " + name, e);
                }
                log.error("Caught throwable", t);
            } finally {
                Thread.currentThread().setName(formerThreadName);
                try {
                    close();
                } catch (Exception e) {
                    log.error("Error during cleanup sort/copy " + name, e);
                }
                synchronized (this) {
                    sortStop = System.currentTimeMillis();
                }
            }
        }

        /**
         * Sorts {@code buffer} by key (in place) and writes it as MapFile part number
         * {@code part} under {@code destPath}.
         *
         * @param destPath directory that receives the part
         * @param buffer records to write; reordered by this call
         * @param part zero-based part number, formatted as {@code part-r-%05d}
         * @throws IOException if the part cannot be created, written or closed
         */
        private void writeBuffer(String destPath, List<Pair<LogFileKey, LogFileValue>> buffer, int part)
                throws IOException {
            Path partPath = new Path(destPath, String.format("part-r-%05d", part));
            FileSystem ns = fs.getVolumeByPath(partPath).getFileSystem();

            MapFile.Writer output = new MapFile.Writer(ns.getConf(), ns.makeQualified(partPath),
                    MapFile.Writer.keyClass(LogFileKey.class), MapFile.Writer.valueClass(LogFileValue.class));
            try {
                Collections.sort(buffer, new Comparator<Pair<LogFileKey, LogFileValue>>() {
                    @Override
                    public int compare(Pair<LogFileKey, LogFileValue> o1, Pair<LogFileKey, LogFileValue> o2) {
                        return o1.getFirst().compareTo(o2.getFirst());
                    }
                });
                for (Pair<LogFileKey, LogFileValue> entry : buffer) {
                    output.append(entry.getFirst(), entry.getSecond());
                }
            } finally {
                output.close();
            }
        }

        /**
         * Records the final input position and closes both input streams. Does nothing when no
         * input is open.
         *
         * @throws IOException if reading the position or closing a stream fails
         */
        synchronized void close() throws IOException {
            if (input == null) {
                return;
            }
            bytesCopied = input.getPos();
            try {
                input.close();
            } finally {
                // Close the second stream and clear the handle even if the first close failed,
                // so neither stream is left open and getBytesCopied() uses the recorded value.
                try {
                    decryptingInput.close();
                } finally {
                    input = null;
                }
            }
        }

        /**
         * Returns the elapsed sort time in milliseconds: zero before the sort has started, the
         * running time while it is in progress, and the total once it has stopped.
         */
        public synchronized long getSortTime() {
            if (sortStart <= 0) {
                return 0L;
            }
            long end = sortStop > 0 ? sortStop : System.currentTimeMillis();
            return end - sortStart;
        }

        /**
         * Returns the current position of the open input, or the position recorded by
         * {@link #close()} when no input is open ({@code -1} if none was ever recorded).
         *
         * @throws IOException if the position of the open input cannot be read
         */
        synchronized long getBytesCopied() throws IOException {
            if (input == null) {
                return bytesCopied;
            }
            return input.getPos();
        }
    }

    /**
     * Creates a sorter and a thread pool sized by
     * {@link Property#TSERV_RECOVERY_MAX_CONCURRENT}.
     *
     * @param instance instance whose ZooKeeper root locates the recovery work queue
     * @param fs volume manager used for all file access
     * @param conf configuration supplying buffer and pool sizes
     */
    public LogSorter(Instance instance, VolumeManager fs, AccumuloConfiguration conf) {
        this.instance = instance;
        this.fs = fs;
        this.conf = conf;
        int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
        this.threadPool = new SimpleThreadPool(threadPoolSize, getClass().getName());
    }

    /**
     * Starts processing the recovery work queue on the given pool, which replaces the pool
     * created by the constructor.
     *
     * @param distWorkQThreadPool executor on which queued work is run
     * @throws KeeperException if thrown while starting the work queue
     * @throws InterruptedException if interrupted while starting the work queue
     */
    public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool)
            throws KeeperException, InterruptedException {
        this.threadPool = distWorkQThreadPool;
        String queuePath = ZooUtil.getRoot(instance) + "/recovery";
        new DistributedWorkQueue(queuePath, conf).startProcessing(new LogProcessor(), this.threadPool);
    }

    /**
     * Returns a snapshot of the sorts currently in progress. Progress is the number of bytes
     * copied so far divided by {@link Property#TSERV_WALOG_MAX_SIZE}; runtime is the elapsed
     * sort time in milliseconds, truncated to {@code int}.
     */
    public List<RecoveryStatus> getLogSorts() {
        List<RecoveryStatus> result = new ArrayList<>();
        synchronized (currentWork) {
            for (Map.Entry<String, LogProcessor> entry : currentWork.entrySet()) {
                LogProcessor processor = entry.getValue();
                RecoveryStatus status = new RecoveryStatus();
                status.name = entry.getKey();
                try {
                    status.progress = processor.getBytesCopied()
                            / (double) conf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE);
                } catch (IOException ex) {
                    // Progress is left at its default; keep the cause in the log.
                    log.warn("Error getting bytes read", ex);
                }
                status.runtime = (int) processor.getSortTime();
                result.add(status);
            }
        }
        return result;
    }
}
