package org.apache.flink.streaming.connectors.redis;

import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/redis/RedisSink.class */
/**
 * Flink sink that forwards every incoming record to Redis.
 *
 * <p>The Redis command to execute is read once, at construction time, from the
 * {@link RedisCommandDescription} of the supplied {@link RedisMapper}. For each record the mapper
 * supplies the key and the value, and may supply a per-record additional key or TTL; when present,
 * those per-record values take precedence over the defaults from the command description.
 *
 * @param <IN> type of the records consumed by this sink
 */
public class RedisSink<IN> extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class);

    /** Default additional key from the command description; used when the mapper gives none for a record. */
    private final String additionalKey;

    /** Default TTL from the command description; used when the mapper gives none for a record. May be null. */
    private final Integer additionalTTL;

    /** Extracts key, value and optional per-record overrides from each incoming record. */
    private final RedisMapper<IN> redisSinkMapper;

    /** Command selected by the mapper's command description; decides which container call is made. */
    private final RedisCommand redisCommand;

    /** Connection configuration handed to {@link RedisCommandsContainerBuilder} in {@link #open}. */
    private final FlinkJedisConfigBase flinkJedisConfigBase;

    /** Created in {@link #open}; stays {@code null} until then. */
    private RedisCommandsContainer redisCommandsContainer;

    /**
     * Creates a sink that talks to Redis using the given configuration and mapper.
     *
     * @param flinkJedisConfigBase connection configuration used to build the commands container
     * @param redisMapper mapper that describes the command and extracts key/value from records
     * @throws NullPointerException if either argument, or the mapper's command description, is null
     */
    public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisMapper) {
        Objects.requireNonNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
        Objects.requireNonNull(redisMapper, "Redis Mapper can not be null");
        // Fetch the description exactly once so that the instance that was null-checked is the
        // very same instance the fields below are read from.
        RedisCommandDescription commandDescription = Objects.requireNonNull(
                redisMapper.getCommandDescription(), "Redis Mapper data type description can not be null");

        this.flinkJedisConfigBase = flinkJedisConfigBase;
        this.redisSinkMapper = redisMapper;
        this.redisCommand = commandDescription.getCommand();
        this.additionalTTL = commandDescription.getAdditionalTTL();
        this.additionalKey = commandDescription.getAdditionalKey();
    }

    /**
     * Sends one record to Redis using the configured command.
     *
     * <p>Key, value, additional key and TTL are all requested from the mapper up front, in that
     * order. The per-record additional key / TTL fall back to the defaults captured in the
     * constructor; the fallback may itself be {@code null}, in which case {@code null} is passed
     * on to the commands container.
     *
     * @param in the record to write
     * @param context sink context; not used by this implementation
     * @throws NumberFormatException if the command is numeric (HINCRBY, INCRBY, INCRBY_EX, DECRBY,
     *     DESCRBY_EX) and the mapped value is not a parsable {@code long}
     * @throws IllegalArgumentException if the configured command is not handled here
     * @throws Exception declared for anything raised by the mapper or the commands container
     */
    public void invoke(IN in, SinkFunction.Context context) throws Exception {
        String key = this.redisSinkMapper.getKeyFromData(in);
        String value = this.redisSinkMapper.getValueFromData(in);
        // Named differently from the fields on purpose: the originals shadowed them.
        Optional<String> recordKeyOverride = this.redisSinkMapper.getAdditionalKey(in);
        Optional<Integer> recordTtlOverride = this.redisSinkMapper.getAdditionalTTL(in);

        // The overrides are resolved inside the branches that need them, so commands that do not
        // use an additional key or TTL never touch the Optionals.
        switch (this.redisCommand) {
            case RPUSH:
                this.redisCommandsContainer.rpush(key, value);
                break;
            case LPUSH:
                this.redisCommandsContainer.lpush(key, value);
                break;
            case SADD:
                this.redisCommandsContainer.sadd(key, value);
                break;
            case SET:
                this.redisCommandsContainer.set(key, value);
                break;
            case SETEX:
                this.redisCommandsContainer.setex(key, value, recordTtlOverride.orElse(this.additionalTTL));
                break;
            case PFADD:
                this.redisCommandsContainer.pfadd(key, value);
                break;
            case PUBLISH:
                this.redisCommandsContainer.publish(key, value);
                break;
            case ZADD:
                // Argument order is (additional key, mapped value, mapped key).
                // NOTE(review): presumably the mapped value is the score and the mapped key the
                // member - confirm against RedisCommandsContainer.
                this.redisCommandsContainer.zadd(recordKeyOverride.orElse(this.additionalKey), value, key);
                break;
            case ZINCRBY:
                // Same (additional key, mapped value, mapped key) ordering as ZADD.
                this.redisCommandsContainer.zincrBy(recordKeyOverride.orElse(this.additionalKey), value, key);
                break;
            case ZREM:
                this.redisCommandsContainer.zrem(recordKeyOverride.orElse(this.additionalKey), key);
                break;
            case HSET:
                this.redisCommandsContainer.hset(
                        recordKeyOverride.orElse(this.additionalKey),
                        key,
                        value,
                        recordTtlOverride.orElse(this.additionalTTL));
                break;
            case HINCRBY:
                this.redisCommandsContainer.hincrBy(
                        recordKeyOverride.orElse(this.additionalKey),
                        key,
                        Long.valueOf(value),
                        recordTtlOverride.orElse(this.additionalTTL));
                break;
            case INCRBY:
                this.redisCommandsContainer.incrBy(key, Long.valueOf(value));
                break;
            case INCRBY_EX:
                this.redisCommandsContainer.incrByEx(
                        key, Long.valueOf(value), recordTtlOverride.orElse(this.additionalTTL));
                break;
            case DECRBY:
                this.redisCommandsContainer.decrBy(key, Long.valueOf(value));
                break;
            case DESCRBY_EX:
                this.redisCommandsContainer.decrByEx(
                        key, Long.valueOf(value), recordTtlOverride.orElse(this.additionalTTL));
                break;
            default:
                throw new IllegalArgumentException("Cannot process such data type: " + this.redisCommand);
        }
    }

    /**
     * Builds the commands container from the stored configuration and opens it.
     *
     * @param configuration not used by this implementation
     * @throws Exception whatever building or opening the container throws; it is logged at error
     *     level and then rethrown unchanged
     */
    public void open(Configuration configuration) throws Exception {
        try {
            this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
            this.redisCommandsContainer.open();
        } catch (Exception e) {
            LOG.error("Redis has not been properly initialized: ", e);
            throw e;
        }
    }

    /**
     * Closes the commands container if one was created; does nothing otherwise.
     *
     * @throws IOException if closing the container fails
     */
    public void close() throws IOException {
        // The container is null when open() was never called or failed before building it.
        if (this.redisCommandsContainer != null) {
            this.redisCommandsContainer.close();
        }
    }
}
