package com.github.taccisum.pigeon.core.entity.core;

import com.github.taccisum.domain.core.Entity;
import com.github.taccisum.domain.core.Event;
import com.github.taccisum.pigeon.core.dao.MessageDAO;
import com.github.taccisum.pigeon.core.dao.MessageMassDAO;
import com.github.taccisum.pigeon.core.data.MessageMassDO;
import com.github.taccisum.pigeon.core.entity.core.Message;
import com.github.taccisum.pigeon.core.repo.MassTacticRepo;
import com.github.taccisum.pigeon.core.repo.MessageRepo;
import com.github.taccisum.pigeon.core.repo.SubMassRepo;
import com.github.taccisum.pigeon.core.service.AsyncDeliverSubMassService;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.lang.NotImplementedException;
import org.pf4j.PluginManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/taccisum/pigeon/core/entity/core/MessageMass.class */
public class MessageMass extends Entity.Base<Long> {
    private static final int SUB_MASS_SIZE = 500;
    protected final Logger log;

    @Resource
    private PluginManager pluginManager;

    @Resource
    MessageMassDAO dao;

    @Resource
    private SubMassRepo subMassRepo;

    @Resource
    private MessageRepo messageRepo;

    @Resource
    private MassTacticRepo massTacticRepo;

    @Resource
    private MessageDAO messageDAO;

    /* loaded from: input_file:com/github/taccisum/pigeon/core/entity/core/MessageMass$DeliveredEvent.class */
    public static class DeliveredEvent extends Event.Base<MessageMass> {
    }

    /* loaded from: input_file:com/github/taccisum/pigeon/core/entity/core/MessageMass$PartDeliveredEvent.class */
    public static class PartDeliveredEvent extends Event.Base<MessageMass> {
    }

    /* loaded from: input_file:com/github/taccisum/pigeon/core/entity/core/MessageMass$StartDeliverEvent.class */
    public static class StartDeliverEvent extends Event.Base<MessageMass> {
        private boolean boost;

        public StartDeliverEvent(boolean z) {
            this.boost = z;
        }

        public boolean isBoost() {
            return this.boost;
        }
    }

    /* loaded from: input_file:com/github/taccisum/pigeon/core/entity/core/MessageMass$Status.class */
    public enum Status {
        CREATING,
        NOT_DELIVERED,
        DELIVERING,
        ALL_DELIVERED
    }

    public MessageMass(Long l) {
        super(l);
        this.log = LoggerFactory.getLogger(getClass());
    }

    public MessageMassDO data() {
        return this.dao.selectById((Serializable) id());
    }

    public List<Message> listMessages() {
        return listMessages(100L);
    }

    public List<Message> listMessages(long j) {
        return this.messageRepo.listByMassId((Long) id(), j);
    }

    public void deliver() {
        deliver(false);
    }

    public void deliver(boolean z) {
        boolean z2 = z || size() > SUB_MASS_SIZE;
        updateStatus(Status.DELIVERING);
        publish(new StartDeliverEvent(z2));
        if (!z2) {
            deliverOnLocal();
            publish(new DeliveredEvent());
            updateStatus(Status.ALL_DELIVERED);
            return;
        }
        List<SubMass> partition = partition();
        List extensions = this.pluginManager.getExtensions(AsyncDeliverSubMassService.class);
        if (extensions == null || extensions.size() == 0) {
            throw new RuntimeException("");
        }
        if (extensions.size() > 1) {
            this.log.warn("仅允许存在一个 {} 扩展点，将使用优先级最高的扩展点", AsyncDeliverSubMassService.class.getSimpleName());
            extensions.sort(Comparator.comparingInt((v0) -> {
                return v0.getOrder();
            }));
        }
        AsyncDeliverSubMassService asyncDeliverSubMassService = (AsyncDeliverSubMassService) extensions.get(0);
        if (asyncDeliverSubMassService != null) {
            Iterator<SubMass> it = partition.iterator();
            while (it.hasNext()) {
                asyncDeliverSubMassService.publish(new AsyncDeliverSubMassService.DeliverSubMassCommand((Long) it.next().id()));
            }
        }
    }

    private List<SubMass> partition() {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < size() / SUB_MASS_SIZE; i++) {
            arrayList.add(this.subMassRepo.create((Long) id(), i, i * SUB_MASS_SIZE, (i + 1) * SUB_MASS_SIZE, SUB_MASS_SIZE));
        }
        return arrayList;
    }

    private void deliverOnLocal() {
        int i = 0;
        int i2 = 0;
        int i3 = 0;
        Iterator<Message> it = listMessages(Long.MAX_VALUE).iterator();
        while (it.hasNext()) {
            try {
                if (it.next().deliver()) {
                    i++;
                } else {
                    i2++;
                }
            } catch (Message.DeliverException e) {
                this.log.warn("消息发送失败", e);
                i2++;
            } catch (Exception e2) {
                this.log.error("消息发送出错", e2);
                i3++;
            }
        }
        increaseCount(i, i2, i3);
    }

    void increaseCount(int i, int i2, int i3) {
        MessageMassDO data = data();
        MessageMassDO newEmptyDataObject = this.dao.newEmptyDataObject();
        newEmptyDataObject.setId((Long) id());
        newEmptyDataObject.setSuccessCount(Integer.valueOf(((Integer) Optional.ofNullable(data.getSuccessCount()).orElse(0)).intValue() + i));
        newEmptyDataObject.setFailCount(Integer.valueOf(((Integer) Optional.ofNullable(data.getFailCount()).orElse(0)).intValue() + i2));
        newEmptyDataObject.setErrorCount(Integer.valueOf(((Integer) Optional.ofNullable(data.getErrorCount()).orElse(0)).intValue() + i3));
        this.dao.updateById(newEmptyDataObject);
    }

    private int size() {
        return ((Integer) Optional.ofNullable(data().getSize()).orElse(0)).intValue();
    }

    public void refreshStat() {
        throw new NotImplementedException();
    }

    public void addAll(List<Message> list) {
        this.messageDAO.updateMassIdBatch((Long) id(), (List) list.stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toList()));
        MessageMassDO newEmptyDataObject = this.dao.newEmptyDataObject();
        newEmptyDataObject.setId((Long) id());
        newEmptyDataObject.setSize(Integer.valueOf(size() + list.size()));
        this.dao.updateById(newEmptyDataObject);
    }

    public void markPrepared() {
        updateStatus(Status.NOT_DELIVERED);
    }

    private void updateStatus(Status status) {
        MessageMassDO newEmptyDataObject = this.dao.newEmptyDataObject();
        newEmptyDataObject.setId((Long) id());
        newEmptyDataObject.setStatus(status);
        this.dao.updateById(newEmptyDataObject);
    }

    public Optional<MassTactic> getTactic() {
        return this.massTacticRepo.get(data().getTacticId().longValue());
    }
}
