/*
 * Decompiled with CFR 0.152.
 */
package net.kuujo.vertigo.io.port.impl;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import net.kuujo.vertigo.hook.ComponentHook;
import net.kuujo.vertigo.hook.OutputHook;
import net.kuujo.vertigo.io.batch.OutputBatch;
import net.kuujo.vertigo.io.batch.impl.BaseOutputBatch;
import net.kuujo.vertigo.io.group.OutputGroup;
import net.kuujo.vertigo.io.group.impl.BaseOutputGroup;
import net.kuujo.vertigo.io.port.OutputPort;
import net.kuujo.vertigo.io.port.OutputPortContext;
import net.kuujo.vertigo.io.stream.OutputStream;
import net.kuujo.vertigo.io.stream.OutputStreamContext;
import net.kuujo.vertigo.io.stream.impl.DefaultOutputStream;
import net.kuujo.vertigo.util.Args;
import net.kuujo.vertigo.util.CountingCompletionHandler;
import net.kuujo.vertigo.util.Observer;
import net.kuujo.vertigo.util.Task;
import net.kuujo.vertigo.util.TaskRunner;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.impl.DefaultFutureResult;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.core.logging.impl.LoggerFactory;

/**
 * Default {@link OutputPort} implementation.
 *
 * <p>A port fans every outgoing message, batch and group out to all of its
 * {@link OutputStream}s and notifies the registered {@link OutputHook}s after a send.
 * The port registers itself as an observer of its {@link OutputPortContext} so that
 * streams can be added and removed when the configuration changes.
 *
 * <p>{@link #open(Handler)}, {@link #close(Handler)} and {@link #update(OutputPortContext)}
 * are all submitted to the same {@link TaskRunner}; each task must call
 * {@code task.complete()} exactly once when it is done.
 */
public class DefaultOutputPort
implements OutputPort,
Observer<OutputPortContext> {
    private static final Logger log = LoggerFactory.getLogger(DefaultOutputPort.class);
    private static final int DEFAULT_SEND_QUEUE_MAX_SIZE = 10000;
    private final Vertx vertx;
    private OutputPortContext context;
    private final List<OutputStream> streams = new ArrayList<OutputStream>();
    private List<OutputHook> hooks = new ArrayList<OutputHook>();
    private final TaskRunner tasks = new TaskRunner();
    private int maxQueueSize = DEFAULT_SEND_QUEUE_MAX_SIZE;
    private Handler<Void> drainHandler;
    private boolean open;

    /**
     * Creates a port for the given context and registers it as an observer of that context.
     *
     * @param vertx the Vert.x instance handed to every stream created by this port
     * @param context the port configuration
     */
    public DefaultOutputPort(Vertx vertx, OutputPortContext context) {
        this.vertx = vertx;
        this.context = context;
        // Copy the port-level hooks before appending the component-level ones so that the
        // list owned by the context is not modified as a side effect of building a port.
        this.hooks = new ArrayList<OutputHook>(context.hooks());
        for (ComponentHook hook : context.output().instance().component().hooks()) {
            this.hooks.add(hook);
        }
        context.registerObserver(this);
    }

    /**
     * Returns the port name taken from the port context.
     */
    @Override
    public String name() {
        return this.context.name();
    }

    /**
     * Returns the Vert.x instance this port was created with.
     */
    @Override
    public Vertx vertx() {
        return this.vertx;
    }

    /**
     * Reconciles the port's streams with an updated configuration.
     *
     * <p>Streams whose address is no longer listed in {@code update} are closed and removed.
     * For every listed address without a matching stream exactly one new stream is created.
     * If the port is open the new streams are opened before being added; otherwise they are
     * added unopened. The work runs as a task on the port's task runner.
     *
     * @param update the updated port configuration
     */
    @Override
    public void update(final OutputPortContext update) {
        this.tasks.runTask(new Handler<Task>(){

            @Override
            public void handle(final Task task) {
                closeRemovedStreams(update);
                List<OutputStream> newStreams = createAddedStreams(update);

                if (!DefaultOutputPort.this.open) {
                    DefaultOutputPort.this.streams.addAll(newStreams);
                    // NOTE(review): this replaces the hook list with the port-level hooks of the
                    // update only; the component hooks merged in the constructor are not
                    // re-added here - confirm that is intended.
                    DefaultOutputPort.this.hooks = update.hooks();
                    task.complete();
                    return;
                }

                final CountingCompletionHandler<Void> counter = new CountingCompletionHandler<Void>(newStreams.size());
                counter.setHandler(new Handler<AsyncResult<Void>>(){

                    @Override
                    public void handle(AsyncResult<Void> result) {
                        // NOTE(review): port-level hooks only, see the closed-port branch above.
                        DefaultOutputPort.this.hooks = update.hooks();
                        task.complete();
                    }
                });
                for (final OutputStream stream : newStreams) {
                    stream.open(new Handler<AsyncResult<Void>>(){

                        @Override
                        public void handle(AsyncResult<Void> result) {
                            if (result.failed()) {
                                log.error("Failed to open output stream " + stream.address(), result.cause());
                            } else {
                                DefaultOutputPort.this.streams.add(stream);
                            }
                            // Count every attempt, failed or not: a stream that cannot be opened
                            // is only logged and must not keep the task from completing.
                            counter.succeed();
                        }
                    });
                }
            }
        });
    }

    /**
     * Closes and removes every current stream whose address is not listed in {@code update}.
     */
    private void closeRemovedStreams(OutputPortContext update) {
        Iterator<OutputStream> iter = this.streams.iterator();
        while (iter.hasNext()) {
            final OutputStream stream = iter.next();
            if (isListedIn(update, stream)) {
                continue;
            }
            stream.close(new Handler<AsyncResult<Void>>(){

                @Override
                public void handle(AsyncResult<Void> result) {
                    if (result.failed()) {
                        log.error("Failed to close output stream " + stream.address(), result.cause());
                    }
                }
            });
            iter.remove();
        }
    }

    /**
     * Creates one unopened stream for every stream context in {@code update} that has no
     * existing stream with the same address, configured with the port's queue size and
     * drain handler.
     */
    private List<OutputStream> createAddedStreams(OutputPortContext update) {
        List<OutputStream> added = new ArrayList<OutputStream>();
        for (OutputStreamContext streamContext : update.streams()) {
            if (hasStreamFor(streamContext)) {
                continue;
            }
            OutputStream stream = new DefaultOutputStream(this.vertx, streamContext);
            // Same per-stream setup that open() applies to the initial streams.
            stream.setSendQueueMaxSize(this.maxQueueSize);
            stream.drainHandler(this.drainHandler);
            added.add(stream);
        }
        return added;
    }

    /**
     * Returns whether {@code update} lists a stream context with the address of {@code stream}.
     */
    private static boolean isListedIn(OutputPortContext update, OutputStream stream) {
        for (OutputStreamContext candidate : update.streams()) {
            if (candidate.address().equals(stream.address())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns whether the port already holds a stream with the address of {@code streamContext}.
     */
    private boolean hasStreamFor(OutputStreamContext streamContext) {
        for (OutputStream stream : this.streams) {
            if (stream.address().equals(streamContext.address())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Sets the send queue limit on the port and on every current stream.
     *
     * @param maxSize the new limit; validated with {@code Args.checkPositive}
     * @return this port
     */
    @Override
    public OutputPort setSendQueueMaxSize(int maxSize) {
        Args.checkPositive(maxSize, "max size must be a positive number", new Object[0]);
        this.maxQueueSize = maxSize;
        for (OutputStream stream : this.streams) {
            stream.setSendQueueMaxSize(this.maxQueueSize);
        }
        return this;
    }

    /**
     * Returns the send queue limit configured on this port.
     */
    @Override
    public int getSendQueueMaxSize() {
        return this.maxQueueSize;
    }

    /**
     * Returns the largest {@code size()} reported by any stream, or 0 when there are no streams.
     */
    @Override
    public int size() {
        int highest = 0;
        for (OutputStream stream : this.streams) {
            highest = Math.max(highest, stream.size());
        }
        return highest;
    }

    /**
     * Returns {@code true} if at least one stream reports a full send queue.
     */
    @Override
    public boolean sendQueueFull() {
        for (OutputStream stream : this.streams) {
            if (stream.sendQueueFull()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Stores the drain handler and sets it on every current stream.
     *
     * @param handler the drain handler
     * @return this port
     */
    @Override
    public OutputPort drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        for (OutputStream stream : this.streams) {
            stream.drainHandler(handler);
        }
        return this;
    }

    /**
     * Opens the port without a completion handler.
     *
     * @return this port
     */
    @Override
    public OutputPort open() {
        return this.open(null);
    }

    /**
     * Opens the port by creating and opening one stream per stream context.
     *
     * <p>If the port is already open the handler is invoked with a successful result and
     * nothing else happens. Otherwise the stream list is cleared and rebuilt; only streams
     * that opened successfully are added to it.
     *
     * @param doneHandler notified with the outcome; may be {@code null}
     * @return this port
     */
    @Override
    public OutputPort open(final Handler<AsyncResult<Void>> doneHandler) {
        this.tasks.runTask(new Handler<Task>(){

            @Override
            public void handle(final Task task) {
                if (DefaultOutputPort.this.open) {
                    new DefaultFutureResult<Void>((Void) null).setHandler(doneHandler);
                    task.complete();
                    return;
                }

                DefaultOutputPort.this.streams.clear();
                DefaultOutputPort.this.open = true;
                final CountingCompletionHandler<Void> counter = new CountingCompletionHandler<Void>(DefaultOutputPort.this.context.streams().size());
                counter.setHandler(new Handler<AsyncResult<Void>>(){

                    @Override
                    public void handle(AsyncResult<Void> result) {
                        if (doneHandler != null) {
                            doneHandler.handle(result);
                        }
                        task.complete();
                    }
                });
                for (OutputStreamContext output : DefaultOutputPort.this.context.streams()) {
                    final OutputStream stream = new DefaultOutputStream(DefaultOutputPort.this.vertx, output);
                    stream.setSendQueueMaxSize(DefaultOutputPort.this.maxQueueSize);
                    stream.drainHandler(DefaultOutputPort.this.drainHandler);
                    stream.open(new Handler<AsyncResult<Void>>(){

                        @Override
                        public void handle(AsyncResult<Void> result) {
                            if (result.failed()) {
                                log.error("Failed to open output stream " + stream.address(), result.cause());
                                counter.fail(result.cause());
                            } else {
                                DefaultOutputPort.this.streams.add(stream);
                                counter.succeed();
                            }
                        }
                    });
                }
            }
        });
        return this;
    }

    /**
     * Closes the port without a completion handler.
     */
    @Override
    public void close() {
        this.close(null);
    }

    /**
     * Closes every stream and marks the port as closed.
     *
     * <p>If the port is not open the handler is invoked with a successful result and
     * nothing else happens.
     *
     * @param doneHandler notified with the outcome; may be {@code null}
     */
    @Override
    public void close(final Handler<AsyncResult<Void>> doneHandler) {
        this.tasks.runTask(new Handler<Task>(){

            @Override
            public void handle(final Task task) {
                if (!DefaultOutputPort.this.open) {
                    new DefaultFutureResult<Void>((Void) null).setHandler(doneHandler);
                    task.complete();
                    return;
                }

                // Snapshot the streams first: the port's own list is emptied immediately while
                // the close calls report back through the counter.
                List<OutputStream> closing = new ArrayList<OutputStream>(DefaultOutputPort.this.streams);
                DefaultOutputPort.this.streams.clear();
                DefaultOutputPort.this.open = false;
                CountingCompletionHandler<Void> counter = new CountingCompletionHandler<Void>(closing.size());
                counter.setHandler(new Handler<AsyncResult<Void>>(){

                    @Override
                    public void handle(AsyncResult<Void> result) {
                        if (doneHandler != null) {
                            doneHandler.handle(result);
                        }
                        task.complete();
                    }
                });
                for (OutputStream stream : closing) {
                    stream.close(counter);
                }
            }
        });
    }

    /**
     * Creates a batch with a random UUID as its id and no arguments.
     *
     * @param handler receives the port-level batch
     * @return this port
     */
    @Override
    public OutputPort batch(Handler<OutputBatch> handler) {
        return this.batch(UUID.randomUUID().toString(), null, handler);
    }

    /**
     * Creates a batch with a random UUID as its id.
     *
     * @param args passed through to each stream's {@code batch} call
     * @param handler receives the port-level batch
     * @return this port
     */
    @Override
    public OutputPort batch(Object args, Handler<OutputBatch> handler) {
        return this.batch(UUID.randomUUID().toString(), args, handler);
    }

    /**
     * Creates a batch on every stream and hands the handler one {@link BaseOutputBatch}
     * wrapping all of them once every stream has reported its batch. With no streams the
     * handler is called immediately with a batch wrapping an empty list.
     *
     * @param id the batch id
     * @param args passed through to each stream's {@code batch} call
     * @param handler receives the port-level batch
     * @return this port
     */
    @Override
    public OutputPort batch(final String id, Object args, final Handler<OutputBatch> handler) {
        final List<OutputBatch> batches = new ArrayList<OutputBatch>();
        final int streamsSize = this.streams.size();
        if (streamsSize == 0) {
            handler.handle(new BaseOutputBatch(id, this.vertx, batches));
            return this;
        }
        for (OutputStream stream : this.streams) {
            stream.batch(id, args, new Handler<OutputBatch>(){

                @Override
                public void handle(OutputBatch batch) {
                    batches.add(batch);
                    if (batches.size() == streamsSize) {
                        handler.handle(new BaseOutputBatch(id, DefaultOutputPort.this.vertx, batches));
                    }
                }
            });
        }
        return this;
    }

    /**
     * Creates a group with a random UUID as its name and no arguments.
     *
     * @param handler receives the port-level group
     * @return this port
     */
    @Override
    public OutputPort group(Handler<OutputGroup> handler) {
        return this.group(UUID.randomUUID().toString(), null, handler);
    }

    /**
     * Creates a named group with no arguments.
     *
     * @param name the group name
     * @param handler receives the port-level group
     * @return this port
     */
    @Override
    public OutputPort group(String name, Handler<OutputGroup> handler) {
        return this.group(name, null, handler);
    }

    /**
     * Creates a group on every stream and hands the handler one {@link BaseOutputGroup}
     * wrapping all of them once every stream has reported its group. With no streams the
     * handler is called immediately with a group wrapping an empty list.
     *
     * @param name the group name
     * @param args passed through to each stream's {@code group} call
     * @param handler receives the port-level group
     * @return this port
     */
    @Override
    public OutputPort group(final String name, Object args, final Handler<OutputGroup> handler) {
        final List<OutputGroup> groups = new ArrayList<OutputGroup>();
        final int streamsSize = this.streams.size();
        if (streamsSize == 0) {
            handler.handle(new BaseOutputGroup(name, this.vertx, groups));
            return this;
        }
        for (OutputStream stream : this.streams) {
            stream.group(name, args, new Handler<OutputGroup>(){

                @Override
                public void handle(OutputGroup group) {
                    groups.add(group);
                    if (groups.size() == streamsSize) {
                        handler.handle(new BaseOutputGroup(name, DefaultOutputPort.this.vertx, groups));
                    }
                }
            });
        }
        return this;
    }

    /**
     * Passes the sent message to {@code handleSend} of every registered hook.
     */
    private void triggerSend(Object message) {
        for (OutputHook hook : this.hooks) {
            hook.handleSend(message);
        }
    }

    // Every send overload below forwards the message to the matching typed overload of each
    // stream and then notifies the hooks. They are kept separate so that overload resolution
    // on the stream stays static.

    /** Sends an object on every stream, then notifies the hooks. */
    @Override
    public OutputPort send(Object message) {
        for (OutputStream stream : this.streams) {
            stream.send(message);
        }
        this.triggerSend(message);
        return this;
    }

    /** Sends a string on every stream, then notifies the hooks. */
    @Override
    public OutputPort send(String message) {
        for (OutputStream stream : this.streams) {
            stream.send(message);
        }
        this.triggerSend(message);
        return this;
    }

    /** Sends a boolean on every stream, then notifies the hooks. */
    @Override
    public OutputPort send(Boolean message) {
        for (OutputStream stream : this.streams) {
            stream.send(message);
        }
        this.triggerSend(message);
        return this;
    }

    /** Sends a character on every stream, then notifies the hooks. */
    @Override
    public OutputPort send(Character message) {
        for (OutputStream stream : this.streams) {
            stream.send(message);
        }
        this.triggerSend(message);
        return this;
    }

    /** Sends a short on every stream, then notifies the hooks. */
    @Override
    public OutputPort send(Short message) {
        for (OutputStream stream : this.streams) {
            stream.send(message);
        }
        this.triggerSend(message);
        return this;
    }

    /** Sends an integer on every stream, then notifies the hooks. */
    @Override
    public OutputPort send(Integer message) {
        for (OutputStream stream : this.streams) {
            stream.send(message);
        }
        this.triggerSend(message);
        return this;
    }

    /** Sends a long on every stream, then notifies the hooks. */
    @Override
    public OutputPort send(Long message) {
        for (OutputStream stream : this.streams) {
            stream.send(message);
        }
        this.triggerSend(message);
        return this;
    }

    /** Sends a double on every stream, then notifies the hooks. */
    @Override
    public OutputPort send(Double message) {
        for (OutputStream stream : this.streams) {
            stream.send(message);
        }
        this.triggerSend(message);
        return this;
    }

    /** Sends a float on every stream, then notifies the hooks. */
    @Override
    public OutputPort send(Float message) {
        for (OutputStream stream : this.streams) {
            stream.send(message);
        }
        this.triggerSend(message);
        return this;
    }

    /** Sends a JSON object on every stream, then notifies the hooks. */
    @Override
    public OutputPort send(JsonObject message) {
        for (OutputStream stream : this.streams) {
            stream.send(message);
        }
        this.triggerSend(message);
        return this;
    }

    /** Sends a JSON array on every stream, then notifies the hooks. */
    @Override
    public OutputPort send(JsonArray message) {
        for (OutputStream stream : this.streams) {
            stream.send(message);
        }
        this.triggerSend(message);
        return this;
    }

    /** Sends a byte on every stream, then notifies the hooks. */
    @Override
    public OutputPort send(Byte message) {
        for (OutputStream stream : this.streams) {
            stream.send(message);
        }
        this.triggerSend(message);
        return this;
    }

    /** Sends a byte array on every stream, then notifies the hooks. */
    @Override
    public OutputPort send(byte[] message) {
        for (OutputStream stream : this.streams) {
            stream.send(message);
        }
        this.triggerSend(message);
        return this;
    }

    /** Sends a buffer on every stream, then notifies the hooks. */
    @Override
    public OutputPort send(Buffer message) {
        for (OutputStream stream : this.streams) {
            stream.send(message);
        }
        this.triggerSend(message);
        return this;
    }
}

