/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.hive.common;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.RecordWriter;
import org.apache.hive.hcatalog.streaming.SerializationError;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.hive.hcatalog.streaming.StreamingIOFailure;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.apache.storm.hive.bolt.mapper.HiveMapper;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveWriter {
    /** Logger shared by every writer instance. */
    private static final Logger LOG = LoggerFactory.getLogger(HiveWriter.class);
    /** Hive end point this writer connects to; also used in log and failure messages. */
    private final HiveEndPoint endPoint;
    /** Streaming connection obtained from {@link #newConnection} during construction. */
    private final StreamingConnection connection;
    /** Number of transactions requested each time a new transaction batch is fetched. */
    private final int txnsPerBatch;
    /** Record writer handed to the connection when fetching transaction batches. */
    private final RecordWriter recordWriter;
    /** Executor on which every Hive call is run so that it can be bounded by {@link #callTimeout}. */
    private final ExecutorService callTimeoutPool;
    /** Maximum wait for a Hive call, in milliseconds; a value of zero or less waits without a limit. */
    private final long callTimeout;
    /** Monitor held by flush, heartBeat and abort while they operate on the current transaction batch. */
    private final Object txnBatchLock = new Object();
    /** Set to true by close(); write() rejects records once it is set. */
    protected boolean closed;
    /** Current transaction batch; nextTxn sets it to null when the batch is exhausted and no roll-over is requested. */
    private TransactionBatch txnBatch;
    /** Wall-clock time (System.currentTimeMillis) of construction or of the last successful flush. */
    private long lastUsed;
    /** Passed to the end point when opening the connection. */
    private boolean autoCreatePartitions;
    /** User the connection is opened for; also used to create the record writer when token auth is enabled. */
    private UserGroupInformation ugi;
    /** Records written since the last successful flush. */
    private int totalRecords = 0;

    /**
     * Opens a streaming connection to the given end point, creates the record
     * writer and starts the first transaction of a freshly fetched batch.
     *
     * <p>If any step after the connection has been opened fails, the connection
     * is closed (best effort) before the failure is propagated, so a failed
     * construction does not leave an open connection behind.
     *
     * @param endPoint             Hive end point to stream to
     * @param txnsPerBatch         number of transactions to request per batch
     * @param autoCreatePartitions passed to the end point when connecting
     * @param callTimeout          timeout for each Hive call in milliseconds;
     *                             zero or less waits without a limit
     * @param callTimeoutPool      executor on which Hive calls are run
     * @param mapper               creates the record writer for the end point
     * @param ugi                  user to connect as
     * @param tokenAuthEnabled     whether token authentication is in use
     * @throws InterruptedException if the calling thread is interrupted
     * @throws ConnectFailure       if any other checked exception occurs while
     *                              setting up the writer
     */
    public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch, boolean autoCreatePartitions, long callTimeout, ExecutorService callTimeoutPool, HiveMapper mapper, UserGroupInformation ugi, boolean tokenAuthEnabled) throws InterruptedException, ConnectFailure {
        // Becomes true once the connection field is populated, so the failure
        // paths below know whether there is anything to release.
        boolean connectionOpened = false;
        try {
            this.autoCreatePartitions = autoCreatePartitions;
            this.callTimeout = callTimeout;
            this.callTimeoutPool = callTimeoutPool;
            this.endPoint = endPoint;
            this.ugi = ugi;
            this.connection = this.newConnection(ugi, tokenAuthEnabled);
            connectionOpened = true;
            this.txnsPerBatch = txnsPerBatch;
            this.recordWriter = this.getRecordWriter(mapper, tokenAuthEnabled);
            this.txnBatch = this.nextTxnBatch(this.recordWriter);
            this.closed = false;
            this.lastUsed = System.currentTimeMillis();
        }
        catch (InterruptedException e) {
            this.releaseConnectionAfterFailedInit(connectionOpened);
            throw e;
        }
        catch (RuntimeException e) {
            this.releaseConnectionAfterFailedInit(connectionOpened);
            throw e;
        }
        catch (Exception e) {
            this.releaseConnectionAfterFailedInit(connectionOpened);
            throw new ConnectFailure(endPoint, (Throwable)e);
        }
    }

    /**
     * Best-effort close of the connection opened by a constructor call that
     * subsequently failed. Never throws, so the original failure stays the one
     * reported to the caller.
     *
     * @param connectionOpened whether the connection had been opened before the failure
     */
    private void releaseConnectionAfterFailedInit(boolean connectionOpened) {
        if (!connectionOpened) {
            return;
        }
        try {
            this.closeConnection();
        }
        catch (InterruptedException e) {
            // Do not mask the construction failure; just keep the interrupt status.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Throws if the current thread has been interrupted.
     *
     * <p>Uses {@link Thread#interrupted()}, so the interrupt status of the
     * current thread is cleared as a side effect of the check.
     *
     * @throws InterruptedException if the current thread was interrupted
     */
    private static void checkAndThrowInterruptedException() throws InterruptedException {
        final boolean wasInterrupted = Thread.interrupted();
        if (!wasInterrupted) {
            return;
        }
        throw new InterruptedException("Timed out before Hive call was made. Your callTimeout might be set too low or Hive calls are taking too long.");
    }

    /**
     * Creates the record writer for this writer's end point.
     *
     * <p>Without token authentication the mapper is called directly and its
     * exceptions propagate unchanged. With token authentication the mapper is
     * called inside {@code ugi.doAs(...)} and any exception is wrapped in a
     * {@link ConnectFailure}.
     *
     * @param mapper           mapper that builds the record writer
     * @param tokenAuthEnabled whether to create the writer as the configured user
     * @return the record writer produced by the mapper
     * @throws Exception whatever the mapper throws, or a {@link ConnectFailure}
     *                   when token authentication is enabled
     */
    public RecordWriter getRecordWriter(final HiveMapper mapper, boolean tokenAuthEnabled) throws Exception {
        if (tokenAuthEnabled) {
            final PrivilegedExceptionAction<RecordWriter> createAsUser = new PrivilegedExceptionAction<RecordWriter>() {
                @Override
                public RecordWriter run() throws StreamingException, IOException, ClassNotFoundException {
                    return mapper.createRecordWriter(HiveWriter.this.endPoint);
                }
            };
            try {
                return this.ugi.doAs(createAsUser);
            } catch (Exception e) {
                throw new ConnectFailure(this.endPoint, e);
            }
        }
        return mapper.createRecordWriter(this.endPoint);
    }

    /**
     * Builds the Hive configuration used when opening a connection.
     *
     * @param metaStoreUri     metastore URI to store in the configuration
     * @param tokenAuthEnabled whether token authentication is in use
     * @return a configuration with the metastore URI set and Thrift SASL
     *         enabled, or {@code null} when token authentication is disabled
     */
    private HiveConf createHiveConf(String metaStoreUri, boolean tokenAuthEnabled) {
        HiveConf saslConf = null;
        if (tokenAuthEnabled) {
            saslConf = new HiveConf();
            saslConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
            saslConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
        }
        return saslConf;
    }

    /**
     * Describes this writer by its end point and current transaction batch.
     *
     * <p>The transaction batch can be {@code null} (nextTxn clears it when the
     * batch is exhausted and no roll-over is requested), so the fields are
     * concatenated directly rather than dereferenced; a missing value is
     * rendered as {@code "null"} instead of raising a NullPointerException.
     *
     * @return a human-readable description of this writer
     */
    @Override
    public String toString() {
        return "{ endPoint = " + this.endPoint + ", TransactionBatch = " + this.txnBatch + " }";
    }

    /**
     * Writes one record into the current transaction and counts it towards
     * {@link #getTotalRecords()}. The write runs on the call-timeout pool.
     *
     * @param record serialized record to write
     * @throws IllegalStateException if this writer has been closed
     * @throws SerializationError    if the record could not be serialized
     * @throws WriteFailure          if the write failed with any other streaming
     *                               error or timed out
     * @throws InterruptedException  if the calling thread is interrupted
     */
    public synchronized void write(final byte[] record) throws WriteFailure, SerializationError, InterruptedException {
        if (this.closed) {
            throw new IllegalStateException("This hive streaming writer was closed and thus no longer able to write : " + this.endPoint);
        }
        final CallRunner<Void> writeRecord = new CallRunner<Void>() {
            @Override
            public Void call() throws StreamingException, InterruptedException {
                HiveWriter.this.txnBatch.write(record);
                HiveWriter.this.totalRecords++;
                return null;
            }
        };
        try {
            LOG.debug("Writing event to {}", this.endPoint);
            this.callWithTimeout(writeRecord);
        } catch (SerializationError se) {
            // Re-wrap so the message names the end point the record was meant for.
            throw new SerializationError(this.endPoint.toString() + " SerializationError", se);
        } catch (StreamingException e) {
            throw new WriteFailure(this.endPoint, this.txnBatch.getCurrentTxnId(), e);
        } catch (TimeoutException e) {
            throw new WriteFailure(this.endPoint, this.txnBatch.getCurrentTxnId(), e);
        }
    }

    /**
     * Commits the current transaction and moves on to the next one. Does
     * nothing when no records have been written since the last flush.
     *
     * @param rollToNext whether to begin the next transaction (fetching a new
     *                   batch if the current one is exhausted) after committing
     * @throws CommitFailure        if the commit fails or times out
     * @throws TxnBatchFailure      if a new transaction batch cannot be acquired
     * @throws TxnFailure           if a streaming error occurs while switching
     *                              transactions
     * @throws InterruptedException if the calling thread is interrupted
     */
    public void flush(boolean rollToNext) throws CommitFailure, TxnBatchFailure, TxnFailure, InterruptedException {
        if (this.totalRecords > 0) {
            try {
                synchronized (this.txnBatchLock) {
                    this.commitTxn();
                    this.nextTxn(rollToNext);
                    this.totalRecords = 0;
                    this.lastUsed = System.currentTimeMillis();
                }
            } catch (StreamingException e) {
                throw new TxnFailure(this.txnBatch, e);
            }
        }
    }

    /**
     * Sends a heartbeat on the current transaction batch while holding the
     * transaction-batch lock. Failures are logged as warnings and otherwise
     * ignored; only interruption of the caller is propagated.
     *
     * @throws InterruptedException if the calling thread is interrupted
     */
    public void heartBeat() throws InterruptedException {
        final CallRunner<Void> sendHeartbeat = new CallRunner<Void>() {
            @Override
            public Void call() throws Exception {
                try {
                    LOG.info("Sending heartbeat on batch " + HiveWriter.this.txnBatch);
                    HiveWriter.this.txnBatch.heartbeat();
                } catch (StreamingException e) {
                    LOG.warn("Heartbeat error on batch " + HiveWriter.this.txnBatch, e);
                }
                return null;
            }
        };
        synchronized (this.txnBatchLock) {
            try {
                this.callWithTimeout(sendHeartbeat);
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e) {
                LOG.warn("Unable to send heartbeat on Txn Batch " + this.txnBatch, e);
            }
        }
    }

    /**
     * Returns the number of records written since the last successful flush.
     *
     * @return the current record count
     */
    public int getTotalRecords() {
        return totalRecords;
    }

    /**
     * Flushes pending records without starting a new transaction, then closes
     * this writer.
     *
     * @throws TxnBatchFailure      propagated from {@link #flush(boolean)}
     * @throws TxnFailure           propagated from {@link #flush(boolean)}
     * @throws CommitFailure        propagated from {@link #flush(boolean)}
     * @throws IOException          declared by {@link #close()}
     * @throws InterruptedException if the calling thread is interrupted
     */
    public void flushAndClose() throws TxnBatchFailure, TxnFailure, CommitFailure, IOException, InterruptedException {
        final boolean rollToNext = false;
        flush(rollToNext);
        close();
    }

    /**
     * Closes the current transaction batch and then the connection, and marks
     * this writer as closed.
     *
     * <p>The connection close is attempted and the writer is marked closed even
     * when closing the transaction batch is interrupted, so an interrupted
     * close neither leaves the connection open nor leaves the writer accepting
     * further writes.
     *
     * @throws IOException          declared for callers; not thrown by the visible code
     * @throws InterruptedException if the calling thread is interrupted
     */
    public void close() throws IOException, InterruptedException {
        try {
            this.closeTxnBatch();
        } finally {
            try {
                this.closeConnection();
            } finally {
                this.closed = true;
            }
        }
    }

    /**
     * Closes the streaming connection on the call-timeout pool.
     *
     * <p>Errors while closing are logged and suppressed, since there is nothing
     * useful a caller can do about them. Interruption of the calling thread is
     * propagated rather than logged, matching closeTxnBatch, abortTxn and
     * heartBeat.
     *
     * @throws InterruptedException if the calling thread is interrupted
     */
    private void closeConnection() throws InterruptedException {
        LOG.info("Closing connection to end point : {}", (Object)this.endPoint);
        try {
            this.callWithTimeout(new CallRunner<Void>(){

                @Override
                public Void call() throws Exception {
                    HiveWriter.this.connection.close();
                    return null;
                }
            });
        }
        catch (InterruptedException e) {
            // Must not be swallowed: the caller needs to see the interruption.
            throw e;
        }
        catch (Exception e) {
            LOG.warn("Error closing connection to EndPoint : " + this.endPoint, (Throwable)e);
        }
    }

    /**
     * Commits the current transaction on the call-timeout pool.
     *
     * @throws CommitFailure        if the commit raises a streaming error or times out
     * @throws InterruptedException if the calling thread is interrupted
     */
    private void commitTxn() throws CommitFailure, InterruptedException {
        LOG.debug("Committing Txn id {} to {}", this.txnBatch.getCurrentTxnId(), this.endPoint);
        final CallRunner<Void> commit = new CallRunner<Void>() {
            @Override
            public Void call() throws Exception {
                HiveWriter.this.txnBatch.commit();
                return null;
            }
        };
        try {
            this.callWithTimeout(commit);
        } catch (StreamingException e) {
            throw new CommitFailure(this.endPoint, this.txnBatch.getCurrentTxnId(), e);
        } catch (TimeoutException e) {
            throw new CommitFailure(this.endPoint, this.txnBatch.getCurrentTxnId(), e);
        }
    }

    /**
     * Opens a streaming connection to the end point on the call-timeout pool.
     *
     * @param ugi              user to connect as
     * @param tokenAuthEnabled whether a SASL-enabled Hive configuration should
     *                         be supplied to the end point
     * @return the newly opened connection
     * @throws InterruptedException if the calling thread is interrupted
     * @throws ConnectFailure       if connecting raises a streaming error or times out
     */
    @VisibleForTesting
    StreamingConnection newConnection(final UserGroupInformation ugi, final boolean tokenAuthEnabled) throws InterruptedException, ConnectFailure {
        final CallRunner<StreamingConnection> connect = new CallRunner<StreamingConnection>() {
            @Override
            public StreamingConnection call() throws Exception {
                final HiveEndPoint target = HiveWriter.this.endPoint;
                final HiveConf conf = HiveWriter.this.createHiveConf(target.metaStoreUri, tokenAuthEnabled);
                return target.newConnection(HiveWriter.this.autoCreatePartitions, conf, ugi);
            }
        };
        try {
            return this.callWithTimeout(connect);
        } catch (StreamingException e) {
            throw new ConnectFailure(this.endPoint, e);
        } catch (TimeoutException e) {
            throw new ConnectFailure(this.endPoint, e);
        }
    }

    /**
     * Fetches a new transaction batch from the connection (on the call-timeout
     * pool) and begins its first transaction.
     *
     * @param recordWriter record writer to associate with the batch
     * @return the new batch, positioned on its first transaction
     * @throws InterruptedException if the calling thread is interrupted
     * @throws TxnBatchFailure      if fetching or starting the batch raises a
     *                              streaming error or times out
     */
    private TransactionBatch nextTxnBatch(final RecordWriter recordWriter) throws InterruptedException, TxnBatchFailure {
        LOG.debug("Fetching new Txn Batch for {}", this.endPoint);
        final CallRunner<TransactionBatch> fetch = new CallRunner<TransactionBatch>() {
            @Override
            public TransactionBatch call() throws Exception {
                return HiveWriter.this.connection.fetchTransactionBatch(HiveWriter.this.txnsPerBatch, recordWriter);
            }
        };
        try {
            final TransactionBatch fetched = this.callWithTimeout(fetch);
            fetched.beginNextTransaction();
            LOG.debug("Acquired {}. Switching to first txn", fetched);
            return fetched;
        } catch (TimeoutException e) {
            throw new TxnBatchFailure(this.endPoint, e);
        } catch (StreamingException e) {
            throw new TxnBatchFailure(this.endPoint, e);
        }
    }

    /**
     * Closes the current transaction batch, if there is one, on the
     * call-timeout pool. Errors are logged and suppressed; only interruption
     * of the caller is propagated.
     *
     * @throws InterruptedException if the calling thread is interrupted
     */
    private void closeTxnBatch() throws InterruptedException {
        final CallRunner<Void> closeBatch = new CallRunner<Void>() {
            @Override
            public Void call() throws Exception {
                if (HiveWriter.this.txnBatch == null) {
                    return null;
                }
                HiveWriter.this.txnBatch.close();
                return null;
            }
        };
        try {
            LOG.debug("Closing Txn Batch {}", this.txnBatch);
            this.callWithTimeout(closeBatch);
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e) {
            LOG.warn("Error closing txn batch " + this.txnBatch, e);
        }
    }

    /**
     * Aborts the current transaction and switches to the next one, fetching a
     * new batch if the current one is exhausted. Both steps run while holding
     * the transaction-batch lock.
     *
     * @throws StreamingException   if switching to the next transaction fails
     * @throws TxnBatchFailure      if a new transaction batch cannot be acquired
     * @throws InterruptedException if the calling thread is interrupted
     */
    public void abort() throws StreamingException, TxnBatchFailure, InterruptedException {
        synchronized (this.txnBatchLock) {
            abortTxn();
            final boolean rollToNext = true;
            nextTxn(rollToNext);
        }
    }

    /**
     * Aborts the current transaction on the call-timeout pool. Timeouts and
     * other errors are logged as warnings and suppressed; only interruption of
     * the caller is propagated.
     *
     * @throws InterruptedException if the calling thread is interrupted
     */
    private void abortTxn() throws InterruptedException {
        LOG.info("Aborting Txn id {} on End Point {}", this.txnBatch.getCurrentTxnId(), this.endPoint);
        final CallRunner<Void> abortCurrent = new CallRunner<Void>() {
            @Override
            public Void call() throws StreamingException, InterruptedException {
                HiveWriter.this.txnBatch.abort();
                return null;
            }
        };
        try {
            this.callWithTimeout(abortCurrent);
        } catch (InterruptedException e) {
            throw e;
        } catch (TimeoutException e) {
            LOG.warn("Timeout while aborting Txn " + this.txnBatch.getCurrentTxnId() + " on EndPoint: " + this.endPoint, e);
        } catch (Exception e) {
            LOG.warn("Error aborting Txn " + this.txnBatch.getCurrentTxnId() + " on EndPoint: " + this.endPoint, e);
        }
    }

    /**
     * Advances past the current transaction.
     *
     * <p>If the current batch still has transactions, the next one is begun
     * only when {@code rollToNext} is set. If the batch is exhausted it is
     * closed and the field is cleared; a replacement batch is fetched only
     * when {@code rollToNext} is set, otherwise the field stays {@code null}.
     *
     * @param rollToNext whether a new transaction should be started
     * @throws StreamingException   if beginning the next transaction fails
     * @throws InterruptedException if the calling thread is interrupted
     * @throws TxnBatchFailure      if a new transaction batch cannot be acquired
     */
    private void nextTxn(boolean rollToNext) throws StreamingException, InterruptedException, TxnBatchFailure {
        final boolean batchExhausted = this.txnBatch.remainingTransactions() == 0;
        if (!batchExhausted) {
            if (rollToNext) {
                LOG.debug("Switching to next Txn for {}", this.endPoint);
                this.txnBatch.beginNextTransaction();
            }
            return;
        }
        this.closeTxnBatch();
        // Cleared before fetching so a failed fetch does not leave the closed batch in place.
        this.txnBatch = null;
        if (rollToNext) {
            this.txnBatch = this.nextTxnBatch(this.recordWriter);
        }
    }

    /**
     * Runs the given call on the call-timeout pool and waits for its result.
     *
     * <p>When {@code callTimeout} is positive the wait is bounded by that many
     * milliseconds; otherwise it is unbounded. If the wait times out, or the
     * waiting thread is interrupted, the submitted task is cancelled (with
     * interruption) so it does not keep running after the caller has given up.
     *
     * <p>A failure inside the call is unwrapped from the
     * {@link ExecutionException}: I/O errors become {@link StreamingIOFailure},
     * streaming, interruption and runtime exceptions are rethrown as-is, a
     * nested timeout becomes a {@link StreamingException}, and anything else
     * is wrapped in a {@link RuntimeException}.
     *
     * @param callRunner the work to execute
     * @param <T>        result type of the call
     * @return the value returned by the call
     * @throws TimeoutException     if the call did not finish within the timeout
     * @throws StreamingException   if the call failed with a streaming or I/O error
     * @throws InterruptedException if the waiting thread or the call was interrupted
     */
    private <T> T callWithTimeout(final CallRunner<T> callRunner) throws TimeoutException, StreamingException, InterruptedException {
        final Future<T> future = this.callTimeoutPool.submit(new Callable<T>(){

            @Override
            public T call() throws Exception {
                return callRunner.call();
            }
        });
        try {
            if (this.callTimeout > 0L) {
                return future.get(this.callTimeout, TimeUnit.MILLISECONDS);
            }
            return future.get();
        }
        catch (TimeoutException timeoutException) {
            future.cancel(true);
            throw timeoutException;
        }
        catch (InterruptedException interrupted) {
            // The caller is abandoning the wait; do not leave the task running unobserved.
            future.cancel(true);
            throw interrupted;
        }
        catch (ExecutionException e1) {
            Throwable cause = e1.getCause();
            if (cause instanceof IOException) {
                throw new StreamingIOFailure("I/O Failure", (Exception)((IOException)cause));
            }
            if (cause instanceof StreamingException) {
                throw (StreamingException)cause;
            }
            if (cause instanceof InterruptedException) {
                throw (InterruptedException)cause;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException)cause;
            }
            if (cause instanceof TimeoutException) {
                throw new StreamingException("Operation Timed Out.", (Exception)((TimeoutException)cause));
            }
            throw new RuntimeException(e1);
        }
    }

    /**
     * Returns the time, as reported by {@link System#currentTimeMillis()}, at
     * which this writer was constructed or last flushed successfully.
     *
     * @return the last-used timestamp in milliseconds
     */
    public long getLastUsed() {
        return lastUsed;
    }

    /**
     * Serializes a tuple as its values joined by commas; every value,
     * including the last, is followed by a comma.
     *
     * <p>The text is encoded as UTF-8 explicitly so that the bytes do not
     * depend on the platform default charset of the JVM.
     *
     * @param tuple tuple whose values are serialized
     * @return the UTF-8 encoded, comma-delimited record
     */
    private byte[] generateRecord(Tuple tuple) {
        StringBuilder buf = new StringBuilder();
        for (Object o : tuple.getValues()) {
            buf.append(o).append(",");
        }
        return buf.toString().getBytes(StandardCharsets.UTF_8);
    }

    /** Raised when the writer fails while switching to the next transaction in a batch. */
    public static class TxnFailure
    extends Failure {
        /**
         * @param txnBatch transaction batch named in the message
         * @param cause    underlying error
         */
        public TxnFailure(TransactionBatch txnBatch, Throwable cause) {
            super(describe(txnBatch), cause);
        }

        /** Builds the failure message for the given batch. */
        private static String describe(TransactionBatch txnBatch) {
            return "Failed switching to next Txn in TxnBatch " + txnBatch;
        }
    }

    /** Raised when a new transaction batch cannot be acquired from the end point. */
    public static class TxnBatchFailure
    extends Failure {
        /**
         * @param ep    end point named in the message
         * @param cause underlying error
         */
        public TxnBatchFailure(HiveEndPoint ep, Throwable cause) {
            super(describe(ep), cause);
        }

        /** Builds the failure message for the given end point. */
        private static String describe(HiveEndPoint ep) {
            return "Failed acquiring Transaction Batch from EndPoint: " + ep;
        }
    }

    /** Raised when connecting to, or setting up a writer for, an end point fails. */
    public static class ConnectFailure
    extends Failure {
        /**
         * @param ep    end point named in the message
         * @param cause underlying error
         */
        public ConnectFailure(HiveEndPoint ep, Throwable cause) {
            super(describe(ep), cause);
        }

        /** Builds the failure message for the given end point. */
        private static String describe(HiveEndPoint ep) {
            return "Failed connecting to EndPoint " + ep;
        }
    }

    /** Raised when committing a transaction fails or times out. */
    public static class CommitFailure
    extends Failure {
        /**
         * @param endPoint end point named in the message
         * @param txnId    id of the transaction that could not be committed
         * @param cause    underlying error
         */
        public CommitFailure(HiveEndPoint endPoint, Long txnId, Throwable cause) {
            super(describe(endPoint, txnId), cause);
        }

        /** Builds the failure message for the given transaction and end point. */
        private static String describe(HiveEndPoint endPoint, Long txnId) {
            return "Commit of Txn " + txnId + " failed on EndPoint: " + endPoint;
        }
    }

    /** Raised when writing a record fails or times out. */
    public static class WriteFailure
    extends Failure {
        /**
         * @param endPoint     end point named in the message
         * @param currentTxnId id of the transaction the write belonged to
         * @param cause        underlying error
         */
        public WriteFailure(HiveEndPoint endPoint, Long currentTxnId, Throwable cause) {
            super(describe(endPoint, currentTxnId), cause);
        }

        /** Builds the failure message for the given end point and transaction. */
        private static String describe(HiveEndPoint endPoint, Long currentTxnId) {
            return "Failed writing to : " + endPoint + ". TxnID : " + currentTxnId;
        }
    }

    /**
     * Checked base type of every failure this writer reports; the specific
     * failures defined alongside it all extend this class.
     */
    public static class Failure
    extends Exception {
        /**
         * @param message description of what failed
         * @param cause   underlying error, may be {@code null}
         */
        public Failure(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    private static interface CallRunner<T> {
        public T call() throws Exception;
    }
}

