/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.hive.bolt;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.SerializationError;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.storm.Config;
import org.apache.storm.hive.common.HiveOptions;
import org.apache.storm.hive.common.HiveUtils;
import org.apache.storm.hive.common.HiveWriter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.BatchHelper;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveBolt
extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
    private OutputCollector collector;
    private HiveOptions options;
    private ExecutorService callTimeoutPool;
    private transient Timer heartBeatTimer;
    private AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
    private UserGroupInformation ugi = null;
    private BatchHelper batchHelper;
    private boolean tokenAuthEnabled;
    @VisibleForTesting
    Map<HiveEndPoint, HiveWriter> allWriters;

    public HiveBolt(HiveOptions options) {
        this.options = options;
    }

    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector) {
        try {
            this.tokenAuthEnabled = HiveUtils.isTokenAuthEnabled(conf);
            try {
                this.ugi = HiveUtils.authenticate(this.tokenAuthEnabled, this.options.getKerberosKeytab(), this.options.getKerberosPrincipal());
            }
            catch (HiveUtils.AuthenticationFailed ex) {
                LOG.error("Hive kerberos authentication failed " + ex.getMessage(), (Throwable)ex);
                throw new IllegalArgumentException(ex);
            }
            this.collector = collector;
            this.batchHelper = new BatchHelper(this.options.getBatchSize().intValue(), collector);
            this.allWriters = new ConcurrentHashMap<HiveEndPoint, HiveWriter>();
            String timeoutName = "hive-bolt-%d";
            this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
            this.sendHeartBeat.set(true);
            this.heartBeatTimer = new Timer();
            this.setupHeartBeatTimer();
        }
        catch (Exception e) {
            LOG.warn("unable to make connection to hive ", (Throwable)e);
        }
    }

    public void execute(Tuple tuple) {
        try {
            if (this.batchHelper.shouldHandle(tuple)) {
                List<String> partitionVals = this.options.getMapper().mapPartitions(tuple);
                HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, this.options);
                HiveWriter writer = this.getOrCreateWriter(endPoint);
                writer.write(this.options.getMapper().mapRecord(tuple));
                this.batchHelper.addBatch(tuple);
            }
            if (this.batchHelper.shouldFlush()) {
                this.flushAllWriters(true);
                LOG.info("acknowledging tuples after writers flushed ");
                this.batchHelper.ack();
            }
            if (TupleUtils.isTick((Tuple)tuple)) {
                this.retireIdleWriters();
            }
        }
        catch (SerializationError se) {
            LOG.info("Serialization exception occurred, tuple is acknowledged but not written to Hive.", (Object)tuple);
            this.collector.reportError((Throwable)se);
            this.collector.ack(tuple);
        }
        catch (Exception e) {
            this.batchHelper.fail(e);
            this.abortAndCloseWriters();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public void cleanup() {
        ExecutorService[] toShutdown;
        this.sendHeartBeat.set(false);
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            try {
                HiveWriter w = entry.getValue();
                w.flushAndClose();
            }
            catch (Exception ex) {
                LOG.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.", (Throwable)ex);
                if (!(ex instanceof InterruptedException)) continue;
                Thread.currentThread().interrupt();
            }
        }
        for (ExecutorService execService : toShutdown = new ExecutorService[]{this.callTimeoutPool}) {
            execService.shutdown();
            try {
                while (!execService.isTerminated()) {
                    execService.awaitTermination(this.options.getCallTimeOut().intValue(), TimeUnit.MILLISECONDS);
                }
            }
            catch (InterruptedException ex) {
                LOG.warn("shutdown interrupted on " + execService, (Throwable)ex);
            }
        }
        this.callTimeoutPool = null;
        super.cleanup();
        LOG.info("Hive Bolt stopped");
    }

    public Map<String, Object> getComponentConfiguration() {
        Map conf = super.getComponentConfiguration();
        if (conf == null) {
            conf = new Config();
        }
        if (this.options.getTickTupleInterval() > 0) {
            conf.put("topology.tick.tuple.freq.secs", this.options.getTickTupleInterval());
        }
        return conf;
    }

    private void setupHeartBeatTimer() {
        if (this.options.getHeartBeatInterval() > 0) {
            this.heartBeatTimer.schedule(new TimerTask(){

                @Override
                public void run() {
                    try {
                        if (HiveBolt.this.sendHeartBeat.get()) {
                            LOG.debug("Start sending heartbeat on all writers");
                            HiveBolt.this.sendHeartBeatOnAllWriters();
                            HiveBolt.this.setupHeartBeatTimer();
                        }
                    }
                    catch (Exception e) {
                        LOG.warn("Failed to heartbeat on HiveWriter ", (Throwable)e);
                    }
                }
            }, this.options.getHeartBeatInterval() * 1000);
        }
    }

    private void sendHeartBeatOnAllWriters() throws InterruptedException {
        for (HiveWriter writer : this.allWriters.values()) {
            writer.heartBeat();
        }
    }

    void flushAllWriters(boolean rollToNext) throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
        for (HiveWriter writer : this.allWriters.values()) {
            writer.flush(rollToNext);
        }
    }

    void abortAndCloseWriters() {
        try {
            this.abortAllWriters();
            this.closeAllWriters();
        }
        catch (Exception ie) {
            LOG.warn("unable to close hive connections. ", (Throwable)ie);
        }
    }

    private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            try {
                entry.getValue().abort();
            }
            catch (Exception e) {
                LOG.error("Failed to abort hive transaction batch, HiveEndPoint " + entry.getValue() + " due to exception ", (Throwable)e);
            }
        }
    }

    private void closeAllWriters() {
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            try {
                entry.getValue().close();
            }
            catch (Exception e) {
                LOG.warn("unable to close writers. ", (Throwable)e);
            }
        }
        this.allWriters.clear();
    }

    @VisibleForTesting
    HiveWriter getOrCreateWriter(HiveEndPoint endPoint) throws HiveWriter.ConnectFailure, InterruptedException {
        try {
            HiveWriter writer = this.allWriters.get(endPoint);
            if (writer == null) {
                LOG.debug("Creating Writer to Hive end point : " + endPoint);
                writer = HiveUtils.makeHiveWriter(endPoint, this.callTimeoutPool, this.ugi, this.options, this.tokenAuthEnabled);
                if (this.allWriters.size() > this.options.getMaxOpenConnections() - 1) {
                    LOG.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", (Object)this.allWriters.size(), (Object)this.options.getMaxOpenConnections());
                    int retired = this.retireIdleWriters();
                    if (retired == 0) {
                        this.retireEldestWriter();
                    }
                }
                this.allWriters.put(endPoint, writer);
                HiveUtils.logAllHiveEndPoints(this.allWriters);
            }
            return writer;
        }
        catch (HiveWriter.ConnectFailure e) {
            LOG.error("Failed to create HiveWriter for endpoint: " + endPoint, (Throwable)e);
            throw e;
        }
    }

    private void retireEldestWriter() {
        LOG.info("Attempting close eldest writers");
        long oldestTimeStamp = System.currentTimeMillis();
        HiveEndPoint eldest = null;
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (entry.getValue().getLastUsed() >= oldestTimeStamp) continue;
            eldest = entry.getKey();
            oldestTimeStamp = entry.getValue().getLastUsed();
        }
        try {
            LOG.info("Closing least used Writer to Hive end point : " + eldest);
            this.allWriters.remove(eldest).flushAndClose();
        }
        catch (IOException e) {
            LOG.warn("Failed to close writer for end point: " + eldest, (Throwable)e);
        }
        catch (InterruptedException e) {
            LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, (Throwable)e);
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, (Throwable)e);
        }
    }

    private int retireIdleWriters() {
        LOG.info("Attempting close idle writers");
        int count = 0;
        long now = System.currentTimeMillis();
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (now - entry.getValue().getLastUsed() <= (long)this.options.getIdleTimeout().intValue()) continue;
            ++count;
            this.retire(entry.getKey());
        }
        return count;
    }

    private void retire(HiveEndPoint ep) {
        try {
            HiveWriter writer = this.allWriters.remove(ep);
            if (writer != null) {
                LOG.info("Closing idle Writer to Hive end point : {}", (Object)ep);
                writer.flushAndClose();
            }
        }
        catch (IOException e) {
            LOG.warn("Failed to close writer for end point: {}. Error: " + ep, (Throwable)e);
        }
        catch (InterruptedException e) {
            LOG.warn("Interrupted when attempting to close writer for end point: " + ep, (Throwable)e);
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            LOG.warn("Interrupted when attempting to close writer for end point: " + ep, (Throwable)e);
        }
    }
}

