package org.apache.hadoop.hive.kafka;

import java.io.IOException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.tez.DagCredentialSupplier;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.kafkaesqueesqueesque.clients.admin.AdminClient;
import org.apache.kafkaesqueesqueesque.clients.admin.CreateDelegationTokenOptions;
import org.apache.kafkaesqueesqueesque.common.security.auth.SecurityProtocol;
import org.apache.kafkaesqueesqueesque.common.security.token.delegation.DelegationToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.class */
public class KafkaDagCredentialSupplier implements DagCredentialSupplier {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaDagCredentialSupplier.class);

    public Token<?> obtainToken(BaseWork baseWork, Set<TableDesc> set, Configuration configuration) {
        if (!(baseWork instanceof MapWork)) {
            return null;
        }
        PartitionDesc partitionDesc = (PartitionDesc) ((MapWork) baseWork).getAliasToPartnInfo().values().stream().findFirst().orElse(null);
        if (partitionDesc != null) {
            TableDesc tableDesc = partitionDesc.getTableDesc();
            if (isTokenRequired(tableDesc)) {
                return getKafkaDelegationTokenForBrokers(configuration, tableDesc);
            }
        }
        for (TableDesc tableDesc2 : set) {
            if (isTokenRequired(tableDesc2)) {
                return getKafkaDelegationTokenForBrokers(configuration, tableDesc2);
            }
        }
        return null;
    }

    public Text getTokenAlias() {
        return KafkaUtils.KAFKA_DELEGATION_TOKEN_KEY;
    }

    private boolean isTokenRequired(TableDesc tableDesc) {
        return (StringUtils.isEmpty(tableDesc.getProperties().getProperty(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName())) || SecurityProtocol.PLAINTEXT == KafkaUtils.securityProtocol(tableDesc.getProperties())) ? false : true;
    }

    private Token<?> getKafkaDelegationTokenForBrokers(Configuration configuration, TableDesc tableDesc) {
        Properties properties = tableDesc.getProperties();
        String str = (String) properties.get(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
        LOG.info("Getting kafka credentials for brokers: {}", str);
        String var = HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
        try {
            String serverPrincipal = SecurityUtil.getServerPrincipal(HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL), "0.0.0.0");
            Properties properties2 = new Properties();
            properties2.put("bootstrap.servers", str);
            SecurityProtocol securityProtocol = KafkaUtils.securityProtocol(properties);
            if (securityProtocol == null) {
                securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
                LOG.warn("Kafka security.protocol is undefined in table properties. Using default {}", securityProtocol.name);
            }
            properties2.put("security.protocol", securityProtocol.name);
            String format = String.format("%s %s %s %s serviceName=\"%s\" keyTab=\"%s\" principal=\"%s\";", "com.sun.security.auth.module.Krb5LoginModule required", "debug=true", "useKeyTab=true", "storeKey=true", "kafka", var, serverPrincipal);
            properties2.put("sasl.jaas.config", format);
            LOG.debug("Jaas config for requesting kafka credentials: {}", format);
            Configuration configuration2 = new Configuration(configuration);
            properties.stringPropertyNames().forEach(str2 -> {
                configuration2.set(str2, properties.getProperty(str2));
            });
            KafkaUtils.setupKafkaSslProperties(configuration2, properties2);
            CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions();
            try {
                AdminClient create = AdminClient.create(properties2);
                Throwable th = null;
                try {
                    try {
                        DelegationToken delegationToken = create.createDelegationToken(createDelegationTokenOptions).delegationToken().get();
                        LOG.info("Got kafka delegation token: {}", delegationToken);
                        Token<?> token = new Token<>(delegationToken.tokenInfo().tokenId().getBytes(), delegationToken.hmac(), (Text) null, new Text("kafka"));
                        if (create != null) {
                            if (0 != 0) {
                                try {
                                    create.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                create.close();
                            }
                        }
                        return token;
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (create != null) {
                        if (th != null) {
                            try {
                                create.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            create.close();
                        }
                    }
                    throw th3;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while getting Kafka token", e);
            } catch (ExecutionException e2) {
                throw new RuntimeException("Exception while getting Kafka token", e2);
            }
        } catch (IOException e3) {
            throw new RuntimeException(e3);
        }
    }
}
