package org.apache.rocketmq.streams.connectors.source;

import com.alibaba.fastjson.JSONObject;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.ChangeTableNameMessage;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.metadata.MetaDataUtils;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.ThreadUtil;
import org.apache.rocketmq.streams.connectors.IBoundedSource;
import org.apache.rocketmq.streams.connectors.model.ReaderStatus;
import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
import org.apache.rocketmq.streams.connectors.source.filter.CycleSchedule;
import org.apache.rocketmq.streams.connectors.source.filter.CycleScheduleFilter;
import org.apache.rocketmq.streams.db.CycleSplit;

/* loaded from: input_file:org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.class */
public class CycleDynamicMultipleDBScanSource extends DynamicMultipleDBScanSource implements IBoundedSource, Serializable {
    private static final long serialVersionUID = 6840988298037061128L;
    private static final Log logger = LogFactory.getLog(CycleDynamicMultipleDBScanSource.class);
    CycleSchedule.Cycle cycle;
    Map<String, Boolean> initReaderMap = new ConcurrentHashMap();
    transient AtomicInteger size = new AtomicInteger(0);

    public CycleDynamicMultipleDBScanSource() {
    }

    public CycleDynamicMultipleDBScanSource(CycleSchedule.Cycle cycle) {
        this.cycle = cycle;
    }

    public AtomicInteger getSize() {
        return this.size;
    }

    public void setSize(AtomicInteger atomicInteger) {
        this.size = atomicInteger;
    }

    @Override // org.apache.rocketmq.streams.connectors.source.DynamicMultipleDBScanSource, org.apache.rocketmq.streams.connectors.source.IPullSource
    public synchronized List<ISplit> fetchAllSplits() {
        if (this.filter == null) {
            this.filter = new CycleScheduleFilter(this.cycle.getAllPattern());
        }
        if (this.size.get() == this.cycle.getCycleCount().intValue()) {
            return this.splits;
        }
        String createKey = createKey(this);
        List<String> listTableNameByPattern = MetaDataUtils.listTableNameByPattern(this.url, this.userName, this.password, this.logicTableName + "%");
        logger.info(String.format("load all logic table : %s", Arrays.toString(listTableNameByPattern.toArray())));
        Iterator<String> it = listTableNameByPattern.iterator();
        while (it.hasNext()) {
            String next = it.next();
            String replace = next.replace(this.logicTableName + "_", "");
            if (this.filter.filter(createKey, this.logicTableName, replace)) {
                logger.info(String.format("filter add %s", next));
                ISplit cycleSplit = new CycleSplit();
                cycleSplit.setLogicTableName(this.logicTableName);
                cycleSplit.setSuffix(replace);
                cycleSplit.setCyclePeriod(this.cycle.getCycleDateStr());
                String queueId = cycleSplit.getQueueId();
                if (this.initReaderMap.get(queueId) == null) {
                    this.initReaderMap.put(queueId, false);
                    this.splits.add(cycleSplit);
                    this.size.incrementAndGet();
                }
            } else {
                logger.info(String.format("filter remove %s", next));
                it.remove();
            }
        }
        this.tableNames = listTableNameByPattern;
        return this.splits;
    }

    public Map<String, Boolean> getInitReaderMap() {
        return this.initReaderMap;
    }

    public void setInitReaderMap(Map<String, Boolean> map) {
        this.initReaderMap = map;
    }

    public void finish() {
        super.finish();
        for (Map.Entry<String, Boolean> entry : this.initReaderMap.entrySet()) {
            String key = entry.getKey();
            if (!entry.getValue().booleanValue()) {
                logger.error(String.format("split[%s] reader is not finish, exit with error. ", key));
            }
        }
        this.initReaderMap.clear();
        this.initReaderMap = null;
        this.splits.clear();
        this.splits = null;
    }

    public boolean isFinished() {
        List<ReaderStatus> queryReaderStatusListBySourceName = ReaderStatus.queryReaderStatusListBySourceName(createKey(this));
        return queryReaderStatusListBySourceName != null && queryReaderStatusListBySourceName.size() == this.size.get();
    }

    @Override // org.apache.rocketmq.streams.connectors.source.DynamicMultipleDBScanSource, org.apache.rocketmq.streams.connectors.source.AbstractPullSource
    protected ISplitReader createSplitReader(ISplit iSplit) {
        return super.createSplitReader(iSplit);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendChangeTableNameMessage() {
        logger.info(String.format("start send change table name message.", new Object[0]));
        ChangeTableNameMessage changeTableNameMessage = new ChangeTableNameMessage();
        changeTableNameMessage.setScheduleCycle(this.cycle.getCycleDateStr());
        Message createMessage = createMessage(new JSONObject(), null, null, false);
        createMessage.setSystemMessage(changeTableNameMessage);
        createMessage.getHeader().setSystemMessage(true);
        executeMessage(createMessage);
        logger.info(String.format("finish send change table name message.", new Object[0]));
    }

    @Override // org.apache.rocketmq.streams.connectors.IBoundedSource
    public synchronized void boundedFinishedCallBack(ISplit iSplit) {
        this.initReaderMap.put(iSplit.getQueueId(), true);
        logger.info(String.format("current map is %s, key is %s. ", this.initReaderMap, iSplit.getQueueId()));
        if (this.statusCheckerStart.compareAndSet(false, true)) {
            Thread thread = new Thread(new Runnable() { // from class: org.apache.rocketmq.streams.connectors.source.CycleDynamicMultipleDBScanSource.1
                @Override // java.lang.Runnable
                public void run() {
                    while (!CycleDynamicMultipleDBScanSource.this.isFinished()) {
                        ThreadUtil.sleep(3000L);
                    }
                    CycleDynamicMultipleDBScanSource.logger.info(String.format("source will be closed.", new Object[0]));
                    CycleDynamicMultipleDBScanSource.this.sendChangeTableNameMessage();
                    ThreadUtil.sleep(1000L);
                    CycleDynamicMultipleDBScanSource.this.finish();
                }
            });
            thread.setName(createKey(this) + "_callback");
            thread.start();
        }
    }

    public CycleSchedule.Cycle getCycle() {
        return this.cycle;
    }

    public void setCycle(CycleSchedule.Cycle cycle) {
        this.cycle = cycle;
    }

    public String createCheckPointName() {
        return super.createCheckPointName();
    }

    public synchronized int getTotalReader() {
        return this.size.get();
    }

    public static String createKey(ISource iSource) {
        AbstractSource abstractSource = (AbstractSource) iSource;
        return MapKeyUtil.createKey(new String[]{abstractSource.getNameSpace(), abstractSource.getGroupName(), abstractSource.getConfigureName(), abstractSource.getTopic(), ((CycleDynamicMultipleDBScanSource) iSource).getCycle().getCycleDateStr()});
    }
}
