package org.apache.hudi.utilities.deltastreamer;

import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.collection.JavaConversions;

/* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/DeltaSync.class */
public class DeltaSync implements Serializable {
    private static final Logger LOG = LogManager.getLogger(DeltaSync.class);
    // Commit-metadata key under which writeToSink stores the checkpoint of the batch it committed;
    // readFromSource reads it back from the last completed instant to decide where to resume.
    // NOTE(review): public, static and non-final, so any caller can reassign it — confirm whether it should be final.
    public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
    // Commit-metadata key under which writeToSink stores cfg.checkpoint (only when it is non-null);
    // readFromSource compares it against cfg.checkpoint to detect a newly supplied checkpoint.
    // NOTE(review): same mutability concern as CHECKPOINT_KEY.
    public static String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";
    // Delta streamer configuration. cfg.operation may be rewritten from UPSERT to INSERT when cfg.filterDupes is set.
    private final HoodieDeltaStreamer.Config cfg;
    // Wraps the source built from cfg.sourceClassName; used to fetch batches in row or Avro format.
    private transient SourceFormatAdapter formatAdapter;
    // May be null until the first batch is read; syncOnce then adopts the provider returned by readFromSource.
    private transient SchemaProvider schemaProvider;
    // Built from cfg.transformerClassName; readFromSource treats null as "no transformation".
    private transient Transformer transformer;
    // Not transient: it is referenced from the record-mapping lambda in readFromSource.
    private KeyGenerator keyGenerator;
    private transient FileSystem fs;
    private transient JavaSparkContext jssc;
    private transient SparkSession sparkSession;
    // Passed to HiveSyncTool in syncHive.
    private transient HiveConf hiveConf;
    // Properties used for the key generator, the source, the transformer, the write config and Hive sync.
    private final TypedProperties props;
    // Callback applied to the write client right after setupWriteClient creates it.
    private transient Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient;
    // Completed commit (or delta-commit) timeline; empty when the target base path did not exist at refresh time.
    private transient Option<HoodieTimeline> commitTimelineOpt;
    // Created by setupWriteClient once a schema provider is known; reset to null by close().
    private transient HoodieWriteClient writeClient;
    // Assigned in the constructor; not read anywhere else in this class as shown.
    private final HoodieTableType tableType;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hudi.utilities.deltastreamer.DeltaSync$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/DeltaSync$1.class */
    /**
     * Lookup table that maps {@link HoodieTableType} ordinals to the integer case labels
     * used by the {@code switch} in {@code refreshTimeline()}:
     * COPY_ON_WRITE maps to 1 and MERGE_ON_READ maps to 2. Any other ordinal keeps the
     * array's default value of 0 and therefore reaches the switch's {@code default} branch.
     *
     * NOTE(review): this looks like the helper a Java compiler emits for a switch over an
     * enum (it is marked synthetic and was renamed from DeltaSync$1) — confirm before
     * editing it by hand.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per HoodieTableType constant, indexed by ordinal().
        static final /* synthetic */ int[] $SwitchMap$org$apache$hudi$common$model$HoodieTableType = new int[HoodieTableType.values().length];

        static {
            // Each constant is registered in its own try block so that a constant missing at
            // runtime (NoSuchFieldError) only leaves its own slot unset.
            try {
                $SwitchMap$org$apache$hudi$common$model$HoodieTableType[HoodieTableType.COPY_ON_WRITE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$hudi$common$model$HoodieTableType[HoodieTableType.MERGE_ON_READ.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /**
     * Creates a delta-sync instance and prepares it for {@link #syncOnce()}.
     *
     * <p>In order, the constructor stores its collaborators, loads the commit timeline,
     * creates the transformer, key generator and source adapter from the configuration,
     * downgrades an UPSERT operation to INSERT when duplicate filtering is enabled, and
     * finally tries to set up the write client (a no-op while no schema provider is known).
     *
     * @param config          delta streamer configuration; {@code config.operation} may be rewritten here
     * @param sparkSession    Spark session handed to the source and the transformer
     * @param schemaProvider  schema provider, or {@code null} to adopt the one of the first batch
     * @param hoodieTableType table type stored on this instance
     * @param typedProperties properties for the key generator, source, transformer and write config
     * @param javaSparkContext Spark context used for reading and writing
     * @param fileSystem      file system holding the target base path
     * @param hiveConf        Hive configuration used when syncing to Hive
     * @param function        callback applied to the write client once it is created
     * @throws IOException if refreshing the timeline fails
     */
    public DeltaSync(
            HoodieDeltaStreamer.Config config,
            SparkSession sparkSession,
            SchemaProvider schemaProvider,
            HoodieTableType hoodieTableType,
            TypedProperties typedProperties,
            JavaSparkContext javaSparkContext,
            FileSystem fileSystem,
            HiveConf hiveConf,
            Function<HoodieWriteClient, Boolean> function) throws IOException {
        // Plain collaborators first; refreshTimeline() below needs cfg, fs and jssc.
        this.cfg = config;
        this.jssc = javaSparkContext;
        this.sparkSession = sparkSession;
        this.fs = fileSystem;
        this.tableType = hoodieTableType;
        this.onInitializingHoodieWriteClient = function;
        this.props = typedProperties;

        LOG.info("Creating delta streamer with configs : " + typedProperties.toString());
        this.schemaProvider = schemaProvider;
        refreshTimeline();

        // Components instantiated from the configuration.
        this.transformer = UtilHelpers.createTransformer(config.transformerClassName);
        this.keyGenerator = DataSourceUtils.createKeyGenerator(typedProperties);
        this.formatAdapter = new SourceFormatAdapter(
                UtilHelpers.createSource(config.sourceClassName, typedProperties, javaSparkContext, sparkSession, schemaProvider));
        this.hiveConf = hiveConf;

        // With duplicate filtering on, an UPSERT is executed as an INSERT; other operations are untouched.
        if (config.filterDupes.booleanValue() && config.operation == HoodieDeltaStreamer.Operation.UPSERT) {
            config.operation = HoodieDeltaStreamer.Operation.INSERT;
        }

        setupWriteClient();
    }

    /**
     * Reloads {@code commitTimelineOpt} from the target base path.
     *
     * <p>If the base path exists, the completed commit timeline (copy-on-write tables) or
     * the completed delta-commit timeline (merge-on-read tables) is stored. If it does not
     * exist, the timeline is set to empty and the table is initialised at that path.
     *
     * @throws IOException if the existence check or the table initialisation fails
     * @throws HoodieException if the existing table reports an unsupported table type
     */
    private void refreshTimeline() throws IOException {
        Path basePath = new Path(this.cfg.targetBasePath);
        if (this.fs.exists(basePath)) {
            HoodieTableMetaClient metaClient =
                    new HoodieTableMetaClient(new Configuration(this.fs.getConf()), this.cfg.targetBasePath, this.cfg.payloadClassName);
            switch (metaClient.getTableType()) {
                case COPY_ON_WRITE:
                    this.commitTimelineOpt = Option.of(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
                    break;
                case MERGE_ON_READ:
                    this.commitTimelineOpt = Option.of(metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants());
                    break;
                default:
                    throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
            }
        } else {
            // Nothing on disk yet: start with an empty timeline and create the table metadata.
            this.commitTimelineOpt = Option.empty();
            HoodieTableMetaClient.initTableType(
                    new Configuration(this.jssc.hadoopConfiguration()),
                    this.cfg.targetBasePath,
                    this.cfg.tableType,
                    this.cfg.targetTableName,
                    "archived",
                    this.cfg.payloadClassName);
        }
    }

    /**
     * Runs one round of delta sync: refreshes the timeline, reads the next batch from the
     * source and, if the source produced a new checkpoint, writes the batch to the sink.
     *
     * <p>When no schema provider was configured, the provider returned with the first
     * batch is adopted and the write client is set up before writing. All persistent RDDs
     * registered with the Spark context are unpersisted before returning.
     *
     * @return the value returned by {@code writeToSink}, or an empty option when there was
     *         nothing to write
     * @throws Exception if reading from the source or writing to the sink fails
     */
    public Option<String> syncOnce() throws Exception {
        Option<String> scheduledCompaction = Option.empty();
        HoodieDeltaStreamerMetrics metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider));
        Timer.Context overallTimer = metrics.getOverallTimerContext();

        refreshTimeline();

        Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> sourceBatch = readFromSource(this.commitTimelineOpt);
        if (sourceBatch != null) {
            if (this.schemaProvider == null) {
                // First batch without a configured provider: adopt the batch's provider and
                // create the write client now that a schema is available.
                this.schemaProvider = sourceBatch.getKey();
                setupWriteClient();
            }
            Pair<String, JavaRDD<HoodieRecord>> checkpointAndRecords = sourceBatch.getRight();
            scheduledCompaction = writeToSink(
                    checkpointAndRecords.getRight(), checkpointAndRecords.getLeft(), metrics, overallTimer);
        }

        this.jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist);
        return scheduledCompaction;
    }

    /**
     * Reads the next batch from the source, starting at the checkpoint to resume from.
     *
     * <p>The resume checkpoint is resolved as follows:
     * <ol>
     *   <li>If a timeline with a last instant exists, its commit metadata is loaded. When
     *       {@code cfg.checkpoint} is set and differs from the value stored under
     *       {@code CHECKPOINT_RESET_KEY}, {@code cfg.checkpoint} wins; otherwise the value
     *       stored under {@code CHECKPOINT_KEY} is used and must be present.</li>
     *   <li>If no timeline is present, the table is initialised.</li>
     *   <li>If still unresolved and {@code cfg.checkpoint} is set, it is used.</li>
     * </ol>
     *
     * <p>With a transformer configured, the batch is fetched in row format, transformed,
     * and converted to Avro; otherwise it is fetched directly in Avro format.
     *
     * @param option completed commit timeline of the target table, if any
     * @return {@code null} when the source checkpoint did not change; otherwise the schema
     *         provider for the batch paired with the next checkpoint and the records to
     *         write (an empty RDD when the batch is absent or empty)
     * @throws HoodieDeltaStreamerException if the last commit carries no checkpoint
     * @throws Exception if reading the commit metadata or the source fails
     */
    private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(Option<HoodieTimeline> option) throws Exception {
        // ---- 1. Resolve the checkpoint to resume from -------------------------------------
        Option<String> resumeCheckpoint = Option.empty();
        if (!option.isPresent()) {
            HoodieTableMetaClient.initTableType(
                    new Configuration(this.jssc.hadoopConfiguration()),
                    this.cfg.targetBasePath,
                    this.cfg.tableType,
                    this.cfg.targetTableName,
                    "archived",
                    this.cfg.payloadClassName);
        } else {
            HoodieTimeline timeline = option.get();
            Option<HoodieInstant> lastInstant = timeline.lastInstant();
            if (lastInstant.isPresent()) {
                byte[] rawMetadata = timeline.getInstantDetails(lastInstant.get()).get();
                HoodieCommitMetadata lastCommitMetadata = HoodieCommitMetadata.fromBytes(rawMetadata, HoodieCommitMetadata.class);
                boolean configuredCheckpointIsNew = this.cfg.checkpoint != null
                        && !this.cfg.checkpoint.equals(lastCommitMetadata.getMetadata(CHECKPOINT_RESET_KEY));
                if (configuredCheckpointIsNew) {
                    resumeCheckpoint = Option.of(this.cfg.checkpoint);
                } else if (lastCommitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
                    resumeCheckpoint = Option.of(lastCommitMetadata.getMetadata(CHECKPOINT_KEY));
                } else {
                    throw new HoodieDeltaStreamerException("Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :" + lastInstant + ", Instants :" + timeline.getInstants().collect(Collectors.toList()) + ", CommitMetadata=" + lastCommitMetadata.toJsonString());
                }
            }
        }
        if (!resumeCheckpoint.isPresent() && this.cfg.checkpoint != null) {
            resumeCheckpoint = Option.of(this.cfg.checkpoint);
        }
        LOG.info("Checkpoint to resume from : " + resumeCheckpoint);

        // ---- 2. Fetch the batch (optionally transformed) ----------------------------------
        final Option<JavaRDD<GenericRecord>> avroBatch;
        final String nextCheckpoint;
        final SchemaProvider batchSchemaProvider;
        if (this.transformer == null) {
            InputBatch<JavaRDD<GenericRecord>> avroInput = this.formatAdapter.fetchNewDataInAvroFormat(resumeCheckpoint, this.cfg.sourceLimit);
            avroBatch = avroInput.getBatch();
            nextCheckpoint = avroInput.getCheckpointForNextBatch();
            batchSchemaProvider = avroInput.getSchemaProvider();
        } else {
            InputBatch<Dataset<Row>> rowInput = this.formatAdapter.fetchNewDataInRowFormat(resumeCheckpoint, this.cfg.sourceLimit);
            Option<Dataset<Row>> transformed = rowInput.getBatch()
                    .map(rows -> this.transformer.apply(this.jssc, this.sparkSession, rows, this.props));
            nextCheckpoint = rowInput.getCheckpointForNextBatch();

            // Without a target schema the Avro schema is derived from the rows themselves.
            if (lacksTargetSchema()) {
                avroBatch = transformed.map(rows -> AvroConversionUtils.createRdd(
                        rows, RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME, RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE).toJavaRDD());
            } else {
                avroBatch = transformed.map(rows -> AvroConversionUtils.createRdd(
                        rows, this.schemaProvider.getTargetSchema(), RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME, RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE).toJavaRDD());
            }
            // The condition is deliberately evaluated a second time, exactly as before.
            if (lacksTargetSchema()) {
                batchSchemaProvider = transformed
                        .map(rows -> (SchemaProvider) new RowBasedSchemaProvider(rows.schema()))
                        .orElse(rowInput.getSchemaProvider());
            } else {
                batchSchemaProvider = this.schemaProvider;
            }
        }

        // ---- 3. Decide what to hand back --------------------------------------------------
        if (Objects.equals(nextCheckpoint, resumeCheckpoint.orElse(null))) {
            LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(" + resumeCheckpoint + "). New Checkpoint=(" + nextCheckpoint + ")");
            return null;
        }
        if (!avroBatch.isPresent() || avroBatch.get().isEmpty()) {
            LOG.info("No new data, perform empty commit.");
            return Pair.of(batchSchemaProvider, Pair.of(nextCheckpoint, this.jssc.emptyRDD()));
        }
        JavaRDD<HoodieRecord> records = avroBatch.get().map(avroRecord -> {
            return new HoodieRecord(this.keyGenerator.getKey(avroRecord), DataSourceUtils.createPayload(this.cfg.payloadClassName, avroRecord, (Comparable) DataSourceUtils.getNestedFieldVal(avroRecord, this.cfg.sourceOrderingField, false)));
        });
        return Pair.of(batchSchemaProvider, Pair.of(nextCheckpoint, records));
    }

    /** Returns true when no schema provider is set or the provider exposes no target schema. */
    private boolean lacksTargetSchema() {
        return this.schemaProvider == null || this.schemaProvider.getTargetSchema() == null;
    }

    /**
     * Writes one batch of records to the target table and commits it together with the
     * source checkpoint.
     *
     * <p>Steps: optionally drop duplicates (and run UPSERT as INSERT) when
     * {@code cfg.filterDupes} is set; start a commit; run the configured write operation;
     * roll back and fail if records errored and {@code cfg.commitOnErrors} is off; commit
     * with the checkpoint metadata; schedule compaction when async compaction is enabled;
     * sync to Hive when the batch was not empty; finally report timings to the metrics.
     *
     * @param javaRDD records to write
     * @param str checkpoint to store with the commit under {@code CHECKPOINT_KEY}
     * @param hoodieDeltaStreamerMetrics metrics sink for the Hive-sync and overall timings
     * @param context overall timer, stopped at the end; may be {@code null}
     * @return the result of scheduling compaction, or an empty option when async compaction
     *         is disabled
     * @throws HoodieDeltaStreamerException if {@code cfg.operation} is not INSERT, UPSERT or BULK_INSERT
     * @throws HoodieException if the write had errors and was rolled back, or the commit failed
     */
    private Option<String> writeToSink(JavaRDD<HoodieRecord> javaRDD, String str, HoodieDeltaStreamerMetrics hoodieDeltaStreamerMetrics, Timer.Context context) throws Exception {
        if (this.cfg.filterDupes.booleanValue()) {
            // With duplicate filtering on, an UPSERT is executed as an INSERT.
            if (this.cfg.operation == HoodieDeltaStreamer.Operation.UPSERT) {
                this.cfg.operation = HoodieDeltaStreamer.Operation.INSERT;
            }
            javaRDD = DataSourceUtils.dropDuplicates(this.jssc, javaRDD, this.writeClient.getConfig(), this.writeClient.getTimelineServer());
        }

        boolean batchIsEmpty = javaRDD.isEmpty();
        String commitTime = startCommit();
        LOG.info("Starting commit  : " + commitTime);

        HoodieDeltaStreamer.Operation operation = this.cfg.operation;
        JavaRDD<org.apache.hudi.client.WriteStatus> writeStatuses;
        if (operation == HoodieDeltaStreamer.Operation.INSERT) {
            writeStatuses = this.writeClient.insert(javaRDD, commitTime);
        } else if (operation == HoodieDeltaStreamer.Operation.UPSERT) {
            writeStatuses = this.writeClient.upsert(javaRDD, commitTime);
        } else if (operation == HoodieDeltaStreamer.Operation.BULK_INSERT) {
            writeStatuses = this.writeClient.bulkInsert(javaRDD, commitTime);
        } else {
            throw new HoodieDeltaStreamerException("Unknown operation :" + this.cfg.operation);
        }

        long errorCount = writeStatuses.mapToDouble(org.apache.hudi.client.WriteStatus::getTotalErrorRecords).sum().longValue();
        long recordCount = writeStatuses.mapToDouble(org.apache.hudi.client.WriteStatus::getTotalRecords).sum().longValue();
        boolean hasErrors = errorCount > 0;

        if (hasErrors && !this.cfg.commitOnErrors.booleanValue()) {
            LOG.error("Delta Sync found errors when writing. Errors/Total=" + errorCount + "/" + recordCount);
            LOG.error("Printing out the top 100 errors");
            writeStatuses.filter(org.apache.hudi.client.WriteStatus::hasErrors).take(100).forEach(status -> {
                LOG.error("Global error :", status.getGlobalError());
                if (status.getErrors().size() > 0) {
                    status.getErrors().forEach((key, error) -> LOG.trace("Error for key:" + key + " is " + error));
                }
            });
            this.writeClient.rollback(commitTime);
            throw new HoodieException("Commit " + commitTime + " failed and rolled-back !");
        }

        // Extra commit metadata: the checkpoint just consumed and, if configured, the
        // checkpoint the user supplied.
        HashMap<String, String> checkpointMetadata = new HashMap<>();
        checkpointMetadata.put(CHECKPOINT_KEY, str);
        if (this.cfg.checkpoint != null) {
            checkpointMetadata.put(CHECKPOINT_RESET_KEY, this.cfg.checkpoint);
        }
        if (hasErrors) {
            LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total=" + errorCount + "/" + recordCount);
        }

        boolean committed = this.writeClient.commit(commitTime, writeStatuses, Option.of(checkpointMetadata));
        if (!committed) {
            LOG.info("Commit " + commitTime + " failed!");
            throw new HoodieException("Commit " + commitTime + " failed!");
        }
        LOG.info("Commit " + commitTime + " successful!");

        Option<String> scheduledCompaction = Option.empty();
        if (this.cfg.isAsyncCompactionEnabled()) {
            scheduledCompaction = this.writeClient.scheduleCompaction(Option.empty());
        }

        // Hive sync is skipped for empty batches; its duration stays 0 in that case.
        long hiveSyncDuration = 0;
        if (!batchIsEmpty) {
            Timer.Context hiveSyncTimer = hoodieDeltaStreamerMetrics.getHiveSyncTimerContext();
            syncHive();
            hiveSyncDuration = hiveSyncTimer != null ? hiveSyncTimer.stop() : 0L;
        }

        long overallDuration = context != null ? context.stop() : 0L;
        hoodieDeltaStreamerMetrics.updateDeltaStreamerMetrics(overallDuration, hiveSyncDuration);
        return scheduledCompaction;
    }

    /**
     * Starts a new commit on the write client, retrying once after a one-second pause if
     * the client rejects the attempt with an {@link IllegalArgumentException}.
     *
     * <p>If the thread is interrupted while pausing, the interrupt flag is restored and no
     * further attempt is made; the last failure is thrown instead. Previously the
     * interruption was silently discarded, so callers could not observe it and the retry
     * continued regardless.
     *
     * @return the commit time returned by the write client
     * @throws IllegalArgumentException the last failure, once all attempts are used up or
     *         the pause between attempts was interrupted
     */
    private String startCommit() {
        final int maxAttempts = 2;
        IllegalArgumentException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return this.writeClient.startCommit();
            } catch (IllegalArgumentException e) {
                lastFailure = e;
                LOG.error("Got error trying to start a new commit. Retrying after sleeping for a sec", e);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException interrupted) {
                    // Preserve the interruption for the caller and stop retrying.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        // Only reachable after at least one failed attempt, so lastFailure is non-null here.
        throw lastFailure;
    }

    /**
     * Syncs the target table to Hive when {@code cfg.enableHiveSync} is set; otherwise
     * does nothing. The sync configuration is built from the properties and the target
     * base path.
     */
    private void syncHive() {
        if (!this.cfg.enableHiveSync.booleanValue()) {
            return;
        }
        HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(this.props, this.cfg.targetBasePath);
        LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName
                + "). Hive metastore URL :" + hiveSyncConfig.jdbcUrl
                + ", basePath :" + this.cfg.targetBasePath);
        HiveSyncTool syncTool = new HiveSyncTool(hiveSyncConfig, this.hiveConf, this.fs);
        syncTool.syncHoodieTable();
    }

    /**
     * Creates the write client if a schema provider is known and no client exists yet.
     *
     * <p>The Avro schemas of the provider are registered first, then the client is built
     * from {@code getHoodieClientConfig} and passed to the initialisation callback. The
     * log line is emitted on every call, including calls that end up doing nothing.
     */
    public void setupWriteClient() {
        LOG.info("Setting up Hoodie Write Client");
        boolean schemaKnown = this.schemaProvider != null;
        boolean clientMissing = this.writeClient == null;
        if (schemaKnown && clientMissing) {
            registerAvroSchemas(this.schemaProvider);
            HoodieWriteConfig writeConfig = getHoodieClientConfig(this.schemaProvider);
            this.writeClient = new HoodieWriteClient(this.jssc, writeConfig, true);
            this.onInitializingHoodieWriteClient.apply(this.writeClient);
        }
    }

    /**
     * Builds the write configuration for the target table.
     *
     * <p>The configuration combines the target path and table name, duplicate combining
     * (before insert only when {@code cfg.filterDupes} is set, always before upsert), the
     * compaction settings, a BLOOM index, auto-commit turned off, and finally the user
     * properties. The target schema is added when the provider exposes one.
     *
     * @param schemaProvider provider of the target schema; may be {@code null}
     * @return the built configuration
     * @throws IllegalArgumentException if the built configuration disagrees with the
     *         settings above (for example because the properties overrode them)
     */
    private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
                .withPayloadClass(this.cfg.payloadClassName)
                .withInlineCompaction(Boolean.valueOf(this.cfg.isInlineCompactionEnabled()))
                .build();
        HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
                .withIndexType(HoodieIndex.IndexType.BLOOM)
                .build();

        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
                .withPath(this.cfg.targetBasePath)
                .combineInput(this.cfg.filterDupes.booleanValue(), true)
                .withCompactionConfig(compactionConfig)
                .forTable(this.cfg.targetTableName)
                .withIndexConfig(indexConfig)
                .withAutoCommit(false)
                .withProps(this.props);
        if (schemaProvider != null && schemaProvider.getTargetSchema() != null) {
            builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
        }
        HoodieWriteConfig writeConfig = builder.build();

        // Sanity checks: the properties are applied last, so verify they did not override
        // the settings this class relies on.
        Preconditions.checkArgument(writeConfig.isInlineCompaction() == this.cfg.isInlineCompactionEnabled());
        Preconditions.checkArgument(!writeConfig.shouldAutoCommit().booleanValue());
        Preconditions.checkArgument(writeConfig.shouldCombineBeforeInsert() == this.cfg.filterDupes.booleanValue());
        Preconditions.checkArgument(writeConfig.shouldCombineBeforeUpsert());
        return writeConfig;
    }

    /**
     * Registers the provider's source schema, and its target schema when present, with the
     * Spark configuration. Does nothing when the provider is {@code null}.
     *
     * @param schemaProvider provider whose schemas are registered; may be {@code null}
     */
    private void registerAvroSchemas(SchemaProvider schemaProvider) {
        if (schemaProvider == null) {
            return;
        }
        // Raw list on purpose: the schema element type is not imported in this file.
        ArrayList schemas = new ArrayList();
        schemas.add(schemaProvider.getSourceSchema());
        if (schemaProvider.getTargetSchema() != null) {
            schemas.add(schemaProvider.getTargetSchema());
        }
        LOG.info("Registering Schema :" + schemas);
        this.jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
    }

    /**
     * Closes the write client, if one exists, and drops the reference so that a later
     * call is a no-op.
     */
    public void close() {
        if (this.writeClient == null) {
            return;
        }
        this.writeClient.close();
        this.writeClient = null;
    }

    /**
     * Recreates one of this class's serializable lambdas from its serialized form.
     *
     * The implementation-method name selects a candidate; the candidate is returned only
     * if the remaining fields of the serialized form (method kind, functional interface,
     * interface method and signature, implementing class, implementation signature) all
     * match. Four lambdas are known: WriteStatus.hasErrors, the record-mapping lambda of
     * readFromSource, WriteStatus.getTotalRecords and WriteStatus.getTotalErrorRecords.
     *
     * NOTE(review): this method is marked synthetic and appears to be compiler-generated
     * for serializable lambdas; a hand-written copy may clash with the one the compiler
     * emits when this file is recompiled — confirm whether it should exist in source at all.
     * NOTE(review): as decompiled it is not valid Java. The selector `z` is declared
     * boolean but assigned -1, 2 and 3, the second switch repeats `case true`, and the
     * record-mapping branch uses `this` inside a static method. The selector was
     * presumably an int with values 0..3 — verify against the bytecode before relying on
     * this text.
     *
     * @param serializedLambda serialized form of the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException if the serialized form matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First switch: map the implementation-method name to a selector; the hash is
        // only a fast pre-filter, the equals() check decides.
        switch (implMethodName.hashCode()) {
            case -1030817580:
                if (implMethodName.equals("getTotalRecords")) {
                    z = 2;
                    break;
                }
                break;
            case -674884955:
                if (implMethodName.equals("hasErrors")) {
                    z = false;
                    break;
                }
                break;
            case -213386296:
                if (implMethodName.equals("getTotalErrorRecords")) {
                    z = 3;
                    break;
                }
                break;
            case 809145839:
                if (implMethodName.equals("lambda$readFromSource$f92c188c$1")) {
                    z = true;
                    break;
                }
                break;
        }
        // Second switch: verify the full shape of the selected candidate and rebuild it.
        switch (z) {
            case false:
                // WriteStatus.hasErrors as a Spark Function.
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/client/WriteStatus") && serializedLambda.getImplMethodSignature().equals("()Z")) {
                    return (v0) -> {
                        return v0.hasErrors();
                    };
                }
                break;
            case true:
                // Record-mapping lambda of readFromSource (GenericRecord -> HoodieRecord).
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/utilities/deltastreamer/DeltaSync") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/avro/generic/GenericRecord;)Lorg/apache/hudi/common/model/HoodieRecord;")) {
                    // NOTE(review): the captured instance is read here but the lambda below
                    // refers to `this`; presumably it should use `deltaSync` — confirm.
                    DeltaSync deltaSync = (DeltaSync) serializedLambda.getCapturedArg(0);
                    return genericRecord -> {
                        return new HoodieRecord(this.keyGenerator.getKey(genericRecord), DataSourceUtils.createPayload(this.cfg.payloadClassName, genericRecord, (Comparable) DataSourceUtils.getNestedFieldVal(genericRecord, this.cfg.sourceOrderingField, false)));
                    };
                }
                break;
            case true:
                // WriteStatus.getTotalRecords as a Spark DoubleFunction.
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/DoubleFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)D") && serializedLambda.getImplClass().equals("org/apache/hudi/client/WriteStatus") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.getTotalRecords();
                    };
                }
                break;
            case true:
                // WriteStatus.getTotalErrorRecords as a Spark DoubleFunction.
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/DoubleFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)D") && serializedLambda.getImplClass().equals("org/apache/hudi/client/WriteStatus") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.getTotalErrorRecords();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
