package org.apache.flume.sink.hdfs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.sink.FlumeFormatter;
import org.apache.flume.sink.hdfs.BucketWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/hdfs/HDFSEventSink.class */
public class HDFSEventSink extends AbstractSink implements PollableSink, Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(HDFSEventSink.class);
    static final long defaultRollInterval = 30;
    static final long defaultRollSize = 1024;
    static final long defaultRollCount = 10;
    static final String defaultFileName = "FlumeData";
    static final String defaultBucketFormat = "%yyyy-%mm-%dd/%HH";
    static final long defaultBatchSize = 1;
    static final long defaultTxnEventMax = 100;
    static final String defaultFileType = "SequenceFile";
    static final int defaultMaxOpenFiles = 5000;
    static final String defaultWriteFormat = "Writable";
    static final long defaultAppendTimeout = 1000;
    private long rollInterval;
    private long rollSize;
    private long rollCount;
    private long txnEventMax;
    private long batchSize;
    private CompressionCodec codeC;
    private SequenceFile.CompressionType compType;
    private String fileType;
    private String path;
    private int maxOpenFiles;
    private String writeFormat;
    private HDFSWriterFactory myWriterFactory;
    private ExecutorService executor;
    private long appendTimeout;
    final WriterLinkedHashMap sfWriters;

    /* loaded from: input_file:org/apache/flume/sink/hdfs/HDFSEventSink$WriterLinkedHashMap.class */
    private class WriterLinkedHashMap extends LinkedHashMap<String, BucketWriter> {
        private static final long serialVersionUID = 1;

        private WriterLinkedHashMap() {
        }

        @Override // java.util.LinkedHashMap
        protected boolean removeEldestEntry(Map.Entry<String, BucketWriter> entry) {
            if (super.size() <= HDFSEventSink.this.maxOpenFiles) {
                return false;
            }
            try {
                entry.getValue().close();
                return true;
            } catch (IOException e) {
                HDFSEventSink.LOG.warn(entry.getKey().toString(), e);
                return true;
            }
        }
    }

    public HDFSEventSink() {
        this.sfWriters = new WriterLinkedHashMap();
        this.myWriterFactory = new HDFSWriterFactory();
    }

    public HDFSEventSink(HDFSWriterFactory hDFSWriterFactory) {
        this.sfWriters = new WriterLinkedHashMap();
        this.myWriterFactory = hDFSWriterFactory;
    }

    public void configure(Context context) {
        String str = (String) context.get("hdfs.path", String.class);
        String str2 = (String) context.get("hdfs.filePrefix", String.class);
        String str3 = (String) context.get("hdfs.rollInterval", String.class);
        String str4 = (String) context.get("hdfs.rollSize", String.class);
        String str5 = (String) context.get("hdfs.rollCount", String.class);
        String str6 = (String) context.get("hdfs.batchSize", String.class);
        String str7 = (String) context.get("hdfs.txnEventMax", String.class);
        String str8 = (String) context.get("hdfs.codeC", String.class);
        String str9 = (String) context.get("hdfs.fileType", String.class);
        String str10 = (String) context.get("hdfs.maxOpenFiles", String.class);
        String str11 = (String) context.get("hdfs.writeFormat", String.class);
        String str12 = (String) context.get("hdfs.appendTimeout", String.class);
        if (str2 == null) {
            str2 = defaultFileName;
        }
        this.path = new String(str + "/" + str2);
        if (str3 == null) {
            this.rollInterval = defaultRollInterval;
        } else {
            this.rollInterval = Long.parseLong(str3);
        }
        if (str4 == null) {
            this.rollSize = defaultRollSize;
        } else {
            this.rollSize = Long.parseLong(str4);
        }
        if (str5 == null) {
            this.rollCount = defaultRollCount;
        } else {
            this.rollCount = Long.parseLong(str5);
        }
        if (str6 == null || str6.equals("0")) {
            this.batchSize = defaultBatchSize;
        } else {
            this.batchSize = Long.parseLong(str6);
        }
        if (str7 == null || str7.equals("0")) {
            this.txnEventMax = defaultTxnEventMax;
        } else {
            this.txnEventMax = Long.parseLong(str7);
        }
        if (str8 == null) {
            this.codeC = null;
            this.compType = SequenceFile.CompressionType.NONE;
        } else {
            this.codeC = getCodec(str8);
            this.compType = SequenceFile.CompressionType.BLOCK;
        }
        if (str9 == null) {
            this.fileType = defaultFileType;
        } else {
            this.fileType = str9;
        }
        if (str10 == null) {
            this.maxOpenFiles = defaultMaxOpenFiles;
        } else {
            this.maxOpenFiles = Integer.parseInt(str10);
        }
        if (str11 == null) {
            this.writeFormat = defaultWriteFormat;
        } else {
            this.writeFormat = str11;
        }
        if (str12 == null) {
            this.appendTimeout = defaultAppendTimeout;
        } else {
            this.appendTimeout = Long.parseLong(str12);
        }
    }

    private static boolean codecMatches(Class<? extends CompressionCodec> cls, String str) {
        String simpleName = cls.getSimpleName();
        if (cls.getName().equals(str) || simpleName.equalsIgnoreCase(str)) {
            return true;
        }
        return simpleName.endsWith("Codec") && simpleName.substring(0, simpleName.length() - "Codec".length()).equalsIgnoreCase(str);
    }

    private static CompressionCodec getCodec(String str) {
        Configuration configuration = new Configuration();
        List<Class> codecClasses = CompressionCodecFactory.getCodecClasses(configuration);
        CompressionCodec compressionCodec = null;
        ArrayList arrayList = new ArrayList();
        arrayList.add("None");
        for (Class cls : codecClasses) {
            arrayList.add(cls.getSimpleName());
            if (codecMatches(cls, str)) {
                try {
                    compressionCodec = (CompressionCodec) cls.newInstance();
                } catch (IllegalAccessException e) {
                    LOG.error("Unable to access " + cls + " class");
                } catch (InstantiationException e2) {
                    LOG.error("Unable to instantiate " + cls + " class");
                }
            }
        }
        if (compressionCodec == null) {
            if (!str.equalsIgnoreCase("None")) {
                throw new IllegalArgumentException("Unsupported compression codec " + str + ".  Please choose from: " + arrayList);
            }
        } else if (compressionCodec instanceof org.apache.hadoop.conf.Configurable) {
            ((org.apache.hadoop.conf.Configurable) compressionCodec).setConf(configuration);
        }
        return compressionCodec;
    }

    private BucketWriter.BucketFlushStatus backgroundAppend(final BucketWriter bucketWriter, final Event event) throws IOException, InterruptedException {
        Future submit = this.executor.submit(new Callable<BucketWriter.BucketFlushStatus>() { // from class: org.apache.flume.sink.hdfs.HDFSEventSink.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public BucketWriter.BucketFlushStatus call() throws Exception {
                return bucketWriter.append(event);
            }
        });
        try {
            return this.appendTimeout > 0 ? (BucketWriter.BucketFlushStatus) submit.get(this.appendTimeout, TimeUnit.MILLISECONDS) : (BucketWriter.BucketFlushStatus) submit.get();
        } catch (InterruptedException e) {
            LOG.warn("Unexpected Exception " + e.getMessage(), e);
            throw e;
        } catch (CancellationException e2) {
            throw new InterruptedException("Blocked append interrupted by rotation event");
        } catch (ExecutionException e3) {
            Throwable cause = e3.getCause();
            if (cause instanceof IOException) {
                throw ((IOException) cause);
            }
            if (cause instanceof InterruptedException) {
                throw ((InterruptedException) cause);
            }
            if (cause instanceof RuntimeException) {
                throw ((RuntimeException) cause);
            }
            LOG.error("Got a throwable that is not an exception! Bailing out!", e3.getCause());
            throw new RuntimeException(e3.getCause());
        } catch (TimeoutException e4) {
            submit.cancel(true);
            throw new IOException("Append timed out", e4);
        }
    }

    public PollableSink.Status process() throws EventDeliveryException {
        Event take;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        HashMap hashMap = new HashMap();
        try {
            try {
                try {
                    transaction.begin();
                    for (int i = 0; i < this.txnEventMax && (take = channel.take()) != null; i++) {
                        String escapeString = BucketPath.escapeString(this.path, take.getHeaders());
                        BucketWriter bucketWriter = this.sfWriters.get(escapeString);
                        if (bucketWriter == null) {
                            HDFSWriter writer = this.myWriterFactory.getWriter(this.fileType);
                            FlumeFormatter formatter = HDFSFormatterFactory.getFormatter(this.writeFormat);
                            bucketWriter = new BucketWriter(this.rollInterval, this.rollSize, this.rollCount, this.batchSize);
                            bucketWriter.open(escapeString, this.codeC, this.compType, writer, formatter);
                            this.sfWriters.put(escapeString, bucketWriter);
                        }
                        BucketWriter.BucketFlushStatus backgroundAppend = backgroundAppend(bucketWriter, take);
                        if (backgroundAppend == BucketWriter.BucketFlushStatus.BatchStarted) {
                            hashMap.put(bucketWriter.getFilePath(), bucketWriter);
                        } else if (this.batchSize > defaultBatchSize && backgroundAppend == BucketWriter.BucketFlushStatus.BatchFlushed) {
                            hashMap.remove(bucketWriter.getFilePath());
                        }
                    }
                    Iterator it = hashMap.entrySet().iterator();
                    while (it.hasNext()) {
                        ((BucketWriter) ((Map.Entry) it.next()).getValue()).flush();
                    }
                    hashMap.clear();
                    transaction.commit();
                    PollableSink.Status status = PollableSink.Status.READY;
                    Iterator it2 = hashMap.entrySet().iterator();
                    while (it2.hasNext()) {
                        ((BucketWriter) ((Map.Entry) it2.next()).getValue()).abort();
                    }
                    transaction.close();
                    return status;
                } catch (IOException e) {
                    transaction.rollback();
                    LOG.warn("HDFS IO error", e);
                    PollableSink.Status status2 = PollableSink.Status.BACKOFF;
                    Iterator it3 = hashMap.entrySet().iterator();
                    while (it3.hasNext()) {
                        ((BucketWriter) ((Map.Entry) it3.next()).getValue()).abort();
                    }
                    transaction.close();
                    return status2;
                }
            } catch (Exception e2) {
                transaction.rollback();
                LOG.error("process failed", e2);
                throw new EventDeliveryException(e2.getMessage());
            }
        } catch (Throwable th) {
            Iterator it4 = hashMap.entrySet().iterator();
            while (it4.hasNext()) {
                ((BucketWriter) ((Map.Entry) it4.next()).getValue()).abort();
            }
            transaction.close();
            throw th;
        }
    }

    public void stop() {
        try {
            for (Map.Entry<String, BucketWriter> entry : this.sfWriters.entrySet()) {
                LOG.info("Closing " + entry.getKey());
                entry.getValue().close();
            }
        } catch (IOException e) {
            LOG.warn("IOException in opening file", e);
        }
        this.executor.shutdown();
        while (!this.executor.isTerminated()) {
            try {
                this.executor.awaitTermination(defaultAppendTimeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e2) {
                LOG.warn("shutdown interrupted" + e2.getMessage(), e2);
            }
        }
        this.executor = null;
        super.stop();
    }

    public void start() {
        this.executor = Executors.newFixedThreadPool(1);
        Iterator<Map.Entry<String, BucketWriter>> it = this.sfWriters.entrySet().iterator();
        while (it.hasNext()) {
            try {
                it.next().getValue().open();
            } catch (IOException e) {
                LOG.warn("IOException in opening file", e);
            }
        }
        super.start();
    }
}
