package nstream.adapter.redis;

import java.util.Map;
import nstream.adapter.common.ext.RedisIngressSettings;
import nstream.adapter.common.ingress.IngestPanicException;
import nstream.adapter.common.ingress.IngestorMetricsAgent;
import nstream.adapter.common.provision.ProvisionLoader;
import nstream.adapter.redis.stream.RedisStream;
import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.resps.StreamEntry;
import swim.structure.Value;

/* loaded from: input_file:nstream/adapter/redis/RedisStreamIngestingAgent.class */
/**
 * Abstract ingesting agent backed by a {@link RedisStream}.
 *
 * <p>{@link #stageReception()} builds the stream from the agent's
 * {@link RedisIngressSettings} and then calls {@link #start()}, which hands
 * every object delivered by the stream to {@code ingestOrContinue}.
 */
public abstract class RedisStreamIngestingAgent extends IngestorMetricsAgent<RedisIngressSettings, Map.Entry<String, StreamEntry>> {
    /** Stream built by {@link #stageReception()}; {@code null} until then. */
    protected RedisStream redisStream;

    /**
     * Submits a task via {@code execute} that starts the configured stream and
     * routes each delivered object to {@code ingestOrContinue}.
     *
     * <p>The task throws {@link IngestPanicException} when no stream has been
     * assigned to {@link #redisStream}.
     */
    protected void start() {
        execute(() -> {
            if (this.redisStream != null) {
                this.redisStream.start(entry -> {
                    ingestOrContinue(entry);
                });
                return;
            }
            throw new IngestPanicException("No Redis stream configured to start.");
        });
    }

    /** Stops the stream if one has been assigned; otherwise does nothing. */
    protected void cancel() {
        if (this.redisStream == null) {
            return;
        }
        this.redisStream.stop();
    }

    /*
     * Decompiler note (JADX): access modifier was changed from protected, and
     * this method was renamed from parseIngressSettings because it was merged
     * with a bridge method.
     */
    /**
     * Evaluates the given configuration value into {@link RedisIngressSettings}
     * by delegating to {@code RedisAdapterUtils.evaluateIngressSettings}.
     *
     * @param value the configuration value to evaluate
     * @return the settings produced by the delegate
     */
    public RedisIngressSettings m6parseIngressSettings(Value value) {
        return RedisAdapterUtils.evaluateIngressSettings(labeledLog, value);
    }

    /**
     * Calls {@code loadSettings("redisIngressConf")}, builds
     * {@link #redisStream} from the resulting ingress settings, and then
     * starts reception via {@link #start()}.
     *
     * <p>The Jedis client is taken from the provision named by
     * {@code poolProvisionName()} and cast to {@link JedisPooled}.
     */
    protected void stageReception() {
        loadSettings("redisIngressConf");
        this.redisStream = RedisStream.builder()
                .withJedis((JedisPooled) ProvisionLoader.getProvision(ingressSettings.poolProvisionName()).value())
                .withStreams(RedisAdapterUtils.parseStreams(ingressSettings.streams()))
                .withGroupName(ingressSettings.groupName())
                .withConsumer(ingressSettings.consumer())
                .withBlock(ingressSettings.block())
                .withCount(ingressSettings.count())
                .build();
        start();
    }
}
