package org.apache.tajo.util.history;

import com.google.common.collect.Lists;
import java.io.Closeable;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.QueryInfo;
import org.apache.tajo.util.Bytes;
import org.apache.tajo.util.metrics.reporter.MetricsFileScheduledReporter;

/* loaded from: input_file:org/apache/tajo/util/history/HistoryWriter.class */
public class HistoryWriter extends AbstractService {
    private static final Log LOG = LogFactory.getLog(HistoryWriter.class);
    // Configuration key for the replication factor of query history files (read in serviceInit).
    public static final String HISTORY_QUERY_REPLICATION = "tajo.history.query.replication";
    // Configuration key for the replication factor of task history files (read in serviceInit).
    public static final String HISTORY_TASK_REPLICATION = "tajo.history.task.replication";
    // Sub-directory name (under <history dir>/<yyyyMMdd>/) holding the query summary list files.
    public static final String QUERY_LIST = "query-list";
    // Sub-directory name (under <history dir>/<yyyyMMdd>/) holding per-query detail files.
    public static final String QUERY_DETAIL = "query-detail";
    // File name suffix appended to every history file created by this class.
    public static final String HISTORY_FILE_POSTFIX = ".hist";
    // Pending write requests; filled by the append* methods and drained by the WriterThread.
    private final LinkedBlockingQueue<WriterFuture<WriterHolder>> historyQueue;
    // Open task history writers, keyed by the "yyyyMMddHH" string of the task start time.
    private Map<String, WriterHolder> taskWriters;
    // Writer for the current query summary list file; created lazily on the first summary write.
    private WriterHolder querySummaryWriter;
    // Background thread that performs all history writes; created in serviceInit.
    private WriterThread writerThread;
    // Set to true by serviceStop; checked by the writer thread to leave its loop.
    private AtomicBoolean stopped;
    // Root directory of query history files (TajoConf.getQueryHistoryDir).
    private Path historyParentPath;
    // Root directory of task history files (TajoConf.getTaskHistoryDir).
    private Path taskHistoryParentPath;
    // Constructor argument with ':' replaced by '_' and lower-cased; used in task history paths.
    private String processName;
    private TajoConf tajoConf;
    // Started in serviceStart and stopped in serviceStop.
    private HistoryCleaner historyCleaner;
    // Constructor flag, passed through to the HistoryCleaner.
    private boolean isMaster;
    // Replication factor used when creating query history and query summary files.
    private short queryReplication;
    // Replication factor used when creating task history files.
    private short taskReplication;

    /* loaded from: input_file:org/apache/tajo/util/history/HistoryWriter$WriterFuture.class */
    /**
     * Minimal {@link Future} handed back to callers that enqueue a {@link History}.
     *
     * <p>The writer thread completes it through {@link #done(Object)} or
     * {@link #failed(Throwable)}; other threads observe the outcome through
     * {@link #get()}, {@link #isDone()}, {@link #isSucceed()} and {@link #getError()}.
     * A failure does not make {@code get} throw: it returns {@code null} and the cause
     * is available from {@link #getError()}. Cancellation is not supported.
     *
     * @param <T> type of the value produced by a successful write
     */
    public static class WriterFuture<T> implements Future<T> {
        // The outcome fields are written by the writer thread and read by arbitrary caller
        // threads without going through the latch (isDone/isSucceed/getError), so they are
        // volatile to guarantee visibility.
        private volatile T result;
        private final History history;
        private volatile Throwable error;
        private volatile boolean done = false;
        private final CountDownLatch latch = new CountDownLatch(1);

        /**
         * @param history the history record this future tracks
         */
        public WriterFuture(History history) {
            this.history = history;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** @return the history record this future was created for */
        public History getHistory() {
            return this.history;
        }

        /**
         * Completes the future and releases every thread blocked in {@code get}.
         *
         * @param t the result to publish (may be {@code null})
         */
        public void done(T t) {
            this.result = t;
            this.done = true;
            this.latch.countDown();
        }

        /**
         * Completes the future as failed: records the cause, then completes with a
         * {@code null} result.
         *
         * @param th the cause of the failure
         */
        public void failed(Throwable th) {
            // The error must be stored before done() so that a waiter released by the
            // latch always sees it.
            this.error = th;
            done(null);
        }

        /**
         * Not supported.
         *
         * @throws UnsupportedOperationException always
         */
        @Override // java.util.concurrent.Future
        public boolean cancel(boolean z) {
            throw new UnsupportedOperationException();
        }

        /**
         * Not supported.
         *
         * @throws UnsupportedOperationException always
         */
        @Override // java.util.concurrent.Future
        public boolean isCancelled() {
            throw new UnsupportedOperationException();
        }

        /** @return {@code true} once {@link #done(Object)} or {@link #failed(Throwable)} ran */
        @Override // java.util.concurrent.Future
        public boolean isDone() {
            return this.done;
        }

        /** @return {@code true} when no failure has been recorded */
        public boolean isSucceed() {
            return this.error == null;
        }

        /** @return the recorded failure cause, or {@code null} if none */
        public Throwable getError() {
            return this.error;
        }

        /**
         * Blocks until the future is completed.
         *
         * @return the published result ({@code null} on failure)
         * @throws InterruptedException if the waiting thread is interrupted
         */
        @Override // java.util.concurrent.Future
        public T get() throws InterruptedException {
            this.latch.await();
            return this.result;
        }

        /**
         * Blocks until the future is completed or the timeout elapses.
         *
         * @param j        maximum time to wait
         * @param timeUnit unit of {@code j}
         * @return the published result ({@code null} on failure)
         * @throws InterruptedException if the waiting thread is interrupted
         * @throws TimeoutException     if the future was not completed in time
         */
        @Override // java.util.concurrent.Future
        public T get(long j, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
            if (this.latch.await(j, timeUnit)) {
                return this.result;
            }
            throw new TimeoutException();
        }
    }

    /* loaded from: input_file:org/apache/tajo/util/history/HistoryWriter$WriterHolder.class */
    /**
     * Bundles an open history output stream with the path it writes to and a timestamp.
     * Both operations are no-ops while {@code out} is {@code null}.
     */
    public static class WriterHolder implements Closeable {
        long lastWritingTime;
        Path path;
        FSDataOutputStream out;

        /**
         * Closes the underlying stream, if there is one.
         *
         * @throws IOException if closing the stream fails
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            FSDataOutputStream stream = this.out;
            if (stream == null) {
                return;
            }
            stream.close();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Syncs the underlying stream via {@code hsync()}, if there is one.
         *
         * @throws IOException if the sync fails
         */
        public void flush() throws IOException {
            FSDataOutputStream stream = this.out;
            if (stream == null) {
                return;
            }
            stream.hsync();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tajo/util/history/HistoryWriter$WriterThread.class */
    public class WriterThread extends Thread {
        // Set by HistoryWriter.flushTaskHistories() when the queue is not empty; consumed
        // (and reset) at the end of writeHistory, which then flushes all task writers.
        private AtomicBoolean needTaskFlush = new AtomicBoolean(false);

        /** Creates the writer thread; it is started from HistoryWriter.serviceStart(). */
        WriterThread() {
        }

        /**
         * Main loop of the writer thread.
         *
         * <p>Until the service is stopped or the thread is interrupted, it repeatedly
         * drains up to 100 queued requests (waiting at most one second), writes them, and
         * then closes task history files that are at least two hours old. Any failure while
         * writing or closing is logged and the loop continues, so a single bad batch cannot
         * terminate the thread.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            HistoryWriter.LOG.info("HistoryWriter_" + HistoryWriter.this.processName + " started.");
            SimpleDateFormat hourFormat = new SimpleDateFormat("yyyyMMddHH");
            while (!HistoryWriter.this.stopped.get() && !Thread.interrupted()) {
                List<WriterFuture<WriterHolder>> batch = Lists.newArrayList();
                try {
                    drainHistory(batch, 100, 1000L);
                } catch (InterruptedException e) {
                    if (HistoryWriter.this.stopped.get()) {
                        break;
                    }
                    // Interrupted without a stop request: still process what was drained.
                }
                if (HistoryWriter.this.stopped.get() || batch.isEmpty()) {
                    continue;
                }
                // Each step has its own guard so that a write failure does not skip the
                // expiry pass, and neither can escape and kill the thread.
                try {
                    writeHistory(batch);
                } catch (Throwable th) {
                    HistoryWriter.LOG.error(th.getMessage(), th);
                }
                try {
                    closeExpiredTaskWriters(hourFormat);
                } catch (Throwable th) {
                    HistoryWriter.LOG.error(th.getMessage(), th);
                }
            }
            HistoryWriter.LOG.info("HistoryWriter_" + HistoryWriter.this.processName + " stopped.");
        }

        /**
         * Closes and forgets every task writer whose hour key is at or before "now minus
         * two hours". Keys are "yyyyMMddHH" strings, so lexical order equals time order.
         */
        private void closeExpiredTaskWriters(SimpleDateFormat hourFormat) {
            Calendar cutoff = Calendar.getInstance();
            cutoff.add(Calendar.HOUR_OF_DAY, -2);
            String cutoffHour = hourFormat.format(cutoff.getTime());
            Iterator<Map.Entry<String, WriterHolder>> entries = HistoryWriter.this.taskWriters.entrySet().iterator();
            while (entries.hasNext()) {
                Map.Entry<String, WriterHolder> entry = entries.next();
                if (entry.getKey().compareTo(cutoffHour) > 0) {
                    continue;
                }
                WriterHolder expired = entry.getValue();
                entries.remove();
                if (expired != null) {
                    HistoryWriter.LOG.info("Closing task history file: " + expired.path);
                    IOUtils.cleanup(HistoryWriter.LOG, new Closeable[]{expired});
                }
            }
        }

        /**
         * Moves up to {@code i} queued requests into {@code collection}, waiting at most
         * {@code j} milliseconds in total for more to arrive.
         *
         * @param collection destination for the drained requests
         * @param i          maximum number of requests to drain
         * @param j          overall wait budget in milliseconds
         * @return the number of requests added to {@code collection}
         * @throws InterruptedException if interrupted while waiting on the queue
         */
        private int drainHistory(Collection<WriterFuture<WriterHolder>> collection, int i, long j) throws InterruptedException {
            final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(j);
            int drained = 0;
            while (drained < i) {
                // Grab whatever is immediately available first.
                drained += HistoryWriter.this.historyQueue.drainTo(collection, i - drained);
                if (drained >= i) {
                    break;
                }
                // Then block for a single element until the shared deadline.
                WriterFuture<WriterHolder> next =
                        HistoryWriter.this.historyQueue.poll(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
                if (next == null) {
                    break;
                }
                collection.add(next);
                drained++;
            }
            return drained;
        }

        /**
         * Writes every request in {@code list} and completes its future.
         *
         * <p>Each request is dispatched on its history type. A write failure is logged and
         * reported through {@link WriterFuture#failed(Throwable)}; it never aborts the rest
         * of the batch. Requests with a missing history or an unknown type are also failed,
         * so that no caller is left waiting on a future that will never complete. After the
         * batch, task writers are flushed if a flush was requested meanwhile.
         *
         * @param list the drained requests
         * @return the same list
         */
        private List<WriterFuture<WriterHolder>> writeHistory(List<WriterFuture<WriterHolder>> list) {
            if (list.isEmpty()) {
                return list;
            }
            for (WriterFuture<WriterHolder> writerFuture : list) {
                History history = writerFuture.getHistory();
                if (history == null) {
                    // Switching on a null history would throw and abandon the remaining futures.
                    HistoryWriter.LOG.warn("Wrong history type: " + null);
                    writerFuture.failed(new IllegalArgumentException("History is null"));
                    continue;
                }
                switch (history.getHistoryType()) {
                    case TASK:
                        try {
                            writerFuture.done(writeTaskHistory((org.apache.tajo.worker.TaskHistory) history));
                            break;
                        } catch (Throwable th) {
                            HistoryWriter.LOG.error("Error while saving task history: " + ((org.apache.tajo.worker.TaskHistory) history).getTaskAttemptId() + ":" + th.getMessage(), th);
                            writerFuture.failed(th);
                            break;
                        }
                    case QUERY:
                        try {
                            writeQueryHistory((QueryHistory) history);
                            writerFuture.done(null);
                            break;
                        } catch (Throwable th2) {
                            HistoryWriter.LOG.error("Error while saving query history: " + ((QueryHistory) history).getQueryId() + ":" + th2.getMessage(), th2);
                            writerFuture.failed(th2);
                            break;
                        }
                    case QUERY_SUMMARY:
                        try {
                            writerFuture.done(writeQuerySummary((QueryInfo) history));
                            break;
                        } catch (Throwable th3) {
                            HistoryWriter.LOG.error("Error while saving query summary: " + ((QueryInfo) history).getQueryId() + ":" + th3.getMessage(), th3);
                            writerFuture.failed(th3);
                            break;
                        }
                    default:
                        HistoryWriter.LOG.warn("Wrong history type: " + history.getHistoryType());
                        // Complete the future; otherwise get() would block until timeout/forever.
                        writerFuture.failed(new IllegalArgumentException("Wrong history type: " + history.getHistoryType()));
                        break;
                }
            }
            if (this.needTaskFlush.getAndSet(false)) {
                flushTaskHistories();
            }
            return list;
        }

        /**
         * Writes the detail files of one query: the query JSON file and, for each stage
         * history, one file named after its execution block id in the same directory.
         *
         * @param queryHistory the query history to persist
         * @throws Exception if the query directory cannot be created or a file cannot be
         *                   created or written; the caller reports this on the future
         */
        private void writeQueryHistory(QueryHistory queryHistory) throws Exception {
            Path queryHistoryFile = HistoryWriter.getQueryHistoryFilePath(HistoryWriter.this.historyParentPath, queryHistory.getQueryId());
            FileSystem fs = HistoryWriter.getNonCrcFileSystem(queryHistoryFile, HistoryWriter.this.tajoConf);
            Path queryDir = queryHistoryFile.getParent();
            if (!fs.exists(queryDir) && !fs.mkdirs(queryDir)) {
                // Returning silently here would make the caller report a successful write.
                throw new IOException("Can't make QueryHistory dir: " + queryDir);
            }

            HistoryWriter.LOG.info("Saving query summary: " + queryHistoryFile);
            writeHistoryFile(fs, queryHistoryFile, queryHistory.toJson().getBytes(Bytes.UTF8_CHARSET));

            if (queryHistory.getStageHistories() == null) {
                return;
            }
            for (StageHistory stageHistory : queryHistory.getStageHistories()) {
                Path stageFile = new Path(queryDir, stageHistory.getExecutionBlockId() + HistoryWriter.HISTORY_FILE_POSTFIX);
                writeHistoryFile(fs, stageFile, stageHistory.toTasksJson().getBytes(Bytes.UTF8_CHARSET));
                HistoryWriter.LOG.info("Saving query unit: " + stageFile);
            }
        }

        /** Creates {@code file} with the query replication factor, writes {@code content}, and closes it exactly once. */
        private void writeHistoryFile(FileSystem fs, Path file, byte[] content) throws IOException {
            FSDataOutputStream out = null;
            try {
                out = fs.create(file, HistoryWriter.this.queryReplication);
                out.write(content);
            } finally {
                // cleanup() ignores a null stream and logs (rather than throws) close errors.
                IOUtils.cleanup(HistoryWriter.LOG, new Closeable[]{out});
            }
        }

        /**
         * Appends one query summary record to the current query list file, opening a new
         * file first when none is open or when the current one was opened an hour or more ago.
         *
         * <p>Record layout: a 4-byte length followed by the UTF-8 bytes of
         * {@code "\n" + json + "\n"}.
         *
         * @param queryInfo the summary to append
         * @return the holder of the file written to, or {@code null} if the service is stopped
         * @throws Exception if no file could be opened or the write failed; on a write
         *                   failure the file is closed so the next call opens a new one
         */
        private WriterHolder writeQuerySummary(QueryInfo queryInfo) throws Exception {
            if (HistoryWriter.this.stopped.get()) {
                return null;
            }
            if (HistoryWriter.this.querySummaryWriter == null) {
                HistoryWriter.this.querySummaryWriter = new WriterHolder();
                rollingQuerySummaryWriter();
            } else if (HistoryWriter.this.querySummaryWriter.out == null) {
                rollingQuerySummaryWriter();
            } else if (System.currentTimeMillis() - HistoryWriter.this.querySummaryWriter.lastWritingTime >= TimeUnit.HOURS.toMillis(1)) {
                HistoryWriter.LOG.info("Close query history file: " + HistoryWriter.this.querySummaryWriter.path);
                IOUtils.cleanup(HistoryWriter.LOG, new Closeable[]{HistoryWriter.this.querySummaryWriter});
                // Forget the closed stream so a failed roll cannot leave it in place.
                HistoryWriter.this.querySummaryWriter.out = null;
                rollingQuerySummaryWriter();
            }
            WriterHolder summaryWriter = HistoryWriter.this.querySummaryWriter;
            if (summaryWriter.out == null) {
                // Rolling did not produce a stream; fail with a clear cause instead of an NPE.
                throw new IOException("Query summary history file is not open");
            }
            byte[] bytes = ("\n" + queryInfo.toJson() + "\n").getBytes(Bytes.UTF8_CHARSET);
            try {
                summaryWriter.out.writeInt(bytes.length);
                summaryWriter.out.write(bytes);
                return summaryWriter;
            } catch (IOException e) {
                IOUtils.cleanup(HistoryWriter.LOG, new Closeable[]{summaryWriter});
                summaryWriter.out = null;
                throw e;
            }
        }

        /**
         * Opens a new query list file for {@code querySummaryWriter} at
         * {@code <historyParentPath>/<yyyyMMdd>/query-list/query-list-<HHmmss>.hist}.
         *
         * <p>The caller is expected to have closed any previous stream. The holder's path,
         * timestamp and stream are only updated once the new file has been created, and the
         * old stream reference is dropped up front, so a failure never leaves the holder
         * pointing at a closed stream.
         *
         * @throws Exception if the directory or the file cannot be created
         */
        private void rollingQuerySummaryWriter() throws Exception {
            WriterHolder summaryWriter = HistoryWriter.this.querySummaryWriter;
            summaryWriter.out = null;

            String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
            Path listDir = new Path(HistoryWriter.this.historyParentPath, timestamp.substring(0, 8) + "/" + HistoryWriter.QUERY_LIST);
            FileSystem fs = HistoryWriter.getNonCrcFileSystem(listDir, HistoryWriter.this.tajoConf);
            if (!fs.exists(listDir) && !fs.mkdirs(listDir)) {
                // Report the directory that actually failed, and fail loudly: returning here
                // would leave the holder without a stream for the caller to write to.
                throw new IOException("Can't make QueryList history dir: " + listDir);
            }
            Path listFile = new Path(listDir, "query-list-" + timestamp.substring(8, 14) + HistoryWriter.HISTORY_FILE_POSTFIX);
            HistoryWriter.LOG.info("Create query history file: " + listFile);
            FSDataOutputStream created = fs.create(listFile, HistoryWriter.this.queryReplication);

            summaryWriter.path = listFile;
            summaryWriter.lastWritingTime = System.currentTimeMillis();
            summaryWriter.out = created;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Flushes every open task history writer. A failure on one writer is logged as a
         * warning and does not prevent the remaining writers from being flushed.
         */
        public void flushTaskHistories() {
            for (WriterHolder holder : HistoryWriter.this.taskWriters.values()) {
                try {
                    holder.flush();
                } catch (IOException e) {
                    HistoryWriter.LOG.warn(e, e);
                }
            }
        }

        /**
         * Appends one task history record to the task file of the hour in which the task
         * started, creating that file (and caching its holder) on first use.
         *
         * <p>Record layout: a 4-byte length followed by the serialized proto bytes. If the
         * file could not be created the holder has no stream and the record is skipped.
         *
         * @param taskHistory the task history to persist
         * @return the holder for the task's hour
         * @throws Exception if the file cannot be created or the write fails; on a write
         *                   failure the holder is removed from the cache and closed
         */
        private WriterHolder writeTaskHistory(org.apache.tajo.worker.TaskHistory taskHistory) throws Exception {
            String hourKey = new SimpleDateFormat("yyyyMMddHH").format(new Date(taskHistory.getStartTime()));
            Map<String, WriterHolder> writers = HistoryWriter.this.taskWriters;

            WriterHolder holder = writers.get(hourKey);
            if (holder == null) {
                holder = new WriterHolder();
                holder.out = createTaskHistoryFile(hourKey, holder);
                writers.put(hourKey, holder);
            }
            holder.lastWritingTime = System.currentTimeMillis();

            if (holder.out == null) {
                return holder;
            }
            try {
                byte[] serialized = taskHistory.m1504getProto().toByteArray();
                holder.out.writeInt(serialized.length);
                holder.out.write(serialized);
            } catch (IOException e) {
                writers.remove(hourKey);
                IOUtils.cleanup(HistoryWriter.LOG, new Closeable[]{holder});
                throw e;
            }
            return holder;
        }

        /**
         * Resolves the next task history file for the given hour and creates it without
         * overwriting, using the task replication factor, a 4096-byte buffer and the file
         * system's default block size.
         *
         * @param str          hour key of the form "yyyyMMddHH"
         * @param writerHolder holder whose {@code path} is set to the resolved file
         * @return the new output stream, or {@code null} if the parent directory could not
         *         be created (an error is logged in that case)
         * @throws IOException if path resolution or file creation fails
         */
        private FSDataOutputStream createTaskHistoryFile(String str, WriterHolder writerHolder) throws IOException {
            FileSystem fs = HistoryWriter.getNonCrcFileSystem(HistoryWriter.this.taskHistoryParentPath, HistoryWriter.this.tajoConf);
            Path taskFile = HistoryWriter.getQueryTaskHistoryPath(fs, HistoryWriter.this.taskHistoryParentPath, HistoryWriter.this.processName, str);
            if (!fs.exists(taskFile) && !fs.mkdirs(taskFile.getParent())) {
                HistoryWriter.LOG.error("Can't make Query history directory: " + taskFile);
                return null;
            }
            writerHolder.path = taskFile;
            long blockSize = fs.getDefaultBlockSize(taskFile);
            return fs.create(taskFile, false, 4096, HistoryWriter.this.taskReplication, blockSize);
        }
    }

    /**
     * Creates a history writer service named {@code <class name>:<str>}.
     *
     * @param str process name; stored with every ':' replaced by '_' and lower-cased
     * @param z   whether this writer runs in the master; passed to the history cleaner
     */
    public HistoryWriter(String str, boolean z) {
        super(HistoryWriter.class.getName() + ":" + str);
        this.isMaster = z;
        this.processName = str.replace(":", "_").toLowerCase();
        this.stopped = new AtomicBoolean(false);
        this.querySummaryWriter = null;
        this.taskWriters = new HashMap();
        this.historyQueue = new LinkedBlockingQueue<>();
    }

    /**
     * Initializes the service: resolves the query and task history directories, creates
     * the writer thread and the history cleaner, and reads the replication factors
     * (falling back to the file system's default replication for each directory).
     *
     * @param configuration must be a {@link TajoConf}
     * @throws IllegalArgumentException if {@code configuration} is not a {@code TajoConf}
     * @throws Exception                if the file system cannot be accessed
     */
    public void serviceInit(Configuration configuration) throws Exception {
        if (configuration instanceof TajoConf) {
            this.tajoConf = (TajoConf) configuration;
        } else {
            throw new IllegalArgumentException("conf should be a TajoConf type.");
        }
        this.historyParentPath = TajoConf.getQueryHistoryDir(this.tajoConf);
        this.taskHistoryParentPath = TajoConf.getTaskHistoryDir(this.tajoConf);
        this.writerThread = new WriterThread();
        this.historyCleaner = new HistoryCleaner(this.tajoConf, this.isMaster);

        int defaultQueryReplication = FileSystem.get(this.tajoConf).getDefaultReplication(this.historyParentPath);
        this.queryReplication = (short) this.tajoConf.getInt(HISTORY_QUERY_REPLICATION, defaultQueryReplication);
        int defaultTaskReplication = FileSystem.get(this.tajoConf).getDefaultReplication(this.taskHistoryParentPath);
        this.taskReplication = (short) this.tajoConf.getInt(HISTORY_TASK_REPLICATION, defaultTaskReplication);

        super.serviceInit(configuration);
    }

    /**
     * Stops the service once: marks it stopped, closes all task writers, interrupts the
     * writer thread, closes the query summary writer and stops the history cleaner.
     * Repeated calls are no-ops.
     *
     * <p>The method tolerates being called before {@code serviceInit} has run, in which
     * case the writer thread and the cleaner do not exist yet.
     *
     * @throws Exception if stopping the cleaner or the parent service fails
     */
    public void serviceStop() throws Exception {
        if (this.stopped.getAndSet(true)) {
            return;
        }
        for (WriterHolder holder : this.taskWriters.values()) {
            IOUtils.cleanup(LOG, new Closeable[]{holder});
        }
        this.taskWriters.clear();
        // writerThread is only created in serviceInit; guard it like historyCleaner below.
        if (this.writerThread != null) {
            this.writerThread.interrupt();
        }
        // cleanup() ignores a null closeable, so an unset summary writer is fine here.
        IOUtils.cleanup(LOG, new Closeable[]{this.querySummaryWriter});
        if (this.historyCleaner != null) {
            this.historyCleaner.doStop();
        }
        super.serviceStop();
    }

    /**
     * Starts the writer thread and the history cleaner, then chains to the parent
     * implementation, as {@code serviceInit} and {@code serviceStop} already do.
     *
     * @throws Exception if the parent service fails to start
     */
    public void serviceStart() throws Exception {
        this.writerThread.start();
        this.historyCleaner.start();
        super.serviceStart();
    }

    /**
     * Queues {@code history} for asynchronous writing.
     *
     * @param history the record to write
     * @return a future completed by the writer thread once the record has been handled
     */
    public WriterFuture<WriterHolder> appendHistory(History history) {
        final WriterFuture<WriterHolder> pending = new WriterFuture<>(history);
        this.historyQueue.add(pending);
        return pending;
    }

    /**
     * Queues a caller-supplied future for asynchronous writing and notifies any thread
     * waiting on the writer thread's monitor.
     *
     * @param writerFuture the request to enqueue
     */
    public void appendHistory(WriterFuture<WriterHolder> writerFuture) {
        this.historyQueue.add(writerFuture);
        final WriterThread monitor = this.writerThread;
        synchronized (monitor) {
            monitor.notifyAll();
        }
    }

    /**
     * Queues {@code history} for writing and arranges for the file it was written to be
     * flushed before the returned future completes.
     *
     * <p>The flush happens inside the future's {@code done} callback, i.e. on the thread
     * that completes the future. If the flush fails, the future is completed as failed
     * with that {@link IOException}.
     *
     * @param history the record to write
     * @return a future completed after the write and the flush
     */
    public WriterFuture<WriterHolder> appendAndFlush(History history) {
        WriterFuture<WriterHolder> writerFuture = new WriterFuture<WriterHolder>(history) { // from class: org.apache.tajo.util.history.HistoryWriter.1
            @Override // org.apache.tajo.util.history.HistoryWriter.WriterFuture
            public void done(WriterHolder writerHolder) {
                if (writerHolder != null) {
                    try {
                        writerHolder.flush();
                    } catch (IOException e) {
                        // failed() re-enters done(null), which skips the flush and completes.
                        super.failed(e);
                        return;
                    }
                }
                super.done(writerHolder);
            }
        };
        this.historyQueue.add(writerFuture);
        synchronized (this.writerThread) {
            this.writerThread.notifyAll();
        }
        return writerFuture;
    }

    /**
     * Writes and flushes {@code history}, waiting up to five seconds for completion.
     *
     * @param history the record to write
     * @throws TimeoutException     if the write did not complete within five seconds
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws IOException          if the write or flush failed; wraps the recorded cause
     */
    public void appendAndSync(History history) throws TimeoutException, InterruptedException, IOException {
        final WriterFuture<WriterHolder> pending = appendAndFlush(history);
        pending.get(5L, TimeUnit.SECONDS);
        if (pending.isSucceed()) {
            return;
        }
        throw new IOException(pending.getError());
    }

    /**
     * Requests a flush of all task history writers. With an empty queue the flush runs
     * immediately on the calling thread; otherwise a flag is set so the writer thread
     * flushes after its current batch.
     */
    public void flushTaskHistories() {
        if (!this.historyQueue.isEmpty()) {
            synchronized (this.writerThread) {
                this.writerThread.needTaskFlush.set(true);
                this.writerThread.notifyAll();
            }
            return;
        }
        this.writerThread.flushTaskHistories();
    }

    /**
     * Returns the file system for {@code path}, with write checksums disabled when the
     * scheme equals {@code MetricsFileScheduledReporter.REPORTER_NAME}
     * (NOTE(review): presumably "file", i.e. the local file system; confirm the constant).
     *
     * <p>If the path carries no scheme, the scheme of the resolved file system is used
     * instead, so an unqualified path no longer causes a {@link NullPointerException}.
     *
     * @param path          any path on the target file system
     * @param configuration configuration used to resolve the file system
     * @return the resolved file system
     * @throws IOException if the file system cannot be obtained
     */
    public static FileSystem getNonCrcFileSystem(Path path, Configuration configuration) throws IOException {
        FileSystem fileSystem = path.getFileSystem(configuration);
        String scheme = path.toUri().getScheme();
        if (scheme == null) {
            scheme = fileSystem.getUri().getScheme();
        }
        // Constant-first comparison stays safe even if no scheme could be determined.
        if (MetricsFileScheduledReporter.REPORTER_NAME.equals(scheme)) {
            fileSystem.setWriteChecksum(false);
        }
        return fileSystem;
    }

    /**
     * Builds the query detail file path
     * {@code <path>/<yyyyMMdd of j>/query-detail/<str>/query.hist}.
     *
     * @param path history root directory
     * @param str  query id
     * @param j    timestamp in epoch milliseconds selecting the date directory
     * @return the path of the query detail file
     */
    public static Path getQueryHistoryFilePath(Path path, String str, long j) {
        String day = new SimpleDateFormat("yyyyMMdd").format(new Date(j));
        Path detailDir = new Path(path, day + "/" + QUERY_DETAIL);
        return new Path(detailDir, str + "/query" + HISTORY_FILE_POSTFIX);
    }

    /**
     * Builds the query detail file path
     * {@code <path>/<yyyyMMdd>/query-detail/<str>/query.hist} for a query id.
     *
     * <p>NOTE(review): the three-token branch passes the token {@code split[1]} (a String)
     * to {@code SimpleDateFormat.format(Object)}, which only accepts a Date or a Number and
     * throws IllegalArgumentException otherwise. That exception lands in the catch block,
     * so as written the date directory is always derived from the current time. Parsing
     * the token as a timestamp would change where files are written; confirm what readers
     * of these files expect before changing it.
     *
     * @param path history root directory
     * @param str  query id; split on '_' into tokens
     * @return the path of the query detail file
     */
    public static Path getQueryHistoryFilePath(Path path, String str) {
        Path path2;
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
        try {
            String[] split = str.split("_");
            // Three tokens: attempt to use the middle token for the date; otherwise use "now".
            path2 = split.length == 3 ? new Path(path, simpleDateFormat.format(split[1]) + "/" + QUERY_DETAIL) : new Path(path, simpleDateFormat.format(new Date(System.currentTimeMillis())) + "/" + QUERY_DETAIL);
        } catch (Exception e) {
            // Any failure above (including a null id) falls back to the current date.
            path2 = new Path(path, simpleDateFormat.format(new Date(System.currentTimeMillis())) + "/" + QUERY_DETAIL);
        }
        return new Path(path2, str + "/query" + HISTORY_FILE_POSTFIX);
    }

    /**
     * Computes the next task history file for a process and hour:
     * {@code <path>/<yyyyMMdd>/tasks/<str>/<str>_<HH>_<seq>.hist}.
     *
     * <p>{@code seq} is one more than the highest sequence number found among existing
     * files in that directory whose name splits on '_' into exactly four tokens with the
     * third token equal to the hour; it is 0 when the directory is missing or has no
     * such file.
     *
     * @param fileSystem file system holding the task history
     * @param path       task history root directory
     * @param str        process name
     * @param str2       hour key of the form "yyyyMMddHH"
     * @return the path of the next file to create
     * @throws IOException if the task directory path exists but is not a directory, or
     *                     listing it fails
     */
    public static Path getQueryTaskHistoryPath(FileSystem fileSystem, Path path, String str, String str2) throws IOException {
        final Path taskDir = new Path(path, str2.substring(0, 8) + "/tasks/" + str);
        final String hour = str2.substring(8, 10);
        final String filePrefix = str + "_" + hour + "_";

        if (!fileSystem.exists(taskDir)) {
            return new Path(taskDir, filePrefix + 0 + HISTORY_FILE_POSTFIX);
        }
        if (!fileSystem.isDirectory(taskDir)) {
            throw new IOException("Task history path is not directory: " + taskDir);
        }

        int maxSeq = -1;
        FileStatus[] existing = fileSystem.listStatus(taskDir);
        if (existing != null) {
            for (FileStatus status : existing) {
                String[] tokens = status.getPath().getName().split("_");
                if (tokens.length != 4 || !tokens[2].equals(hour)) {
                    continue;
                }
                int dot = tokens[3].indexOf(".");
                if (dot <= 0) {
                    continue;
                }
                try {
                    maxSeq = Math.max(maxSeq, Integer.parseInt(tokens[3].substring(0, dot)));
                } catch (NumberFormatException ignored) {
                    // Not a sequence-numbered history file; skip it.
                }
            }
        }
        return new Path(taskDir, filePrefix + (maxSeq + 1) + HISTORY_FILE_POSTFIX);
    }
}
