package org.apache.hudi.sink.bootstrap;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.meta.CkpMetadataFactory;
import org.apache.hudi.source.FileIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.util.FlinkTables;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/bootstrap/BootstrapOperator.class */
/**
 * Stream operator that, while its state is being initialized, reads the record keys stored in the
 * existing table files and emits one {@code IndexRecord} per key downstream. Elements that arrive
 * afterwards are forwarded unchanged.
 *
 * <p>Each parallel subtask only reads the file slices assigned to it by
 * {@link #shouldLoadFile(String, int, int, int)}. After loading, the subtask blocks until the
 * global aggregate reports that all parallel subtasks have finished.
 *
 * @param <I> type of the incoming elements
 * @param <O> type of the emitted records
 */
public class BootstrapOperator<I, O extends HoodieRecord<?>> extends AbstractStreamOperator<O> implements OneInputStreamOperator<I, O> {
    private static final Logger LOG = LoggerFactory.getLogger(BootstrapOperator.class);

    /** Seconds to pause between two polls of the global bootstrap aggregate. */
    private static final long BOOTSTRAP_POLL_INTERVAL_SECONDS = 5L;

    /** Table handle used for loading; reset to {@code null} once the pre-load has finished. */
    protected HoodieTable<?, ?, ?, ?> hoodieTable;
    private CkpMetadata ckpMetadata;
    protected final Configuration conf;
    protected transient org.apache.hadoop.conf.Configuration hadoopConf;
    protected transient HoodieWriteConfig writeConfig;
    private transient GlobalAggregateManager aggregateManager;
    private transient ListState<String> instantState;
    /** Only partitions whose path fully matches this pattern are loaded. */
    private final Pattern pattern;
    /** Instant restored from (or last written to) operator state; may be {@code null}. */
    private String lastInstantTime;

    /**
     * Creates the operator.
     *
     * @param configuration Flink configuration; {@link FlinkOptions#INDEX_PARTITION_REGEX} is compiled
     *                      eagerly into the partition filter
     */
    public BootstrapOperator(Configuration configuration) {
        this.conf = configuration;
        this.pattern = Pattern.compile(configuration.getString(FlinkOptions.INDEX_PARTITION_REGEX));
    }

    /**
     * Stores the last pending instant reported by the checkpoint metadata into operator state.
     * Nothing is written when there is no pending instant.
     */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        this.lastInstantTime = this.ckpMetadata.lastPendingInstant();
        if (this.lastInstantTime != null) {
            this.instantState.update(Collections.singletonList(this.lastInstantTime));
        }
    }

    /**
     * Restores the last instant from operator state (if any), creates the Hadoop configuration,
     * write config, table handle, checkpoint metadata and aggregate manager, and then pre-loads
     * the index records.
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        ListStateDescriptor<String> instantStateDescriptor =
                new ListStateDescriptor<>("instantStateDescriptor", Types.STRING);
        this.instantState = stateInitializationContext.getOperatorStateStore().getListState(instantStateDescriptor);
        if (stateInitializationContext.isRestored()) {
            Iterator<String> restoredInstants = this.instantState.get().iterator();
            if (restoredInstants.hasNext()) {
                this.lastInstantTime = restoredInstants.next();
            }
        }
        this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
        this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, true);
        this.hoodieTable = FlinkTables.createTable(this.writeConfig, this.hadoopConf, getRuntimeContext());
        this.ckpMetadata = CkpMetadataFactory.getCkpMetadata(this.writeConfig, this.conf);
        this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
        preLoadIndexRecords();
    }

    /**
     * Loads the records of every partition whose path matches the configured regex, waits for the
     * other subtasks to finish their share, and finally drops the table handle.
     *
     * @throws Exception if loading fails or the thread is interrupted while waiting
     */
    protected void preLoadIndexRecords() throws Exception {
        final String basePath = this.hoodieTable.getMetaClient().getBasePath();
        final int taskId = getRuntimeContext().getIndexOfThisSubtask();
        LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskId);
        List<String> partitionPaths = FSUtils.getAllPartitionPaths(
                new HoodieFlinkEngineContext(this.hadoopConf), FileIndex.metadataConfig(this.conf), basePath);
        for (String partitionPath : partitionPaths) {
            if (this.pattern.matcher(partitionPath).matches()) {
                loadRecords(partitionPath);
            }
        }
        LOG.info("Finish sending index records, taskId = {}.", taskId);
        waitForBootstrapReady(taskId);
        // The table handle is only needed for the pre-load; release the reference.
        this.hoodieTable = null;
    }

    /**
     * Blocks until the global aggregate returns a value equal to the operator parallelism.
     *
     * <p>A failed aggregate update is logged and retried after the poll interval. An interrupt
     * while sleeping is propagated instead of being swallowed, so a cancelled task can leave the
     * loop.
     *
     * @param taskId index of this subtask, passed to the aggregate function
     * @throws InterruptedException if the thread is interrupted between two polls
     */
    private void waitForBootstrapReady(int taskId) throws InterruptedException {
        final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        final String aggregateName = BootstrapAggFunction.NAME + this.conf.getString(FlinkOptions.TABLE_NAME);
        int readyTasks = 1;
        while (parallelism != readyTasks) {
            try {
                readyTasks = (Integer) this.aggregateManager.updateGlobalAggregate(aggregateName, taskId, new BootstrapAggFunction());
            } catch (Exception e) {
                LOG.warn("Update global task bootstrap summary error", e);
            }
            if (parallelism != readyTasks) {
                LOG.info("Waiting for other bootstrap tasks to complete, taskId = {}.", taskId);
                // Sleep outside the catch-all above so that InterruptedException reaches the caller.
                TimeUnit.SECONDS.sleep(BOOTSTRAP_POLL_INTERVAL_SECONDS);
            }
        }
    }

    /** Forwards the incoming element downstream unchanged. */
    @Override
    @SuppressWarnings("unchecked")
    public void processElement(StreamRecord<I> streamRecord) throws Exception {
        // The element is passed through as-is; the cast only satisfies the output's element type.
        this.output.collect((StreamRecord<O>) streamRecord);
    }

    /**
     * Emits an index record for every record key found in the file slices of the given partition
     * that are assigned to this subtask. Slices are resolved as of the last completed commit; when
     * a last instant is known, only commits after it are considered. Nothing is loaded if no such
     * completed commit exists.
     *
     * @param partitionPath partition whose file slices are scanned
     * @throws Exception if the table schema cannot be resolved or reading the files fails
     */
    public void loadRecords(String partitionPath) throws Exception {
        final long startMillis = System.currentTimeMillis();
        final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
        final int taskId = getRuntimeContext().getIndexOfThisSubtask();

        HoodieTimeline commitsTimeline = this.hoodieTable.getMetaClient().getCommitsTimeline();
        if (!StringUtils.isNullOrEmpty(this.lastInstantTime)) {
            commitsTimeline = commitsTimeline.findInstantsAfter(this.lastInstantTime);
        }
        Option<HoodieInstant> lastInstant = commitsTimeline.filterCompletedInstants().lastInstant();
        if (lastInstant.isPresent()) {
            final String latestCommitTime = lastInstant.get().getTimestamp();
            final BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
            final Schema tableAvroSchema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
            List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
                    .getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime)
                    .collect(Collectors.toList());
            for (FileSlice fileSlice : fileSlices) {
                if (!shouldLoadFile(fileSlice.getFileId(), maxParallelism, parallelism, taskId)) {
                    continue;
                }
                LOG.info("Load records from {}.", fileSlice);
                loadBaseFileKeys(fileSlice, baseFileUtils);
                loadLogFileKeys(fileSlice, partitionPath, tableAvroSchema, latestCommitTime);
            }
        }
        LOG.info("Task [{}:{}] finish loading the index under partition {} and sending them to downstream, time cost: {} milliseconds.",
                getClass().getSimpleName(), taskId, partitionPath, System.currentTimeMillis() - startMillis);
    }

    /** Emits an index record for every key in the slice's base file, if one is present and valid. */
    private void loadBaseFileKeys(FileSlice fileSlice, BaseFileUtils baseFileUtils) {
        fileSlice.getBaseFile().ifPresent(baseFile -> {
            if (!StreamerUtil.isValidFile(baseFile.getFileStatus())) {
                return;
            }
            try (ClosableIterator<HoodieKey> keys = baseFileUtils.getHoodieKeyIterator(this.hadoopConf, new Path(baseFile.getPath()))) {
                keys.forEachRemaining(hoodieKey -> emitIndexRecord(hoodieKey, fileSlice));
            }
        });
    }

    /**
     * Emits an index record for every record key returned by a merged scan over the slice's valid
     * log files, read up to the given commit time.
     *
     * @throws HoodieException wrapping any failure while scanning the log files
     */
    private void loadLogFileKeys(FileSlice fileSlice, String partitionPath, Schema tableAvroSchema, String latestCommitTime) {
        List<String> logPaths = fileSlice.getLogFiles()
                .sorted(HoodieLogFile.getLogFileComparator())
                .filter(logFile -> StreamerUtil.isValidFile(logFile.getFileStatus()))
                .map(logFile -> logFile.getPath().toString())
                .collect(Collectors.toList());
        try (HoodieMergedLogRecordScanner logScanner =
                     FormatUtils.logScanner(logPaths, tableAvroSchema, latestCommitTime, this.writeConfig, this.hadoopConf)) {
            for (String recordKey : logScanner.getRecords().keySet()) {
                emitIndexRecord(new HoodieKey(recordKey, partitionPath), fileSlice);
            }
        } catch (Exception e) {
            throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
        }
    }

    /** Wraps the key's location record into an {@code IndexRecord} and sends it downstream. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void emitIndexRecord(HoodieKey hoodieKey, FileSlice fileSlice) {
        this.output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
    }

    /**
     * Builds a sealed, payload-less record for the key whose current location points at the given
     * file slice (partition path of the key, base instant time and file id of the slice).
     *
     * @param hoodieKey key of the record
     * @param fileSlice file slice the key was read from
     * @return the sealed record carrying only key and location
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static HoodieRecord generateHoodieRecord(HoodieKey hoodieKey, FileSlice fileSlice) {
        HoodieAvroRecord hoodieAvroRecord = new HoodieAvroRecord(hoodieKey, null);
        hoodieAvroRecord.setCurrentLocation(new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId()));
        hoodieAvroRecord.seal();
        return hoodieAvroRecord;
    }

    /**
     * Decides whether this subtask is responsible for the given file by comparing the operator
     * index that {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} computes for the file
     * id with this subtask's index.
     *
     * @param fileId         id of the file slice
     * @param maxParallelism maximum parallelism of the operator
     * @param parallelism    current parallelism of the operator
     * @param taskId         index of this subtask
     * @return {@code true} if the file is assigned to this subtask
     */
    protected boolean shouldLoadFile(String fileId, int maxParallelism, int parallelism, int taskId) {
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(fileId, maxParallelism, parallelism) == taskId;
    }

    /**
     * Tells whether the instant state currently holds at least one entry.
     *
     * @return {@code true} if the instant state is non-empty
     */
    @VisibleForTesting
    public boolean isAlreadyBootstrap() throws Exception {
        return this.instantState.get().iterator().hasNext();
    }
}
