package org.apache.hive.streaming;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.BlobStorageUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HdfsUtils;
import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hive/streaming/HiveStreamingConnection.class */
public class HiveStreamingConnection implements StreamingConnection {
    private static final Logger LOG;
    private static final String DEFAULT_METASTORE_URI = "thrift://localhost:9083";
    private static final int DEFAULT_TRANSACTION_BATCH_SIZE = 1;
    private static final boolean DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED = true;
    private String database;
    private String table;
    private List<String> staticPartitionValues;
    private String agentInfo;
    private int transactionBatchSize;
    private RecordWriter recordWriter;
    private StreamingTransaction currentTransactionBatch;
    private HiveConf conf;
    private boolean streamingOptimizations;
    private AtomicBoolean isConnectionClosed;
    private boolean isPartitionedTable;
    private IMetaStoreClient msClient;
    private IMetaStoreClient heartbeatMSClient;
    private final String username;
    private final boolean secureMode;
    private Table tableObject;
    private String metastoreUri;
    private ConnectionStats connectionStats;
    private final Long writeId;
    private final Integer statementId;
    private boolean manageTransactions;
    private int countTransactions;
    private Set<String> partitions;
    private Map<String, WriteDirInfo> writePaths;
    private Runnable onShutdownRunner;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/hive/streaming/HiveStreamingConnection$Builder.class */
    public static class Builder {
        private String database;
        private String table;
        private List<String> staticPartitionValues;
        private String agentInfo;
        private HiveConf hiveConf;
        private RecordWriter recordWriter;
        private Table tableObject;
        private boolean isPartitioned;
        private int transactionBatchSize = 1;
        private boolean streamingOptimizations = true;
        private long writeId = -1;
        private int statementId = -1;
        private boolean manageTransactions = true;

        public Builder withDatabase(String str) {
            this.database = str;
            return this;
        }

        public Builder withTable(String str) {
            this.table = str;
            return this;
        }

        public Builder withStaticPartitionValues(List<String> list) {
            this.staticPartitionValues = list == null ? null : new ArrayList(list);
            return this;
        }

        public Builder withAgentInfo(String str) {
            this.agentInfo = str;
            return this;
        }

        public Builder withHiveConf(HiveConf hiveConf) {
            this.hiveConf = hiveConf;
            return this;
        }

        @InterfaceStability.Evolving
        public Builder withTransactionBatchSize(int i) {
            this.transactionBatchSize = i;
            return this;
        }

        public Builder withStreamingOptimizations(boolean z) {
            this.streamingOptimizations = z;
            return this;
        }

        public Builder withRecordWriter(RecordWriter recordWriter) {
            this.recordWriter = recordWriter;
            return this;
        }

        public Builder withWriteId(long j) {
            this.writeId = j;
            this.manageTransactions = false;
            return this;
        }

        public Builder withStatementId(int i) {
            this.statementId = i;
            return this;
        }

        public Builder withTableObject(Table table) {
            this.tableObject = table;
            this.isPartitioned = (this.tableObject.getPartitionKeys() == null || this.tableObject.getPartitionKeys().isEmpty()) ? false : true;
            return this;
        }

        public HiveStreamingConnection connect() throws StreamingException {
            if (this.database == null) {
                throw new StreamingException("Database cannot be null for streaming connection");
            }
            if (this.table == null) {
                if (this.tableObject == null) {
                    throw new StreamingException("Table and table object cannot be null for streaming connection");
                }
                this.table = this.tableObject.getTableName();
            }
            if (this.tableObject != null && !this.tableObject.getTableName().equals(this.table)) {
                throw new StreamingException("Table must match tableObject table name");
            }
            if (this.recordWriter == null) {
                throw new StreamingException("Record writer cannot be null for streaming connection");
            }
            if ((this.writeId != -1 && this.tableObject == null) || (this.writeId == -1 && this.tableObject != null)) {
                throw new StreamingException("If writeId is set, tableObject must be set as well and vice versa");
            }
            HiveStreamingConnection hiveStreamingConnection = new HiveStreamingConnection(this);
            hiveStreamingConnection.getClass();
            hiveStreamingConnection.onShutdownRunner = hiveStreamingConnection::close;
            ShutdownHookManager.addShutdownHook(hiveStreamingConnection.onShutdownRunner, 11);
            Thread.setDefaultUncaughtExceptionHandler((thread, th) -> {
                hiveStreamingConnection.close();
            });
            return hiveStreamingConnection;
        }
    }

    /* loaded from: input_file:org/apache/hive/streaming/HiveStreamingConnection$TxnState.class */
    public enum TxnState {
        INACTIVE("I"),
        OPEN("O"),
        COMMITTED("C"),
        ABORTED("A"),
        PREPARED_FOR_COMMIT("P");

        private final String code;

        TxnState(String str) {
            this.code = str;
        }

        @Override // java.lang.Enum
        public String toString() {
            return this.code;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hive/streaming/HiveStreamingConnection$WriteDirInfo.class */
    public static class WriteDirInfo {
        List<String> partitionVals;
        Path writeDir;

        WriteDirInfo(List<String> list, Path path) {
            this.partitionVals = list;
            this.writeDir = path;
        }

        List<String> getPartitionVals() {
            return this.partitionVals;
        }

        Path getWriteDir() {
            return this.writeDir;
        }
    }

    private HiveStreamingConnection(Builder builder) throws StreamingException {
        this.isConnectionClosed = new AtomicBoolean(false);
        this.tableObject = null;
        this.countTransactions = 0;
        this.database = builder.database.toLowerCase();
        this.table = builder.table.toLowerCase();
        this.staticPartitionValues = builder.staticPartitionValues;
        this.conf = builder.hiveConf;
        this.agentInfo = builder.agentInfo;
        this.streamingOptimizations = builder.streamingOptimizations;
        this.writeId = Long.valueOf(builder.writeId);
        this.statementId = Integer.valueOf(builder.statementId);
        this.tableObject = builder.tableObject;
        setPartitionedTable(Boolean.valueOf(builder.isPartitioned));
        this.manageTransactions = builder.manageTransactions;
        this.writePaths = new HashMap();
        UserGroupInformation userGroupInformation = null;
        try {
            userGroupInformation = UserGroupInformation.getLoginUser();
        } catch (IOException e) {
            LOG.warn("Unable to get logged in user via UGI. err: {}", e.getMessage());
        }
        if (userGroupInformation == null) {
            this.username = System.getProperty("user.name");
            this.secureMode = false;
        } else {
            this.username = userGroupInformation.getShortUserName();
            this.secureMode = userGroupInformation.hasKerberosCredentials();
        }
        this.transactionBatchSize = builder.transactionBatchSize;
        this.recordWriter = builder.recordWriter;
        this.connectionStats = new ConnectionStats();
        if (this.agentInfo == null) {
            try {
                this.agentInfo = this.username + ":" + InetAddress.getLocalHost().getHostName() + ":" + Thread.currentThread().getName();
            } catch (UnknownHostException e2) {
                this.agentInfo = UUID.randomUUID().toString();
            }
        }
        if (this.conf == null) {
            this.conf = createHiveConf(getClass(), DEFAULT_METASTORE_URI);
        }
        overrideConfSettings(this.conf);
        if (this.manageTransactions) {
            this.metastoreUri = this.conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName());
            this.msClient = getMetaStoreClient(this.conf, this.metastoreUri, this.secureMode, "streaming-connection");
            this.heartbeatMSClient = getMetaStoreClient(this.conf, this.metastoreUri, this.secureMode, "streaming-connection-heartbeat");
            validateTable();
        }
        LOG.info("STREAMING CONNECTION INFO: {}", toConnectionInfoString());
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    private void setPartitionedTable(Boolean bool) {
        this.isPartitionedTable = bool.booleanValue();
    }

    public String toString() {
        return "{ metaStoreUri: " + this.metastoreUri + ", database: " + this.database + ", table: " + this.table + " }";
    }

    private String toConnectionInfoString() {
        return "{ metastore-uri: " + this.metastoreUri + ", database: " + this.database + ", table: " + this.table + ", partitioned-table: " + isPartitionedTable() + ", dynamic-partitioning: " + isDynamicPartitioning() + ", username: " + this.username + ", secure-mode: " + this.secureMode + ", record-writer: " + this.recordWriter.getClass().getSimpleName() + ", agent-info: " + this.agentInfo + ", writeId: " + this.writeId + ", statementId: " + this.statementId + " }";
    }

    @VisibleForTesting
    String toTransactionString() {
        return this.currentTransactionBatch == null ? "" : this.currentTransactionBatch.toString();
    }

    @Override // org.apache.hive.streaming.PartitionHandler
    public PartitionInfo createPartitionIfNotExists(List<String> list) throws StreamingException {
        Partition createMetaPartitionObject;
        String str = null;
        String str2 = null;
        boolean z = false;
        try {
            try {
                Map makeSpecFromValues = Warehouse.makeSpecFromValues(this.tableObject.getPartitionKeys(), list);
                Path path = new Path(Utilities.getQualifiedPath(this.conf, new Path(this.tableObject.getDataLocation(), Warehouse.makePartPath(makeSpecFromValues))));
                str = path.toString();
                str2 = Warehouse.makePartName(this.tableObject.getPartitionKeys(), list);
                createMetaPartitionObject = org.apache.hadoop.hive.ql.metadata.Partition.createMetaPartitionObject(this.tableObject, makeSpecFromValues, path);
            } catch (AlreadyExistsException e) {
                z = true;
            }
            if (getMSC() == null) {
                return new PartitionInfo(str2, str, false);
            }
            getMSC().add_partition(createMetaPartitionObject);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Created partition {} for table {}", str2, this.tableObject.getFullyQualifiedName());
            }
            return new PartitionInfo(str2, str, z);
        } catch (HiveException | TException e2) {
            throw new StreamingException("Unable to creation partition for values: " + list + " connection: " + toConnectionInfoString(), e2);
        }
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public Path getDeltaFileLocation(List<String> list, Integer num, Long l, Long l2, Integer num2) throws StreamingException {
        return this.recordWriter.getDeltaFileLocation(list, num, l, l2, num2, this.tableObject);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public IMetaStoreClient getMSC() {
        this.connectionStats.incrementMetastoreCalls();
        return this.msClient;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public IMetaStoreClient getHeatbeatMSC() {
        this.connectionStats.incrementMetastoreCalls();
        return this.heartbeatMSClient;
    }

    /* JADX WARN: Failed to calculate best type for var: r11v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r11v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r12v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r12v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 11, insn: 0x01b0: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r11 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:55:0x01b0 */
    /* JADX WARN: Not initialized variable reg: 12, insn: 0x01b5: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r12 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:57:0x01b5 */
    /* JADX WARN: Type inference failed for: r11v0, types: [org.apache.hadoop.fs.FSDataOutputStream] */
    /* JADX WARN: Type inference failed for: r12v0, types: [java.lang.Throwable] */
    private void validateTable() throws InvalidTable, ConnectionError {
        ?? r11;
        ?? r12;
        try {
            this.tableObject = new Table(getMSC().getTable(this.database, this.table));
            if (!AcidUtils.isFullAcidTable(this.tableObject)) {
                LOG.error("HiveEndPoint " + this + " must use an acid table");
                throw new InvalidTable(this.database, this.table, "is not an Acid table");
            }
            if (this.tableObject.getPartitionKeys() == null || this.tableObject.getPartitionKeys().isEmpty()) {
                setPartitionedTable(false);
            } else {
                setPartitionedTable(true);
            }
            if (!isPartitionedTable() && this.staticPartitionValues != null && !this.staticPartitionValues.isEmpty()) {
                String str = toString() + " specifies partitions for un-partitioned table";
                LOG.error(str);
                throw new ConnectionError(str);
            }
            if (this.transactionBatchSize > 1) {
                try {
                    FileSystem fileSystem = this.tableObject.getDataLocation().getFileSystem(this.conf);
                    Throwable th = null;
                    try {
                        if (BlobStorageUtils.isBlobStorageFileSystem(this.conf, fileSystem)) {
                            Path path = new Path("/tmp", "_tmp_stream_verify_" + UUID.randomUUID().toString());
                            try {
                                try {
                                    FSDataOutputStream create = fileSystem.create(path, false);
                                    Throwable th2 = null;
                                    if (!create.hasCapability("hflush")) {
                                        throw new ConnectionError("The backing filesystem only supports transaction batch sizes of 1, but " + this.transactionBatchSize + " was requested.");
                                    }
                                    fileSystem.deleteOnExit(path);
                                    if (create != null) {
                                        if (0 != 0) {
                                            try {
                                                create.close();
                                            } catch (Throwable th3) {
                                                th2.addSuppressed(th3);
                                            }
                                        } else {
                                            create.close();
                                        }
                                    }
                                } catch (IOException e) {
                                    throw new ConnectionError("Could not create path for database", e);
                                }
                            } catch (Throwable th4) {
                                if (r11 != 0) {
                                    if (r12 != 0) {
                                        try {
                                            r11.close();
                                        } catch (Throwable th5) {
                                            r12.addSuppressed(th5);
                                        }
                                    } else {
                                        r11.close();
                                    }
                                }
                                throw th4;
                            }
                        }
                        if (fileSystem != null) {
                            if (0 != 0) {
                                try {
                                    fileSystem.close();
                                } catch (Throwable th6) {
                                    th.addSuppressed(th6);
                                }
                            } else {
                                fileSystem.close();
                            }
                        }
                    } finally {
                    }
                } catch (IOException e2) {
                    throw new ConnectionError("Could not retrieve FileSystem of table", e2);
                }
            }
        } catch (Exception e3) {
            LOG.warn("Unable to validate the table for connection: " + toConnectionInfoString(), e3);
            throw new InvalidTable(this.database, this.table, e3);
        }
    }

    private void beginNextTransaction() throws StreamingException {
        if (this.currentTransactionBatch == null) {
            this.currentTransactionBatch = createNewTransactionBatch();
            LOG.info("Opened new transaction batch {}", this.currentTransactionBatch);
        }
        if (this.currentTransactionBatch.isClosed()) {
            throw new StreamingException("Cannot begin next transaction on a closed streaming connection");
        }
        if (this.currentTransactionBatch.remainingTransactions() == 0) {
            LOG.info("Transaction batch {} is done. Rolling over to next transaction batch.", this.currentTransactionBatch);
            closeCurrentTransactionBatch();
            this.currentTransactionBatch = createNewTransactionBatch();
            LOG.info("Rolled over to new transaction batch {}", this.currentTransactionBatch);
        }
        this.currentTransactionBatch.beginNextTransaction();
    }

    private StreamingTransaction createNewTransactionBatch() throws StreamingException {
        this.countTransactions++;
        if (this.manageTransactions) {
            return new TransactionBatch(this);
        }
        if (this.countTransactions > 1) {
            throw new StreamingException("If a writeId is passed for the construction of HiveStreaming only one transaction batch can be done");
        }
        return new UnManagedSingleTransaction(this);
    }

    private void checkClosedState() throws StreamingException {
        if (this.isConnectionClosed.get()) {
            throw new StreamingException("Streaming connection is closed already.");
        }
    }

    private void checkState() throws StreamingException {
        checkClosedState();
        if (this.currentTransactionBatch == null) {
            throw new StreamingException("Transaction batch is null. Missing beginTransaction?");
        }
        if (this.currentTransactionBatch.getCurrentTransactionState() != TxnState.OPEN) {
            throw new StreamingException("Transaction state is not OPEN. Missing beginTransaction?");
        }
    }

    private void closeCurrentTransactionBatch() throws StreamingException {
        this.currentTransactionBatch.close();
        this.writePaths.clear();
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public void beginTransaction() throws StreamingException {
        checkClosedState();
        this.partitions = new HashSet();
        beginNextTransaction();
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public void commitTransaction() throws StreamingException {
        commitTransaction(null);
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public void commitTransaction(Set<String> set) throws StreamingException {
        commitTransaction(set, null, null);
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public void commitTransaction(Set<String> set, String str, String str2) throws StreamingException {
        checkState();
        HashSet hashSet = new HashSet();
        if (set != null) {
            for (String str3 : set) {
                try {
                    if (!createPartitionIfNotExists(Warehouse.getPartValuesFromPartName(str3)).isExists()) {
                        hashSet.add(str3);
                    }
                } catch (MetaException e) {
                    throw new StreamingException("Partition " + str3 + " is invalid.", e);
                }
            }
            this.connectionStats.incrementTotalPartitions(set.size());
        }
        this.currentTransactionBatch.commit(hashSet, str, str2);
        this.partitions.addAll(this.currentTransactionBatch.getPartitions());
        this.connectionStats.incrementCreatedPartitions(hashSet.size());
        this.connectionStats.incrementCommittedTransactions();
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public void abortTransaction() throws StreamingException {
        checkState();
        this.currentTransactionBatch.abort();
        this.connectionStats.incrementAbortedTransactions();
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public void write(byte[] bArr) throws StreamingException {
        checkState();
        this.currentTransactionBatch.write(bArr);
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public void write(InputStream inputStream) throws StreamingException {
        checkState();
        this.currentTransactionBatch.write(inputStream);
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public void close() {
        if (this.isConnectionClosed.get()) {
            return;
        }
        this.isConnectionClosed.set(true);
        try {
            try {
                if (this.currentTransactionBatch != null) {
                    closeCurrentTransactionBatch();
                }
                if (this.manageTransactions) {
                    getMSC().close();
                    getHeatbeatMSC().close();
                    try {
                        Hive.get(this.conf).getSynchronizedMSC().close();
                    } catch (Exception e) {
                        LOG.warn("Error while closing HMS connection", e);
                    }
                }
                if (!ShutdownHookManager.isShutdownInProgress()) {
                    ShutdownHookManager.removeShutdownHook(this.onShutdownRunner);
                }
            } catch (StreamingException e2) {
                LOG.warn("Unable to close current transaction batch: " + this.currentTransactionBatch, e2);
                if (this.manageTransactions) {
                    getMSC().close();
                    getHeatbeatMSC().close();
                    try {
                        Hive.get(this.conf).getSynchronizedMSC().close();
                    } catch (Exception e3) {
                        LOG.warn("Error while closing HMS connection", e3);
                    }
                }
                if (!ShutdownHookManager.isShutdownInProgress()) {
                    ShutdownHookManager.removeShutdownHook(this.onShutdownRunner);
                }
            }
            LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats());
        } catch (Throwable th) {
            if (this.manageTransactions) {
                getMSC().close();
                getHeatbeatMSC().close();
                try {
                    Hive.get(this.conf).getSynchronizedMSC().close();
                } catch (Exception e4) {
                    LOG.warn("Error while closing HMS connection", e4);
                }
            }
            if (!ShutdownHookManager.isShutdownInProgress()) {
                ShutdownHookManager.removeShutdownHook(this.onShutdownRunner);
            }
            throw th;
        }
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public ConnectionStats getConnectionStats() {
        return this.connectionStats;
    }

    private static IMetaStoreClient getMetaStoreClient(HiveConf hiveConf, String str, boolean z, String str2) throws ConnectionError {
        if (str != null) {
            hiveConf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), str);
        }
        if (z) {
            hiveConf.setBoolean(MetastoreConf.ConfVars.USE_THRIFT_SASL.getHiveName(), true);
        }
        try {
            LOG.info("Creating metastore client for {}", str2);
            return HiveMetaStoreUtils.getHiveMetastoreClient(hiveConf);
        } catch (MetaException | IOException e) {
            throw new ConnectionError("Error connecting to Hive Metastore URI: " + str + ". " + e.getMessage(), (Exception) e);
        }
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public void addWriteDirectoryInfo(List<String> list, Path path) {
        String fullyQualifiedName = list == null ? this.tableObject.getFullyQualifiedName() : list.toString();
        if (!this.writePaths.containsKey(fullyQualifiedName)) {
            this.writePaths.put(fullyQualifiedName, new WriteDirInfo(list, path));
            return;
        }
        WriteDirInfo writeDirInfo = this.writePaths.get(fullyQualifiedName);
        if (!$assertionsDisabled && !writeDirInfo.getWriteDir().equals(path)) {
            throw new AssertionError();
        }
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public void addWriteNotificationEvents() throws StreamingException {
        if (!this.conf.getBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML)) {
            LOG.debug("Write notification log is ignored as dml event logging is disabled.");
            return;
        }
        try {
            Long currentTxnId = getCurrentTxnId();
            Long currentWriteId = getCurrentWriteId();
            for (WriteDirInfo writeDirInfo : this.writePaths.values()) {
                LOG.debug("TxnId: " + currentTxnId + ", WriteId: " + currentWriteId + " - Logging write event for the files in path " + writeDirInfo.getWriteDir());
                List listLocatedFileStatus = HdfsUtils.listLocatedFileStatus(this.tableObject.getDataLocation().getFileSystem(this.conf), writeDirInfo.getWriteDir(), (PathFilter) null, true);
                if (listLocatedFileStatus.isEmpty()) {
                    LOG.debug("TxnId: " + currentTxnId + ", WriteId: " + currentWriteId + " - Skipping empty path " + writeDirInfo.getWriteDir());
                } else {
                    Hive.addWriteNotificationLog(this.conf, this.tableObject, writeDirInfo.getPartitionVals(), currentTxnId, currentWriteId, listLocatedFileStatus, (List) null);
                }
            }
        } catch (IOException | TException | HiveException e) {
            throw new StreamingException("Failed to log write notification events.", e);
        }
    }

    @VisibleForTesting
    TxnState getCurrentTransactionState() {
        return this.currentTransactionBatch.getCurrentTransactionState();
    }

    @VisibleForTesting
    int remainingTransactions() {
        return this.currentTransactionBatch.remainingTransactions();
    }

    @VisibleForTesting
    Long getCurrentTxnId() {
        return Long.valueOf(this.currentTransactionBatch.getCurrentTxnId());
    }

    private HiveConf createHiveConf(Class<?> cls, String str) {
        HiveConf hiveConf = new HiveConf(cls);
        if (str != null) {
            hiveConf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), str);
        }
        return hiveConf;
    }

    private void overrideConfSettings(HiveConf hiveConf) {
        setHiveConf(hiveConf, HiveConf.ConfVars.HIVE_TXN_MANAGER, DbTxnManager.class.getName());
        setHiveConf(hiveConf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
        setHiveConf(hiveConf, MetastoreConf.ConfVars.EXECUTE_SET_UGI.getHiveName());
        setHiveConf(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
        if (this.streamingOptimizations) {
            setHiveConf(hiveConf, HiveConf.ConfVars.HIVE_ORC_DELTA_STREAMING_OPTIMIZATIONS_ENABLED, true);
        }
        setHiveConf(hiveConf, HiveConf.ConfVars.METASTORE_CLIENT_CACHE_ENABLED, false);
    }

    private static void setHiveConf(HiveConf hiveConf, HiveConf.ConfVars confVars, String str) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Overriding HiveConf setting : " + confVars + " = " + str);
        }
        hiveConf.setVar(confVars, str);
    }

    private static void setHiveConf(HiveConf hiveConf, HiveConf.ConfVars confVars, boolean z) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Overriding HiveConf setting : " + confVars + " = " + z);
        }
        hiveConf.setBoolVar(confVars, z);
    }

    private static void setHiveConf(HiveConf hiveConf, String str) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Overriding HiveConf setting : " + str + " = true");
        }
        hiveConf.setBoolean(str, true);
    }

    public List<TxnToWriteId> getTxnToWriteIds() {
        if (this.currentTransactionBatch != null) {
            return this.currentTransactionBatch.getTxnToWriteIds();
        }
        return null;
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public HiveConf getHiveConf() {
        return this.conf;
    }

    @Override // org.apache.hive.streaming.ConnectionInfo
    public String getMetastoreUri() {
        return this.metastoreUri;
    }

    @Override // org.apache.hive.streaming.ConnectionInfo
    public Table getTable() {
        return this.tableObject;
    }

    @Override // org.apache.hive.streaming.ConnectionInfo
    public List<String> getStaticPartitionValues() {
        return this.staticPartitionValues;
    }

    @Override // org.apache.hive.streaming.ConnectionInfo
    public String getAgentInfo() {
        return this.agentInfo;
    }

    @Override // org.apache.hive.streaming.ConnectionInfo
    public boolean isPartitionedTable() {
        return this.isPartitionedTable;
    }

    @Override // org.apache.hive.streaming.ConnectionInfo
    public boolean isDynamicPartitioning() {
        return isPartitionedTable() && (this.staticPartitionValues == null || this.staticPartitionValues.isEmpty());
    }

    @Override // org.apache.hive.streaming.StreamingConnection
    public Set<String> getPartitions() {
        return this.partitions;
    }

    public String getUsername() {
        return this.username;
    }

    public String getDatabase() {
        return this.database;
    }

    public RecordWriter getRecordWriter() {
        return this.recordWriter;
    }

    public int getTransactionBatchSize() {
        return this.transactionBatchSize;
    }

    public HiveConf getConf() {
        return this.conf;
    }

    public Long getWriteId() {
        return this.writeId;
    }

    public Integer getStatementId() {
        return this.statementId;
    }

    public Long getCurrentWriteId() {
        return Long.valueOf(this.currentTransactionBatch.getCurrentWriteId());
    }

    static {
        $assertionsDisabled = !HiveStreamingConnection.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(HiveStreamingConnection.class.getName());
    }
}
