package com.twitter.elephantbird.mapreduce.output;

import com.google.common.io.Files;
import com.twitter.elephantbird.util.PathFilters;
import com.twitter.elephantbird.util.TaskHeartbeatThread;
import java.io.File;
import java.io.IOException;
import java.io.Reader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.util.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/twitter/elephantbird/mapreduce/output/LuceneIndexOutputFormat.class */
public abstract class LuceneIndexOutputFormat<K, V> extends FileOutputFormat<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(LuceneIndexOutputFormat.class);

    /* loaded from: input_file:com/twitter/elephantbird/mapreduce/output/LuceneIndexOutputFormat$IndexRecordWriter.class */
    private class IndexRecordWriter extends RecordWriter<K, V> {
        private IndexWriter writer;
        private FileOutputCommitter committer;
        private File tmpDirFile;
        private long recordsProcessed;

        private IndexRecordWriter(IndexWriter indexWriter, FileOutputCommitter fileOutputCommitter, File file) {
            this.recordsProcessed = 0L;
            this.writer = indexWriter;
            this.committer = fileOutputCommitter;
            this.tmpDirFile = file;
        }

        public void write(K k, V v) throws IOException {
            this.recordsProcessed++;
            if (this.recordsProcessed % 1000000 == 0) {
                LuceneIndexOutputFormat.LOG.info("Processing record " + this.recordsProcessed);
            }
            this.writer.addDocument(LuceneIndexOutputFormat.this.buildDocument(k, v));
        }

        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            TaskHeartbeatThread taskHeartbeatThread = new TaskHeartbeatThread(taskAttemptContext) { // from class: com.twitter.elephantbird.mapreduce.output.LuceneIndexOutputFormat.IndexRecordWriter.1
                public void progress() {
                    String[] list = IndexRecordWriter.this.tmpDirFile.list();
                    if (list == null) {
                        LuceneIndexOutputFormat.LOG.info("Done optimizing segments, heartbeat thread still alive");
                    } else {
                        LuceneIndexOutputFormat.LOG.info("Optimizing " + (list.length - 2) + " segments");
                    }
                }
            };
            try {
                try {
                    LuceneIndexOutputFormat.LOG.info("Starting heartbeat thread");
                    taskHeartbeatThread.start();
                    Path path = new Path(this.committer.getWorkPath(), "index-" + String.valueOf(taskAttemptContext.getTaskAttemptID().getTaskID().getId()));
                    this.writer.forceMerge(1);
                    this.writer.close();
                    FileSystem fileSystem = FileSystem.get(taskAttemptContext.getConfiguration());
                    LuceneIndexOutputFormat.LOG.info("Copying index to HDFS...");
                    if (!FileUtil.copy(this.tmpDirFile, fileSystem, path, true, taskAttemptContext.getConfiguration())) {
                        throw new IOException("Failed to copy local index to HDFS!");
                    }
                    LuceneIndexOutputFormat.LOG.info("Index written to: " + path);
                    LuceneIndexOutputFormat.LOG.info("Stopping heartbeat thread");
                    taskHeartbeatThread.stop();
                } catch (IOException e) {
                    LuceneIndexOutputFormat.LOG.error("Error committing index", e);
                    throw e;
                }
            } catch (Throwable th) {
                LuceneIndexOutputFormat.LOG.info("Stopping heartbeat thread");
                taskHeartbeatThread.stop();
                throw th;
            }
        }
    }

    /* loaded from: input_file:com/twitter/elephantbird/mapreduce/output/LuceneIndexOutputFormat$NeverTokenizeAnalyzer.class */
    public static class NeverTokenizeAnalyzer extends Analyzer {
        protected Analyzer.TokenStreamComponents createComponents(String str, Reader reader) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Converts one key/value pair into the Lucene document to be indexed. The record writer
     * calls this once per written record and passes the result to the index writer.
     *
     * @param k record key
     * @param v record value
     * @return the document to add to the index
     * @throws IOException if the document cannot be built
     */
    protected abstract Document buildDocument(K k, V v) throws IOException;

    /**
     * Creates the analyzer handed to the index writer. This implementation ignores the
     * configuration and returns a {@link NeverTokenizeAnalyzer}; subclasses may override it
     * to supply a different analyzer.
     *
     * @param configuration job configuration (unused by this implementation)
     * @return a new analyzer instance
     */
    protected Analyzer newAnalyzer(Configuration configuration) {
        return new NeverTokenizeAnalyzer();
    }

    /**
     * Opens the Lucene directory used to build the index on local disk. This implementation
     * returns a {@link SimpleFSDirectory} rooted at the given location and configured with the
     * no-lock factory; subclasses may override it to use another directory type.
     *
     * @param file local directory that will hold the index files
     * @return directory backed by {@code file}
     * @throws IOException if the directory cannot be opened
     */
    protected Directory getDirectoryImplementation(File file) throws IOException {
        NoLockFactory lockFactory = NoLockFactory.getNoLockFactory();
        Directory localDirectory = new SimpleFSDirectory(file, lockFactory);
        return localDirectory;
    }

    /**
     * Creates an index writer with a merge factor of 10 by delegating to
     * {@link #createIndexWriter(Directory, Analyzer, int)}.
     *
     * @param directory directory the index is written to
     * @param analyzer analyzer passed to the writer configuration
     * @return a new index writer
     * @throws IOException if the writer cannot be created
     */
    public static IndexWriter createIndexWriter(Directory directory, Analyzer analyzer) throws IOException {
        return createIndexWriter(directory, analyzer, 10);
    }

    /**
     * Creates an index writer over {@code directory} whose configuration uses a
     * {@link LogByteSizeMergePolicy} with the given merge factor and compound files disabled,
     * together with a {@link SerialMergeScheduler}. The chosen settings are logged at INFO.
     *
     * @param directory directory the index is written to
     * @param analyzer analyzer passed to the writer configuration
     * @param i merge factor applied to the merge policy
     * @return a new index writer
     * @throws IOException if the writer cannot be created
     */
    public static IndexWriter createIndexWriter(Directory directory, Analyzer analyzer, int i) throws IOException {
        String settings = "Creating IndexWriter with:\nDirectory: " + directory + "\nAnalyzer: " + analyzer + "\nMerge Factor: " + i;
        LOG.info(settings);

        IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_40, analyzer);

        // Merge policy: caller-supplied merge factor, no compound files.
        LogByteSizeMergePolicy mergePolicy = new LogByteSizeMergePolicy();
        mergePolicy.setMergeFactor(i);
        mergePolicy.setUseCompoundFile(false);
        config.setMergePolicy(mergePolicy);

        SerialMergeScheduler mergeScheduler = new SerialMergeScheduler();
        config.setMergeScheduler(mergeScheduler);

        IndexWriter indexWriter = new IndexWriter(directory, config);
        return indexWriter;
    }

    /**
     * Creates the record writer for a task attempt. The index is built in a freshly created
     * local temporary directory, using the directory from
     * {@link #getDirectoryImplementation(File)} and the analyzer from
     * {@link #newAnalyzer(Configuration)}.
     *
     * @param taskAttemptContext context of the task attempt that will write records
     * @return a writer that indexes every record it is given
     * @throws IOException if the committer, directory or index writer cannot be created
     */
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
        // The cast is explicit because Hadoop's FileOutputFormat#getOutputCommitter is
        // declared to return the OutputCommitter base type.
        FileOutputCommitter outputCommitter = (FileOutputCommitter) getOutputCommitter(taskAttemptContext);
        File localIndexDir = Files.createTempDir();

        Directory directory = getDirectoryImplementation(localIndexDir);
        Analyzer analyzer = newAnalyzer(taskAttemptContext.getConfiguration());
        IndexWriter indexWriter = createIndexWriter(directory, analyzer);

        return new IndexRecordWriter(indexWriter, outputCommitter, localIndexDir);
    }

    /**
     * Sets the job's output path by delegating to
     * {@link FileOutputFormat#setOutputPath(Job, Path)}.
     *
     * @param job job whose output path is being set
     * @param path output path for the job
     */
    public static void setOutputPath(Job job, Path path) {
        FileOutputFormat.setOutputPath(job, path);
    }

    /**
     * Builds a path filter for locating index directories. It is a composite of the
     * exclude-files filter for the given configuration, the exclude-hidden-paths filter, and a
     * filter that accepts only paths whose name starts with {@code "index-"}.
     *
     * @param configuration configuration used to create the exclude-files filter
     * @return the composite filter
     */
    public static PathFilter newIndexDirFilter(Configuration configuration) {
        PathFilter indexPrefixFilter = new PathFilter() {
            @Override
            public boolean accept(Path path) {
                String name = path.getName();
                return name.startsWith("index-");
            }
        };
        return new PathFilters.CompositePathFilter(
                PathFilters.newExcludeFilesFilter(configuration),
                new PathFilter[] {PathFilters.EXCLUDE_HIDDEN_PATHS_FILTER, indexPrefixFilter});
    }
}
