package org.apache.hudi.utilities.streamer;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.time.Instant;
import java.time.ZoneId;
import java.util.List;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.class */
public class SparkSampleWritesUtils {
    private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);

    /**
     * Derives a write config whose copy-on-write record size estimate comes from a sample write.
     *
     * <p>Nothing is produced (an empty {@link Option} is returned) when sample writes are disabled,
     * when the target table's timeline is already non-empty, when the sample write reports failure,
     * or when an {@link IOException} occurs along the way (the exception is logged, not rethrown).
     *
     * @param javaSparkContext  Spark context used for the meta client and the sample write
     * @param option            records to sample from, if any
     * @param hoodieWriteConfig write config of the target table
     * @return a new write config carrying the measured record size estimate, or empty
     */
    public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(JavaSparkContext javaSparkContext, Option<JavaRDD<HoodieRecord>> option, HoodieWriteConfig hoodieWriteConfig) {
        boolean sampleWritesEnabled = hoodieWriteConfig.getBoolean(HoodieStreamerConfig.SAMPLE_WRITES_ENABLED);
        if (!sampleWritesEnabled) {
            LOG.debug("Skip overwriting record size estimate as it's disabled.");
            return Option.empty();
        }

        HoodieTableMetaClient targetMetaClient = getMetaClient(javaSparkContext, hoodieWriteConfig.getBasePath());
        if (targetMetaClient.isTimelineNonEmpty()) {
            LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
            return Option.empty();
        }

        try {
            // The sample table gets its own instant time, generated from the current wall clock.
            String sampleInstantTime = HoodieInstantTimeGenerator.getInstantFromTemporalAccessor(Instant.now().atZone(ZoneId.systemDefault()));
            Pair<Boolean, String> sampleOutcome = doSampleWrites(javaSparkContext, option, hoodieWriteConfig, sampleInstantTime);
            if (!sampleOutcome.getLeft()) {
                return Option.empty();
            }

            long measuredRecordSize = getAvgSizeFromSampleWrites(javaSparkContext, sampleOutcome.getRight());
            LOG.info("Overwriting record size estimate to {}", measuredRecordSize);

            // The estimate is written into the properties returned by getProps() and a fresh config is built from them.
            TypedProperties updatedProps = hoodieWriteConfig.getProps();
            updatedProps.put(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(measuredRecordSize));
            return Option.of(HoodieWriteConfig.newBuilder().withProperties(updatedProps).build());
        } catch (IOException e) {
            // Best effort: a failed sample write must not fail the caller.
            LOG.error(String.format("Not overwriting record size estimate for table %s due to error when doing sample writes.", hoodieWriteConfig.getTableName()), e);
            return Option.empty();
        }
    }

    /**
     * Bulk-inserts a small sample of the incoming records into a throw-away copy-on-write table.
     *
     * <p>The sample table is initialised under the path returned by
     * {@code getSampleWritesBasePath} and written with table services, the metadata table and
     * schema evolution disabled, bulk-insert parallelism 1 and auto-commit on.
     *
     * @param javaSparkContext  Spark context used to run the sample write
     * @param option            records to sample from; when empty no write is attempted
     * @param hoodieWriteConfig write config of the target table, used as the base for the sample config
     * @param str               instant time used for the sample commit and in the sample table name/path
     * @return {@code (true, sampleWritesBasePath)} when the sample was written without errors,
     *         otherwise {@code (false, null)} (no records, empty sample, or write errors)
     * @throws IOException if preparing the sample table location or initialising the table fails
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // SparkRDDWriteClient is used raw because the record payload type is not known here
    private static Pair<Boolean, String> doSampleWrites(JavaSparkContext javaSparkContext, Option<JavaRDD<HoodieRecord>> option, HoodieWriteConfig hoodieWriteConfig, String str) throws IOException {
        final String sampleWritesBasePath = getSampleWritesBasePath(javaSparkContext, hoodieWriteConfig, str);
        HoodieTableMetaClient.newTableBuilder()
            .setTableType(HoodieTableType.COPY_ON_WRITE)
            .setTableName(String.format("%s_samples_%s", hoodieWriteConfig.getTableName(), str))
            .setCDCEnabled(false)
            .initTable(HadoopFSUtils.getStorageConfWithCopy(javaSparkContext.hadoopConfiguration()), sampleWritesBasePath);

        // NOTE(review): if getProps() hands back the config's live properties, this put also
        // switches sample writes off on the caller's config - confirm that is intended.
        TypedProperties props = hoodieWriteConfig.getProps();
        props.put(HoodieStreamerConfig.SAMPLE_WRITES_ENABLED.key(), "false");
        final HoodieWriteConfig sampleWriteConfig = HoodieWriteConfig.newBuilder()
            .withProps(props)
            .withTableServicesEnabled(false)
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
            .withSchemaEvolutionEnable(false)
            .withBulkInsertParallelism(1)
            .withAutoCommit(true)
            .withPath(sampleWritesBasePath)
            .build();

        final Pair<Boolean, String> emptyResult = Pair.of(false, null);
        // try-with-resources closes the client on every path and keeps the primary exception
        // (a failure in close() is attached as suppressed instead of replacing it).
        try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(javaSparkContext), sampleWriteConfig, Option.empty())) {
            final int sampleSize = hoodieWriteConfig.getIntOrDefault(HoodieStreamerConfig.SAMPLE_WRITES_SIZE);
            return option.map(records -> {
                List<HoodieRecord> samples = records.coalesce(1).take(sampleSize);
                if (samples.isEmpty()) {
                    return emptyResult;
                }
                sampleWriteClient.startCommitWithTime(str);
                JavaRDD<WriteStatus> writeStatuses = sampleWriteClient.bulkInsert(javaSparkContext.parallelize(samples, 1), str);
                if (writeStatuses.filter(WriteStatus::hasErrors).count() > 0) {
                    LOG.error("sample writes for table {} failed with errors.", hoodieWriteConfig.getTableName());
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Printing out the top 100 errors");
                        writeStatuses.filter(WriteStatus::hasErrors).take(100).forEach(writeStatus -> {
                            LOG.trace("Global error :", writeStatus.getGlobalError());
                            writeStatus.getErrors().forEach((hoodieKey, error) ->
                                LOG.trace(String.format("Error for key: %s", hoodieKey), error));
                        });
                    }
                    return emptyResult;
                } else {
                    return Pair.of(true, sampleWritesBasePath);
                }
            }).orElse(emptyResult);
        }
    }

    /**
     * Resolves the location for a sample-writes table and makes sure nothing is left there.
     *
     * <p>The location is {@code .hoodie/.aux/.sample_writes/<str>} under the target table's base
     * path. If that path already exists it is deleted before the path is returned.
     *
     * @param javaSparkContext  Spark context used to build the meta client that provides the storage
     * @param hoodieWriteConfig write config of the target table (supplies the base path)
     * @param str               path segment appended under {@code .sample_writes/}
     * @return the sample-writes base path as a string
     * @throws IOException if checking for or deleting the existing directory fails
     */
    private static String getSampleWritesBasePath(JavaSparkContext javaSparkContext, HoodieWriteConfig hoodieWriteConfig, String str) throws IOException {
        final String relativeSamplePath = ".hoodie/.aux/.sample_writes/" + str;
        final StoragePath sampleTablePath = new StoragePath(hoodieWriteConfig.getBasePath(), relativeSamplePath);

        final HoodieTableMetaClient targetMetaClient = getMetaClient(javaSparkContext, hoodieWriteConfig.getBasePath());
        final HoodieStorage targetStorage = targetMetaClient.getStorage();

        boolean leftoverPresent = targetStorage.exists(sampleTablePath);
        if (leftoverPresent) {
            targetStorage.deleteDirectory(sampleTablePath);
        }
        return sampleTablePath.toString();
    }

    /**
     * Computes the average bytes-per-record of the last completed commit in a sample-writes table.
     *
     * @param javaSparkContext Spark context used to build the meta client
     * @param str              base path of the sample-writes table
     * @return total bytes written divided by total records written, rounded up
     * @throws IOException           if the commit metadata cannot be read, or if the commit
     *                               reports no written records (no meaningful average exists)
     * @throws IllegalStateException if the sample table has no completed instant
     */
    private static long getAvgSizeFromSampleWrites(JavaSparkContext javaSparkContext, String str) throws IOException {
        HoodieTableMetaClient metaClient = getMetaClient(javaSparkContext, str);
        Option<HoodieInstant> lastInstant = metaClient.getCommitTimeline().filterCompletedInstants().lastInstant();
        ValidationUtils.checkState(lastInstant.isPresent(), "The only completed instant should be present in sample_writes table.");
        HoodieInstant hoodieInstant = lastInstant.get();
        HoodieCommitMetadata hoodieCommitMetadata = metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant, metaClient.getCommitTimeline().getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);

        long totalBytesWritten = hoodieCommitMetadata.fetchTotalBytesWritten();
        long totalRecordsWritten = hoodieCommitMetadata.fetchTotalRecordsWritten();
        // Dividing by zero in floating point yields Infinity/NaN, which the long cast would turn
        // into Long.MAX_VALUE or 0 and silently install as the record size estimate.
        if (totalRecordsWritten <= 0) {
            throw new IOException(String.format("Sample writes commit %s at %s reports %d records written; cannot compute average record size.", hoodieInstant, str, totalRecordsWritten));
        }
        return (long) Math.ceil((1.0d * totalBytesWritten) / totalRecordsWritten);
    }

    /**
     * Builds a {@link HoodieTableMetaClient} for the table at the given base path.
     *
     * <p>The storage configuration is a copy of the configuration held by the file system that
     * {@code HadoopFSUtils.getFs} resolves for the path from the Spark context's Hadoop configuration.
     *
     * @param javaSparkContext Spark context supplying the Hadoop configuration
     * @param str              base path of the table
     * @return a meta client for that base path
     */
    private static HoodieTableMetaClient getMetaClient(JavaSparkContext javaSparkContext, String str) {
        return HoodieTableMetaClient.builder()
            .setConf(
                HadoopFSUtils.getStorageConfWithCopy(
                    HadoopFSUtils.getFs(str, javaSparkContext.hadoopConfiguration()).getConf()))
            .setBasePath(str)
            .build();
    }

    // The decompiler-emitted $deserializeLambda$(SerializedLambda) method that used to sit here
    // has been removed on purpose:
    //  - it was not valid Java source (a boolean initialised with -1 and a switch over a boolean);
    //  - javac generates this private synthetic method itself for every class that contains
    //    serializable lambdas or method references (such as the WriteStatus "hasErrors" filter
    //    passed to Spark in this class), so a hand-written copy clashes with the generated one.
    // Nothing in source code can call it; lambda deserialization reaches the generated method
    // reflectively at runtime.
}
