package com.ning.metrics.collector.hadoop.processing;

import com.google.inject.Inject;
import com.ning.metrics.collector.binder.config.CollectorConfig;
import com.ning.metrics.collector.hadoop.writer.FileSystemAccess;
import com.ning.metrics.collector.util.NamedThreadFactory;
import com.ning.metrics.serialization.event.Granularity;
import com.ning.metrics.serialization.event.GranularityPathMapper;
import com.ning.metrics.serialization.writer.CallbackHandler;
import com.ning.metrics.serialization.writer.DiskSpoolEventWriter;
import com.ning.metrics.serialization.writer.EventHandler;
import com.ning.metrics.serialization.writer.EventWriter;
import com.ning.metrics.serialization.writer.SyncType;
import com.ning.metrics.serialization.writer.ThresholdEventWriter;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.axis.utils.StringUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.weakref.jmx.Managed;

/* loaded from: input_file:com/ning/metrics/collector/hadoop/processing/HadoopWriterFactory.class */
/**
 * Builds event writers that spool events to local disk and hand the spooled files to HDFS,
 * and offers a manual sweep for files left in the spool directory.
 *
 * <p>Copies to HDFS from writers created here can be switched on and off at runtime through
 * {@link #enableFlush()} and {@link #disableFlush()}.
 */
public class HadoopWriterFactory implements PersistentWriterFactory {
    private static final Logger log = Logger.getLogger(HadoopWriterFactory.class);

    /** Timestamp layout embedded in spool and HDFS file names; always rendered in UTC. */
    private static final DateTimeFormatter dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH.mm.ss").withZone(DateTimeZone.UTC);

    private final CollectorConfig config;
    private final FileSystemAccess hdfsAccess;

    /** Runtime switch consulted by every handler created by this factory; starts enabled. */
    private final AtomicBoolean flushEnabled = new AtomicBoolean(true);

    /**
     * @param fileSystemAccess provider of the file system that files are copied to
     * @param collectorConfig  collector configuration (spool directory, flush settings, local ip/port, ...)
     */
    @Inject
    public HadoopWriterFactory(FileSystemAccess fileSystemAccess, CollectorConfig collectorConfig) {
        this.hdfsAccess = fileSystemAccess;
        this.config = collectorConfig;
    }

    /**
     * Creates a {@link ThresholdEventWriter} wrapping a {@link DiskSpoolEventWriter} whose handler
     * copies each file it is given to HDFS.
     *
     * @param writerStats       statistics sink; {@code registerHdfsFlush()} is called once per successful copy
     * @param serializationType supplies the file suffix and the serializer used by the spool writer
     * @param str               name component embedded in the local spool path
     *                          (presumably the event name; confirm against callers)
     * @param str2              directory prefix of the destination path in HDFS
     * @return the configured writer chain
     */
    @Override
    public EventWriter createPersistentWriter(final WriterStats writerStats, final SerializationType serializationType, String str, final String str2) {
        // Captured once so every file flushed by this writer carries the same creation timestamp.
        final DateTime dateTime = new DateTime();

        EventHandler hdfsHandler = new EventHandler() {
            // Number of files successfully copied by this handler; part of the destination file name.
            private int flushCount = 0;

            @Override
            public void handle(File file, CallbackHandler callbackHandler) {
                if (!flushEnabled.get()) {
                    return;
                }

                try {
                    String hdfsPath = String.format("%s/%s-%d-%s-f%d.%s", str2, config.getLocalIp(), config.getLocalPort(), dateFormatter.print(dateTime), flushCount, serializationType.getFileSuffix());
                    log.info(String.format("Flushing events to HDFS: [%s] -> [%s]", file.getAbsolutePath(), hdfsPath));
                    hdfsAccess.get().copyFromLocalFile(new Path(file.getAbsolutePath()), new Path(hdfsPath));
                } catch (IOException e) {
                    callbackHandler.onError(e, file);
                    // A failed copy must not also be reported as a success or counted as a flush.
                    return;
                }

                callbackHandler.onSuccess(file);
                writerStats.registerHdfsFlush();
                flushCount++;
            }
        };

        String spoolName = String.format("%s-%d-%s.%s.%s", config.getLocalIp(), config.getLocalPort(), dateFormatter.print(dateTime), str, serializationType.getFileSuffix());
        String spoolPath = String.format("%s/%s", config.getSpoolDirectoryName(), spoolName);

        DiskSpoolEventWriter spoolWriter = new DiskSpoolEventWriter(
            hdfsHandler,
            spoolPath,
            config.isFlushEnabled(),
            config.getFlushIntervalInSeconds(),
            new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("spool to HDFS promoter")),
            SyncType.valueOf(config.getSyncType()),
            config.getSyncBatchSize(),
            config.getRateWindowSizeMinutes(),
            serializationType.getSerializer());

        return new ThresholdEventWriter(spoolWriter, config.getFlushEventQueueSize(), config.getRefreshDelayInSeconds());
    }

    /**
     * Copies every file found under the spool directory (not descending into {@code _tmp}
     * directories) to HDFS under a {@code left_below-...} name, then deletes the local copy.
     *
     * <p>The destination is derived from the dot-separated name of the file's spool directory:
     * the second-to-last part selects the sub-directory under the event output directory and the
     * last part is used as the file suffix. Files whose directory does not have at least two such
     * parts are skipped with a warning.
     *
     * @throws IOException if a copy to HDFS fails; files not yet processed are left untouched
     */
    @Override
    @Managed(description = "Process all local files files")
    public void processLeftBelowFiles() throws IOException {
        // Distinguishes files flushed in the same second; a constant here would give them identical names.
        int flushCount = 1;

        for (File file : FileUtils.listFiles(new File(config.getSpoolDirectoryName()), FileFilterUtils.trueFileFilter(), FileFilterUtils.notFileFilter(FileFilterUtils.nameFileFilter("_tmp")))) {
            String spoolDir = file.getParent();
            if (spoolDir.endsWith("/_lock") || spoolDir.endsWith("_quarantine")) {
                // Files in these sub-directories belong to the spool directory one level up.
                spoolDir = file.getParentFile().getParent();
            }

            String[] parts = StringUtils.split(spoolDir, '.');
            if (parts == null || parts.length < 2) {
                log.warn(String.format("Skipping left below file in unexpected directory: %s", file.getAbsolutePath()));
                continue;
            }
            String eventDirectory = String.format("%s/%s", config.getEventOutputDirectory(), parts[parts.length - 2]);
            String suffix = parts[parts.length - 1];

            // Bucket by the file's last-modified time rather than the current time.
            GranularityPathMapper pathMapper = new GranularityPathMapper(eventDirectory, Granularity.HOURLY);
            String hdfsPath = String.format("%s/left_below-%s-%d-%s-f%d.%s", pathMapper.getPathForDateTime(new DateTime(file.lastModified())), config.getLocalIp(), config.getLocalPort(), dateFormatter.print(new DateTime()), flushCount, suffix);

            log.info(String.format("Flushing events to HDFS: [%s] -> [%s]", file.getAbsolutePath(), hdfsPath));
            hdfsAccess.get().copyFromLocalFile(new Path(file.getAbsolutePath()), new Path(hdfsPath));

            if (!file.delete()) {
                log.warn(String.format("Exception cleaning up left below file: %s. We might have DUPS in HDFS!", file.toString()));
            }
            flushCount++;
        }
    }

    /**
     * @return the live flag controlling whether handlers created by this factory copy files to HDFS
     */
    @Managed(description = "Whether files should be flushed to HDFS")
    public AtomicBoolean getFlushEnabled() {
        return flushEnabled;
    }

    /** Turns copying to HDFS back on for handlers created by this factory. */
    @Managed(description = "Enable flush to HDFS")
    public void enableFlush() {
        flushEnabled.set(true);
    }

    /** Turns copying to HDFS off; handlers return without touching the file or invoking the callback. */
    @Managed(description = "Disable flush to HDFS")
    public void disableFlush() {
        flushEnabled.set(false);
    }
}
