package org.apache.hudi.sink.partitioner;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.Preconditions;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.partitioner.profile.WriteProfile;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.commit.SmallFile;
import org.apache.hudi.util.StreamerUtil;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/hudi/sink/partitioner/BucketAssigner.class */
/**
 * Decides which bucket (partition path + file id) a record is written to.
 *
 * <p>Updates always go to the bucket of their existing file id. Inserts are first packed into
 * the small files owned by this task, then into the new file most recently opened for the
 * partition, and only when both are full is a fresh file id created.
 *
 * <p>A file id is "owned" by this task when Flink's key-group assignment maps it to
 * {@code taskID} for the configured {@code maxParallelism} / {@code numTasks}.
 *
 * <p>The class keeps plain {@link HashMap}s and performs no synchronization of its own.
 */
public class BucketAssigner implements AutoCloseable {
    private static final Logger LOG = LogManager.getLogger(BucketAssigner.class);

    /** Operator index this assigner compares file-id ownership against. */
    private final int taskID;
    /** Max parallelism handed to the key-group assignment. */
    private final int maxParallelism;
    /** Parallelism handed to the key-group assignment. */
    private final int numTasks;
    protected final HoodieWriteConfig config;
    private final WriteProfile writeProfile;
    /** Bucket key (see {@code StreamerUtil.generateBucketKey}) to bucket info; cleared by {@link #reset()}. */
    private final Map<String, BucketInfo> bucketInfoMap = new HashMap<>();
    /**
     * Partition path to small-file assignment state; cleared by {@link #reload(long)}.
     * A {@code null} value means "already looked up, no small file belongs to this task".
     */
    private final Map<String, SmallFileAssign> smallFileAssignMap = new HashMap<>();
    /** Partition path to the state of the newest file opened for inserts; cleared by {@link #reset()}. */
    private final Map<String, NewFileAssignState> newFileAssignStates = new HashMap<>();

    /** Tracks how many more records may still be routed to one newly created file. */
    public static class NewFileAssignState {
        long assigned = 0;
        long totalUnassigned;
        final String fileId;

        NewFileAssignState(String fileId, long capacity) {
            this.fileId = fileId;
            this.totalUnassigned = capacity;
        }

        /** Returns {@code true} while the capacity is positive and not yet used up. */
        public boolean canAssign() {
            return this.totalUnassigned > 0 && this.totalUnassigned > this.assigned;
        }

        /**
         * Accounts for one more record in this file.
         *
         * @throws IllegalStateException if the file has no capacity left
         */
        public void assign() {
            Preconditions.checkState(canAssign(), "Can not assign insert to new file: assigned => " + this.assigned + " totalUnassigned => " + this.totalUnassigned);
            this.assigned++;
        }
    }

    /** Walks over the small files of one partition, filling them one after another. */
    public static class SmallFileAssign {
        final SmallFileAssignState[] states;
        /** Index of the small file currently being filled; only moves forward. */
        int assignIdx = 0;
        /** Set once every small file has been exhausted. */
        boolean noSpace = false;

        SmallFileAssign(SmallFileAssignState[] states) {
            this.states = states;
            // Nothing to fill: mark as exhausted up front so assign() never indexes an empty array.
            this.noSpace = states.length == 0;
        }

        /**
         * Tries to account for one record in the current small file, advancing to the next
         * small file whenever the current one is full.
         *
         * @return {@code true} if a small file took the record (query it with
         *         {@link #getFileId()}), {@code false} if all small files are full
         */
        public boolean assign() {
            if (this.noSpace) {
                return false;
            }
            while (!this.states[this.assignIdx].canAssign()) {
                this.assignIdx++;
                if (this.assignIdx >= this.states.length) {
                    this.noSpace = true;
                    return false;
                }
            }
            this.states[this.assignIdx].assign();
            return true;
        }

        /**
         * Returns the file id of the small file currently being filled. Only meaningful right
         * after {@link #assign()} returned {@code true}; once all files are exhausted the
         * index is past the end of the array.
         */
        public String getFileId() {
            return this.states[this.assignIdx].fileId;
        }
    }

    /** Tracks how many more records may still be routed to one existing small file. */
    public static class SmallFileAssignState {
        long assigned = 0;
        long totalUnassigned;
        final String fileId;

        /**
         * @param maxFileSize       value of {@code HoodieWriteConfig#getParquetMaxFileSize()}
         * @param smallFile         the small file to fill
         * @param averageRecordSize value of {@code WriteProfile#getAvgSize()}; presumably the
         *                          average record size in bytes and non-zero (confirm against WriteProfile)
         */
        SmallFileAssignState(long maxFileSize, SmallFile smallFile, long averageRecordSize) {
            // Remaining room divided by the average size; negative when the file already exceeds the max.
            this.totalUnassigned = (maxFileSize - smallFile.sizeBytes) / averageRecordSize;
            this.fileId = smallFile.location.getFileId();
        }

        /** Returns {@code true} while the capacity is positive and not yet used up. */
        public boolean canAssign() {
            return this.totalUnassigned > 0 && this.totalUnassigned > this.assigned;
        }

        /**
         * Accounts for one more record in this file.
         *
         * @throws IllegalStateException if the file has no capacity left
         */
        public void assign() {
            Preconditions.checkState(canAssign(), "Can not assign insert to small file: assigned => " + this.assigned + " totalUnassigned => " + this.totalUnassigned);
            this.assigned++;
        }
    }

    /**
     * @param taskID         operator index that file ids must map to in order to belong to this assigner
     * @param maxParallelism max parallelism used for the key-group assignment
     * @param numTasks       parallelism used for the key-group assignment
     * @param writeProfile   source of small files, average size and records-per-bucket
     * @param config         write config (max parquet file size, base path)
     */
    public BucketAssigner(int taskID, int maxParallelism, int numTasks, WriteProfile writeProfile, HoodieWriteConfig config) {
        this.taskID = taskID;
        this.maxParallelism = maxParallelism;
        this.numTasks = numTasks;
        this.config = config;
        this.writeProfile = writeProfile;
    }

    /** Forgets all bucket infos and new-file states. Small-file state is kept until {@link #reload(long)}. */
    public void reset() {
        this.bucketInfoMap.clear();
        this.newFileAssignStates.clear();
    }

    /**
     * Returns the bucket for the given partition path and file id, registering an
     * {@code UPDATE} bucket first if none is known yet.
     *
     * @param partitionPath partition path of the record
     * @param fileId        file id the record belongs to
     * @return the registered bucket info, never {@code null}
     */
    public BucketInfo addUpdate(String partitionPath, String fileId) {
        String bucketKey = StreamerUtil.generateBucketKey(partitionPath, fileId);
        return this.bucketInfoMap.computeIfAbsent(bucketKey, k -> new BucketInfo(BucketType.UPDATE, fileId, partitionPath));
    }

    /**
     * Picks the bucket for a new record of the given partition.
     *
     * @param partitionPath partition path of the record
     * @return the bucket the record should be written to
     */
    public BucketInfo addInsert(String partitionPath) {
        // 1. Try to pack the record into one of this task's small files.
        SmallFileAssign smallFileAssign = getSmallFileAssign(partitionPath);
        if (smallFileAssign != null && smallFileAssign.assign()) {
            String smallFileId = smallFileAssign.getFileId();
            BucketInfo existing = this.bucketInfoMap.get(StreamerUtil.generateBucketKey(partitionPath, smallFileId));
            return existing != null ? existing : addUpdate(partitionPath, smallFileId);
        }

        // 2. Reuse the file most recently opened for this partition while it still has room.
        NewFileAssignState openFile = this.newFileAssignStates.get(partitionPath);
        if (openFile != null && openFile.canAssign()) {
            openFile.assign();
            return this.bucketInfoMap.get(StreamerUtil.generateBucketKey(partitionPath, openFile.fileId));
        }

        // 3. Open a brand-new file owned by this task and count the current record against it.
        BucketInfo newBucket = new BucketInfo(BucketType.INSERT, createFileIdOfThisTask(), partitionPath);
        String newFileId = newBucket.getFileIdPrefix();
        this.bucketInfoMap.put(StreamerUtil.generateBucketKey(partitionPath, newFileId), newBucket);
        NewFileAssignState newFileState = new NewFileAssignState(newFileId, this.writeProfile.getRecordsPerBucket());
        newFileState.assign();
        this.newFileAssignStates.put(partitionPath, newFileState);
        return newBucket;
    }

    /**
     * Returns the small-file assignment state of a partition, building it on first use.
     *
     * @return the state, or {@code null} if no small file of the partition belongs to this task
     */
    private SmallFileAssign getSmallFileAssign(String partitionPath) {
        // containsKey (not get) because a cached null is a valid "nothing to assign" answer.
        if (this.smallFileAssignMap.containsKey(partitionPath)) {
            return this.smallFileAssignMap.get(partitionPath);
        }
        List<SmallFile> ownedSmallFiles = smallFilesOfThisTask(this.writeProfile.getSmallFiles(partitionPath));
        if (ownedSmallFiles.isEmpty()) {
            // Remember the miss so the profile lookup and per-file ownership hashing are not
            // repeated for every insert; the entry is dropped again by reload().
            this.smallFileAssignMap.put(partitionPath, null);
            return null;
        }
        LOG.info("For partitionPath : " + partitionPath + " Small Files => " + ownedSmallFiles);
        long maxFileSize = this.config.getParquetMaxFileSize();
        long averageSize = this.writeProfile.getAvgSize();
        SmallFileAssignState[] states = ownedSmallFiles.stream()
            .map(smallFile -> new SmallFileAssignState(maxFileSize, smallFile, averageSize))
            .toArray(SmallFileAssignState[]::new);
        SmallFileAssign smallFileAssign = new SmallFileAssign(states);
        this.smallFileAssignMap.put(partitionPath, smallFileAssign);
        return smallFileAssign;
    }

    /**
     * Drops the cached small-file state and reloads the write profile.
     *
     * @param checkpointId value forwarded to {@code WriteProfile#reload}; presumably the
     *                     checkpoint id (confirm against WriteProfile)
     */
    public void reload(long checkpointId) {
        this.smallFileAssignMap.clear();
        this.writeProfile.reload(checkpointId);
    }

    /** Returns the table held by the underlying write profile. */
    public HoodieTable<?, ?, ?, ?> getTable() {
        return this.writeProfile.getTable();
    }

    /** Returns whether the key-group assignment maps the given file id to this task. */
    private boolean fileIdOfThisTask(String fileId) {
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(fileId, this.maxParallelism, this.numTasks) == this.taskID;
    }

    /** Generates new file id prefixes until one maps to this task, and returns it. */
    @VisibleForTesting
    public String createFileIdOfThisTask() {
        String candidate = FSUtils.createNewFileIdPfx();
        while (!fileIdOfThisTask(candidate)) {
            candidate = FSUtils.createNewFileIdPfx();
        }
        return candidate;
    }

    /** Returns the subset of the given small files whose file id maps to this task. */
    @VisibleForTesting
    public List<SmallFile> smallFilesOfThisTask(List<SmallFile> smallFiles) {
        return smallFiles.stream()
            .filter(smallFile -> fileIdOfThisTask(smallFile.location.getFileId()))
            .collect(Collectors.toList());
    }

    /** Resets the assigner and cleans the write profile registered for the table base path. */
    @Override
    public void close() {
        reset();
        WriteProfiles.clean(this.config.getBasePath());
    }
}
