/*
 * Decompiled with CFR 0.152.
 */
package software.coley.instrument.sock;

import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import software.coley.instrument.io.ByteBufferAllocator;
import software.coley.instrument.io.ByteBufferCompat;
import software.coley.instrument.io.ByteBufferDataInput;
import software.coley.instrument.io.ByteBufferDataOutput;
import software.coley.instrument.io.codec.StructureCodec;
import software.coley.instrument.message.AbstractMessage;
import software.coley.instrument.message.MessageFactory;
import software.coley.instrument.message.broadcast.AbstractBroadcastMessage;
import software.coley.instrument.sock.BroadcastListener;
import software.coley.instrument.sock.ResponseListener;
import software.coley.instrument.sock.WriteListener;
import software.coley.instrument.sock.WriteResult;
import software.coley.instrument.util.Logger;
import software.coley.instrument.util.NamedThreadFactory;

public class ChannelHandler {
    private static final int HEADER_SIZE = 10;
    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
    public static String threadNameClientAccept = "agent-client-accept-loop";
    public static String threadNameEventHandle = "agent-event-handling";
    public static String threadNameEventLoop = "agent-event-loop";
    public static String threadNameRead = "agent-read-loop";
    public static String threadNameWrite = "agent-write-loop";
    private final ExecutorService eventTaskRunner = Executors.newCachedThreadPool(new NamedThreadFactory(threadNameEventHandle));
    private final BlockingQueue<WriteResult<?>> writeQueue = new LinkedBlockingQueue();
    private final BlockingQueue<Runnable> eventQueue = new LinkedBlockingQueue<Runnable>();
    private final ByteChannel channel;
    private final ByteBufferAllocator allocator;
    private final MessageFactory factory;
    private final Consumer<ChannelHandler> closeHandler;
    private final Map<Integer, ResponseListener> responseListeners = new ConcurrentHashMap<Integer, ResponseListener>();
    private final AtomicInteger nextFrameId = new AtomicInteger(0);
    private ResponseListener allResponsesListener;
    private BroadcastListener broadcastListener;
    private WriteListener writeListener;
    private Future<?> readLoopFuture;
    private Future<?> writeLoopFuture;
    private Future<?> eventLoopFuture;
    private boolean running;

    public ChannelHandler(ByteChannel channel, ByteBufferAllocator allocator, MessageFactory factory, Consumer<ChannelHandler> closeHandler) {
        this.channel = channel;
        this.allocator = allocator;
        this.factory = factory;
        this.closeHandler = closeHandler;
    }

    public void start() {
        if (!this.running) {
            this.running = true;
            this.readLoopFuture = Executors.newSingleThreadExecutor(new NamedThreadFactory(threadNameRead)).submit(this::readLoop);
            this.writeLoopFuture = Executors.newSingleThreadExecutor(new NamedThreadFactory(threadNameWrite)).submit(this::writeLoop);
            this.eventLoopFuture = Executors.newSingleThreadExecutor(new NamedThreadFactory(threadNameEventLoop)).submit(this::eventLoop);
        }
    }

    public void shutdown() {
        if (this.running) {
            Logger.info("Closing channel " + this.channel.toString());
            this.running = false;
            this.eventQueue.clear();
            this.writeQueue.clear();
            this.readLoopFuture.cancel(true);
            this.writeLoopFuture.cancel(true);
            this.eventLoopFuture.cancel(true);
            if (this.closeHandler != null) {
                this.closeHandler.accept(this);
            }
        }
    }

    public <T extends AbstractMessage> WriteResult<T> write(T value, int frameId) {
        MessageFactory.MessageInfo info = this.factory.getInfo(value);
        WriteResult writeResult = new WriteResult(info.getCodec(), frameId, info.getId(), value);
        this.writeQueue.add(writeResult);
        return writeResult;
    }

    private void eventLoop() {
        while (this.running) {
            try {
                Runnable take = this.eventQueue.take();
                Logger.debug("Channel event fire");
                this.eventTaskRunner.submit(take);
            }
            catch (InterruptedException take) {
            }
            catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }

    private void readLoop() {
        try {
            ByteBuffer headerBuffer = this.allocator.allocate(10);
            while (this.running) {
                ByteBuffer contentBuffer;
                while (headerBuffer.position() < 10) {
                    this.channel.read(headerBuffer);
                }
                ByteBufferCompat.compatPosition(headerBuffer, 0);
                int readFrameId = headerBuffer.getInt();
                short messageType = headerBuffer.getShort();
                int messageLength = headerBuffer.getInt();
                Logger.debug("Channel read-header: id=" + readFrameId + ", type=" + messageType + ", length=" + messageLength);
                ByteBufferCompat.compatClear(headerBuffer);
                ByteBuffer byteBuffer = contentBuffer = messageLength > 0 ? ByteBuffer.allocate(messageLength) : EMPTY_BUFFER;
                while (contentBuffer.position() < messageLength) {
                    int reads = this.channel.read(contentBuffer);
                    if (reads != -1) continue;
                    throw new ClosedChannelException();
                }
                ByteBufferCompat.compatPosition(contentBuffer, 0);
                MessageFactory.MessageInfo info = this.factory.getInfo(messageType);
                StructureCodec decoder = info.getCodec();
                AbstractMessage value = (AbstractMessage)decoder.decode(new ByteBufferDataInput(contentBuffer));
                Logger.debug("Channel read-body: " + value);
                if (readFrameId == -1) {
                    if (this.broadcastListener == null || this.eventQueue.offer(() -> this.broadcastListener.onReceive(messageType, (AbstractBroadcastMessage)value))) continue;
                    Logger.warn("Cannot post-event of read-completion[broadcast], event-queue is full");
                    continue;
                }
                ResponseListener responseListener = this.responseListeners.remove(readFrameId);
                if (responseListener != null && !this.eventQueue.offer(() -> responseListener.onReceive(readFrameId, value))) {
                    Logger.warn("Cannot post-event of read-completion[response], event-queue is full");
                }
                if (this.allResponsesListener == null || this.eventQueue.offer(() -> this.allResponsesListener.onReceive(readFrameId, value))) continue;
                Logger.warn("Cannot post-event of read-completion[all-response], event-queue is full");
            }
        }
        catch (SocketException ex) {
            this.shutdown();
        }
        catch (Throwable t) {
            if (!this.running) {
                return;
            }
            t.printStackTrace();
            this.shutdown();
        }
    }

    private void writeLoop() {
        try {
            ByteBufferDataOutput output = new ByteBufferDataOutput(this.allocator);
            while (this.running) {
                WriteResult<?> write = this.writeQueue.take();
                int writeFrameId = write.getFrameId();
                Logger.debug("Channel write-header: id=" + writeFrameId + ", type=" + write.getDecoderKey() + ", value=" + write.getValue());
                output.reset();
                write.writeHeader(output);
                int contentStart = output.getBuffer().position();
                write.writeTo(output);
                int contentEnd = output.getBuffer().position();
                ByteBuffer buffer = output.consume();
                int contentLength = contentEnd - contentStart;
                buffer.putInt(6, contentLength);
                while (buffer.position() < buffer.limit()) {
                    this.channel.write(buffer);
                }
                write.complete();
                Logger.debug("Channel write-body: length=" + contentLength);
                if (this.writeListener == null || this.eventQueue.offer(() -> this.writeListener.onWrite(writeFrameId, (AbstractMessage)write.getValue()))) continue;
                Logger.warn("Cannot post-event of write-completion, event-queue is full");
            }
        }
        catch (InterruptedException output) {
        }
        catch (SocketException ex) {
            this.shutdown();
        }
        catch (Throwable t) {
            if (!this.running) {
                return;
            }
            t.printStackTrace();
        }
    }

    public int getNextFrameId() {
        return this.nextFrameId.getAndIncrement();
    }

    public void setBroadcastListener(BroadcastListener broadcastListener) {
        this.broadcastListener = broadcastListener;
    }

    public void setWriteListener(WriteListener writeListener) {
        this.writeListener = writeListener;
    }

    public void addResponseListener(int frameId, ResponseListener listener) {
        this.responseListeners.put(frameId, listener);
    }

    public void setAllResponsesListener(ResponseListener allResponsesListener) {
        this.allResponsesListener = allResponsesListener;
    }
}

