package nstream.adapter.redis.stream;

import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.resps.StreamEntry;

/* loaded from: input_file:nstream/adapter/redis/stream/RedisXReadGroupStream.class */
/**
 * A {@link RedisStream} that reads through {@code XREADGROUP} as a named consumer of a
 * consumer group, hands every received entry to the ingestor and then acknowledges it.
 *
 * <p>Each stream key in {@code streams} carries the ID used for the next read. While that
 * ID is not {@link StreamEntryID#UNRECEIVED_ENTRY}, every processed entry advances it to the
 * entry's own ID. As soon as a read returns no entries for such a key, the key is switched to
 * {@code UNRECEIVED_ENTRY} and stays there.
 *
 * <p>NOTE(review): this looks like the usual "drain this consumer's pending entries first,
 * then read new ones with {@code >}" recovery pattern - confirm against the callers that
 * build the initial {@code streams} map.
 */
public class RedisXReadGroupStream extends RedisStream {
    private final String groupName;
    private final String consumer;
    private final XReadGroupParams params;

    /**
     * Creates a group reader with explicit read parameters.
     *
     * @param jedisPooled client passed to the superclass and used for reads and acknowledgements
     * @param streams     stream key to next-read ID mapping, passed to the superclass
     * @param groupName   consumer group to read from and acknowledge against
     * @param consumer    consumer name used within {@code groupName}
     * @param params      parameters forwarded to every {@code xreadGroup} call
     */
    public RedisXReadGroupStream(JedisPooled jedisPooled, Map<String, StreamEntryID> streams, String groupName, String consumer, XReadGroupParams params) {
        super(jedisPooled, streams);
        this.groupName = groupName;
        this.consumer = consumer;
        this.params = params;
    }

    /**
     * Creates a group reader that uses the parameters returned by
     * {@link XReadGroupParams#xReadGroupParams()}.
     *
     * @param jedisPooled client passed to the superclass and used for reads and acknowledgements
     * @param streams     stream key to next-read ID mapping, passed to the superclass
     * @param groupName   consumer group to read from and acknowledge against
     * @param consumer    consumer name used within {@code groupName}
     */
    public RedisXReadGroupStream(JedisPooled jedisPooled, Map<String, StreamEntryID> streams, String groupName, String consumer) {
        this(jedisPooled, streams, groupName, consumer, XReadGroupParams.xReadGroupParams());
    }

    /**
     * Issues one {@code xreadGroup} call for this group/consumer over the current
     * {@code streams} map.
     *
     * @return whatever the client returns for the call, one map entry per stream key
     */
    @Override // nstream.adapter.redis.stream.RedisStream
    protected List<Map.Entry<String, List<StreamEntry>>> read() {
        return this.jedis.xreadGroup(this.groupName, this.consumer, this.params, this.streams);
    }

    /**
     * Handles the entries returned for a single stream key.
     *
     * <p>An empty batch for a key that is not yet on {@code UNRECEIVED_ENTRY} switches that
     * key to {@code UNRECEIVED_ENTRY}; in every other case each entry of the batch is
     * processed in list order.
     *
     * @param entry stream key paired with the entries read for it
     */
    @Override // nstream.adapter.redis.stream.RedisStream
    protected void processStream(Map.Entry<String, List<StreamEntry>> entry) {
        final String key = entry.getKey();
        final List<StreamEntry> batch = entry.getValue();
        if (batch.isEmpty() && !isOnUnreceivedEntry(key)) {
            // Nothing came back for the explicit ID, so move this key to the sentinel ID.
            this.streams.put(key, StreamEntryID.UNRECEIVED_ENTRY);
            return;
        }
        for (StreamEntry streamEntry : batch) {
            processStreamEntry(new AbstractMap.SimpleEntry<>(key, streamEntry));
        }
    }

    /**
     * Advances the key's read ID when applicable, delivers the entry to the ingestor and
     * acknowledges it afterwards.
     *
     * @param entry stream key paired with one entry read from that stream
     */
    private void processStreamEntry(Map.Entry<String, StreamEntry> entry) {
        final String key = entry.getKey();
        final StreamEntryID id = entry.getValue().getID();
        if (!isOnUnreceivedEntry(key)) {
            this.streams.put(key, id);
        }
        this.ingestor.accept(entry);
        // XACK is addressed by stream key and group; the consumer name is not one of its
        // arguments. Acknowledge only after the ingestor call has returned, so an exception
        // thrown by the ingestor leaves the entry unacknowledged.
        this.jedis.xack(key, this.groupName, id);
    }

    /**
     * Tells whether the next-read ID stored for {@code key} is the {@code UNRECEIVED_ENTRY}
     * sentinel.
     */
    private boolean isOnUnreceivedEntry(String key) {
        // NOTE(review): identity comparison kept on purpose - presumably the sentinel must not
        // be confused with a real ID that compares equal to it; confirm against
        // StreamEntryID.equals before changing this.
        return this.streams.get(key) == StreamEntryID.UNRECEIVED_ENTRY;
    }
}
