/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.bucket;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.sink.StreamWriteFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream write function for tables that use the bucket index.
 *
 * <p>Each incoming element is cast to a {@link HoodieRecord}. It is mapped to a bucket number via
 * {@link BucketIdentifier#getBucketId}, tagged with a {@link HoodieRecordLocation} that points at
 * that bucket's file id, and then buffered via {@code bufferRecord}.
 *
 * <p>The in-memory mapping partition path → (bucket number → file id) is bootstrapped lazily, one
 * partition at a time, from the write client's file system view. Only buckets for which
 * {@link #isBucketToLoad(int, String)} is true for this subtask are loaded.
 *
 * @param <I> the input element type; every element is cast to {@link HoodieRecord}
 */
public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
    private static final Logger LOG = LoggerFactory.getLogger(BucketStreamWriteFunction.class);

    /** Number of parallel subtasks of this operator; used by {@link #isBucketToLoad(int, String)}. */
    private int parallelism;

    /** Configured number of buckets ({@link FlinkOptions#BUCKET_INDEX_NUM_BUCKETS}). */
    private int bucketNum;

    /** Fields used to compute a record's bucket ({@link FlinkOptions#INDEX_KEY_FIELD}). */
    private String indexKeyFields;

    /** partition path → (bucket number → file id), either bootstrapped or created by this subtask. */
    private Map<String, Map<Integer, String>> bucketIndex;

    /**
     * Keys, built by {@link #incBucketKey(String, int)}, of buckets whose file id was created by this
     * function since the last {@link #snapshotState()}.
     */
    private Set<String> incBucketIndex;

    public BucketStreamWriteFunction(Configuration config) {
        super(config);
    }

    /**
     * Reads the bucket-index options and subtask info, and creates empty index structures.
     */
    @Override
    public void open(Configuration parameters) throws IOException {
        super.open(parameters);
        this.bucketNum = this.config.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
        this.indexKeyFields = this.config.getString(FlinkOptions.INDEX_KEY_FIELD);
        this.taskID = this.getRuntimeContext().getIndexOfThisSubtask();
        this.parallelism = this.getRuntimeContext().getNumberOfParallelSubtasks();
        this.bucketIndex = new HashMap<>();
        this.incBucketIndex = new HashSet<>();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        super.initializeState(context);
    }

    /**
     * Delegates to the parent snapshot, then forgets which buckets were newly created.
     *
     * <p>After this call, records for those buckets are tagged {@code "U"} instead of {@code "I"}.
     */
    @Override
    public void snapshotState() {
        super.snapshotState();
        this.incBucketIndex.clear();
    }

    /**
     * Tags the record with the location of its bucket and buffers it.
     *
     * <p>The location's first argument is {@code "I"} when the bucket's file id was created by this
     * function since the last {@link #snapshotState()}, including the record that creates it. It is
     * {@code "U"} when the file id was already known before that (bootstrapped or created in an
     * earlier checkpoint interval). NOTE(review): these presumably mean insert/update to the
     * downstream writer — confirm.
     */
    @Override
    public void processElement(I i, ProcessFunction.Context context, Collector<Object> collector) throws Exception {
        HoodieRecord<?> record = (HoodieRecord<?>) i;
        HoodieKey hoodieKey = record.getKey();
        String partition = hoodieKey.getPartitionPath();

        bootstrapIndexIfNeed(partition);
        Map<Integer, String> bucketToFileId = this.bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
        int bucketNumber = BucketIdentifier.getBucketId(hoodieKey, this.indexKeyFields, this.bucketNum);
        String bucketKey = incBucketKey(partition, bucketNumber);

        HoodieRecordLocation location;
        if (this.incBucketIndex.contains(bucketKey)) {
            // Bucket created in the current checkpoint interval: keep tagging it as new.
            location = new HoodieRecordLocation("I", bucketToFileId.get(bucketNumber));
        } else if (bucketToFileId.containsKey(bucketNumber)) {
            location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNumber));
        } else {
            // First record for this bucket: allocate a file id and remember it as new.
            String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNumber);
            location = new HoodieRecordLocation("I", newFileId);
            bucketToFileId.put(bucketNumber, newFileId);
            this.incBucketIndex.add(bucketKey);
        }
        record.unseal();
        record.setCurrentLocation(location);
        record.seal();
        bufferRecord(record);
    }

    /**
     * Returns whether the given bucket of the partition belongs to this subtask.
     *
     * <p>Ownership is decided by taking the non-negative hash of {@code partition + bucketNumber}
     * modulo the parallelism and comparing it with this subtask's index. NOTE(review): this formula
     * presumably has to match the partitioner that routes records to subtasks; do not change one
     * without the other.
     *
     * @param bucketNumber the bucket number
     * @param partition the partition path
     * @return {@code true} if this subtask should load the bucket
     */
    public boolean isBucketToLoad(int bucketNumber, String partition) {
        int globalHash = (partition + bucketNumber).hashCode() & Integer.MAX_VALUE;
        return BucketIdentifier.mod(globalHash, this.parallelism) == this.taskID;
    }

    /**
     * Builds the {@link #incBucketIndex} key for one bucket of one partition.
     *
     * <p>The {@code "/"} separator keeps keys unambiguous. The suffix after the last {@code "/"} is
     * always the integer bucket number, which cannot contain {@code "/"}. Plain concatenation would
     * collide: partition {@code "day=1"} bucket 12 and partition {@code "day=11"} bucket 2 both
     * become {@code "day=112"}. Such pairs also hash identically in
     * {@link #isBucketToLoad(int, String)}, so they reach the same subtask.
     */
    private static String incBucketKey(String partition, int bucketNumber) {
        return partition + "/" + bucketNumber;
    }

    /**
     * Loads the bucket → file id mapping of {@code partition} into {@link #bucketIndex}, unless it is
     * already present.
     *
     * <p>Only file groups whose bucket passes {@link #isBucketToLoad(int, String)} are loaded.
     *
     * @throws RuntimeException if two owned file groups in the partition map to the same bucket
     */
    private void bootstrapIndexIfNeed(String partition) {
        if (this.bucketIndex.containsKey(partition)) {
            return;
        }
        LOG.info("Loading Hoodie Table {}, with path {}",
            this.metaClient.getTableConfig().getTableName(), this.metaClient.getBasePath() + "/" + partition);
        Map<Integer, String> bucketToFileIDMap = new HashMap<>();
        this.writeClient.getHoodieTable().getFileSystemView().getAllFileGroups(partition).forEach(fileGroup -> {
            String fileID = fileGroup.getFileGroupId().getFileId();
            int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
            if (!isBucketToLoad(bucketNumber, partition)) {
                return;
            }
            LOG.info("Should load this partition bucket {} with fileID {}", bucketNumber, fileID);
            if (bucketToFileIDMap.containsKey(bucketNumber)) {
                throw new RuntimeException(String.format("Duplicate fileID %s from bucket %s of partition %s found during the BucketStreamWriteFunction index bootstrap.", fileID, bucketNumber, partition));
            }
            LOG.info("Adding fileID {} to the bucket {} of partition {}.", fileID, bucketNumber, partition);
            bucketToFileIDMap.put(bucketNumber, fileID);
        });
        this.bucketIndex.put(partition, bucketToFileIDMap);
    }
}

