package org.apache.hudi.sink.bucket;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
import org.apache.hudi.util.FlinkWriteClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.class */
public class ConsistentBucketAssignFunction extends ProcessFunction<HoodieRecord, HoodieRecord> implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(ConsistentBucketAssignFunction.class);
    private final Configuration config;
    private final List<String> indexKeyFields;
    private final int bucketNum;
    private transient HoodieFlinkWriteClient writeClient;
    private transient Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
    private transient String lastRefreshInstant;
    private final int maxRetries = 10;
    private final long maxWaitTimeInMs = 1000;

    public ConsistentBucketAssignFunction(Configuration configuration) {
        this.config = configuration;
        this.indexKeyFields = Arrays.asList(OptionsResolver.getIndexKeyField(configuration).split(","));
        this.bucketNum = configuration.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
    }

    public void open(Configuration configuration) throws Exception {
        try {
            this.writeClient = FlinkWriteClients.createWriteClient(this.config, getRuntimeContext());
            this.partitionToIdentifier = new HashMap();
            this.lastRefreshInstant = "00000000000000";
        } catch (Throwable th) {
            LOG.error("Fail to initialize consistent bucket assigner", th);
            throw new RuntimeException(th);
        }
    }

    public void processElement(HoodieRecord hoodieRecord, ProcessFunction<HoodieRecord, HoodieRecord>.Context context, Collector<HoodieRecord> collector) throws Exception {
        HoodieKey key = hoodieRecord.getKey();
        String partitionPath = key.getPartitionPath();
        ConsistentHashingNode bucket = getBucketIdentifier(partitionPath).getBucket(key, this.indexKeyFields);
        Preconditions.checkArgument(StringUtils.nonEmpty(bucket.getFileIdPrefix()), "Consistent hashing node has no file group, partition: " + partitionPath + ", meta: " + this.partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", record_key: " + key);
        hoodieRecord.unseal();
        hoodieRecord.setCurrentLocation(new HoodieRecordLocation("U", FSUtils.createNewFileId(bucket.getFileIdPrefix(), 0)));
        hoodieRecord.seal();
        collector.collect(hoodieRecord);
    }

    private ConsistentBucketIdentifier getBucketIdentifier(String str) {
        return this.partitionToIdentifier.computeIfAbsent(str, str2 -> {
            HoodieConsistentHashingMetadata hoodieConsistentHashingMetadata = null;
            for (int i = 0; i <= 10; i++) {
                try {
                    hoodieConsistentHashingMetadata = ConsistentBucketIndexUtils.loadOrCreateMetadata(this.writeClient.getHoodieTable(), str2, this.bucketNum);
                    int i2 = i + 1;
                    break;
                } catch (Exception e) {
                    if (i >= 10) {
                        throw new HoodieLockException("Fail to load or create metadata for partition " + str, e);
                    }
                    try {
                        try {
                            TimeUnit.MILLISECONDS.sleep(1000L);
                        } catch (InterruptedException e2) {
                        }
                        LOG.info("Retrying to load or create metadata for partition {} for {} times", str, Integer.valueOf(i + 1));
                    } catch (Throwable th) {
                        int i3 = i + 1;
                        throw th;
                    }
                }
            }
            ValidationUtils.checkState(hoodieConsistentHashingMetadata != null);
            return new ConsistentBucketIdentifier(hoodieConsistentHashingMetadata);
        });
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        HoodieTimeline findInstantsAfter = this.writeClient.getHoodieTable().getActiveTimeline().getCompletedReplaceTimeline().findInstantsAfter(this.lastRefreshInstant);
        if (findInstantsAfter.empty()) {
            return;
        }
        Iterator<HoodieInstant> it = findInstantsAfter.getInstants().iterator();
        while (it.hasNext()) {
            Set<String> keySet = ((HoodieReplaceCommitMetadata) HoodieReplaceCommitMetadata.fromBytes(findInstantsAfter.getInstantDetails(it.next()).get(), HoodieReplaceCommitMetadata.class)).getPartitionToReplaceFileIds().keySet();
            LOG.info("Clear up cached hashing metadata because find a new replace commit.\n Instant: {}.\n Effected Partitions: {}.", this.lastRefreshInstant, keySet);
            Map<String, ConsistentBucketIdentifier> map = this.partitionToIdentifier;
            map.getClass();
            keySet.forEach((v1) -> {
                r1.remove(v1);
            });
        }
        this.lastRefreshInstant = findInstantsAfter.lastInstant().get().requestedTime();
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
        processElement((HoodieRecord) obj, (ProcessFunction<HoodieRecord, HoodieRecord>.Context) context, (Collector<HoodieRecord>) collector);
    }
}
