package net.dreamlu.mica.redis.stream;

import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import net.dreamlu.mica.core.utils.ReflectUtil;
import net.dreamlu.mica.core.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;

/* loaded from: input_file:net/dreamlu/mica/redis/stream/RStreamListenerDetector.class */
public class RStreamListenerDetector implements BeanPostProcessor, InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(RStreamListenerDetector.class);
    private final StreamMessageListenerContainer<String, MapRecord<String, String, byte[]>> streamMessageListenerContainer;
    private final RedisTemplate<String, Object> redisTemplate;
    private final String consumerGroup;
    private final String consumerName;

    public RStreamListenerDetector(StreamMessageListenerContainer<String, MapRecord<String, String, byte[]>> streamMessageListenerContainer, RedisTemplate<String, Object> redisTemplate, String str, String str2) {
        this.streamMessageListenerContainer = streamMessageListenerContainer;
        this.redisTemplate = redisTemplate;
        this.consumerGroup = str;
        this.consumerName = str2;
    }

    public Object postProcessAfterInitialization(Object obj, String str) throws BeansException {
        ReflectionUtils.doWithMethods(ClassUtils.getUserClass(obj), method -> {
            RStreamListener rStreamListener = (RStreamListener) AnnotationUtils.findAnnotation(method, RStreamListener.class);
            if (rStreamListener != null) {
                String name = rStreamListener.name();
                Assert.hasText(name, "@RStreamListener name must not be empty.");
                log.info("Found @RStreamListener on bean:{} method:{}", str, method);
                if (method.getParameterCount() > 1) {
                    throw new IllegalArgumentException("@RStreamListener on method " + method + " parameter count must less or equal to 1.");
                }
                ReadOffset readOffset = rStreamListener.offsetModel().getReadOffset();
                StreamOffset<String> create = StreamOffset.create(name, readOffset);
                if (MessageModel.BROADCASTING == rStreamListener.messageModel()) {
                    broadCast(create, obj, method, rStreamListener.readRawBytes());
                    return;
                }
                String group = StringUtil.isNotBlank(rStreamListener.group()) ? rStreamListener.group() : this.consumerGroup;
                Consumer from = Consumer.from(group, this.consumerName);
                createGroupIfNeed(this.redisTemplate, name, readOffset, group);
                cluster(from, create, rStreamListener, obj, method);
            }
        }, ReflectionUtils.USER_DECLARED_METHODS);
        return obj;
    }

    private void broadCast(StreamOffset<String> streamOffset, Object obj, Method method, boolean z) {
        this.streamMessageListenerContainer.receive(streamOffset, mapRecord -> {
            invokeMethod(obj, method, mapRecord, z);
        });
    }

    private void cluster(Consumer consumer, StreamOffset<String> streamOffset, RStreamListener rStreamListener, Object obj, Method method) {
        boolean autoAcknowledge = rStreamListener.autoAcknowledge();
        StreamMessageListenerContainer.ConsumerStreamReadRequest build = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(autoAcknowledge).build();
        StreamOperations opsForStream = this.redisTemplate.opsForStream();
        this.streamMessageListenerContainer.register(build, mapRecord -> {
            invokeMethod(obj, method, mapRecord, rStreamListener.readRawBytes());
            if (autoAcknowledge) {
                return;
            }
            opsForStream.acknowledge(consumer.getGroup(), mapRecord);
        });
    }

    private static void createGroupIfNeed(RedisTemplate<String, Object> redisTemplate, String str, ReadOffset readOffset, String str2) {
        StreamOperations opsForStream = redisTemplate.opsForStream();
        try {
            if (opsForStream.groups(str).stream().noneMatch(xInfoGroup -> {
                return str2.equals(xInfoGroup.groupName());
            })) {
                opsForStream.createGroup(str, readOffset, str2);
            }
        } catch (RedisSystemException e) {
            opsForStream.createGroup(str, str2);
        }
    }

    private void invokeMethod(Object obj, Method method, MapRecord<String, String, byte[]> mapRecord, boolean z) {
        if (method.getParameterCount() == 0) {
            ReflectUtil.invokeMethod(method, obj);
        } else if (z) {
            ReflectUtil.invokeMethod(method, obj, new Object[]{mapRecord});
        } else {
            ReflectUtil.invokeMethod(method, obj, new Object[]{getRecordValue(mapRecord)});
        }
    }

    private Object getRecordValue(MapRecord<String, String, byte[]> mapRecord) {
        Map map = (Map) mapRecord.getValue();
        if (!map.containsKey(RStreamTemplate.OBJECT_PAYLOAD_KEY)) {
            return mapRecord.mapEntries(entry -> {
                return (Map.Entry) Collections.singletonMap((String) entry.getKey(), this.redisTemplate.getValueSerializer().deserialize((byte[]) entry.getValue())).entrySet().iterator().next();
            });
        }
        return ObjectRecord.create((String) mapRecord.getStream(), this.redisTemplate.getValueSerializer().deserialize((byte[]) map.get(RStreamTemplate.OBJECT_PAYLOAD_KEY))).withId(mapRecord.getId());
    }

    public void afterPropertiesSet() throws Exception {
        this.streamMessageListenerContainer.start();
    }
}
