package nstream.adapter.redis.stream;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;
import redis.clients.jedis.resps.StreamEntry;

/* loaded from: input_file:nstream/adapter/redis/stream/RedisStream.class */
public abstract class RedisStream {
    protected final JedisPooled jedis;
    protected final Map<String, StreamEntryID> streams;
    protected Consumer<Map.Entry<String, StreamEntry>> ingestor;
    private volatile boolean running = false;

    /* loaded from: input_file:nstream/adapter/redis/stream/RedisStream$Builder.class */
    public static final class Builder {
        private JedisPooled jedis;
        private Map<String, StreamEntryID> streams = new HashMap();
        private String groupName;
        private String consumer;
        private Integer block;
        private Integer count;

        public Builder withJedis(JedisPooled jedisPooled) {
            this.jedis = jedisPooled;
            return this;
        }

        public Builder withStreams(Map<String, StreamEntryID> map) {
            this.streams = map;
            return this;
        }

        public Builder withGroupName(String str) {
            this.groupName = str;
            return this;
        }

        public Builder withConsumer(String str) {
            this.consumer = str;
            return this;
        }

        public Builder withBlock(Integer num) {
            this.block = num;
            return this;
        }

        public Builder withCount(Integer num) {
            this.count = num;
            return this;
        }

        public RedisStream build() {
            return (this.groupName == null && this.consumer == null) ? buildXReadStream() : buildXReadGroupStream();
        }

        private RedisXReadGroupStream buildXReadGroupStream() {
            XReadGroupParams xReadGroupParams = XReadGroupParams.xReadGroupParams();
            if (this.block != null) {
                xReadGroupParams.block(this.block.intValue());
            }
            if (this.count != null) {
                xReadGroupParams.count(this.count.intValue());
            }
            return new RedisXReadGroupStream(this.jedis, this.streams, this.groupName, this.consumer, xReadGroupParams);
        }

        private RedisXReadStream buildXReadStream() {
            XReadParams xReadParams = XReadParams.xReadParams();
            if (this.block != null) {
                xReadParams.block(this.block.intValue());
            }
            if (this.count != null) {
                xReadParams.count(this.count.intValue());
            }
            return new RedisXReadStream(this.jedis, this.streams, xReadParams);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RedisStream(JedisPooled jedisPooled, Map<String, StreamEntryID> map) {
        this.jedis = jedisPooled;
        this.streams = map;
    }

    public void start(Consumer<Map.Entry<String, StreamEntry>> consumer) {
        this.running = true;
        this.ingestor = consumer;
        readLoop();
    }

    public void stop() {
        this.running = false;
    }

    protected void readLoop() {
        while (this.running) {
            processRead(read());
        }
    }

    protected abstract List<Map.Entry<String, List<StreamEntry>>> read();

    private void processRead(List<Map.Entry<String, List<StreamEntry>>> list) {
        if (list != null) {
            list.forEach(this::processStream);
        }
    }

    protected abstract void processStream(Map.Entry<String, List<StreamEntry>> entry);

    public static Builder builder() {
        return new Builder();
    }
}
