/*
 * Decompiled with CFR 0.152.
 */
package net.sf.eBus.feed.historic;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import javax.annotation.Nullable;
import net.sf.eBus.client.EFeed;
import net.sf.eBus.client.EFeedState;
import net.sf.eBus.client.EObject;
import net.sf.eBus.client.EPublishFeed;
import net.sf.eBus.client.EPublisher;
import net.sf.eBus.client.EReplier;
import net.sf.eBus.client.EReplyFeed;
import net.sf.eBus.client.IEPublishFeed;
import net.sf.eBus.feed.historic.EAbstractHistoricFeed;
import net.sf.eBus.feed.historic.HistoricPublishStatusCallback;
import net.sf.eBus.feed.historic.HistoricReply;
import net.sf.eBus.feed.historic.HistoricRequest;
import net.sf.eBus.feed.historic.IEHistoricPublisher;
import net.sf.eBus.feed.historic.IEMessageStore;
import net.sf.eBus.feed.historic.PublishStatusEvent;
import net.sf.eBus.messages.EMessageKey;
import net.sf.eBus.messages.ENotificationMessage;
import net.sf.eBus.messages.EReplyMessage;
import net.sf.eBus.util.Validator;
import net.sf.eBusx.time.EInterval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Publish-side historic feed. Wraps three underlying eBus feeds, all targeting
 * this object and sharing the inherited scope:
 * <ul>
 *   <li>a notification publish feed on the inherited message key,</li>
 *   <li>a status publish feed on the inherited status key, used to publish
 *       {@link PublishStatusEvent}s whenever the feed state changes, and</li>
 *   <li>a reply feed on the inherited request key, which answers
 *       {@link HistoricRequest}s from the configured {@link IEMessageStore}.</li>
 * </ul>
 * Every notification and every status event handed to this feed is first
 * offered to the message store and then published. A store failure is logged
 * and does not prevent publishing.
 *
 * <p>Instances are created through {@link #builder(EMessageKey, IEHistoricPublisher)}.
 */
public final class EHistoricPublishFeed
extends EAbstractHistoricFeed<IEHistoricPublisher>
implements EPublisher,
EReplier {
    /** Default number of notifications placed into a single historic reply. */
    public static final int DEFAULT_NOTIFICATIONS_PER_REPLY = 5;

    /** Prefix of generated feed names; a feed index is appended. */
    public static final String DEFAULT_PUBLISH_FEED_NAME = "EHistoricPublishFeed-";

    private static final Logger sLogger = LoggerFactory.getLogger(EHistoricPublishFeed.class);

    /** Callback which receives publish status updates from the underlying feeds. */
    private final HistoricPublishStatusCallback mStatusCallback;

    /** Publisher identifier taken from the owning publisher at build time. */
    private final long mPublisherId;

    /** Store used both to persist outgoing messages and to answer historic requests. */
    private final IEMessageStore mMessageStore;

    /** Maximum number of notifications sent in one historic reply. */
    private final int mNotificationsPerReply;

    /** Notification publish feed; created in {@link #startup()}, cleared in {@link #doClose()}. */
    private EPublishFeed mPublishFeed;

    /** Publish status event feed; created in {@link #startup()}, cleared in {@link #doClose()}. */
    private EPublishFeed mStatusFeed;

    /** Historic request reply feed; created in {@link #startup()}, cleared in {@link #doClose()}. */
    private EReplyFeed mReplyFeed;

    /** Position assigned to the next {@link PublishStatusEvent}; incremented per event. */
    private int mStatusPosition;

    /**
     * Creates a feed from the builder's settings.
     *
     * @param builder contains the feed configuration.
     */
    private EHistoricPublishFeed(Builder builder) {
        super(builder);
        this.mStatusCallback = builder.getStatusCallback();
        this.mPublisherId = ((IEHistoricPublisher)builder.mOwner).publisherId();
        this.mMessageStore = builder.mMessageStore;
        this.mNotificationsPerReply = builder.mNotificationsPerReply;
        this.mStatusPosition = 0;
    }

    /**
     * Closes the notification, status and reply feeds (in that order) and
     * drops the references to them.
     */
    @Override
    protected void doClose() {
        this.closeFeed((EFeed)this.mPublishFeed);
        this.mPublishFeed = null;
        this.closeFeed((EFeed)this.mStatusFeed);
        this.mStatusFeed = null;
        this.closeFeed((EFeed)this.mReplyFeed);
        this.mReplyFeed = null;
    }

    /**
     * Returns this feed's name.
     *
     * @return feed name.
     */
    public String name() {
        return this.mName;
    }

    /**
     * Builds the three underlying feeds and marks this feed open.
     * The notification publish feed is only built here; it is advertised
     * later by {@link #advertise()}. The status and reply feeds are
     * advertised and set to {@link EFeedState#UP} immediately.
     */
    public void startup() {
        EPublishFeed.Builder pubBuilder = EPublishFeed.builder();
        EPublishFeed.Builder statBuilder = EPublishFeed.builder();
        EReplyFeed.Builder repBuilder = EReplyFeed.builder();
        sLogger.debug("{}: starting.", (Object)this.mName);

        // Notification feed: built but not advertised until advertise() is called.
        this.mPublishFeed = (EPublishFeed)((EPublishFeed.Builder)((EPublishFeed.Builder)((EPublishFeed.Builder)pubBuilder.target((EObject)this)).messageKey(this.mKey)).scope(this.mScope)).statusCallback(this::logFeedState).build();

        // Status feed: advertised and up right away.
        this.mStatusFeed = (EPublishFeed)((EPublishFeed.Builder)((EPublishFeed.Builder)((EPublishFeed.Builder)statBuilder.target((EObject)this)).messageKey(this.mStatusKey)).scope(this.mScope)).statusCallback(this::logFeedState).build();
        this.mStatusFeed.advertise();
        this.mStatusFeed.updateFeedState(EFeedState.UP);

        // Historic request reply feed: advertised and up right away.
        this.mReplyFeed = (EReplyFeed)((EReplyFeed.Builder)((EReplyFeed.Builder)((EReplyFeed.Builder)repBuilder.target((EObject)this)).messageKey(this.mRequestKey)).scope(this.mScope)).requestCallback(this::historicRequest).cancelRequestCallback(this::historicCancel).build();
        this.mReplyFeed.advertise();
        this.mReplyFeed.updateFeedState(EFeedState.UP);

        this.mIsOpen = true;
        sLogger.info("{}: started.", (Object)this.mName);
    }

    /**
     * Closes this feed, logging before and after.
     */
    public void shutdown() {
        sLogger.debug("{}: shutting down.", (Object)this.mName);
        this.close();
        sLogger.info("{}: shut down.", (Object)this.mName);
    }

    /**
     * Status callback installed on both the notification and the status
     * publish feeds. Logs the reported state and forwards it to the
     * configured publish status callback.
     *
     * @param feedState reported feed state.
     * @param feed feed reporting the state; only its key is logged, the
     * location, client and feed identifiers logged are those of the
     * notification publish feed.
     */
    private void logFeedState(EFeedState feedState, IEPublishFeed feed) {
        sLogger.debug("{} publisher {}, feed {}: {} is {}.", new Object[]{this.mPublishFeed.location(), this.mPublishFeed.clientId(), this.mPublishFeed.feedId(), feed.key(), feedState});
        this.forwardPublishStatus(feedState);
    }

    /**
     * Request callback for the reply feed. Retrieves the notifications
     * stored for the requested interval and replies with them in batches,
     * or with an empty final reply when nothing is stored. Any exception
     * raised while retrieving or replying results in an error reply whose
     * reason is the exception message (or a generic text when the
     * exception carries no message).
     *
     * @param request historic request to answer.
     */
    private void historicRequest(EReplyFeed.ERequest request) {
        HistoricRequest hReq = (HistoricRequest)request.request();
        EInterval interval = hReq.interval;
        sLogger.debug("{}: received {} historic request for interval {}.", new Object[]{this.mName, this.mRequestKey, interval});
        try {
            Collection<ENotificationMessage> history = this.mMessageStore.retrieve(interval);
            if (history.isEmpty()) {
                this.postEmptyReply(interval, request);
            } else {
                this.sendHistoricReplies(history, request);
            }
        }
        catch (Exception jex) {
            String reason = Strings.isNullOrEmpty((String)jex.getMessage()) ? String.format("failed to retrieve historic messages for interval %s; no reason given", interval) : jex.getMessage();
            // Pass the exception as the trailing argument so its stack trace
            // is logged; the reply reason alone only carries the message.
            sLogger.warn("{}: failed to retrieve historic notifications for interval {}.", new Object[]{this.mName, interval, jex});
            this.postErrorReply(reason, request);
        }
    }

    /**
     * Cancel callback for the reply feed. Cancellation is not acted upon;
     * the attempt is only logged.
     *
     * @param request request being canceled.
     * @param mayRespond logged only.
     */
    private void historicCancel(EReplyFeed.ERequest request, boolean mayRespond) {
        sLogger.debug("Attempt to cancel {} historic request (may respond {}); ignored.", (Object)request.messageSubject(), (Object)mayRespond);
    }

    /**
     * Returns the publisher identifier obtained from the owner at build time.
     *
     * @return publisher identifier.
     */
    public long publisherId() {
        return this.mPublisherId;
    }

    /**
     * Returns the maximum number of notifications sent per historic reply.
     *
     * @return notifications per reply.
     */
    public int notificationsPerReply() {
        return this.mNotificationsPerReply;
    }

    /**
     * Returns {@code true} if this feed is advertised and its feed state
     * is {@link EFeedState#UP}.
     *
     * @return {@code true} if advertised and up.
     */
    public boolean isFeedUp() {
        return this.mInPlace && this.mFeedState == EFeedState.UP;
    }

    /**
     * Returns the configured message store.
     *
     * @return message store.
     */
    @VisibleForTesting
    IEMessageStore messageStore() {
        return this.mMessageStore;
    }

    /**
     * Advertises the notification publish feed. Does nothing if the feed
     * is already advertised.
     *
     * @throws IllegalStateException if this feed is not open.
     */
    public void advertise() {
        if (!this.mIsOpen) {
            throw new IllegalStateException("feed is closed");
        }
        if (!this.mInPlace) {
            sLogger.debug("{} publisher {}, feed {}: advertised.", new Object[]{this.mPublishFeed.location(), this.mPublishFeed.clientId(), this.mPublishFeed.feedId()});
            this.mPublishFeed.advertise();
            this.mInPlace = true;
        }
    }

    /**
     * Retracts the notification publish feed advertisement. If the feed
     * state is currently {@link EFeedState#UP} it is first moved to
     * {@link EFeedState#UNKNOWN} (which stores and publishes a status
     * event). Does nothing if the feed is not advertised.
     */
    public void unadvertise() {
        if (this.mInPlace) {
            if (this.mFeedState == EFeedState.UP) {
                this.updateFeedState(EFeedState.UNKNOWN);
            }
            this.mPublishFeed.unadvertise();
            this.mInPlace = false;
        }
    }

    /**
     * Changes the publish feed state. When {@code update} differs from the
     * current state, a {@link PublishStatusEvent} is created, the
     * notification feed state is updated, the event is offered to the
     * message store and, if the status feed is up, the event is published
     * on it. Does nothing when {@code update} equals the current state.
     *
     * @param update new feed state.
     * @throws NullPointerException if {@code update} is {@code null}.
     * @throws IllegalStateException if this feed is not advertised.
     */
    public void updateFeedState(EFeedState update) {
        Objects.requireNonNull(update, "update is null");
        if (!this.mInPlace) {
            throw new IllegalStateException("feed not advertised");
        }
        if (update != this.mFeedState) {
            PublishStatusEvent.Builder builder = PublishStatusEvent.builder();
            PublishStatusEvent pse = (PublishStatusEvent)((PublishStatusEvent.Builder)((PublishStatusEvent.Builder)((PublishStatusEvent.Builder)builder.subject(this.mKey.subject())).publisherId(this.mPublisherId)).position(this.mStatusPosition)).key(this.mKey).feedState(update).build();
            ++this.mStatusPosition;
            this.mFeedState = update;
            sLogger.debug("{} publisher {}, feed {}: setting {} feed state to {} ({}).", new Object[]{this.mPublishFeed.location(), this.mPublishFeed.clientId(), this.mPublishFeed.feedId(), this.mKey, update, this.mScope});
            this.mPublishFeed.updateFeedState(update);
            // Persisting is best effort: a store failure must not stop the
            // status event from being published.
            try {
                this.mMessageStore.store(pse);
            }
            catch (Exception jex) {
                sLogger.warn("{}: attempt to persist {} failed:", new Object[]{this.mName, this.mStatusKey, jex});
            }
            if (this.mStatusFeed.isFeedUp()) {
                this.mStatusFeed.publish((ENotificationMessage)pse);
            }
        }
    }

    /**
     * Offers the notification to the message store and then, if the
     * underlying notification feed is up, publishes it. A store failure is
     * logged and does not prevent publishing.
     *
     * @param msg notification to store and publish.
     * @throws NullPointerException if {@code msg} is {@code null}.
     * @throws IllegalArgumentException if the message key does not equal
     * this feed's key or the message publisher identifier does not equal
     * this feed's publisher identifier.
     * @throws IllegalStateException if this feed is not advertised or its
     * feed state is not {@link EFeedState#UP}.
     */
    public void publish(ENotificationMessage msg) {
        Objects.requireNonNull(msg, "msg is null");
        if (!this.mKey.equals((Object)msg.key())) {
            throw new IllegalArgumentException(String.format("received msg key %s, expected %s", msg.key(), this.mKey));
        }
        if (msg.publisherId != this.mPublisherId) {
            throw new IllegalArgumentException(String.format("received publisher ID %d, expected %d", msg.publisherId, this.mPublisherId));
        }
        if (!this.mInPlace) {
            throw new IllegalStateException("feed not advertised");
        }
        if (this.mFeedState != EFeedState.UP) {
            throw new IllegalStateException("publish state is down");
        }
        // Persisting is best effort: a store failure must not stop the
        // notification from being published.
        try {
            this.mMessageStore.store(msg);
        }
        catch (Exception jex) {
            sLogger.warn("{}: attempt to persist {} failed:", new Object[]{this.mName, this.mKey, jex});
        }
        if (this.mPublishFeed.isFeedUp()) {
            this.mPublishFeed.publish(msg);
        }
    }

    /**
     * Returns a new builder for a historic publish feed.
     *
     * @param key notification message key the feed publishes.
     * @param publisher owner of the feed.
     * @return new builder instance.
     * @throws NullPointerException if either argument is {@code null}.
     * @throws IllegalArgumentException if {@code key} is not a
     * notification message key.
     */
    public static Builder builder(EMessageKey key, IEHistoricPublisher publisher) {
        Objects.requireNonNull(key, "message key is null");
        Objects.requireNonNull(publisher, "publisher is null");
        if (!key.isNotification()) {
            throw new IllegalArgumentException("not a notification message key");
        }
        return new Builder(publisher, key);
    }

    /**
     * Sends {@code history} back in batches of at most
     * {@link #notificationsPerReply()} messages. A full batch is marked
     * {@code OK_CONTINUING} while more messages remain and
     * {@code OK_FINAL} when it is the last one; a trailing partial batch
     * is always {@code OK_FINAL}.
     *
     * @param history non-empty collection of stored notifications.
     * @param request request being answered.
     */
    private void sendHistoricReplies(Collection<ENotificationMessage> history, EReplyFeed.ERequest request) {
        Iterator<ENotificationMessage> nIt = history.iterator();
        // NOTE(review): the same batch collection is cleared and reused after
        // each reply; this assumes the reply does not retain a reference to
        // it - confirm against HistoricReply.Builder.notifications.
        Collection<ENotificationMessage> batch = new ArrayList<>(this.mNotificationsPerReply);
        while (nIt.hasNext()) {
            batch.add(nIt.next());
            if (batch.size() != this.mNotificationsPerReply) {
                continue;
            }
            EReplyMessage.ReplyStatus status = nIt.hasNext() ? EReplyMessage.ReplyStatus.OK_CONTINUING : EReplyMessage.ReplyStatus.OK_FINAL;
            this.postReply(status, batch, request);
            batch.clear();
        }
        // Left-over messages which did not fill a complete batch.
        if (!batch.isEmpty()) {
            this.postReply(EReplyMessage.ReplyStatus.OK_FINAL, batch, request);
        }
    }

    /**
     * Posts one {@link HistoricReply} carrying the given notifications.
     *
     * @param status reply status.
     * @param notifications notifications placed into the reply.
     * @param request request being answered.
     */
    private void postReply(EReplyMessage.ReplyStatus status, Collection<ENotificationMessage> notifications, EReplyFeed.ERequest request) {
        HistoricReply.Builder builder = HistoricReply.builder();
        request.reply((EReplyMessage)((HistoricReply.Builder)((HistoricReply.Builder)((HistoricReply.Builder)builder.subject(this.mRequestKey.subject())).timestamp(Instant.now())).replyStatus(status)).notifications(notifications).build());
    }

    /**
     * Posts a final {@code OK_FINAL} reply explaining that no notifications
     * exist within the requested interval.
     *
     * @param interval requested interval, used in the reply reason.
     * @param request request being answered.
     */
    private void postEmptyReply(EInterval interval, EReplyFeed.ERequest request) {
        String reason = String.format("no notification messages within %s", interval);
        EReplyMessage.Builder builder = EReplyMessage.builder();
        request.reply((EReplyMessage)((EReplyMessage.Builder)((EReplyMessage.Builder)builder.subject(this.mRequestKey.subject())).timestamp(Instant.now())).replyStatus(EReplyMessage.ReplyStatus.OK_FINAL).replyReason(reason).build());
    }

    /**
     * Posts an {@code ERROR} reply with the given reason.
     *
     * @param reason text explaining the failure.
     * @param request request being answered.
     */
    private void postErrorReply(String reason, EReplyFeed.ERequest request) {
        EReplyMessage.Builder builder = EReplyMessage.builder();
        request.reply((EReplyMessage)((EReplyMessage.Builder)((EReplyMessage.Builder)builder.subject(this.mRequestKey.subject())).timestamp(Instant.now())).replyStatus(EReplyMessage.ReplyStatus.ERROR).replyReason(reason).build());
    }

    /**
     * Passes the feed state to the configured publish status callback.
     * Exceptions thrown by the callback are logged and not propagated.
     *
     * @param feedState feed state to forward.
     */
    private void forwardPublishStatus(EFeedState feedState) {
        try {
            this.mStatusCallback.call(feedState, this);
        }
        catch (Exception jex) {
            sLogger.warn("{}: publisher publish status callback exception:", (Object)this.mName, (Object)jex);
        }
    }

    /**
     * Builder for {@link EHistoricPublishFeed}. A message store is
     * required; the status callback and notifications-per-reply setting
     * are optional.
     */
    public static final class Builder
    extends EAbstractHistoricFeed.Builder<IEHistoricPublisher, EHistoricPublishFeed, Builder> {
        /** Optional status callback; defaults to the owner's {@code publishStatus} method. */
        private HistoricPublishStatusCallback mStatusCallback;

        /** Required message store. */
        private IEMessageStore mMessageStore;

        /** Maximum notifications per historic reply. */
        private int mNotificationsPerReply = EHistoricPublishFeed.DEFAULT_NOTIFICATIONS_PER_REPLY;

        /**
         * Creates a builder for the given owner and notification key.
         *
         * @param publisher feed owner.
         * @param key notification message key.
         */
        private Builder(IEHistoricPublisher publisher, EMessageKey key) {
            super(publisher, EHistoricPublishFeed.class, key);
        }

        /**
         * Returns this builder.
         *
         * @return {@code this}.
         */
        @Override
        protected Builder self() {
            return this;
        }

        /**
         * Adds the message store to the superclass validation: it must
         * not be {@code null}.
         *
         * @param problems validator collecting configuration problems.
         * @return the validator returned by the chained checks.
         */
        @Override
        protected Validator validate(Validator problems) {
            return super.validate(problems).requireNotNull((Object)this.mMessageStore, "messageStore");
        }

        /**
         * Generates a feed name from {@link #DEFAULT_PUBLISH_FEED_NAME}
         * and the next feed index.
         *
         * @return generated feed name.
         */
        @Override
        protected String generateName() {
            return EHistoricPublishFeed.DEFAULT_PUBLISH_FEED_NAME + sFeedIndex.getAndIncrement();
        }

        /**
         * Creates the feed from this builder's settings.
         *
         * @return new historic publish feed.
         */
        @Override
        protected EHistoricPublishFeed buildImpl() {
            return new EHistoricPublishFeed(this);
        }

        /**
         * Returns the status callback, falling back to (and remembering)
         * the owner's {@code publishStatus} method when none was set.
         *
         * @return status callback, never {@code null}.
         */
        private HistoricPublishStatusCallback getStatusCallback() {
            if (this.mStatusCallback == null) {
                this.mStatusCallback = ((IEHistoricPublisher)this.mOwner)::publishStatus;
            }
            return this.mStatusCallback;
        }

        /**
         * Sets the publish status callback. Passing {@code null} selects
         * the owner's {@code publishStatus} method at build time.
         *
         * @param cb status callback; may be {@code null}.
         * @return this builder.
         */
        public Builder statusCallback(@Nullable HistoricPublishStatusCallback cb) {
            this.mStatusCallback = cb;
            return this;
        }

        /**
         * Sets the message store used to persist and retrieve notifications.
         *
         * @param store open message store.
         * @return this builder.
         * @throws NullPointerException if {@code store} is {@code null}.
         * @throws IllegalArgumentException if {@code store} is not open.
         */
        public Builder messageStore(IEMessageStore store) {
            Objects.requireNonNull(store, "message store is null");
            if (!store.isOpen()) {
                throw new IllegalArgumentException("message store is closed");
            }
            this.mMessageStore = store;
            return this;
        }

        /**
         * Sets the maximum number of notifications per historic reply.
         *
         * @param n notifications per reply; must be &gt; zero.
         * @return this builder.
         * @throws IllegalArgumentException if {@code n} &le; zero.
         */
        public Builder notificationsPerReply(int n) {
            if (n <= 0) {
                throw new IllegalArgumentException("notificationsPerReply <= zero");
            }
            this.mNotificationsPerReply = n;
            return this;
        }
    }
}

