package org.apache.hudi.client.clustering.run.strategy;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieSparkIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.class */
/**
 * Clustering execution strategy that processes every clustering group of a plan inside one Spark job.
 *
 * <p>{@link #performClustering} turns the plan's input groups into an RDD, reads the base file of each
 * clustering operation on the executor, and hands the resulting record iterator to
 * {@link #performClusteringWithRecordsIterator}, which concrete strategies implement.
 *
 * <p>The function passed to {@code mapPartitions} is a serializable lambda, so the Java compiler
 * synthesizes the matching {@code $deserializeLambda$} method on its own; it is intentionally not
 * declared in this source.
 *
 * @param <T> payload type of the records being clustered
 */
public abstract class SingleSparkJobExecutionStrategy<T> extends ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
    private static final Logger LOG = LoggerFactory.getLogger(SingleSparkJobExecutionStrategy.class);

    /** Hadoop configuration key whose driver-side value is broadcast to the clustering tasks. */
    private static final String UMASK_MODE_KEY = "fs.permissions.umask-mode";

    /**
     * Forwards all arguments to the {@link ClusteringExecutionStrategy} constructor.
     *
     * @param hoodieTable table handed to the base strategy
     * @param hoodieEngineContext engine context; {@link #performClustering} extracts a Spark context from it
     * @param hoodieWriteConfig write configuration handed to the base strategy
     */
    public SingleSparkJobExecutionStrategy(HoodieTable hoodieTable, HoodieEngineContext hoodieEngineContext, HoodieWriteConfig hoodieWriteConfig) {
        super(hoodieTable, hoodieEngineContext, hoodieWriteConfig);
    }

    /**
     * Runs clustering for every input group of the plan and returns the resulting write statuses.
     *
     * <p>The returned metadata only has its write statuses set, wrapping the (unevaluated) RDD
     * produced by {@code mapPartitions}.
     *
     * @param hoodieClusteringPlan plan whose input groups, strategy parameters and
     *     preserve-metadata flag drive the job
     * @param schema schema wrapped in a {@link SerializableSchema} and passed on to
     *     {@link #performClusteringWithRecordsIterator}
     * @param str value forwarded unchanged to {@link #performClusteringWithRecordsIterator}
     *     (presumably the instant time of the clustering action; confirm against the base class)
     * @return write metadata carrying the write statuses of all groups
     */
    @Override
    public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(HoodieClusteringPlan hoodieClusteringPlan, Schema schema, String str) {
        JavaSparkContext sparkContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
        TaskContextSupplier taskContextSupplier = getEngineContext().getTaskContextSupplier();
        SerializableSchema serializableSchema = new SerializableSchema(schema);
        List<ClusteringGroupInfo> groupInfos = hoodieClusteringPlan.getInputGroups().stream()
                .map(ClusteringGroupInfo::create)
                .collect(Collectors.toList());
        Broadcast<String> umaskBroadcast = sparkContext.broadcast(sparkContext.hadoopConfiguration().get(UMASK_MODE_KEY));

        // One slice per clustering group. Spark rejects a slice count below one, so a plan without
        // input groups still gets a single (empty) partition instead of failing.
        JavaRDD<ClusteringGroupInfo> groupsRdd = sparkContext.parallelize(groupInfos, Math.max(1, groupInfos.size()));
        LOG.info("number of partitions for clustering {}", groupsRdd.getNumPartitions());

        JavaRDD<WriteStatus> writeStatuses = groupsRdd.mapPartitions(groupsInPartition -> {
            // Configuration.set rejects null values, so skip it when the driver had no umask configured.
            // NOTE(review): this Configuration is local to the task and is not handed to anything
            // visible in this class; confirm whether the statement is still needed.
            String umask = umaskBroadcast.getValue();
            if (umask != null) {
                new Configuration().set(UMASK_MODE_KEY, umask);
            }

            // Drain the partition first, then cluster group by group.
            Iterable<ClusteringGroupInfo> partitionGroups = () -> groupsInPartition;
            List<ClusteringGroupInfo> materializedGroups = StreamSupport.stream(partitionGroups.spliterator(), false)
                    .collect(Collectors.toList());
            return materializedGroups.stream()
                    .flatMap(groupInfo -> runClusteringForGroup(
                            groupInfo,
                            hoodieClusteringPlan.getStrategy().getStrategyParams(),
                            Option.ofNullable(hoodieClusteringPlan.getPreserveHoodieMetadata()).orElse(false),
                            serializableSchema,
                            taskContextSupplier,
                            str))
                    .iterator();
        });

        HoodieWriteMetadata<HoodieData<WriteStatus>> hoodieWriteMetadata = new HoodieWriteMetadata<>();
        hoodieWriteMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatuses));
        return hoodieWriteMetadata;
    }

    /**
     * Clusters a single group: reads its base files, delegates to
     * {@link #performClusteringWithRecordsIterator} and flattens the returned status batches.
     *
     * @param groupInfo group to cluster
     * @param strategyParams strategy parameters taken from the plan
     * @param preserveHoodieMetadata plan flag, {@code false} when the plan leaves it unset
     * @param serializableSchema serializable wrapper around the schema given to {@link #performClustering}
     * @param taskContextSupplier supplier obtained from the engine context on the driver
     * @param instantTime the {@code str} argument of {@link #performClustering}, forwarded unchanged
     * @return stream over all write statuses produced for the group
     */
    private Stream<WriteStatus> runClusteringForGroup(ClusteringGroupInfo groupInfo, Map<String, String> strategyParams, boolean preserveHoodieMetadata, SerializableSchema serializableSchema, TaskContextSupplier taskContextSupplier, String instantTime) {
        List<ClusteringOperation> operations = groupInfo.getOperations();
        Iterator<HoodieRecord<T>> inputRecords = readRecordsForGroupBaseFiles(operations);
        List<HoodieFileGroupId> inputFileIds = operations.stream()
                .map(operation -> new HoodieFileGroupId(operation.getPartitionPath(), operation.getFileId()))
                .collect(Collectors.toList());

        Iterator<List<WriteStatus>> statusBatches = performClusteringWithRecordsIterator(
                inputRecords,
                groupInfo.getNumOutputGroups(),
                instantTime,
                strategyParams,
                serializableSchema.get(),
                inputFileIds,
                preserveHoodieMetadata,
                taskContextSupplier);

        Iterable<List<WriteStatus>> batches = () -> statusBatches;
        return StreamSupport.stream(batches.spliterator(), false).flatMap(List::stream);
    }

    /**
     * Rewrites the records of one clustering group; implemented by concrete strategies.
     *
     * @param it records read from the base files of the group's operations
     * @param i number of output groups reported by the {@link ClusteringGroupInfo}
     * @param str the {@code str} argument of {@link #performClustering}, forwarded unchanged
     * @param map strategy parameters taken from the plan
     * @param schema schema unwrapped from the {@link SerializableSchema} built in {@link #performClustering}
     * @param list file group ids (partition path plus file id) of the group's operations
     * @param z whether Hoodie metadata should be preserved, as flagged by the plan
     * @param taskContextSupplier supplier obtained from the engine context
     * @return iterator over batches of write statuses
     */
    public abstract Iterator<List<WriteStatus>> performClusteringWithRecordsIterator(Iterator<HoodieRecord<T>> it, int i, String str, Map<String, String> map, Schema schema, List<HoodieFileGroupId> list, boolean z, TaskContextSupplier taskContextSupplier);

    /**
     * Builds one iterator that concatenates the records of every operation's base file.
     *
     * @param list clustering operations whose data files are read
     * @return iterator over the records of all base files, in operation order
     */
    private Iterator<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> list) {
        // The reader schema does not depend on the operation, so it is parsed once per call.
        Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
        List<Iterator<HoodieRecord<T>>> perFileIterators = list.stream()
                .map(operation -> {
                    // Iterable#spliterator() calls iterator() right away by default, so the base
                    // file is opened here rather than when the first record is pulled.
                    Iterable<HoodieRecord<T>> fileRecords = () -> openBaseFileRecords(operation, readerSchema);
                    return StreamSupport.stream(fileRecords.spliterator(), false).iterator();
                })
                .collect(Collectors.toList());
        return new ConcatenatingIterator<>(perFileIterators);
    }

    /**
     * Opens the base file of one clustering operation and maps each record to a copied,
     * payload-wrapped record.
     *
     * <p>NOTE(review): nothing in this class closes the file reader or the returned mapping
     * iterator; confirm whether a downstream consumer does.
     *
     * @param operation operation whose data file is read
     * @param readerSchema write schema with the metadata fields added
     * @return iterator over the records of the operation's base file
     * @throws HoodieClusteringException if opening or reading the file fails with an {@link IOException}
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // file reader and mapping iterator are used raw here
    private Iterator<HoodieRecord<T>> openBaseFileRecords(ClusteringOperation operation, Schema readerSchema) {
        try {
            HoodieFileReader fileReader = HoodieSparkIOFactory.getHoodieSparkIOFactory(getHoodieTable().getStorage()).getReaderFactory(this.recordType).getFileReader(this.writeConfig, new StoragePath(operation.getDataFilePath()));
            Option<BaseKeyGenerator> keyGenerator = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(this.writeConfig);
            // Each record is copied before it is wrapped, so the wrapped record does not alias the reader's instance.
            return new CloseableMappingIterator(fileReader.getRecordIterator(readerSchema), obj -> {
                return ((HoodieRecord) obj).copy2().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema, getWriteConfig().getProps(), keyGenerator);
            });
        } catch (IOException e) {
            throw new HoodieClusteringException("Error reading input data for " + operation.getDataFilePath() + " and " + operation.getDeltaFilePaths(), e);
        }
    }
}
