/*
 * Decompiled with CFR 0.152.
 */
package io.continual.onap.services.publisher;

import io.continual.onap.services.mrCommon.OnapMrResponse;
import io.continual.onap.services.publisher.OnapMsgRouterPublisher;
import java.io.Closeable;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Queues messages in memory and hands them to an {@link OnapMsgRouterPublisher}
 * in batches from a background thread.
 *
 * <p>A batch is published once the number of queued messages reaches the
 * configured batch size, or once the oldest queued message has waited for the
 * configured maximum age. Each publish hands over <em>every</em> queued message.
 * When a publish fails the messages stay queued and are retried after the
 * configured back-off. An optional cap on the queue length discards messages
 * according to a {@link DropPolicy}.
 *
 * <p>All access to the pending queue is guarded by the queue's own monitor, so
 * the {@code send} methods may be called from any thread. Note that the monitor
 * is held while the underlying publisher is called, so producers block for the
 * duration of a publish.
 */
public class OnapMsgRouterBatchPublisher implements Closeable {
    private static final Logger defaultLog = LoggerFactory.getLogger(OnapMsgRouterBatchPublisher.class);

    /** Wait (ms) before the next queue check when nothing is due. */
    private static final long kEmptyQueueMaxWaitMs = 100L;
    private static final int kDefaultMaxBatch = 100;
    private static final long kDefaultMaxTimeMs = 500L;
    private static final long kDefaultBackoffTimeMs = 1000L;
    /** A negative pending limit disables dropping. */
    private static final long kDefaultMaxPendingCount = -1L;
    /** How long (ms) {@link #close()} keeps trying to flush the queue. */
    private static final long kCloseFlushTimeoutMs = 60000L;

    private final OnapMsgRouterPublisher fPub;
    private final LinkedList<MessageWrapper> fPendingMsgs;
    private final int fMaxBatch;
    private final long fMaxTimeMs;
    private final long fBackoffMs;
    private final long fMaxPendingCount;
    private final DropPolicy fDropPolicy;
    private final Logger fLog;
    private final SvcThread fService;

    /**
     * Convenience factory for the common case.
     *
     * @param pub      the publisher that receives the batches
     * @param atMost   queue length that triggers a publish
     * @param maxAgeMs maximum time in ms the oldest message may wait before a publish
     * @return a new, not yet started, batch publisher
     * @throws IllegalArgumentException if {@code pub} is null
     */
    public static OnapMsgRouterBatchPublisher build(OnapMsgRouterPublisher pub, int atMost, int maxAgeMs) {
        return new Builder().usingPublisher(pub).batchAtMost(atMost).batchMaxAgeMs(maxAgeMs).build();
    }

    private OnapMsgRouterBatchPublisher(Builder builder) {
        // Validate before building any other state.
        if (builder.fPub == null) {
            throw new IllegalArgumentException("A publisher must be provided.");
        }
        this.fPub = builder.fPub;
        this.fLog = builder.fLog != null ? builder.fLog : defaultLog;
        this.fPendingMsgs = new LinkedList<MessageWrapper>();
        this.fMaxBatch = builder.fMaxBatch;
        this.fMaxTimeMs = builder.fMaxTimeMs;
        this.fBackoffMs = builder.fBackoffMs;
        this.fMaxPendingCount = builder.fMaxPendingCount;
        // A null policy would otherwise blow up inside the background thread.
        this.fDropPolicy = builder.fMaxPendingDropPolicy != null
            ? builder.fMaxPendingDropPolicy
            : DropPolicy.DROP_OLDEST;
        this.fService = new SvcThread();
    }

    /**
     * Starts the background sending thread.
     */
    public synchronized void start() {
        this.fService.start();
    }

    /**
     * Stops the background thread, then keeps publishing whatever is still
     * queued until the queue is empty or 60 seconds have passed. If the calling
     * thread is interrupted the flush is abandoned and the interrupt flag is
     * restored.
     */
    @Override
    public synchronized void close() {
        try {
            this.fService.signalClose();
            this.fService.join();

            final long giveUpAtMs = this.nowMs() + kCloseFlushTimeoutMs;
            while (this.pendingCount() > 0 && this.nowMs() < giveUpAtMs) {
                final long retryInMs = this.send();
                if (this.pendingCount() == 0) {
                    // Flushed; no reason to wait out the returned delay.
                    break;
                }
                // Sleep without holding the queue monitor, and never past the deadline.
                final long sleepMs = Math.min(retryInMs, giveUpAtMs - this.nowMs());
                if (sleepMs > 0L) {
                    Thread.sleep(sleepMs);
                }
            }

            final int unsent = this.pendingCount();
            if (unsent > 0) {
                this.fLog.warn("Unable to send {} messages before giving up.", Integer.valueOf(unsent));
            } else {
                this.fLog.info("Batch sender closed with no pending messages.");
            }
        }
        catch (InterruptedException e) {
            this.fLog.warn("Interrupted while closing background send thread: {}", e.getMessage());
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Queues a single message and wakes the background thread.
     *
     * @param msg the message to queue
     * @return this publisher, for chaining
     */
    public OnapMsgRouterBatchPublisher send(OnapMsgRouterPublisher.Message msg) {
        synchronized (this.fPendingMsgs) {
            this.fPendingMsgs.add(new MessageWrapper(msg, this.nowMs()));
            this.fPendingMsgs.notify();
        }
        return this;
    }

    /**
     * Queues a list of messages, in order, and wakes the background thread.
     *
     * @param msgs the messages to queue
     * @return this publisher, for chaining
     */
    public OnapMsgRouterBatchPublisher send(List<OnapMsgRouterPublisher.Message> msgs) {
        // Wrap outside the monitor so producers hold it as briefly as possible.
        final LinkedList<MessageWrapper> wrappers = new LinkedList<MessageWrapper>();
        for (OnapMsgRouterPublisher.Message msg : msgs) {
            wrappers.add(new MessageWrapper(msg, this.nowMs()));
        }
        synchronized (this.fPendingMsgs) {
            this.fPendingMsgs.addAll(wrappers);
            this.fPendingMsgs.notify();
        }
        return this;
    }

    /** Reads the current time from the publisher's clock. */
    private long nowMs() {
        return this.fPub.getClock().nowMs();
    }

    /** Returns the current queue length. */
    private int pendingCount() {
        synchronized (this.fPendingMsgs) {
            return this.fPendingMsgs.size();
        }
    }

    /**
     * Publishes the queue if it is due and reports how long to wait before the
     * next check.
     *
     * @return delay in ms until the next check; 0 means "check again immediately"
     */
    private long checkSend() {
        synchronized (this.fPendingMsgs) {
            final MessageWrapper oldest = this.fPendingMsgs.peekFirst();
            if (oldest == null) {
                // Nothing queued: never publish an empty batch, whatever the batch size.
                return kEmptyQueueMaxWaitMs;
            }

            final long now = this.nowMs();
            final long dueAtMs = oldest.queuedAtMs() + this.fMaxTimeMs;
            if (this.fPendingMsgs.size() >= this.fMaxBatch || dueAtMs <= now) {
                return this.send();
            }
            return Math.max(0L, dueAtMs - now);
        }
    }

    /**
     * Publishes every queued message in one call to the publisher. The queue is
     * cleared only when the response reports success.
     *
     * @return delay in ms before the next attempt: the idle wait after a
     *         success (or when nothing was queued), the back-off after a failure
     */
    private long send() {
        synchronized (this.fPendingMsgs) {
            if (this.fPendingMsgs.isEmpty()) {
                return kEmptyQueueMaxWaitMs;
            }

            final LinkedList<OnapMsgRouterPublisher.Message> batch = new LinkedList<OnapMsgRouterPublisher.Message>();
            for (MessageWrapper wrap : this.fPendingMsgs) {
                batch.add(wrap.message());
            }

            final OnapMrResponse response = this.fPub.send(batch);
            if (response.isSuccess()) {
                this.fPendingMsgs.clear();
                return kEmptyQueueMaxWaitMs;
            }

            this.fLog.warn("MR send failed with {} {}. Waiting {} ms for retry.", new Object[]{response.getStatusCode(), response.getStatusText(), this.fBackoffMs});
            return this.fBackoffMs;
        }
    }

    /**
     * Trims the queue down to the configured maximum length, discarding from
     * the end selected by the drop policy, and logs what was discarded. Does
     * nothing when the maximum is negative.
     */
    private void checkForDrops() {
        if (this.fMaxPendingCount < 0L) {
            return;
        }

        int removals = 0;
        long earliestTs = Long.MAX_VALUE;
        long latestTs = Long.MIN_VALUE;
        synchronized (this.fPendingMsgs) {
            while ((long)this.fPendingMsgs.size() > this.fMaxPendingCount) {
                final MessageWrapper dropped = this.fDropPolicy == DropPolicy.DROP_NEWEST
                    ? this.fPendingMsgs.removeLast()
                    : this.fPendingMsgs.removeFirst();
                ++removals;
                earliestTs = Math.min(earliestTs, dropped.queuedAtMs());
                latestTs = Math.max(latestTs, dropped.queuedAtMs());
            }
            if (removals > 0) {
                this.fLog.warn("Dropped {} messages with time range from {} to {}.", new Object[]{removals, earliestTs, latestTs});
            }
        }
    }

    /**
     * Fluent configuration for an {@link OnapMsgRouterBatchPublisher}. Only the
     * publisher is mandatory.
     */
    public static class Builder {
        private OnapMsgRouterPublisher fPub = null;
        private Logger fLog = defaultLog;
        private int fMaxBatch = kDefaultMaxBatch;
        private long fMaxTimeMs = kDefaultMaxTimeMs;
        private long fBackoffMs = kDefaultBackoffTimeMs;
        private long fMaxPendingCount = kDefaultMaxPendingCount;
        private DropPolicy fMaxPendingDropPolicy = DropPolicy.DROP_OLDEST;

        /** Sets the publisher that receives the batches (required). */
        public Builder usingPublisher(OnapMsgRouterPublisher pub) {
            this.fPub = pub;
            return this;
        }

        /** Sets the logger used by the batch publisher. */
        public Builder logTo(Logger log) {
            this.fLog = log;
            return this;
        }

        /** Sets the queue length that triggers a publish (default 100). */
        public Builder batchAtMost(int atMost) {
            this.fMaxBatch = atMost;
            return this;
        }

        /** Sets how long, in ms, the oldest message may wait before a publish (default 500). */
        public Builder batchMaxAgeMs(int maxAgeMs) {
            this.fMaxTimeMs = maxAgeMs;
            return this;
        }

        /**
         * Caps the queue length. A negative {@code maxPending} (the default)
         * disables the cap.
         *
         * @param maxPending maximum number of queued messages to retain
         * @param dropPolicy which end of the queue to discard from when over the cap
         */
        public Builder withMaxPendingCount(int maxPending, DropPolicy dropPolicy) {
            this.fMaxPendingCount = maxPending;
            this.fMaxPendingDropPolicy = dropPolicy;
            return this;
        }

        /** Sets the wait, in ms, before retrying after a failed publish (default 1000). */
        public Builder retryAfterMs(int retryWaitMs) {
            this.fBackoffMs = retryWaitMs;
            return this;
        }

        /**
         * Creates the batch publisher; it must still be started.
         *
         * @throws IllegalArgumentException if no publisher was provided
         */
        public OnapMsgRouterBatchPublisher build() {
            return new OnapMsgRouterBatchPublisher(this);
        }
    }

    /**
     * Background thread that waits on the pending queue and publishes or drops
     * messages as they fall due.
     */
    private class SvcThread extends Thread {
        private volatile boolean fClose = false;

        private SvcThread() {
        }

        @Override
        public void run() {
            final LinkedList<MessageWrapper> queue = OnapMsgRouterBatchPublisher.this.fPendingMsgs;
            long nextSendDueMs = kEmptyQueueMaxWaitMs;
            while (!this.shouldClose()) {
                synchronized (queue) {
                    try {
                        // wait(0) would block indefinitely, so a zero delay skips the wait.
                        // The close flag is re-checked under the monitor so a close signal
                        // raised since the loop test is not slept through.
                        if (nextSendDueMs > 0L && !this.shouldClose()) {
                            queue.wait(nextSendDueMs);
                        }
                        nextSendDueMs = OnapMsgRouterBatchPublisher.this.checkSend();
                        OnapMsgRouterBatchPublisher.this.checkForDrops();
                    }
                    catch (InterruptedException e) {
                        // Deliberately not re-interrupting: the loop keeps servicing the
                        // queue, and a set interrupt flag would make every wait() fail.
                        OnapMsgRouterBatchPublisher.this.fLog.warn("Background thread interrupted while waiting for input signal: {}", e.getMessage());
                    }
                }
            }
        }

        /**
         * Asks the thread to stop and wakes it if it is waiting on the queue.
         */
        public void signalClose() {
            final LinkedList<MessageWrapper> queue = OnapMsgRouterBatchPublisher.this.fPendingMsgs;
            synchronized (queue) {
                this.fClose = true;
                queue.notifyAll();
            }
        }

        private boolean shouldClose() {
            return this.fClose;
        }
    }

    /** A queued message together with the time at which it was queued. */
    private static final class MessageWrapper {
        private final OnapMsgRouterPublisher.Message fMsg;
        private final long fQueuedAtMs;

        MessageWrapper(OnapMsgRouterPublisher.Message msg, long queuedAtMs) {
            this.fMsg = msg;
            this.fQueuedAtMs = queuedAtMs;
        }

        OnapMsgRouterPublisher.Message message() {
            return this.fMsg;
        }

        long queuedAtMs() {
            return this.fQueuedAtMs;
        }
    }

    /** Which end of the pending queue is discarded when it exceeds its cap. */
    public enum DropPolicy {
        DROP_OLDEST,
        DROP_NEWEST;

        /**
         * Parses a policy name, ignoring case and surrounding whitespace.
         *
         * @param val the setting text; may be null
         * @return the matching policy, or {@link #DROP_OLDEST} when {@code val}
         *         is null or names no policy
         */
        public static DropPolicy fromSettingString(String val) {
            if (val == null) {
                return DROP_OLDEST;
            }
            try {
                // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
                return DropPolicy.valueOf(val.trim().toUpperCase(Locale.ROOT));
            }
            catch (IllegalArgumentException ignored) {
                // Unrecognised names fall back to the default policy.
                return DROP_OLDEST;
            }
        }
    }
}

