package org.apache.rocketmq.streams.connectors.balance.impl;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.model.ServiceName;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
import org.apache.rocketmq.streams.connectors.balance.AbstractBalance;
import org.apache.rocketmq.streams.connectors.source.SourceInstance;
import org.apache.rocketmq.streams.lease.LeaseComponent;
import org.apache.rocketmq.streams.lease.model.LeaseInfo;

/**
 * Balance implementation that coordinates source instances and splits through the lease service
 * obtained from {@link LeaseComponent}. It is registered under the service name
 * {@link #DB_BALANCE_NAME}.
 *
 * <p>Lock names are derived from {@code sourceIdentification}: source-instance locks are named
 * {@code "SOURCE_" + sourceIdentification}, split locks {@code "SPLIT_" + sourceIdentification},
 * and the remove-split lock uses {@code sourceIdentification} itself as the name.
 */
@ServiceName(LeaseBalanceImpl.DB_BALANCE_NAME)
public class LeaseBalanceImpl extends AbstractBalance {
    private static final Log logger = LogFactory.getLog(LeaseBalanceImpl.class);

    /** Service name under which this implementation is registered. */
    public static final String DB_BALANCE_NAME = "db_balance";

    /** Identification used when taking the lock that guards split removal. */
    private static final String REMOVE_SPLIT_LOCK_NAME = "lock_remove_split";

    /** Prefix of the lock name used for source instances. */
    private static final String SOURCE_LOCK_PREFIX = "SOURCE_";

    /** Prefix of the lock name used for splits. */
    private static final String SPLIT_LOCK_PREFIX = "SPLIT_";

    /** Identifier of the source; appended to the prefixes above to build lock names. */
    protected transient String sourceIdentification;

    /** Entry point to the lease service used for every lock operation in this class. */
    protected transient LeaseComponent leaseComponent = LeaseComponent.getInstance();

    /** Third argument passed to the lease service's holdLock; presumably the lease duration in seconds. */
    protected int lockTimeSecond = 5;

    /**
     * Creates a balance bound to the given source.
     *
     * @param str the source identification used to build lock names
     */
    public LeaseBalanceImpl(String str) {
        this.sourceIdentification = str;
    }

    /**
     * Creates a balance without a source identification; one can be supplied later through
     * {@link #setSourceIdentification(String)}.
     */
    public LeaseBalanceImpl() {
    }

    /**
     * Returns the splits from {@code list} that currently have a lease whose name starts with
     * {@code "SPLIT_" + sourceIdentification}.
     *
     * <p>Each lease is matched to a split by comparing the value of
     * {@code MapKeyUtil.getLast(leaseName)} with the split's queue id. Leases that match no split
     * in {@code list} are skipped (and logged) so the result never contains {@code null} elements.
     *
     * @param list all known splits; {@code null} is treated as an empty list
     * @return a new mutable list of the matched splits; empty when the lease query returns
     *         {@code null} or nothing matches
     */
    @Override
    protected List<ISplit> fetchWorkingSplits(List<ISplit> list) {
        List<LeaseInfo> leases = this.leaseComponent.getService().queryLockedInstanceByNamePrefix(SPLIT_LOCK_PREFIX + this.sourceIdentification, (String) null);
        logger.info(String.format("lease SPLIT_LOCK_PREFIX is %s, sourceIdentification is %s. ", SPLIT_LOCK_PREFIX, this.sourceIdentification));
        if (leases == null) {
            return new ArrayList<>();
        }

        // Index the known splits by queue id so each lease can be resolved with one lookup.
        HashMap<String, ISplit> splitsById = new HashMap<>();
        if (list != null) {
            for (ISplit split : list) {
                splitsById.put(split.getQueueId(), split);
            }
        }

        List<ISplit> workingSplits = new ArrayList<>(leases.size());
        for (LeaseInfo lease : leases) {
            String leaseName = lease.getLeaseName();
            ISplit split = splitsById.get(MapKeyUtil.getLast(leaseName));
            if (split == null) {
                // The lease refers to a split that is not in the supplied list; adding null here
                // would hand callers a list they cannot safely iterate.
                logger.warn(String.format("lease %s does not match any known split, skip it. ", leaseName));
                continue;
            }
            workingSplits.add(split);
        }
        logger.info(String.format("working split is %s", Arrays.toString(workingSplits.toArray())));
        return workingSplits;
    }

    /**
     * Returns one {@link SourceInstance} per lease whose name starts with
     * {@code "SOURCE_" + sourceIdentification}; each instance is built from the lease name.
     *
     * @return a new mutable list of source instances; empty when the lease query returns {@code null}
     */
    @Override
    protected List<SourceInstance> fetchSourceInstances() {
        List<LeaseInfo> leases = this.leaseComponent.getService().queryLockedInstanceByNamePrefix(SOURCE_LOCK_PREFIX + this.sourceIdentification, (String) null);
        if (leases == null) {
            return new ArrayList<>();
        }
        List<SourceInstance> instances = new ArrayList<>(leases.size());
        for (LeaseInfo lease : leases) {
            instances.add(new SourceInstance(lease.getLeaseName()));
        }
        return instances;
    }

    /**
     * Requests the source-instance lock named {@code "SOURCE_" + sourceIdentification}, identified by
     * {@code RuntimeUtil.getDipperInstanceId()}.
     *
     * @return the result reported by the lease service
     */
    @Override
    protected boolean holdLockSourceInstance() {
        return holdLock(SOURCE_LOCK_PREFIX + this.sourceIdentification, RuntimeUtil.getDipperInstanceId());
    }

    /**
     * Asks the lease service to unlock the source-instance lock taken by
     * {@link #holdLockSourceInstance()}.
     */
    @Override
    protected void unlockSourceInstance() {
        this.leaseComponent.getService().unlock(SOURCE_LOCK_PREFIX + this.sourceIdentification, RuntimeUtil.getDipperInstanceId());
    }

    /**
     * Requests the lock for a split, named {@code "SPLIT_" + sourceIdentification} and identified by
     * the split's queue id.
     *
     * @param iSplit the split to lock
     * @return the result reported by the lease service
     */
    @Override
    public boolean holdLockSplit(ISplit iSplit) {
        return holdLock(SPLIT_LOCK_PREFIX + this.sourceIdentification, iSplit.getQueueId());
    }

    /**
     * Asks the lease service to unlock the lock taken by {@link #holdLockSplit(ISplit)} for the
     * given split.
     *
     * @param iSplit the split to unlock
     */
    @Override
    public void unlockSplit(ISplit iSplit) {
        this.leaseComponent.getService().unlock(SPLIT_LOCK_PREFIX + this.sourceIdentification, iSplit.getQueueId());
    }

    /**
     * Requests the remove-split lock, named by {@code sourceIdentification} (no prefix) and
     * identified by {@code "lock_remove_split"}.
     *
     * @return the result reported by the lease service
     */
    @Override
    public boolean getRemoveSplitLock() {
        return holdLock(this.sourceIdentification, REMOVE_SPLIT_LOCK_NAME);
    }

    /**
     * Asks the lease service to unlock the lock taken by {@link #getRemoveSplitLock()}.
     */
    @Override
    public void unLockRemoveSplitLock() {
        this.leaseComponent.getService().unlock(this.sourceIdentification, REMOVE_SPLIT_LOCK_NAME);
    }

    /**
     * @return the source identification currently used to build lock names
     */
    public String getSourceIdentification() {
        return this.sourceIdentification;
    }

    /**
     * Sets the source identification used to build lock names.
     *
     * @param str the new source identification
     */
    @Override
    public void setSourceIdentification(String str) {
        this.sourceIdentification = str;
    }

    /**
     * Delegates to the lease service's {@code holdLock}, passing {@link #lockTimeSecond} as the
     * third argument.
     *
     * @param str  the lock name
     * @param str2 the identification passed alongside the lock name
     * @return the result reported by the lease service
     */
    protected boolean holdLock(String str, String str2) {
        return this.leaseComponent.getService().holdLock(str, str2, this.lockTimeSecond);
    }
}
