package io.confluent.connect.hdfs;

import io.confluent.common.utils.SystemTime;
import io.confluent.common.utils.Time;
import io.confluent.connect.avro.AvroData;
import io.confluent.connect.hdfs.filter.TopicCommittedFileFilter;
import io.confluent.connect.hdfs.hive.HiveMetaStore;
import io.confluent.connect.hdfs.hive.HiveUtil;
import io.confluent.connect.hdfs.partitioner.Partitioner;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.hdfs.storage.Storage;
import io.confluent.connect.storage.StorageFactory;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/hdfs/DataWriter.class */
/**
 * Routes sink records to one {@link TopicPartitionWriter} per assigned topic partition and,
 * when {@code hive.integration} is enabled, keeps the Hive metastore in sync with written data.
 *
 * <p>When Kerberos authentication is enabled, a background thread re-logs in from the configured
 * keytab every {@code KERBEROS_TICKET_RENEW_PERIOD_MS_CONFIG} milliseconds until {@link #stop()}
 * is called (or construction fails).
 */
public class DataWriter {
    private static final Logger log = LoggerFactory.getLogger(DataWriter.class);
    private static final Time SYSTEM_TIME = new SystemTime();
    private final Time time;
    private Map<TopicPartition, TopicPartitionWriter> topicPartitionWriters;
    private String url;
    private HdfsStorage storage;
    private String topicsDir;
    private Format format;
    private RecordWriterProvider writerProvider;
    private io.confluent.connect.storage.format.RecordWriterProvider<HdfsSinkConnectorConfig> newWriterProvider;
    private io.confluent.connect.storage.format.SchemaFileReader<HdfsSinkConnectorConfig, Path> schemaFileReader;
    private io.confluent.connect.storage.format.Format<HdfsSinkConnectorConfig, Path> newFormat;
    private Partitioner partitioner;
    private HdfsSinkConnectorConfig connectorConfig;
    private AvroData avroData;
    private SinkTaskContext context;
    private ExecutorService executorService;
    private String hiveDatabase;
    private HiveMetaStore hiveMetaStore;
    private HiveUtil hive;
    private Queue<Future<Void>> hiveUpdateFutures;
    private boolean hiveIntegration;
    private Thread ticketRenewThread;
    // Flipped under this object's monitor by stopTicketRenewThread(); volatile so the renew loop
    // always observes the latest value.
    private volatile boolean isRunning;

    /**
     * Adapts a storage-common partitioner to the HDFS connector's {@link Partitioner} interface by
     * delegating every call to the wrapped instance.
     */
    public static class PartitionerWrapper implements Partitioner {
        public final io.confluent.connect.storage.partitioner.Partitioner<FieldSchema> partitioner;

        public PartitionerWrapper(io.confluent.connect.storage.partitioner.Partitioner<FieldSchema> partitioner) {
            this.partitioner = partitioner;
        }

        @Override
        public void configure(Map<String, Object> config) {
            partitioner.configure(config);
        }

        @Override
        public String encodePartition(SinkRecord sinkRecord) {
            return partitioner.encodePartition(sinkRecord);
        }

        @Override
        public String generatePartitionedPath(String topic, String encodedPartition) {
            return partitioner.generatePartitionedPath(topic, encodedPartition);
        }

        @Override
        public List<FieldSchema> partitionFields() {
            return partitioner.partitionFields();
        }
    }

    /** Creates a writer that uses the system clock. */
    public DataWriter(HdfsSinkConnectorConfig hdfsSinkConnectorConfig, SinkTaskContext sinkTaskContext, AvroData avroData) {
        this(hdfsSinkConnectorConfig, sinkTaskContext, avroData, SYSTEM_TIME);
    }

    /**
     * Creates a writer for every partition currently assigned to {@code context}.
     *
     * <p>The constructor sets up the Hadoop configuration (optionally logging in via Kerberos) and
     * creates the topics, temp, and logs directories. It also instantiates the configured format
     * and partitioner, and the Hive helpers when Hive integration is on.
     *
     * @param connectorConfig connector configuration
     * @param context sink task context; its current assignment gets a writer each
     * @param avroData Avro converter shared with the per-partition writers
     * @param time clock passed through to the per-partition writers
     * @throws ConfigException if Kerberos is enabled but the principal or keytab is missing
     * @throws ConnectException wrapping any I/O or reflection failure during setup
     */
    public DataWriter(HdfsSinkConnectorConfig connectorConfig, SinkTaskContext context, AvroData avroData, Time time) {
        this.time = time;
        boolean initialized = false;
        try {
            System.setProperty("hadoop.home.dir", connectorConfig.getString(HdfsSinkConnectorConfig.HADOOP_HOME_CONFIG));
            this.connectorConfig = connectorConfig;
            this.avroData = avroData;
            this.context = context;

            String hadoopConfDir = connectorConfig.getString(HdfsSinkConnectorConfig.HADOOP_CONF_DIR_CONFIG);
            log.info("Hadoop configuration directory {}", hadoopConfDir);
            Configuration hadoopConfiguration = connectorConfig.getHadoopConfiguration();
            if (!hadoopConfDir.equals("")) {
                hadoopConfiguration.addResource(new Path(hadoopConfDir + "/core-site.xml"));
                hadoopConfiguration.addResource(new Path(hadoopConfDir + "/hdfs-site.xml"));
            }
            if (connectorConfig.getBoolean(HdfsSinkConnectorConfig.HDFS_AUTHENTICATION_KERBEROS_CONFIG)) {
                loginWithKerberos(connectorConfig, hadoopConfiguration);
            }

            url = connectorConfig.getUrl();
            topicsDir = connectorConfig.getString("topics.dir");
            @SuppressWarnings("unchecked")
            Class<? extends HdfsStorage> storageClass =
                    (Class<? extends HdfsStorage>) connectorConfig.getClass("storage.class");
            storage = (HdfsStorage) StorageFactory.createStorage(
                    storageClass, HdfsSinkConnectorConfig.class, connectorConfig, url);
            createDir(topicsDir);
            createDir(topicsDir + HdfsSinkConnectorConstants.TEMPFILE_DIRECTORY);
            createDir(connectorConfig.getString(HdfsSinkConnectorConfig.LOGS_DIR_CONFIG));

            initFormat(connectorConfig, avroData);
            partitioner = newPartitioner(connectorConfig);

            hiveIntegration = connectorConfig.getBoolean("hive.integration");
            if (hiveIntegration) {
                hiveDatabase = connectorConfig.getString("hive.database");
                hiveMetaStore = new HiveMetaStore(hadoopConfiguration, connectorConfig);
                if (format != null) {
                    hive = format.getHiveUtil(connectorConfig, hiveMetaStore);
                } else if (newFormat != null) {
                    final io.confluent.connect.storage.hive.HiveUtil storageHiveUtil =
                            newFormat.getHiveFactory().createHiveUtil(connectorConfig, hiveMetaStore);
                    // Adapt the storage-common HiveUtil to the HDFS connector's HiveUtil type.
                    hive = new HiveUtil(connectorConfig, hiveMetaStore) {
                        @Override
                        public void createTable(String database, String tableName, Schema schema, Partitioner tablePartitioner) {
                            storageHiveUtil.createTable(database, tableName, schema, tablePartitioner);
                        }

                        public void alterSchema(String database, String tableName, Schema schema) {
                            storageHiveUtil.alterSchema(database, tableName, schema);
                        }
                    };
                } else {
                    throw new ConnectException("One of old or new format classes must be provided");
                }
                executorService = Executors.newSingleThreadExecutor();
                hiveUpdateFutures = new LinkedList<>();
            }

            topicPartitionWriters = new HashMap<>();
            for (TopicPartition tp : context.assignment()) {
                topicPartitionWriters.put(tp, newTopicPartitionWriter(tp));
            }
            initialized = true;
        } catch (IOException e) {
            throw new ConnectException(e);
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new ConnectException("Reflection exception: ", e);
        } finally {
            // The renew thread may already be running; don't leak it if construction failed.
            if (!initialized) {
                stopTicketRenewThread();
            }
        }
    }

    /**
     * Switches Hadoop to Kerberos, logs in from the configured keytab, and starts the ticket renew
     * thread.
     *
     * @throws ConfigException if the connect principal or keytab is not configured
     * @throws IOException if host resolution or the keytab login fails
     */
    private void loginWithKerberos(HdfsSinkConnectorConfig config, Configuration hadoopConfiguration) throws IOException {
        SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, hadoopConfiguration);
        String connectPrincipal = config.getString(HdfsSinkConnectorConfig.CONNECT_HDFS_PRINCIPAL_CONFIG);
        String keytab = config.getString(HdfsSinkConnectorConfig.CONNECT_HDFS_KEYTAB_CONFIG);
        if (connectPrincipal == null || keytab == null) {
            throw new ConfigException("Hadoop is using Kerberos for authentication, you need to provide both a connect principal and the path to the keytab of the principal.");
        }
        hadoopConfiguration.set("hadoop.security.authentication", "kerberos");
        hadoopConfiguration.set("hadoop.security.authorization", "true");
        String hostname = InetAddress.getLocalHost().getCanonicalHostName();
        String namenodePrincipal = SecurityUtil.getServerPrincipal(
                config.getString(HdfsSinkConnectorConfig.HDFS_NAMENODE_PRINCIPAL_CONFIG), hostname);
        // An explicit value from the Hadoop config files wins over the connector setting.
        if (hadoopConfiguration.get("dfs.namenode.kerberos.principal") == null) {
            hadoopConfiguration.set("dfs.namenode.kerberos.principal", namenodePrincipal);
        }
        log.info("Hadoop namenode principal: {}", hadoopConfiguration.get("dfs.namenode.kerberos.principal"));
        UserGroupInformation.setConfiguration(hadoopConfiguration);
        UserGroupInformation.loginUserFromKeytab(SecurityUtil.getServerPrincipal(connectPrincipal, hostname), keytab);
        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
        log.info("Login as: {}", loginUser.getUserName());
        long renewPeriodMs = config.getLong(HdfsSinkConnectorConfig.KERBEROS_TICKET_RENEW_PERIOD_MS_CONFIG);
        startTicketRenewThread(loginUser, renewPeriodMs);
    }

    /**
     * Starts a daemon thread that calls {@code reloginFromKeytab()} every {@code renewPeriodMs}.
     * The thread exits when {@link #stopTicketRenewThread()} clears {@code isRunning} or when it
     * is interrupted.
     */
    private void startTicketRenewThread(final UserGroupInformation loginUser, final long renewPeriodMs) {
        isRunning = true;
        ticketRenewThread = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (DataWriter.this) {
                    while (isRunning) {
                        try {
                            // wait() releases the monitor, so stopTicketRenewThread() can wake us early.
                            DataWriter.this.wait(renewPeriodMs);
                            if (isRunning) {
                                loginUser.reloginFromKeytab();
                            }
                        } catch (IOException e) {
                            log.error("Error renewing the ticket", e);
                        } catch (InterruptedException e) {
                            // Interruption is a stop request; preserve the flag and exit.
                            Thread.currentThread().interrupt();
                            log.warn("Kerberos ticket renew thread interrupted; stopping ticket renewal.");
                            return;
                        }
                    }
                }
            }
        }, "hdfs-kerberos-ticket-renew");
        ticketRenewThread.setDaemon(true);
        log.info("Starting the Kerberos ticket renew thread with period {}ms.", renewPeriodMs);
        ticketRenewThread.start();
    }

    /** Signals the ticket renew thread, if one was started, to exit. */
    private void stopTicketRenewThread() {
        if (ticketRenewThread != null) {
            synchronized (this) {
                isRunning = false;
                notifyAll();
            }
        }
    }

    /**
     * Instantiates {@code format.class}. A constructor taking {@link HdfsStorage} marks a
     * storage-common format. Otherwise a no-arg constructor is used and the class is treated as a
     * legacy HDFS {@link Format}, whose schema reader is adapted to the storage-common interface.
     */
    private void initFormat(HdfsSinkConnectorConfig config, AvroData avroData)
            throws NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException {
        Class<?> formatClass = config.getClass("format.class");
        try {
            @SuppressWarnings("unchecked")
            io.confluent.connect.storage.format.Format<HdfsSinkConnectorConfig, Path> storageFormat =
                    (io.confluent.connect.storage.format.Format<HdfsSinkConnectorConfig, Path>)
                            formatClass.getConstructor(HdfsStorage.class).newInstance(storage);
            newFormat = storageFormat;
            newWriterProvider = newFormat.getRecordWriterProvider();
            schemaFileReader = newFormat.getSchemaFileReader();
        } catch (NoSuchMethodException e) {
            format = (Format) formatClass.getConstructor().newInstance();
            writerProvider = format.getRecordWriterProvider();
            final SchemaFileReader legacyReader = format.getSchemaFileReader(avroData);
            // Only getSchema is supported by the adapter; iteration is not.
            schemaFileReader = new io.confluent.connect.storage.format.SchemaFileReader<HdfsSinkConnectorConfig, Path>() {
                @Override
                public Schema getSchema(HdfsSinkConnectorConfig conf, Path path) {
                    try {
                        return legacyReader.getSchema(conf.getHadoopConfiguration(), path);
                    } catch (IOException ioe) {
                        throw new ConnectException("Failed to get schema", ioe);
                    }
                }

                @Override
                public Iterator<Object> iterator() {
                    throw new UnsupportedOperationException();
                }

                @Override
                public boolean hasNext() {
                    throw new UnsupportedOperationException();
                }

                @Override
                public Object next() {
                    throw new UnsupportedOperationException();
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException();
                }

                @Override
                public void close() throws IOException {
                }
            };
        }
    }

    /** Builds a writer for {@code tp} from this DataWriter's shared state. */
    private TopicPartitionWriter newTopicPartitionWriter(TopicPartition tp) {
        return new TopicPartitionWriter(tp, storage, writerProvider, newWriterProvider, partitioner, connectorConfig,
                context, avroData, hiveMetaStore, hive, schemaFileReader, executorService, hiveUpdateFutures, time);
    }

    /**
     * Buffers each record in its partition's writer, surfaces finished Hive updates, then lets
     * every partition writer make progress.
     *
     * @throws ConnectException if a record belongs to a partition without a writer, or a completed
     *     Hive update failed
     */
    public void write(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
            TopicPartitionWriter writer = topicPartitionWriters.get(tp);
            if (writer == null) {
                throw new ConnectException("No writer for topic partition " + tp + "; it is not assigned to this task");
            }
            writer.buffer(record);
        }
        if (hiveIntegration) {
            drainCompletedHiveUpdates();
        }
        for (TopicPartitionWriter writer : topicPartitionWriters.values()) {
            writer.write();
        }
    }

    /**
     * Removes finished futures from the head of {@code hiveUpdateFutures}, stopping at the first
     * unfinished one. A failed future is left in the queue and its failure is rethrown.
     */
    private void drainCompletedHiveUpdates() {
        Iterator<Future<Void>> it = hiveUpdateFutures.iterator();
        while (it.hasNext()) {
            Future<Void> future = it.next();
            if (!future.isDone()) {
                break;
            }
            try {
                future.get();
            } catch (ExecutionException e) {
                throw new ConnectException("Hive update failed", e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            it.remove();
        }
    }

    /** Runs recovery on the writer for {@code topicPartition}. */
    public void recover(TopicPartition topicPartition) {
        topicPartitionWriters.get(topicPartition).recover();
    }

    /**
     * For every topic with a writer, creates the Hive table from the schema of the committed file
     * with the highest offset. Then adds any directory under the topic directory not already listed
     * as a partition. Requires Hive integration to be enabled ({@code hive} is null otherwise).
     *
     * @throws ConnectException wrapping any I/O failure
     */
    public void syncWithHive() throws ConnectException {
        HashSet<String> topics = new HashSet<>();
        for (TopicPartition tp : topicPartitionWriters.keySet()) {
            topics.add(tp.topic());
        }
        try {
            for (String topic : topics) {
                String topicDir = FileUtils.topicDirectory(url, topicsDir, topic);
                FileStatus latest = FileUtils.fileStatusWithMaxOffset(
                        storage, new Path(topicDir), new TopicCommittedFileFilter(topic));
                if (latest == null) {
                    continue;
                }
                Schema latestSchema = schemaFileReader.getSchema(connectorConfig, latest.getPath());
                hive.createTable(hiveDatabase, topic, latestSchema, partitioner);
                List<String> existingPartitions = hiveMetaStore.listPartitions(hiveDatabase, topic, (short) -1);
                for (FileStatus dir : FileUtils.getDirectories(storage, new Path(topicDir))) {
                    String dirPath = dir.getPath().toString();
                    if (!existingPartitions.contains(dirPath)) {
                        hiveMetaStore.addPartition(hiveDatabase, topic, getPartitionValue(dirPath));
                    }
                }
            }
        } catch (IOException e) {
            throw new ConnectException(e);
        }
    }

    /** Creates (replacing any existing entry) and recovers a writer for each given partition. */
    public void open(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            topicPartitionWriters.put(tp, newTopicPartitionWriter(tp));
            recover(tp);
        }
    }

    /** Closes every partition writer, logging but not propagating failures, then forgets them. */
    public void close() {
        for (Map.Entry<TopicPartition, TopicPartitionWriter> entry : topicPartitionWriters.entrySet()) {
            try {
                entry.getValue().close();
            } catch (ConnectException e) {
                log.error("Error closing writer for {}", entry.getKey(), e);
            }
        }
        topicPartitionWriters.clear();
    }

    /**
     * Shuts down the Hive executor (waiting up to {@code shutdown.timeout.ms}), closes storage,
     * and stops the Kerberos ticket renew thread. If interrupted while waiting, the interrupt
     * status is restored only after cleanup so it does not disturb {@code storage.close()}.
     */
    public void stop() {
        boolean interrupted = false;
        if (executorService != null) {
            boolean terminated = false;
            try {
                log.info("Shutting down Hive executor service.");
                executorService.shutdown();
                long shutdownTimeoutMs = connectorConfig.getLong("shutdown.timeout.ms");
                log.info("Awaiting termination.");
                terminated = executorService.awaitTermination(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
            if (!terminated) {
                log.warn("Unclean Hive executor service shutdown, you probably need to sync with Hive next time you start the connector");
                executorService.shutdownNow();
            }
        }
        try {
            storage.close();
        } finally {
            stopTicketRenewThread();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public Partitioner getPartitioner() {
        return partitioner;
    }

    /** Returns each partition's writer offset, omitting partitions whose offset is negative. */
    public Map<TopicPartition, Long> getCommittedOffsets() {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        log.debug("Writer looking for last offsets for topic partitions {}", topicPartitionWriters.keySet());
        for (Map.Entry<TopicPartition, TopicPartitionWriter> entry : topicPartitionWriters.entrySet()) {
            long offset = entry.getValue().offset();
            log.debug("Writer found last offset {} for topic partition {}", offset, entry.getKey());
            if (offset >= 0) {
                offsets.put(entry.getKey(), offset);
            }
        }
        return offsets;
    }

    public TopicPartitionWriter getBucketWriter(TopicPartition topicPartition) {
        return topicPartitionWriters.get(topicPartition);
    }

    public Storage getStorage() {
        return storage;
    }

    Map<String, io.confluent.connect.storage.format.RecordWriter> getWriters(TopicPartition topicPartition) {
        return topicPartitionWriters.get(topicPartition).getWriters();
    }

    public Map<String, String> getTempFileNames(TopicPartition topicPartition) {
        return topicPartitionWriters.get(topicPartition).getTempFiles();
    }

    /** Creates {@code url + "/" + dir} in storage unless it already exists. */
    private void createDir(String dir) {
        String path = url + "/" + dir;
        if (!storage.exists(path)) {
            storage.create(path);
        }
    }

    /**
     * Instantiates and configures {@code partitioner.class}. Classes that implement the HDFS
     * {@link Partitioner} are used directly. Any other class is assumed to be a storage-common
     * partitioner and is wrapped in {@link PartitionerWrapper}; a class that is neither fails with
     * {@link ClassCastException}.
     */
    private Partitioner newPartitioner(HdfsSinkConnectorConfig config)
            throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        Class<?> partitionerClass = config.getClass("partitioner.class");
        Partitioner result;
        if (Partitioner.class.isAssignableFrom(partitionerClass)) {
            result = (Partitioner) partitionerClass.newInstance();
        } else {
            @SuppressWarnings("unchecked")
            io.confluent.connect.storage.partitioner.Partitioner<FieldSchema> storagePartitioner =
                    (io.confluent.connect.storage.partitioner.Partitioner<FieldSchema>) partitionerClass.newInstance();
            result = new PartitionerWrapper(storagePartitioner);
        }
        result.configure(new HashMap<>(config.plainValues()));
        return result;
    }

    /**
     * Drops the first three {@code '/'}-separated segments of {@code path} and returns the rest
     * with a leading slash and a trailing slash after every segment. For example,
     * {@code "hdfs://nn:8020/topics/t/p=1"} yields {@code "/topics/t/p=1/"}.
     * NOTE(review): assumes a {@code scheme://authority/} prefix.
     */
    private String getPartitionValue(String path) {
        String[] segments = path.split("/");
        StringBuilder value = new StringBuilder("/");
        for (int i = 3; i < segments.length; i++) {
            value.append(segments[i]).append('/');
        }
        return value.toString();
    }
}
