/*
 * Decompiled with CFR 0.152.
 */
package net.kuujo.catalyst.transport;

import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import net.kuujo.catalyst.buffer.Buffer;
import net.kuujo.catalyst.transport.Connection;
import net.kuujo.catalyst.transport.MessageHandler;
import net.kuujo.catalyst.transport.TransportException;
import net.kuujo.catalyst.util.Assert;
import net.kuujo.catalyst.util.Listener;
import net.kuujo.catalyst.util.Listeners;
import net.kuujo.catalyst.util.ReferenceCounted;
import net.kuujo.catalyst.util.concurrent.Context;
import net.kuujo.catalyst.util.concurrent.Futures;

public class LocalConnection
implements Connection {
    private final UUID id;
    private final Context context;
    private final Set<LocalConnection> connections;
    private LocalConnection connection;
    private final Map<Class, HandlerHolder> handlers = new ConcurrentHashMap<Class, HandlerHolder>();
    private final Listeners<Throwable> exceptionListeners = new Listeners();
    private final Listeners<Connection> closeListeners = new Listeners();

    public LocalConnection(UUID id, Context context) {
        this(id, context, null);
    }

    public LocalConnection(UUID id, Context context, Set<LocalConnection> connections) {
        this.id = id;
        this.context = context;
        this.connections = connections;
    }

    public LocalConnection connect(LocalConnection connection) {
        this.connection = connection;
        return this;
    }

    public UUID id() {
        return this.id;
    }

    private Context getContext() {
        Context context = Context.currentContext();
        Assert.state((context != null ? 1 : 0) != 0, (String)"not on a Catalyst thread", (Object[])new Object[0]);
        return context;
    }

    public <T, U> CompletableFuture<U> send(T request) {
        Assert.notNull(request, (String)"request");
        Context context = this.getContext();
        CompletableFuture future = new CompletableFuture();
        Buffer requestBuffer = context.serializer().writeObject(request);
        this.connection.receive(requestBuffer.flip()).whenCompleteAsync((responseBuffer, error) -> {
            int status = responseBuffer.readByte();
            if (status == 1) {
                future.complete(context.serializer().readObject(responseBuffer));
            } else {
                future.completeExceptionally((Throwable)context.serializer().readObject(responseBuffer));
            }
            responseBuffer.release();
        }, context.executor());
        if (request instanceof ReferenceCounted) {
            ((ReferenceCounted)request).release();
        }
        return future;
    }

    private CompletableFuture<Buffer> receive(Buffer requestBuffer) {
        Context context = this.getContext();
        Object request = context.serializer().readObject(requestBuffer);
        requestBuffer.release();
        HandlerHolder holder = this.handlers.get(request.getClass());
        if (holder != null) {
            MessageHandler handler = holder.handler;
            CompletableFuture<Buffer> future = new CompletableFuture<Buffer>();
            holder.context.executor().execute(() -> handler.handle(request).whenCompleteAsync((response, error) -> {
                Buffer responseBuffer = context.serializer().allocate();
                if (error == null) {
                    responseBuffer.writeByte(1);
                    context.serializer().writeObject(response, responseBuffer);
                } else {
                    responseBuffer.writeByte(0);
                    context.serializer().writeObject(error, responseBuffer);
                }
                future.complete(responseBuffer.flip());
                if (response instanceof ReferenceCounted) {
                    ((ReferenceCounted)response).release();
                }
            }, context.executor()));
            return future;
        }
        return Futures.exceptionalFuture((Throwable)new TransportException("no handler registered"));
    }

    public <T, U> Connection handler(Class<T> type, MessageHandler<T, U> handler) {
        Assert.notNull(type, (String)"type");
        if (handler != null) {
            this.handlers.put(type, new HandlerHolder(handler, this.getContext()));
        } else {
            this.handlers.remove(type);
        }
        return this;
    }

    public Listener<Throwable> exceptionListener(Consumer<Throwable> listener) {
        return this.exceptionListeners.add((Consumer)Assert.notNull(listener, (String)"listener"));
    }

    public Listener<Connection> closeListener(Consumer<Connection> listener) {
        return this.closeListeners.add((Consumer)Assert.notNull(listener, (String)"listener"));
    }

    public CompletableFuture<Void> close() {
        this.doClose();
        this.connection.doClose();
        return this.getContext().execute(() -> null);
    }

    private void doClose() {
        if (this.connections != null) {
            this.connections.remove(this);
        }
        for (Consumer closeListener : this.closeListeners) {
            this.context.executor().execute(() -> closeListener.accept(this));
        }
    }

    protected static class HandlerHolder {
        private final MessageHandler handler;
        private final Context context;

        private HandlerHolder(MessageHandler handler, Context context) {
            this.handler = handler;
            this.context = context;
        }
    }
}

