package org.apache.hudi.utilities.streamer;

import com.codahale.metrics.Timer;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.http.cookie.ClientCookie;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieConversionUtils;
import org.apache.hudi.HoodieSchemaUtils;
import org.apache.hudi.HoodieSparkSqlWriter;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.commit.HoodieStreamerDatasetBulkInsertCommitActionExecutor;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetaSyncException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncConfigHolder;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;
import org.apache.hudi.util.JavaScalaConverters;
import org.apache.hudi.util.SparkKeyGenUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig;
import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallback;
import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.apache.hudi.utilities.exception.HoodieStreamerWriteException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.LazyCastingIterator;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaSet;
import org.apache.hudi.utilities.schema.SimpleSchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.streamer.ErrorEvent;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/hudi/utilities/streamer/StreamSync.class */
public class StreamSync implements Serializable, Closeable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) StreamSync.class);
    // Text logged in place of a schema that is null (used by syncOnce when reporting a new schema).
    private static final String NULL_PLACEHOLDER = "[null]";
    public static final String CHECKPOINT_IGNORE_KEY = "deltastreamer.checkpoint.ignore_key";
    // Streamer configuration (target path/table, operation, retry settings, ...).
    private final HoodieStreamer.Config cfg;
    // Adapter around the configured Source; used to fetch batches in row or Avro format.
    private transient SourceFormatAdapter formatAdapter;
    // Schema provider handed in at construction time (may be null).
    private transient SchemaProvider userProvidedSchemaProvider;
    // Schema provider taken from an input batch; assigned in syncOnce when the write client is first set up.
    private transient SchemaProvider schemaProvider;
    private transient Option<Transformer> transformer;
    // Key generator class name resolved from the properties in the constructors.
    private String keyGenClassName;
    private transient HoodieStorage storage;
    private final transient HoodieSparkEngineContext hoodieSparkContext;
    private transient SparkSession sparkSession;
    private transient Configuration conf;
    private final TypedProperties props;
    private transient Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient;
    // Completed-commits timeline of the target table; empty right after an empty table is initialised.
    private transient Option<HoodieTimeline> commitsTimelineOpt;
    // All-commits timeline of the target table; empty right after an empty table is initialised.
    private transient Option<HoodieTimeline> allCommitsTimelineOpt;
    // Schemas already handled by syncOnce, so the write client is only re-initialised for unseen ones.
    private final SchemaSet processedSchema;
    private transient Option<EmbeddedTimelineService> embeddedTimelineService;
    // Lazily created: syncOnce sets it up on the first non-null batch.
    private transient SparkRDDWriteClient writeClient;
    // Present only when the error table is enabled (or when injected through the test constructor).
    private Option<BaseErrorTableWriter> errorTableWriter;
    private HoodieErrorTableConfig.ErrorWriteFailureStrategy errorWriteFailureStrategy;
    private transient HoodieIngestionMetrics metrics;
    private transient HoodieMetrics hoodieMetrics;
    private final boolean autoGenerateRecordKeys;
    // True when the batch is passed through as rows instead of being converted to HoodieRecords.
    private final boolean useRowWriter;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hudi.utilities.streamer.StreamSync$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hudi/utilities/streamer/StreamSync$1.class */
    /*
     * Compiler-generated helper recovered by the decompiler: one int[] "switch map" per enum
     * that is used in a switch statement of StreamSync. Each array is indexed by the enum
     * constant's ordinal() and holds the case label (1, 2, ...) assigned to that constant;
     * slots for constants that are not listed stay 0 and therefore fall to `default`.
     * refreshTimeline() reads the HoodieTableType map; the other two maps are presumably
     * consumed by switch statements outside this excerpt - TODO confirm.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$hudi$common$model$HoodieTableType;
        static final /* synthetic */ int[] $SwitchMap$org$apache$hudi$config$HoodieErrorTableConfig$ErrorWriteFailureStrategy;
        static final /* synthetic */ int[] $SwitchMap$org$apache$hudi$common$model$WriteOperationType = new int[WriteOperationType.values().length];

        static {
            // Each assignment is wrapped individually so that a NoSuchFieldError for one enum
            // constant is ignored and the remaining entries are still populated.
            try {
                $SwitchMap$org$apache$hudi$common$model$WriteOperationType[WriteOperationType.INSERT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$hudi$common$model$WriteOperationType[WriteOperationType.UPSERT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$hudi$common$model$WriteOperationType[WriteOperationType.BULK_INSERT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$hudi$common$model$WriteOperationType[WriteOperationType.INSERT_OVERWRITE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$hudi$common$model$WriteOperationType[WriteOperationType.INSERT_OVERWRITE_TABLE.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$hudi$common$model$WriteOperationType[WriteOperationType.DELETE_PARTITION.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            // Error-table write failure strategy: ROLLBACK_COMMIT -> 1, LOG_ERROR -> 2.
            $SwitchMap$org$apache$hudi$config$HoodieErrorTableConfig$ErrorWriteFailureStrategy = new int[HoodieErrorTableConfig.ErrorWriteFailureStrategy.values().length];
            try {
                $SwitchMap$org$apache$hudi$config$HoodieErrorTableConfig$ErrorWriteFailureStrategy[HoodieErrorTableConfig.ErrorWriteFailureStrategy.ROLLBACK_COMMIT.ordinal()] = 1;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$hudi$config$HoodieErrorTableConfig$ErrorWriteFailureStrategy[HoodieErrorTableConfig.ErrorWriteFailureStrategy.LOG_ERROR.ordinal()] = 2;
            } catch (NoSuchFieldError e8) {
            }
            // Table type: COPY_ON_WRITE -> 1, MERGE_ON_READ -> 2.
            $SwitchMap$org$apache$hudi$common$model$HoodieTableType = new int[HoodieTableType.values().length];
            try {
                $SwitchMap$org$apache$hudi$common$model$HoodieTableType[HoodieTableType.COPY_ON_WRITE.ordinal()] = 1;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$hudi$common$model$HoodieTableType[HoodieTableType.MERGE_ON_READ.ordinal()] = 2;
            } catch (NoSuchFieldError e10) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hudi/utilities/streamer/StreamSync$WriteClientWriteResult.class */
    /**
     * Small holder pairing the write statuses of a write-client call with the file ids that
     * the write replaced, keyed by partition. The replaced-file map starts out empty and is
     * only populated through {@link #setPartitionToReplacedFileIds(Map)}.
     */
    public class WriteClientWriteResult {
        private JavaRDD<WriteStatus> writeStatusRDD;
        private Map<String, List<String>> partitionToReplacedFileIds = Collections.emptyMap();

        /**
         * @param javaRDD write statuses produced by the write
         */
        public WriteClientWriteResult(JavaRDD<WriteStatus> javaRDD) {
            writeStatusRDD = javaRDD;
        }

        /** @return the write statuses supplied at construction time */
        public JavaRDD<WriteStatus> getWriteStatusRDD() {
            return writeStatusRDD;
        }

        /** @return partition to replaced file ids; an empty map unless explicitly set */
        public Map<String, List<String>> getPartitionToReplacedFileIds() {
            return partitionToReplacedFileIds;
        }

        /**
         * @param map partition to replaced file ids to record on this result
         */
        public void setPartitionToReplacedFileIds(Map<String, List<String>> map) {
            partitionToReplacedFileIds = map;
        }
    }

    /**
     * Test-only constructor that takes every collaborator pre-built instead of creating it.
     * Unlike the public constructors it performs no I/O: it does not refresh the timeline and
     * does not create the ingestion metrics, so {@code metrics} and {@code hoodieMetrics} stay
     * unset.
     *
     * @param config               streamer configuration
     * @param sparkSession         active Spark session
     * @param typedProperties      writer/source properties
     * @param hoodieSparkEngineContext engine context wrapping the Spark context
     * @param hoodieStorage        storage used to access the target table
     * @param configuration        Hadoop configuration
     * @param function             callback invoked when the write client is initialised
     * @param schemaProvider       user-provided schema provider (may be null)
     * @param option               error table writer, if any
     * @param sourceFormatAdapter  adapter used to fetch source batches
     * @param option2              transformer, if any
     * @param z                    value for {@code useRowWriter}
     * @param z2                   value for {@code autoGenerateRecordKeys}
     */
    @VisibleForTesting
    StreamSync(HoodieStreamer.Config config, SparkSession sparkSession, TypedProperties typedProperties, HoodieSparkEngineContext hoodieSparkEngineContext, HoodieStorage hoodieStorage, Configuration configuration, Function<SparkRDDWriteClient, Boolean> function, SchemaProvider schemaProvider, Option<BaseErrorTableWriter> option, SourceFormatAdapter sourceFormatAdapter, Option<Transformer> option2, boolean z, boolean z2) {
        // Configuration and properties.
        this.cfg = config;
        this.props = typedProperties;
        this.conf = configuration;
        this.keyGenClassName = HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(new TypedProperties(typedProperties));

        // Execution environment.
        this.sparkSession = sparkSession;
        this.hoodieSparkContext = hoodieSparkEngineContext;
        this.storage = hoodieStorage;
        this.embeddedTimelineService = Option.empty();
        this.onInitializingHoodieWriteClient = function;

        // Source, schema and transformation collaborators, all injected.
        this.userProvidedSchemaProvider = schemaProvider;
        this.formatAdapter = sourceFormatAdapter;
        this.transformer = option2;
        this.errorTableWriter = option;
        this.processedSchema = new SchemaSet();

        // Behaviour flags.
        this.useRowWriter = z;
        this.autoGenerateRecordKeys = z2;
    }

    /**
     * Legacy constructor that accepts a bare schema provider and a {@link JavaSparkContext}.
     * It wraps them into a {@code HoodieSparkEngineContext} and a {@code DefaultStreamContext}
     * (with no second component) and delegates to the {@code StreamContext}-based constructor.
     *
     * @throws IOException propagated from the delegated constructor
     * @deprecated use the constructor taking a {@code HoodieSparkEngineContext} and a
     *             {@code StreamContext} instead
     */
    @Deprecated
    public StreamSync(HoodieStreamer.Config config, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties typedProperties, JavaSparkContext javaSparkContext, FileSystem fileSystem, Configuration configuration, Function<SparkRDDWriteClient, Boolean> function) throws IOException {
        this(
            config,
            sparkSession,
            typedProperties,
            new HoodieSparkEngineContext(javaSparkContext),
            fileSystem,
            configuration,
            function,
            new DefaultStreamContext(schemaProvider, Option.empty()));
    }

    /**
     * Main constructor: wires configuration, metrics, the optional error table writer, the
     * source adapter and the transformer, and refreshes (or initialises) the target table's
     * timeline.
     *
     * <p>Statement order matters here: {@code getHoodieClientConfig()} and
     * {@code refreshTimeline()} are instance methods invoked mid-construction, so the fields
     * they read must already be assigned when they run.
     *
     * @param config                   streamer configuration
     * @param sparkSession             active Spark session
     * @param typedProperties          writer/source properties
     * @param hoodieSparkEngineContext engine context wrapping the Spark context
     * @param fileSystem               file system of the target table; wrapped in a HoodieHadoopStorage
     * @param configuration            Hadoop configuration
     * @param function                 callback invoked when the write client is initialised
     * @param streamContext            supplies the user-provided schema provider and is passed to the source
     * @throws IOException if refreshing or initialising the table fails
     */
    public StreamSync(HoodieStreamer.Config config, SparkSession sparkSession, TypedProperties typedProperties, HoodieSparkEngineContext hoodieSparkEngineContext, FileSystem fileSystem, Configuration configuration, Function<SparkRDDWriteClient, Boolean> function, StreamContext streamContext) throws IOException {
        this.embeddedTimelineService = Option.empty();
        this.errorTableWriter = Option.empty();
        this.cfg = config;
        this.hoodieSparkContext = hoodieSparkEngineContext;
        this.sparkSession = sparkSession;
        this.storage = new HoodieHadoopStorage(fileSystem);
        this.onInitializingHoodieWriteClient = function;
        this.props = typedProperties;
        this.userProvidedSchemaProvider = streamContext.getSchemaProvider();
        this.processedSchema = new SchemaSet();
        this.autoGenerateRecordKeys = KeyGenUtils.isAutoGeneratedRecordKeysEnabled(typedProperties);
        // A copy of the properties is handed over, so the key generator lookup cannot modify this.props.
        this.keyGenClassName = HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(new TypedProperties(typedProperties));
        this.conf = configuration;
        // Built only to obtain the metrics configuration; presumably relies on the fields assigned above - TODO confirm.
        HoodieWriteConfig hoodieClientConfig = getHoodieClientConfig();
        // The ingestion metrics implementation is pluggable and loaded reflectively by class name.
        this.metrics = (HoodieIngestionMetrics) ReflectionUtils.loadClass(config.ingestionMetricsClass, new Class[]{HoodieMetricsConfig.class, HoodieStorage.class}, hoodieClientConfig.getMetricsConfig(), this.storage);
        this.hoodieMetrics = new HoodieMetrics(hoodieClientConfig, this.storage);
        // The error table writer and its failure strategy are only resolved when the error table is enabled.
        if (typedProperties.getBoolean(HoodieErrorTableConfig.ERROR_TABLE_ENABLED.key(), ((Boolean) HoodieErrorTableConfig.ERROR_TABLE_ENABLED.defaultValue()).booleanValue())) {
            this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(config, sparkSession, typedProperties, hoodieSparkEngineContext, fileSystem);
            this.errorWriteFailureStrategy = ErrorTableUtils.getErrorWriteFailureStrategy(typedProperties);
        }
        // Loads the timelines of an existing table, or creates the table when the base path is missing.
        refreshTimeline();
        Source createSource = UtilHelpers.createSource(config.sourceClassName, typedProperties, hoodieSparkEngineContext.jsc(), sparkSession, this.metrics, streamContext);
        this.formatAdapter = new SourceFormatAdapter(createSource, this.errorTableWriter, Option.of(typedProperties));
        // NOTE(review): no statement visible in this constructor assigns this.schemaProvider before
        // this point, so the Option::empty branch appears to be the one taken - confirm.
        this.transformer = UtilHelpers.createTransformer(Option.ofNullable(config.transformerClassNames), this.schemaProvider == null ? Option::empty : () -> {
            return Option.ofNullable(this.schemaProvider.getSourceSchema());
        }, this.errorTableWriter.isPresent());
        // Row writer requires all three: BULK_INSERT operation, a ROW-typed source, and the row-writer flag (defaulting to false here).
        this.useRowWriter = this.cfg.operation == WriteOperationType.BULK_INSERT && createSource.getSourceType() == Source.SourceType.ROW && this.props.getBoolean(DataSourceWriteOptions.ENABLE_ROW_WRITER().key(), false);
    }

    /**
     * Reloads the commit timelines of the target table, creating the table first when needed.
     *
     * <ul>
     *   <li>Base path missing: an empty table is initialised and both timelines stay empty.</li>
     *   <li>Table loads normally: the completed-commits and all-commits timelines are refreshed.</li>
     *   <li>Loading fails because hoodie.properties could not be read: if the base path, the
     *       properties file and its backup all exist, the failure is tolerated; otherwise the
     *       table is re-initialised, and if it already contains instants the freshly written
     *       properties file is removed again and an error is raised.</li>
     * </ul>
     *
     * @throws IOException       if storage access or table initialisation fails
     * @throws HoodieException   if the table type is neither COPY_ON_WRITE nor MERGE_ON_READ
     * @throws HoodieIOException if loading failed for any other reason, or hoodie.properties is
     *                           missing from a table that already has instants
     */
    public void refreshTimeline() throws IOException {
        StoragePath tableBasePath = new StoragePath(this.cfg.targetBasePath);
        if (!this.storage.exists(tableBasePath)) {
            initializeEmptyTable();
            return;
        }

        try {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(HadoopFSUtils.getStorageConfWithCopy(this.conf))
                .setBasePath(this.cfg.targetBasePath)
                .setPayloadClassName(this.cfg.payloadClassName)
                .setRecordMergerStrategy(this.props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(), (String) HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()))
                .build();
            switch (metaClient.getTableType()) {
                case COPY_ON_WRITE:
                case MERGE_ON_READ:
                    this.commitsTimelineOpt = Option.of(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
                    this.allCommitsTimelineOpt = Option.of(metaClient.getActiveTimeline().getAllCommitsTimeline());
                    return;
                default:
                    throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
            }
        } catch (HoodieIOException e) {
            LOG.warn("Full exception msg " + e.getMessage());
            // Only a failure to load hoodie.properties is handled here; anything else propagates.
            boolean propertiesLoadFailure = e.getMessage().contains("Could not load Hoodie properties")
                && e.getMessage().contains("hoodie.properties");
            if (!propertiesLoadFailure) {
                throw e;
            }

            String basePathWithSlash;
            if (this.cfg.targetBasePath.endsWith("/")) {
                basePathWithSlash = this.cfg.targetBasePath;
            } else {
                basePathWithSlash = String.format("%s/", this.cfg.targetBasePath);
            }
            String propertiesFile = String.format("%s%s/%s", basePathWithSlash, ".hoodie", "hoodie.properties");
            String propertiesBackupFile = String.format("%s%s/%s", basePathWithSlash, ".hoodie", "hoodie.properties.backup");

            // With the properties file and its backup both on storage, the load failure is tolerated.
            boolean tableFilesPresent = this.storage.exists(new StoragePath(basePathWithSlash))
                && this.storage.exists(new StoragePath(propertiesFile))
                && this.storage.exists(new StoragePath(propertiesBackupFile));
            if (tableFilesPresent) {
                return;
            }

            LOG.warn("Base path exists, but table is not fully initialized. Re-initializing again");
            initializeEmptyTable();
            HoodieTableMetaClient reinitialized = HoodieTableMetaClient.builder()
                .setConf(HadoopFSUtils.getStorageConfWithCopy(this.conf))
                .setBasePath(this.cfg.targetBasePath)
                .build();
            boolean tableHasInstants = reinitialized.reloadActiveTimeline().countInstants() > 0;
            if (tableHasInstants) {
                // Undo the properties file just written: a table with history must not be silently re-created.
                this.storage.deleteDirectory(new StoragePath(propertiesFile));
                throw new HoodieIOException("hoodie.properties is missing. Likely due to some external entity. Please populate the hoodie.properties and restart the pipeline. ", e.getIOException());
            }
        }
    }

    /**
     * Resets both cached timelines to empty and initialises a new Hudi table at the target
     * base path, taking table-level settings from the streamer config and the properties
     * (falling back to each table config's default where the property is not set).
     *
     * @throws IOException if the table cannot be initialised on storage
     */
    private void initializeEmptyTable() throws IOException {
        commitsTimelineOpt = Option.empty();
        allCommitsTimelineOpt = Option.empty();
        HoodieTableMetaClient.withPropertyBuilder()
            .setTableType(cfg.tableType)
            .setTableName(cfg.targetTableName)
            .setArchiveLogFolder((String) HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
            .setPayloadClassName(cfg.payloadClassName)
            .setBaseFileFormat(cfg.baseFileFormat)
            .setPartitionFields(SparkKeyGenUtils.getPartitionColumns(props))
            .setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
            .setPopulateMetaFields(props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(), ((Boolean) HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()).booleanValue()))
            .setKeyGeneratorClassProp(keyGenClassName)
            .setPreCombineField(cfg.sourceOrderingField)
            .setPartitionMetafileUseBaseFormat(Boolean.valueOf(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), ((Boolean) HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()).booleanValue())))
            .setCDCEnabled(props.getBoolean(HoodieTableConfig.CDC_ENABLED.key(), ((Boolean) HoodieTableConfig.CDC_ENABLED.defaultValue()).booleanValue()))
            .setCDCSupplementalLoggingMode(props.getString(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key(), (String) HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.defaultValue()))
            .setShouldDropPartitionColumns(HoodieStreamerUtils.isDropPartitionColumns(props))
            // These two defaults are stored as strings, hence the parseBoolean.
            .setHiveStylePartitioningEnable(Boolean.valueOf(props.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key(), Boolean.parseBoolean((String) HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.defaultValue()))))
            .setUrlEncodePartitioning(Boolean.valueOf(props.getBoolean(HoodieTableConfig.URL_ENCODE_PARTITIONING.key(), Boolean.parseBoolean((String) HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue()))))
            .initTable(HadoopFSUtils.getStorageConfWithCopy(hoodieSparkContext.hadoopConfiguration()), cfg.targetBasePath);
    }

    /**
     * Runs one ingestion round: refreshes the timeline, reads the next batch from the source
     * and, when a batch was produced, writes it to the sink and performs meta sync.
     *
     * <p>Before writing, the write client is created on first use, or re-initialised when the
     * batch carries a source or target schema not seen before. A pending inline compaction
     * (or, failing that check, a pending inline clustering) is retried first when the
     * corresponding retry flag and write-config option are both enabled.
     *
     * @return the result of the write, or {@code null} when the source produced no batch
     * @throws IOException if the timeline refresh, the source read or the write fails with an I/O error
     */
    public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException {
        Timer.Context overallTimer = this.metrics.getOverallTimerContext();
        refreshTimeline();

        String instantTime = HoodieActiveTimeline.createNewInstantTime();
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(HadoopFSUtils.getStorageConfWithCopy(this.conf))
            .setBasePath(this.cfg.targetBasePath)
            .setRecordMergerStrategy(this.props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(), (String) HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()))
            .build();
        InputBatch inputBatch = readFromSource(instantTime, metaClient);

        Pair<Option<String>, JavaRDD<WriteStatus>> result = null;
        if (inputBatch != null) {
            if (this.writeClient == null) {
                // First batch: adopt its schema provider and create the write client.
                this.schemaProvider = inputBatch.getSchemaProvider();
                setupWriteClient(inputBatch.getBatch());
            } else {
                Schema newSourceSchema = inputBatch.getSchemaProvider().getSourceSchema();
                Schema newTargetSchema = inputBatch.getSchemaProvider().getTargetSchema();
                boolean unseenSource = newSourceSchema != null && !this.processedSchema.isSchemaPresent(newSourceSchema);
                if (unseenSource || (newTargetSchema != null && !this.processedSchema.isSchemaPresent(newTargetSchema))) {
                    String sourceText = newSourceSchema == null ? NULL_PLACEHOLDER : newSourceSchema.toString(true);
                    String targetText = newTargetSchema == null ? NULL_PLACEHOLDER : newTargetSchema.toString(true);
                    LOG.info("Seeing new schema. Source: {}, Target: {}", sourceText, targetText);
                    reInitWriteClient(newSourceSchema, newTargetSchema, inputBatch.getBatch());
                    if (newSourceSchema != null) {
                        this.processedSchema.addSchema(newSourceSchema);
                    }
                    if (newTargetSchema != null) {
                        this.processedSchema.addSchema(newTargetSchema);
                    }
                }
            }

            boolean retryCompaction = this.cfg.retryLastPendingInlineCompactionJob.booleanValue()
                && this.writeClient.getConfig().inlineCompactionEnabled();
            if (retryCompaction) {
                Option<String> pendingCompaction = getLastPendingCompactionInstant(this.allCommitsTimelineOpt);
                if (pendingCompaction.isPresent()) {
                    HoodieCommitMetadata compactionMetadata =
                        (HoodieCommitMetadata) this.writeClient.compact(pendingCompaction.get()).getCommitMetadata().get();
                    this.writeClient.commitCompaction(pendingCompaction.get(), compactionMetadata, Option.empty());
                    // The compaction changed the timeline; reload it and rebuild the write client.
                    refreshTimeline();
                    reInitWriteClient(this.schemaProvider.getSourceSchema(), this.schemaProvider.getTargetSchema(), null);
                }
            } else if (this.cfg.retryLastPendingInlineClusteringJob.booleanValue() && this.writeClient.getConfig().inlineClusteringEnabled()) {
                Option<String> pendingClustering = getLastPendingClusteringInstant(this.allCommitsTimelineOpt);
                if (pendingClustering.isPresent()) {
                    this.writeClient.cluster(pendingClustering.get());
                }
            }

            result = writeToSinkAndDoMetaSync(instantTime, inputBatch, this.metrics, overallTimer);
        }

        // Runs whether or not a batch was written.
        if (this.schemaProvider != null) {
            this.schemaProvider.refresh();
        }
        this.metrics.updateStreamerSyncMetrics(System.currentTimeMillis());
        return result;
    }

    /**
     * Looks up the timestamp of the last pending clustering instant.
     *
     * @param option timeline to inspect, if available
     * @return the instant's timestamp, or empty when there is no timeline or no pending clustering instant
     */
    private Option<String> getLastPendingClusteringInstant(Option<HoodieTimeline> option) {
        if (option.isPresent()) {
            Option<HoodieInstant> pendingClustering = option.get().getLastPendingClusterInstant();
            if (pendingClustering.isPresent()) {
                return Option.of(pendingClustering.get().getTimestamp());
            }
        }
        return Option.empty();
    }

    /**
     * Looks up the timestamp of the last instant on the pending-compaction timeline.
     *
     * @param option timeline to inspect, if available
     * @return the instant's timestamp, or empty when there is no timeline or no pending compaction
     */
    private Option<String> getLastPendingCompactionInstant(Option<HoodieTimeline> option) {
        if (option.isPresent()) {
            Option<HoodieInstant> pendingCompaction = option.get().filterPendingCompactionTimeline().lastInstant();
            if (pendingCompaction.isPresent()) {
                return Option.of(pendingCompaction.get().getTimestamp());
            }
        }
        return Option.empty();
    }

    /**
     * Reads the next batch from the source, retrying on source timeouts.
     *
     * <p>The checkpoint to resume from is taken from the completed-commits timeline when one is
     * available; otherwise the checkpoint from the streamer config is used, if set. The fetch is
     * attempted once, or up to {@code cfg.maxRetryCount} times when
     * {@code cfg.retryOnSourceFailures} is enabled, sleeping {@code cfg.retryIntervalSecs}
     * seconds between attempts.
     *
     * <p>If the thread is interrupted while waiting to retry, its interrupt status is restored
     * so callers can observe the interruption; the loop then proceeds to the next attempt
     * without waiting any further.
     *
     * <p>NOTE(review): the loop also runs again when an attempt returns {@code null} without
     * throwing (no new data), as long as attempts remain - confirm that this is intended.
     *
     * @param str                   instant time of the commit this batch is read for
     * @param hoodieTableMetaClient meta client of the target table
     * @return the fetched batch, or {@code null} when no attempt produced one
     * @throws IOException                  declared for callers; raised by the checkpoint lookup or fetch
     * @throws HoodieSourceTimeoutException when the last permitted attempt times out
     */
    public InputBatch readFromSource(String str, HoodieTableMetaClient hoodieTableMetaClient) throws IOException {
        Option<String> resumeCheckpoint = Option.empty();
        if (this.commitsTimelineOpt.isPresent()) {
            resumeCheckpoint = getCheckpointToResume(this.commitsTimelineOpt);
        }
        LOG.debug("Checkpoint from config: {}", this.cfg.checkpoint);
        if (!resumeCheckpoint.isPresent() && this.cfg.checkpoint != null) {
            resumeCheckpoint = Option.of(this.cfg.checkpoint);
        }
        LOG.info("Checkpoint to resume from : {}", resumeCheckpoint);

        int maxAttempts = this.cfg.retryOnSourceFailures.booleanValue() ? this.cfg.maxRetryCount.intValue() : 1;
        int attempt = 0;
        InputBatch batch = null;
        while (attempt++ < maxAttempts && batch == null) {
            try {
                batch = fetchFromSourceAndPrepareRecords(resumeCheckpoint, str, hoodieTableMetaClient);
            } catch (HoodieSourceTimeoutException e) {
                if (attempt >= maxAttempts) {
                    // Out of attempts: surface the timeout to the caller.
                    throw e;
                }
                LOG.error("Exception thrown while fetching data from source. Msg : " + e.getMessage() + ", class : " + e.getClass() + ", cause : " + e.getCause());
                LOG.error("Sleeping for " + this.cfg.retryIntervalSecs + " before retrying again. Current retry count " + attempt + ", max retry count " + this.cfg.maxRetryCount);
                try {
                    // Multiply as long so that large intervals cannot overflow int.
                    Thread.sleep(this.cfg.retryIntervalSecs.intValue() * 1000L);
                } catch (InterruptedException interrupted) {
                    // Do not swallow the interruption: restore the flag for whoever owns this thread.
                    Thread.currentThread().interrupt();
                    LOG.error("Interrupted while waiting to retry on source failure; interrupt status restored. Source failure: " + e.getMessage());
                }
            }
        }
        return batch;
    }

    /**
     * Fetches the next batch from the source and, unless the row writer is in use, converts
     * it into Hoodie records.
     *
     * @param option                checkpoint to resume from, if any
     * @param str                   instant time passed on to record creation
     * @param hoodieTableMetaClient meta client of the target table
     * @return the batch to write, or {@code null} when the source checkpoint did not change
     *         and commits without a checkpoint change are not allowed
     * @throws UnsupportedOperationException when SPARK records are used on a MERGE_ON_READ table
     *         for a non-BULK_INSERT operation and the log block format is not parquet
     */
    private InputBatch fetchFromSourceAndPrepareRecords(Option<String> option, String str, HoodieTableMetaClient hoodieTableMetaClient) {
        String jobGroup = getClass().getSimpleName();
        this.hoodieSparkContext.setJobStatus(jobGroup, "Fetching next batch: " + this.cfg.targetTableName);

        HoodieRecord.HoodieRecordType recordType = UtilHelpers.createRecordMerger(this.props).getRecordType();
        if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
            boolean mergeOnReadTable = HoodieTableType.valueOf(this.cfg.tableType) == HoodieTableType.MERGE_ON_READ;
            if (mergeOnReadTable && !this.cfg.operation.equals(WriteOperationType.BULK_INSERT)) {
                HoodieLogBlock.HoodieLogBlockType logBlockType =
                    HoodieLogBlock.HoodieLogBlockType.fromId(this.props.getProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "avro"));
                if (logBlockType != HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
                    throw new UnsupportedOperationException("Spark record only support parquet log.");
                }
            }
        }

        InputBatch sourceBatch = fetchNextBatchFromSource(option, hoodieTableMetaClient);
        String nextCheckpoint = sourceBatch.getCheckpointForNextBatch();
        SchemaProvider batchSchemaProvider = sourceBatch.getSchemaProvider();

        // Nothing to commit: the checkpoint did not move and empty commits are not allowed.
        if (!this.cfg.allowCommitOnNoCheckpointChange.booleanValue() && Objects.equals(nextCheckpoint, option.orElse(null))) {
            LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(" + option + "). New Checkpoint=(" + nextCheckpoint + ")");
            this.hoodieMetrics.updateMetricsForEmptyData(CommitUtils.getCommitActionType(this.cfg.operation, HoodieTableType.valueOf(this.cfg.tableType)));
            return null;
        }

        this.hoodieSparkContext.setJobStatus(jobGroup, "Checking if input is empty: " + this.cfg.targetTableName);
        if (this.useRowWriter) {
            // The row writer consumes the source batch as-is.
            return sourceBatch;
        }
        return new InputBatch(HoodieStreamerUtils.createHoodieRecords(this.cfg, this.props, sourceBatch.getBatch(), batchSchemaProvider, recordType, this.autoGenerateRecordKeys, str, this.errorTableWriter), nextCheckpoint, batchSchemaProvider);
    }

    @VisibleForTesting
    InputBatch fetchNextBatchFromSource(Option<String> option, HoodieTableMetaClient hoodieTableMetaClient) {
        Option option2 = null;
        String str = null;
        SchemaProvider schemaProvider = null;
        InputBatch inputBatch = null;
        boolean z = this.props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
        if (this.transformer.isPresent()) {
            InputBatch<Dataset<Row>> fetchNewDataInRowFormat = this.formatAdapter.fetchNewDataInRowFormat(option, this.cfg.sourceLimit);
            Option<Dataset<Row>> processErrorEvents = this.formatAdapter.processErrorEvents(fetchNewDataInRowFormat.getBatch().map(dataset -> {
                return this.transformer.get().apply(this.hoodieSparkContext.jsc(), this.sparkSession, dataset, this.props);
            }), ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);
            str = fetchNewDataInRowFormat.getCheckpointForNextBatch();
            if (this.userProvidedSchemaProvider == null || this.userProvidedSchemaProvider.getTargetSchema() == null || this.userProvidedSchemaProvider.getTargetSchema() == InputBatch.NULL_SCHEMA) {
                Option<U> map = processErrorEvents.map(dataset2 -> {
                    return AvroConversionUtils.convertStructTypeToAvroSchema(dataset2.schema(), AvroSchemaUtils.getAvroRecordQualifiedName(this.cfg.targetTableName));
                });
                SchemaProvider schemaProvider2 = fetchNewDataInRowFormat.getSchemaProvider();
                schemaProvider2.getClass();
                schemaProvider = getDeducedSchemaProvider((Schema) map.orElseGet(schemaProvider2::getTargetSchema), fetchNewDataInRowFormat.getSchemaProvider(), hoodieTableMetaClient);
                if (this.useRowWriter) {
                    inputBatch = new InputBatch(processErrorEvents, str, schemaProvider);
                } else {
                    option2 = processErrorEvents.map(dataset3 -> {
                        return getTransformedRDD(dataset3, z, schemaProvider.getTargetSchema());
                    });
                }
            } else {
                schemaProvider = getDeducedSchemaProvider(this.userProvidedSchemaProvider.getTargetSchema(), this.userProvidedSchemaProvider, hoodieTableMetaClient);
                if (this.useRowWriter) {
                    inputBatch = new InputBatch(processErrorEvents, str, schemaProvider);
                } else {
                    option2 = (this.errorTableWriter.isPresent() && this.props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(), ((Boolean) HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue()).booleanValue())) ? processErrorEvents.map(dataset4 -> {
                        Tuple2 safeCreateRDD = HoodieSparkUtils.safeCreateRDD(dataset4, RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME, RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE, z, Option.of(schemaProvider.getTargetSchema()));
                        this.errorTableWriter.get().addErrorEvents(((RDD) safeCreateRDD._2()).toJavaRDD().map(str2 -> {
                            return new ErrorEvent(str2, ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE);
                        }));
                        return ((RDD) safeCreateRDD._1).toJavaRDD();
                    }) : processErrorEvents.map(dataset5 -> {
                        return getTransformedRDD(dataset5, z, schemaProvider.getTargetSchema());
                    });
                }
            }
        } else if (this.useRowWriter) {
            InputBatch<Dataset<Row>> fetchNewDataInRowFormat2 = this.formatAdapter.fetchNewDataInRowFormat(option, this.cfg.sourceLimit);
            inputBatch = new InputBatch(fetchNewDataInRowFormat2.getBatch(), fetchNewDataInRowFormat2.getCheckpointForNextBatch(), getDeducedSchemaProvider(fetchNewDataInRowFormat2.getSchemaProvider().getTargetSchema(), fetchNewDataInRowFormat2.getSchemaProvider(), hoodieTableMetaClient));
        } else {
            InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat = this.formatAdapter.fetchNewDataInAvroFormat(option, this.cfg.sourceLimit);
            str = fetchNewDataInAvroFormat.getCheckpointForNextBatch();
            schemaProvider = getDeducedSchemaProvider(fetchNewDataInAvroFormat.getSchemaProvider().getTargetSchema(), fetchNewDataInAvroFormat.getSchemaProvider(), hoodieTableMetaClient);
            String schema = schemaProvider.getTargetSchema().toString();
            option2 = (this.errorTableWriter.isPresent() && this.props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(), ((Boolean) HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue()).booleanValue())) ? fetchNewDataInAvroFormat.getBatch().map(javaRDD -> {
                Tuple2 safeRewriteRDD = HoodieSparkUtils.safeRewriteRDD(javaRDD.rdd(), schema);
                this.errorTableWriter.get().addErrorEvents(((RDD) safeRewriteRDD._2()).toJavaRDD().map(str2 -> {
                    return new ErrorEvent(str2, ErrorEvent.ErrorReason.INVALID_RECORD_SCHEMA);
                }));
                return ((RDD) safeRewriteRDD._1).toJavaRDD();
            }) : fetchNewDataInAvroFormat.getBatch().map(javaRDD2 -> {
                return javaRDD2.mapPartitions(it -> {
                    return new LazyCastingIterator(it, schema);
                });
            });
        }
        return this.useRowWriter ? inputBatch : new InputBatch(option2, str, schemaProvider);
    }

    /**
     * Builds the schema provider used for writing a batch.
     *
     * <p>The incoming schema has its metadata fields removed and is then passed to
     * {@code HoodieSchemaUtils.deduceWriterSchema} together with the latest table schema and the
     * latest table internal schema. The deduced schema is wrapped in a {@code SimpleSchemaProvider},
     * which in turn is handed to a {@code DelegatingSchemaProvider} alongside the original provider.
     *
     * @param schema                schema of the incoming batch
     * @param schemaProvider        provider the batch came with; passed through to the delegating provider
     * @param hoodieTableMetaClient meta client used to look up the latest table schemas
     * @return a delegating provider combining {@code schemaProvider} and the deduced writer schema
     */
    @VisibleForTesting
    SchemaProvider getDeducedSchemaProvider(Schema schema, SchemaProvider schemaProvider, HoodieTableMetaClient hoodieTableMetaClient) {
        SimpleSchemaProvider deducedWriterSchemaProvider = new SimpleSchemaProvider(
                hoodieSparkContext.jsc(),
                HoodieSchemaUtils.deduceWriterSchema(
                        HoodieAvroUtils.removeMetadataFields(schema),
                        UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), storage, cfg.targetBasePath, hoodieTableMetaClient),
                        HoodieConversionUtils.toJavaOption(
                                HoodieSchemaUtils.getLatestTableInternalSchema(
                                        new HoodieConfig(HoodieStreamer.Config.getProps(conf, cfg)),
                                        hoodieTableMetaClient)),
                        props),
                props);
        return new DelegatingSchemaProvider(props, hoodieSparkContext.jsc(), schemaProvider, deducedWriterSchemaProvider);
    }

    /**
     * Converts a row dataset into a Java RDD of Avro records via {@code HoodieSparkUtils.createRdd}.
     *
     * @param dataset rows to convert
     * @param z       boolean flag forwarded unchanged to {@code HoodieSparkUtils.createRdd}
     * @param schema  schema forwarded as an optional value; may be {@code null}
     * @return the converted records
     */
    private JavaRDD<GenericRecord> getTransformedRDD(Dataset<Row> dataset, boolean z, Schema schema) {
        RDD<GenericRecord> avroRecords = HoodieSparkUtils.createRdd(
                dataset,
                RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME,
                RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE,
                z,
                Option.ofNullable(schema));
        return avroRecords.toJavaRDD();
    }

    /**
     * Works out which checkpoint the next batch should resume from.
     *
     * <p>For a MERGE_ON_READ table that has at least one {@code deltacommit} instant, only the
     * delta-commit instants are considered. The latest commit metadata carrying checkpoint
     * information is then inspected and the first matching rule wins:
     * <ol>
     *   <li>an ignore-checkpoint is configured and differs from the one recorded: no checkpoint;</li>
     *   <li>a checkpoint is configured and differs from the recorded reset key: the configured one;</li>
     *   <li>the metadata records a checkpoint: that checkpoint;</li>
     *   <li>otherwise, if the last instant is later than {@code 00000000000002}: failure.</li>
     * </ol>
     * If no commit metadata with checkpoint information exists, the configured checkpoint (if any)
     * is used.
     *
     * @param option commits timeline; must be present, since its value is read unconditionally
     * @return the checkpoint to resume from, or empty when there is none
     * @throws IOException if commit metadata cannot be read
     */
    @VisibleForTesting
    Option<String> getCheckpointToResume(Option<HoodieTimeline> option) throws IOException {
        HoodieTimeline deltaCommits = option.get().filter(instant -> instant.getAction().equals("deltacommit"));
        if (this.cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name()) && !deltaCommits.empty()) {
            option = Option.of(deltaCommits);
        }

        Option lastInstant = option.get().lastInstant();
        if (!lastInstant.isPresent()) {
            return Option.empty();
        }

        Option<HoodieCommitMetadata> metadataWithCheckpoint = getLatestCommitMetadataWithValidCheckpointInfo(option.get());
        if (!metadataWithCheckpoint.isPresent()) {
            if (this.cfg.checkpoint != null) {
                return Option.of(this.cfg.checkpoint);
            }
            return Option.empty();
        }

        HoodieCommitMetadata commitMetadata = metadataWithCheckpoint.get();
        LOG.debug("Checkpoint reset from metadata: " + commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY));

        Option<String> resumeCheckpoint = Option.empty();
        if (this.cfg.ignoreCheckpoint != null
                && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY))
                || !this.cfg.ignoreCheckpoint.equals(commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY)))) {
            // A new ignore-checkpoint value was supplied: start without any checkpoint.
            resumeCheckpoint = Option.empty();
        } else if (this.cfg.checkpoint != null
                && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY))
                || !this.cfg.checkpoint.equals(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY)))) {
            // The configured checkpoint differs from the recorded reset key: it takes precedence.
            resumeCheckpoint = Option.of(this.cfg.checkpoint);
        } else if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY))) {
            resumeCheckpoint = Option.of(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY));
        } else if (HoodieTimeline.compareTimestamps("00000000000002", HoodieTimeline.LESSER_THAN, ((HoodieInstant) lastInstant.get()).getTimestamp())) {
            throw new HoodieStreamerException("Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :" + lastInstant + ", Instants :" + option.get().getInstants() + ", CommitMetadata=" + commitMetadata.toJsonString());
        }

        if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY))) {
            ConfigUtils.removeConfigFromProps(this.props, KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE);
        }
        return resumeCheckpoint;
    }

    /**
     * Walks the timeline from the newest instant backwards and returns the first instant whose
     * commit metadata carries either a checkpoint key or a checkpoint-reset key.
     *
     * @param hoodieTimeline timeline to scan
     * @return the instant (as a string) paired with its commit metadata, or empty if none qualifies
     * @throws IOException declared for callers; parse failures inside the scan surface as
     *                     {@code HoodieIOException}
     */
    protected Option<Pair<String, HoodieCommitMetadata>> getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline hoodieTimeline) throws IOException {
        return (Option) hoodieTimeline.getReverseOrderedInstants().map(instant -> {
            try {
                HoodieCommitMetadata metadata = (HoodieCommitMetadata) HoodieCommitMetadata.fromBytes((byte[]) hoodieTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
                if (!StringUtils.isNullOrEmpty(metadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY))
                        || !StringUtils.isNullOrEmpty(metadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY))) {
                    return Option.of(Pair.of(instant.toString(), metadata));
                }
                return Option.empty();
            } catch (IOException e) {
                throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
            }
        }).filter(candidate -> candidate.isPresent()).findFirst().orElse(Option.empty());
    }

    /**
     * Returns only the commit-metadata half of
     * {@link #getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline)}.
     *
     * @param hoodieTimeline timeline to scan
     * @return the latest commit metadata that carries checkpoint information, or empty
     * @throws IOException propagated from the underlying lookup
     */
    protected Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline hoodieTimeline) throws IOException {
        Option<Pair<String, HoodieCommitMetadata>> instantAndMetadata = getLatestInstantAndCommitMetadataWithValidCheckpointInfo(hoodieTimeline);
        return instantAndMetadata.map(entry -> (HoodieCommitMetadata) entry.getRight());
    }

    /**
     * Returns only the instant half of
     * {@link #getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline)}.
     *
     * @param option timeline to scan, if any
     * @return the latest instant (as a string) whose metadata carries checkpoint information;
     *         empty when no timeline is given or no instant qualifies
     */
    protected Option<String> getLatestInstantWithValidCheckpointInfo(Option<HoodieTimeline> option) {
        if (!option.isPresent()) {
            return Option.empty();
        }
        try {
            return getLatestInstantAndCommitMetadataWithValidCheckpointInfo(option.get())
                    .map(entry -> (String) entry.getLeft());
        } catch (IOException e) {
            throw new HoodieIOException("failed to get latest instant with ValidCheckpointInfo", e);
        }
    }

    /**
     * Builds the write config used by the row-writer (dataset bulk insert) path.
     *
     * <p>Starts from the streamer properties and overlays the table type, payload class, key
     * generator class and base path before delegating to
     * {@code HoodieSparkSqlWriter.getBulkInsertRowConfig}.
     *
     * @param schema writer schema; {@code InputBatch.NULL_SCHEMA} (compared by reference) is
     *               translated to an empty option
     * @return the write config for the bulk-insert row writer
     */
    private HoodieWriteConfig prepareHoodieConfigForRowWriter(Schema schema) {
        HoodieConfig hoodieConfig = new HoodieConfig(HoodieStreamer.Config.getProps(this.conf, this.cfg));
        hoodieConfig.setValue(DataSourceWriteOptions.TABLE_TYPE(), this.cfg.tableType);
        hoodieConfig.setValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(), this.cfg.payloadClassName);
        hoodieConfig.setValue(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(this.props));
        // The option key is the plain string "path". The decompiler had substituted the unrelated
        // HTTP-cookie constant ClientCookie.PATH_ATTR, which merely happens to hold the same value.
        hoodieConfig.setValue("path", this.cfg.targetBasePath);
        return HoodieSparkSqlWriter.getBulkInsertRowConfig(schema != InputBatch.NULL_SCHEMA ? Option.of(schema) : Option.empty(), hoodieConfig, this.cfg.targetBasePath, this.cfg.targetTableName);
    }

    /**
     * Writes a batch, commits it together with checkpoint metadata, and then runs follow-up work.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>write the batch via {@code writeToSink} and tally total / error records;</li>
     *   <li>if there are error records and {@code cfg.commitOnErrors} is false, log them, roll the
     *       instant back and throw;</li>
     *   <li>collect checkpoint entries for the commit metadata (skipped when
     *       {@code CHECKPOINT_FORCE_SKIP} is set);</li>
     *   <li>if an error table writer is present, commit the error table and apply the configured
     *       failure strategy when that fails;</li>
     *   <li>commit the instant, notify the source, optionally schedule compaction, optionally run
     *       meta sync, and update the streamer metrics.</li>
     * </ol>
     *
     * @param str                    instant time to write and commit
     * @param inputBatch             batch to write; also supplies the checkpoint for the next batch
     * @param hoodieIngestionMetrics metrics sink updated once the commit is done
     * @param context                overall timer context, may be {@code null}
     * @return the scheduled compaction instant (empty unless async compaction is enabled) paired
     *         with the write statuses
     * @throws HoodieStreamerWriteException if the write had errors that must not be committed, the
     *                                      error table commit failed fatally, or the commit failed
     */
    private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(String str, InputBatch inputBatch, HoodieIngestionMetrics hoodieIngestionMetrics, Timer.Context context) {
        // Holds the scheduled compaction instant; stays empty unless async compaction is enabled.
        Option empty = Option.empty();
        WriteClientWriteResult writeToSink = writeToSink(inputBatch, str);
        JavaRDD<WriteStatus> writeStatusRDD = writeToSink.getWriteStatusRDD();
        Map<String, List<String>> partitionToReplacedFileIds = writeToSink.getPartitionToReplacedFileIds();
        // longValue = total error records across all write statuses.
        long longValue = writeStatusRDD.mapToDouble((v0) -> {
            return v0.getTotalErrorRecords();
        }).sum().longValue();
        // longValue2 = total records across all write statuses.
        long longValue2 = writeStatusRDD.mapToDouble((v0) -> {
            return v0.getTotalRecords();
        }).sum().longValue();
        // j = records written without error.
        long j = longValue2 - longValue;
        LOG.info(String.format("instantTime=%s, totalRecords=%d, totalErrorRecords=%d, totalSuccessfulRecords=%d", str, Long.valueOf(longValue2), Long.valueOf(longValue), Long.valueOf(j)));
        if (longValue2 == 0) {
            LOG.info("No new data, perform empty commit.");
        }
        // z = the write produced at least one error record.
        boolean z = longValue > 0;
        if (z && !this.cfg.commitOnErrors.booleanValue()) {
            // Errors are not tolerated: log a sample of them, roll back and abort.
            LOG.error("Delta Sync found errors when writing. Errors/Total=" + longValue + "/" + longValue2);
            LOG.error("Printing out the top 100 errors");
            writeStatusRDD.filter((v0) -> {
                return v0.hasErrors();
            }).take(100).forEach(writeStatus -> {
                LOG.error("Global error :", writeStatus.getGlobalError());
                if (writeStatus.getErrors().size() > 0) {
                    writeStatus.getErrors().forEach((hoodieKey, th) -> {
                        LOG.trace("Error for key:" + hoodieKey + " is " + th);
                    });
                }
            });
            this.writeClient.rollback(str);
            throw new HoodieStreamerWriteException("Commit " + str + " failed and rolled-back !");
        }
        // Extra metadata stored with the commit; carries the checkpoint information.
        HashMap hashMap = new HashMap();
        if (!ConfigUtils.getBooleanWithAltKeys(this.props, HoodieStreamerConfig.CHECKPOINT_FORCE_SKIP)) {
            if (inputBatch.getCheckpointForNextBatch() != null) {
                hashMap.put(HoodieStreamer.CHECKPOINT_KEY, inputBatch.getCheckpointForNextBatch());
            }
            if (this.cfg.checkpoint != null) {
                hashMap.put(HoodieStreamer.CHECKPOINT_RESET_KEY, this.cfg.checkpoint);
            }
            if (this.cfg.ignoreCheckpoint != null) {
                hashMap.put(CHECKPOINT_IGNORE_KEY, this.cfg.ignoreCheckpoint);
            }
        }
        if (z) {
            LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total=" + longValue + "/" + longValue2);
        }
        String commitActionType = CommitUtils.getCommitActionType(this.cfg.operation, HoodieTableType.valueOf(this.cfg.tableType));
        // The error table is committed before the main commit; a false return is treated as failure.
        if (this.errorTableWriter.isPresent() && !this.errorTableWriter.get().upsertAndCommit(str, getLatestInstantWithValidCheckpointInfo(this.commitsTimelineOpt))) {
            // NOTE(review): decompiler-generated switch map. Which ErrorWriteFailureStrategy
            // constants correspond to 1 and 2 is not visible here; judging by the bodies, case 1
            // rolls the commit back and fails, case 2 only logs and continues.
            switch (AnonymousClass1.$SwitchMap$org$apache$hudi$config$HoodieErrorTableConfig$ErrorWriteFailureStrategy[this.errorWriteFailureStrategy.ordinal()]) {
                case 1:
                    LOG.info("Commit " + str + " failed!");
                    this.writeClient.rollback(str);
                    throw new HoodieStreamerWriteException("Error table commit failed");
                case 2:
                    LOG.error("Error Table write failed for instant " + str);
                    break;
                default:
                    throw new HoodieStreamerWriteException("Write failure strategy not implemented for " + this.errorWriteFailureStrategy);
            }
        }
        if (!this.writeClient.commit(str, writeStatusRDD, Option.of(hashMap), commitActionType, partitionToReplacedFileIds, Option.empty())) {
            LOG.info("Commit " + str + " failed!");
            throw new HoodieStreamerWriteException("Commit " + str + " failed!");
        }
        LOG.info("Commit " + str + " successful!");
        // Let the source know the checkpoint has been committed.
        this.formatAdapter.getSource().onCommit(inputBatch.getCheckpointForNextBatch());
        if (this.cfg.isAsyncCompactionEnabled()) {
            empty = this.writeClient.scheduleCompaction(Option.empty());
        }
        // Meta sync runs only when something was written successfully, unless forced by config.
        if (j > 0 || this.cfg.forceEmptyMetaSync.booleanValue()) {
            runMetaSync();
        } else {
            LOG.info(String.format("Not running metaSync totalSuccessfulRecords=%d", Long.valueOf(j)));
        }
        hoodieIngestionMetrics.updateStreamerMetrics(context != null ? context.stop() : 0L);
        return Pair.of(empty, writeStatusRDD);
    }

    /**
     * Starts a new commit on the write client, optionally retrying with a fresh instant time.
     *
     * <p>When {@code startCommitWithTime} rejects the instant with an
     * {@link IllegalArgumentException} and retries are enabled, the method sleeps for one second,
     * generates a new instant time and tries again, for at most two attempts in total.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt flag is restored so callers can
     * observe it; the retry itself still proceeds.
     *
     * @param str instant time to start the commit with
     * @param z   whether a failed attempt may be retried with a new instant time
     * @return the instant time the commit was actually started with
     * @throws IllegalArgumentException if the commit could not be started (immediately when
     *                                  retries are disabled, otherwise after the last attempt)
     */
    private String startCommit(String str, boolean z) {
        final int maxAttempts = 2;
        int attempt = 1;
        IllegalArgumentException lastFailure = null;
        while (attempt <= maxAttempts) {
            try {
                this.writeClient.startCommitWithTime(str, CommitUtils.getCommitActionType(this.cfg.operation, HoodieTableType.valueOf(this.cfg.tableType)));
                return str;
            } catch (IllegalArgumentException e) {
                lastFailure = e;
                if (!z) {
                    throw e;
                }
                LOG.error("Got error trying to start a new commit. Retrying after sleeping for a sec", (Throwable) e);
                attempt++;
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException interrupted) {
                    // Do not swallow the interruption: restore the flag for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
                str = HoodieActiveTimeline.createNewInstantTime();
            }
        }
        throw lastFailure;
    }

    /**
     * Starts a commit and writes the batch with the configured write operation.
     *
     * <p>With the row writer enabled the batch is written through
     * {@code HoodieStreamerDatasetBulkInsertCommitActionExecutor}; otherwise the records are
     * (optionally de-duplicated and) handed to the matching write-client call.
     *
     * @param inputBatch batch to write; an absent batch is replaced by an empty dataset / RDD
     * @param str        requested instant time (retries in {@code startCommit} are enabled only
     *                   when record keys are not auto-generated)
     * @return write statuses, plus replaced file ids for the operations that produce them
     * @throws HoodieStreamerException if the configured operation is not handled
     */
    private WriteClientWriteResult writeToSink(InputBatch inputBatch, String str) {
        String instantTime = startCommit(str, !this.autoGenerateRecordKeys);

        if (this.useRowWriter) {
            HoodieStreamerDatasetBulkInsertCommitActionExecutor rowWriterExecutor =
                    new HoodieStreamerDatasetBulkInsertCommitActionExecutor(
                            prepareHoodieConfigForRowWriter(inputBatch.getSchemaProvider().getTargetSchema()),
                            this.writeClient,
                            instantTime);
            return new WriteClientWriteResult(rowWriterExecutor.execute(
                    (Dataset) inputBatch.getBatch().orElseGet(() -> this.hoodieSparkContext.getSqlContext().emptyDataFrame()),
                    !HoodieStreamerUtils.getPartitionColumns(this.props).isEmpty()).getWriteStatuses());
        }

        JavaRDD records = (JavaRDD) inputBatch.getBatch().orElseGet(() -> this.hoodieSparkContext.emptyRDD());
        if (this.cfg.filterDupes.booleanValue()) {
            records = DataSourceUtils.dropDuplicates(this.hoodieSparkContext.jsc(), records, this.writeClient.getConfig());
        }

        switch (AnonymousClass1.$SwitchMap$org$apache$hudi$common$model$WriteOperationType[this.cfg.operation.ordinal()]) {
            case 1:
                return new WriteClientWriteResult(this.writeClient.insert(records, instantTime));
            case 2:
                return new WriteClientWriteResult(this.writeClient.upsert(records, instantTime));
            case 3:
                return new WriteClientWriteResult(this.writeClient.bulkInsert(records, instantTime, DataSourceUtils.createUserDefinedBulkInsertPartitioner(this.writeClient.getConfig())));
            case 4:
                return toResultWithReplacedFileIds(this.writeClient.insertOverwrite(records, instantTime));
            case 5:
                return toResultWithReplacedFileIds(this.writeClient.insertOverwriteTable(records, instantTime));
            case 6:
                return toResultWithReplacedFileIds(this.writeClient.deletePartitions(records.map(hoodieRecord -> {
                    return hoodieRecord.getPartitionPath();
                }).distinct().collect(), instantTime));
            default:
                throw new HoodieStreamerException("Unknown operation : " + this.cfg.operation);
        }
    }

    /** Wraps a replace-style write result, carrying over both write statuses and replaced file ids. */
    private WriteClientWriteResult toResultWithReplacedFileIds(HoodieWriteResult writeResult) {
        WriteClientWriteResult result = new WriteClientWriteResult(writeResult.getWriteStatuses());
        result.setPartitionToReplacedFileIds(writeResult.getPartitionToReplaceFileIds());
        return result;
    }

    /**
     * Returns everything after the last {@code '.'} of the given class name, or the whole string
     * when it contains no dot.
     *
     * @param str class name, typically fully qualified
     * @return the part following the last dot
     */
    private String getSyncClassShortName(String str) {
        int lastDot = str.lastIndexOf(".");
        return str.substring(lastDot + 1);
    }

    /**
     * Runs every configured meta sync tool against the target table.
     *
     * <p>The tool class names come from the comma-separated {@code cfg.syncClientToolClassNames}.
     * Names are trimmed <em>before</em> de-duplication so that {@code "A, A"} runs {@code A} once.
     * When {@code cfg.enableHiveSync} is set, meta sync is force-enabled and {@code HoodieSyncTool}
     * for Hive is added unless it is already listed.
     *
     * <p>Each tool is timed and logged individually. A failing tool does not stop the remaining
     * ones; all failures are collected and reported together at the end.
     *
     * @throws HoodieMetaSyncException (as built by {@code SyncUtilHelpers}) if any tool failed
     */
    public void runMetaSync() {
        // Collect into an ArrayList explicitly: the list is appended to below, and
        // Collectors.toList() gives no guarantee that its result is mutable.
        List<String> syncToolClassNames = Arrays.stream(this.cfg.syncClientToolClassNames.split(","))
                .map(String::trim)
                .distinct()
                .collect(Collectors.toCollection(ArrayList::new));
        if (this.cfg.enableHiveSync.booleanValue()) {
            this.cfg.enableMetaSync = true;
            String hiveSyncToolClassName = HiveSyncTool.class.getName();
            // Avoid running the Hive sync twice when it is already part of the configured list.
            if (!syncToolClassNames.contains(hiveSyncToolClassName)) {
                syncToolClassNames.add(hiveSyncToolClassName);
            }
            LOG.info("When set --enable-hive-sync will use HiveSyncTool for backward compatibility");
        }
        if (!this.cfg.enableMetaSync.booleanValue()) {
            return;
        }

        LOG.debug("[MetaSync] Starting sync");
        FileSystem fs = HadoopFSUtils.getFs(this.cfg.targetBasePath, this.hoodieSparkContext.hadoopConfiguration());

        // Streamer props first, then the write client's props, which win on conflicting keys.
        TypedProperties metaSyncProps = new TypedProperties();
        metaSyncProps.putAll(this.props);
        metaSyncProps.putAll(this.writeClient.getConfig().getProps());
        if (this.props.getBoolean(HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC.key(), ((Boolean) HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC.defaultValue()).booleanValue())) {
            metaSyncProps.put(HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC.key(), HiveSyncConfig.getBucketSpec(this.props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()), this.props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())));
        }

        Map<String, HoodieException> failedMetaSyncs = new HashMap<>();
        for (String syncToolClassName : syncToolClassNames) {
            Timer.Context metaSyncTimerContext = this.metrics.getMetaSyncTimerContext();
            Option<HoodieMetaSyncException> failure = Option.empty();
            try {
                SyncUtilHelpers.runHoodieMetaSync(syncToolClassName, metaSyncProps, this.conf, fs, this.cfg.targetBasePath, this.cfg.baseFileFormat);
            } catch (HoodieMetaSyncException e) {
                failure = Option.of(e);
            }
            logMetaSync(syncToolClassName, metaSyncTimerContext, failedMetaSyncs, failure);
        }
        if (!failedMetaSyncs.isEmpty()) {
            throw SyncUtilHelpers.getHoodieMetaSyncException(failedMetaSyncs);
        }
    }

    /**
     * Records the duration of one meta sync run and logs its outcome.
     *
     * @param str     sync tool class name
     * @param context timer started before the sync ran; may be {@code null}, in which case the
     *                duration is reported as zero
     * @param map     collects failures keyed by sync tool class name; updated only on failure
     * @param option  the exception raised by the sync, or empty on success
     */
    private void logMetaSync(String str, Timer.Context context, Map<String, HoodieException> map, Option<HoodieMetaSyncException> option) {
        long elapsed = 0L;
        if (context != null) {
            elapsed = context.stop();
        }
        this.metrics.updateStreamerMetaSyncMetrics(getSyncClassShortName(str), elapsed);

        // The timer value is divided by 10^6 and then split into whole seconds and a remainder.
        long elapsedMillis = elapsed / 1000000;
        String timeTaken = String.format("and took %d s %d ms ", Long.valueOf(elapsedMillis / 1000), Long.valueOf(elapsedMillis % 1000));

        if (option.isPresent()) {
            LOG.error("[MetaSync] SyncTool class {} failed with exception {} {}", str.trim(), option.get(), timeTaken);
            map.put(str, option.get());
            return;
        }
        LOG.info("[MetaSync] SyncTool class {} completed successfully {}", str.trim(), timeTaken);
    }

    /**
     * (Re)creates the write client from the current schema provider's source and target schemas.
     * Does nothing when no schema provider is set.
     *
     * @param option records forwarded to {@code reInitWriteClient}
     * @throws IOException propagated from {@code reInitWriteClient}
     */
    private void setupWriteClient(Option<JavaRDD<HoodieRecord>> option) throws IOException {
        if (this.schemaProvider == null) {
            return;
        }
        reInitWriteClient(this.schemaProvider.getSourceSchema(), this.schemaProvider.getTargetSchema(), option);
    }

    /**
     * Replaces the current write client with a freshly configured one.
     *
     * <p>Partition columns are removed from the target schema when the drop-partition-columns
     * setting is on. The resulting write config and writer schema are used to register the Avro
     * schemas, the config is optionally adjusted by {@code SparkSampleWritesUtils}, the embedded
     * timeline service is reused or created when enabled, any existing client is closed, and the
     * initialization callback is invoked with the new client.
     *
     * @param schema  source schema, registered with Spark
     * @param schema2 target schema the writer schema is derived from
     * @param option  records passed to {@code SparkSampleWritesUtils} for record size estimation
     * @throws IOException declared by the original method
     */
    private void reInitWriteClient(Schema schema, Schema schema2, Option<JavaRDD<HoodieRecord>> option) throws IOException {
        LOG.info("Setting up new Hoodie Write Client");
        Schema targetSchema = schema2;
        if (HoodieStreamerUtils.isDropPartitionColumns(this.props).booleanValue()) {
            targetSchema = HoodieAvroUtils.removeFields(targetSchema, HoodieStreamerUtils.getPartitionColumns(this.props));
        }

        Pair<HoodieWriteConfig, Schema> configAndWriterSchema = getHoodieClientConfigAndWriterSchema(targetSchema, true);
        HoodieWriteConfig baseConfig = (HoodieWriteConfig) configAndWriterSchema.getLeft();
        registerAvroSchemas(schema, (Schema) configAndWriterSchema.getRight());

        HoodieWriteConfig effectiveConfig = SparkSampleWritesUtils
                .getWriteConfigWithRecordSizeEstimate(this.hoodieSparkContext.jsc(), option, baseConfig)
                .orElse(baseConfig);

        if (effectiveConfig.isEmbeddedTimelineServerEnabled()) {
            if (!this.embeddedTimelineService.isPresent()) {
                this.embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(this.hoodieSparkContext, effectiveConfig);
            } else {
                EmbeddedTimelineServerHelper.updateWriteConfigWithTimelineServer(this.embeddedTimelineService.get(), effectiveConfig);
            }
        }

        // Release the previous client before swapping in the new one.
        if (this.writeClient != null) {
            this.writeClient.close();
        }
        this.writeClient = new SparkRDDWriteClient(this.hoodieSparkContext, effectiveConfig, this.embeddedTimelineService);
        this.onInitializingHoodieWriteClient.apply(this.writeClient);
    }

    /**
     * Builds the write config without attaching a schema.
     *
     * @return the write config produced by {@code getHoodieClientConfigAndWriterSchema(null, false)}
     */
    private HoodieWriteConfig getHoodieClientConfig() {
        Pair<HoodieWriteConfig, Schema> configAndSchema = getHoodieClientConfigAndWriterSchema(null, false);
        return (HoodieWriteConfig) configAndSchema.getLeft();
    }

    /**
     * Builds the write config for the streamer and, optionally, resolves the writer schema.
     *
     * <p>After the config is built, callback settings are filled in for the Kafka and Pulsar
     * commit callbacks, and a series of checks verifies that user properties did not override
     * values the streamer relies on (inline compaction, clustering flags, auto commit, combine
     * before insert/upsert).
     *
     * @param schema target schema; only consulted for the writer schema when {@code z} is true
     * @param z      whether to resolve the writer schema via {@code getSchemaForWriteConfig} and
     *               set it on the config
     * @return the write config paired with the writer schema (the input schema when {@code z} is
     *         false)
     * @throws IllegalArgumentException presumably raised by {@code ValidationUtils.checkArgument}
     *                                  when a check fails — TODO confirm
     */
    private Pair<HoodieWriteConfig, Schema> getHoodieClientConfigAndWriterSchema(Schema schema, boolean z) {
        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
                .withInlineCompaction(Boolean.valueOf(this.cfg.isInlineCompactionEnabled()))
                .build();
        HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder()
                .withPayloadClass(this.cfg.payloadClassName)
                .withPayloadOrderingField(this.cfg.sourceOrderingField)
                .build();
        HoodieWriteConfig.Builder configBuilder = HoodieWriteConfig.newBuilder()
                .withPath(this.cfg.targetBasePath)
                .combineInput(this.cfg.filterDupes.booleanValue(), true)
                .withCompactionConfig(compactionConfig)
                .withPayloadConfig(payloadConfig)
                .forTable(this.cfg.targetTableName)
                .withAutoCommit(false)
                .withProps(this.props);

        Schema writerSchema = schema;
        if (z) {
            writerSchema = getSchemaForWriteConfig(schema);
            configBuilder.withSchema(writerSchema.toString());
        }

        HoodieWriteConfig writeConfig = configBuilder.build();
        if (writeConfig.writeCommitCallbackOn()) {
            String callbackClass = writeConfig.getCallbackClass();
            if (HoodieWriteCommitKafkaCallback.class.getName().equals(callbackClass)) {
                HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(writeConfig);
            }
            if (HoodieWriteCommitPulsarCallback.class.getName().equals(writeConfig.getCallbackClass())) {
                HoodieWriteCommitPulsarCallbackConfig.setCallbackPulsarConfigIfNeeded(writeConfig);
            }
        }

        // Guard against user-supplied properties overriding values the streamer depends on.
        final String expectation = "%s should be set to %s";
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(this.props);
        ValidationUtils.checkArgument(
                writeConfig.inlineCompactionEnabled() == this.cfg.isInlineCompactionEnabled(),
                String.format(expectation, HoodieCompactionConfig.INLINE_COMPACT.key(), Boolean.valueOf(this.cfg.isInlineCompactionEnabled())));
        ValidationUtils.checkArgument(
                writeConfig.inlineClusteringEnabled() == clusteringConfig.isInlineClusteringEnabled(),
                String.format(expectation, HoodieClusteringConfig.INLINE_CLUSTERING.key(), Boolean.valueOf(clusteringConfig.isInlineClusteringEnabled())));
        ValidationUtils.checkArgument(
                writeConfig.isAsyncClusteringEnabled() == clusteringConfig.isAsyncClusteringEnabled(),
                String.format(expectation, HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), Boolean.valueOf(clusteringConfig.isAsyncClusteringEnabled())));
        ValidationUtils.checkArgument(
                !writeConfig.shouldAutoCommit().booleanValue(),
                String.format(expectation, HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), false));
        ValidationUtils.checkArgument(
                writeConfig.shouldCombineBeforeInsert() == this.cfg.filterDupes.booleanValue(),
                String.format(expectation, HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), this.cfg.filterDupes));
        ValidationUtils.checkArgument(
                writeConfig.shouldCombineBeforeUpsert(),
                String.format(expectation, HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(), true));
        return Pair.of(writeConfig, writerSchema);
    }

    /**
     * Chooses the schema to put into the write config.
     *
     * <p>The given schema is used as is unless it is {@code null} or mutually compatible with
     * {@code InputBatch.NULL_SCHEMA} (i.e. it carries no real schema). In that case the table is
     * consulted: if it has at least one completed commit and a table Avro schema can be resolved,
     * that schema is returned; otherwise the given schema is returned unchanged and, when the
     * table has commits but no resolvable schema, a warning is logged.
     *
     * <p>This body was reconstructed from the bytecode instruction dump because the decompiler
     * could not restructure the method and had left a stub that always threw
     * {@link UnsupportedOperationException}.
     *
     * @param r6 target schema from the schema provider; may be {@code null}
     * @return the schema to write with; {@code null} only if {@code r6} is {@code null} and no
     *         table schema could be resolved
     * @throws org.apache.hudi.utilities.exception.HoodieSchemaFetchException if anything fails
     *         while checking compatibility or reading the table schema
     */
    private org.apache.avro.Schema getSchemaForWriteConfig(org.apache.avro.Schema r6) {
        Schema writerSchema = r6;
        try {
            boolean noUsableTargetSchema = r6 == null
                    || (org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(r6, InputBatch.NULL_SCHEMA).getType()
                            == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE
                        && org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(InputBatch.NULL_SCHEMA, r6).getType()
                            == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE);
            if (noUsableTargetSchema) {
                // Fall back to the schema already stored with the table, if there is one.
                HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                        .setConf(HadoopFSUtils.getStorageConfWithCopy(this.conf))
                        .setBasePath(this.cfg.targetBasePath)
                        .setPayloadClassName(this.cfg.payloadClassName)
                        .build();
                int completedCommits = metaClient.getActiveTimeline()
                        .getCommitsTimeline()
                        .filterCompletedInstants()
                        .countInstants();
                if (completedCommits > 0) {
                    org.apache.hudi.common.table.TableSchemaResolver schemaResolver =
                            new org.apache.hudi.common.table.TableSchemaResolver(metaClient);
                    Option<Schema> tableSchema = schemaResolver.getTableAvroSchemaIfPresent(false);
                    if (tableSchema.isPresent()) {
                        writerSchema = tableSchema.get();
                    } else {
                        LOG.warn("Could not fetch schema from table. Falling back to using target schema from schema provider");
                    }
                }
            }
            return writerSchema;
        } catch (Exception e) {
            throw new org.apache.hudi.utilities.exception.HoodieSchemaFetchException("Failed to fetch schema from table", e);
        }
    }

    /**
     * Registers the source and target schemas of the given provider with Spark.
     * Does nothing when the provider is {@code null}.
     *
     * @param schemaProvider provider whose schemas should be registered, may be {@code null}
     */
    private void registerAvroSchemas(SchemaProvider schemaProvider) {
        if (schemaProvider == null) {
            return;
        }
        registerAvroSchemas(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema());
    }

    /**
     * Registers the non-null schemas among the two arguments on the Spark configuration.
     * Nothing is registered when both are {@code null}.
     *
     * @param schema  first schema (source), may be {@code null}
     * @param schema2 second schema (target), may be {@code null}
     */
    private void registerAvroSchemas(Schema schema, Schema schema2) {
        List<Schema> schemasToRegister = new ArrayList<>();
        for (Schema candidate : new Schema[] {schema, schema2}) {
            if (candidate != null) {
                schemasToRegister.add(candidate);
            }
        }
        if (!schemasToRegister.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Registering Schema: " + schemasToRegister);
            }
            this.hoodieSparkContext.getJavaSparkContext().sc().getConf()
                    .registerAvroSchemas(JavaScalaConverters.convertJavaListToScalaList(schemasToRegister).toList());
        }
    }

    /**
     * Releases the resources held by this instance, in order: the write client, the
     * format adapter, the embedded timeline service registration for the target base
     * path, and finally the metrics.
     *
     * <p>Each resource is only touched when it is set/present. The write client
     * reference is cleared after it has been closed.
     */
    @Override
    public void close() {
        // 1. Write client: close it and drop the reference.
        if (writeClient != null) {
            writeClient.close();
            writeClient = null;
        }

        // 2. Format adapter.
        if (formatAdapter != null) {
            formatAdapter.close();
        }

        // 3. Embedded timeline service (the message is logged whether or not one is present).
        LOG.info("Shutting down embedded timeline server");
        if (embeddedTimelineService.isPresent()) {
            embeddedTimelineService.get().stopForBasePath(cfg.targetBasePath);
        }

        // 4. Metrics.
        if (metrics != null) {
            metrics.shutdown();
        }
    }

    /**
     * @return the {@code HoodieStorage} instance held by this object
     */
    public HoodieStorage getStorage() {
        return storage;
    }

    /**
     * @return the {@code TypedProperties} held by this object
     */
    public TypedProperties getProps() {
        return props;
    }

    /**
     * @return the streamer configuration held by this object
     */
    public HoodieStreamer.Config getCfg() {
        return cfg;
    }

    /**
     * @return the current value of the {@code commitsTimelineOpt} field
     */
    public Option<HoodieTimeline> getCommitsTimelineOpt() {
        return commitsTimelineOpt;
    }

    /**
     * @return the ingestion metrics held by this object; {@link #close()} null-checks
     *         this field, so callers should be prepared for {@code null}
     */
    public HoodieIngestionMetrics getMetrics() {
        return metrics;
    }

    /**
     * Asks the write client to schedule clustering and returns what it reports.
     *
     * <p>Despite the getter-style name, this invokes
     * {@code writeClient.scheduleClustering(Option.empty())} on every call when a write
     * client is set.
     *
     * @return the result of {@code scheduleClustering}, or an empty option when there is
     *         no write client
     */
    public Option<String> getClusteringInstantOpt() {
        if (writeClient == null) {
            return Option.empty();
        }
        return writeClient.scheduleClustering(Option.empty());
    }

    /**
     * Synthetic lambda-deserialization dispatcher: maps a {@link SerializedLambda} back to
     * one of the lambdas / method references used by this class.
     *
     * <p>Stage 1 selects a case index from the implementation method name (hash code
     * switch, confirmed with {@code equals}). Stage 2 checks the method kind, functional
     * interface class/method/signature, implementing class and implementation signature
     * before returning the matching lambda.
     *
     * <p>NOTE(review): the selector {@code z} is declared {@code boolean} but is assigned
     * integer values, and the second switch repeats {@code case true:}. This looks like
     * decompiler output of an {@code int} selector (0..6) and would not compile as
     * written; the positional order of the second switch's cases is what pairs them
     * with the indices assigned in the first switch.
     *
     * @param serializedLambda serialized form of the lambda to restore
     * @return the restored lambda
     * @throws IllegalArgumentException if no case matches the serialized form
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // -1 = "no match"; falls through the second switch to the exception at the end.
        boolean z = -1;
        // Stage 1: implementation method name -> case index.
        switch (implMethodName.hashCode()) {
            case -1585534781:
                if (implMethodName.equals("lambda$null$dcb61ba3$1")) {
                    z = 4;
                    break;
                }
                break;
            case -1585534780:
                if (implMethodName.equals("lambda$null$dcb61ba3$2")) {
                    z = 2;
                    break;
                }
                break;
            case -1030817580:
                if (implMethodName.equals("getTotalRecords")) {
                    z = 3;
                    break;
                }
                break;
            case -674884955:
                if (implMethodName.equals("hasErrors")) {
                    z = true;
                    break;
                }
                break;
            case -213386296:
                if (implMethodName.equals("getTotalErrorRecords")) {
                    z = 5;
                    break;
                }
                break;
            case 205606096:
                if (implMethodName.equals("lambda$null$a136a3ca$1")) {
                    z = false;
                    break;
                }
                break;
            case 1718674463:
                if (implMethodName.equals("lambda$writeToSink$8ba81d9c$1")) {
                    z = 6;
                    break;
                }
                break;
        }
        // Stage 2: validate the serialized form for the selected case and rebuild the lambda.
        switch (z) {
            // Index 0 ("lambda$null$a136a3ca$1"): FlatMapFunction wrapping an iterator in a
            // LazyCastingIterator together with the captured String argument.
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/hudi/utilities/streamer/StreamSync") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/util/Iterator;)Ljava/util/Iterator;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    return it -> {
                        return new LazyCastingIterator(it, str);
                    };
                }
                break;
            // Index 1 ("hasErrors"): Function calling WriteStatus.hasErrors().
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/client/WriteStatus") && serializedLambda.getImplMethodSignature().equals("()Z")) {
                    return (v0) -> {
                        return v0.hasErrors();
                    };
                }
                break;
            // Index 2 ("lambda$null$dcb61ba3$2"): String -> ErrorEvent with reason INVALID_RECORD_SCHEMA.
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/utilities/streamer/StreamSync") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Lorg/apache/hudi/utilities/streamer/ErrorEvent;")) {
                    return str2 -> {
                        return new ErrorEvent(str2, ErrorEvent.ErrorReason.INVALID_RECORD_SCHEMA);
                    };
                }
                break;
            // Index 3 ("getTotalRecords"): DoubleFunction calling WriteStatus.getTotalRecords().
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/DoubleFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)D") && serializedLambda.getImplClass().equals("org/apache/hudi/client/WriteStatus") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.getTotalRecords();
                    };
                }
                break;
            // Index 4 ("lambda$null$dcb61ba3$1"): String -> ErrorEvent with reason AVRO_DESERIALIZATION_FAILURE.
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/utilities/streamer/StreamSync") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Lorg/apache/hudi/utilities/streamer/ErrorEvent;")) {
                    return str22 -> {
                        return new ErrorEvent(str22, ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE);
                    };
                }
                break;
            // Index 5 ("getTotalErrorRecords"): DoubleFunction calling WriteStatus.getTotalErrorRecords().
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/DoubleFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)D") && serializedLambda.getImplClass().equals("org/apache/hudi/client/WriteStatus") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.getTotalErrorRecords();
                    };
                }
                break;
            // Index 6 ("lambda$writeToSink$8ba81d9c$1"): HoodieRecord -> its partition path.
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/utilities/streamer/StreamSync") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/common/model/HoodieRecord;)Ljava/lang/String;")) {
                    return hoodieRecord -> {
                        return hoodieRecord.getPartitionPath();
                    };
                }
                break;
        }
        // Unknown method name, or a known name whose serialized form failed validation.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
