package org.apache.rocketmq.streams.common.channel.impl.memory;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.streams.common.channel.AbstractChannel;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.channel.source.AbstractUnreliableSource;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.context.IMessage;

/* loaded from: input_file:org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.class */
/**
 * Channel whose sink hands every message straight to the paired source, without
 * going through any external storage.
 *
 * <p>When {@link #setStartQPSCount(boolean)} is enabled, each batch written to the sink
 * also prints a throughput line (running count, elapsed seconds, messages per second)
 * to standard output.
 */
public class MemoryChannel extends AbstractChannel {
    /** Enables the per-batch throughput line printed by the sink. */
    protected volatile transient boolean startQPSCount = false;

    /** Running total of messages seen by the sink while throughput reporting is enabled. */
    protected transient AtomicLong COUNT = new AtomicLong(0);

    /** Reference time for throughput reporting; captured when this channel is constructed. */
    protected transient long firstReceiveTime = System.currentTimeMillis();

    /**
     * Turns throughput reporting on or off.
     *
     * @param z {@code true} to print a throughput line for every batch, {@code false} to stay silent
     */
    public void setStartQPSCount(boolean z) {
        this.startQPSCount = z;
    }

    /**
     * Creates the sink half of the channel.
     *
     * @return a sink that forwards each message value of a batch to this channel's source
     */
    @Override // org.apache.rocketmq.streams.common.channel.AbstractChannel
    protected ISink createSink() {
        return new AbstractSink() { // from class: org.apache.rocketmq.streams.common.channel.impl.memory.MemoryChannel.1
            /**
             * Forwards every message of the batch to the source and, when enabled, reports throughput.
             *
             * @param list the batch to forward
             * @return always {@code true}
             */
            @Override // org.apache.rocketmq.streams.common.channel.sink.AbstractSink
            protected boolean batchInsert(List<IMessage> list) {
                if (MemoryChannel.this.startQPSCount) {
                    // Update the counter exactly once so the count and the rate in the message agree.
                    long count = MemoryChannel.this.COUNT.addAndGet(list.size());
                    long elapsedSeconds = (System.currentTimeMillis() - MemoryChannel.this.firstReceiveTime) / 1000;
                    // Elapsed time truncates to 0 during the first second; clamp the divisor so the
                    // integer division cannot throw ArithmeticException.
                    long qps = count / Math.max(1L, elapsedSeconds);
                    System.out.println("qps is " + qps + "。the count is " + count + ".the process time is " + elapsedSeconds);
                }
                // NOTE(review): `source` is declared outside this file (presumably in AbstractChannel)
                // and is expected to be the AbstractUnreliableSource built by createSource() — confirm.
                AbstractUnreliableSource target = (AbstractUnreliableSource) MemoryChannel.this.source;
                for (IMessage message : list) {
                    target.doUnreliableReceiveMessage(message.getMessageValue());
                }
                return true;
            }
        };
    }

    /**
     * Creates the source half of the channel.
     *
     * @return an {@link AbstractUnreliableSource} that adds no behaviour of its own
     */
    @Override // org.apache.rocketmq.streams.common.channel.AbstractChannel
    protected ISource createSource() {
        return new AbstractUnreliableSource() { // from class: org.apache.rocketmq.streams.common.channel.impl.memory.MemoryChannel.2
            /**
             * Delegates to the inherited start logic; the override only exposes the method as public.
             *
             * @return whatever the superclass implementation returns
             */
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.apache.rocketmq.streams.common.channel.source.AbstractBatchSource, org.apache.rocketmq.streams.common.channel.source.AbstractSource
            public boolean startSource() {
                return super.startSource();
            }
        };
    }

    /**
     * Returns the fixed checkpoint name used for this channel.
     *
     * @return the constant {@code "memory-source"}
     */
    @Override // org.apache.rocketmq.streams.common.channel.source.ISource
    public String createCheckPointName() {
        return "memory-source";
    }
}
