/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.hive.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.storm.hive.common.HiveOptions;
import org.apache.storm.hive.common.HiveUtils;
import org.apache.storm.hive.common.HiveWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveBolt
extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
    private OutputCollector collector;
    private HiveOptions options;
    private Integer currentBatchSize;
    private ExecutorService callTimeoutPool;
    private transient Timer heartBeatTimer;
    private Boolean kerberosEnabled = false;
    private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
    private UserGroupInformation ugi = null;
    HashMap<HiveEndPoint, HiveWriter> allWriters;

    public HiveBolt(HiveOptions options) {
        this.options = options;
        this.currentBatchSize = 0;
    }

    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector) {
        try {
            if (this.options.getKerberosPrincipal() == null && this.options.getKerberosKeytab() == null) {
                this.kerberosEnabled = false;
            } else if (this.options.getKerberosPrincipal() != null && this.options.getKerberosKeytab() != null) {
                this.kerberosEnabled = true;
            } else {
                throw new IllegalArgumentException("To enable Kerberos, need to set both KerberosPrincipal  & KerberosKeytab");
            }
            if (this.kerberosEnabled.booleanValue()) {
                try {
                    this.ugi = HiveUtils.authenticate(this.options.getKerberosKeytab(), this.options.getKerberosPrincipal());
                }
                catch (HiveUtils.AuthenticationFailed ex) {
                    LOG.error("Hive Kerberos authentication failed " + ex.getMessage(), (Throwable)ex);
                    throw new IllegalArgumentException(ex);
                }
            }
            this.collector = collector;
            this.allWriters = new HashMap();
            String timeoutName = "hive-bolt-%d";
            this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
            this.heartBeatTimer = new Timer();
            this.setupHeartBeatTimer();
        }
        catch (Exception e) {
            LOG.warn("unable to make connection to hive ", (Throwable)e);
        }
    }

    public void execute(Tuple tuple) {
        try {
            List<String> partitionVals = this.options.getMapper().mapPartitions(tuple);
            HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, this.options);
            HiveWriter writer = this.getOrCreateWriter(endPoint);
            if (this.timeToSendHeartBeat.compareAndSet(true, false)) {
                this.enableHeartBeatOnAllWriters();
            }
            writer.write(this.options.getMapper().mapRecord(tuple));
            Integer n = this.currentBatchSize;
            Integer n2 = this.currentBatchSize = Integer.valueOf(this.currentBatchSize + 1);
            if (this.currentBatchSize >= this.options.getBatchSize()) {
                this.flushAllWriters();
                this.currentBatchSize = 0;
            }
            this.collector.ack(tuple);
        }
        catch (Exception e) {
            this.collector.reportError((Throwable)e);
            this.collector.fail(tuple);
            this.flushAndCloseWriters();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public void cleanup() {
        ExecutorService[] toShutdown;
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            try {
                HiveWriter w = entry.getValue();
                LOG.info("Flushing writer to {}", (Object)w);
                w.flush(false);
                LOG.info("Closing writer to {}", (Object)w);
                w.close();
            }
            catch (Exception ex) {
                LOG.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.", (Throwable)ex);
                if (!(ex instanceof InterruptedException)) continue;
                Thread.currentThread().interrupt();
            }
        }
        for (ExecutorService execService : toShutdown = new ExecutorService[]{this.callTimeoutPool}) {
            execService.shutdown();
            try {
                while (!execService.isTerminated()) {
                    execService.awaitTermination(this.options.getCallTimeOut().intValue(), TimeUnit.MILLISECONDS);
                }
            }
            catch (InterruptedException ex) {
                LOG.warn("shutdown interrupted on " + execService, (Throwable)ex);
            }
        }
        this.callTimeoutPool = null;
        super.cleanup();
        LOG.info("Hive Bolt stopped");
    }

    private void setupHeartBeatTimer() {
        if (this.options.getHeartBeatInterval() > 0) {
            this.heartBeatTimer.schedule(new TimerTask(){

                @Override
                public void run() {
                    HiveBolt.this.timeToSendHeartBeat.set(true);
                    HiveBolt.this.setupHeartBeatTimer();
                }
            }, this.options.getHeartBeatInterval() * 1000);
        }
    }

    private void flushAllWriters() throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
        for (HiveWriter writer : this.allWriters.values()) {
            writer.flush(true);
        }
    }

    private void closeAllWriters() {
        try {
            for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
                entry.getValue().close();
            }
            this.allWriters.clear();
        }
        catch (Exception e) {
            LOG.warn("unable to close writers. ", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flushAndCloseWriters() {
        try {
            this.flushAllWriters();
        }
        catch (Exception e) {
            LOG.warn("unable to flush hive writers. ", (Throwable)e);
        }
        finally {
            this.closeAllWriters();
        }
    }

    private void enableHeartBeatOnAllWriters() {
        for (HiveWriter writer : this.allWriters.values()) {
            writer.setHeartBeatNeeded();
        }
    }

    private HiveWriter getOrCreateWriter(HiveEndPoint endPoint) throws HiveWriter.ConnectFailure, InterruptedException {
        try {
            HiveWriter writer = this.allWriters.get(endPoint);
            if (writer == null) {
                int retired;
                LOG.debug("Creating Writer to Hive end point : " + endPoint);
                writer = HiveUtils.makeHiveWriter(endPoint, this.callTimeoutPool, this.ugi, this.options);
                if (this.allWriters.size() > this.options.getMaxOpenConnections() && (retired = this.retireIdleWriters()) == 0) {
                    this.retireEldestWriter();
                }
                this.allWriters.put(endPoint, writer);
            }
            return writer;
        }
        catch (HiveWriter.ConnectFailure e) {
            LOG.error("Failed to create HiveWriter for endpoint: " + endPoint, (Throwable)e);
            throw e;
        }
    }

    private void retireEldestWriter() {
        long oldestTimeStamp = System.currentTimeMillis();
        HiveEndPoint eldest = null;
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (entry.getValue().getLastUsed() >= oldestTimeStamp) continue;
            eldest = entry.getKey();
            oldestTimeStamp = entry.getValue().getLastUsed();
        }
        try {
            LOG.info("Closing least used Writer to Hive end point : " + eldest);
            this.allWriters.remove(eldest).close();
        }
        catch (IOException e) {
            LOG.warn("Failed to close writer for end point: " + eldest, (Throwable)e);
        }
        catch (InterruptedException e) {
            LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, (Throwable)e);
            Thread.currentThread().interrupt();
        }
    }

    private int retireIdleWriters() {
        int count = 0;
        long now = System.currentTimeMillis();
        ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>();
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (now - entry.getValue().getLastUsed() <= (long)this.options.getIdleTimeout().intValue()) continue;
            ++count;
            retirees.add(entry.getKey());
        }
        for (HiveEndPoint ep : retirees) {
            try {
                LOG.info("Closing idle Writer to Hive end point : {}", (Object)ep);
                this.allWriters.remove(ep).close();
            }
            catch (IOException e) {
                LOG.warn("Failed to close writer for end point: {}. Error: " + ep, (Throwable)e);
            }
            catch (InterruptedException e) {
                LOG.warn("Interrupted when attempting to close writer for end point: " + ep, (Throwable)e);
                Thread.currentThread().interrupt();
            }
        }
        return count;
    }
}

