package org.apache.hudi.util;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.text.DateFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.schema.FilebasedSchemaProvider;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/util/StreamerUtil.class */
/**
 * Static helpers for the Hudi Flink integration: loading streamer properties, resolving the
 * source Avro schema, converting Flink options into Hudi write configs, initializing tables,
 * creating write clients / meta clients, and instant-time arithmetic.
 */
public class StreamerUtil {
    private static final Logger LOG = LoggerFactory.getLogger(StreamerUtil.class);

    /** Bytes per megabyte as a {@code long}, so MB-to-bytes scaling is done in 64-bit arithmetic. */
    private static final long BYTES_PER_MB = 1024L * 1024L;

    /**
     * Loads the streamer properties and adds the Kafka consumer connection settings.
     *
     * @param flinkStreamerConfig streamer configuration; its {@code kafkaBootstrapServers} and
     *     {@code kafkaGroupId} are written as {@code bootstrap.servers} and {@code group.id}
     * @return the properties from {@link #getProps(FlinkStreamerConfig)} plus the two Kafka entries
     */
    public static TypedProperties appendKafkaProps(FlinkStreamerConfig flinkStreamerConfig) {
        TypedProperties props = getProps(flinkStreamerConfig);
        props.put("bootstrap.servers", flinkStreamerConfig.kafkaBootstrapServers);
        props.put("group.id", flinkStreamerConfig.kafkaGroupId);
        return props;
    }

    /**
     * Reads the streamer properties file, applying the streamer's {@code configs} entries as overrides.
     *
     * @param flinkStreamerConfig streamer configuration holding {@code propsFilePath} and {@code configs}
     * @return empty properties when {@code propsFilePath} is empty, otherwise the loaded properties
     */
    public static TypedProperties getProps(FlinkStreamerConfig flinkStreamerConfig) {
        if (flinkStreamerConfig.propsFilePath.isEmpty()) {
            return new TypedProperties();
        }
        FileSystem fs = FSUtils.getFs(flinkStreamerConfig.propsFilePath, getHadoopConf());
        return readConfig(fs, new Path(flinkStreamerConfig.propsFilePath), flinkStreamerConfig.configs).getConfig();
    }

    /**
     * Resolves the source schema through a {@link FilebasedSchemaProvider} built from the streamer
     * config converted to a Flink {@link Configuration}.
     */
    public static Schema getSourceSchema(FlinkStreamerConfig flinkStreamerConfig) {
        return new FilebasedSchemaProvider(FlinkStreamerConfig.toFlinkConfig(flinkStreamerConfig)).getSourceSchema();
    }

    /**
     * Resolves the source Avro schema from the Flink options.
     *
     * <p>The schema file option ({@code SOURCE_AVRO_SCHEMA_PATH}) takes precedence over the inline
     * schema string ({@code SOURCE_AVRO_SCHEMA}).
     *
     * @throws HoodieException if neither option is set
     */
    public static Schema getSourceSchema(Configuration configuration) {
        if (configuration.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()) {
            return new FilebasedSchemaProvider(configuration).getSourceSchema();
        }
        if (configuration.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) {
            return new Schema.Parser().parse((String) configuration.get(FlinkOptions.SOURCE_AVRO_SCHEMA));
        }
        throw new HoodieException(String.format("Either option '%s' or '%s' should be specified for avro schema deserialization", FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), FlinkOptions.SOURCE_AVRO_SCHEMA.key()));
    }

    /**
     * Loads a properties file and appends override lines to it.
     *
     * <p>Loading the file is best-effort: any failure is logged as a warning and an empty
     * configuration is used instead, so the overrides still apply.
     *
     * @param fileSystem file system whose Hadoop configuration is used to resolve {@code path}
     * @param path location of the properties file
     * @param list override lines, joined with newlines and parsed as additional properties
     * @return the combined configuration
     * @throws HoodieIOException if the override lines cannot be added
     */
    public static DFSPropertiesConfiguration readConfig(FileSystem fileSystem, Path path, List<String> list) {
        DFSPropertiesConfiguration conf;
        try {
            conf = new DFSPropertiesConfiguration(path.getFileSystem(fileSystem.getConf()), path);
        } catch (Exception e) {
            // Deliberately tolerant: a missing/unreadable file falls back to an empty configuration.
            conf = new DFSPropertiesConfiguration();
            LOG.warn("Unexpected error read props file at :{}", path, e);
        }
        try {
            if (!list.isEmpty()) {
                LOG.info("Adding overridden properties to file properties.");
                conf.addProperties(new BufferedReader(new StringReader(String.join("\n", list))));
            }
            return conf;
        } catch (IOException e) {
            throw new HoodieIOException("Unexpected error adding config overrides", e);
        }
    }

    /** Returns the Hadoop configuration supplied by {@link FlinkClientUtil#getHadoopConf()}. */
    public static org.apache.hadoop.conf.Configuration getHadoopConf() {
        return FlinkClientUtil.getHadoopConf();
    }

    /**
     * Builds the Hudi write config for the Flink engine from the given Flink options.
     *
     * <p>Memory and log-size options are interpreted as megabytes (they are scaled by 1024 * 1024).
     * Auto-commit is disabled, embedded timeline server reuse is enabled, and cleaning uses
     * {@link HoodieCleaningPolicy#KEEP_LATEST_COMMITS} with a fixed parallelism of 20.
     *
     * @throws HoodieException if no source Avro schema option is configured
     */
    public static HoodieWriteConfig getHoodieClientConfig(Configuration configuration) {
        return HoodieWriteConfig.newBuilder()
            .withEngineType(EngineType.FLINK)
            .withPath(configuration.getString(FlinkOptions.PATH))
            .combineInput(configuration.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
            .withMergeAllowDuplicateOnInserts(allowDuplicateInserts(configuration))
            .withCompactionConfig(
                HoodieCompactionConfig.newBuilder()
                    .withPayloadClass(configuration.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
                    .withTargetIOPerCompactionInMB(configuration.getLong(FlinkOptions.COMPACTION_TARGET_IO))
                    .withInlineCompactionTriggerStrategy(
                        CompactionTriggerStrategy.valueOf(
                            configuration.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT)))
                    .withMaxNumDeltaCommitsBeforeCompaction(configuration.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS))
                    .withMaxDeltaSecondsBeforeCompaction(configuration.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS))
                    .withAsyncClean(Boolean.valueOf(configuration.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)))
                    .retainCommits(configuration.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS))
                    .withCleanerParallelism(20)
                    .archiveCommitsWith(
                        configuration.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS),
                        configuration.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS))
                    .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
                    .build())
            .withMemoryConfig(
                HoodieMemoryConfig.newBuilder()
                    // Scale in long arithmetic: an int product silently overflows for values >= 2048 MB.
                    .withMaxMemoryMaxSize(
                        configuration.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY) * BYTES_PER_MB,
                        configuration.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * BYTES_PER_MB)
                    .build())
            .forTable(configuration.getString(FlinkOptions.TABLE_NAME))
            .withStorageConfig(
                HoodieStorageConfig.newBuilder()
                    // NOTE(review): kept as int arithmetic because these builder methods may take int in this
                    // Hudi version; values >= 2048 MB would overflow — confirm the parameter types and widen.
                    .logFileDataBlockMaxSize(configuration.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024)
                    .logFileMaxSize(configuration.getInteger(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024)
                    .build())
            .withMetadataConfig(
                HoodieMetadataConfig.newBuilder()
                    .enable(configuration.getBoolean(FlinkOptions.METADATA_ENABLED))
                    .withMaxNumDeltaCommitsBeforeCompaction(configuration.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
                    .build())
            .withEmbeddedTimelineServerReuseEnabled(true)
            .withAutoCommit(false)
            .withAllowOperationMetadataField(configuration.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
            .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(configuration)))
            .withSchema(getSourceSchema(configuration).toString())
            .build();
    }

    /**
     * Converts Flink options to {@link TypedProperties}.
     *
     * <p>All explicitly set options are copied. Every optional option that is not set but has a
     * default is also added with its default value object (which is not necessarily a String).
     */
    public static TypedProperties flinkConf2TypedProperties(Configuration configuration) {
        Properties properties = new Properties();
        configuration.addAllToProperties(properties);
        for (ConfigOption<?> option : FlinkOptions.optionalOptions()) {
            if (!configuration.contains(option) && option.hasDefaultValue()) {
                properties.put(option.key(), option.defaultValue());
            }
        }
        return new TypedProperties(properties);
    }

    /**
     * Verifies that every key in {@code list} is present in {@code typedProperties}.
     *
     * @throws IllegalStateException (via Flink {@code Preconditions.checkState}) for the first missing key
     */
    public static void checkRequiredProperties(TypedProperties typedProperties, List<String> list) {
        for (String key : list) {
            Preconditions.checkState(typedProperties.containsKey(key), "Required property " + key + " is missing");
        }
    }

    /**
     * Creates the Hudi table at {@code FlinkOptions.PATH} unless a {@code .hoodie} folder already exists there.
     *
     * <p>The table is initialized with the configured table type, name, record key, partition and
     * pre-combine fields, payload class, the default archive log folder, and timeline layout version 1.
     *
     * @throws IOException if the existence check or table initialization fails
     */
    public static void initTableIfNotExists(Configuration configuration) throws IOException {
        String basePath = configuration.getString(FlinkOptions.PATH);
        org.apache.hadoop.conf.Configuration hadoopConf = getHadoopConf();
        if (FSUtils.getFs(basePath, hadoopConf).exists(new Path(basePath, ".hoodie"))) {
            LOG.info("Table [{}/{}] already exists, no need to initialize the table", basePath, configuration.getString(FlinkOptions.TABLE_NAME));
            return;
        }
        HoodieTableMetaClient.withPropertyBuilder()
            .setTableType(configuration.getString(FlinkOptions.TABLE_TYPE))
            .setTableName(configuration.getString(FlinkOptions.TABLE_NAME))
            .setRecordKeyFields(configuration.getString(FlinkOptions.RECORD_KEY_FIELD, (String) null))
            .setPayloadClassName(configuration.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
            .setArchiveLogFolder((String) HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
            .setPartitionFields(configuration.getString(FlinkOptions.PARTITION_PATH_FIELD, (String) null))
            .setPreCombineField(configuration.getString(FlinkOptions.PRECOMBINE_FIELD))
            .setTimelineLayoutVersion(1)
            .initTable(hadoopConf, basePath);
        LOG.info("Table initialized under base path {}", basePath);
    }

    /** Joins the two parts as {@code "<str>_<str2>"}. */
    public static String generateBucketKey(String str, String str2) {
        return String.format("%s_%s", str, str2);
    }

    /** True when the table type is merge-on-read (case-insensitive) and async compaction is enabled. */
    public static boolean needsAsyncCompaction(Configuration configuration) {
        return isMergeOnRead(configuration) && configuration.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED);
    }

    /** True when the table type is merge-on-read (case-insensitive) and compaction scheduling is enabled. */
    public static boolean needsScheduleCompaction(Configuration configuration) {
        return isMergeOnRead(configuration) && configuration.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED);
    }

    /** Upper-cases the configured table type and compares it with {@code FlinkOptions.TABLE_TYPE_MERGE_ON_READ}. */
    private static boolean isMergeOnRead(Configuration configuration) {
        return configuration.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT).equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
    }

    /** Builds a meta client for the table at {@code str}, using {@link #getHadoopConf()}. */
    public static HoodieTableMetaClient createMetaClient(String str) {
        return HoodieTableMetaClient.builder().setBasePath(str).setConf(getHadoopConf()).build();
    }

    /** Builds a meta client for the table at {@code FlinkOptions.PATH}. */
    public static HoodieTableMetaClient createMetaClient(Configuration configuration) {
        return createMetaClient(configuration.getString(FlinkOptions.PATH));
    }

    /**
     * Creates a write client whose engine context carries the Hadoop configuration and a task
     * context supplier backed by {@code runtimeContext}.
     */
    public static HoodieFlinkWriteClient createWriteClient(Configuration configuration, RuntimeContext runtimeContext) {
        HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
            new SerializableConfiguration(getHadoopConf()), new FlinkTaskContextSupplier(runtimeContext));
        return new HoodieFlinkWriteClient(context, getHoodieClientConfig(configuration));
    }

    /** Creates a write client using {@link HoodieFlinkEngineContext#DEFAULT}. */
    public static HoodieFlinkWriteClient createWriteClient(Configuration configuration) {
        return new HoodieFlinkWriteClient(HoodieFlinkEngineContext.DEFAULT, getHoodieClientConfig(configuration));
    }

    /**
     * Returns the instant time halfway between two instants, formatted with the commit formatter.
     *
     * @param newerInstant the later instant; must be strictly newer than {@code olderInstant}
     * @param olderInstant the earlier instant
     * @throws HoodieException if either instant cannot be parsed
     */
    public static String medianInstantTime(String newerInstant, String olderInstant) {
        DateFormat formatter = commitFormatter();
        try {
            long newerMillis = formatter.parse(newerInstant).getTime();
            long olderMillis = formatter.parse(olderInstant).getTime();
            ValidationUtils.checkArgument(newerMillis > olderMillis, "Instant [" + newerInstant + "] should have newer timestamp than instant [" + olderInstant + "]");
            // older + half the gap avoids summing two epoch-millis values.
            return formatter.format(new Date(olderMillis + (newerMillis - olderMillis) / 2));
        } catch (ParseException e) {
            throw new HoodieException("Get median instant time with interval [" + olderInstant + ", " + newerInstant + "] error", e);
        }
    }

    /**
     * Returns {@code newerInstant - olderInstant} in whole seconds (truncated toward zero).
     *
     * @throws HoodieException if either instant cannot be parsed
     */
    public static long instantTimeDiffSeconds(String newerInstant, String olderInstant) {
        DateFormat formatter = commitFormatter();
        try {
            long diffMillis = formatter.parse(newerInstant).getTime() - formatter.parse(olderInstant).getTime();
            return diffMillis / 1000;
        } catch (ParseException e) {
            throw new HoodieException("Get instant time diff with interval [" + olderInstant + ", " + newerInstant + "] error", e);
        }
    }

    /**
     * Returns a private copy of the shared commit-time formatter.
     *
     * <p>{@code java.text.DateFormat} keeps mutable parse/format state and is not thread-safe, so
     * the shared static instance must not be used concurrently.
     */
    private static DateFormat commitFormatter() {
        return (DateFormat) HoodieActiveTimeline.COMMIT_FORMATTER.clone();
    }

    /**
     * Instantiates the named transformer classes, in order, and chains them.
     *
     * @param classNames fully qualified transformer class names; {@code null} is treated as empty
     * @return empty if no class names are given, otherwise a {@link ChainedTransformer} over them
     * @throws IOException if any class cannot be loaded or is not a {@link Transformer}
     */
    public static Option<Transformer> createTransformer(List<String> classNames) throws IOException {
        try {
            List<Transformer> transformers = new ArrayList<>();
            for (String className : Option.ofNullable(classNames).orElse(Collections.emptyList())) {
                // The cast rejects non-Transformer classes here instead of failing later at use time.
                transformers.add((Transformer) ReflectionUtils.loadClass(className));
            }
            return transformers.isEmpty() ? Option.empty() : Option.of(new ChainedTransformer(transformers));
        } catch (Throwable th) {
            throw new IOException("Could not load transformer class(es) " + classNames, th);
        }
    }

    /**
     * Checks whether a data file is non-trivially sized.
     *
     * <p>Parquet, ORC and Hudi log files must be longer than their magic header (the ORC header is
     * taken as the length of {@code "ORC"}). Any other file must be non-empty.
     */
    public static boolean isValidFile(FileStatus fileStatus) {
        String extension = FSUtils.getFileExtension(fileStatus.getPath().toString());
        long minExclusiveLength;
        if (HoodieFileFormat.PARQUET.getFileExtension().equals(extension)) {
            minExclusiveLength = ParquetFileWriter.MAGIC.length;
        } else if (HoodieFileFormat.ORC.getFileExtension().equals(extension)) {
            minExclusiveLength = "ORC".length();
        } else if (HoodieFileFormat.HOODIE_LOG.getFileExtension().equals(extension)) {
            minExclusiveLength = HoodieLogFormat.MAGIC.length;
        } else {
            minExclusiveLength = 0;
        }
        return fileStatus.getLen() > minExclusiveLength;
    }

    /** True for the INSERT operation when insert de-duplication ({@code INSERT_DEDUP}) is disabled. */
    public static boolean allowDuplicateInserts(Configuration configuration) {
        WriteOperationType operation = WriteOperationType.fromValue(configuration.getString(FlinkOptions.OPERATION));
        return operation == WriteOperationType.INSERT && !configuration.getBoolean(FlinkOptions.INSERT_DEDUP);
    }
}
