package org.apache.storm.hdfs.bolt;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.hdfs.common.NullPartitioner;
import org.apache.storm.hdfs.common.Partitioner;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.apache.storm.hdfs.security.HdfsSecurityUtil;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/hdfs/bolt/AbstractHdfsBolt.class */
public abstract class AbstractHdfsBolt extends BaseRichBolt {
    /** Tick tuple frequency, in seconds, used as the initial value of {@link #tickTupleInterval}. */
    private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15;
    /** Initial value of {@link #fileRetryCount}. */
    private static final Integer DEFAULT_RETRY_COUNT = 3;
    /** Initial value of {@link #maxOpenFiles}. */
    private static final Integer DEFAULT_MAX_OPEN_FILES = 50;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractHdfsBolt.class);

    /** Open writers, keyed by the string produced by {@code getHashKeyForTuple}; created in {@code prepare}. */
    protected Map<String, Writer> writers;
    /** Collector used to ack/fail tuples and to report errors. */
    protected OutputCollector collector;
    /** File system handed to every rotation action; it is not assigned anywhere in this class. */
    protected transient FileSystem fs;
    /** Consulted after every tuple to decide whether to sync; reset after each successful sync. */
    protected SyncPolicy syncPolicy;
    /** Required; when it is a {@link TimedRotationPolicy} a rotation timer is started in {@code prepare}. */
    protected FileRotationPolicy rotationPolicy;
    /** Supplies the directory and file name for each new output file. */
    protected FileNameFormat fileNameFormat;
    /** Prefix of every output path; must be non-null when {@code prepare} runs. */
    protected String fsUrl;
    /** Key in the topology configuration under which a map of Hadoop configuration overrides is looked up. */
    protected String configKey;
    /** Monitor guarding {@code execute} and {@code rotateOutputFile}; created in {@code prepare}. */
    protected transient Object writeLock;
    /** Timer driving scheduled rotation; only created for a {@link TimedRotationPolicy}. */
    protected transient Timer rotationTimer;
    /** Hadoop configuration built in {@code prepare}. */
    protected transient Configuration hdfsConfig;

    /** Per-partition-path counter used to number successive output files. */
    protected Map<String, Integer> rotationCounterMap = new HashMap<>();
    /** Actions executed, in order, after a writer has been closed during rotation. */
    protected List<RotationAction> rotationActions = new ArrayList<>();
    /** Tuples written since the last successful sync and therefore not yet acked. */
    private List<Tuple> tupleBatch = new LinkedList<>();
    /** Value returned by the most recent {@code Writer.write} call; passed to the sync policy. */
    protected long offset = 0;
    /** Maximum number of sync attempts before the pending batch is failed. */
    protected Integer fileRetryCount = DEFAULT_RETRY_COUNT;
    /** Tick tuple frequency placed into the component configuration. */
    protected Integer tickTupleInterval = DEFAULT_TICK_TUPLE_INTERVAL_SECS;
    /** Capacity passed to the {@code WritersMap} created in {@code prepare}. */
    protected Integer maxOpenFiles = DEFAULT_MAX_OPEN_FILES;
    /** Supplies the partition path for each tuple. */
    protected Partitioner partitioner = new NullPartitioner();

    /* loaded from: input_file:org/apache/storm/hdfs/bolt/AbstractHdfsBolt$WritersMap.class */
    /**
     * Access-ordered map of open writers that holds at most {@code maxWriters} entries.
     *
     * <p>When an insertion pushes the size past the limit, the least recently accessed
     * writer is closed and then dropped from the map, so its underlying resources are
     * released instead of being leaked.
     */
    static class WritersMap extends LinkedHashMap<String, Writer> {
        /** Maximum number of writers kept open at the same time. */
        final long maxWriters;

        /**
         * Creates an access-ordered map.
         *
         * @param j maximum number of writers to retain; also used as the initial capacity
         */
        public WritersMap(long j) {
            // Third argument 'true' selects access order, so the eldest entry is the
            // least recently used writer.
            super((int) j, 0.75f, true);
            this.maxWriters = j;
        }

        /**
         * Decides whether the eldest entry must be evicted after an insertion and, if so,
         * closes its writer first.
         *
         * @param entry the least recently accessed entry
         * @return {@code true} when the map has grown beyond {@code maxWriters}
         */
        @Override // java.util.LinkedHashMap
        protected boolean removeEldestEntry(Map.Entry<String, Writer> entry) {
            if (size() <= this.maxWriters) {
                return false;
            }
            Writer evicted = entry.getValue();
            if (evicted != null) {
                try {
                    // Close before the entry disappears: once it is out of the map nothing
                    // else will ever sync or close this writer.
                    evicted.close();
                } catch (IOException e) {
                    // Eviction must still proceed so the open-file limit is honoured.
                    LOG.warn("Failed to close evicted writer for key [{}]", entry.getKey(), e);
                }
            }
            return true;
        }
    }

    /**
     * Closes the given writer and then runs every configured rotation action against the
     * file it was writing, all while holding {@code writeLock}.
     *
     * @param writer the writer whose file is being rotated
     * @throws IOException if closing the writer or any rotation action fails
     */
    protected void rotateOutputFile(Writer writer) throws IOException {
        LOG.info("Rotating output file...");
        final long startedAt = System.currentTimeMillis();
        synchronized (this.writeLock) {
            writer.close();
            final int actionCount = this.rotationActions.size();
            LOG.info("Performing {} file rotation actions.", actionCount);
            for (RotationAction action : this.rotationActions) {
                action.execute(this.fs, writer.getFilePath());
            }
        }
        final long elapsedMillis = System.currentTimeMillis() - startedAt;
        LOG.info("File rotation took {} ms.", elapsedMillis);
    }

    /**
     * Validates the bolt's configuration and initialises its runtime state.
     *
     * <p>Steps, in order: create the write lock, verify that the sync policy, rotation
     * policy, file system URL and file name format are set, create the writers map, prepare
     * the file name format, build the Hadoop configuration (applying any overrides found in
     * the topology configuration under {@code configKey}), log in via
     * {@link HdfsSecurityUtil}, delegate to {@link #doPrepare}, and finally start the
     * rotation timer when the rotation policy is a {@link TimedRotationPolicy}.
     *
     * @param map the topology configuration
     * @param topologyContext the topology context
     * @param outputCollector collector used for acking, failing and error reporting
     * @throws IllegalStateException if a required collaborator has not been set
     * @throws RuntimeException wrapping any exception raised during login, {@code doPrepare}
     *         or timer start-up
     */
    public final void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.writeLock = new Object();
        if (this.syncPolicy == null) {
            throw new IllegalStateException("SyncPolicy must be specified.");
        }
        if (this.rotationPolicy == null) {
            throw new IllegalStateException("RotationPolicy must be specified.");
        }
        if (this.fsUrl == null) {
            throw new IllegalStateException("File system URL must be specified.");
        }
        // The file name format is dereferenced below and on every new file, so reject a
        // missing one with the same kind of explicit error as the other collaborators.
        if (this.fileNameFormat == null) {
            throw new IllegalStateException("FileNameFormat must be specified.");
        }
        this.writers = new WritersMap(this.maxOpenFiles.intValue());
        this.collector = outputCollector;
        this.fileNameFormat.prepare(map, topologyContext);
        this.hdfsConfig = new Configuration();
        Map<?, ?> overrides = (Map<?, ?>) map.get(this.configKey);
        if (overrides != null) {
            // Copy each override into the Hadoop configuration; values are stringified.
            for (Map.Entry<?, ?> override : overrides.entrySet()) {
                this.hdfsConfig.set((String) override.getKey(), String.valueOf(override.getValue()));
            }
        }
        try {
            HdfsSecurityUtil.login(map, this.hdfsConfig);
            doPrepare(map, topologyContext, outputCollector);
            if (this.rotationPolicy instanceof TimedRotationPolicy) {
                startTimedRotationPolicy();
            }
        } catch (Exception e) {
            throw new RuntimeException("Error preparing HdfsBolt: " + e.getMessage(), e);
        }
    }

    /**
     * Processes one tuple while holding {@code writeLock}.
     *
     * <p>A tick tuple is acked immediately and forces a flush. Any other tuple is written
     * through the writer selected for it and added to the pending batch; if that write
     * fails, the tuple is failed and a flush is forced. When the sync policy asks for it,
     * or a flush is forced and the batch is non-empty, all writers are synced (retrying up
     * to {@code fileRetryCount} times) and the batch is acked. If every attempt fails, the
     * batch is failed and a {@link RuntimeException} is thrown. Finally the writer used for
     * this tuple is rotated if it reports that rotation is needed.
     *
     * @param tuple the incoming tuple (data or tick)
     * @throws RuntimeException if syncing did not succeed within the allowed attempts
     */
    public final void execute(Tuple tuple) {
        synchronized (this.writeLock) {
            Writer activeWriter = null;
            String writerKey = null;
            boolean forceFlush;

            if (TupleUtils.isTick(tuple)) {
                LOG.debug("TICK! forcing a file system flush");
                this.collector.ack(tuple);
                forceFlush = true;
            } else {
                forceFlush = false;
                writerKey = getHashKeyForTuple(tuple);
                try {
                    activeWriter = getOrCreateWriter(writerKey, tuple);
                    this.offset = activeWriter.write(tuple);
                    this.tupleBatch.add(tuple);
                } catch (IOException writeFailure) {
                    LOG.info("Tuple failed to write, forcing a flush of existing data.");
                    this.collector.reportError(writeFailure);
                    forceFlush = true;
                    this.collector.fail(tuple);
                }
            }

            // The sync policy is always consulted, even for tick tuples and failed writes.
            final boolean syncRequested = this.syncPolicy.mark(tuple, this.offset);
            if (syncRequested || (forceFlush && !this.tupleBatch.isEmpty())) {
                int attempts = 0;
                boolean synced = false;
                IOException lastFailure = null;

                while (!synced && attempts < this.fileRetryCount.intValue()) {
                    attempts++;
                    try {
                        syncAllWriters();
                        LOG.debug("Data synced to filesystem. Ack'ing [{}] tuples", this.tupleBatch.size());
                        for (Tuple pending : this.tupleBatch) {
                            this.collector.ack(pending);
                        }
                        this.tupleBatch.clear();
                        this.syncPolicy.reset();
                        synced = true;
                    } catch (IOException syncFailure) {
                        LOG.warn("Data could not be synced to filesystem on attempt [{}]", attempts);
                        this.collector.reportError(syncFailure);
                        lastFailure = syncFailure;
                    }
                }

                if (!synced) {
                    LOG.warn("Data could not be synced to filesystem, failing this batch of tuples");
                    for (Tuple pending : this.tupleBatch) {
                        this.collector.fail(pending);
                    }
                    this.tupleBatch.clear();
                    throw new RuntimeException("Sync failed [" + attempts + "] times.", lastFailure);
                }
            }

            if (activeWriter != null && activeWriter.needsRotation()) {
                doRotationAndRemoveWriter(writerKey, activeWriter);
            }
        }
    }

    /**
     * Returns the writer registered under the given key, creating and registering a new one
     * when none exists yet.
     *
     * @param writerKey key identifying the writer in {@code writers}
     * @param tuple tuple used to derive the path of a newly created file
     * @return the existing or freshly created writer
     * @throws IOException if a new writer cannot be created
     */
    private Writer getOrCreateWriter(String writerKey, Tuple tuple) throws IOException {
        Writer existing = this.writers.get(writerKey);
        if (existing != null) {
            return existing;
        }
        Path nextFile = getBasePathForNextFile(tuple);
        Writer created = makeNewWriter(nextFile, tuple);
        this.writers.put(writerKey, created);
        return created;
    }

    /**
     * Builds the key under which a tuple's writer is stored: the subclass-provided writer
     * key, the separator {@code "****"}, then the tuple's partition path.
     *
     * @param tuple the tuple being routed
     * @return the combined writer-map key
     */
    private String getHashKeyForTuple(Tuple tuple) {
        final String writerKey = getWriterKey(tuple);
        final String partitionPath = this.partitioner.getPartitionPath(tuple);
        return writerKey + "****" + partitionPath;
    }

    /**
     * Rotates the given writer's output file and removes the writer from {@code writers}.
     *
     * <p>The writer is removed whether rotation succeeds, fails with an
     * {@link IOException} (which is reported to the collector and logged), or fails with
     * any other throwable (which propagates to the caller).
     *
     * @param str key of the writer in {@code writers}
     * @param writer the writer to rotate
     */
    void doRotationAndRemoveWriter(String str, Writer writer) {
        try {
            rotateOutputFile(writer);
        } catch (IOException rotationFailure) {
            this.collector.reportError(rotationFailure);
            LOG.error("File could not be rotated");
        } finally {
            // Always forget the writer so a fresh one is created for the next tuple.
            this.writers.remove(str);
        }
    }

    /**
     * Returns the parent's component configuration with the tick tuple frequency set from
     * {@code tickTupleInterval}.
     *
     * @return the component configuration produced by
     *         {@code TupleUtils.putTickFrequencyIntoComponentConfig}
     */
    public Map<String, Object> getComponentConfiguration() {
        final Map<String, Object> inherited = super.getComponentConfiguration();
        final int tickFrequency = this.tickTupleInterval.intValue();
        return TupleUtils.putTickFrequencyIntoComponentConfig(inherited, tickFrequency);
    }

    /**
     * Declares no output fields: the body is intentionally empty, so nothing is registered
     * with the declarer.
     *
     * @param outputFieldsDeclarer declarer supplied by the framework; unused
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    /**
     * Stops the scheduled-rotation timer, if one was started.
     *
     * <p>The timer only exists when the rotation policy is a {@link TimedRotationPolicy}
     * (and {@code prepare} completed), so it is checked for {@code null} before being
     * cancelled.
     */
    public void cleanup() {
        if (this.rotationTimer != null) {
            this.rotationTimer.cancel();
        }
    }

    /**
     * Calls {@code sync()} on every writer currently held in {@code writers}.
     *
     * @throws IOException as soon as any writer fails to sync; remaining writers are not synced
     */
    private void syncAllWriters() throws IOException {
        for (Writer openWriter : this.writers.values()) {
            openWriter.sync();
        }
    }

    /**
     * Starts a daemon timer that, at the interval defined by the {@link TimedRotationPolicy},
     * rotates every open writer and then empties the writers map.
     *
     * <p>The whole pass runs while holding {@code writeLock}. {@code execute} mutates the
     * same map under that lock (even a lookup reorders an access-ordered map), so iterating
     * it unlocked from the timer thread could fail with a
     * {@link java.util.ConcurrentModificationException} or discard a writer that was added
     * between the loop and the {@code clear()}. The lock is reentrant, so the nested
     * acquisition inside {@code rotateOutputFile} is safe.
     */
    private void startTimedRotationPolicy() {
        long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval();
        this.rotationTimer = new Timer(true);
        this.rotationTimer.scheduleAtFixedRate(new TimerTask() { // from class: org.apache.storm.hdfs.bolt.AbstractHdfsBolt.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                synchronized (AbstractHdfsBolt.this.writeLock) {
                    for (Writer openWriter : AbstractHdfsBolt.this.writers.values()) {
                        try {
                            AbstractHdfsBolt.this.rotateOutputFile(openWriter);
                        } catch (IOException e) {
                            // Keep going so one bad writer does not block rotation of the rest.
                            AbstractHdfsBolt.LOG.warn("IOException during scheduled file rotation.", e);
                        }
                    }
                    AbstractHdfsBolt.this.writers.clear();
                }
            }
        }, interval, interval);
    }

    /**
     * Computes the path of the next output file for the tuple's partition and advances that
     * partition's rotation counter.
     *
     * <p>The counter starts at 0 for a partition path seen for the first time and increases
     * by one on every later call for the same path. The parent directory is
     * {@code fsUrl}, the file name format's path and the partition path concatenated; the
     * file name comes from the file name format, given the counter and the current time.
     *
     * @param tuple tuple whose partition determines the target directory
     * @return the path of the next file to create
     */
    protected Path getBasePathForNextFile(Tuple tuple) {
        final String partitionPath = this.partitioner.getPartitionPath(tuple);

        final int rotation;
        if (this.rotationCounterMap.containsKey(partitionPath)) {
            rotation = this.rotationCounterMap.get(partitionPath).intValue() + 1;
        } else {
            rotation = 0;
        }
        this.rotationCounterMap.put(partitionPath, Integer.valueOf(rotation));

        final String parentDir = this.fsUrl + this.fileNameFormat.getPath() + partitionPath;
        final String fileName = this.fileNameFormat.getName(rotation, System.currentTimeMillis());
        return new Path(parentDir, fileName);
    }

    /**
     * Subclass initialisation hook, invoked from {@code prepare} after the HDFS login and
     * before any timed rotation is started. Any exception it throws is wrapped in a
     * {@link RuntimeException} by {@code prepare}.
     *
     * @param map the topology configuration passed to {@code prepare}
     * @param topologyContext the topology context passed to {@code prepare}
     * @param outputCollector the collector passed to {@code prepare}
     * @throws IOException if the subclass cannot complete its set-up
     */
    protected abstract void doPrepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) throws IOException;

    /**
     * Returns the subclass-specific part of the writer key for a tuple. It is combined with
     * the tuple's partition path to select the writer in {@code writers}.
     *
     * @param tuple the tuple being routed
     * @return the writer key component for this tuple
     */
    protected abstract String getWriterKey(Tuple tuple);

    /**
     * Creates a writer for a new output file. Called when no writer is registered for a
     * tuple's key.
     *
     * @param path the path computed by {@code getBasePathForNextFile} for the new file
     * @param tuple the tuple that triggered creation of the writer
     * @return the new writer
     * @throws IOException if the writer cannot be created
     */
    protected abstract Writer makeNewWriter(Path path, Tuple tuple) throws IOException;
}
