package org.apache.rocketmq.streams.common.checkpoint;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
import org.apache.rocketmq.streams.common.context.MessageOffset;

/* loaded from: input_file:org/apache/rocketmq/streams/common/checkpoint/AbstractCheckPointStorage.class */
/**
 * Base class for checkpoint storages.
 *
 * <p>Incoming {@link CheckPointMessage}s are buffered in {@link #messageCache}. When the cache
 * flushes, the buffered messages are merged into one {@link SourceState} per source name,
 * converted into {@link CheckPoint} snapshots and handed to {@code save(...)}.
 */
public abstract class AbstractCheckPointStorage implements ICheckPointStorage {
    static final Log logger = LogFactory.getLog(AbstractCheckPointStorage.class);

    /** Value handed to {@code MessageCache.setAutoFlushSize}. */
    private static final int AUTO_FLUSH_SIZE = 50;

    /**
     * Value handed to {@code MessageCache.setAutoFlushTimeGap}.
     * NOTE(review): presumably milliseconds - confirm against MessageCache.
     */
    private static final int AUTO_FLUSH_TIME_GAP = 10000;

    /**
     * Buffer for checkpoint messages; its flush callback merges the buffered messages and
     * persists the result through {@link #saveCheckPoint(Map)}.
     */
    protected transient IMessageCache<CheckPointMessage> messageCache = new MessageCache(new IMessageFlushCallBack<CheckPointMessage>() {
        @Override
        public boolean flushMessage(List<CheckPointMessage> list) {
            // Nothing buffered: there is nothing to merge, log or save.
            if (list == null || list.isEmpty()) {
                return true;
            }
            Map<String, SourceState> mergedStates = AbstractCheckPointStorage.this.mergeSourceState(list);
            AbstractCheckPointStorage.logger.info(String.format("flushMessage raw size %d, merge size %d", list.size(), mergedStates.size()));
            // The sample log line is best effort; a message without states must not abort the flush.
            String firstOffsets = AbstractCheckPointStorage.firstQueueOffsets(list);
            if (firstOffsets != null) {
                AbstractCheckPointStorage.logger.info("flushMessage : " + firstOffsets);
            }
            AbstractCheckPointStorage.this.saveCheckPoint(mergedStates);
            return true;
        }
    });

    /**
     * Configures the auto-flush thresholds of the message cache and switches auto flush on.
     */
    public AbstractCheckPointStorage() {
        // The field initializer above always creates a MessageCache, so this cast is safe here.
        MessageCache cache = (MessageCache) this.messageCache;
        cache.setAutoFlushSize(AUTO_FLUSH_SIZE);
        cache.setAutoFlushTimeGap(AUTO_FLUSH_TIME_GAP);
        this.messageCache.openAutoFlush();
    }

    /**
     * Flushes the message cache.
     */
    @Override
    public void flush() {
        this.messageCache.flush();
    }

    /**
     * Renders the queue offsets of the first state of the first message for logging.
     *
     * @param messages non-empty list of flushed messages
     * @return the textual form of the offsets, or {@code null} when the first message has no states
     */
    private static String firstQueueOffsets(List<CheckPointMessage> messages) {
        List<CheckPointState> states = messages.get(0).getCheckPointStates();
        if (states == null || states.isEmpty()) {
            return null;
        }
        return String.valueOf(states.get(0).getQueueIdAndOffset());
    }

    /**
     * Collapses a batch of checkpoint messages into one state per source name.
     *
     * <p>Messages for which {@link #createSourceState(CheckPointMessage)} returns {@code null}
     * are ignored. States sharing a source name are combined with
     * {@link #merge(SourceState, SourceState)}.
     *
     * @param list the messages to merge
     * @return merged states keyed by source name; empty when no message produced a state
     */
    protected Map<String, SourceState> mergeSourceState(List<CheckPointMessage> list) {
        Map<String, SourceState> statesBySource = new HashMap<>();
        for (CheckPointMessage message : list) {
            SourceState candidate = createSourceState(message);
            if (candidate == null) {
                continue;
            }
            String sourceName = candidate.getSourceName();
            SourceState existing = statesBySource.get(sourceName);
            // merge(...) folds the candidate into the existing state and returns that state.
            SourceState merged = existing == null ? candidate : merge(candidate, existing);
            statesBySource.put(sourceName, merged);
        }
        return statesBySource;
    }

    /**
     * Folds the queue offsets of {@code sourceState} into {@code sourceState2}.
     *
     * <p>An offset is copied when {@code sourceState2} has no entry for the queue, or when the
     * incoming offset reports {@code greateThan} the stored one.
     *
     * @param sourceState  the state whose offsets are read
     * @param sourceState2 the state that is updated in place
     * @return {@code sourceState2}
     */
    protected SourceState merge(SourceState sourceState, SourceState sourceState2) {
        Map<String, MessageOffset> targetOffsets = sourceState2.getQueueId2Offsets();
        for (Map.Entry<String, MessageOffset> entry : sourceState.getQueueId2Offsets().entrySet()) {
            String queueId = entry.getKey();
            MessageOffset incoming = entry.getValue();
            MessageOffset stored = targetOffsets.get(queueId);
            if (stored == null || incoming.greateThan(stored.getOffsetStr())) {
                targetOffsets.put(queueId, incoming);
            }
        }
        return sourceState2;
    }

    /**
     * Builds the source state described by a single checkpoint message.
     *
     * <p>States flagged {@code isReplyAnyOny()} are skipped. For the remaining states, each queue
     * keeps the offset that is not {@code greateThan} the others, i.e. the lowest one reported.
     *
     * @param checkPointMessage the message to convert
     * @return the resulting state, or {@code null} as soon as a non-skipped state reports
     *         {@code isReplyRefuse()}
     */
    protected SourceState createSourceState(CheckPointMessage checkPointMessage) {
        SourceState sourceState = new SourceState();
        String pipelineName = checkPointMessage.getPipelineName();
        Map<String, MessageOffset> queueOffsets = new HashMap<>();
        sourceState.setSourceName(CheckPointManager.createSourceName(checkPointMessage.getSource(), pipelineName));
        sourceState.setQueueId2Offsets(queueOffsets);
        for (CheckPointState checkPointState : checkPointMessage.getCheckPointStates()) {
            if (checkPointState.isReplyAnyOny()) {
                continue;
            }
            if (checkPointState.isReplyRefuse()) {
                return null;
            }
            for (Map.Entry<String, MessageOffset> entry : checkPointState.getQueueIdAndOffset().entrySet()) {
                String queueId = entry.getKey();
                MessageOffset candidate = entry.getValue();
                MessageOffset stored = queueOffsets.get(queueId);
                // Replace only when the stored offset is ahead of the candidate.
                if (stored == null || stored.greateThan(candidate.getOffsetStr())) {
                    queueOffsets.put(queueId, candidate);
                }
            }
        }
        return sourceState;
    }

    /**
     * Converts every queue offset of the given states into a {@link CheckPoint} snapshot and
     * passes the collected snapshots to {@code save(...)}.
     *
     * @param map merged states keyed by source name
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    protected void saveCheckPoint(Map<String, SourceState> map) {
        // Kept raw on purpose: the snapshot type returned by toSnapShot() and the parameter type
        // of save(...) are declared outside this file.
        List snapshots = new ArrayList();
        // One clock reading per flush so that create and modified timestamps always agree.
        final long now = System.currentTimeMillis();
        for (SourceState sourceState : map.values()) {
            for (Map.Entry<String, MessageOffset> entry : sourceState.getQueueId2Offsets().entrySet()) {
                CheckPoint checkPoint = new CheckPoint();
                checkPoint.setSourceName(sourceState.getSourceName());
                checkPoint.setQueueId(entry.getKey());
                checkPoint.setData(entry.getValue().getMainOffset());
                // Separate Date instances: java.util.Date is mutable and must not be shared.
                checkPoint.setGmtCreate(new Date(now));
                checkPoint.setGmtModified(new Date(now));
                snapshots.add(checkPoint.toSnapShot());
            }
        }
        save(snapshots);
    }

    /**
     * Adds a checkpoint message to the cache; its states are logged at debug level first.
     *
     * @param checkPointMessage the message to buffer
     */
    @Override
    public void addCheckPointMessage(CheckPointMessage checkPointMessage) {
        // Skip the per-state formatting entirely unless debug output is actually wanted.
        if (logger.isDebugEnabled()) {
            for (CheckPointState state : checkPointMessage.getCheckPointStates()) {
                logger.debug(String.format("addCheckPointMessage states %s", state.getQueueIdAndOffset()));
            }
        }
        this.messageCache.addCache(checkPointMessage);
    }

    /**
     * Turns off the automatic flushing of the message cache.
     */
    @Override
    public void finish() {
        this.messageCache.closeAutoFlush();
    }
}
