package net.dreamlu.mica.redis.config;

import java.time.Duration;
import java.util.Objects;
import net.dreamlu.mica.core.utils.INetUtil;
import net.dreamlu.mica.core.utils.StringUtil;
import net.dreamlu.mica.redis.config.MicaRedisProperties;
import net.dreamlu.mica.redis.stream.DefaultRStreamTemplate;
import net.dreamlu.mica.redis.stream.RStreamListenerDetector;
import net.dreamlu.mica.redis.stream.RStreamTemplate;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.util.ErrorHandler;

/**
 * Auto-configuration that wires up Redis Stream support.
 *
 * <p>The configuration is only active when the {@code enable} property under
 * {@link MicaRedisProperties.Stream#PREFIX} is set to {@code true}. It contributes the
 * listener container options, the listener container itself, the detector that is handed
 * the container together with a consumer group/name, and a stream template.
 */
@AutoConfiguration
@ConditionalOnProperty(prefix = MicaRedisProperties.Stream.PREFIX, name = {"enable"}, havingValue = "true")
public class RedisStreamConfiguration {
    /** Environment key whose value is the default consumer group base name. */
    private static final String APPLICATION_NAME_KEY = "spring.application.name";

    /** Environment key whose value, when present, is appended to the default consumer group. */
    private static final String ACTIVE_PROFILES_KEY = "spring.profiles.active";

    /**
     * Builds the options used by the stream listener container.
     *
     * <p>Keys and hash keys are serialized as strings, hash values as raw byte arrays.
     * The batch size is applied only when configured and positive; the poll timeout is
     * applied only when configured and non-negative. Otherwise the builder's own defaults
     * are left untouched.
     *
     * @param micaRedisProperties source of the stream settings (batch size, poll timeout)
     * @param objectProvider      optional {@link ErrorHandler}; registered on the builder when available
     * @return the assembled container options
     */
    @ConditionalOnMissingBean
    @Bean
    public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, byte[]>> streamMessageListenerContainerOptions(MicaRedisProperties micaRedisProperties, ObjectProvider<ErrorHandler> objectProvider) {
        // Fully parameterized builder: avoids the raw type and the unchecked conversion on build().
        StreamMessageListenerContainer.StreamMessageListenerContainerOptionsBuilder<String, MapRecord<String, String, byte[]>> optionsBuilder =
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                .keySerializer(RedisSerializer.string())
                .hashKeySerializer(RedisSerializer.string())
                .hashValueSerializer(RedisSerializer.byteArray());

        MicaRedisProperties.Stream stream = micaRedisProperties.getStream();

        Integer pollBatchSize = stream.getPollBatchSize();
        if (pollBatchSize != null && pollBatchSize > 0) {
            optionsBuilder.batchSize(pollBatchSize);
        }

        Duration pollTimeout = stream.getPollTimeout();
        if (pollTimeout != null && !pollTimeout.isNegative()) {
            optionsBuilder.pollTimeout(pollTimeout);
        }

        objectProvider.ifAvailable(optionsBuilder::errorHandler);
        return optionsBuilder.build();
    }

    /**
     * Creates the stream message listener container.
     *
     * @param redisConnectionFactory                connection factory passed to the container
     * @param streamMessageListenerContainerOptions options passed to the container
     * @return the container created from the given factory and options
     */
    @ConditionalOnMissingBean
    @Bean
    public StreamMessageListenerContainer<String, MapRecord<String, String, byte[]>> streamMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, byte[]>> streamMessageListenerContainerOptions) {
        return StreamMessageListenerContainer.create(redisConnectionFactory, streamMessageListenerContainerOptions);
    }

    /**
     * Creates the stream listener detector with a resolved consumer group and consumer name.
     *
     * <p>Explicitly configured values win. A blank consumer group falls back to
     * {@code <application name>[:<active profiles>]}; a blank consumer name falls back to
     * {@code <host ip>[:<server port>]}.
     *
     * @param streamMessageListenerContainer container handed to the detector
     * @param redisTemplate                  template handed to the detector
     * @param objectProvider                 optional {@link ServerProperties}, used for the port part of the default consumer name
     * @param micaRedisProperties            source of the configured consumer group/name
     * @param environment                    used to look up the application name and active profiles
     * @return the configured detector
     * @throws IllegalStateException if no consumer group is configured and
     *                               {@code spring.application.name} is not set
     */
    @ConditionalOnMissingBean
    @Bean
    public RStreamListenerDetector streamListenerDetector(StreamMessageListenerContainer<String, MapRecord<String, String, byte[]>> streamMessageListenerContainer, RedisTemplate<String, Object> redisTemplate, ObjectProvider<ServerProperties> objectProvider, MicaRedisProperties micaRedisProperties, Environment environment) {
        MicaRedisProperties.Stream stream = micaRedisProperties.getStream();
        String consumerGroup = resolveConsumerGroup(stream, environment);
        String consumerName = resolveConsumerName(stream, objectProvider);
        return new RStreamListenerDetector(streamMessageListenerContainer, redisTemplate, consumerGroup, consumerName);
    }

    /**
     * Creates the stream template backed by the given Redis template.
     *
     * <p>NOTE(review): unlike the other beans in this class, this one is not guarded by
     * {@code @ConditionalOnMissingBean} — confirm whether that is intentional.
     *
     * @param redisTemplate template the stream template delegates to
     * @return a {@link DefaultRStreamTemplate}
     */
    @Bean
    public RStreamTemplate streamTemplate(RedisTemplate<String, Object> redisTemplate) {
        return new DefaultRStreamTemplate(redisTemplate);
    }

    /** Returns the configured consumer group, or derives one from the application name and active profiles. */
    private static String resolveConsumerGroup(MicaRedisProperties.Stream stream, Environment environment) {
        String configuredGroup = stream.getConsumerGroup();
        if (!StringUtil.isBlank(configuredGroup)) {
            return configuredGroup;
        }
        String applicationName = environment.getRequiredProperty(APPLICATION_NAME_KEY);
        String activeProfiles = environment.getProperty(ACTIVE_PROFILES_KEY);
        return StringUtil.isBlank(activeProfiles) ? applicationName : applicationName + ':' + activeProfiles;
    }

    /** Returns the configured consumer name, or derives one from the host IP and, when known, the server port. */
    private static String resolveConsumerName(MicaRedisProperties.Stream stream, ObjectProvider<ServerProperties> serverPropertiesProvider) {
        String configuredName = stream.getConsumerName();
        if (!StringUtil.isBlank(configuredName)) {
            return configuredName;
        }
        StringBuilder derivedName = new StringBuilder(INetUtil.getHostIp());
        serverPropertiesProvider.ifAvailable(serverProperties -> {
            // getPort() is a nullable Integer; skip it when unset so the name never ends in ":null".
            Integer port = serverProperties.getPort();
            if (port != null) {
                derivedName.append(':').append(port);
            }
        });
        return derivedName.toString();
    }
}
