package org.apache.gobblin.hive.writer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
import org.apache.gobblin.hive.HivePartition;
import org.apache.gobblin.hive.HiveRegister;
import org.apache.gobblin.hive.HiveTable;
import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metadata.SchemaSource;
import org.apache.gobblin.metrics.GobblinMetricsRegistry;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.ClustersNames;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/hive/writer/HiveMetadataWriter.class */
public class HiveMetadataWriter implements MetadataWriter {
    private static final Logger log = LoggerFactory.getLogger(HiveMetadataWriter.class);

    // Configuration keys read from the job State in the constructor.
    private static final String HIVE_REGISTRATION_WHITELIST = "hive.registration.whitelist";
    private static final String HIVE_REGISTRATION_BLACKLIST = "hive.registration.blacklist";
    private static final String HIVE_USE_LATEST_SCHEMA_ALLOWLIST = "hive.use.latest.schema.allowlist";
    private static final String HIVE_USE_LATEST_SCHEMA_DENYLIST = "hive.use.latest.schema.denylist";
    private static final String HIVE_REGISTRATION_TIMEOUT_IN_SECONDS = "hive.registration.timeout.seconds";
    // Used when HIVE_REGISTRATION_TIMEOUT_IN_SECONDS is not configured.
    private static final long DEFAULT_HIVE_REGISTRATION_TIMEOUT_IN_SECONDS = 60;

    @VisibleForTesting
    protected HiveRegister hiveRegister;
    // Decides in writeEnvelope() whether a db/table is handled by this writer at all.
    private final WhitelistBlacklist whitelistBlacklist;
    // Tables accepted here always get their cached schema refreshed from the existing Hive table.
    private final WhitelistBlacklist useExistingTableSchemaAllowDenyList;
    private final KafkaSchemaRegistry schemaRegistry;
    // Upper bound, in seconds, for waiting on a single registration future.
    private final long timeOutSeconds;
    protected State state;
    protected EventSubmitter eventSubmitter;
    // Builds the "db.table" key used by all of the per-table maps below.
    private final Joiner tableNameJoiner = Joiner.on('.');
    // Owns the HiveRegister and the metric context; released in close().
    private final Closer closer = Closer.create();
    // table key -> (partition values -> in-flight registration future); drained by flush().
    private final HashMap<String, HashMap<List<String>, ListenableFuture<Void>>> currentExecutionMap = new HashMap<>();
    // table key -> cache of schema creation times that were already compared against the schema registry.
    private final HashMap<String, Cache<String, String>> schemaCreationTimeMap = new HashMap<>();
    // table key -> cache of the last spec submitted for registration, keyed by partition values.
    private final HashMap<String, Cache<List<String>, HiveSpec>> specMaps = new HashMap<>();
    // table key -> schema literal that is applied to specs before registration.
    private final HashMap<String, String> latestSchemaMap = new HashMap<>();
    // table key -> the last string passed as the final argument of write() (the watermark source in writeEnvelope()).
    private final HashMap<String, String> tableTopicPartitionMap = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.gobblin.hive.writer.HiveMetadataWriter$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/gobblin/hive/writer/HiveMetadataWriter$1.class */
    /**
     * Decompiler-emitted switch map: translates {@link OperationType} ordinals into the
     * dense case labels (1 = add_files, 2 = drop_files, 3 = rewrite_files) consumed by the
     * switch statement in {@code write(...)}. Ordinals that are not assigned below keep the
     * array default of 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$gobblin$metadata$OperationType = new int[OperationType.values().length];

        static {
            // Each constant is mapped in its own try block so that one missing enum field
            // (NoSuchFieldError) does not prevent the remaining constants from being mapped.
            // NOTE(review): the empty catches mirror the compiler-generated pattern — presumably intentional.
            try {
                $SwitchMap$org$apache$gobblin$metadata$OperationType[OperationType.add_files.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$gobblin$metadata$OperationType[OperationType.drop_files.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$gobblin$metadata$OperationType[OperationType.rewrite_files.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/apache/gobblin/hive/writer/HiveMetadataWriter$HivePartitionOperation.class */
    /** Kind of partition change reported in the commit events built by {@code buildCommitEvent}. */
    public enum HivePartitionOperation {
        // Emitted after a registration future completes (flush / registerSpec).
        ADD_OR_MODIFY,
        // Emitted after a partition has been dropped (deRegisterPartitionHelper).
        DROP
    }

    /**
     * Creates a writer configured from the given job state.
     *
     * <p>Reads the registration and schema allow/deny lists and the registration timeout,
     * publishes that timeout as the register close timeout when none is configured, and
     * registers the {@link HiveRegister} and the metric context with the internal closer.
     *
     * @param state job configuration; may be mutated to add the register close timeout
     * @throws IOException declared for failures while building the collaborators
     */
    public HiveMetadataWriter(State state) throws IOException {
        this.state = state;

        String registrationAllow = state.getProp(HIVE_REGISTRATION_WHITELIST, "");
        String registrationDeny = state.getProp(HIVE_REGISTRATION_BLACKLIST, "");
        this.whitelistBlacklist = new WhitelistBlacklist(registrationAllow, registrationDeny);

        this.schemaRegistry = KafkaSchemaRegistry.get(state.getProperties());

        String latestSchemaAllow = state.getProp(HIVE_USE_LATEST_SCHEMA_ALLOWLIST, "");
        String latestSchemaDeny = state.getProp(HIVE_USE_LATEST_SCHEMA_DENYLIST, "");
        this.useExistingTableSchemaAllowDenyList = new WhitelistBlacklist(latestSchemaAllow, latestSchemaDeny);

        this.timeOutSeconds = state.getPropAsLong(HIVE_REGISTRATION_TIMEOUT_IN_SECONDS, DEFAULT_HIVE_REGISTRATION_TIMEOUT_IN_SECONDS);
        // The close timeout must be in the state before the register is created from it.
        boolean closeTimeoutConfigured = state.contains(HiveRegister.HIVE_REGISTER_CLOSE_TIMEOUT_SECONDS_KEY);
        if (!closeTimeoutConfigured) {
            state.setProp(HiveRegister.HIVE_REGISTER_CLOSE_TIMEOUT_SECONDS_KEY, Long.valueOf(this.timeOutSeconds));
        }
        HiveRegister register = HiveRegister.get(state);
        this.hiveRegister = (HiveRegister) this.closer.register(register);

        // Every event submitted by this writer is tagged with the cluster name.
        ArrayList tags = Lists.newArrayList();
        String clusterName = ClustersNames.getInstance().getClusterName();
        tags.add(new Tag(MetadataWriterKeys.CLUSTER_IDENTIFIER_KEY_NAME, clusterName));
        this.eventSubmitter = new EventSubmitter.Builder(
                this.closer.register(GobblinMetricsRegistry.getInstance().getMetricContext(state, HiveMetadataWriter.class, tags)),
                HiveMetadataWriter.class.getCanonicalName()).build();
    }

    /**
     * Waits for every in-flight registration of the given table and emits one commit event
     * per partition whose spec is still present in the spec cache.
     *
     * <p>A failure whose root cause is {@link AlreadyExistsException} is logged and ignored.
     * Any other execution failure or an interruption is rethrown together with the partition
     * values that were pending. A timeout is rethrown as a {@link RuntimeException}.
     *
     * @param str  Hive database name
     * @param str2 Hive table name
     * @throws IOException if a pending registration failed or the wait was interrupted
     */
    @Override // org.apache.gobblin.hive.writer.MetadataWriter
    public void flush(String str, String str2) throws IOException {
        String tableKey = this.tableNameJoiner.join(str, str2);
        HashMap<List<String>, ListenableFuture<Void>> pending = this.currentExecutionMap.get(tableKey);
        if (pending == null) {
            // Nothing has been registered for this table since the last reset.
            return;
        }
        log.info("start to flush table: {}", tableKey);
        for (Map.Entry<List<String>, ListenableFuture<Void>> entry : pending.entrySet()) {
            try {
                entry.getValue().get(this.timeOutSeconds, TimeUnit.SECONDS);
            } catch (InterruptedException | ExecutionException e) {
                if (e instanceof InterruptedException) {
                    // Restore the flag so code further up the stack can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
                if (!(Throwables.getRootCause(e) instanceof AlreadyExistsException)) {
                    Set<String> pendingPartitionValues = pending.keySet().stream()
                            .flatMap(List::stream)
                            .collect(Collectors.toSet());
                    throw new HiveMetadataWriterWithPartitionInfoException(pendingPartitionValues, Collections.<String>emptySet(), e);
                }
                log.warn("Caught AlreadyExistsException for db {}, table {}, ignoring", str, str2);
            } catch (TimeoutException e) {
                throw new RuntimeException("Timeout waiting for result of registration for table " + tableKey, e);
            }
            Cache<List<String>, HiveSpec> specCache = this.specMaps.get(tableKey);
            if (specCache != null) {
                HiveSpec registeredSpec = specCache.getIfPresent(entry.getKey());
                if (registeredSpec != null) {
                    this.eventSubmitter.submit(buildCommitEvent(str, str2, entry.getKey(), registeredSpec, HivePartitionOperation.ADD_OR_MODIFY));
                }
            }
        }
        pending.clear();
        log.info("finish flushing table: {}", tableKey);
    }

    /**
     * Forgets the per-table state kept for the given table: pending registrations, seen
     * schema creation times, the latest schema literal and the cached specs.
     *
     * @param str  Hive database name
     * @param str2 Hive table name
     */
    @Override // org.apache.gobblin.hive.writer.MetadataWriter
    public void reset(String str, String str2) throws IOException {
        final String tableKey = tableNameJoiner.join(str, str2);
        specMaps.remove(tableKey);
        latestSchemaMap.remove(tableKey);
        schemaCreationTimeMap.remove(tableKey);
        currentExecutionMap.remove(tableKey);
    }

    /**
     * Applies one metadata change event to Hive for the table described by {@code hiveSpec}.
     *
     * <p>For every operation except {@code drop_files} the table is created first if needed;
     * when that fails the event is skipped. The operation is then dispatched to
     * {@link #addFiles} and/or {@link #deleteFiles}; unsupported operations are only logged.
     *
     * @param gobblinMetadataChangeEvent the change event to apply
     * @param map      specs for newly added files, passed to {@code addFiles}
     * @param map2     specs for removed files, passed to {@code deleteFiles}
     * @param hiveSpec spec identifying the target database and table
     * @param str      value remembered per table and later used for the topic name in commit events
     * @throws IOException if schema lookup or partition (de)registration fails
     */
    public void write(GobblinMetadataChangeEvent gobblinMetadataChangeEvent, Map<String, Collection<HiveSpec>> map, Map<String, Collection<HiveSpec>> map2, HiveSpec hiveSpec, String str) throws IOException {
        String dbName = hiveSpec.getTable().getDbName();
        String tableName = hiveSpec.getTable().getTableName();
        String tableKey = this.tableNameJoiner.join(dbName, tableName);
        OperationType operationType = gobblinMetadataChangeEvent.getOperationType();
        String topicName = getTopicName(gobblinMetadataChangeEvent);
        if (operationType != OperationType.drop_files) {
            if (!createTable(hiveSpec, tableKey)) {
                // Table creation failed and was already logged; skip this event.
                return;
            }
            updateLatestSchemaMapWithExistingSchema(dbName, tableName, tableKey);
        }
        this.tableTopicPartitionMap.put(tableKey, str);
        // Switch on the enum itself rather than on a hand-maintained ordinal table.
        switch (operationType) {
            case add_files:
                addFiles(gobblinMetadataChangeEvent, map, dbName, tableName, topicName);
                return;
            case drop_files:
                deleteFiles(gobblinMetadataChangeEvent, map2, dbName, tableName);
                return;
            case rewrite_files:
                // A rewrite drops the old partitions before registering the new ones.
                deleteFiles(gobblinMetadataChangeEvent, map2, dbName, tableName);
                addFiles(gobblinMetadataChangeEvent, map, dbName, tableName, topicName);
                return;
            default:
                log.error("unsupported operation {}", operationType);
                return;
        }
    }

    /**
     * Ensures the table of {@code hiveSpec} exists, unless specs for it are already cached.
     *
     * @param hiveSpec spec whose table should exist
     * @param str      table key ("db.table") used to look up the spec cache
     * @return {@code false} when table creation failed with an {@link IOException}, otherwise {@code true}
     */
    private boolean createTable(HiveSpec hiveSpec, String str) {
        if (inHiveSpecCache(str)) {
            // Specs are cached for this table, so creation is not attempted again.
            return true;
        }
        boolean created;
        try {
            this.hiveRegister.createTableIfNotExists(hiveSpec.getTable());
            created = true;
        } catch (IOException e) {
            log.error("Failed to create table. Skipping this event", e);
            created = false;
        }
        return created;
    }

    /**
     * Derives the topic name from the first key of the event's topic-partition offsets range.
     *
     * <p>The key is cut at its last {@code '-'}, so topic names that themselves contain
     * dashes stay intact. A key without any dash is returned unchanged instead of failing.
     *
     * @param gobblinMetadataChangeEvent the event to inspect
     * @return the topic name, or {@code null} when the event carries no offsets range
     */
    @Nullable
    private String getTopicName(GobblinMetadataChangeEvent gobblinMetadataChangeEvent) {
        if (gobblinMetadataChangeEvent.getTopicPartitionOffsetsRange() == null || gobblinMetadataChangeEvent.getTopicPartitionOffsetsRange().isEmpty()) {
            return null;
        }
        String topicPartition = (String) gobblinMetadataChangeEvent.getTopicPartitionOffsetsRange().keySet().iterator().next();
        int separatorIndex = topicPartition.lastIndexOf('-');
        // lastIndexOf returns -1 when there is no dash; substring(0, -1) would throw.
        return separatorIndex < 0 ? topicPartition : topicPartition.substring(0, separatorIndex);
    }

    /**
     * Tells whether at least one spec is currently cached for the given table key.
     *
     * @param str table key ("db.table")
     * @return {@code true} when a non-empty spec cache exists for the key
     */
    private boolean inHiveSpecCache(String str) {
        Cache<List<String>, HiveSpec> cachedSpecs = this.specMaps.get(str);
        if (cachedSpecs == null) {
            return false;
        }
        return cachedSpecs.size() > 0;
    }

    /**
     * Instance convenience overload: delegates to the static variant using this writer's
     * allow/deny list, Hive register and latest-schema map. The boolean result is discarded.
     *
     * @param str  Hive database name
     * @param str2 Hive table name
     * @param str3 table key ("db.table") under which the schema is stored
     */
    private void updateLatestSchemaMapWithExistingSchema(String str, String str2, String str3) throws IOException {
        updateLatestSchemaMapWithExistingSchema(str, str2, str3, this.useExistingTableSchemaAllowDenyList, this.hiveRegister, this.latestSchemaMap);
    }

    /**
     * Stores the schema literal of the existing Hive table in {@code hashMap} under {@code str3}.
     *
     * <p>The lookup is skipped when the table is not accepted by {@code whitelistBlacklist}
     * and a schema is already recorded for the key.
     *
     * @param str                Hive database name
     * @param str2               Hive table name
     * @param str3               table key under which the schema literal is stored
     * @param whitelistBlacklist tables accepted here are always refreshed from Hive
     * @param hiveRegister       register used to fetch the existing table
     * @param hashMap            table key to schema literal map that is updated in place
     * @return {@code true} when the map was updated, {@code false} when the lookup was skipped
     * @throws IOException if the table lookup fails or the table does not exist
     */
    @VisibleForTesting
    protected static boolean updateLatestSchemaMapWithExistingSchema(String str, String str2, String str3, WhitelistBlacklist whitelistBlacklist, HiveRegister hiveRegister, HashMap<String, String> hashMap) throws IOException {
        boolean alwaysUseExistingSchema = whitelistBlacklist.acceptTable(str, str2);
        if (!alwaysUseExistingSchema && hashMap.containsKey(str3)) {
            return false;
        }
        Optional<HiveTable> existingTable = hiveRegister.getTable(str, str2);
        if (!existingTable.isPresent()) {
            // Fail with a descriptive, checked error instead of the bare IllegalStateException from Optional.get().
            throw new IOException(String.format("Cannot read existing schema: table %s.%s does not exist", str, str2));
        }
        String schemaLiteral = existingTable.get().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
        hashMap.put(str3, schemaLiteral);
        return true;
    }

    /**
     * Drops the partitions of the given table that are described by the supplied specs.
     *
     * <p>Does nothing when the event carries no old file prefixes. Specs that belong to
     * another table, or that have no partition, are ignored.
     *
     * @param gobblinMetadataChangeEvent the change event being processed
     * @param map  specs for the removed files
     * @param str  Hive database name
     * @param str2 Hive table name
     * @throws IOException if dropping a partition fails
     */
    public void deleteFiles(GobblinMetadataChangeEvent gobblinMetadataChangeEvent, Map<String, Collection<HiveSpec>> map, String str, String str2) throws IOException {
        boolean hasOldPrefixes = gobblinMetadataChangeEvent.getOldFilePrefixes() != null && !gobblinMetadataChangeEvent.getOldFilePrefixes().isEmpty();
        if (!hasOldPrefixes) {
            return;
        }
        for (Collection<HiveSpec> specs : map.values()) {
            for (HiveSpec candidate : specs) {
                if (!candidate.getTable().getDbName().equals(str)) {
                    continue;
                }
                if (!candidate.getTable().getTableName().equals(str2)) {
                    continue;
                }
                if (candidate.getPartition().isPresent()) {
                    deRegisterPartitionHelper(str, str2, candidate);
                }
            }
        }
    }

    /**
     * Drops the partition of {@code hiveSpec} if it exists and submits a DROP commit event.
     *
     * @param str      Hive database name
     * @param str2     Hive table name
     * @param hiveSpec spec whose partition is dropped; its partition must be present
     * @throws IOException if dropping the partition fails
     */
    protected void deRegisterPartitionHelper(String str, String str2, HiveSpec hiveSpec) throws IOException {
        HivePartition partitionToDrop = (HivePartition) hiveSpec.getPartition().get();
        this.hiveRegister.dropPartitionIfExists(str, str2, hiveSpec.getTable().getPartitionKeys(), partitionToDrop.getValues());

        HivePartition droppedPartition = (HivePartition) hiveSpec.getPartition().get();
        GobblinEventBuilder dropEvent = buildCommitEvent(str, str2, droppedPartition.getValues(), hiveSpec, HivePartitionOperation.DROP);
        this.eventSubmitter.submit(dropEvent);
    }

    /**
     * Registers the specs that belong to the given table.
     *
     * <p>Each matching spec first has its schema refreshed via {@code schemaUpdateHelper}.
     * It is then submitted for registration when no spec is cached for the same partition
     * values, or when the register reports that the table or partition needs an update.
     *
     * @param gobblinMetadataChangeEvent the change event being processed
     * @param map  specs for the newly added files
     * @param str  Hive database name
     * @param str2 Hive table name
     * @param str3 topic name used for schema registry lookups; may be {@code null}
     * @throws IOException if the schema update fails
     */
    public void addFiles(GobblinMetadataChangeEvent gobblinMetadataChangeEvent, Map<String, Collection<HiveSpec>> map, String str, String str2, String str3) throws IOException {
        final String tableKey = this.tableNameJoiner.join(str, str2);
        for (Collection<HiveSpec> specs : map.values()) {
            for (HiveSpec incoming : specs) {
                boolean sameTable = incoming.getTable().getDbName().equals(str) && incoming.getTable().getTableName().equals(str2);
                if (!sameTable) {
                    continue;
                }
                List<String> partitionValues;
                if (incoming.getPartition().isPresent()) {
                    partitionValues = ((HivePartition) incoming.getPartition().get()).getValues();
                } else {
                    partitionValues = Lists.newArrayList();
                }
                Cache<List<String>, HiveSpec> specCache = this.specMaps.computeIfAbsent(tableKey,
                        key -> CacheBuilder.newBuilder()
                                .expireAfterAccess(this.state.getPropAsInt(MetadataWriter.CACHE_EXPIRING_TIME, 1), TimeUnit.HOURS)
                                .build());
                // Look up the cached spec before the schema helper mutates the incoming one.
                HiveSpec previous = (HiveSpec) specCache.getIfPresent(partitionValues);
                schemaUpdateHelper(gobblinMetadataChangeEvent, incoming, str3, tableKey);

                boolean mustRegister = previous == null
                        || this.hiveRegister.needToUpdateTable(previous.getTable(), incoming.getTable())
                        || (incoming.getPartition().isPresent()
                                && this.hiveRegister.needToUpdatePartition((HivePartition) previous.getPartition().get(), (HivePartition) incoming.getPartition().get()));
                if (mustRegister) {
                    registerSpec(str, str2, partitionValues, incoming, specCache);
                }
            }
        }
    }

    /**
     * Submits {@code hiveSpec} for registration and remembers both the resulting future and
     * the spec under the given partition values.
     *
     * <p>If a registration for the same partition values is still tracked, it is awaited
     * first (bounded by the configured timeout) and a commit event is emitted for it.
     *
     * @param str      Hive database name
     * @param str2     Hive table name
     * @param list     partition values identifying the registration
     * @param hiveSpec spec to register
     * @param cache    spec cache of the table, updated with {@code hiveSpec}
     * @throws RuntimeException if waiting for the previous registration fails, times out or is interrupted
     */
    private void registerSpec(String str, String str2, List<String> list, HiveSpec hiveSpec, Cache<List<String>, HiveSpec> cache) {
        String tableKey = this.tableNameJoiner.join(str, str2);
        HashMap<List<String>, ListenableFuture<Void>> pending = this.currentExecutionMap.computeIfAbsent(tableKey, key -> new HashMap<>());
        ListenableFuture<Void> previousRegistration = pending.get(list);
        if (previousRegistration != null) {
            try {
                previousRegistration.get(this.timeOutSeconds, TimeUnit.SECONDS);
                this.eventSubmitter.submit(buildCommitEvent(str, str2, list, hiveSpec, HivePartitionOperation.ADD_OR_MODIFY));
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                if (e instanceof InterruptedException) {
                    // Restore the flag so code further up the stack can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
                log.error("Error when getting the result of registration for table " + tableKey, e);
                throw new RuntimeException(e);
            }
        }
        pending.put(list, this.hiveRegister.register(hiveSpec));
        cache.put(list, hiveSpec);
    }

    /**
     * Decides which schema literal the spec should carry before it is registered.
     *
     * <ul>
     *   <li>Schema source other than NONE: the spec's own schema literal may become the
     *       table's latest schema, either unconditionally (EVENT) or only when its creation
     *       time matches the schema registry's latest schema for the topic (SCHEMAREGISTRY).</li>
     *   <li>Schema source NONE with a schema source database different from the spec's own
     *       database: the schema is copied from the same-named table in that database, and
     *       the method returns without applying the latest-schema map.</li>
     * </ul>
     * In every other case the latest schema recorded for the table, if any, is written into the spec.
     *
     * @param gobblinMetadataChangeEvent the change event being processed
     * @param hiveSpec spec whose schema literal may be replaced
     * @param str      topic name for schema registry lookups; may be {@code null} or empty
     * @param str2     table key ("db.table")
     * @throws IOException if a schema lookup fails
     */
    private void schemaUpdateHelper(GobblinMetadataChangeEvent gobblinMetadataChangeEvent, HiveSpec hiveSpec, String str, String str2) throws IOException {
        final String schemaLiteralKey = AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName();
        if (gobblinMetadataChangeEvent.getSchemaSource() != SchemaSource.NONE) {
            String schemaLiteral = hiveSpec.getTable().getSerDeProps().getProp(schemaLiteralKey);
            if (schemaLiteral != null) {
                String createdOn = AvroUtils.getSchemaCreationTime(new Schema.Parser().parse(schemaLiteral));
                Cache<String, String> seenCreationTimes = this.schemaCreationTimeMap.computeIfAbsent(str2, key -> {
                    return CacheBuilder.newBuilder().expireAfterAccess(this.state.getPropAsInt(MetadataWriter.CACHE_EXPIRING_TIME, MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS).build();
                });
                if (gobblinMetadataChangeEvent.getSchemaSource() == SchemaSource.EVENT) {
                    this.latestSchemaMap.put(str2, overrideSchemaLiteral(hiveSpec, schemaLiteral, createdOn, gobblinMetadataChangeEvent.getPartitionColumns()));
                    // NOTE(review): cleanUp() only performs cache maintenance and does not invalidate entries — confirm this is intended.
                    seenCreationTimes.cleanUp();
                } else if (gobblinMetadataChangeEvent.getSchemaSource() == SchemaSource.SCHEMAREGISTRY && createdOn != null && seenCreationTimes.getIfPresent(createdOn) == null && StringUtils.isNoneEmpty(str)) {
                    String latestCreatedOn = AvroUtils.getSchemaCreationTime((Schema) this.schemaRegistry.getLatestSchemaByTopic(str));
                    // Compare from the known non-null side: the registry's schema may carry no creation time.
                    if (createdOn.equals(latestCreatedOn)) {
                        this.latestSchemaMap.put(str2, overrideSchemaLiteral(hiveSpec, schemaLiteral, createdOn, gobblinMetadataChangeEvent.getPartitionColumns()));
                    }
                    // Remember this creation time so the registry is not queried for it again.
                    seenCreationTimes.put(createdOn, "");
                }
            }
        } else if (gobblinMetadataChangeEvent.getRegistrationProperties().containsKey(HiveMetaStoreBasedRegister.SCHEMA_SOURCE_DB) && !((String) gobblinMetadataChangeEvent.getRegistrationProperties().get(HiveMetaStoreBasedRegister.SCHEMA_SOURCE_DB)).equals(hiveSpec.getTable().getDbName())) {
            String schemaSourceDb = (String) gobblinMetadataChangeEvent.getRegistrationProperties().get(HiveMetaStoreBasedRegister.SCHEMA_SOURCE_DB);
            try {
                String sourceSchema = fetchSchemaFromTable(schemaSourceDb, hiveSpec.getTable().getTableName());
                if (sourceSchema != null) {
                    hiveSpec.getTable().getSerDeProps().setProp(schemaLiteralKey, sourceSchema);
                    HiveMetaStoreUtils.updateColumnsInfoIfNeeded(hiveSpec);
                }
                return;
            } catch (IOException e) {
                // Best effort: the spec keeps its own schema when the source table cannot be read.
                log.warn(String.format("Cannot get schema from table %s.%s", schemaSourceDb, hiveSpec.getTable().getTableName()), e);
                return;
            }
        }
        if (this.latestSchemaMap.containsKey(str2)) {
            hiveSpec.getTable().getSerDeProps().setProp(schemaLiteralKey, this.latestSchemaMap.get(str2));
            HiveMetaStoreUtils.updateColumnsInfoIfNeeded(hiveSpec);
        }
    }

    /**
     * Overridable hook invoked before a schema literal is stored as a table's latest schema.
     * This implementation returns the literal unchanged and ignores the other arguments.
     *
     * @param hiveSpec spec the schema literal was read from
     * @param str      the schema literal
     * @param str2     creation time extracted from the schema
     * @param list     partition columns of the change event
     * @return the schema literal to record
     */
    protected String overrideSchemaLiteral(HiveSpec hiveSpec, String str, String str2, List<String> list) {
        return str;
    }

    /**
     * Looks up the schema literal of a table, preferring the locally recorded latest schema
     * over a lookup through the Hive register.
     *
     * @param str  Hive database name
     * @param str2 Hive table name
     * @return the schema literal, or {@code null} when the table does not exist
     * @throws IOException if the table lookup fails
     */
    private String fetchSchemaFromTable(String str, String str2) throws IOException {
        final String tableKey = this.tableNameJoiner.join(str, str2);
        if (!this.latestSchemaMap.containsKey(tableKey)) {
            Optional<HiveTable> existing = this.hiveRegister.getTable(str, str2);
            return existing.isPresent()
                    ? ((HiveTable) existing.get()).getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName())
                    : null;
        }
        return this.latestSchemaMap.get(tableKey);
    }

    /**
     * Entry point for one record: deep-copies it into a {@link GobblinMetadataChangeEvent}
     * and, when the target table is selected, hands it to {@code write}.
     *
     * @param recordEnvelope envelope carrying the change event and its watermark
     * @param map      specs for newly added files
     * @param map2     specs for removed files
     * @param hiveSpec spec identifying the target database and table
     * @throws IOException an {@code IOException} from {@code write}, rethrown with the affected partition values
     */
    @Override // org.apache.gobblin.hive.writer.MetadataWriter
    public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope, Map<String, Collection<HiveSpec>> map, Map<String, Collection<HiveSpec>> map2, HiveSpec hiveSpec) throws IOException {
        GenericRecord rawRecord = (GenericRecord) recordEnvelope.getRecord();
        GobblinMetadataChangeEvent changeEvent = (GobblinMetadataChangeEvent) SpecificData.get().deepCopy(rawRecord.getSchema(), rawRecord);
        boolean selected = this.whitelistBlacklist.acceptTable(hiveSpec.getTable().getDbName(), hiveSpec.getTable().getTableName());
        if (selected) {
            try {
                write(changeEvent, map, map2, hiveSpec, recordEnvelope.getWatermark().getSource());
            } catch (IOException e) {
                // Attach the partition values so the caller knows which partitions were affected.
                throw new HiveMetadataWriterWithPartitionInfoException(getPartitionValues(map), getPartitionValues(map2), e);
            }
        } else {
            log.debug(String.format("Skip table %s.%s since it's not selected", hiveSpec.getTable().getDbName(), hiveSpec.getTable().getTableName()));
        }
    }

    /**
     * Collects the distinct partition values of all specs that carry a partition.
     *
     * @param map specs grouped by an arbitrary key; only the values are inspected
     * @return the set of individual partition value strings across all partitioned specs
     */
    public Set<String> getPartitionValues(Map<String, Collection<HiveSpec>> map) {
        return map.values().stream()
                .flatMap(Collection::stream)
                .filter(spec -> spec.getPartition().isPresent())
                .map(spec -> ((HivePartition) spec.getPartition().get()).getValues())
                .flatMap(List::stream)
                .collect(Collectors.toSet());
    }

    /**
     * Builds the commit event describing one partition operation on a Hive table.
     *
     * <p>The event carries the database, table, comma-joined partition keys and values,
     * the operation name and the spec's path. When a topic-partition string was recorded
     * for the table, the text before its first {@code '-'} is added as the topic name.
     *
     * @param str      Hive database name
     * @param str2     Hive table name
     * @param list     partition values
     * @param hiveSpec spec supplying the partition keys and the path
     * @param hivePartitionOperation the operation that was performed
     * @return the populated event builder
     */
    protected GobblinEventBuilder buildCommitEvent(String str, String str2, List<String> list, HiveSpec hiveSpec, HivePartitionOperation hivePartitionOperation) {
        final Joiner commaJoiner = Joiner.on(',');
        GobblinEventBuilder event = new GobblinEventBuilder(MetadataWriterKeys.HIVE_COMMIT_EVENT_NAME);
        event.addMetadata(MetadataWriterKeys.HIVE_DATABASE_NAME_KEY, str);
        event.addMetadata(MetadataWriterKeys.HIVE_TABLE_NAME_KEY, str2);
        event.addMetadata(MetadataWriterKeys.PARTITION_KEYS,
                commaJoiner.join((Iterable) hiveSpec.getTable().getPartitionKeys().stream()
                        .map(column -> column.getName())
                        .collect(Collectors.toList())));
        event.addMetadata(MetadataWriterKeys.PARTITION_VALUES_KEY, commaJoiner.join(list));
        event.addMetadata(MetadataWriterKeys.HIVE_PARTITION_OPERATION_KEY, hivePartitionOperation.name());
        event.addMetadata(MetadataWriterKeys.PARTITION_HDFS_PATH, hiveSpec.getPath().toString());

        String tableKey = this.tableNameJoiner.join(str, str2);
        String topicPartition = this.tableTopicPartitionMap.get(tableKey);
        if (topicPartition != null) {
            String topic = topicPartition.split("-")[0];
            event.addMetadata(MetadataWriterKeys.HIVE_EVENT_GMCE_TOPIC_NAME, topic);
        }
        return event;
    }

    /**
     * Closes everything registered with the internal closer (the Hive register and the
     * metric context created in the constructor).
     *
     * @throws IOException if closing a registered resource fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        closer.close();
    }

    /**
     * Replaces the Hive register used by this writer. The replacement is not registered
     * with the internal closer.
     *
     * @param hiveRegister the register to use from now on
     */
    void setHiveRegister(HiveRegister hiveRegister) {
        this.hiveRegister = hiveRegister;
    }

    /**
     * Returns the schema registry obtained from the job properties in the constructor.
     *
     * @return the schema registry used for latest-schema lookups
     */
    public KafkaSchemaRegistry getSchemaRegistry() {
        return schemaRegistry;
    }
}
