package com.ning.metrics.collector.processing.hadoop;

import com.google.inject.Inject;
import com.ning.arecibo.jmx.Monitored;
import com.ning.metrics.collector.binder.config.CollectorConfig;
import com.ning.metrics.collector.processing.EventSpoolProcessor;
import com.ning.metrics.collector.processing.LocalSpoolManager;
import com.ning.metrics.collector.processing.SerializationType;
import com.ning.metrics.serialization.hadoop.FileSystemAccess;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.weakref.jmx.Managed;

/* loaded from: input_file:com/ning/metrics/collector/processing/hadoop/HadoopWriterFactory.class */
/**
 * {@link EventSpoolProcessor} that copies locally spooled event files to HDFS
 * through the injected {@link FileSystemAccess}.
 *
 * <p>The class also exposes a flush-enabled flag and a spool-directory file count
 * through {@code @Managed}/{@code @Monitored} annotated methods.
 */
public class HadoopWriterFactory implements EventSpoolProcessor {
    private static final Logger log = LoggerFactory.getLogger(HadoopWriterFactory.class);
    private static final String PROCESSOR_NAME = "HDFSWriter";

    private final CollectorConfig config;
    private final FileSystemAccess hdfsAccess;
    // NOTE(review): this flag is toggled by enableFlush()/disableFlush() but is never
    // read inside this class; presumably callers consult it via getFlushEnabled() -- confirm.
    private final AtomicBoolean flushEnabled;

    /**
     * Creates the processor.
     *
     * @param fileSystemAccess accessor used to obtain the target file system and to close it
     * @param collectorConfig  configuration supplying the initial flush flag and the spool directory name
     */
    @Inject
    public HadoopWriterFactory(FileSystemAccess fileSystemAccess, CollectorConfig collectorConfig) {
        this.hdfsAccess = fileSystemAccess;
        this.config = collectorConfig;
        this.flushEnabled = new AtomicBoolean(collectorConfig.isFlushEnabled());
    }

    /**
     * Pushes the given local file to the given output path.
     *
     * @param str               not used by this implementation
     * @param serializationType not used by this implementation
     * @param file              local file to copy
     * @param str2              destination path the file is copied to
     * @throws IOException propagated from {@link #pushFileToHadoop(File, String)}
     */
    @Override
    public void processEventFile(String str, SerializationType serializationType, File file, String str2) throws IOException {
        pushFileToHadoop(file, str2);
    }

    /**
     * Closes the underlying {@link FileSystemAccess}.
     */
    @Override
    public void close() {
        this.hdfsAccess.close();
    }

    /**
     * @return the constant processor name, {@code "HDFSWriter"}
     */
    @Override
    public String getProcessorName() {
        return PROCESSOR_NAME;
    }

    /**
     * Logs the transfer and copies the local file to the destination path using the
     * file system obtained from the {@link FileSystemAccess}.
     *
     * @param file local file to copy; its absolute path is used as the source
     * @param str  destination path
     * @throws IOException propagated from the copy operation
     */
    protected void pushFileToHadoop(File file, String str) throws IOException {
        final String localPath = file.getAbsolutePath();
        // Parameterized logging: same rendered message as before, but no formatting
        // work is done when INFO is disabled.
        log.info("Flushing events to HDFS: [{}] -> [{}]", localPath, str);
        this.hdfsAccess.get().copyFromLocalFile(new Path(localPath), new Path(str));
    }

    /**
     * @return the live flag object (not a copy); changes made through
     *         {@link #enableFlush()} and {@link #disableFlush()} are visible through it
     */
    @Managed(description = "Whether files should be flushed to HDFS")
    public AtomicBoolean getFlushEnabled() {
        return this.flushEnabled;
    }

    /**
     * Sets the flush flag to {@code true}.
     */
    @Managed(description = "Enable flush to HDFS")
    public void enableFlush() {
        this.flushEnabled.set(true);
    }

    /**
     * Sets the flush flag to {@code false}.
     */
    @Managed(description = "Disable flush to HDFS")
    public void disableFlush() {
        this.flushEnabled.set(false);
    }

    /**
     * Counts the entries that {@link LocalSpoolManager#findFilesInSpoolDirectory} reports
     * for the configured spool directory. The directory is scanned on every call.
     *
     * @return number of files found in the spool directory
     */
    @Monitored(description = "Number of local files not yet pushed to HDFS")
    public int nbLocalFiles() {
        final File spoolDirectory = new File(this.config.getSpoolDirectoryName());
        return LocalSpoolManager.findFilesInSpoolDirectory(spoolDirectory).size();
    }
}
