package org.apache.rocketmq.streams.connectors.source;

import com.alibaba.fastjson.JSON;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.connectors.balance.ISourceBalance;
import org.apache.rocketmq.streams.connectors.balance.SplitChanged;
import org.apache.rocketmq.streams.connectors.balance.impl.LeaseBalanceImpl;
import org.apache.rocketmq.streams.connectors.model.PullMessage;
import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
import org.apache.rocketmq.streams.connectors.reader.SplitCloseFuture;
import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent;

/* loaded from: input_file:org/apache/rocketmq/streams/connectors/source/AbstractPullSource.class */
/**
 * Base class for sources that pull their data split by split.
 *
 * <p>On start, an {@link ISourceBalance} implementation (looked up by {@link #balanceName}) decides
 * which splits this instance should own. The decision is re-evaluated every
 * {@link #balanceTimeSecond} seconds on a single daemon thread. Every owned split gets its own
 * {@link ISplitReader} and its own reader thread, which polls the reader, forwards the pulled
 * messages downstream and periodically sends a checkpoint for the split.
 *
 * <p>NOTE(review): {@link #splitReaders} and {@link #ownerSplits} are plain {@link HashMap}s. They
 * are written from the thread calling {@link #startSource()} and from the balance thread; callers
 * of {@link #ownerSplits()} on other threads get no visibility guarantee - confirm whether that
 * matters for your deployment.
 */
public abstract class AbstractPullSource extends AbstractSource implements IPullSource<AbstractSource> {
    private static final Log logger = LogFactory.getLog(AbstractPullSource.class);

    /** Balance strategy, resolved by name in {@link #startSource()}. */
    protected transient ISourceBalance balance;

    /** Single-threaded scheduler running the periodic balance task. */
    protected transient ScheduledExecutorService balanceExecutor;

    /** Pause, in milliseconds, between two polls of a split reader. */
    protected long pullIntervalMs;

    /** Readers of the splits currently owned by this instance, keyed by queue id. */
    protected transient Map<String, ISplitReader> splitReaders = new HashMap<String, ISplitReader>();

    /** Splits currently owned by this instance, keyed by queue id. */
    protected transient Map<String, ISplit> ownerSplits = new HashMap<String, ISplit>();

    /** Name under which the balance implementation is looked up. */
    protected String balanceName = LeaseBalanceImpl.DB_BALANCE_NAME;

    /** Initial delay and period, in seconds, of the balance task. */
    protected int balanceTimeSecond = 10;

    transient CheckPointManager checkPointManager = new CheckPointManager();

    /**
     * Loads the balance service, performs one balance round synchronously and then schedules the
     * periodic balance task.
     *
     * <p>A failure of the first, synchronous round propagates to the caller. Failures of the
     * scheduled rounds are logged and the task keeps running.
     *
     * @return always {@code true}
     */
    protected boolean startSource() {
        this.balance = (ISourceBalance) ServiceLoaderComponent.getInstance(ISourceBalance.class).getService().loadService(this.balanceName);
        this.balance.setSourceIdentification(MapKeyUtil.createKey(new String[]{getNameSpace(), getConfigureName()}));

        ThreadFactory balanceThreads = new BasicThreadFactory.Builder().namingPattern("balance-task-%d").daemon(true).build();
        this.balanceExecutor = new ScheduledThreadPoolExecutor(1, balanceThreads);

        rebalance();
        this.balanceExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                // An exception escaping run() would make the executor suppress every later
                // execution, so one transient failure must not leave this method.
                try {
                    logger.info("balance running..... current splits is " + AbstractPullSource.this.ownerSplits);
                    rebalance();
                } catch (Exception e) {
                    logger.error("balance round failed, will retry on the next schedule", e);
                }
            }
        }, this.balanceTimeSecond, this.balanceTimeSecond, TimeUnit.SECONDS);
        return true;
    }

    /** Runs one balance round and applies the resulting split change. */
    private void rebalance() {
        List<ISplit> owned = new ArrayList<ISplit>(this.ownerSplits.values());
        doSplitChanged(this.balance.doBalance(fetchAllSplits(), owned));
    }

    /**
     * Returns all splits reported by {@code fetchAllSplits()}, keyed by queue id.
     *
     * @return a new map; empty when {@code fetchAllSplits()} returns {@code null}
     */
    @Override // org.apache.rocketmq.streams.connectors.source.IPullSource
    public Map<String, ISplit> getAllSplitMap() {
        Map<String, ISplit> splitsById = new HashMap<String, ISplit>();
        List<ISplit> allSplits = fetchAllSplits();
        if (allSplits != null) {
            for (ISplit split : allSplits) {
                splitsById.put(split.getQueueId(), split);
            }
        }
        return splitsById;
    }

    /**
     * Applies a balance result: starts readers for newly assigned splits or releases splits that
     * were taken away. Does nothing for a {@code null} or empty change.
     *
     * @param splitChanged the balance result, may be {@code null}
     */
    protected void doSplitChanged(SplitChanged splitChanged) {
        if (splitChanged == null || splitChanged.getSplitCount() == 0) {
            return;
        }
        if (splitChanged.isNewSplit()) {
            doSplitAddition(splitChanged.getChangedSplits());
        } else {
            doSplitRelease(splitChanged.getChangedSplits());
        }
    }

    /**
     * Registers the given splits and starts one reader thread per split.
     *
     * <p>For every split a reader is created, opened and positioned at the offset returned by
     * {@link #loadSplitOffset(ISplit)} before its thread is started.
     *
     * @param list the newly assigned splits; {@code null} is ignored
     */
    protected void doSplitAddition(List<ISplit> list) {
        if (list == null) {
            return;
        }
        HashSet<String> newSplitIds = new HashSet<String>();
        for (ISplit split : list) {
            newSplitIds.add(split.getQueueId());
        }
        addNewSplit(newSplitIds);

        for (final ISplit split : list) {
            final ISplitReader reader = createSplitReader(split);
            reader.open(split);
            reader.seek(loadSplitOffset(split));
            this.splitReaders.put(split.getQueueId(), reader);
            this.ownerSplits.put(split.getQueueId(), split);
            logger.info("start next");

            Thread readerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    runReader(reader, split);
                }
            });
            readerThread.setName("reader-task-" + reader.getSplit().getQueueId());
            readerThread.start();
        }
    }

    /**
     * Body of a reader thread: polls the reader until it reports {@code isInterrupt()}, then
     * releases the split.
     *
     * <p>The release step runs in a {@code finally} block, so a failure while pulling or
     * processing messages still unlocks the split, closes the reader and wakes up threads waiting
     * on the reader's monitor. A thread interrupt does not end the loop (only the reader's own
     * interrupt state does); it is remembered and re-asserted when the method returns.
     */
    private void runReader(ISplitReader reader, ISplit split) {
        logger.info("start running");
        boolean interrupted = false;
        long lastCheckpointTime = System.currentTimeMillis();
        try {
            while (!reader.isInterrupt()) {
                if (reader.next()) {
                    forwardMessages(reader.getMessage(), split);
                }
                long now = System.currentTimeMillis();
                if (now - lastCheckpointTime > getCheckpointTime()) {
                    sendCheckpoint(reader.getSplit().getQueueId());
                    lastCheckpointTime = now;
                }
                interrupted |= sleepQuietly(this.pullIntervalMs);
            }
            // Short pause before the split is released, kept from the original implementation.
            interrupted |= sleepQuietly(10L);
        } finally {
            try {
                releaseReader(reader, split);
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** Converts the pulled messages and hands them to {@code executeMessage}; ignores {@code null}. */
    private void forwardMessages(List<PullMessage> pulled, ISplit split) {
        if (pulled == null) {
            return;
        }
        for (PullMessage pullMessage : pulled) {
            Message message = createMessage(createJson(pullMessage.getMessage()), split.getQueueId(), pullMessage.getOffsetStr(), false);
            message.getHeader().setOffsetIsLong(Boolean.valueOf(pullMessage.getMessageOffset().isLongOfMainOffset()));
            executeMessage(message);
        }
    }

    /**
     * Sleeps for the given time.
     *
     * @return {@code true} when the sleep was cut short by a thread interrupt
     */
    private boolean sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
            return false;
        } catch (InterruptedException e) {
            logger.warn("reader thread " + Thread.currentThread().getName() + " was interrupted while sleeping", e);
            return true;
        }
    }

    /** Unregisters and unlocks the split, closes the reader and notifies waiters on its monitor. */
    private void releaseReader(ISplitReader reader, ISplit split) {
        try {
            HashSet<String> removedIds = new HashSet<String>();
            removedIds.add(reader.getSplit().getQueueId());
            removeSplit(removedIds);
            this.balance.unlockSplit(split);
        } finally {
            // Always close and notify, otherwise a thread waiting on the reader could block forever.
            reader.close();
            synchronized (reader) {
                reader.notifyAll();
            }
        }
    }

    /**
     * Reads the stored offset of a split from its recovered checkpoint.
     *
     * @param iSplit the split whose offset is wanted
     * @return the value of the checkpoint's {@code "offset"} entry, or {@code null} when no
     *         checkpoint could be recovered or the checkpoint carries no data
     */
    @Override // org.apache.rocketmq.streams.connectors.source.IPullSource
    public String loadSplitOffset(ISplit iSplit) {
        CheckPoint recover = this.checkPointManager.recover(this, iSplit);
        if (recover == null) {
            return null;
        }
        String data = (String) recover.getData();
        if (data == null || data.isEmpty()) {
            // Nothing to parse; avoids dereferencing a null parse result.
            return null;
        }
        return JSON.parseObject(data).getString("offset");
    }

    /**
     * Creates the reader used to pull data from the given split.
     *
     * @param iSplit the split to read
     * @return a reader for the split
     */
    protected abstract ISplitReader createSplitReader(ISplit iSplit);

    /**
     * Closes the readers of the given splits and forgets the splits once their close futures
     * complete.
     *
     * <p>Nothing happens when the list is {@code null} or empty, or when the balance does not
     * grant the remove-split lock. The lock is always given back. A split whose close future fails
     * stays registered.
     *
     * @param list the splits to release; {@code null} is ignored
     */
    protected void doSplitRelease(List<ISplit> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        if (!this.balance.getRemoveSplitLock()) {
            return;
        }
        boolean interrupted = false;
        try {
            List<SplitCloseFuture> pendingCloses = new ArrayList<SplitCloseFuture>();
            for (ISplit split : list) {
                ISplitReader reader = this.splitReaders.get(split.getQueueId());
                if (reader != null) {
                    pendingCloses.add(reader.close());
                }
            }
            for (SplitCloseFuture closeFuture : pendingCloses) {
                try {
                    closeFuture.get();
                    String queueId = closeFuture.getSplit().getQueueId();
                    this.splitReaders.remove(queueId);
                    this.ownerSplits.remove(queueId);
                } catch (InterruptedException e) {
                    // Keep waiting for the remaining readers; the interrupt is re-asserted below.
                    interrupted = true;
                    logger.warn("interrupted while waiting for a split reader to close", e);
                } catch (ExecutionException e) {
                    logger.error("failed to close a split reader", e);
                }
            }
        } finally {
            this.balance.unLockRemoveSplitLock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** @return always {@code true} */
    public boolean supportNewSplitFind() {
        return true;
    }

    /** @return always {@code true} */
    public boolean supportRemoveSplitFind() {
        return true;
    }

    /** @return always {@code true} */
    public boolean supportOffsetRest() {
        return true;
    }

    /** @return the pause between two polls of a split reader, in milliseconds */
    @Override // org.apache.rocketmq.streams.connectors.source.IPullSource
    public Long getPullIntervalMs() {
        return Long.valueOf(this.pullIntervalMs);
    }

    /** @return the name used to look up the balance implementation */
    public String getBalanceName() {
        return this.balanceName;
    }

    /** @param str the name used to look up the balance implementation */
    public void setBalanceName(String str) {
        this.balanceName = str;
    }

    /** @return the initial delay and period of the balance task, in seconds */
    public int getBalanceTimeSecond() {
        return this.balanceTimeSecond;
    }

    /** @param i the initial delay and period of the balance task, in seconds */
    public void setBalanceTimeSecond(int i) {
        this.balanceTimeSecond = i;
    }

    /** @param j the pause between two polls of a split reader, in milliseconds */
    public void setPullIntervalMs(long j) {
        this.pullIntervalMs = j;
    }

    /** @return a new list holding the splits currently owned by this instance */
    @Override // org.apache.rocketmq.streams.connectors.source.IPullSource
    public List<ISplit> ownerSplits() {
        return new ArrayList<ISplit>(this.ownerSplits.values());
    }
}
