/*
 * Decompiled with CFR 0.152.
 */
package cn.gongler.util.resend;

import cn.gongler.util.GonglerUtil;
import cn.gongler.util.QueueConsumer;
import cn.gongler.util.Recently;
import cn.gongler.util.function.ExceptionBiConsumer;
import cn.gongler.util.resend.IAckEventListener;
import cn.gongler.util.resend.IExpiredEventListener;
import cn.gongler.util.resend.ISendContext;
import cn.gongler.util.resend.ISendEventListener;
import cn.gongler.util.resend.ISendPackParams;
import cn.gongler.util.resend.ISender;
import cn.gongler.util.resend.ResenderEngineer;
import cn.gongler.util.tuple.Tuple;
import cn.gongler.util.tuple.Tuple7;
import java.io.Closeable;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;

public class Resender<Pack, Param, Ack>
implements Closeable {
    private static final long serialVersionUID = -1227835320183986648L;
    private final ResenderEngineer resendEngineer;
    private final Map<Long, SendBus> busSendingMap = new ConcurrentSkipListMap<Long, SendBus>();
    private final BiPredicate<Pack, Ack> ackChecker;
    private final ISender<Pack> realSender;
    private final QueueConsumer queueConsumer = QueueConsumer.of("BusResender,queueAutoConsumer");
    private final Set<ISendEventListener<Pack, Param, Ack>> sendEventListenerSet = ConcurrentHashMap.newKeySet();
    private final Set<IAckEventListener<Pack, Param, Ack>> sendAckEventListenerSet = ConcurrentHashMap.newKeySet();
    private final Set<IExpiredEventListener<Pack, Param, Ack>> sendExpiredEventListenerSet = ConcurrentHashMap.newKeySet();
    Recently<Long> sendMillsRecently = new Recently(4);
    private final ScheduledExecutorService scheduledSerivce = Executors.newSingleThreadScheduledExecutor();

    public Resender(BiPredicate<Pack, Ack> ackChecker, ISender<Pack> realSender) {
        ScheduledFuture<?> scheduledFuture = this.scheduledSerivce.scheduleWithFixedDelay(() -> {
            ArrayList<Long> removeList = new ArrayList<Long>();
            for (Map.Entry<Long, SendBus> entry : this.busSendingMap.entrySet()) {
                Long key = entry.getKey();
                SendBus bus = entry.getValue();
                if (!bus.isInactived()) continue;
                removeList.add(key);
            }
            for (Long key : removeList) {
                this.busSendingMap.remove(key);
            }
        }, 1L, 1L, TimeUnit.HOURS);
        this.resendEngineer = new ResenderEngineer();
        this.ackChecker = ackChecker;
        this.realSender = realSender::send;
    }

    ResenderEngineer ref() {
        return this.resendEngineer;
    }

    public Resender<Pack, Param, Ack> resendSeconds(long resendSeconds) {
        this.resendEngineer.resendSeconds(resendSeconds);
        return this;
    }

    public void add(Long busId, Pack pack, LocalDateTime expiredTime, boolean waitAck, Param param) {
        this.add(busId, pack, expiredTime, waitAck, param, null);
    }

    protected void add(Long busId, Pack pack, LocalDateTime expiredTime, boolean waitAck, Param param, ExceptionBiConsumer<Ack, ISendContext> sendResultHandler) {
        SendBus bus = this.busSendingMap.computeIfAbsent(busId, x$0 -> new SendBus((Long)x$0));
        ISendPackParams2<Pack, Param, Ack> params = Resender.toSendPackParams(busId, pack, expiredTime, waitAck, param, this.realSender, sendResultHandler);
        bus.addNewSendData(params);
    }

    public void ack(Long busId, Ack ackPack) {
        SendBus bus = this.busSendingMap.get(busId);
        if (bus != null) {
            bus.handleAck(ackPack);
        }
    }

    public long sendingBusCount() {
        return this.busSendingMap.values().stream().filter(SendBus::isSending).count();
    }

    public Map<Long, Long> busPendingPackCountMap() {
        return this.busSendingMap.values().stream().filter(SendBus::isSending).collect(Collectors.toMap(SendBus::busId, SendBus::getQueueSize));
    }

    public Resender<Pack, Param, Ack> addSendEventListener(ISendEventListener<Pack, Param, Ack> lis) {
        this.sendEventListenerSet.add(lis);
        return this;
    }

    public Resender<Pack, Param, Ack> addSendAckEventListener(IAckEventListener<Pack, Param, Ack> lis) {
        this.sendAckEventListenerSet.add(lis);
        return this;
    }

    public Resender<Pack, Param, Ack> addSendExpiredEventListener(IExpiredEventListener<Pack, Param, Ack> lis) {
        this.sendExpiredEventListenerSet.add(lis);
        return this;
    }

    private void notifySendEvent(ISendPackParams2<Pack, Param, Ack> params, int sendTimes) {
        this.sendEventListenerSet.stream().forEach(lis -> this.queueConsumer.accept(() -> lis.sendEvent(params, sendTimes)));
    }

    long ackMillnsAvg() {
        OptionalDouble ret = this.sendMillsRecently.values().stream().mapToLong(a -> a).average();
        return ret.isPresent() ? (long)ret.getAsDouble() : -1L;
    }

    private void notifySendAckEvent(ISendPackParams2<Pack, Param, Ack> params, Ack ack) {
        SendContext sendContext = params.sendContext();
        if (sendContext.sendTimes() == 1) {
            this.sendMillsRecently.push(sendContext.duration().toMillis());
        }
        this.sendAckEventListenerSet.stream().forEach(lis -> this.queueConsumer.accept(() -> lis.sendAckEvent(params, ack, params.sendContext())));
    }

    private void notifySendExpiredEvent(ISendPackParams2<Pack, Param, Ack> params, int sendTimes) {
        this.sendExpiredEventListenerSet.stream().forEach(lis -> this.queueConsumer.accept(() -> lis.sendExpiredEvent(params, sendTimes, params.sendContext())));
    }

    private void callback(Ack ackPack, ISendPackParams2<Pack, Param, Ack> sendEvent) {
        ExceptionBiConsumer resultHandler = sendEvent.sendResultHandler();
        if (resultHandler != null) {
            this.queueConsumer.accept(() -> resultHandler.accept(ackPack, sendEvent.sendContext()));
        }
    }

    @Override
    public void close() throws IOException {
        this.resendEngineer.close();
    }

    public String toString() {
        StringBuilder buf = new StringBuilder().append(this.getClass().getSimpleName()).append("_sendingBusCount:").append(this.sendingBusCount()).append("_queueAutoConsumer:").append(this.queueConsumer).append("_ackMillnsAvg:").append(this.ackMillnsAvg());
        return buf.toString();
    }

    private static <Pack, Param, Ack> ISendPackParams2<Pack, Param, Ack> toSendPackParams(Long busId, Pack pack, LocalDateTime expiredTime, boolean waitAck, Param param, ISender<Pack> sender, ExceptionBiConsumer<Ack, ISendContext> sendResultHandler) {
        final Tuple7<Long, Pack, LocalDateTime, Boolean, Param, ISender<Pack>, ExceptionBiConsumer<Ack, ISendContext>> tuple = Tuple.of(busId, pack, expiredTime, waitAck, param, sender, sendResultHandler);
        return new ISendPackParams2<Pack, Param, Ack>(){
            SendContext sendContext = new SendContext();

            @Override
            public Long busId() {
                return (Long)tuple.first();
            }

            @Override
            public Pack pack() {
                return tuple.second();
            }

            @Override
            public LocalDateTime expiredTime() {
                return (LocalDateTime)tuple.third();
            }

            @Override
            public boolean waitAck() {
                return (Boolean)tuple.forth();
            }

            @Override
            public Param param() {
                return tuple.fifth();
            }

            @Override
            public ISender<Pack> sender() {
                return (ISender)tuple.sixth();
            }

            @Override
            public ExceptionBiConsumer<Ack, ISendContext> sendResultHandler() {
                return (ExceptionBiConsumer)tuple.seventh();
            }

            public String toString() {
                return tuple.toString();
            }

            @Override
            public SendContext sendContext() {
                return this.sendContext;
            }
        };
    }

    private static interface ISendPackParams2<Pack, Param, Ack>
    extends ISendPackParams<Pack, Param, Ack> {
        public SendContext sendContext();
    }

    private class SendBus {
        private static final long serialVersionUID = 1L;
        final Long busId;
        private final ConcurrentLinkedQueue<ISendPackParams2<Pack, Param, Ack>> queue = new ConcurrentLinkedQueue();
        private long activeTime = System.currentTimeMillis();
        volatile ResenderEngineer.ISendTaskRef ref;

        SendBus(Long busId) {
            this.busId = busId;
        }

        public Long busId() {
            return this.busId;
        }

        private ISendPackParams2<Pack, Param, Ack> getCurrent() {
            return this.queue.peek();
        }

        public long getQueueSize() {
            return this.queue.size();
        }

        boolean isSending() {
            return !this.queue.isEmpty();
        }

        synchronized void addNewSendData(ISendPackParams2<Pack, Param, Ack> sendEvent) {
            boolean idle = this.getCurrent() == null;
            boolean sucess = this.queue.offer(sendEvent);
            if (sucess && idle) {
                this.notifyChangedSendingObjectEvent();
            }
            this.activeTime = System.currentTimeMillis();
        }

        synchronized ISendPackParams2<Pack, Param, Ack> handleAck(Ack ackPack) {
            ISendPackParams2 sendEvent = this.getCurrent();
            if (sendEvent != null && Resender.this.ackChecker.test(sendEvent.pack(), ackPack)) {
                sendEvent.sendContext().sendFinished(true);
                Resender.this.notifySendAckEvent(sendEvent, ackPack);
                Resender.this.callback(ackPack, sendEvent);
                Resender.this.resendEngineer.removeSendTask(this.ref);
                this.sendFinished();
                return sendEvent;
            }
            return null;
        }

        private void notifyChangedSendingObjectEvent() {
            ISendPackParams2 sendEvent = this.getCurrent();
            if (sendEvent != null) {
                IntConsumer sendTask = times -> {
                    GonglerUtil.ExecuteWithCatchAny(() -> sendEvent.sender().send(this.busId, sendEvent.pack()));
                    sendEvent.sendContext().sendTimeIncrement();
                    Resender.this.notifySendEvent(sendEvent, times);
                };
                sendTask.accept(1);
                if (!sendEvent.waitAck()) {
                    this.sendFinished();
                } else {
                    this.ref = Resender.this.resendEngineer.addSendTask(sendTask, sendEvent.expiredTime(), sendTimes -> this.expiredEvent(sendEvent, sendTimes));
                }
            }
        }

        private void expiredEvent(ISendPackParams2<Pack, Param, Ack> sendEvent, int sendTimes) {
            sendEvent.sendContext().sendFinished(false);
            Resender.this.notifySendExpiredEvent(sendEvent, sendTimes);
            Resender.this.callback(null, sendEvent);
            this.sendFinished();
        }

        private boolean isInactived() {
            return System.currentTimeMillis() - this.activeTime > TimeUnit.HOURS.toMillis(24L);
        }

        synchronized void sendFinished() {
            ISendPackParams2 skipNextExpiredPack = null;
            LocalDateTime now = LocalDateTime.now();
            do {
                if (skipNextExpiredPack != null) {
                    try {
                        Resender.this.notifySendExpiredEvent(skipNextExpiredPack, 0);
                        Resender.this.callback(null, skipNextExpiredPack);
                    }
                    catch (Throwable e) {
                        e.printStackTrace();
                    }
                }
                this.queue.poll();
            } while ((skipNextExpiredPack = this.getCurrent()) != null && skipNextExpiredPack.expiredTime().isBefore(now));
            this.notifyChangedSendingObjectEvent();
        }
    }

    static class SendContext
    implements ISendContext {
        private static final long serialVersionUID = 1L;
        int sendTimes = 0;
        LocalDateTime createTime = LocalDateTime.now();
        LocalDateTime firstSendTime;
        LocalDateTime lastSendTime;
        LocalDateTime finishTime;
        ISendContext.SendState sendState = ISendContext.SendState.WAITING;

        SendContext() {
        }

        public void sendTimeIncrement() {
            ++this.sendTimes;
            LocalDateTime now = LocalDateTime.now();
            if (this.firstSendTime == null) {
                this.firstSendTime = now;
                this.sendState = ISendContext.SendState.SENDING;
            }
            this.lastSendTime = now;
        }

        public void sendFinished(boolean sucess) {
            this.finishTime = LocalDateTime.now();
            this.sendState = sucess ? ISendContext.SendState.SENT_SUCESS : ISendContext.SendState.SENT_FAILTURE;
        }

        @Override
        public int sendTimes() {
            return this.sendTimes;
        }

        @Override
        public LocalDateTime createTime() {
            return this.createTime;
        }

        @Override
        public LocalDateTime firstSendTime() {
            return this.firstSendTime;
        }

        @Override
        public LocalDateTime lastSendTime() {
            return this.lastSendTime;
        }

        @Override
        public LocalDateTime finishTime() {
            return this.finishTime;
        }

        @Override
        public ISendContext.SendState sendState() {
            return this.sendState;
        }

        public String toString() {
            return (Object)((Object)this.sendState()) + ",sendTimes:" + this.sendTimes() + ",duration:" + this.duration() + ",create:" + this.createTime() + ",firstSendTime:" + this.firstSendTime() + ",lastSendTime:" + this.lastSendTime() + ",finishTime:" + this.finishTime();
        }
    }
}

