package org.apache.hudi.sink.clustering;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.adapter.MaskingOutputAdapter;
import org.apache.hudi.adapter.Utils;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.metrics.FlinkClusteringMetrics;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;
import org.apache.hudi.org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.FlinkWriteClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/clustering/ClusteringOperator.class */
/**
 * Flink operator that executes clustering plans.
 *
 * <p>For every {@link ClusteringPlanEvent} it reads all records of the clustering group, rewrites them
 * through a {@link BulkInsertWriterHelper}, and emits a {@link ClusteringCommitEvent} carrying the
 * resulting write statuses. Records are read from base files only, or through a merged log-record
 * scanner when any operation has log files. They are optionally sorted by
 * {@link FlinkOptions#CLUSTERING_SORT_COLUMNS}.
 *
 * <p>When async clustering is enabled the work runs on a {@link NonThrownExecutor}. If it fails there,
 * a {@link ClusteringCommitEvent} without write statuses is emitted for the group instead.
 */
public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEvent>
        implements OneInputStreamOperator<ClusteringPlanEvent, ClusteringCommitEvent>, BoundedOneInput {
    private static final Logger LOG = LoggerFactory.getLogger(ClusteringOperator.class);

    private final Configuration conf;
    /** Input row type with the fields added by {@link BulkInsertWriterHelper#addMetadataFields}. */
    private final RowType rowType;
    private int taskID;
    private transient HoodieWriteConfig writeConfig;
    private transient HoodieFlinkTable<?> table;
    /** Avro schema converted from {@link #rowType}. */
    private transient Schema schema;
    /** {@link #schema} passed through {@link AvroSchemaUtils#asNullable}; used to read the input files. */
    private transient Schema readerSchema;
    private transient int[] requiredPos;
    private transient AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
    private transient HoodieFlinkWriteClient writeClient;
    private transient StreamRecordCollector<ClusteringCommitEvent> collector;
    private transient BinaryRowDataSerializer binarySerializer;
    private final boolean asyncClustering;
    private final boolean sortClusteringEnabled;
    private transient NonThrownExecutor executor;
    private transient FlinkClusteringMetrics clusteringMetrics;
    /**
     * Sorter used by the sort-clustering run in progress, or {@code null} when idle.
     * Volatile because the sorter gauges read it from outside the clustering call.
     */
    private transient volatile BinaryExternalSorter activeSorter;

    /**
     * Creates the operator.
     *
     * @param configuration Flink/Hudi configuration; copied, never mutated
     * @param rowType       row type of the records being clustered, without metadata fields
     */
    public ClusteringOperator(Configuration configuration, RowType rowType) {
        this.conf = new Configuration(configuration);
        this.rowType = BulkInsertWriterHelper.addMetadataFields(rowType, false);
        this.asyncClustering = OptionsResolver.needsAsyncClustering(configuration);
        this.sortClusteringEnabled = OptionsResolver.sortClusteringEnabled(configuration);

        long targetFileMaxBytes = this.conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES);
        // Written files are capped at the clustering target size.
        this.conf.setLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), targetFileMaxBytes);
        // Keep the small-file limit no larger than the target size. The target is divided by 1024 twice,
        // presumably because the small-file-limit option is expressed in MB -- confirm against FlinkOptions.
        this.conf.setLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT.key(),
                Math.min(targetFileMaxBytes / 1024 / 1024,
                        this.conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT)));
    }

    /** Sets the operator up with its output wrapped in a {@link MaskingOutputAdapter}. */
    @Override
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig,
                      Output<StreamRecord<ClusteringCommitEvent>> output) {
        super.setup(streamTask, streamConfig, new MaskingOutputAdapter<>(output));
    }

    /**
     * Initializes the write client, the schemas and converters, the optional async executor,
     * the output collector and the metrics.
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.taskID = getRuntimeContext().getIndexOfThisSubtask();
        this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf);
        this.writeClient = FlinkWriteClients.createWriteClient(this.conf, getRuntimeContext());
        this.table = this.writeClient.getHoodieTable();
        this.schema = AvroSchemaConverter.convertToSchema(this.rowType);
        this.readerSchema = AvroSchemaUtils.asNullable(this.schema);
        this.requiredPos = getRequiredPositions();
        this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(this.rowType);
        this.binarySerializer = new BinaryRowDataSerializer(this.rowType.getFieldCount());
        if (this.asyncClustering) {
            this.executor = NonThrownExecutor.builder(LOG).build();
        }
        this.collector = new StreamRecordCollector<>(this.output);
        registerMetrics();
        if (this.sortClusteringEnabled) {
            // Register once: Flink rejects duplicate metric names, so per-sort registration would be dropped.
            registerSorterGauges();
        }
    }

    /**
     * Runs clustering for the group carried by the incoming plan event.
     *
     * <p>In async mode the work is handed to the executor. If it fails there, a commit event without
     * write statuses is collected for the group. In sync mode the work runs inline and exceptions
     * propagate to the caller.
     */
    @Override
    public void processElement(StreamRecord<ClusteringPlanEvent> streamRecord) throws Exception {
        ClusteringPlanEvent planEvent = streamRecord.getValue();
        String instantTime = planEvent.getClusteringInstantTime();
        List<ClusteringOperation> operations = planEvent.getClusteringGroupInfo().getOperations();
        if (this.asyncClustering) {
            this.executor.execute(
                    () -> doClustering(instantTime, operations),
                    (errMsg, error) -> this.collector.collect(
                            new ClusteringCommitEvent(instantTime, getFileIds(operations), this.taskID)),
                    "Execute clustering for instant %s from task %d", instantTime, this.taskID);
        } else {
            LOG.info("Execute clustering for instant {} from task {}", instantTime, this.taskID);
            doClustering(instantTime, operations);
        }
    }

    /**
     * Shuts down the async executor, if any, and closes the write client.
     * The write client is closed even if the executor fails to close.
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.executor != null) {
                this.executor.close();
            }
        } finally {
            if (this.writeClient != null) {
                this.writeClient.close();
                this.writeClient = null;
            }
        }
    }

    /** Does nothing at end of input. */
    @Override
    public void endInput() {
    }

    /**
     * Rewrites the records of one clustering group and emits the resulting commit event.
     *
     * @param instantTime          clustering instant time
     * @param clusteringOperations operations of the clustering group
     * @throws Exception if reading, sorting or writing fails
     */
    private void doClustering(String instantTime, List<ClusteringOperation> clusteringOperations) throws Exception {
        this.clusteringMetrics.startClustering();
        BulkInsertWriterHelper writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
                instantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(),
                getRuntimeContext().getAttemptNumber(), this.rowType, true);

        // Use the merged log-record reader when any operation has log files; otherwise read base files only.
        boolean hasLogFiles = clusteringOperations.stream()
                .anyMatch(operation -> CollectionUtils.nonEmpty(operation.getDeltaFilePaths()));
        Iterator<RowData> records = hasLogFiles
                ? readRecordsForGroupWithLogs(clusteringOperations, instantTime)
                : readRecordsForGroupBaseFiles(clusteringOperations);

        if (this.sortClusteringEnabled) {
            writeSorted(records, writerHelper);
        } else {
            while (records.hasNext()) {
                writerHelper.write(records.next());
            }
        }

        List<WriteStatus> writeStatuses = writerHelper.getWriteStatuses(this.taskID);
        this.clusteringMetrics.endClustering();
        this.collector.collect(
                new ClusteringCommitEvent(instantTime, getFileIds(clusteringOperations), writeStatuses, this.taskID));
        writerHelper.close();
    }

    /**
     * Pushes all records through an external sorter and writes them out in sorted order.
     * The sorter is closed even on failure, so its memory and spill files are always released.
     */
    private void writeSorted(Iterator<RowData> records, BulkInsertWriterHelper writerHelper) throws Exception {
        RowDataSerializer rowDataSerializer = new RowDataSerializer(this.rowType);
        BinaryExternalSorter sorter = initSorter();
        this.activeSorter = sorter;
        try {
            while (records.hasNext()) {
                // copy() detaches the row from any buffer toBinaryRow may reuse between calls.
                sorter.write(rowDataSerializer.toBinaryRow(records.next()).copy());
            }
            BinaryRowData row = this.binarySerializer.createInstance();
            // The previously returned row is passed back as the reuse instance.
            while ((row = sorter.getIterator().next(row)) != null) {
                writerHelper.write(row);
            }
        } finally {
            this.activeSorter = null;
            sorter.close();
        }
    }

    /**
     * Reads the group's records, merging each operation's base file (if any) with its log files.
     *
     * @param clusteringOperations operations of the clustering group
     * @param instantTime          used as the latest instant time for the log scanner
     * @return concatenated iterator over all operations' merged records
     * @throws HoodieClusteringException if an operation's input cannot be opened
     */
    private Iterator<RowData> readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOperations,
                                                          String instantTime) {
        List<Iterator<RowData>> recordIterators = new ArrayList<>();
        long maxMemoryPerCompaction =
                IOUtils.getMaxMemoryPerCompaction(new FlinkTaskContextSupplier(null), this.writeConfig);
        LOG.info("MaxMemoryPerCompaction run as part of clustering => {}", maxMemoryPerCompaction);
        HoodieTableConfig tableConfig = this.table.getMetaClient().getTableConfig();

        for (ClusteringOperation operation : clusteringOperations) {
            try {
                Option<?> baseFileReader = StringUtils.isNullOrEmpty(operation.getDataFilePath())
                        ? Option.empty()
                        : Option.of(HoodieIOFactory.getIOFactory(this.table.getStorage())
                                .getReaderFactory(this.table.getConfig().getRecordMerger().getRecordType())
                                .getFileReader(this.table.getConfig(), new StoragePath(operation.getDataFilePath())));
                HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                        .withStorage(this.table.getStorage())
                        .withBasePath(this.table.getMetaClient().getBasePath())
                        .withLogFilePaths(operation.getDeltaFilePaths())
                        .withReaderSchema(this.readerSchema)
                        .withLatestInstantTime(instantTime)
                        .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
                        .withReverseReader(this.writeConfig.getCompactionReverseLogReadEnabled())
                        .withBufferSize(this.writeConfig.getMaxDFSStreamBufferSize())
                        .withSpillableMapBasePath(this.writeConfig.getSpillableMapBasePath())
                        .withDiskMapType(this.writeConfig.getCommonConfig().getSpillableDiskMapType())
                        .withBitCaskDiskMapCompressionEnabled(
                                this.writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
                        .withRecordMerger(this.writeConfig.getRecordMerger())
                        .build();

                // Without meta fields, record keys / partition paths must be derived from the key fields.
                boolean populateMetaFields = tableConfig.populateMetaFields();
                Option<Pair<String, String>> simpleKeyGenFields = populateMetaFields
                        ? Option.empty()
                        : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()));
                Option<BaseKeyGenerator> keyGenerator = populateMetaFields
                        ? Option.empty()
                        : Option.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory.createKeyGenerator(
                                new TypedProperties(this.writeConfig.getProps())));

                @SuppressWarnings({"unchecked", "rawtypes"})
                HoodieFileSliceReader<? extends IndexedRecord> fileSliceReader = new HoodieFileSliceReader(
                        baseFileReader, scanner, this.readerSchema, tableConfig.getPreCombineField(),
                        this.writeConfig.getRecordMerger(), tableConfig.getProps(), simpleKeyGenFields, keyGenerator);

                recordIterators.add(StreamSupport.stream(
                                Spliterators.spliteratorUnknownSize(fileSliceReader, Spliterator.NONNULL), false)
                        .map(hoodieRecord -> {
                            try {
                                return transform(hoodieRecord.toIndexedRecord(this.readerSchema, new Properties())
                                        .get().getData());
                            } catch (IOException e) {
                                throw new HoodieIOException("Failed to read next record", e);
                            }
                        })
                        .iterator());
            } catch (IOException e) {
                throw new HoodieClusteringException("Error reading input data for " + operation.getDataFilePath()
                        + " and " + operation.getDeltaFilePaths(), e);
            }
        }
        return new ConcatenatingIterator<>(recordIterators);
    }

    /**
     * Reads the group's records from base files only.
     *
     * <p>Each file reader is opened lazily, when its iterator is first requested.
     *
     * @param clusteringOperations operations of the clustering group
     * @return concatenated iterator over all base files' records
     */
    private Iterator<RowData> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOperations) {
        List<Iterator<RowData>> recordIterators = clusteringOperations.stream().map(operation -> {
            Iterable<IndexedRecord> indexedRecords = () -> {
                try {
                    HoodieAvroFileReader fileReader = (HoodieAvroFileReader) HoodieIOFactory
                            .getIOFactory(this.table.getStorage())
                            .getReaderFactory(this.table.getConfig().getRecordMerger().getRecordType())
                            .getFileReader(this.table.getConfig(), new StoragePath(operation.getDataFilePath()));
                    return new CloseableMappingIterator<>(
                            fileReader.getRecordIterator(this.readerSchema), record -> record.getData());
                } catch (IOException e) {
                    throw new HoodieClusteringException("Error reading input data for " + operation.getDataFilePath()
                            + " and " + operation.getDeltaFilePaths(), e);
                }
            };
            return StreamSupport.stream(indexedRecords.spliterator(), false).map(this::transform).iterator();
        }).collect(Collectors.toList());
        return new ConcatenatingIterator<>(recordIterators);
    }

    /** Converts an Avro record to Flink {@link RowData}; the record must be a {@link GenericRecord}. */
    private RowData transform(IndexedRecord indexedRecord) {
        return (RowData) this.avroToRowDataConverter.convert((GenericRecord) indexedRecord);
    }

    /**
     * Maps each field of {@link #schema} to its index in {@link #readerSchema}.
     * A field absent from the reader schema maps to -1.
     */
    private int[] getRequiredPositions() {
        List<String> readerFieldNames = this.readerSchema.getFields().stream()
                .map(Schema.Field::name)
                .collect(Collectors.toList());
        return this.schema.getFields().stream()
                .mapToInt(field -> readerFieldNames.indexOf(field.name()))
                .toArray();
    }

    /** Creates and starts an external sorter ordering rows by the configured clustering sort columns. */
    private BinaryExternalSorter initSorter() {
        ClassLoader userCodeClassLoader = getContainingTask().getUserCodeClassLoader();
        // One code generator serves both generated classes.
        SortCodeGenerator sortCodeGenerator = createSortCodeGenerator();
        NormalizedKeyComputer keyComputer = (NormalizedKeyComputer) sortCodeGenerator
                .generateNormalizedKeyComputer("SortComputer").newInstance(userCodeClassLoader);
        RecordComparator comparator = (RecordComparator) sortCodeGenerator
                .generateRecordComparator("SortComparator").newInstance(userCodeClassLoader);
        BinaryExternalSorter sorter = Utils.getBinaryExternalSorter(
                getContainingTask(),
                getContainingTask().getEnvironment().getMemoryManager(),
                computeMemorySize(),
                getContainingTask().getEnvironment().getIOManager(),
                this.binarySerializer,
                this.binarySerializer,
                keyComputer,
                comparator,
                this.conf);
        sorter.startThreads();
        return sorter;
    }

    /** Registers the sorter gauges. They report the active sorter's values, or 0 while no sort runs. */
    private void registerSorterGauges() {
        OperatorMetricGroup metricGroup = getMetricGroup();
        metricGroup.gauge("memoryUsedSizeInBytes",
                () -> readSorterMetric(BinaryExternalSorter::getUsedMemoryInBytes));
        metricGroup.gauge("numSpillFiles",
                () -> readSorterMetric(BinaryExternalSorter::getNumSpillFiles));
        metricGroup.gauge("spillInBytes",
                () -> readSorterMetric(BinaryExternalSorter::getSpillInBytes));
    }

    /** Reads a metric from the active sorter, or returns 0 when no sort is in progress. */
    private long readSorterMetric(ToLongFunction<BinaryExternalSorter> metric) {
        BinaryExternalSorter sorter = this.activeSorter;
        return sorter == null ? 0L : metric.applyAsLong(sorter);
    }

    /** Builds the sort code generator from the comma-separated {@link FlinkOptions#CLUSTERING_SORT_COLUMNS}. */
    private SortCodeGenerator createSortCodeGenerator() {
        return new SortOperatorGen(this.rowType, this.conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS).split(","))
                .createSortCodeGenerator();
    }

    /** Joins the file ids of the given operations with commas, in list order. */
    private String getFileIds(List<ClusteringOperation> clusteringOperations) {
        return clusteringOperations.stream()
                .map(ClusteringOperation::getFileId)
                .collect(Collectors.joining(","));
    }

    @VisibleForTesting
    public void setExecutor(NonThrownExecutor nonThrownExecutor) {
        this.executor = nonThrownExecutor;
    }

    @VisibleForTesting
    public void setOutput(Output<StreamRecord<ClusteringCommitEvent>> output) {
        this.output = output;
    }

    /** Creates and registers the clustering metrics on the runtime context's metric group. */
    private void registerMetrics() {
        this.clusteringMetrics = new FlinkClusteringMetrics(getRuntimeContext().getMetricGroup());
        this.clusteringMetrics.registerMetrics();
    }
}
