package org.apache.hadoop.hive.kafka;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.kafka.KafkaOutputFormat;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.ReflectionUtil;
import org.apache.kafkaesqueesqueesque.clients.producer.ProducerRecord;
import org.apache.kafkaesqueesqueesque.common.config.ConfigDef;
import org.apache.kafkaesqueesqueesque.common.errors.AuthenticationException;
import org.apache.kafkaesqueesqueesque.common.errors.AuthorizationException;
import org.apache.kafkaesqueesqueesque.common.errors.InvalidTopicException;
import org.apache.kafkaesqueesqueesque.common.errors.OffsetMetadataTooLarge;
import org.apache.kafkaesqueesqueesque.common.errors.SecurityDisabledException;
import org.apache.kafkaesqueesqueesque.common.errors.SerializationException;
import org.apache.kafkaesqueesqueesque.common.errors.UnknownServerException;
import org.apache.kafkaesqueesqueesque.common.security.auth.SecurityProtocol;
import org.apache.kafkaesqueesqueesque.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/hadoop/hive/kafka/KafkaUtils.class */
public final class KafkaUtils {
    /** Kerberos JAAS entry; formatted with the keytab path and the principal (in that order). */
    private static final String JAAS_TEMPLATE = "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"%s\" principal=\"%s\";";
    /** SCRAM JAAS entry used with a delegation token; formatted with username, password and service name. */
    private static final String JAAS_TEMPLATE_SCRAM = "org.apache.kafkaesqueesqueesque.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\" serviceName=\"%s\" tokenauth=true;";
    /** Configuration prefix whose sub-keys are forwarded to the Kafka consumer. */
    static final String CONSUMER_CONFIGURATION_PREFIX = "kafka.consumer";
    /** Configuration prefix whose sub-keys are forwarded to the Kafka producer. */
    static final String PRODUCER_CONFIGURATION_PREFIX = "kafka.producer";
    private static final Logger log = LoggerFactory.getLogger(KafkaUtils.class);
    /** Credentials key under which a Kafka delegation token is looked up on a {@code JobConf}. */
    static final Text KAFKA_DELEGATION_TOKEN_KEY = new Text("KAFKA_DELEGATION_TOKEN");
    /** Names of the client SSL settings that are copied verbatim from the configuration. */
    private static final Set<String> SSL_CONFIG_KEYS = ImmutableSet.copyOf(new ConfigDef().withClientSslSupport().configKeys().keySet());
    /**
     * Kafka client settings that must not be supplied through the
     * {@code kafka.consumer.} / {@code kafka.producer.} prefixes; this class manages them itself.
     */
    static final Set<String> FORBIDDEN_PROPERTIES = new HashSet<>(ImmutableList.of("enable.auto.commit", "auto.offset.reset", "key.deserializer", "value.deserializer", "transactional.id", "key.serializer", "value.serializer"));

    /** Not instantiable: this final class only exposes static helpers. */
    private KafkaUtils() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Assembles the Kafka consumer settings for the given job/table configuration.
     *
     * <p>Fixed settings (client id, auto-commit off, {@code auto.offset.reset=none}, byte-array
     * deserializers) are applied first, then the Kerberos JAAS settings when Hadoop security is
     * enabled, then every {@code kafka.consumer.*} entry of the configuration, and finally the
     * SSL settings.
     *
     * @param configuration source of the bootstrap servers and of any extra consumer settings
     * @return a fresh {@link Properties} instance
     * @throws IllegalArgumentException if the bootstrap servers entry is missing or empty, or if
     *         a forbidden property is supplied through the {@code kafka.consumer.} prefix
     */
    public static Properties consumerProperties(Configuration configuration) {
        final Properties consumerProps = new Properties();
        consumerProps.setProperty("client.id", Utilities.getTaskId(configuration));
        consumerProps.setProperty("enable.auto.commit", "false");
        consumerProps.setProperty("auto.offset.reset", "none");

        final String brokersKey = KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName();
        final String brokers = configuration.get(brokersKey);
        final boolean brokersConfigured = brokers != null && !brokers.isEmpty();
        if (!brokersConfigured) {
            throw new IllegalArgumentException("Kafka Broker End Point is missing Please set Config " + brokersKey);
        }
        consumerProps.setProperty("bootstrap.servers", brokers);

        // Records are always fetched as raw bytes.
        final String byteArrayDeserializer = ByteArrayDeserializer.class.getName();
        consumerProps.setProperty("key.deserializer", byteArrayDeserializer);
        consumerProps.setProperty("value.deserializer", byteArrayDeserializer);

        if (UserGroupInformation.isSecurityEnabled()) {
            addKerberosJaasConf(configuration, consumerProps);
        }

        // User-supplied entries are applied last so they can override the values set above.
        final Map<String, String> userOverrides = extractExtraProperties(configuration, CONSUMER_CONFIGURATION_PREFIX);
        consumerProps.putAll(userOverrides);
        setupKafkaSslProperties(configuration, consumerProps);
        return consumerProps;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adds SSL settings to {@code properties}.
     *
     * <p>First copies any client SSL setting found directly in the configuration (without
     * overwriting keys already present). If a credential keystore is configured, the truststore
     * is additionally copied into the downloaded-resources directory and its location and
     * password are set; the keystore (location + password) and the key password are set only
     * when their respective password entries are configured and non-empty.
     *
     * @param configuration source of the SSL-related table/job settings
     * @param properties Kafka client properties to update in place
     * @throws IllegalStateException if a store cannot be copied locally or a password cannot be
     *         read from the credential keystore
     */
    public static void setupKafkaSslProperties(Configuration configuration, Properties properties) {
        copySSLProperties(configuration, properties);
        String credentialKeystore = configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName());
        if (credentialKeystore == null || credentialKeystore.isEmpty()) {
            // No credential keystore: nothing beyond the verbatim copy above.
            return;
        }
        String truststorePasswordKey = configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_TRUSTSTORE_PASSWORD.getName());
        String keystorePasswordKey = configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_KEYSTORE_PASSWORD.getName());
        String keyPasswordKey = configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_KEY_PASSWORD.getName());
        String resourcesDir = HiveConf.getVar(configuration, HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR);
        try {
            String truststoreLocation = configuration.get(KafkaTableProperties.HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG.getName());
            properties.setProperty("ssl.truststore.location", new File(resourcesDir + "/" + new Path(truststoreLocation).getName()).getAbsolutePath());
            writeStoreToLocal(configuration, truststoreLocation, new File(resourcesDir).getAbsolutePath());
            properties.setProperty("ssl.truststore.password", Utilities.getPasswdFromKeystore(credentialKeystore, truststorePasswordKey));
            // The keystore and key password entries are optional; an absent (null) entry is
            // treated the same as an empty one instead of failing with a NullPointerException.
            if (keystorePasswordKey != null && !keystorePasswordKey.isEmpty()) {
                log.info("Kafka keystore configured, configuring local keystore");
                String keystoreLocation = configuration.get(KafkaTableProperties.HIVE_SSL_KEYSTORE_LOCATION_CONFIG.getName());
                properties.setProperty("ssl.keystore.location", new File(resourcesDir + "/" + new Path(keystoreLocation).getName()).getAbsolutePath());
                writeStoreToLocal(configuration, keystoreLocation, new File(resourcesDir).getAbsolutePath());
                properties.setProperty("ssl.keystore.password", Utilities.getPasswdFromKeystore(credentialKeystore, keystorePasswordKey));
            }
            if (keyPasswordKey != null && !keyPasswordKey.isEmpty()) {
                properties.setProperty("ssl.key.password", Utilities.getPasswdFromKeystore(credentialKeystore, keyPasswordKey));
            }
        } catch (IOException | URISyntaxException e) {
            throw new IllegalStateException("Unable to retrieve password from the credential keystore", e);
        }
    }

    /**
     * Copies every known client SSL setting that has a value in {@code configuration} into
     * {@code properties}, leaving keys that are already present untouched.
     */
    private static void copySSLProperties(Configuration configuration, Properties properties) {
        for (String sslKey : SSL_CONFIG_KEYS) {
            String configured = configuration.get(sslKey);
            if (configured == null) {
                continue;
            }
            if (properties.containsKey(sslKey)) {
                // An explicit value was set earlier; do not override it.
                continue;
            }
            properties.setProperty(sslKey, configured);
        }
    }

    /**
     * Copies the store at URI {@code str} into the local directory {@code str2}, creating the
     * directory first when it does not exist.
     *
     * @param configuration configuration used to obtain the source file system
     * @param str URI of the store to copy
     * @param str2 local destination directory
     * @throws IOException if the directory cannot be created, the copy fails, or {@code str} is
     *         not a valid URI (the syntax error is wrapped)
     */
    private static void writeStoreToLocal(Configuration configuration, String str, String str2) throws IOException, URISyntaxException {
        try {
            File destinationDir = new File(str2);
            boolean destinationReady = destinationDir.exists() || destinationDir.mkdirs();
            if (!destinationReady) {
                throw new IOException("Unable to create local directory, " + str2);
            }
            URI storeUri = new URI(str);
            FileSystem sourceFs = FileSystem.get(storeUri, configuration);
            Path sourcePath = new Path(storeUri.toString());
            Path destinationPath = new Path(str2);
            sourceFs.copyToLocalFile(sourcePath, destinationPath);
        } catch (URISyntaxException e) {
            throw new IOException("Unable to download store", e);
        }
    }

    /**
     * Collects the configuration entries whose key starts with {@code str + "."} and returns
     * them keyed by the remainder of the key (prefix and dot stripped).
     *
     * @param configuration configuration to scan
     * @param str key prefix, without the trailing dot
     * @return immutable map of stripped key to configured value
     * @throws IllegalArgumentException if a stripped key is one of {@link #FORBIDDEN_PROPERTIES}
     */
    private static Map<String, String> extractExtraProperties(Configuration configuration, String str) {
        // Quote the prefix: it contains dots, which would otherwise match any character and let
        // unrelated keys (e.g. "kafka_consumer.x") slip through.
        String keyPattern = "^" + java.util.regex.Pattern.quote(str) + "\\..*";
        ImmutableMap.Builder<String, String> extras = ImmutableMap.builder();
        for (Map.Entry<String, String> entry : configuration.getValByRegex(keyPattern).entrySet()) {
            String kafkaKey = entry.getKey().substring(str.length() + 1);
            if (FORBIDDEN_PROPERTIES.contains(kafkaKey)) {
                throw new IllegalArgumentException("Not suppose to set Kafka Property " + kafkaKey);
            }
            extras.put(kafkaKey, entry.getValue());
        }
        return extras.build();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Assembles the Kafka producer settings for the given job/table configuration.
     *
     * <p>Sets the bootstrap servers, the Kerberos JAAS settings when Hadoop security is enabled,
     * every {@code kafka.producer.*} entry, the SSL settings, a client id (the
     * {@code mapred.task.id} value, or a random one when that is unset), and finally the
     * delivery settings required by the configured write semantic.
     *
     * @param configuration source of the write semantic, bootstrap servers and extra settings
     * @return a fresh {@link Properties} instance
     * @throws IllegalArgumentException if the bootstrap servers entry is missing or empty, a
     *         forbidden property is supplied, or the write semantic is not handled here
     */
    public static Properties producerProperties(Configuration configuration) {
        final String semanticName = configuration.get(KafkaTableProperties.WRITE_SEMANTIC_PROPERTY.getName());
        final KafkaOutputFormat.WriteSemantic semantic = KafkaOutputFormat.WriteSemantic.valueOf(semanticName);

        final Properties producerProps = new Properties();
        final String brokersKey = KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName();
        final String brokers = configuration.get(brokersKey);
        final boolean brokersConfigured = brokers != null && !brokers.isEmpty();
        if (!brokersConfigured) {
            throw new IllegalArgumentException("Kafka Broker End Point is missing Please set Config " + brokersKey);
        }
        producerProps.setProperty("bootstrap.servers", brokers);

        if (UserGroupInformation.isSecurityEnabled()) {
            addKerberosJaasConf(configuration, producerProps);
        }
        final Map<String, String> userOverrides = extractExtraProperties(configuration, PRODUCER_CONFIGURATION_PREFIX);
        producerProps.putAll(userOverrides);
        setupKafkaSslProperties(configuration, producerProps);

        // The client id is set after the user overrides, so it always wins.
        final String taskAttemptId = configuration.get("mapred.task.id", (String) null);
        final String clientId;
        if (taskAttemptId != null) {
            clientId = taskAttemptId;
        } else {
            clientId = "random_" + UUID.randomUUID().toString();
        }
        producerProps.setProperty("client.id", clientId);

        final String unlimitedRetries = String.valueOf(Integer.MAX_VALUE);
        if (semantic == KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE) {
            producerProps.setProperty("retries", unlimitedRetries);
            producerProps.setProperty("acks", "all");
        } else if (semantic == KafkaOutputFormat.WriteSemantic.EXACTLY_ONCE) {
            // The transactional id is derived from the task id (attempt suffix stripped).
            final String transactionalId = getTaskId(configuration);
            producerProps.setProperty("acks", "all");
            producerProps.setProperty("retries", unlimitedRetries);
            producerProps.setProperty("transactional.id", transactionalId);
            producerProps.setProperty("enable.idempotence", "true");
        } else {
            throw new IllegalArgumentException("Unknown Semantic " + semantic);
        }
        return producerProps;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adds the jars containing the given classes to the {@code tmpjars} entry of the
     * configuration, keeping the jars already listed there. {@code null} classes are skipped.
     *
     * @param configuration configuration whose {@code tmpjars} entry is read and rewritten
     * @param clsArr classes whose jars should be added
     * @throws IOException if the local file system cannot be obtained
     * @throws RuntimeException if no jar is found for a class, the jar does not exist on the
     *         local file system, or checking for it fails
     */
    public static void copyDependencyJars(Configuration configuration, Class<?>... clsArr) throws IOException {
        Set<String> jarPaths = new HashSet<>();
        LocalFileSystem localFs = FileSystem.getLocal(configuration);
        jarPaths.addAll(configuration.getStringCollection("tmpjars"));

        for (Class<?> cls : clsArr) {
            if (cls == null) {
                continue;
            }
            String jarPath = Utilities.jarFinderGetJar(cls);
            if (jarPath == null) {
                throw new RuntimeException("Could not find jar for class " + cls + " in order to ship it to the cluster.");
            }
            boolean jarPresent;
            try {
                jarPresent = localFs.exists(new Path(jarPath));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            if (!jarPresent) {
                throw new RuntimeException("Could not validate jar file " + jarPath + " for class " + cls);
            }
            jarPaths.add(jarPath);
        }

        if (!jarPaths.isEmpty()) {
            String[] jarArray = jarPaths.toArray(new String[0]);
            configuration.set("tmpjars", StringUtils.arrayToString(jarArray));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Loads the class named {@code str} and instantiates it through {@code ReflectionUtil}
     * with a {@code null} configuration.
     *
     * @param str fully-qualified class name of the delegate SerDe
     * @return the new instance, cast to {@link AbstractSerDe}
     * @throws RuntimeException wrapping the {@link ClassNotFoundException} if the class cannot
     *         be loaded
     */
    public static AbstractSerDe createDelegate(String str) {
        final Class<?> delegateClass;
        try {
            delegateClass = Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
        Object delegate = ReflectionUtil.newInstance(delegateClass, (Configuration) null);
        return (AbstractSerDe) delegate;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Converts a {@code KafkaWritable} into a producer record for topic {@code str}.
     * A partition or timestamp equal to {@code -1} is passed to the record as {@code null}.
     *
     * @param str destination topic
     * @param kafkaWritable source of partition, timestamp, key and value
     * @return the producer record
     */
    public static ProducerRecord<byte[], byte[]> toProducerRecord(String str, KafkaWritable kafkaWritable) {
        Integer partition = null;
        if (kafkaWritable.getPartition() != -1) {
            partition = Integer.valueOf(kafkaWritable.getPartition());
        }
        Long timestamp = null;
        if (kafkaWritable.getTimestamp() != -1) {
            timestamp = Long.valueOf(kafkaWritable.getTimestamp());
        }
        return new ProducerRecord<>(str, partition, timestamp, kafkaWritable.getRecordKey(), kafkaWritable.getValue());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether {@code th} belongs to the set of exception types this class treats as fatal:
     * security-related failures (authentication, authorization, security disabled) and a fixed
     * list of other errors (invalid topic, unknown server error, serialization, offset metadata
     * too large, illegal state).
     *
     * @param th the throwable to classify; {@code null} yields {@code false}
     * @return {@code true} if {@code th} is an instance of one of those types
     */
    public static boolean exceptionIsFatal(Throwable th) {
        if (th instanceof AuthenticationException
                || th instanceof AuthorizationException
                || th instanceof SecurityDisabledException) {
            return true;
        }
        if (th instanceof InvalidTopicException
                || th instanceof UnknownServerException
                || th instanceof SerializationException) {
            return true;
        }
        return th instanceof OffsetMetadataTooLarge || th instanceof IllegalStateException;
    }

    /**
     * Returns the {@code mapred.task.id} value with everything from its last underscore onwards
     * removed, or the whole value when it contains no underscore.
     *
     * @param configuration configuration holding {@code mapred.task.id}
     * @return the truncated task id
     * @throws NullPointerException if {@code mapred.task.id} is not set
     */
    static String getTaskId(Configuration configuration) {
        String taskAttemptId = configuration.get("mapred.task.id", (String) null);
        Preconditions.checkNotNull(taskAttemptId);
        int lastUnderscore = taskAttemptId.lastIndexOf("_");
        if (lastUnderscore == -1) {
            return taskAttemptId;
        }
        return taskAttemptId.substring(0, lastUnderscore);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adds SASL authentication settings to {@code properties}.
     *
     * <p>Sets {@code SASL_PLAINTEXT} / {@code GSSAPI} with service name {@code kafka} and a
     * Kerberos JAAS entry built from the HiveServer2 principal and keytab, falling back to the
     * LLAP ones when either HiveServer2 value is missing or empty. If {@code configuration} is a
     * {@link JobConf} carrying a Kafka delegation token, the mechanism is switched to
     * {@code SCRAM-SHA-256} and the JAAS entry is replaced by one built from that token.
     *
     * <p>The JAAS entry is logged at INFO level; for the token case the password is redacted in
     * the logged text so the credential never reaches the logs.
     *
     * @param configuration source of the Kerberos settings and (optionally) the delegation token
     * @param properties Kafka client properties to update in place
     * @throws RuntimeException wrapping the {@link IOException} raised while resolving the
     *         server principal
     */
    public static void addKerberosJaasConf(Configuration configuration, Properties properties) {
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        properties.setProperty("sasl.mechanism", "GSSAPI");
        properties.setProperty("sasl.kerberos.service.name", "kafka");
        String principal = HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
        String keytab = HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
        if (principal == null || principal.isEmpty() || keytab == null || keytab.isEmpty()) {
            // HiveServer2 credentials are incomplete: use the LLAP principal/keytab instead.
            keytab = HiveConf.getVar(configuration, HiveConf.ConfVars.LLAP_FS_KERBEROS_KEYTAB_FILE);
            principal = HiveConf.getVar(configuration, HiveConf.ConfVars.LLAP_FS_KERBEROS_PRINCIPAL);
        }
        try {
            // The Kerberos entry only holds a keytab path and a principal, so it is safe to log.
            String loggableJaas = String.format(JAAS_TEMPLATE, keytab, SecurityUtil.getServerPrincipal(principal, "0.0.0.0"));
            properties.setProperty("sasl.jaas.config", loggableJaas);
            Token<?> token = configuration instanceof JobConf
                    ? ((JobConf) configuration).getCredentials().getToken(KAFKA_DELEGATION_TOKEN_KEY)
                    : null;
            if (token != null) {
                log.info("Kafka delegation token has been found: {}", token);
                String tokenId = new String(token.getIdentifier());
                String tokenPassword = Base64.getEncoder().encodeToString(token.getPassword());
                properties.setProperty("sasl.mechanism", "SCRAM-SHA-256");
                properties.setProperty("sasl.jaas.config", String.format(JAAS_TEMPLATE_SCRAM, tokenId, tokenPassword, token.getService()));
                // Same entry with the password masked: this is the only form that gets logged.
                loggableJaas = String.format(JAAS_TEMPLATE_SCRAM, tokenId, "[REDACTED]", token.getService());
            }
            log.info("Kafka client running with following JAAS = [{}]", loggableJaas);
        } catch (IOException e) {
            log.error("Can not construct kerberos principal", e);
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Resolves the security protocol from {@code properties}, checking
     * {@code security.protocol}, {@code kafka.consumer.security.protocol} and
     * {@code kafka.producer.security.protocol} in that order and using the first non-empty value.
     *
     * @param properties properties to inspect
     * @return the protocol named by the first non-empty value, or {@code null} if none is set
     */
    public static SecurityProtocol securityProtocol(Properties properties) {
        String[] candidateKeys = {"security.protocol", "kafka.consumer.security.protocol", "kafka.producer.security.protocol"};
        int index = 0;
        while (index < candidateKeys.length) {
            String protocolName = properties.getProperty(candidateKeys[index]);
            boolean missing = protocolName == null || protocolName.isEmpty();
            if (!missing) {
                return SecurityProtocol.forName(protocolName);
            }
            index++;
        }
        return null;
    }
}
