package org.apache.kylin.metrics.lib.impl.hive;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
import org.apache.kylin.metrics.lib.Record;
import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
import org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord;
import org.apache.kylin.source.hive.HiveMetaStoreClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-metrics-reporter-hive-4.0.0-alpha.jar:org/apache/kylin/metrics/lib/impl/hive/HiveProducer.class */
/**
 * Writes metrics records into the storage location of Hive tables.
 *
 * <p>For every record the producer looks up the table's storage location and field schema
 * through the Hive metastore (cached per database/table pair), derives the partition
 * directory from the record's partition key/values, registers the partition in Hive when the
 * directory does not exist yet, and appends the record as one text line to a content file
 * inside that directory.
 *
 * <p>The producer keeps a single open output stream at a time; it is replaced whenever the
 * target partition directory changes or after a failed write. The class performs no
 * synchronization of its own.
 */
public class HiveProducer {
    private static final Logger logger = LoggerFactory.getLogger(HiveProducer.class);

    /** Maximum number of (database, table) entries kept in the field schema cache. */
    private static final int CACHE_MAX_SIZE = 10;
    /** The numeric suffix of content file names cycles through 0..CONTENT_FILE_ID_CYCLE-1. */
    private static final int CONTENT_FILE_ID_CYCLE = 10;
    /** Number of sleeps performed between failed attempts to create a content file. */
    private static final int MAX_CREATE_FILE_RETRIES = 5;
    /** Base sleep between file creation attempts; multiplied by the attempt number. */
    private static final long CREATE_FILE_RETRY_BACKOFF_MS = 500L;

    private final HiveConf hiveConf;
    private FileSystem fs;
    /** Maps (database, table) to (table storage location, table field schemas). */
    private final LoadingCache<Pair<String, String>, Pair<String, List<FieldSchema>>> tableFieldSchemaCache;
    /** "&lt;hostname&gt;-&lt;creation millis&gt;-part-", shared by all content files of this producer. */
    private final String contentFilePrefix;
    private String metricType;
    /** Partition directory the current output stream belongs to. */
    private String prePartitionPath;
    /** Content file the current output stream appends to. */
    private Path curPartitionContentPath;
    /** Suffix number used for the next content file. */
    private int id;
    private FSDataOutputStream fout;

    /**
     * Creates a producer backed by a freshly constructed {@link HiveConf}.
     *
     * @param str        metric type, only used in log messages
     * @param properties entries copied into the Hive configuration
     * @throws Exception if the file system cannot be obtained
     */
    public HiveProducer(String str, Properties properties) throws Exception {
        this(str, properties, new HiveConf());
    }

    /**
     * Creates a producer on top of the given Hive configuration.
     *
     * @param str        metric type, only used in log messages
     * @param properties entries copied (via {@code toString()}) into {@code hiveConf}
     * @param hiveConf   configuration to use; it is modified in place
     * @throws Exception if the file system cannot be obtained
     */
    HiveProducer(String str, Properties properties, HiveConf hiveConf) throws Exception {
        this.id = 0;
        this.metricType = str;
        this.hiveConf = hiveConf;
        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
            this.hiveConf.set(entry.getKey().toString(), entry.getValue().toString());
        }
        this.fs = FileSystem.get(this.hiveConf);

        this.tableFieldSchemaCache = CacheBuilder.newBuilder()
                .removalListener(new RemovalListener<Pair<String, String>, Pair<String, List<FieldSchema>>>() {
                    @Override
                    public void onRemoval(
                            RemovalNotification<Pair<String, String>, Pair<String, List<FieldSchema>>> removalNotification) {
                        logger.info("Field schema with table {} is removed due to {}",
                                ActiveReservoirReporter.getTableName(removalNotification.getKey()),
                                removalNotification.getCause());
                    }
                })
                .maximumSize(CACHE_MAX_SIZE)
                .build(new CacheLoader<Pair<String, String>, Pair<String, List<FieldSchema>>>() {
                    @Override
                    public Pair<String, List<FieldSchema>> load(Pair<String, String> pair) throws Exception {
                        IMetaStoreClient metaStoreClient = HiveMetaStoreClientFactory
                                .getHiveMetaStoreClient(HiveProducer.this.hiveConf);
                        try {
                            String location = metaStoreClient.getTable(pair.getFirst(), pair.getSecond())
                                    .getSd().getLocation();
                            List<FieldSchema> fields = metaStoreClient.getFields(pair.getFirst(),
                                    pair.getSecond());
                            return new Pair<>(location, fields);
                        } finally {
                            // Release the metastore connection even when a lookup fails.
                            metaStoreClient.close();
                        }
                    }
                });

        String hostName;
        try {
            hostName = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            hostName = "UNKNOWN";
        }
        this.contentFilePrefix = hostName + "-" + System.currentTimeMillis() + "-part-";
    }

    /**
     * Releases the resources held by this producer: runs pending cache maintenance and closes
     * the currently open content file stream, if any.
     */
    public void close() {
        this.tableFieldSchemaCache.cleanUp();
        closeFout();
    }

    /**
     * Writes a single record.
     *
     * @param record the record to write
     * @throws Exception if the record cannot be converted or the target file cannot be prepared
     */
    public void send(Record record) throws Exception {
        HiveProducerRecord hiveRecord = convertTo(record);
        write(hiveRecord.key(), Lists.newArrayList(hiveRecord));
    }

    /**
     * Writes a batch of records. Records are grouped by their record key (database, table and
     * partition) and each group is written in one go.
     *
     * @param list the records to write
     * @throws Exception if a record cannot be converted or a target file cannot be prepared
     */
    public void send(List<Record> list) throws Exception {
        Map<HiveProducerRecord.RecordKey, List<HiveProducerRecord>> recordsByKey = Maps.newHashMap();
        for (Record record : list) {
            HiveProducerRecord hiveRecord = convertTo(record);
            List<HiveProducerRecord> group = recordsByKey.get(hiveRecord.key());
            if (group == null) {
                group = Lists.newLinkedList();
                recordsByKey.put(hiveRecord.key(), group);
            }
            group.add(hiveRecord);
        }
        for (Map.Entry<HiveProducerRecord.RecordKey, List<HiveProducerRecord>> entry : recordsByKey.entrySet()) {
            write(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Appends the given records, which all share {@code recordKey}, to the content file of the
     * matching partition directory, preparing partition and file first when necessary.
     *
     * <p>An {@link IOException} raised while appending is logged and the stream is closed; it is
     * not propagated. Failures while preparing the partition or the file are propagated.
     */
    private void write(HiveProducerRecord.RecordKey recordKey, Iterable<HiveProducerRecord> records)
            throws Exception {
        Path partitionPath = resolvePartitionPath(recordKey);
        switchFileSystemIfNeeded(partitionPath);

        if (!this.fs.exists(partitionPath)) {
            addHivePartition(recordKey);
        }

        boolean partitionChanged = this.prePartitionPath == null
                || !this.prePartitionPath.equals(partitionPath.toString());
        if (this.fout == null || partitionChanged) {
            rollContentFile(partitionPath);
        }

        appendRecords(records);
    }

    /** Builds "&lt;table location&gt;/&lt;key&gt;=&lt;value&gt;..." with lower-cased partition keys. */
    private Path resolvePartitionPath(HiveProducerRecord.RecordKey recordKey) throws Exception {
        String tableLocation = this.tableFieldSchemaCache
                .get(new Pair<>(recordKey.database(), recordKey.table())).getFirst();
        StringBuilder location = new StringBuilder(tableLocation);
        for (Map.Entry<String, String> partition : recordKey.partition().entrySet()) {
            location.append("/");
            location.append(partition.getKey().toLowerCase(Locale.ROOT));
            location.append("=");
            location.append(partition.getValue());
        }
        return new Path(location.toString());
    }

    /** Replaces {@code fs} when the path carries a scheme and is not under the URI of the current one. */
    private void switchFileSystemIfNeeded(Path partitionPath) throws IOException {
        boolean hasScheme = partitionPath.toUri().getScheme() != null;
        if (hasScheme && !partitionPath.toUri().toString().startsWith(this.fs.getUri().toString())) {
            // NOTE(review): the previous FileSystem is closed here; if Hadoop hands out shared
            // instances this may affect other users of it - confirm before relying on it.
            this.fs.close();
            this.fs = partitionPath.getFileSystem(this.hiveConf);
        }
    }

    /**
     * Registers the record's partition in Hive via ALTER TABLE ... ADD IF NOT EXISTS PARTITION.
     * A non-zero response code is only logged as a warning.
     */
    private void addHivePartition(HiveProducerRecord.RecordKey recordKey) throws Exception {
        StringBuilder statement = new StringBuilder();
        statement.append("ALTER TABLE ");
        statement.append(recordKey.database()).append(".").append(recordKey.table());
        statement.append(" ADD IF NOT EXISTS PARTITION (");
        boolean first = true;
        for (Map.Entry<String, String> partition : recordKey.partition().entrySet()) {
            if (first) {
                first = false;
            } else {
                statement.append(",");
            }
            statement.append(partition.getKey().toLowerCase(Locale.ROOT));
            statement.append("='").append(partition.getValue()).append("'");
        }
        statement.append(")");
        String hql = statement.toString();
        logger.debug("create partition by {}.", hql);

        Driver driver = new Driver(this.hiveConf);
        try {
            CliSessionState session = new CliSessionState(this.hiveConf);
            SessionState.start(session);
            try {
                CommandProcessorResponse response = driver.run(hql);
                if (response.getResponseCode() != 0) {
                    logger.warn("Fail to add partition. HQL: {}; Cause by: {}", hql, response.toString());
                }
            } finally {
                session.close();
            }
        } finally {
            // Session first, driver second; both are released even when the statement fails.
            driver.close();
        }
    }

    /**
     * Closes the current stream (if any) and opens an append stream on the next content file of
     * {@code partitionPath}, creating the file first when it does not exist.
     *
     * @throws IllegalStateException if the file still does not exist after all retries
     */
    private void rollContentFile(Path partitionPath) throws Exception {
        if (this.fout != null) {
            logger.debug("Flush output stream of previous partition path {}. Using a new one {}. ",
                    this.prePartitionPath, partitionPath);
            closeFout();
        }

        Path contentPath = new Path(partitionPath,
                this.contentFilePrefix + String.format(Locale.ROOT, "%04d", this.id));
        logger.info("Try to use new partition content path: {} for metric: {}", contentPath, this.metricType);

        if (!this.fs.exists(contentPath)) {
            int attempts = 0;
            while (!this.fs.createNewFile(contentPath)) {
                // Stop once the retry budget is used up or the file turned up in the meantime.
                if (attempts++ >= MAX_CREATE_FILE_RETRIES || this.fs.exists(contentPath)) {
                    break;
                }
                try {
                    Thread.sleep(CREATE_FILE_RETRY_BACKOFF_MS * attempts);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to callers that catch the rethrown exception.
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
            if (!this.fs.exists(contentPath)) {
                throw new IllegalStateException(
                        "Fail to create HDFS file: " + contentPath + " after " + attempts + " retries");
            }
        }

        this.fout = this.fs.append(contentPath);
        this.prePartitionPath = partitionPath.toString();
        this.curPartitionContentPath = contentPath;
        this.id = (this.id + 1) % CONTENT_FILE_ID_CYCLE;
    }

    /** Appends one line per record to the open stream; on IOException logs and closes the stream. */
    private void appendRecords(Iterable<HiveProducerRecord> records) {
        try {
            int count = 0;
            for (HiveProducerRecord hiveRecord : records) {
                this.fout.writeBytes(hiveRecord.valueToString() + "\n");
                count++;
            }
            logger.info("Success to write {} metrics ({}) to file {}", count, this.metricType,
                    this.curPartitionContentPath);
        } catch (IOException e) {
            logger.error("Fails to write metrics(" + this.metricType + ") to file "
                    + this.curPartitionContentPath.toString() + " due to ", e);
            // Drop the broken stream so that the next write opens a new content file.
            closeFout();
        }
    }

    /**
     * Closes the current output stream, if any, and forgets it. When closing fails on a
     * {@link DistributedFileSystem}, lease recovery is requested for the content file. Failures
     * are logged and never propagated.
     */
    private void closeFout() {
        if (this.fout != null) {
            try {
                this.fout.close();
            } catch (Exception e) {
                logger.error("Close the path: " + this.curPartitionContentPath + " failed", e);
                if (this.fs instanceof DistributedFileSystem) {
                    try {
                        ((DistributedFileSystem) this.fs).recoverLease(this.curPartitionContentPath);
                    } catch (Exception e2) {
                        logger.error("Recover lease for path: " + this.curPartitionContentPath + " failed", e2);
                    }
                }
            }
        }
        this.fout = null;
    }

    /**
     * Converts a metrics record into a Hive record whose single partition entry is the record's
     * {@link TimePropertyEnum#DAY_DATE} value.
     *
     * @throws NullPointerException if the record carries no DAY_DATE value
     */
    private HiveProducerRecord convertTo(Record record) throws Exception {
        Map<String, Object> rawValue = record.getValueRaw();
        String dayDateKey = TimePropertyEnum.DAY_DATE.toString();
        Object dayDate = rawValue.get(dayDateKey);
        if (dayDate == null) {
            throw new NullPointerException(
                    "Record of subject " + record.getSubject() + " has no value for " + dayDateKey);
        }
        Map<String, String> partitionKVs = Maps.newHashMapWithExpectedSize(1);
        partitionKVs.put(dayDateKey, dayDate.toString());
        return parseToHiveProducerRecord(HiveReservoirReporter.getTableFromSubject(record.getSubject()),
                partitionKVs, rawValue);
    }

    /**
     * Builds a Hive record for the given table from raw values.
     *
     * <p>The column values are collected in the order of the table's field schemas; each value is
     * looked up in {@code map2} under the upper-cased field name and is {@code null} when absent.
     *
     * @param str  table name, split into database and table by
     *             {@link ActiveReservoirReporter#getTableNameSplits(String)}
     * @param map  partition key/values of the record
     * @param map2 raw values keyed by upper-cased field name
     * @return the assembled record
     * @throws Exception if the table's field schemas cannot be loaded
     */
    public HiveProducerRecord parseToHiveProducerRecord(String str, Map<String, String> map,
            Map<String, Object> map2) throws Exception {
        Pair<String, String> tableNameSplits = ActiveReservoirReporter.getTableNameSplits(str);
        List<FieldSchema> fields = this.tableFieldSchemaCache.get(tableNameSplits).getSecond();
        List<Object> columnValues = Lists.newArrayListWithExpectedSize(fields.size());
        for (FieldSchema field : fields) {
            columnValues.add(map2.get(field.getName().toUpperCase(Locale.ROOT)));
        }
        return new HiveProducerRecord(tableNameSplits.getFirst(), tableNameSplits.getSecond(), map,
                columnValues);
    }
}
