/*
 * Decompiled with CFR 0.152.
 */
package net.kuujo.vertigo.io.connection.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import net.kuujo.vertigo.hook.OutputHook;
import net.kuujo.vertigo.io.connection.OutputConnection;
import net.kuujo.vertigo.io.connection.OutputConnectionContext;
import net.kuujo.vertigo.io.connection.impl.ConnectionOutputGroup;
import net.kuujo.vertigo.io.connection.impl.DefaultOutputConnectionContext;
import net.kuujo.vertigo.io.group.OutputGroup;
import net.kuujo.vertigo.io.impl.OutputSerializer;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.eventbus.ReplyException;
import org.vertx.java.core.eventbus.ReplyFailure;
import org.vertx.java.core.impl.DefaultFutureResult;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;

public class DefaultOutputConnection
implements OutputConnection {
    private static final int DEFAULT_MAX_QUEUE_SIZE = 10000;
    private final Vertx vertx;
    private final EventBus eventBus;
    private final OutputConnectionContext context;
    private final String outAddress;
    private final String inAddress;
    private final OutputSerializer serializer = new OutputSerializer();
    private List<OutputHook> hooks = new ArrayList<OutputHook>();
    private int maxQueueSize = 10000;
    private Handler<Void> drainHandler;
    private long currentMessage = 1L;
    private final TreeMap<Long, JsonObject> messages = new TreeMap();
    private final Map<String, ConnectionOutputGroup> groups = new HashMap<String, ConnectionOutputGroup>();
    private boolean open;
    private boolean full;
    private boolean paused;
    private final Handler<Message<JsonObject>> internalMessageHandler = new Handler<Message<JsonObject>>(){

        public void handle(Message<JsonObject> message) {
            String action = ((JsonObject)message.body()).getString("action");
            if (action != null) {
                switch (action) {
                    case "start": {
                        DefaultOutputConnection.this.doStart(((JsonObject)message.body()).getString("group"));
                        break;
                    }
                    case "ack": {
                        DefaultOutputConnection.this.doAck(((JsonObject)message.body()).getLong("id"));
                        break;
                    }
                    case "fail": {
                        DefaultOutputConnection.this.doFail(((JsonObject)message.body()).getLong("id"));
                        break;
                    }
                    case "pause": {
                        DefaultOutputConnection.this.doPause(((JsonObject)message.body()).getLong("id"));
                        break;
                    }
                    case "resume": {
                        DefaultOutputConnection.this.doResume(((JsonObject)message.body()).getLong("id"));
                    }
                }
            }
        }
    };

    public DefaultOutputConnection(Vertx vertx, String address) {
        this(vertx, (OutputConnectionContext)((DefaultOutputConnectionContext.Builder)DefaultOutputConnectionContext.Builder.newBuilder().setAddress(address)).build());
    }

    public DefaultOutputConnection(Vertx vertx, OutputConnectionContext context) {
        this.vertx = vertx;
        this.eventBus = vertx.eventBus();
        this.context = context;
        this.hooks = context.hooks();
        this.outAddress = String.format("%s.out", context.address());
        this.inAddress = String.format("%s.in", context.address());
    }

    @Override
    public String address() {
        return this.context.address();
    }

    @Override
    public Vertx vertx() {
        return this.vertx;
    }

    @Override
    public OutputConnection open() {
        return this.open((Handler)null);
    }

    @Override
    public OutputConnection open(final Handler<AsyncResult<Void>> doneHandler) {
        this.eventBus.registerHandler(this.outAddress, this.internalMessageHandler, (Handler)new Handler<AsyncResult<Void>>(){

            public void handle(AsyncResult<Void> result) {
                if (result.failed()) {
                    new DefaultFutureResult(result.cause()).setHandler(doneHandler);
                } else {
                    DefaultOutputConnection.this.connect((Handler<AsyncResult<Void>>)doneHandler);
                }
            }
        });
        return this;
    }

    private void connect(final Handler<AsyncResult<Void>> doneHandler) {
        this.eventBus.sendWithTimeout(this.inAddress, new JsonObject().putString("action", "connect"), 5000L, (Handler)new Handler<AsyncResult<Message<Boolean>>>(){

            public void handle(AsyncResult<Message<Boolean>> result) {
                if (result.failed()) {
                    ReplyException failure = (ReplyException)result.cause();
                    if (failure.failureType().equals((Object)ReplyFailure.RECIPIENT_FAILURE)) {
                        new DefaultFutureResult((Throwable)failure).setHandler(doneHandler);
                    } else {
                        DefaultOutputConnection.this.connect((Handler<AsyncResult<Void>>)doneHandler);
                    }
                } else if (((Boolean)((Message)result.result()).body()).booleanValue()) {
                    DefaultOutputConnection.this.open = true;
                    new DefaultFutureResult((Object)null).setHandler(doneHandler);
                } else {
                    DefaultOutputConnection.this.connect((Handler<AsyncResult<Void>>)doneHandler);
                }
            }
        });
    }

    @Override
    public OutputConnection setSendQueueMaxSize(int maxSize) {
        this.maxQueueSize = maxSize;
        return this;
    }

    @Override
    public int getSendQueueMaxSize() {
        return this.maxQueueSize;
    }

    @Override
    public int size() {
        return this.messages.size();
    }

    @Override
    public boolean sendQueueFull() {
        return this.paused || this.messages.size() >= this.maxQueueSize;
    }

    @Override
    public OutputConnection drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        return this;
    }

    @Override
    public OutputConnection group(String name, Handler<OutputGroup> handler) {
        ConnectionOutputGroup group = new ConnectionOutputGroup(UUID.randomUUID().toString(), name, this);
        this.groups.put(group.id(), group);
        group.start(handler);
        return this;
    }

    ConnectionOutputGroup group(String name, String parent, Handler<OutputGroup> handler) {
        ConnectionOutputGroup group = new ConnectionOutputGroup(UUID.randomUUID().toString(), name, parent, this);
        this.groups.put(group.id(), group);
        group.start(handler);
        return group;
    }

    @Override
    public void close() {
        this.close(null);
    }

    @Override
    public void close(final Handler<AsyncResult<Void>> doneHandler) {
        this.eventBus.unregisterHandler(this.outAddress, this.internalMessageHandler, (Handler)new Handler<AsyncResult<Void>>(){

            public void handle(AsyncResult<Void> result) {
                if (result.failed()) {
                    new DefaultFutureResult(result.cause()).setHandler(doneHandler);
                } else {
                    DefaultOutputConnection.this.disconnect((Handler<AsyncResult<Void>>)doneHandler);
                }
            }
        });
    }

    private void disconnect(final Handler<AsyncResult<Void>> doneHandler) {
        this.eventBus.sendWithTimeout(this.inAddress, new JsonObject().putString("action", "disconnect"), 5000L, (Handler)new Handler<AsyncResult<Message<Boolean>>>(){

            public void handle(AsyncResult<Message<Boolean>> result) {
                if (result.failed()) {
                    ReplyException failure = (ReplyException)result.cause();
                    if (failure.failureType().equals((Object)ReplyFailure.RECIPIENT_FAILURE)) {
                        new DefaultFutureResult((Throwable)failure).setHandler(doneHandler);
                    } else {
                        DefaultOutputConnection.this.disconnect((Handler<AsyncResult<Void>>)doneHandler);
                    }
                } else if (((Boolean)((Message)result.result()).body()).booleanValue()) {
                    DefaultOutputConnection.this.open = false;
                    new DefaultFutureResult((Object)null).setHandler(doneHandler);
                } else {
                    DefaultOutputConnection.this.disconnect((Handler<AsyncResult<Void>>)doneHandler);
                }
            }
        });
    }

    private void checkOpen() {
        if (!this.open) {
            throw new IllegalStateException("Connection to " + this.context.address() + " not open.");
        }
    }

    private void checkFull() {
        if (!this.full && this.messages.size() >= this.maxQueueSize) {
            this.full = true;
        }
    }

    private void checkDrain() {
        if (this.full && !this.paused && this.messages.size() < this.maxQueueSize / 2) {
            this.full = false;
            if (this.drainHandler != null) {
                this.drainHandler.handle((Object)null);
            }
        }
    }

    private void doStart(String groupID) {
        ConnectionOutputGroup group = this.groups.get(groupID);
        if (group != null) {
            group.handleStart();
        }
    }

    private void doAck(long id) {
        if (this.messages.containsKey(id + 1L)) {
            this.messages.tailMap(id + 1L);
        } else {
            this.messages.clear();
        }
        this.checkDrain();
    }

    private void doFail(long id) {
        if (this.messages.containsKey(id + 1L)) {
            for (long i = id + 1L; i <= this.messages.lastKey(); ++i) {
                this.eventBus.send(this.inAddress, this.messages.get(i));
            }
        }
    }

    private void doPause(long id) {
        this.paused = true;
    }

    private void doResume(long id) {
        if (this.paused) {
            this.paused = false;
            this.checkDrain();
        }
    }

    private OutputConnection doSend(Object value) {
        this.checkOpen();
        JsonObject message = this.createMessage(value).putString("action", "message");
        if (this.open && !this.paused) {
            this.eventBus.send(this.inAddress, message);
        }
        for (OutputHook hook : this.hooks) {
            hook.handleSend(value);
        }
        this.checkFull();
        return this;
    }

    void doGroupStart(String group, String name, String parent) {
        this.checkOpen();
        JsonObject message = this.createMessage().putString("group", group).putString("name", name).putString("parent", parent).putString("action", "start");
        if (this.open && !this.paused) {
            this.eventBus.send(this.inAddress, message);
        }
        this.checkFull();
    }

    void doGroupSend(String group, Object value) {
        this.checkOpen();
        JsonObject message = this.createMessage(value).putString("action", "group").putString("group", group);
        if (this.open && !this.paused) {
            this.eventBus.send(this.inAddress, message);
        }
        for (OutputHook hook : this.hooks) {
            hook.handleSend(value);
        }
        this.checkFull();
    }

    void doGroupEnd(String group) {
        this.checkOpen();
        JsonObject message = this.createMessage().putString("action", "end").putString("group", group);
        if (this.open && !this.paused) {
            this.eventBus.send(this.inAddress, message);
        }
        this.groups.remove(group);
    }

    private JsonObject createMessage() {
        JsonObject message = new JsonObject();
        long id = this.currentMessage++;
        message.putNumber("id", (Number)id);
        this.messages.put(id, message);
        return message;
    }

    private JsonObject createMessage(Object value) {
        JsonObject message = this.serializer.serialize(value);
        long id = this.currentMessage++;
        message.putNumber("id", (Number)id);
        this.messages.put(id, message);
        return message;
    }

    @Override
    public OutputConnection send(Object message) {
        return this.doSend(message);
    }

    @Override
    public OutputConnection send(String message) {
        return this.doSend(message);
    }

    @Override
    public OutputConnection send(Boolean message) {
        return this.doSend(message);
    }

    @Override
    public OutputConnection send(Character message) {
        return this.doSend(message);
    }

    @Override
    public OutputConnection send(Short message) {
        return this.doSend(message);
    }

    @Override
    public OutputConnection send(Integer message) {
        return this.doSend(message);
    }

    @Override
    public OutputConnection send(Long message) {
        return this.doSend(message);
    }

    @Override
    public OutputConnection send(Double message) {
        return this.doSend(message);
    }

    @Override
    public OutputConnection send(Float message) {
        return this.doSend(message);
    }

    @Override
    public OutputConnection send(Buffer message) {
        return this.doSend(message);
    }

    @Override
    public OutputConnection send(JsonObject message) {
        return this.doSend(message);
    }

    @Override
    public OutputConnection send(JsonArray message) {
        return this.doSend(message);
    }

    @Override
    public OutputConnection send(Byte message) {
        return this.doSend(message);
    }

    @Override
    public OutputConnection send(byte[] message) {
        return this.doSend(message);
    }
}

