/*
 * Decompiled with CFR 0.152.
 */
package net.kuujo.catalog.server.state;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import net.kuujo.catalog.client.request.PublishRequest;
import net.kuujo.catalog.client.response.PublishResponse;
import net.kuujo.catalog.client.response.Response;
import net.kuujo.catalog.client.session.Session;
import net.kuujo.catalog.server.state.ServerStateMachineContext;
import net.kuujo.catalyst.transport.Connection;
import net.kuujo.catalyst.util.Assert;
import net.kuujo.catalyst.util.Listener;
import net.kuujo.catalyst.util.Listeners;

/**
 * Server-side {@link Session} implementation.
 *
 * <p>Holds the per-session bookkeeping visible in this class: the session version and
 * command sequence, callbacks (queries and commands) waiting for those counters to
 * advance, cached command responses, and a queue of published events that have been
 * sent to the client connection but not yet acknowledged.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its state; it is
 * presumably confined to a single state-machine thread - confirm against callers.
 */
class ServerSession implements Session {
    /** Listeners notified with the message of every {@link PublishRequest} handled by this session. */
    protected final Listeners<Object> listeners = new Listeners<>();
    private final long id;
    private final UUID connectionId;
    private final ServerStateMachineContext context;
    private final long timeout;
    /** Currently attached connection, or {@code null} when none is attached. */
    private Connection connection;
    private long version;
    /** Highest command sequence number set through {@link #setSequence(long)}. */
    private long command;
    /** Highest sequence number whose cached response has been cleared. */
    private long commandLowWaterMark;
    private long eventVersion;
    private long eventSequence;
    /** Highest event version passed to {@link #clearEvents(long, long)}. */
    private long eventAckVersion;
    private long timestamp;
    /** Recycled (emptied) query lists, reused by {@link #registerQuery(long, Runnable)}. */
    private final Queue<List<Runnable>> queriesPool = new ArrayDeque<List<Runnable>>();
    /** Queries waiting for the session version to reach the keyed value. */
    private final Map<Long, List<Runnable>> queries = new HashMap<Long, List<Runnable>>();
    /** Command callbacks keyed by the sequence number they were registered under. */
    private final Map<Long, Runnable> commands = new HashMap<Long, Runnable>();
    /** Cached responses keyed by command sequence number. */
    private final Map<Long, Object> responses = new HashMap<Long, Object>();
    /** Events that have been published but not yet cleared, in publish order. */
    private final Queue<EventHolder> events = new ArrayDeque<EventHolder>();
    /** Recycled event holders, reused by {@link #publish(Object)}. */
    private final Queue<EventHolder> eventsPool = new ArrayDeque<EventHolder>();
    private boolean expired;
    private boolean closed;
    private final Listeners<Session> openListeners = new Listeners<>();
    private final Listeners<Session> closeListeners = new Listeners<>();

    /**
     * Creates a session. The session version starts out equal to {@code id}.
     *
     * @param id the session identifier
     * @param connectionId the identifier of the connection this session belongs to
     * @param context the state machine context whose version is used when publishing events
     * @param timeout the session timeout
     * @throws NullPointerException if {@code connectionId} is {@code null}
     */
    ServerSession(long id, UUID connectionId, ServerStateMachineContext context, long timeout) {
        if (connectionId == null) {
            throw new NullPointerException("connection cannot be null");
        }
        this.id = id;
        this.version = id;
        this.connectionId = connectionId;
        this.context = context;
        this.timeout = timeout;
    }

    /** Returns the session identifier. */
    public long id() {
        return this.id;
    }

    /** Returns the identifier of the connection this session is bound to. */
    UUID connection() {
        return this.connectionId;
    }

    /** Returns the session timeout supplied at construction. */
    long timeout() {
        return this.timeout;
    }

    /** Returns the session timestamp. */
    long getTimestamp() {
        return this.timestamp;
    }

    /**
     * Updates the session timestamp. The stored value never moves backwards: the larger
     * of the current and the supplied timestamp is kept.
     *
     * @param timestamp the candidate timestamp
     * @return this session
     */
    ServerSession setTimestamp(long timestamp) {
        this.timestamp = Math.max(this.timestamp, timestamp);
        return this;
    }

    /** Returns the current session version. */
    long getVersion() {
        return this.version;
    }

    /**
     * Advances the session version to {@code version}, one step at a time. For every
     * intermediate version the queries registered for it are run (before the version
     * field itself is updated to that value) and their list is recycled. A value that
     * is not greater than the current version is ignored.
     *
     * @param version the new session version
     * @return this session
     */
    ServerSession setVersion(long version) {
        for (long i = this.version + 1L; i <= version; i++) {
            List<Runnable> ready = this.queries.remove(i);
            if (ready != null) {
                for (Runnable query : ready) {
                    query.run();
                }
                ready.clear();
                this.queriesPool.add(ready);
            }
            this.version = i;
        }
        return this;
    }

    /**
     * Registers a callback under the given command sequence number. It is run by
     * {@link #setSequence(long)} once the session sequence reaches {@code sequence - 1}.
     *
     * @param sequence the sequence number to register the callback under
     * @param runnable the callback
     * @return this session
     */
    ServerSession registerCommand(long sequence, Runnable runnable) {
        this.commands.put(sequence, runnable);
        return this;
    }

    /** Returns the current command sequence number. */
    long getSequence() {
        return this.command;
    }

    /** Returns the current command sequence number plus one. */
    long nextSequence() {
        return this.command + 1L;
    }

    /**
     * Advances the command sequence number to {@code sequence}, one step at a time.
     * After each increment, the callback registered under the <em>following</em>
     * sequence number (see {@link #nextSequence()}) is removed and run, if present.
     * A value that is not greater than the current sequence is ignored.
     *
     * @param sequence the new command sequence number
     * @return this session
     */
    ServerSession setSequence(long sequence) {
        // Compare against the fixed target. The earlier loop bound was
        // "sequence - this.command", which shrank on every iteration because
        // this.command is incremented in the body, so multi-step jumps stopped
        // roughly half way and skipped callbacks.
        while (this.command < sequence) {
            ++this.command;
            Runnable pending = this.commands.remove(this.nextSequence());
            if (pending != null) {
                pending.run();
            }
        }
        return this;
    }

    /**
     * Registers a query to run when the session version reaches {@code version}
     * (see {@link #setVersion(long)}).
     *
     * @param version the version the query waits for
     * @param query the query callback
     * @return this session
     */
    ServerSession registerQuery(long version, Runnable query) {
        List<Runnable> pending = this.queries.computeIfAbsent(version, v -> {
            // Reuse a recycled list when one is available.
            List<Runnable> pooled = this.queriesPool.poll();
            return pooled != null ? pooled : new ArrayList<Runnable>(128);
        });
        pending.add(query);
        return this;
    }

    /**
     * Caches the response for a command sequence number.
     *
     * @param sequence the command sequence number
     * @param response the response to cache
     * @return this session
     */
    ServerSession registerResponse(long sequence, Object response) {
        this.responses.put(sequence, response);
        return this;
    }

    /**
     * Drops cached responses for every sequence number from the current low water mark
     * (exclusive) up to {@code version} (inclusive) and raises the low water mark
     * accordingly. A value not above the low water mark is ignored.
     *
     * @param version the highest sequence number whose response may be discarded
     * @return this session
     */
    ServerSession clearResponses(long version) {
        for (long i = this.commandLowWaterMark + 1L; i <= version; i++) {
            this.responses.remove(i);
            this.commandLowWaterMark = i;
        }
        return this;
    }

    /**
     * Returns the cached response for a command sequence number.
     *
     * @param sequence the command sequence number
     * @return the cached response, or {@code null} if none is cached
     */
    Object getResponse(long sequence) {
        return this.responses.get(sequence);
    }

    /**
     * Attaches (or, with {@code null}, detaches) the connection used to send events.
     * A non-null connection gets this session's {@link PublishRequest} handler registered.
     *
     * <p>The connection field is assigned before the identifier check, so it remains set
     * even when the check below throws.
     *
     * @param connection the connection, or {@code null}
     * @return this session
     * @throws IllegalArgumentException if the connection's id differs from this session's connection id
     */
    ServerSession setConnection(Connection connection) {
        this.connection = connection;
        if (connection != null) {
            if (!connection.id().equals(this.connectionId)) {
                throw new IllegalArgumentException("connection must match session connection ID");
            }
            connection.handler(PublishRequest.class, this::handlePublish);
        }
        return this;
    }

    /**
     * Queues an event and sends it over the attached connection, if any.
     *
     * <p>The event is tagged with the context's current version and a sequence number
     * that restarts at 1 whenever that version differs from the previously published
     * one, together with the version/sequence of the previously published event.
     * Nothing is queued when the acknowledged event version is already greater than
     * the context version.
     *
     * @param event the event payload
     * @return an already-completed future
     */
    public CompletableFuture<Void> publish(Object event) {
        if (this.eventAckVersion > this.context.version()) {
            return CompletableFuture.completedFuture(null);
        }
        long previousVersion = this.eventVersion;
        long previousSequence = this.eventSequence;
        if (this.eventVersion != this.context.version()) {
            this.eventVersion = this.context.version();
            this.eventSequence = 1L;
        } else {
            this.eventSequence++;
        }
        EventHolder holder = this.eventsPool.poll();
        if (holder == null) {
            holder = new EventHolder();
        }
        holder.eventVersion = this.eventVersion;
        holder.eventSequence = this.eventSequence;
        holder.previousVersion = previousVersion;
        holder.previousSequence = previousSequence;
        holder.event = event;
        this.events.add(holder);
        this.sendEvent(holder);
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Registers a listener for messages received through {@link #handlePublish(PublishRequest)}.
     *
     * @param listener the listener; must not be {@code null}
     * @return the listener registration
     */
    public Listener onEvent(Consumer listener) {
        return this.listeners.add((Consumer)Assert.notNull((Object)listener, (String)"listener"));
    }

    /**
     * Discards queued events up to and including the given version/sequence and records
     * {@code version} as the acknowledged event version. Calls with a version below the
     * currently acknowledged one are ignored.
     *
     * @param version the acknowledged event version
     * @param sequence the acknowledged event sequence within {@code version}
     * @return this session
     */
    ServerSession clearEvents(long version, long sequence) {
        if (version >= this.eventAckVersion) {
            this.eventAckVersion = version;
            EventHolder holder = this.events.peek();
            while (holder != null && (holder.eventVersion < version || holder.eventVersion == version && holder.eventSequence <= sequence)) {
                this.events.remove();
                // Do not keep the payload reachable while the holder sits in the pool.
                holder.event = null;
                this.eventsPool.add(holder);
                // Re-read the head: without this the loop kept testing the holder that
                // was just removed and drained the queue until remove() threw.
                holder = this.events.peek();
            }
        }
        return this;
    }

    /**
     * Clears acknowledged events and re-sends everything still queued, but only when
     * {@code version} is greater than the currently acknowledged event version.
     *
     * @param version the acknowledged event version
     * @param sequence the acknowledged event sequence within {@code version}
     * @return this session
     */
    private ServerSession resendEvents(long version, long sequence) {
        if (version > this.eventAckVersion) {
            this.clearEvents(version, sequence);
            for (EventHolder holder : this.events) {
                this.sendEvent(holder);
            }
        }
        return this;
    }

    /**
     * Sends one queued event as a {@link PublishRequest} over the attached connection;
     * does nothing when no connection is attached. When the reply arrives without error
     * and the session is still open, events acknowledged by the reply are cleared.
     *
     * @param event the holder describing the event to send
     */
    private void sendEvent(EventHolder event) {
        if (this.connection != null) {
            this.connection.send((Object)((PublishRequest.Builder)PublishRequest.builder().withSession(this.id()))
                    .withEventVersion(event.eventVersion)
                    .withEventSequence(event.eventSequence)
                    .withPreviousVersion(event.previousVersion)
                    .withPreviousSequence(event.previousSequence)
                    .withMessage(event.event)
                    .build()).whenComplete((response, error) -> {
                if (this.isOpen() && error == null) {
                    if (response.status() == Response.Status.OK) {
                        this.clearEvents(response.version(), response.sequence());
                    } else {
                        // NOTE(review): clearEvents() has already raised eventAckVersion to
                        // response.version() (or left it higher), so the
                        // "version > eventAckVersion" guard in resendEvents() cannot pass
                        // from here and nothing is re-sent. Left unchanged; confirm intent.
                        this.clearEvents(response.version(), response.sequence());
                        this.resendEvents(response.version(), response.sequence());
                    }
                }
            });
        }
    }

    /**
     * Handles a publish request by passing its message to every registered event listener.
     *
     * @param request the publish request
     * @return a completed future holding a response with status {@code OK}
     */
    protected CompletableFuture<PublishResponse> handlePublish(PublishRequest request) {
        for (Listener listener : this.listeners) {
            listener.accept(request.message());
        }
        return CompletableFuture.completedFuture(((PublishResponse.Builder)PublishResponse.builder().withStatus(Response.Status.OK)).build());
    }

    /** Returns {@code true} until the session has been closed or expired. */
    public boolean isOpen() {
        return !this.closed;
    }

    /**
     * Registers a listener in the open-listener set.
     *
     * @param listener the listener; must not be {@code null}
     * @return the listener registration
     */
    public Listener<Session> onOpen(Consumer<Session> listener) {
        return this.openListeners.add((Consumer)Assert.notNull(listener, (String)"listener"));
    }

    /** Marks the session closed and notifies the close listeners. */
    void close() {
        this.closed = true;
        this.notifyCloseListeners();
    }

    /**
     * Registers a close listener. If the session is already closed, the listener is
     * invoked immediately.
     *
     * @param listener the listener; must not be {@code null}
     * @return the listener registration
     */
    public Listener<Session> onClose(Consumer<Session> listener) {
        Listener context = this.closeListeners.add((Consumer)Assert.notNull(listener, (String)"listener"));
        if (this.closed) {
            context.accept((Object)this);
        }
        return context;
    }

    /** Returns {@code true} once the session has been closed or expired. */
    public boolean isClosed() {
        return this.closed;
    }

    /** Marks the session closed and expired and notifies the close listeners. */
    void expire() {
        this.closed = true;
        this.expired = true;
        this.notifyCloseListeners();
    }

    /** Returns {@code true} once the session has been expired. */
    public boolean isExpired() {
        return this.expired;
    }

    @Override
    public String toString() {
        return String.format("Session[id=%d]", this.id);
    }

    /** Invokes every registered close listener with this session. */
    private void notifyCloseListeners() {
        for (Listener listener : this.closeListeners) {
            listener.accept((Object)this);
        }
    }

    /** Mutable, poolable record of one published event and its position in the event stream. */
    private static class EventHolder {
        private long eventVersion;
        private long eventSequence;
        private long previousVersion;
        private long previousSequence;
        private Object event;

        private EventHolder() {
        }
    }
}

