package org.apache.rocketmq.streams.common.channel.source;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.Context;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.context.MessageHeader;
import org.apache.rocketmq.streams.common.context.UserDefinedMessage;
import org.apache.rocketmq.streams.common.interfaces.ILifeCycle;
import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.metadata.MetaDataField;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;

/* loaded from: input_file:org/apache/rocketmq/streams/common/channel/source/AbstractSource.class */
/**
 * Base class for message sources: converts raw payloads (strings or bytes) into {@link Message}
 * instances, forwards them to the configured {@link IStreamOperator} receiver, and emits system
 * messages (checkpoint, new split, remove split) through the same path.
 *
 * <p>Subclasses supply {@link #startSource()}, {@link #supportRemoveSplitFind()} and
 * {@link #isNotDataSplit(String)}.
 */
public abstract class AbstractSource extends BasedConfigurable implements ISource<AbstractSource>, ILifeCycle {
    /** Default character set name; used as the initial value of {@link #encoding}. */
    public static String CHARSET = "UTF-8";

    /** Consumer group name; part of the checkpoint name built by {@link #createCheckPointName()}. */
    @ENVDependence
    protected String groupName;
    /** Field names used for log fingerprinting; only stored and exposed by this class. */
    protected List<String> logFingerprintFields;
    /** Delimiter handed to {@link String#split(String)} when the payload is not JSON. */
    protected String fieldDelimiter;
    /** Optional column metadata used to name and convert delimited fields. */
    protected MetaData metaData;
    /** Names of header entries that {@link #create(String, Map)} copies into the message body. */
    protected List<String> headerFieldNames;
    /** Downstream operator that receives every message; assigned in {@link #start(IStreamOperator)}. */
    protected volatile transient IStreamOperator receiver;
    /** Whether a raw payload is parsed as a JSON object. */
    protected Boolean isJsonData = true;
    /** Whether a raw string payload is a JSON array whose elements are individual messages. */
    protected Boolean msgIsJsonArray = false;
    /** Maximum thread count; defaults to the number of available processors. */
    protected int maxThread = Runtime.getRuntime().availableProcessors();

    /** Topic name; part of the checkpoint name built by {@link #createCheckPointName()}. */
    @ENVDependence
    protected String topic = "";
    /** Checkpoint interval; presumably milliseconds (default 120000) - TODO confirm against callers. */
    protected long checkpointTime = 120000;
    /** Batch-message flag; only stored and exposed by this class. */
    protected boolean isBatchMessage = false;
    /** Maximum fetch size per log group; only stored and exposed by this class. */
    protected int maxFetchLogGroupSize = 100;
    /** Character set name used by {@link #create(byte[], Map)} to decode bytes. */
    protected String encoding = CHARSET;
    /** Mock switch; reset to {@code false} in {@link #initConfigurable()}. */
    protected volatile transient Boolean openMock = false;
    /** Guards {@link #startSource()} so that it is invoked at most once per initialization. */
    protected transient AtomicBoolean hasStart = new AtomicBoolean(false);
    /** Tracks known splits, splits being removed, and pending checkpoint messages. */
    protected transient CheckPointManager checkPointManager = new CheckPointManager();

    /**
     * Resets the transient start flag and mock switch, then delegates to the superclass.
     *
     * @return the result of {@code super.initConfigurable()}
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.rocketmq.streams.common.configurable.AbstractConfigurable
    public boolean initConfigurable() {
        this.hasStart = new AtomicBoolean(false);
        this.openMock = false;
        return super.initConfigurable();
    }

    /**
     * Registers the receiver and starts the source if it has not been started yet.
     *
     * @param iStreamOperator the operator that will receive messages; always replaces the current one
     * @return the result of {@link #startSource()} on the first call, {@code true} on later calls
     */
    @Override // org.apache.rocketmq.streams.common.channel.source.ISource
    public boolean start(IStreamOperator iStreamOperator) {
        this.receiver = iStreamOperator;
        // Only the caller that flips the flag actually starts the source.
        if (this.hasStart.compareAndSet(false, true)) {
            return startSource();
        }
        return true;
    }

    /**
     * Starts the concrete source.
     *
     * @return whether the source started successfully
     */
    protected abstract boolean startSource();

    /** Creates the source and sets its configurable type to {@code ISource.TYPE}. */
    public AbstractSource() {
        setType(ISource.TYPE);
    }

    /**
     * Wraps an already parsed body in a {@link Message} and executes it.
     *
     * @param jSONObject the message body
     * @param z whether the message header is flagged as needing a flush
     * @param str the queue (split) id
     * @param str2 the offset of the message
     * @return the context produced by {@link #executeMessage(Message)}
     */
    public AbstractContext doReceiveMessage(JSONObject jSONObject, boolean z, String str, String str2) {
        return executeMessage(createMessage(jSONObject, str, str2, z));
    }

    /**
     * Parses a raw string payload and executes it. When {@link #msgIsJsonArray} is set, the payload
     * is parsed as a JSON array and every element is executed as its own message, with a per-element
     * offset built by appending the zero-padded element index to {@code str3}.
     *
     * @param str the raw payload
     * @param z whether a flush is requested; in array mode only the last element carries the flag
     * @param str2 the queue (split) id
     * @param str3 the offset of the payload
     * @return the context of the last executed message, or {@code null} for a null/empty array
     */
    public AbstractContext doReceiveMessage(String str, boolean z, String str2, String str3) {
        if (!this.msgIsJsonArray.booleanValue()) {
            return doReceiveMessage(create(str), z, str2, str3);
        }
        JSONArray elements = JSONObject.parseArray(str);
        if (elements == null || elements.isEmpty()) {
            return null;
        }
        final int lastIndex = elements.size() - 1;
        AbstractContext lastContext = null;
        for (int i = 0; i <= lastIndex; i++) {
            JSONObject element = elements.getJSONObject(i);
            // Flush only once, after the final element of the batch has been handed over.
            boolean flushAfter = z && i == lastIndex;
            lastContext = doReceiveMessage(element, flushAfter, str2, createBatchOffset(str3, i));
        }
        return lastContext;
    }

    /**
     * Sends a checkpoint system message for a single queue.
     *
     * @param str the queue (split) id to checkpoint
     */
    public void sendCheckpoint(String str) {
        Set<String> queueIds = new HashSet<>();
        queueIds.add(str);
        sendCheckpoint(queueIds);
    }

    /**
     * Builds a checkpoint system message for the given queues, executes it, and stores the
     * checkpoint when it validated and {@link #supportOffsetRest()} is {@code true}.
     *
     * @param set the queue (split) ids to checkpoint
     */
    public void sendCheckpoint(Set<String> set) {
        Message checkpointCarrier = createMessage(new JSONObject(), null, null, true);
        checkpointCarrier.getMessageBody().put("_queues", set);
        checkpointCarrier.getHeader().setCheckpointQueueIds(set);
        checkpointCarrier.getHeader().setNeedFlush(true);
        checkpointCarrier.getHeader().setSystemMessage(true);
        if (supportOffsetRest()) {
            checkpointCarrier.getHeader().setNeedFlush(false);
        }
        CheckPointMessage checkPointMessage = new CheckPointMessage();
        checkPointMessage.setStreamOperator(this.receiver);
        checkPointMessage.setSource(this);
        checkpointCarrier.setSystemMessage(checkPointMessage);
        executeMessage(checkpointCarrier);
        if (checkPointMessage.isValidate() && supportOffsetRest()) {
            saveCheckpoint(checkPointMessage);
        }
    }

    /**
     * Hands a checkpoint message to the checkpoint manager.
     *
     * @param checkPointMessage the checkpoint to store
     */
    protected void saveCheckpoint(CheckPointMessage checkPointMessage) {
        this.checkPointManager.addCheckPointMessage(checkPointMessage);
    }

    /**
     * Converts a raw value into a message body. In JSON mode the value's string form is parsed;
     * otherwise the value is wrapped in a {@link UserDefinedMessage} that also carries the value
     * under {@code IMessage.DATA_KEY} and is flagged with {@code IMessage.IS_NOT_JSON_MESSAGE}.
     *
     * @param obj the raw value; must not be {@code null} in JSON mode
     * @return the message body
     */
    public JSONObject createJson(Object obj) {
        if (this.isJsonData.booleanValue()) {
            return Message.parseObject(obj.toString());
        }
        JSONObject wrapped = new UserDefinedMessage(obj);
        wrapped.put(IMessage.DATA_KEY, obj);
        wrapped.put(IMessage.IS_NOT_JSON_MESSAGE, true);
        return wrapped;
    }

    /**
     * Decodes the bytes with the configured encoding and builds a message body from the result.
     *
     * @param bArr the raw payload bytes
     * @param map header values; may be {@code null}
     * @return the message body
     * @throws RuntimeException if the configured encoding is not supported; the original
     *     {@link UnsupportedEncodingException} is attached as the cause
     */
    public JSONObject create(byte[] bArr, Map<String, ?> map) {
        try {
            return create(new String(bArr, getEncoding()), map);
        } catch (UnsupportedEncodingException e) {
            // Keep the cause so the failing charset name is visible to whoever handles the error.
            throw new RuntimeException("msg encode error ", e);
        }
    }

    /**
     * Builds a message body from a string payload and copies the configured header fields into it.
     *
     * @param str the raw payload
     * @param map header values; ignored when {@code null} or when no header field names are set
     * @return the message body
     */
    public JSONObject create(String str, Map<String, ?> map) {
        JSONObject body = create(str);
        if (this.headerFieldNames == null || map == null) {
            return body;
        }
        for (String headerName : this.headerFieldNames) {
            body.put(headerName, map.get(headerName));
        }
        return body;
    }

    /**
     * Builds a message body from a string payload.
     *
     * <ul>
     *   <li>JSON mode: the payload is parsed as JSON.</li>
     *   <li>Metadata and delimiter set: the payload is split and each part is converted with the
     *       data type of the metadata field at the same position.</li>
     *   <li>Metadata only: the whole payload is stored under the first metadata field that is not
     *       listed in {@link #headerFieldNames}.</li>
     *   <li>Delimiter only: the split parts are wrapped as a list via {@link #createJson(Object)}.</li>
     *   <li>Otherwise: the payload is wrapped via {@link #createJson(Object)}.</li>
     * </ul>
     *
     * @param str the raw payload
     * @return the message body
     * @throws RuntimeException if the number of split parts differs from the number of metadata fields
     */
    public JSONObject create(String str) {
        if (this.isJsonData.booleanValue()) {
            return createJson(str);
        }
        if (this.metaData != null) {
            JSONObject row = new JSONObject();
            List<?> fields = this.metaData.getMetaDataFields();
            if (this.fieldDelimiter != null) {
                // NOTE(review): String.split treats the delimiter as a regex and drops trailing
                // empty parts, so rows ending in empty columns fail the count check - confirm intent.
                String[] parts = str.split(this.fieldDelimiter);
                if (parts.length != fields.size()) {
                    throw new RuntimeException("expect table column's count equals data size (" + fields.size() + "," + parts.length + ")");
                }
                for (int i = 0; i < parts.length; i++) {
                    MetaDataField field = (MetaDataField) fields.get(i);
                    row.put(field.getFieldName(), field.getDataType().getData(parts[i]));
                }
                return row;
            }
            // No delimiter: pick the first field that is not populated from the headers.
            MetaDataField target = null;
            for (Object candidate : fields) {
                MetaDataField field = (MetaDataField) candidate;
                if (this.headerFieldNames == null || !this.headerFieldNames.contains(field.getFieldName())) {
                    target = field;
                    break;
                }
            }
            if (target != null) {
                row.put(target.getFieldName(), str);
                return row;
            }
        } else if (this.fieldDelimiter != null) {
            List<String> parts = new ArrayList<>();
            for (String part : str.split(this.fieldDelimiter)) {
                parts.add(part);
            }
            return createJson(parts);
        }
        return createJson(str);
    }

    /**
     * Runs a message through the receiver.
     *
     * <p>Batch-finish messages are marked as system messages first. Messages whose split is being
     * removed are returned untouched. For non-system messages the split bookkeeping is refreshed
     * before delivery, and a checkpoint is sent after delivery when the header requested a flush.
     *
     * @param message the message to execute
     * @return the context created for the message
     */
    public AbstractContext executeMessage(Message message) {
        if (BatchFinishMessage.isBatchFinishMessage(message)) {
            message.getHeader().setSystemMessage(true);
            message.setSystemMessage(new BatchFinishMessage(message));
        }
        Context context = new Context(message);
        if (isSplitInRemoving(message)) {
            return context;
        }
        if (!message.getHeader().isSystemMessage()) {
            messageQueueChangedCheck(message.getHeader());
        }
        // Decide before delivery: the flags are read from the header as it arrived.
        boolean checkpointAfterDelivery = !message.getHeader().isSystemMessage() && message.getHeader().isNeedFlush();
        if (this.receiver != null) {
            this.receiver.doMessage(message, context);
        }
        if (checkpointAfterDelivery) {
            sendCheckpoint(message.getHeader().getQueueId());
        }
        executeMessageAfterReceiver(message, context);
        return context;
    }

    /**
     * Checks whether the message's queue is registered as being removed.
     *
     * @param message the message whose queue id is inspected
     * @return {@code true} if the checkpoint manager reports the queue as removing
     */
    protected boolean isSplitInRemoving(Message message) {
        return this.checkPointManager.isRemovingSplit(message.getHeader().getQueueId());
    }

    /**
     * Whether the source reports new splits itself.
     *
     * @return {@code false} by default
     */
    public boolean supportNewSplitFind() {
        return false;
    }

    /**
     * Whether the source reports removed splits itself.
     *
     * @return {@code true} if the subclass detects split removal on its own
     */
    public abstract boolean supportRemoveSplitFind();

    /**
     * Whether validated checkpoints are stored by this class instead of flushing on checkpoint
     * messages (see {@link #sendCheckpoint(Set)}).
     *
     * @return {@code false} by default
     */
    @Deprecated
    public boolean supportOffsetRest() {
        return false;
    }

    /**
     * Updates split bookkeeping from a message header. Does nothing when the source reports both
     * new and removed splits itself. Otherwise every non-empty data split id found in the header is
     * either refreshed or registered with the checkpoint manager, and newly registered ids are
     * announced through {@link #addNewSplit(Set)} unless the source reports new splits itself.
     *
     * @param messageHeader the header of a non-system message
     */
    protected void messageQueueChangedCheck(MessageHeader messageHeader) {
        if (supportNewSplitFind() && supportRemoveSplitFind()) {
            return;
        }
        Set<String> candidates = new HashSet<>();
        String queueId = messageHeader.getQueueId();
        if (StringUtil.isNotEmpty(queueId)) {
            candidates.add(queueId);
        }
        Set<String> checkpointQueueIds = messageHeader.getCheckpointQueueIds();
        if (checkpointQueueIds != null) {
            candidates.addAll(checkpointQueueIds);
        }
        Set<String> discovered = new HashSet<>();
        for (String splitId : candidates) {
            if (isNotDataSplit(splitId) || !StringUtil.isNotEmpty(splitId)) {
                continue;
            }
            if (this.checkPointManager.contains(splitId)) {
                this.checkPointManager.updateLastUpdate(splitId);
                continue;
            }
            synchronized (this) {
                // Re-check under the lock so a split is registered and announced only once.
                if (!this.checkPointManager.contains(splitId)) {
                    this.checkPointManager.addSplit(splitId);
                    discovered.add(splitId);
                }
            }
        }
        if (supportNewSplitFind()) {
            return;
        }
        addNewSplit(discovered);
    }

    /**
     * Tells whether a split id should be ignored by the split bookkeeping.
     *
     * @param str the split (queue) id
     * @return {@code true} if the id does not denote a data split
     */
    protected abstract boolean isNotDataSplit(String str);

    /**
     * Removes splits: marks them as removing, sends the remove-split system message and a
     * checkpoint, flushes the checkpoint manager, and finally unregisters the splits.
     *
     * @param set the split ids to remove; a {@code null} or empty set is ignored
     */
    public void removeSplit(Set<String> set) {
        if (set == null || set.isEmpty()) {
            return;
        }
        this.checkPointManager.addRemovingSplit(set);
        sendRemoveSplitSystemMessage(set);
        sendCheckpoint(set);
        this.checkPointManager.flush();
        synchronized (this) {
            for (String splitId : set) {
                this.checkPointManager.removeSplit(splitId);
            }
        }
    }

    /**
     * Lists all splits of the source.
     *
     * @return {@code null} in this base implementation
     */
    public List<ISplit> getAllSplits() {
        return null;
    }

    /**
     * Lists the working splits grouped by instance.
     *
     * @return a new empty map in this base implementation
     */
    public Map<String, List<ISplit>> getWorkingSplitsGroupByInstances() {
        return new HashMap();
    }

    /**
     * Announces new splits: clears their removing mark and executes a new-split system message.
     *
     * @param set the new split ids; a {@code null} or empty set is ignored
     */
    public void addNewSplit(Set<String> set) {
        if (set == null || set.isEmpty()) {
            return;
        }
        this.checkPointManager.deleteRemovingSplit(set);
        Message carrier = createMessage(new JSONObject(), null, null, false);
        carrier.getMessageBody().put("_queues", set);
        carrier.getHeader().setNeedFlush(false);
        carrier.getHeader().setSystemMessage(true);
        NewSplitMessage newSplitMessage = new NewSplitMessage(set, this.checkPointManager.getCurrentSplits());
        newSplitMessage.setStreamOperator(this.receiver);
        newSplitMessage.setSource(this);
        carrier.setSystemMessage(newSplitMessage);
        executeMessage(carrier);
    }

    /**
     * Executes a remove-split system message carrying the removed ids and the splits that remain
     * after removal (current splits minus the removed ones).
     *
     * @param set the split ids being removed
     */
    public void sendRemoveSplitSystemMessage(Set<String> set) {
        Message carrier = createMessage(new JSONObject(), null, null, true);
        carrier.getMessageBody().put("_queues", set);
        carrier.getHeader().setNeedFlush(true);
        carrier.getHeader().setSystemMessage(true);
        // Work on a copy so the checkpoint manager's own collection is left untouched.
        Set<String> remaining = new HashSet<>(this.checkPointManager.getCurrentSplits());
        for (String removedId : set) {
            remaining.remove(removedId);
        }
        RemoveSplitMessage removeSplitMessage = new RemoveSplitMessage(set, remaining);
        removeSplitMessage.setStreamOperator(this.receiver);
        removeSplitMessage.setSource(this);
        carrier.setSystemMessage(removeSplitMessage);
        executeMessage(carrier);
    }

    /**
     * Refreshes the progress record attached to the message header, if any: every key of the
     * progress' current message is overwritten with the value of the same key in the message body
     * when the body contains it, and the progress is then updated.
     *
     * @param message the executed message
     * @param abstractContext the context of the execution; not used by this implementation
     */
    protected void executeMessageAfterReceiver(Message message, AbstractContext abstractContext) {
        if (message.getHeader() == null || message.getHeader().getProgress() == null) {
            return;
        }
        JSONObject currentMsg = message.getHeader().getProgress().getCurrentMsg();
        JSONObject refreshed = new JSONObject();
        refreshed.putAll(currentMsg);
        for (String key : currentMsg.keySet()) {
            if (message.getMessageBody().containsKey(key)) {
                refreshed.put(key, message.getMessageBody().get(key));
            }
        }
        message.getHeader().getProgress().setCurrentMessage(refreshed.toJSONString());
        message.getHeader().getProgress().update();
    }

    /**
     * Creates a message for the given body and fills in the header.
     *
     * @param jSONObject the message body
     * @param str the queue (split) id
     * @param str2 the offset
     * @param z whether the header is flagged as needing a flush
     * @return the new message, with this source set on its header
     */
    public Message createMessage(JSONObject jSONObject, String str, String str2, boolean z) {
        Message message = new Message(jSONObject);
        message.getHeader().setSource(this);
        message.getHeader().setOffset(str2);
        message.getHeader().setQueueId(str);
        message.getHeader().setNeedFlush(z);
        message.setJsonMessage(this.isJsonData.booleanValue());
        return message;
    }

    /**
     * Builds the offset of one element of a JSON-array payload by appending the element index,
     * left-padded with zeros to at least five characters, to the payload offset.
     *
     * @param str the offset of the whole payload
     * @param i the index of the element within the array
     * @return the element offset, e.g. {@code "100" + "00003"}
     */
    private String createBatchOffset(String str, int i) {
        String index = String.valueOf(i);
        StringBuilder offset = new StringBuilder().append(str);
        for (int length = index.length(); length < 5; length++) {
            offset.append('0');
        }
        return offset.append(index).toString();
    }

    /** Sets the maximum fetch size per log group. */
    @Override // org.apache.rocketmq.streams.common.channel.source.ISource
    public void setMaxFetchLogGroupSize(int i) {
        this.maxFetchLogGroupSize = i;
    }

    /**
     * Returns this source as its own stage chain.
     *
     * @param pipelineBuilder the pipeline builder; not used
     * @return this source
     */
    @Override // org.apache.rocketmq.streams.common.topology.builder.IStageBuilder
    /* renamed from: createStageChain */
    public AbstractSource createStageChain2(PipelineBuilder pipelineBuilder) {
        return this;
    }

    /**
     * Registers this source with the pipeline builder.
     *
     * @param pipelineBuilder the builder to register with
     */
    public void addConfigurables(PipelineBuilder pipelineBuilder) {
        pipelineBuilder.addConfigurables(this);
    }

    /** Returns the consumer group name. */
    @Override // org.apache.rocketmq.streams.common.channel.source.ISource
    public String getGroupName() {
        return this.groupName;
    }

    /** Sets the consumer group name. */
    @Override // org.apache.rocketmq.streams.common.channel.source.ISource
    public void setGroupName(String str) {
        this.groupName = str;
    }

    /** Returns the maximum thread count. */
    @Override // org.apache.rocketmq.streams.common.channel.source.ISource
    public int getMaxThread() {
        return this.maxThread;
    }

    /** Sets the maximum thread count. */
    @Override // org.apache.rocketmq.streams.common.channel.source.ISource
    public void setMaxThread(int i) {
        this.maxThread = i;
    }

    /** Returns the operator that receives messages, or {@code null} if none is set. */
    public IStreamOperator getReceiver() {
        return this.receiver;
    }

    /** Sets the operator that receives messages. */
    public void setReceiver(IStreamOperator iStreamOperator) {
        this.receiver = iStreamOperator;
    }

    /** Returns whether payloads are parsed as JSON objects. */
    public Boolean getJsonData() {
        return this.isJsonData;
    }

    /** Sets whether payloads are parsed as JSON objects. */
    public void setJsonData(Boolean bool) {
        this.isJsonData = bool;
    }

    /** Returns whether string payloads are treated as JSON arrays of messages. */
    public Boolean getMsgIsJsonArray() {
        return this.msgIsJsonArray;
    }

    /** Sets whether string payloads are treated as JSON arrays of messages. */
    public void setMsgIsJsonArray(Boolean bool) {
        this.msgIsJsonArray = bool;
    }

    /** Sets the batch-message flag. */
    public void setBatchMessage(boolean z) {
        this.isBatchMessage = z;
    }

    /** Returns the maximum fetch size per log group. */
    public int getMaxFetchLogGroupSize() {
        return this.maxFetchLogGroupSize;
    }

    /** Returns the topic name. */
    @Override // org.apache.rocketmq.streams.common.channel.source.ISource
    public String getTopic() {
        return this.topic;
    }

    /** Sets the topic name. */
    @Override // org.apache.rocketmq.streams.common.channel.source.ISource
    public void setTopic(String str) {
        this.topic = str;
    }

    /** Sets the checkpoint interval. */
    public void setCheckpointTime(long j) {
        this.checkpointTime = j;
    }

    /** Returns the field names used for log fingerprinting. */
    public List<String> getLogFingerprintFields() {
        return this.logFingerprintFields;
    }

    /** Sets the field names used for log fingerprinting. */
    public void setLogFingerprintFields(List<String> list) {
        this.logFingerprintFields = list;
    }

    /** Returns the checkpoint interval. */
    @Override // org.apache.rocketmq.streams.common.channel.source.ISource
    public long getCheckpointTime() {
        return this.checkpointTime;
    }

    /** Returns the batch-message flag. */
    public boolean isBatchMessage() {
        return this.isBatchMessage;
    }

    /**
     * Builds the checkpoint name from namespace, group name, topic and configure name, in that
     * order, substituting a {@code default_*} placeholder for every empty part.
     *
     * @return the key produced by {@code MapKeyUtil.createKey}
     */
    @Override // org.apache.rocketmq.streams.common.channel.source.ISource
    public String createCheckPointName() {
        String nameSpace = getNameSpace();
        if (StringUtil.isEmpty(nameSpace)) {
            nameSpace = "default_namespace";
        }
        String configureName = getConfigureName();
        if (StringUtil.isEmpty(configureName)) {
            configureName = "default_name";
        }
        String groupName = getGroupName();
        if (StringUtil.isEmpty(groupName)) {
            groupName = "default_groupName";
        }
        String topic = getTopic();
        // A whitespace-only topic counts as missing.
        if (topic == null || topic.trim().length() == 0) {
            topic = "default_topic";
        }
        return MapKeyUtil.createKey(nameSpace, groupName, topic, configureName);
    }

    /**
     * Reports whether the source has finished.
     *
     * @return {@code false} in this base implementation
     */
    @Override // org.apache.rocketmq.streams.common.interfaces.ILifeCycle
    public boolean isFinished() {
        return false;
    }

    /** Finishes the checkpoint manager. */
    @Override // org.apache.rocketmq.streams.common.interfaces.ILifeCycle
    public void finish() {
        this.checkPointManager.finish();
    }

    /** Returns the character set name used to decode byte payloads. */
    public String getEncoding() {
        return this.encoding;
    }

    /** Sets the character set name used to decode byte payloads. */
    public void setEncoding(String str) {
        this.encoding = str;
    }

    /** Returns the field delimiter for non-JSON payloads. */
    public String getFieldDelimiter() {
        return this.fieldDelimiter;
    }

    /** Sets the field delimiter for non-JSON payloads. */
    public void setFieldDelimiter(String str) {
        this.fieldDelimiter = str;
    }

    /** Returns the column metadata for non-JSON payloads. */
    public MetaData getMetaData() {
        return this.metaData;
    }

    /** Sets the column metadata for non-JSON payloads. */
    public void setMetaData(MetaData metaData) {
        this.metaData = metaData;
    }

    /** Returns the header field names copied into message bodies. */
    public List<String> getHeaderFieldNames() {
        return this.headerFieldNames;
    }

    /** Sets the header field names copied into message bodies. */
    public void setHeaderFieldNames(List<String> list) {
        this.headerFieldNames = list;
    }
}
