package com.github.javaclub.delaytask.impl;

import com.alibaba.fastjson.JSON;
import com.github.javaclub.BizException;
import com.github.javaclub.Constants;
import com.github.javaclub.delaytask.DelayJob;
import com.github.javaclub.delaytask.DelayQueueConsumer;
import com.github.javaclub.delaytask.DelayQueueProducer;
import com.github.javaclub.delaytask.DelayTaskConstants;
import com.github.javaclub.delaytask.annotation.DelayJobProcessor;
import com.github.javaclub.delaytask.annotation.DelayTaskListener;
import com.github.javaclub.delaytask.redis.PooledRedisClient;
import com.github.javaclub.delaytask.redis.RedisLua;
import com.github.javaclub.toolbox.AppBootHook;
import com.github.javaclub.toolbox.ToolBox;
import com.github.javaclub.toolbox.conf.AppConfigPropertiesWatcher;
import com.github.javaclub.toolbox.conf.CompositeAppConfigProperties;
import com.github.javaclub.toolbox.cron.CronUtils;
import com.github.javaclub.toolbox.model.MethodIdentifier;
import com.github.javaclub.toolbox.utils.ThreadLocalDateFormatter;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.lang.reflect.Method;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationUtils;

/* loaded from: input_file:com/github/javaclub/delaytask/impl/DelayQueueConsumerImpl.class */
public class DelayQueueConsumerImpl implements DelayQueueConsumer, ApplicationContextAware, InitializingBean, DisposableBean {
    /** Upper argument, in milliseconds, of the randomized pause a consumer takes when a poll returns no jobs. */
    private static final long SLEEP_FOR_EMPTY_QUEUE = 1000;
    /** Pause, in milliseconds, a consumer takes to back off after an unexpected error in its poll loop. */
    private static final long SLEEP_FOR_ERROR = 5000;
    /** Capacity of the bounded work queue that backs the event-bus thread pool. */
    private static final int MAX_IN_FLIGHT_JOB = 2000;
    /** Core and maximum size of the event-bus thread pool. */
    private static final int EVENT_BUS_THREAD_POOL_SIZE = 8;
    /** Pop count passed to the Redis pop script when the configured batch size yields no usable number. */
    private static final int DEFAULT_POP_COUNT = 10;
    /** Event bus that popped job entities are posted to; created in setApplicationContext. */
    private AsyncEventBus asyncEventBus;
    /** Executor behind asyncEventBus; shut down in destroy(). */
    private ExecutorService eventBusPool;
    /** Redis access used for popping jobs and for all bookkeeping hashes. */
    private PooledRedisClient pooledRedisClient;
    /** Comma-separated queue names handed to subscribe() from afterPropertiesSet(). */
    private String initWithQueue;
    /** Producer used to resubmit jobs whose context carries a cron expression. */
    private DelayQueueProducer delayQueueProducer;
    private static final Logger logger = LoggerFactory.getLogger(DelayQueueConsumerImpl.class);
    /** Event type -> "Class#method" identifiers recorded by checkMultipleSubscribeProcess; static, so shared by all instances. */
    private static final Multimap<Class<?>, String> subscribers = HashMultimap.create();
    /** Lifecycle state; consumer loops keep running only while this is RUNNING. */
    private volatile State state = State.NEW;
    /** Queue name -> the thread consuming that queue. */
    private volatile Map<String, Thread> queueConsumerMap = Maps.newConcurrentMap();
    /** Consumer group id; reassigned from configuration in afterPropertiesSet(). */
    private String groupId = "default";
    /** Batch size as a string; Consumer.run() reassigns it from configuration on every poll. */
    private String batchSize = "10";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/github/javaclub/delaytask/impl/DelayQueueConsumerImpl$Consumer.class */
    public class Consumer implements Runnable {
        private String queue;
        private Map<String, Class> classCache = new HashMap();

        public Consumer(String str) {
            this.queue = str;
            DelayQueueConsumerImpl.logger.info("Subscribe queue={} on groupId={}", str, DelayQueueConsumerImpl.this.groupId);
        }

        @Override // java.lang.Runnable
        public void run() {
            Thread.currentThread().setName("delaytask-queue-consumer-" + DelayQueueConsumerImpl.this.groupId + "-" + this.queue);
            String buildKey = DelayTaskConstants.RedisKeys.buildKey(DelayQueueConsumerImpl.this.groupId, this.queue);
            while (true) {
                if (DelayQueueConsumerImpl.this.state != State.RUNNING) {
                    break;
                }
                if (AppBootHook.isAppShuttingDown()) {
                    try {
                        DelayQueueConsumerImpl.this.destroy();
                        break;
                    } catch (Exception e) {
                    }
                } else if (DelayTaskConstants.DelayTaskConfig.isDelayTaskEnabled()) {
                    ArrayList newArrayList = Lists.newArrayList();
                    List list = null;
                    try {
                        try {
                            String valueOf = String.valueOf(Instant.now().toEpochMilli());
                            DelayQueueConsumerImpl.this.batchSize = CompositeAppConfigProperties.getInstance().getValue(DelayTaskConstants.DelayTaskConfig.DELAY_TASK_CONSUMER_BATCH_SIZE);
                            try {
                                String objects = Objects.toString(ToolBox.Numbers.parseInt(DelayQueueConsumerImpl.this.batchSize), String.valueOf(DelayQueueConsumerImpl.DEFAULT_POP_COUNT));
                                list = (List) DelayQueueConsumerImpl.this.pooledRedisClient.executeWithRetry(jedis -> {
                                    return jedis.eval(RedisLua.ZSET_POP_SCRIPT, 1, new String[]{buildKey, valueOf, objects});
                                });
                            } catch (Exception e2) {
                                DelayQueueConsumerImpl.logger.error("Redis processing error, groupId: {}, queue: {}", new Object[]{DelayQueueConsumerImpl.this.groupId, this.queue, e2});
                            }
                            if (ToolBox.Collections.isEmpty(list)) {
                                Thread.sleep(ToolBox.Numbers.random(500L, DelayQueueConsumerImpl.SLEEP_FOR_EMPTY_QUEUE));
                                if (ToolBox.Collections.isNotEmpty(newArrayList)) {
                                    HashSet newHashSet = Sets.newHashSet(newArrayList);
                                    DelayQueueConsumerImpl.this.pooledRedisClient.executeWithRetry(jedis2 -> {
                                        return jedis2.hdel(DelayTaskConstants.RedisKeys.scoreKey(DelayQueueConsumerImpl.this.groupId, this.queue), (String[]) newHashSet.toArray(new String[0]));
                                    });
                                }
                                if (ToolBox.Collections.isNotEmpty(list)) {
                                    String tracekey = DelayTaskConstants.RedisKeys.tracekey(DelayQueueConsumerImpl.this.groupId, this.queue);
                                    String concat = ToolBox.Strings.concat(new Object[]{ToolBox.Systems.getServerHostName(), "@", ToolBox.Systems.getLocalIp()});
                                    String valueOf2 = String.valueOf(ToolBox.Numbers.getTimestampMsLong());
                                    DelayQueueConsumerImpl.this.pooledRedisClient.executeWithRetry(jedis3 -> {
                                        return jedis3.hset(tracekey, concat, valueOf2);
                                    });
                                }
                            } else {
                                Iterator it = list.iterator();
                                while (it.hasNext()) {
                                    DelayJob delayJob = (DelayJob) JSON.parseObject(it.next().toString(), DelayJob.class);
                                    Object javaObject = JSON.toJavaObject((JSON) delayJob.getEntity(), getClass(delayJob.getEntityClass()));
                                    DelayQueueConsumerImpl.this.asyncEventBus.post(javaObject);
                                    if (DelayQueueConsumerImpl.logger.isInfoEnabled()) {
                                        DelayQueueConsumerImpl.logger.info("Post delayed job: {}", javaObject);
                                    }
                                    if (ToolBox.Strings.isNotBlank(delayJob.getJobKey())) {
                                        newArrayList.add(delayJob.getJobKey());
                                    }
                                    if (null != delayJob.getContext() && delayJob.getContext().containsKey(DelayJob.CONTEXT_PARAM_CRON)) {
                                        String objects2 = ToolBox.Objects.toString(delayJob.getContext().get(DelayJob.CONTEXT_PARAM_CRON), "");
                                        if (ToolBox.Strings.isNotBlank(objects2)) {
                                            try {
                                                Date nextTime = CronUtils.getNextTime(objects2);
                                                if (null != nextTime) {
                                                    DelayJob<?> delayJob2 = new DelayJob<>(javaObject, delayJob.getContext());
                                                    delayJob2.addContextParam(DelayJob.CONTEXT_PARAM_TRIGGER_TIME, ThreadLocalDateFormatter.timestampFormat(nextTime));
                                                    delayJob2.removeAttribute(DelayJob.CONTEXT_PARAM_DELIVERY_TIMES);
                                                    DelayQueueConsumerImpl.this.delayQueueProducer.submit(this.queue, delayJob2, nextTime);
                                                }
                                            } catch (Exception e3) {
                                                DelayQueueConsumerImpl.logger.error("Errored submit delay task by cron={}, error={}", objects2, e3.getMessage());
                                            }
                                        }
                                    }
                                }
                                if (ToolBox.Collections.isNotEmpty(newArrayList)) {
                                    HashSet newHashSet2 = Sets.newHashSet(newArrayList);
                                    DelayQueueConsumerImpl.this.pooledRedisClient.executeWithRetry(jedis22 -> {
                                        return jedis22.hdel(DelayTaskConstants.RedisKeys.scoreKey(DelayQueueConsumerImpl.this.groupId, this.queue), (String[]) newHashSet2.toArray(new String[0]));
                                    });
                                }
                                if (ToolBox.Collections.isNotEmpty(list)) {
                                    String tracekey2 = DelayTaskConstants.RedisKeys.tracekey(DelayQueueConsumerImpl.this.groupId, this.queue);
                                    String concat2 = ToolBox.Strings.concat(new Object[]{ToolBox.Systems.getServerHostName(), "@", ToolBox.Systems.getLocalIp()});
                                    String valueOf3 = String.valueOf(ToolBox.Numbers.getTimestampMsLong());
                                    DelayQueueConsumerImpl.this.pooledRedisClient.executeWithRetry(jedis32 -> {
                                        return jedis32.hset(tracekey2, concat2, valueOf3);
                                    });
                                }
                            }
                        } catch (Throwable th) {
                            DelayQueueConsumerImpl.logger.error("Unexpected consume error, groupId:{}, queue:{}", new Object[]{DelayQueueConsumerImpl.this.groupId, this.queue, th});
                            try {
                                Thread.sleep(DelayQueueConsumerImpl.SLEEP_FOR_ERROR);
                            } catch (InterruptedException e4) {
                            }
                            if (ToolBox.Collections.isNotEmpty(newArrayList)) {
                                HashSet newHashSet3 = Sets.newHashSet(newArrayList);
                                DelayQueueConsumerImpl.this.pooledRedisClient.executeWithRetry(jedis222 -> {
                                    return jedis222.hdel(DelayTaskConstants.RedisKeys.scoreKey(DelayQueueConsumerImpl.this.groupId, this.queue), (String[]) newHashSet3.toArray(new String[0]));
                                });
                            }
                            if (ToolBox.Collections.isNotEmpty((Collection) null)) {
                                String tracekey3 = DelayTaskConstants.RedisKeys.tracekey(DelayQueueConsumerImpl.this.groupId, this.queue);
                                String concat3 = ToolBox.Strings.concat(new Object[]{ToolBox.Systems.getServerHostName(), "@", ToolBox.Systems.getLocalIp()});
                                String valueOf4 = String.valueOf(ToolBox.Numbers.getTimestampMsLong());
                                DelayQueueConsumerImpl.this.pooledRedisClient.executeWithRetry(jedis322 -> {
                                    return jedis322.hset(tracekey3, concat3, valueOf4);
                                });
                            }
                        }
                    } catch (Throwable th2) {
                        if (ToolBox.Collections.isNotEmpty(newArrayList)) {
                            HashSet newHashSet4 = Sets.newHashSet(newArrayList);
                            DelayQueueConsumerImpl.this.pooledRedisClient.executeWithRetry(jedis2222 -> {
                                return jedis2222.hdel(DelayTaskConstants.RedisKeys.scoreKey(DelayQueueConsumerImpl.this.groupId, this.queue), (String[]) newHashSet4.toArray(new String[0]));
                            });
                        }
                        if (ToolBox.Collections.isNotEmpty((Collection) null)) {
                            String tracekey4 = DelayTaskConstants.RedisKeys.tracekey(DelayQueueConsumerImpl.this.groupId, this.queue);
                            String concat4 = ToolBox.Strings.concat(new Object[]{ToolBox.Systems.getServerHostName(), "@", ToolBox.Systems.getLocalIp()});
                            String valueOf5 = String.valueOf(ToolBox.Numbers.getTimestampMsLong());
                            DelayQueueConsumerImpl.this.pooledRedisClient.executeWithRetry(jedis3222 -> {
                                return jedis3222.hset(tracekey4, concat4, valueOf5);
                            });
                        }
                        throw th2;
                    }
                } else {
                    DelayQueueConsumerImpl.logger.warn("功能尚未开启, 请检查delaytask.enabled配置");
                }
            }
            DelayQueueConsumerImpl.logger.info("Consumer terminated, groupId:{}, queue:{}", DelayQueueConsumerImpl.this.groupId, this.queue);
        }

        private Class<?> getClass(String str) throws ClassNotFoundException {
            Class<?> cls = this.classCache.get(str);
            if (null == cls) {
                cls = Class.forName(str);
                this.classCache.put(str, cls);
            }
            return cls;
        }
    }

    /**
     * Lifecycle of the enclosing consumer.
     */
    /* loaded from: input_file:com/github/javaclub/delaytask/impl/DelayQueueConsumerImpl$State.class */
    public enum State {
        // Initial value of the state field, before subscribe() has been called.
        NEW,
        // Set by subscribe(); consumer loops keep polling only while this is the current state.
        RUNNING,
        // Set by destroy(); subscribe() returns immediately once this state is reached.
        TERMINATED
    }

    /**
     * Injects the producer that consumers use to resubmit jobs carrying a cron expression.
     *
     * @param delayQueueProducer producer to store
     */
    public void setDelayQueueProducer(DelayQueueProducer delayQueueProducer) {
        this.delayQueueProducer = delayQueueProducer;
    }

    /**
     * Sets the consumer group id.
     *
     * <p>Note: afterPropertiesSet() reassigns the group id from configuration, so a value set
     * here before initialization is replaced at that point.
     *
     * @param str group id to store
     */
    public void setGroupId(String str) {
        this.groupId = str;
    }

    /**
     * Sets the queue list that afterPropertiesSet() passes to subscribe().
     *
     * @param str queue names; subscribe() splits the value on commas
     */
    public void setInitWithQueue(String str) {
        this.initWithQueue = str;
    }

    /**
     * Sets the batch size string.
     *
     * <p>Note: Consumer.run() reassigns this field from configuration on every poll, so a value
     * set here does not survive the first poll.
     *
     * @param str batch size to store
     */
    public void setBatchSize(String str) {
        this.batchSize = str;
    }

    /**
     * Injects the Redis client used for popping jobs and for the bookkeeping hashes.
     *
     * @param pooledRedisClient client to store
     */
    public void setPooledRedisClient(PooledRedisClient pooledRedisClient) {
        this.pooledRedisClient = pooledRedisClient;
    }

    /**
     * Initialization callback: loads the group id from configuration and subscribes to the
     * queues given by initWithQueue.
     *
     * @throws Exception declared by the callback contract; nothing in this body declares a checked exception
     */
    public void afterPropertiesSet() throws Exception {
        // NOTE(review): noneBlank presumably falls back to "default" when the configured value is blank — confirm.
        this.groupId = ToolBox.Strings.noneBlank(CompositeAppConfigProperties.getInstance().getValue(DelayTaskConstants.DelayTaskConfig.DELAY_TASK_GROUP_ID), "default");
        subscribe(this.initWithQueue);
    }

    /**
     * Starts one consumer thread per queue and then watches the queue configuration for changes.
     *
     * <p>Does nothing once the consumer is {@link State#TERMINATED}; otherwise the state becomes
     * {@link State#RUNNING}. When {@code str} is blank the queue list is read from configuration.
     * Each queue is registered in the TSR hash before its consumer thread is started. A queue that
     * already has a started consumer thread is left alone, so subscribing to it again is harmless.
     *
     * <p>On a configuration change the watcher starts consumers for newly listed queues and, for
     * queues no longer listed, removes the TSR entry, interrupts the consumer thread and forgets it.
     *
     * <p>Note: every invocation creates and starts a new watcher.
     *
     * @param str comma-separated queue names, or blank to use the configured value
     */
    @Override // com.github.javaclub.delaytask.DelayQueueConsumer
    public void subscribe(String str) {
        if (this.state == State.TERMINATED) {
            return;
        }
        this.state = State.RUNNING;
        if (ToolBox.Strings.isBlank(str)) {
            str = CompositeAppConfigProperties.getInstance().getValue(DelayTaskConstants.DelayTaskConfig.DELAY_TASK_CONSUMER_QUEUE);
        }
        String[] queues = ToolBox.Strings.splitAndTrim(str, ",");
        if (ToolBox.Objects.isEmpty(queues)) {
            logger.warn("Please check the configuration of [delaytask.consumer.queue] if you want to subscribe DelayTask topics！");
        } else {
            logger.info("Delaytask group={}, subscription queue={}", this.groupId, Arrays.asList(queues));
            String groupConfigJson = JSON.toJSONString(currentGroupConfig());
            for (String queueName : queues) {
                startConsumer(queueName, groupConfigJson);
            }
        }
        new AppConfigPropertiesWatcher(new String[]{DelayTaskConstants.DelayTaskConfig.DELAY_TASK_CONSUMER_QUEUE}) { // from class: com.github.javaclub.delaytask.impl.DelayQueueConsumerImpl.1
            protected void doOnChange() {
                String value = CompositeAppConfigProperties.getInstance().getValue(DelayTaskConstants.DelayTaskConfig.DELAY_TASK_CONSUMER_QUEUE);
                DelayQueueConsumerImpl.logger.info("AppConfigPropertiesWatcher watch CompositeAppConfigProperties changed, consumerQueue = {}", value);
                if (!ToolBox.Strings.isNotBlank(value)) {
                    return;
                }
                String[] configured = ToolBox.Strings.splitAndTrim(value, ",");
                if (null == configured) {
                    return;
                }
                String groupConfigJson = JSON.toJSONString(DelayQueueConsumerImpl.this.currentGroupConfig());
                for (String queueName : configured) {
                    // A queue that is already being consumed needs no action. Joining its thread
                    // here would block the watcher until that consumer dies.
                    if (!DelayQueueConsumerImpl.this.queueConsumerMap.containsKey(queueName)) {
                        DelayQueueConsumerImpl.this.startConsumer(queueName, groupConfigJson);
                    }
                }
                List<String> removedQueues = new ArrayList<>();
                for (Map.Entry<String, Thread> entry : DelayQueueConsumerImpl.this.queueConsumerMap.entrySet()) {
                    if (ToolBox.Objects.contains(configured, entry.getKey())) {
                        continue;
                    }
                    String queueName = entry.getKey();
                    removedQueues.add(queueName);
                    DelayQueueConsumerImpl.this.removeGroupFromTSR(queueName);
                    Thread consumer = entry.getValue();
                    if (!consumer.isInterrupted()) {
                        consumer.interrupt();
                        DelayQueueConsumerImpl.logger.warn("Consumer of queue = {} is interrupt for removed reason", queueName);
                    }
                }
                // Removal is deferred so the map is not modified while its entry set is being walked.
                for (String queueName : removedQueues) {
                    DelayQueueConsumerImpl.this.queueConsumerMap.remove(queueName);
                }
            }
        }.start();
    }

    /**
     * Registers this group for the queue in the TSR hash and starts the queue's consumer thread
     * unless that thread has been started before (starting a thread twice throws
     * IllegalThreadStateException).
     */
    private void startConsumer(String queueName, String groupConfigJson) {
        addGroupToTSR(queueName, groupConfigJson);
        Thread consumer = registerConsumerThread(queueName);
        if (consumer.getState() == Thread.State.NEW) {
            consumer.start();
        }
    }

    /**
     * Writes this group's entry into the Redis hash stored under the TSR key of the given queue:
     * the hash field is the group id and the value is {@code str2}.
     *
     * @param str  queue name the TSR key is derived from
     * @param str2 value to store; callers in this class pass the JSON of currentGroupConfig()
     */
    void addGroupToTSR(String str, String str2) {
        this.pooledRedisClient.executeWithRetry(jedis -> {
            return jedis.hset(DelayTaskConstants.RedisKeys.tsrKey(str), this.groupId, str2);
        });
    }

    /**
     * Deletes this group's field (the group id) from the Redis hash stored under the TSR key of
     * the given queue.
     *
     * @param str queue name the TSR key is derived from
     */
    void removeGroupFromTSR(String str) {
        this.pooledRedisClient.executeWithRetry(jedis -> {
            return jedis.hdel(DelayTaskConstants.RedisKeys.tsrKey(str), new String[]{this.groupId});
        });
    }

    /**
     * Builds the configuration snapshot published for this group: every configuration entry
     * selected by the delay-task key prefix, plus the application name and the server host name.
     *
     * @return the snapshot as a map produced by {@code ToolBox.Maps.toTreeMap}
     */
    Map<String, String> currentGroupConfig() {
        CompositeAppConfigProperties appConfig = CompositeAppConfigProperties.getInstance();
        Map<String, String> snapshot = ToolBox.Maps.toTreeMap(
                appConfig.selectPrefixKeyGroup(DelayTaskConstants.RedisKeys.DEFAULT_PREFIX));
        String appName = CompositeAppConfigProperties.getInstance().getAppName();
        snapshot.put("appName", appName);
        String hostName = ToolBox.Systems.getServerHostName();
        snapshot.put("hostName", hostName);
        return snapshot;
    }

    /**
     * Returns the consumer thread registered for the queue, creating and registering a new
     * (not yet started) one when the queue has none.
     *
     * <p>Lookup and registration happen in a single {@code computeIfAbsent} call, so the method
     * never returns {@code null} and a {@link Consumer} is only constructed when it is actually
     * registered.
     *
     * @param str queue name
     * @return the thread mapped to the queue
     */
    Thread registerConsumerThread(String str) {
        return this.queueConsumerMap.computeIfAbsent(str, queueName -> new Thread(new Consumer(queueName)));
    }

    /**
     * Marks the consumer as terminated and shuts the event-bus pool down, waiting up to 30 seconds
     * for queued work to finish before forcing the shutdown.
     *
     * @throws Exception if the wait for pool termination is interrupted
     */
    public void destroy() throws Exception {
        this.state = State.TERMINATED;
        this.eventBusPool.shutdown();
        boolean drainedInTime = this.eventBusPool.awaitTermination(30L, TimeUnit.SECONDS);
        if (!drainedInTime) {
            logger.warn("Force shutdown delay queue consumer eventBusPool");
            this.eventBusPool.shutdownNow();
        }
    }

    /**
     * Builds the event-bus thread pool and the async event bus, then registers every bean annotated
     * with {@code @DelayTaskListener} or {@code @DelayJobProcessor} as a subscriber.
     *
     * <p>Beans carrying both annotations are rejected, and unless multiple processing is enabled
     * each event type may have only one subscriber method. Tasks that do not fit into the pool's
     * bounded queue are dropped with an error log.
     *
     * @param applicationContext context used to look up the annotated beans
     * @throws BeansException declared by the callback contract
     */
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.eventBusPool = new ThreadPoolExecutor(
                EVENT_BUS_THREAD_POOL_SIZE,
                EVENT_BUS_THREAD_POOL_SIZE,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(MAX_IN_FLIGHT_JOB),
                new ThreadFactoryBuilder().setNameFormat("eventbus-delay-queue-consumer").build(),
                (rejectedTask, executor) -> logger.error("Delaytask discarded due to overflow"));
        this.asyncEventBus = new AsyncEventBus(this.eventBusPool, (failure, context) -> {
            Object[] logArgs = {
                context.getSubscriber().getClass().getSimpleName(),
                context.getSubscriberMethod().getName(),
                context.getEvent(),
                failure
            };
            logger.error("Async event process failed with Subscriber: {}  Method: {}  Event: {}", logArgs);
        });

        Map<String, Object> listenerBeans = applicationContext.getBeansWithAnnotation(DelayTaskListener.class);
        if (listenerBeans != null) {
            checkDuplicatedAnnotationOfDelayJobProcessor(listenerBeans);
        }
        Map<String, Object> processorBeans = applicationContext.getBeansWithAnnotation(DelayJobProcessor.class);
        if (processorBeans != null) {
            checkDuplicatedAnnotationOfDelayTaskListener(processorBeans);
        }

        Map<String, Object> subscriberBeans = ToolBox.Maps.union(new Map[]{listenerBeans, processorBeans});
        boolean singleSubscriberOnly = !DelayTaskConstants.DelayTaskConfig.isMultipleProcessEnabled();
        if (singleSubscriberOnly) {
            checkMultipleSubscribeProcess(subscriberBeans);
        }
        for (Object bean : subscriberBeans.values()) {
            logger.info("Register delay queue subscriber: {}", bean.getClass().getName());
            this.asyncEventBus.register(bean);
        }
    }

    /**
     * Enforces that every event type is handled by at most one subscriber method across the
     * given beans, recording each accepted method in the shared {@code subscribers} multimap.
     *
     * @param map candidate subscriber beans
     * @throws BizException if a subscriber method has no parameter, or an event type already has a subscriber
     */
    private void checkMultipleSubscribeProcess(Map<String, Object> map) {
        for (Object bean : map.values()) {
            Class<?> beanType = bean.getClass();
            String ownerName = ToolBox.Objects.getActualClassName(bean);
            for (Method handler : getAnnotatedMethods(beanType)) {
                String handlerId = ToolBox.Strings.concat(new Object[]{ownerName, "#", handler.getName()});
                Class<?>[] paramTypes = handler.getParameterTypes();
                if (ToolBox.Objects.isEmpty(paramTypes)) {
                    throw new BizException("!!!EventBus subscribeMethod=" + handlerId + " should have method parameter!!!");
                }
                Class<?> eventType = paramTypes[0];
                Collection<String> alreadyRegistered = subscribers.get(eventType);
                if (ToolBox.Objects.length(alreadyRegistered) < 1) {
                    subscribers.put(eventType, handlerId);
                    continue;
                }
                // The multimap view is live: adding here also records the offender, so the
                // message below lists every competing subscriber.
                alreadyRegistered.add(handlerId);
                throw new BizException("EventBus eventType=" + eventType.getName() + " has one more Subscriber process, Please check allowed only one!!!" + Constants.Environments.LINE_SEPARATER + alreadyRegistered.toString());
            }
        }
    }

    /**
     * Rejects any of the given beans whose class also carries {@code @DelayJobProcessor}.
     *
     * @param map beans already known to be annotated with {@code @DelayTaskListener}
     * @throws BizException for the first bean found with both annotations
     */
    private void checkDuplicatedAnnotationOfDelayJobProcessor(Map<String, Object> map) {
        for (Object bean : map.values()) {
            boolean alsoProcessor = AnnotationUtils.findAnnotation(bean.getClass(), DelayJobProcessor.class) != null;
            if (!alsoProcessor) {
                continue;
            }
            String beanName = ToolBox.Objects.getActualClassName(bean);
            throw new BizException("Duplicated DelayTask annotation @DelayJobProcessor on " + beanName + ", @DelayTaskListener had already annotated.");
        }
    }

    /**
     * Rejects any of the given beans whose class also carries {@code @DelayTaskListener}.
     *
     * @param map beans already known to be annotated with {@code @DelayJobProcessor}
     * @throws BizException for the first bean found with both annotations
     */
    private void checkDuplicatedAnnotationOfDelayTaskListener(Map<String, Object> map) {
        for (Object bean : map.values()) {
            boolean alsoListener = AnnotationUtils.findAnnotation(bean.getClass(), DelayTaskListener.class) != null;
            if (!alsoListener) {
                continue;
            }
            String beanName = ToolBox.Objects.getActualClassName(bean);
            throw new BizException("Duplicated DelayTask Annotation @DelayJobProcessor on " + beanName + ", Please use @DelayTaskListener only.");
        }
    }

    /**
     * Collects the {@code @Subscribe} methods declared by the class and all of its supertypes,
     * keeping one method per {@link MethodIdentifier} (the first one encountered wins).
     *
     * @param cls class to inspect
     * @return the distinct subscriber methods
     * @throws IllegalArgumentException if a {@code @Subscribe} method does not take exactly one parameter
     */
    private static ImmutableList<Method> getAnnotatedMethods(Class<?> cls) {
        Map<MethodIdentifier, Method> distinctHandlers = Maps.newHashMap();
        for (Class<?> type : TypeToken.of(cls).getTypes().rawTypes()) {
            for (Method candidate : type.getDeclaredMethods()) {
                // Synthetic methods (e.g. compiler-generated bridges) are never treated as subscribers.
                if (candidate.isSynthetic() || !candidate.isAnnotationPresent(Subscribe.class)) {
                    continue;
                }
                Class<?>[] paramTypes = candidate.getParameterTypes();
                Preconditions.checkArgument(paramTypes.length == 1, "Method %s has @Subscribe annotation but has %s parameters.Subscriber methods must have exactly 1 parameter.", candidate, paramTypes.length);
                distinctHandlers.putIfAbsent(new MethodIdentifier(candidate), candidate);
            }
        }
        return ImmutableList.copyOf(distinctHandlers.values());
    }
}
