package org.apache.rocketmq.streams.connectors.balance;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.connectors.source.SourceInstance;

/* loaded from: input_file:org/apache/rocketmq/streams/connectors/balance/AbstractBalance.class */
public abstract class AbstractBalance implements ISourceBalance {
    protected int balanceCount = 0;

    @Override // org.apache.rocketmq.streams.connectors.balance.ISourceBalance
    public SplitChanged doBalance(List<ISplit> list, List<ISplit> list2) {
        this.balanceCount++;
        heartBeat();
        List<SourceInstance> fetchSourceInstances = fetchSourceInstances();
        List<ISplit> fetchWorkingSplits = fetchWorkingSplits(list);
        SplitChanged additionSplits = getAdditionSplits(list, fetchSourceInstances, fetchWorkingSplits, list2);
        return additionSplits != null ? additionSplits : getRemoveSplits(list, fetchSourceInstances, fetchWorkingSplits, list2);
    }

    protected void heartBeat() {
        holdLockSourceInstance();
    }

    protected abstract List<ISplit> fetchWorkingSplits(List<ISplit> list);

    protected abstract List<SourceInstance> fetchSourceInstances();

    protected abstract boolean holdLockSourceInstance();

    protected abstract void unlockSourceInstance();

    protected SplitChanged getAdditionSplits(List<ISplit> list, List<SourceInstance> list2, List<ISplit> list3, List<ISplit> list4) {
        SplitChanged changedSplitCount = getChangedSplitCount(list, list2, list3.size(), list4.size());
        if (changedSplitCount == null || !changedSplitCount.isNewSplit || changedSplitCount.splitCount <= 0) {
            return null;
        }
        List<ISplit> noWorkingSplits = getNoWorkingSplits(list, list3);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < noWorkingSplits.size(); i++) {
            if (holdLockSplit(noWorkingSplits.get(i))) {
                arrayList.add(noWorkingSplits.get(i));
                if (arrayList.size() >= changedSplitCount.splitCount) {
                    break;
                }
            }
        }
        changedSplitCount.setChangedSplits(arrayList);
        return changedSplitCount;
    }

    protected List<ISplit> getNoWorkingSplits(List<ISplit> list, List<ISplit> list2) {
        HashSet hashSet = new HashSet();
        Iterator<ISplit> it = list2.iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().getQueueId());
        }
        ArrayList arrayList = new ArrayList();
        for (ISplit iSplit : list) {
            if (!hashSet.contains(iSplit.getQueueId())) {
                arrayList.add(iSplit);
            }
        }
        return arrayList;
    }

    protected SplitChanged getRemoveSplits(List<ISplit> list, List<SourceInstance> list2, List<ISplit> list3, List<ISplit> list4) {
        SplitChanged changedSplitCount = getChangedSplitCount(list, list2, list3.size(), list4.size());
        if (changedSplitCount == null || changedSplitCount.isNewSplit || changedSplitCount.splitCount <= 0) {
            return null;
        }
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < changedSplitCount.splitCount; i++) {
            arrayList.add(list4.get(i));
        }
        changedSplitCount.setChangedSplits(arrayList);
        return changedSplitCount;
    }

    protected SplitChanged getChangedSplitCount(List<ISplit> list, List<SourceInstance> list2, int i, int i2) {
        int size = list2.size();
        if (size == 0) {
            size = 1;
        }
        int size2 = list.size();
        int i3 = size2 / size;
        int i4 = i3 + (size2 % size == 0 ? 0 : 1);
        if (i2 == i4) {
            return null;
        }
        if (i2 > i4) {
            return new SplitChanged(i2 - i4, false);
        }
        if ((i != size2 || i2 < i3) && i < size2 && i2 < i4) {
            return new SplitChanged(Math.min(i4 - i2, getMaxSplitCountInOneBalance()), true);
        }
        return null;
    }

    @Override // org.apache.rocketmq.streams.connectors.balance.ISourceBalance
    public int getBalanceCount() {
        return this.balanceCount;
    }

    private int getMaxSplitCountInOneBalance() {
        return (int) Math.pow(2.0d, getBalanceCount() - 1);
    }
}
