package org.apache.hudi.connect.writers;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/hudi/connect/writers/KafkaConnectTransactionServices.class */
public class KafkaConnectTransactionServices implements ConnectTransactionServices {
    private static final Logger LOG = LogManager.getLogger(KafkaConnectTransactionServices.class);
    private final KafkaConnectConfigs connectConfigs;
    private final Option<HoodieTableMetaClient> tableMetaClient;
    private final Configuration hadoopConf;
    private final HoodieWriteConfig writeConfig;
    private final String tableBasePath;
    private final String tableName;
    private final HoodieEngineContext context;
    private final HoodieJavaWriteClient<HoodieAvroPayload> javaClient;

    public KafkaConnectTransactionServices(KafkaConnectConfigs kafkaConnectConfigs) throws HoodieException {
        this.connectConfigs = kafkaConnectConfigs;
        this.writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.JAVA).withProperties(kafkaConnectConfigs.getProps()).build();
        this.tableBasePath = this.writeConfig.getBasePath();
        this.tableName = this.writeConfig.getTableName();
        this.hadoopConf = KafkaConnectUtils.getDefaultHadoopConf(kafkaConnectConfigs);
        this.context = new HoodieJavaEngineContext(this.hadoopConf);
        try {
            KeyGenerator createKeyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(new TypedProperties(kafkaConnectConfigs.getProps()));
            String recordKeyColumns = KafkaConnectUtils.getRecordKeyColumns(createKeyGenerator);
            String partitionColumns = KafkaConnectUtils.getPartitionColumns(createKeyGenerator, new TypedProperties(kafkaConnectConfigs.getProps()));
            LOG.info(String.format("Setting record key %s and partition fields %s for table %s", recordKeyColumns, partitionColumns, this.tableBasePath + this.tableName));
            this.tableMetaClient = Option.of(HoodieTableMetaClient.withPropertyBuilder().setTableType(HoodieTableType.COPY_ON_WRITE.name()).setTableName(this.tableName).setPayloadClassName(HoodieAvroPayload.class.getName()).setRecordKeyFields(recordKeyColumns).setPartitionFields(partitionColumns).setKeyGeneratorClassProp(this.writeConfig.getKeyGeneratorClass()).fromProperties(kafkaConnectConfigs.getProps()).initTable(this.hadoopConf, this.tableBasePath));
            this.javaClient = new HoodieJavaWriteClient<>(this.context, this.writeConfig);
        } catch (Exception e) {
            throw new HoodieException("Fatal error instantiating Hudi Transaction Services ", e);
        }
    }

    @Override // org.apache.hudi.connect.writers.ConnectTransactionServices
    public String startCommit() {
        String startCommit = this.javaClient.startCommit();
        this.javaClient.transitionInflight(startCommit);
        LOG.info("Starting Hudi commit " + startCommit);
        return startCommit;
    }

    @Override // org.apache.hudi.connect.writers.ConnectTransactionServices
    public boolean endCommit(String str, List<WriteStatus> list, Map<String, String> map) {
        boolean commit = this.javaClient.commit(str, list, Option.of(map));
        if (commit) {
            LOG.info("Ending Hudi commit " + str);
            if (this.writeConfig.isAsyncClusteringEnabled()) {
                this.javaClient.scheduleClustering(Option.empty()).ifPresent(str2 -> {
                    LOG.info("Scheduled clustering at instant time:" + str2);
                });
            }
            if (isAsyncCompactionEnabled()) {
                this.javaClient.scheduleCompaction(Option.empty()).ifPresent(str3 -> {
                    LOG.info("Scheduled compaction at instant time:" + str3);
                });
            }
            syncMeta();
        }
        return commit;
    }

    @Override // org.apache.hudi.connect.writers.ConnectTransactionServices
    public Map<String, String> fetchLatestExtraCommitMetadata() {
        if (!this.tableMetaClient.isPresent()) {
            throw new HoodieException("Fatal error retrieving Hoodie Extra Metadata since Table Meta Client is absent");
        }
        Option<HoodieCommitMetadata> commitMetadataForLatestInstant = KafkaConnectUtils.getCommitMetadataForLatestInstant(this.tableMetaClient.get());
        if (commitMetadataForLatestInstant.isPresent()) {
            return commitMetadataForLatestInstant.get().getExtraMetadata();
        }
        LOG.info("Hoodie Extra Metadata from latest commit is absent");
        return Collections.emptyMap();
    }

    private boolean isAsyncCompactionEnabled() {
        return this.tableMetaClient.isPresent() && HoodieTableType.MERGE_ON_READ.equals(this.tableMetaClient.get().getTableType()) && this.connectConfigs.isAsyncCompactEnabled().booleanValue();
    }

    private void syncMeta() {
        if (this.connectConfigs.isMetaSyncEnabled().booleanValue()) {
            HashSet<String> hashSet = new HashSet(Arrays.asList(this.connectConfigs.getMetaSyncClasses().split(",")));
            FileSystem fs = FSUtils.getFs(this.tableBasePath, new Configuration());
            for (String str : hashSet) {
                SyncUtilHelpers.runHoodieMetaSync(str.trim(), this.connectConfigs.getProps(), this.hadoopConf, fs, this.tableBasePath, this.connectConfigs.getStringOrDefault(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT));
            }
        }
    }
}
