package org.apache.streampipes.sinks.databases.jvm.redis;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisException;

/* loaded from: input_file:org/apache/streampipes/sinks/databases/jvm/redis/Redis.class */
public class Redis implements EventSink<RedisParameters> {
    private static final String EVENT_PREFIX = "sp:event:";
    private static final String EVENT_COUNT = "sp:events";
    private static JedisPool jedisPool = null;
    private String primaryKey;
    private Boolean autoIncrement;
    private String password;
    private String clientName;
    private Integer index;
    private Integer ttl;

    public void onInvocation(RedisParameters redisParameters, EventSinkRuntimeContext eventSinkRuntimeContext) {
        if (jedisPool == null) {
            initialPool(redisParameters);
        }
        this.primaryKey = redisParameters.getPrimaryKey();
        this.autoIncrement = redisParameters.isAutoIncrement();
        this.password = redisParameters.getRedisPassword();
        this.clientName = redisParameters.getRedisClient();
        this.index = redisParameters.getRedisIndex();
        this.ttl = redisParameters.getTTL();
    }

    public void onEvent(Event event) throws SpRuntimeException {
        try {
            Jedis jedis = getJedis();
            try {
                String eventKey = getEventKey(event, Long.valueOf(this.autoIncrement.booleanValue() ? jedis.incr(EVENT_COUNT) : 0L));
                jedis.set(eventKey, getEventValue(event));
                if (this.ttl.intValue() > -1) {
                    jedis.expire(eventKey, this.ttl.intValue());
                }
                if (jedis != null) {
                    jedis.close();
                }
            } finally {
            }
        } catch (SpRuntimeException e) {
            throw e;
        } catch (Exception e2) {
            throw new SpRuntimeException("Could not persist event to redis", e2);
        }
    }

    public void onDetach() {
        if (jedisPool != null && !jedisPool.isClosed()) {
            jedisPool.close();
        }
        jedisPool = null;
    }

    private void initialPool(RedisParameters redisParameters) {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(redisParameters.getRedisPoolMaxActive().intValue());
        jedisPoolConfig.setMaxIdle(redisParameters.getRedisPoolMaxIdle().intValue());
        jedisPoolConfig.setMaxWaitMillis(redisParameters.getRedisPoolMaxWait().intValue());
        jedisPoolConfig.setTestWhileIdle(false);
        jedisPoolConfig.setTestOnBorrow(false);
        jedisPoolConfig.setTestOnReturn(false);
        jedisPool = new JedisPool(jedisPoolConfig, redisParameters.getRedisHost(), redisParameters.getRedisPort().intValue(), redisParameters.getRedisPoolTimeout().intValue());
    }

    private Jedis getJedis() throws SpRuntimeException {
        Jedis resource = jedisPool.getResource();
        try {
            resource.connect();
            if (StringUtils.isNotBlank(this.password)) {
                resource.auth(this.password);
            }
            if (StringUtils.isNotBlank(this.clientName)) {
                resource.clientSetname(this.clientName);
            }
            if (this.index.intValue() > -1) {
                resource.select(this.index.intValue());
            }
            return resource;
        } catch (JedisException e) {
            resource.close();
            throw new SpRuntimeException("Could not connect to redis", e);
        }
    }

    private String getEventKey(Event event, Long l) {
        return this.autoIncrement.booleanValue() ? "sp:event:" + l : "sp:event:" + event.getFieldBySelector(this.primaryKey).getAsPrimitive().getAsString();
    }

    private String getEventValue(Event event) throws SpRuntimeException {
        try {
            return new ObjectMapper().writeValueAsString(event.getRaw());
        } catch (JsonProcessingException e) {
            throw new SpRuntimeException("Could not convert event to JSON", e);
        }
    }
}
