package org.apache.gobblin.iceberg.writer;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.hive.writer.MetadataWriter;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.writer.DataWriter;
import org.apache.gobblin.writer.DataWriterBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/iceberg/writer/GobblinMCEWriter.class */
/**
 * A {@link DataWriter} that consumes {@link GobblinMetadataChangeEvent} (GMCE) records and fans each accepted event
 * out to a configurable list of {@link MetadataWriter}s (by default {@link IcebergMetadataWriter}).
 *
 * <p>For every event whose {@code cluster} field is in {@link #ACCEPTED_CLUSTER_NAMES}, the writer:
 * <ol>
 *   <li>computes {@link HiveSpec}s for the event's new files and old files/prefixes, in parallel, using the Hive
 *       registration policy resolved by {@link #setHiveRegProperties};</li>
 *   <li>takes the specs of one of those paths to discover the target table(s);</li>
 *   <li>flushes a table early if its previous event carried a different {@link OperationType};</li>
 *   <li>passes the event plus both spec maps to every metadata writer.</li>
 * </ol>
 * Tables touched since the last {@link #flush()} are remembered so that {@link #flush()} can flush all of them.
 *
 * <p>NOTE(review): {@code writeEnvelope}/{@code flush} mutate plain {@link HashMap}s without synchronization;
 * presumably the writer is driven from a single thread — confirm with the caller.
 */
public class GobblinMCEWriter implements DataWriter<GenericRecord> {
    private static final Logger log = LoggerFactory.getLogger(GobblinMCEWriter.class);
    public static final String DEFAULT_HIVE_REGISTRATION_POLICY_KEY = "default.hive.registration.policy";
    public static final String FORCE_HIVE_DATABASE_NAME = "force.hive.database.name";
    public static final String ACCEPTED_CLUSTER_NAMES = "accepted.cluster.names";
    public static final String METADATA_REGISTRATION_THREADS = "metadata.registration.threads";
    public static final String METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS = "metadata.parallel.runner.timeout.mills";
    public static final String HIVE_PARTITION_NAME = "hive.partition.name";
    public static final String GMCE_METADATA_WRITER_CLASSES = "gmce.metadata.writer.classes";
    public static final int DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS = 60000;
    public static final String TABLE_NAME_DELIMITER = ".";

    /** Config key for how long (hours, after last access) a per-dataset HiveSpec cache entry is kept. */
    private static final String SPEC_CACHE_EXPIRING_TIME_HOURS_KEY = "GMCEWriter.cache.expiring.time.hours";
    private static final int DEFAULT_SPEC_CACHE_EXPIRING_TIME_HOURS = 1;
    private static final int DEFAULT_METADATA_REGISTRATION_THREADS = 20;

    /** "db.table" -> operation type of the last event written for that table since the last flush. */
    Map<String, OperationType> tableOperationTypeMap;
    /** Dataset identifier -> operation type; NOTE(review): never populated inside this class. */
    Map<String, OperationType> datasetOperationTypeMap;
    Set<String> acceptedClusters;
    protected State state;
    private final ParallelRunner parallelRunner;
    private final int parallelRunnerTimeoutMills;
    private final Closer closer = Closer.create();
    protected final AtomicLong recordCount = new AtomicLong(0);
    /** Dataset identifier -> cache of (directory path -> HiveSpecs) for new files. */
    private final Map<String, Cache<String, Collection<HiveSpec>>> newSpecsMaps = new HashMap<>();
    /** Dataset identifier -> cache of (directory path -> HiveSpecs) for old files / prefixes. */
    private final Map<String, Cache<String, Collection<HiveSpec>>> oldSpecsMaps = new HashMap<>();
    List<MetadataWriter> metadataWriters = new ArrayList<>();

    /**
     * Instantiates the configured metadata writers and the parallel runner used for HiveSpec computation.
     *
     * @param dataWriterBuilder unused here; kept for builder-constructor compatibility
     * @param state job configuration
     * @throws IOException if the file system cannot be obtained; anything registered for closing up to that point
     *     is closed before the exception propagates
     */
    public GobblinMCEWriter(DataWriterBuilder<Schema, GenericRecord> dataWriterBuilder, State state) throws IOException {
        this.acceptedClusters = state.getPropAsSet(ACCEPTED_CLUSTER_NAMES, ClustersNames.getInstance().getClusterName());
        this.state = state;
        try {
            for (String writerClassName
                : this.state.getPropAsList(GMCE_METADATA_WRITER_CLASSES, IcebergMetadataWriter.class.getName())) {
                MetadataWriter writer =
                    GobblinConstructorUtils.invokeConstructor(MetadataWriter.class, writerClassName, this.state);
                this.closer.register((Closeable) writer);
                this.metadataWriters.add(writer);
            }
            this.tableOperationTypeMap = new HashMap<>();
            this.datasetOperationTypeMap = new HashMap<>();
            this.parallelRunner = this.closer.register(new ParallelRunner(
                this.state.getPropAsInt(METADATA_REGISTRATION_THREADS, DEFAULT_METADATA_REGISTRATION_THREADS),
                FileSystem.get(HadoopUtils.getConfFromState(state))));
            this.parallelRunnerTimeoutMills =
                this.state.getPropAsInt(METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS, DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS);
        } catch (IOException | RuntimeException e) {
            // The half-built writer is unreachable to callers, so release whatever was registered so far.
            try {
                this.closer.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /** No-op: records are only consumed through {@link #writeEnvelope(RecordEnvelope)}. */
    @Override
    public void write(GenericRecord genericRecord) throws IOException {
    }

    /**
     * Computes the HiveSpecs for {@code paths} in parallel and stores them in {@code specsMap}, keyed by the
     * string form of the directory each path resolves to. Results are memoized in {@code cache}.
     * A failure for an individual path is logged and skipped.
     *
     * @param useGivenPath if true each entry is used as the directory itself; otherwise its parent is used
     * @throws IOException if waiting for the parallel tasks fails
     */
    private void computeSpecMap(List<String> paths, ConcurrentHashMap<String, Collection<HiveSpec>> specsMap,
        Cache<String, Collection<HiveSpec>> cache, State registrationState, boolean useGivenPath) throws IOException {
        HiveRegistrationPolicy policy = HiveRegistrationPolicyBase.getPolicy(registrationState);
        for (String filePath : paths) {
            this.parallelRunner.submitCallable(() -> {
                try {
                    Path path = useGivenPath ? new Path(filePath) : new Path(filePath).getParent();
                    // The policy is queried with the scheme/authority-free path; the result is keyed by the full path.
                    Path rawPath = new Path(path.toUri().getRawPath());
                    String key = path.toString();
                    specsMap.put(key, cache.get(key, () -> policy.getHiveSpecs(rawPath)));
                } catch (Exception e) {
                    // Best effort: one bad path must not fail the whole event, but keep the cause for diagnosis.
                    log.warn("Cannot get Hive Spec for {} using policy {}", filePath, policy.toString(), e);
                }
                return null;
            }, filePath);
        }
        this.parallelRunner.waitForTasks(this.parallelRunnerTimeoutMills);
    }

    /** Returns the per-dataset spec cache from {@code cachesByDataset}, creating an access-expiring one if absent. */
    private Cache<String, Collection<HiveSpec>> getOrCreateSpecCache(
        Map<String, Cache<String, Collection<HiveSpec>>> cachesByDataset, String datasetIdentifier) {
        return cachesByDataset.computeIfAbsent(datasetIdentifier, ignored -> CacheBuilder.newBuilder()
            .expireAfterAccess(
                this.state.getPropAsInt(SPEC_CACHE_EXPIRING_TIME_HOURS_KEY, DEFAULT_SPEC_CACHE_EXPIRING_TIME_HOURS),
                TimeUnit.HOURS)
            .build());
    }

    /** Calls {@code flush(dbName, tableName)} on every metadata writer, in list order. */
    private void flushTable(String dbName, String tableName) throws IOException {
        for (MetadataWriter writer : this.metadataWriters) {
            writer.flush(dbName, tableName);
        }
    }

    /**
     * Returns whether the record's {@code cluster} field names an accepted cluster. The field is compared by its
     * text because Avro may materialize string fields as {@code Utf8}, which never equals a {@link String}.
     */
    private boolean isFromAcceptedCluster(GenericRecord genericRecord) {
        Object cluster = genericRecord.get("cluster");
        return cluster != null && this.acceptedClusters.contains(cluster.toString());
    }

    /** Flushes all tables touched since the last flush. */
    @Override
    public void commit() throws IOException {
        flush();
    }

    /** No-op. */
    @Override
    public void cleanup() throws IOException {
    }

    /** Returns the number of events handed to the metadata writers since the last {@link #flush()}. */
    @Override
    public long recordsWritten() {
        return this.recordCount.get();
    }

    /** Always 0: this writer does not write data bytes itself. */
    @Override
    public long bytesWritten() throws IOException {
        return 0L;
    }

    /** Always {@code null}: no data descriptor is exposed. */
    @Override
    public Descriptor getDataDescriptor() {
        return null;
    }

    /**
     * Processes one GMCE. Events from non-accepted clusters are ignored. HiveSpecs are computed for the event's
     * new files and old file prefixes (or, if there are no prefixes, old files). If no specs result, the event is
     * dropped without being counted. Otherwise every metadata writer receives the event once per target table.
     *
     * @throws IOException from spec computation or from a metadata writer
     */
    @Override
    public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope) throws IOException {
        GenericRecord genericRecord = recordEnvelope.getRecord();
        if (!isFromAcceptedCluster(genericRecord)) {
            return;
        }
        // Deep-copy using the record's own schema into the specific GMCE class.
        GobblinMetadataChangeEvent gmce =
            (GobblinMetadataChangeEvent) SpecificData.get().deepCopy(genericRecord.getSchema(), genericRecord);
        String datasetIdentifier = gmce.getDatasetIdentifier().toString();
        // NOTE(review): nothing in this class ever puts into datasetOperationTypeMap, so the first branch always
        // drops the old-spec cache and the second never fires unless other package code populates the map.
        if (!this.datasetOperationTypeMap.containsKey(datasetIdentifier)) {
            this.oldSpecsMaps.remove(datasetIdentifier);
        }
        if (this.datasetOperationTypeMap.containsKey(datasetIdentifier)
            && this.datasetOperationTypeMap.get(datasetIdentifier) != gmce.getOperationType()) {
            this.datasetOperationTypeMap.put(datasetIdentifier, gmce.getOperationType());
        }

        ConcurrentHashMap<String, Collection<HiveSpec>> newSpecsMap = new ConcurrentHashMap<>();
        ConcurrentHashMap<String, Collection<HiveSpec>> oldSpecsMap = new ConcurrentHashMap<>();
        if (gmce.getNewFiles() != null) {
            List<String> newFilePaths = Lists.newArrayList(Iterables.transform(gmce.getNewFiles(), file -> file.getFilePath()));
            computeSpecMap(newFilePaths, newSpecsMap, getOrCreateSpecCache(this.newSpecsMaps, datasetIdentifier),
                setHiveRegProperties(this.state, gmce, true), false);
        }
        if (gmce.getOldFilePrefixes() != null) {
            computeSpecMap(gmce.getOldFilePrefixes(), oldSpecsMap, getOrCreateSpecCache(this.oldSpecsMaps, datasetIdentifier),
                setHiveRegProperties(this.state, gmce, false), true);
        } else if (gmce.getOldFiles() != null) {
            computeSpecMap(gmce.getOldFiles(), oldSpecsMap, getOrCreateSpecCache(this.oldSpecsMaps, datasetIdentifier),
                setHiveRegProperties(this.state, gmce, false), false);
        }
        if (newSpecsMap.isEmpty() && oldSpecsMap.isEmpty()) {
            return;
        }

        // Only one path's specs are used to find the target tables; presumably all paths of one event map to the
        // same table(s).
        Collection<HiveSpec> representativeSpecs = newSpecsMap.isEmpty()
            ? oldSpecsMap.values().iterator().next()
            : newSpecsMap.values().iterator().next();
        for (HiveSpec hiveSpec : representativeSpecs) {
            String dbName = hiveSpec.getTable().getDbName();
            String tableName = hiveSpec.getTable().getTableName();
            String tableKey = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, tableName);
            // An operation-type switch on a table forces the pending work for that table out first.
            if (this.tableOperationTypeMap.containsKey(tableKey)
                && this.tableOperationTypeMap.get(tableKey) != gmce.getOperationType()) {
                flushTable(dbName, tableName);
            }
            this.tableOperationTypeMap.put(tableKey, gmce.getOperationType());
            for (MetadataWriter writer : this.metadataWriters) {
                writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, hiveSpec);
            }
        }
        this.recordCount.incrementAndGet();
    }

    /**
     * Flushes every table recorded since the last flush on every metadata writer, then forgets those tables and
     * resets the record count. If a writer throws, the tracked tables are kept.
     */
    @Override
    public void flush() throws IOException {
        log.info("start to flushing {} records", this.recordCount.get());
        for (String tableKey : this.tableOperationTypeMap.keySet()) {
            List<String> dbAndTable = Splitter.on(TABLE_NAME_DELIMITER).splitToList(tableKey);
            flushTable(dbAndTable.get(0), dbAndTable.get(1));
        }
        this.tableOperationTypeMap.clear();
        this.recordCount.set(0L);
    }

    /**
     * Flushes pending tables, then closes the metadata writers and the parallel runner. Resources are closed even
     * if the flush fails; in that case the flush failure propagates and close failures are suppressed onto it.
     */
    @Override
    public void close() throws IOException {
        try {
            flush();
        } catch (Throwable t) {
            throw this.closer.rethrow(t);
        } finally {
            this.closer.close();
        }
    }

    /**
     * Builds the registration {@link State} for one side (new or old data) of a GMCE.
     *
     * <p>Starting from a copy of {@code state}, this method:
     * <ul>
     *   <li>sets {@code hive.registration.policy} to the event's policy for that side, falling back to
     *       {@link #DEFAULT_HIVE_REGISTRATION_POLICY_KEY};</li>
     *   <li>marks the input path as empty for old data;</li>
     *   <li>copies partition columns and registration properties from the event;</li>
     *   <li>applies {@link #FORCE_HIVE_DATABASE_NAME} and the table schema literal when present.</li>
     * </ul>
     *
     * @param state base configuration; not modified
     * @param gobblinMetadataChangeEvent the event being registered
     * @param z true for new files, false for old files
     * @return a new State carrying the registration properties
     * @throws IllegalArgumentException if {@link #DEFAULT_HIVE_REGISTRATION_POLICY_KEY} is not configured
     */
    public static State setHiveRegProperties(State state, GobblinMetadataChangeEvent gobblinMetadataChangeEvent, boolean z) {
        Preconditions.checkArgument(state.contains(DEFAULT_HIVE_REGISTRATION_POLICY_KEY),
            String.format("Missing required configuration %s", DEFAULT_HIVE_REGISTRATION_POLICY_KEY));
        String defaultPolicy = state.getProp(DEFAULT_HIVE_REGISTRATION_POLICY_KEY);
        State registrationState = new State(state);
        if (z) {
            registrationState.setProp("hive.registration.policy",
                gobblinMetadataChangeEvent.getRegistrationPolicy() != null
                    ? gobblinMetadataChangeEvent.getRegistrationPolicy() : defaultPolicy);
        } else {
            registrationState.setProp("hive.registration.policy",
                gobblinMetadataChangeEvent.getRegistrationPolicyForOldData() != null
                    ? gobblinMetadataChangeEvent.getRegistrationPolicyForOldData() : defaultPolicy);
            registrationState.setProp("mapreduce.job.input.path.empty", true);
        }
        if (gobblinMetadataChangeEvent.getPartitionColumns() != null
            && !gobblinMetadataChangeEvent.getPartitionColumns().isEmpty()) {
            registrationState.setProp(HIVE_PARTITION_NAME, String.join(",", gobblinMetadataChangeEvent.getPartitionColumns()));
        }
        if (gobblinMetadataChangeEvent.getRegistrationProperties() != null) {
            for (Map.Entry<?, ?> entry : gobblinMetadataChangeEvent.getRegistrationProperties().entrySet()) {
                registrationState.setProp((String) entry.getKey(), entry.getValue());
            }
        }
        if (state.contains(FORCE_HIVE_DATABASE_NAME)) {
            registrationState.setProp("hive.database.name", state.getProp(FORCE_HIVE_DATABASE_NAME));
        }
        if (gobblinMetadataChangeEvent.getTableSchema() != null) {
            registrationState.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
                gobblinMetadataChangeEvent.getTableSchema());
        }
        return registrationState;
    }

    /** Returns the live (mutable) list of configured metadata writers. */
    public List<MetadataWriter> getMetadataWriters() {
        return this.metadataWriters;
    }
}
