package org.apache.rocketmq.streams.common.channel.impl;

import com.alibaba.fastjson.JSONObject;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.source.AbstractSource;

/* loaded from: input_file:org/apache/rocketmq/streams/common/channel/impl/CollectionSource.class */
/**
 * A source that replays a fixed, in-memory collection of JSON objects.
 *
 * <p>Objects handed to {@link #addAll(JSONObject...)} are stored as JSON strings in
 * {@code elements}. {@link #initConfigurable()} parses those strings into an in-memory queue, and
 * {@link #startSource()} starts one worker thread that takes the queued objects one at a time and
 * passes each to {@code doReceiveMessage}. The worker stops once {@code maxOffset} objects have
 * been handed out, or when its thread is interrupted.
 *
 * <p>The queue, the offset counter and the current offset are {@code transient}; the queue and the
 * counter are re-created on demand so an instance restored by Java deserialization stays usable.
 */
public class CollectionSource extends AbstractSource implements Serializable {
    private static final Log logger = LogFactory.getLog(CollectionSource.class);

    /** Minimum time between two checkpoint requests, in milliseconds. */
    private static final int CHECKPOINT_INTERVAL_MS = 10000;

    /** Pause between two polls while the queue is empty, in milliseconds. */
    private static final long EMPTY_QUEUE_SLEEP_MS = 1000L;

    /** Pause applied before each element is handed out, in milliseconds. */
    private static final long EMIT_DELAY_MS = 10L;

    /** Number of elements the worker hands out before it stops. */
    long maxOffset;

    /** Offset of the element most recently taken by {@link #consume()}. */
    volatile transient long currentOffset;

    /** Parsed elements waiting to be consumed; filled by {@link #initConfigurable()}. */
    transient ConcurrentLinkedQueue<JSONObject> queue = new ConcurrentLinkedQueue<>();

    /** Counter from which {@code currentOffset} is derived. */
    transient AtomicLong offset = new AtomicLong(0);

    /** Timestamp (epoch millis) of the last checkpoint request. */
    long lastCheckpointTime = System.currentTimeMillis();

    /** JSON-string form of every element added through {@link #addAll(JSONObject...)}. */
    List<String> elements = new ArrayList<>();

    /**
     * Tells whether the worker loop should stop because all elements have been handed out.
     *
     * <p>Despite its name this does not look at the thread's interrupt flag. The comparison is
     * {@code >=} rather than {@code ==} so the loop also ends if {@code maxOffset} is lowered below
     * the current offset while the worker is running.
     *
     * @return {@code true} once {@code currentOffset} has reached {@code maxOffset}
     */
    public final boolean isInterrupted() {
        return this.currentOffset >= this.maxOffset;
    }

    /**
     * Takes the next element from the queue, waiting while the queue is empty.
     *
     * <p>The offset is advanced before the element is removed, and a short pause is applied to
     * each element before it is returned.
     *
     * @return the next element, or {@code null} if the calling thread was interrupted while
     *     waiting for the queue to fill (the interrupt flag is restored in that case)
     */
    public synchronized JSONObject consume() {
        while (this.queue.isEmpty()) {
            try {
                logger.info("queue is empty, sleep 1000ms.");
                Thread.sleep(EMPTY_QUEUE_SLEEP_MS);
            } catch (InterruptedException e) {
                // Restore the flag and give up: retrying would make sleep() throw again at once.
                Thread.currentThread().interrupt();
                logger.warn("interrupted while waiting for elements, stop consuming.", e);
                return null;
            }
        }
        this.currentOffset = this.offset.incrementAndGet();
        try {
            Thread.sleep(EMIT_DELAY_MS);
        } catch (InterruptedException e) {
            // The offset is already advanced, so still deliver this element; just keep the flag.
            Thread.currentThread().interrupt();
        }
        return this.queue.poll();
    }

    /**
     * Appends the given objects to this source and updates {@code maxOffset} to the total number
     * of stored elements, so repeated calls accumulate consistently.
     *
     * @param jSONObjectArr objects to add; a {@code null} array is treated as empty
     * @return this source, for chaining
     */
    public CollectionSource addAll(JSONObject... jSONObjectArr) {
        if (this.elements == null) {
            this.elements = new ArrayList<>();
        }
        if (jSONObjectArr != null) {
            for (JSONObject jSONObject : jSONObjectArr) {
                this.elements.add(jSONObject.toJSONString());
            }
        }
        // Track the accumulated total, not just the size of the latest batch.
        this.maxOffset = this.elements.size();
        return this;
    }

    /**
     * Parses every stored JSON string and puts the result on the queue, then delegates to the
     * superclass.
     *
     * @return the result of {@code super.initConfigurable()}
     */
    @Override // org.apache.rocketmq.streams.common.channel.source.AbstractSource, org.apache.rocketmq.streams.common.configurable.AbstractConfigurable
    public boolean initConfigurable() {
        ensureTransientState();
        if (this.elements != null) {
            for (String element : this.elements) {
                this.queue.offer(JSONObject.parseObject(element));
            }
        }
        return super.initConfigurable();
    }

    /**
     * Starts the worker thread that drains the queue.
     *
     * @return always {@code true}
     */
    @Override // org.apache.rocketmq.streams.common.channel.source.AbstractSource
    protected boolean startSource() {
        ensureTransientState();
        new Thread(this::runConsumeLoop, "collection-source-consumer").start();
        return true;
    }

    /**
     * Worker loop: hands each element to {@code doReceiveMessage} until all elements are consumed
     * or the thread is interrupted, requesting a checkpoint at most once per interval.
     */
    private void runConsumeLoop() {
        while (!isInterrupted()) {
            JSONObject message = consume();
            if (message == null) {
                // consume() was interrupted while waiting; stop instead of forwarding null.
                break;
            }
            boolean needCheckpoint = false;
            long now = System.currentTimeMillis();
            if (now - this.lastCheckpointTime > CHECKPOINT_INTERVAL_MS) {
                logger.info("start checkping....");
                needCheckpoint = true;
                this.lastCheckpointTime = now;
            }
            // NOTE(review): "1" is presumably a fixed queue/split id -- confirm against AbstractSource.
            doReceiveMessage(message, needCheckpoint, "1", String.valueOf(this.currentOffset));
        }
    }

    /** Re-creates the transient queue and counter when they are missing (e.g. after deserialization). */
    private void ensureTransientState() {
        if (this.queue == null) {
            this.queue = new ConcurrentLinkedQueue<>();
        }
        if (this.offset == null) {
            this.offset = new AtomicLong(0);
        }
    }

    /** @return always {@code false} */
    @Override // org.apache.rocketmq.streams.common.channel.source.AbstractSource
    public boolean supportNewSplitFind() {
        return false;
    }

    /** @return always {@code false} */
    @Override // org.apache.rocketmq.streams.common.channel.source.AbstractSource
    public boolean supportRemoveSplitFind() {
        return false;
    }

    /** @return always {@code true} */
    @Override // org.apache.rocketmq.streams.common.channel.source.AbstractSource
    public boolean supportOffsetRest() {
        return true;
    }

    /**
     * @param str the identifier to test (unused here)
     * @return always {@code false}
     */
    @Override // org.apache.rocketmq.streams.common.channel.source.AbstractSource
    protected boolean isNotDataSplit(String str) {
        return false;
    }

    /** @return the number of elements the worker hands out before stopping */
    public long getMaxOffset() {
        return this.maxOffset;
    }

    /** @param j the number of elements the worker hands out before stopping */
    public void setMaxOffset(long j) {
        this.maxOffset = j;
    }

    /** @return the stored elements in JSON-string form (the live list, not a copy) */
    public List<String> getElements() {
        return this.elements;
    }

    /**
     * Replaces the stored elements. {@code maxOffset} is not touched.
     *
     * @param list the new elements, expected to be JSON strings
     */
    // The raw parameter type is kept so existing callers keep compiling.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void setElements(List list) {
        this.elements = list;
    }

    /** @return the timestamp (epoch millis) of the last checkpoint request */
    public long getLastCheckpointTime() {
        return this.lastCheckpointTime;
    }

    /** @param j the timestamp (epoch millis) of the last checkpoint request */
    public void setLastCheckpointTime(long j) {
        this.lastCheckpointTime = j;
    }
}
