package com.ibm.mqlight.api.impl.engine;

import com.ibm.mqlight.api.ClientException;
import com.ibm.mqlight.api.NetworkException;
import com.ibm.mqlight.api.NotPermittedException;
import com.ibm.mqlight.api.QOS;
import com.ibm.mqlight.api.ReplacedException;
import com.ibm.mqlight.api.SecurityException;
import com.ibm.mqlight.api.StateException;
import com.ibm.mqlight.api.SubscribedException;
import com.ibm.mqlight.api.impl.ComponentImpl;
import com.ibm.mqlight.api.impl.Message;
import com.ibm.mqlight.api.impl.SubscriptionTopic;
import com.ibm.mqlight.api.impl.engine.EngineConnection;
import com.ibm.mqlight.api.impl.network.ConnectResponse;
import com.ibm.mqlight.api.impl.network.ConnectionError;
import com.ibm.mqlight.api.impl.network.DataRead;
import com.ibm.mqlight.api.impl.network.DisconnectResponse;
import com.ibm.mqlight.api.impl.network.NetworkClosePromiseImpl;
import com.ibm.mqlight.api.impl.network.NetworkConnectPromiseImpl;
import com.ibm.mqlight.api.impl.network.NetworkListenerImpl;
import com.ibm.mqlight.api.impl.network.NetworkWritePromiseImpl;
import com.ibm.mqlight.api.impl.network.WriteResponse;
import com.ibm.mqlight.api.impl.timer.PopResponse;
import com.ibm.mqlight.api.impl.timer.TimerPromiseImpl;
import com.ibm.mqlight.api.logging.FFDCProbeId;
import com.ibm.mqlight.api.logging.Logger;
import com.ibm.mqlight.api.logging.LoggerFactory;
import com.ibm.mqlight.api.network.NetworkService;
import com.ibm.mqlight.api.timer.TimerService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.EnumSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;

/* loaded from: input_file:com/ibm/mqlight/api/impl/engine/Engine.class */
public class Engine extends ComponentImpl implements Handler {
    /** Trace logger shared by all instances of this class. */
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) Engine.class);
    /** Network service used to open connections; the constructor rejects a null value. */
    private final NetworkService network;
    /** Timer service used to schedule and cancel the per-connection transport tick timers. */
    private final TimerService timer;
    /** Scheduler (core pool size of one) that runs the task created by resetReceiveIdleTimer. */
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    /** The task most recently scheduled by resetReceiveIdleTimer, or null if none has been scheduled. */
    private ScheduledFuture<?> receiveScheduledFuture;

    public Engine(NetworkService networkService, TimerService timerService) {
        logger.entry(this, "<init>", networkService, timerService);
        if (networkService == null) {
            IllegalArgumentException illegalArgumentException = new IllegalArgumentException("NetworkService argument cannot be null");
            logger.throwing(this, "<init>", illegalArgumentException);
            throw illegalArgumentException;
        }
        if (timerService == null) {
            IllegalArgumentException illegalArgumentException2 = new IllegalArgumentException("TimerService argument cannot be null");
            logger.throwing(this, "<init>", illegalArgumentException2);
            throw illegalArgumentException2;
        }
        this.network = networkService;
        this.timer = timerService;
        logger.exit(this, "<init>");
    }

    /**
     * Dispatches a message delivered to this component to the handler for its concrete type.
     * Message types are tested in a fixed order and a message that matches none of them is
     * ignored.
     *
     * @param message the message to process
     * @throws StateException if an at-least-once delivery is confirmed for a topic pattern that
     *         no longer has subscription data
     */
    @Override
    protected void onReceive(Message message) {
        logger.entry(this, "onReceive", message);
        if (message instanceof OpenRequest) {
            handleOpenRequest((OpenRequest) message);
        } else if (message instanceof ConnectResponse) {
            handleConnectResponse((ConnectResponse) message);
        } else if (message instanceof CloseRequest) {
            handleCloseRequest((CloseRequest) message);
        } else if (message instanceof SendRequest) {
            handleSendRequest((SendRequest) message);
        } else if (message instanceof SubscribeRequest) {
            handleSubscribeRequest((SubscribeRequest) message);
        } else if (message instanceof UnsubscribeRequest) {
            handleUnsubscribeRequest((UnsubscribeRequest) message);
        } else if (message instanceof DeliveryResponse) {
            handleDeliveryResponse((DeliveryResponse) message);
        } else if (message instanceof WriteResponse) {
            handleWriteResponse((WriteResponse) message);
        } else if (message instanceof DataRead) {
            handleDataRead((DataRead) message);
        } else if (message instanceof DisconnectResponse) {
            handleDisconnectResponse((DisconnectResponse) message);
        } else if (message instanceof ConnectionError) {
            handleConnectionError((ConnectionError) message);
        } else if (message instanceof PopResponse) {
            handleTimerPop((PopResponse) message);
        }
        logger.exit(this, "onReceive");
    }

    /** Asks the network service to connect to the endpoint named in the open request. */
    private void handleOpenRequest(OpenRequest openRequest) {
        this.network.connect(openRequest.endpoint, new NetworkListenerImpl(this), new NetworkConnectPromiseImpl(this, openRequest));
    }

    /** Reports a failed connect to the opener, or builds the Proton connection, SASL layer and session and writes the first output. */
    private void handleConnectResponse(ConnectResponse connectResponse) {
        OpenRequest openRequest = (OpenRequest) connectResponse.context;
        if (connectResponse.exception != null) {
            openRequest.getSender().tell(new OpenResponse(openRequest, connectResponse.exception), this);
            return;
        }

        Connection protonConnection = Proton.connection();
        Transport transport = Proton.transport();
        transport.setIdleTimeout(openRequest.endpoint.getIdleTimeout());
        transport.bind(protonConnection);
        Collector collector = Proton.collector();
        protonConnection.setContainer(openRequest.clientId);
        protonConnection.setHostname(openRequest.endpoint.getHost());
        protonConnection.open();

        Sasl sasl = transport.sasl();
        sasl.client();
        if (openRequest.endpoint.getUser() == null) {
            sasl.setMechanisms("ANONYMOUS");
        } else {
            sasl.plain(openRequest.endpoint.getUser(), openRequest.endpoint.getPassword());
        }

        Session session = protonConnection.session();
        session.open();
        protonConnection.collect(collector);

        // The engine connection is stored as the context of both the Proton connection and the
        // network channel so that later events and reads can find it.
        EngineConnection engineConnection = new EngineConnection(protonConnection, session, openRequest.getSender(), transport, collector, connectResponse.channel);
        engineConnection.openRequest = openRequest;
        protonConnection.setContext(engineConnection);
        connectResponse.channel.setContext(engineConnection);
        writeToNetwork(engineConnection);
    }

    /** Cancels any pending tick timer, closes the Proton connection locally and records the close request on the connection. */
    private void handleCloseRequest(CloseRequest closeRequest) {
        Connection protonConnection = closeRequest.connection.connection;
        EngineConnection engineConnection = (EngineConnection) protonConnection.getContext();
        cancelTickTimer(engineConnection);
        protonConnection.close();
        engineConnection.closeRequest = closeRequest;
        writeToNetwork(engineConnection);
    }

    /** Sends a message on the sender link named after the topic, opening that link first if no matching link is found. */
    private void handleSendRequest(SendRequest sendRequest) {
        EngineConnection engineConnection = sendRequest.connection;
        EnumSet<EndpointState> localStates = EnumSet.of(EndpointState.ACTIVE);
        EnumSet<EndpointState> remoteStates = EnumSet.of(EndpointState.ACTIVE, EndpointState.UNINITIALIZED);

        // Look for an existing sender link whose name is the topic.
        Sender sender = null;
        for (Link link = engineConnection.connection.linkHead(localStates, remoteStates); link != null; link = link.next(localStates, remoteStates)) {
            if ((link instanceof Sender) && sendRequest.topic.equals(link.getName())) {
                sender = (Sender) link;
                break;
            }
        }

        boolean linkOpened = false;
        if (sender == null) {
            sender = engineConnection.session.sender(sendRequest.topic);
            Source source = new Source();
            Target target = new Target();
            source.setAddress(sendRequest.topic);
            target.setAddress(sendRequest.topic);
            sender.setSource(source);
            sender.setTarget(target);
            sender.open();
            linkOpened = true;
        }

        // The delivery tag is the decimal text of a per-connection counter.
        long tag = engineConnection.deliveryTag;
        engineConnection.deliveryTag = tag + 1;
        Delivery delivery = sender.delivery(String.valueOf(tag).getBytes(Charset.forName("UTF-8")));
        sender.send(sendRequest.buf.array(), 0, sendRequest.length);
        sendRequest.buf.release();

        boolean atMostOnce = sendRequest.qos == QOS.AT_MOST_ONCE;
        if (atMostOnce) {
            delivery.settle();
        } else {
            // Remembered so that the sender can be told the outcome later.
            engineConnection.inProgressOutboundDeliveries.put(delivery, sendRequest);
        }
        sender.advance();
        engineConnection.drained = false;

        int remaining = engineConnection.transport.head().remaining();
        if (linkOpened) {
            // NOTE(review): the message length is added only when the link was just opened;
            // the reason is not evident from this method - confirm against addInflightQos0.
            remaining += sendRequest.length;
        }
        if (atMostOnce) {
            engineConnection.addInflightQos0(remaining, new SendResponse(sendRequest, null), sendRequest.getSender(), this);
        }
        writeToNetwork(engineConnection);
    }

    /** Rejects a duplicate subscription, or opens a receiver link for the topic and grants it the initial credit. */
    private void handleSubscribeRequest(SubscribeRequest subscribeRequest) {
        EngineConnection engineConnection = subscribeRequest.connection;
        String subscriptionKey = subscribeRequest.topic.toString();
        if (engineConnection.subscriptionData.containsKey(subscriptionKey)) {
            subscribeRequest.getSender().tell(new SubscribeResponse(engineConnection, subscribeRequest.topic, new SubscribedException("Cannot subscribe because the client is already subscribed to topic " + subscribeRequest.topic.toString())), this);
            return;
        }

        Receiver receiver = engineConnection.session.receiver(subscribeRequest.topic.getTopic());
        engineConnection.subscriptionData.put(subscriptionKey, new EngineConnection.SubscriptionData(subscribeRequest.getSender(), subscribeRequest.initialCredit, receiver));

        Source source = new Source();
        source.setAddress(subscribeRequest.topic.getTopic());
        Target target = new Target();
        target.setAddress(subscribeRequest.topic.getTopic());
        if (subscribeRequest.ttl > 0) {
            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
            source.setTimeout(new UnsignedInteger(subscribeRequest.ttl));
            target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
            target.setTimeout(new UnsignedInteger(subscribeRequest.ttl));
        }
        receiver.setSource(source);
        receiver.setTarget(target);

        // Only the sender settle mode depends on the quality of service.
        if (subscribeRequest.qos == QOS.AT_LEAST_ONCE) {
            receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        } else {
            receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
        }
        receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);

        receiver.open();
        receiver.flow(subscribeRequest.initialCredit);
        writeToNetwork(engineConnection);
    }

    /** Detaches or closes the receiver link for a subscription, depending on the expiry settings of its target. */
    private void handleUnsubscribeRequest(UnsubscribeRequest unsubscribeRequest) {
        EngineConnection engineConnection = unsubscribeRequest.connection;
        // NOTE(review): no check is made that subscription data exists for this topic - confirm
        // that callers only unsubscribe from topics they are subscribed to.
        EngineConnection.SubscriptionData subscription = engineConnection.subscriptionData.get(unsubscribeRequest.topic.toString());
        Target target = (Target) subscription.receiver.getTarget();
        Source source = (Source) subscription.receiver.getSource();
        if (unsubscribeRequest.zeroTtl) {
            target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
            target.setTimeout(new UnsignedInteger(0));
            source.setTimeout(new UnsignedInteger(0));
            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
        }
        if (target.getExpiryPolicy() == TerminusExpiryPolicy.NEVER || target.getTimeout().longValue() > 0) {
            subscription.receiver.detach();
        } else {
            subscription.receiver.close();
        }
        writeToNetwork(engineConnection);
    }

    /** Settles a confirmed inbound delivery and, once enough deliveries have been settled, grants the link new credit. */
    private void handleDeliveryResponse(DeliveryResponse deliveryResponse) {
        deliveryResponse.request.delivery.settle();
        EngineConnection engineConnection = (EngineConnection) deliveryResponse.request.protonConnection.getContext();
        EngineConnection.SubscriptionData subscription = engineConnection.subscriptionData.get(deliveryResponse.request.topicPattern);
        if (subscription != null) {
            subscription.settled++;
            subscription.unsettled--;
            // NOTE(review): if these counters are integral the division truncates before it is
            // compared with 1.25 - confirm the field types in SubscriptionData.
            if ((subscription.maxLinkCredit - subscription.unsettled) / subscription.settled <= 1.25d || (subscription.unsettled == 0 && subscription.settled > 0)) {
                subscription.receiver.flow(subscription.settled);
                subscription.settled = 0;
            }
        } else if (deliveryResponse.request.qos != QOS.AT_MOST_ONCE) {
            throw new StateException("Client had unsubscribed from '" + deliveryResponse.request.topicPattern + "' before delivery was confirmed");
        }
        writeToNetwork(engineConnection);
    }

    /** Accounts for bytes written to the network, then writes more pending output or sends a single drain notification. */
    private void handleWriteResponse(WriteResponse writeResponse) {
        EngineConnection engineConnection = (EngineConnection) writeResponse.context;
        if (engineConnection == null) {
            return;
        }
        engineConnection.bytesWritten += writeResponse.amount;
        engineConnection.notifyInflightQos0(false);
        if (engineConnection.transport.pending() > 0) {
            writeToNetwork(engineConnection);
        } else if (!engineConnection.drained) {
            engineConnection.drained = true;
            engineConnection.requestor.tell(new DrainNotification(), this);
        }
    }

    /**
     * Feeds bytes read from the network into the Proton transport and processes the resulting
     * events. The buffer is always released, including when the connection is already closed
     * or when processing throws, so that the reference-counted buffer is not leaked.
     */
    private void handleDataRead(DataRead dataRead) {
        EngineConnection engineConnection = (EngineConnection) dataRead.channel.getContext();
        try {
            if (engineConnection.closed) {
                return; // nothing to process; the finally block still releases the buffer
            }
            ByteBuffer input = dataRead.buffer.nioBuffer();
            while (input.remaining() > 0) {
                // Copy no more than the transport's tail buffer can accept in one go.
                int originalLimit = input.limit();
                ByteBuffer tail = engineConnection.transport.tail();
                input.limit(input.position() + Math.min(tail.remaining(), input.remaining()));
                tail.put(input);
                input.limit(originalLimit);
                engineConnection.transport.process();
                process(engineConnection.collector);
            }
        } finally {
            dataRead.buffer.release();
        }
        writeToNetwork(engineConnection);
    }

    /** Marks the connection closed and answers the close request carried as the context of the disconnect response, if any. */
    private void handleDisconnectResponse(DisconnectResponse disconnectResponse) {
        CloseRequest closeRequest = (CloseRequest) disconnectResponse.context;
        if (closeRequest != null) {
            closeRequest.connection.closed = true;
            closeRequest.connection.notifyInflightQos0(true);
            closeRequest.getSender().tell(new CloseResponse(closeRequest), this);
        }
    }

    /** Handles a network-level error on a connection that is not already closed by closing it and notifying the requestor. */
    private void handleConnectionError(ConnectionError connectionError) {
        EngineConnection engineConnection = (EngineConnection) connectionError.channel.getContext();
        if (engineConnection.closed) {
            return;
        }
        cancelTickTimer(engineConnection);
        engineConnection.notifyInflightQos0(true);
        engineConnection.closed = true;
        engineConnection.transport.close_tail();
        engineConnection.requestor.tell(new DisconnectNotification(engineConnection, connectionError.cause), this);
    }

    /** Ticks the transport when a timer pops and, if the tick returns a positive deadline, schedules the next timer. */
    private void handleTimerPop(PopResponse popResponse) {
        EngineConnection engineConnection = (EngineConnection) popResponse.promise.getContext();
        long now = System.currentTimeMillis();
        long deadline = engineConnection.transport.tick(now);
        // The method name passed to the logger is kept as "onReceive" so trace output is unchanged.
        logger.data(this, "onReceive", "Timeout: {}", Long.valueOf(deadline));
        if (deadline > 0) {
            TimerPromiseImpl nextTimer = new TimerPromiseImpl(this, engineConnection);
            engineConnection.timerPromise = nextTimer;
            logger.data(this, "onReceive", "Scheduling at: {}", Long.valueOf(deadline - now));
            this.timer.schedule(deadline - now, nextTimer);
            writeToNetwork(engineConnection);
        }
    }

    /** Cancels the connection's pending timer, if it has one, clearing the reference before cancelling. */
    private void cancelTickTimer(EngineConnection engineConnection) {
        if (engineConnection.timerPromise != null) {
            TimerPromiseImpl pendingTimer = engineConnection.timerPromise;
            engineConnection.timerPromise = null;
            this.timer.cancel(pendingTimer);
        }
    }

    /**
     * Writes whatever output the connection's transport currently has pending to the network
     * channel, removes those bytes from the transport and ticks the transport with the
     * current time. Does nothing other than trace when no output is pending.
     *
     * @param engineConnection the connection whose pending output should be written
     */
    private void writeToNetwork(EngineConnection engineConnection) {
        final String methodName = "writeToNetwork";
        logger.entry(this, methodName, engineConnection);
        final Transport transport = engineConnection.transport;
        if (transport.pending() > 0) {
            final ByteBuffer pendingOutput = transport.head();
            final int byteCount = pendingOutput.remaining();
            // The byte count and connection travel with the promise handed to the channel.
            final NetworkWritePromiseImpl writePromise = new NetworkWritePromiseImpl(this, byteCount, engineConnection);
            engineConnection.channel.write(pendingOutput, writePromise);
            transport.pop(byteCount);
            transport.tick(System.currentTimeMillis());
        }
        logger.exit(this, methodName);
    }

    /**
     * Restarts the receive idle timer: cancels the previously scheduled task (without
     * interrupting it if it is already running) and, when the event's transport has a
     * positive idle timeout, schedules a new task that processes and ticks that transport
     * after the idle timeout has elapsed.
     *
     * <p>NOTE(review): a single future is tracked for the whole engine and the task runs on
     * the scheduler's thread - confirm this is intended when several connections are in use.
     *
     * @param event the event whose transport supplies the idle timeout, in milliseconds
     */
    private void resetReceiveIdleTimer(Event event) {
        logger.entry(this, "resetReceiveIdleTimer", event);
        if (this.receiveScheduledFuture != null) {
            this.receiveScheduledFuture.cancel(false);
        }
        final Transport transport = event.getTransport();
        if (transport != null) {
            final int idleTimeout = transport.getIdleTimeout();
            if (idleTimeout > 0) {
                this.receiveScheduledFuture = this.scheduler.schedule(new Runnable() {
                    @Override
                    public void run() {
                        Engine.logger.entry(this, "run");
                        transport.process();
                        transport.tick(System.currentTimeMillis());
                        // Trace the exit with the same source object as the entry above.
                        Engine.logger.exit(this, "run");
                    }
                }, idleTimeout, TimeUnit.MILLISECONDS);
            }
        }
        logger.exit(this, "resetReceiveIdleTimer");
    }

    /**
     * Drains the collector: each queued event is dispatched to this handler, the receive idle
     * timer is reset for it, and only then is the event removed from the collector.
     *
     * @param collector the collector holding the events to process
     */
    private void process(Collector collector) {
        final String methodName = "process";
        logger.entry(this, methodName, collector);
        for (Event current = collector.peek(); current != null; current = collector.peek()) {
            logger.data(this, methodName, "Processing event: {}", current.getType());
            current.dispatch(this);
            resetReceiveIdleTimer(current);
            collector.pop();
        }
        logger.exit(this, methodName);
    }

    /**
     * Reacts to a change in the remote state of a connection.
     *
     * <p>When the remote end is closed, any pending timer is cancelled and then either the
     * outstanding open request is failed (the connection was never reported as open) or the
     * connection is closed and, unless a close request was in progress, the requestor is sent
     * a disconnect notification. When the remote end is active, the transport is ticked and a
     * timer is scheduled if the tick returns a positive deadline.
     *
     * @param event the connection event to process
     */
    private void processEventConnectionRemoteState(Event event) {
        logger.entry(this, "processEventConnectionRemoteState", event);
        final Connection protonConnection = event.getConnection();
        final EndpointState remoteState = protonConnection.getRemoteState();

        if (remoteState == EndpointState.CLOSED) {
            final ErrorCondition remoteCondition = protonConnection.getRemoteCondition();
            final EngineConnection engineConnection = (EngineConnection) protonConnection.getContext();

            final TimerPromiseImpl pendingTimer = engineConnection.timerPromise;
            if (pendingTimer != null) {
                engineConnection.timerPromise = null;
                this.timer.cancel(pendingTimer);
            }

            final boolean openOutstanding = protonConnection.getLocalState() != EndpointState.CLOSED && engineConnection.openRequest != null;
            if (openOutstanding) {
                final OpenRequest openRequest = engineConnection.openRequest;
                engineConnection.openRequest = null;
                if (!engineConnection.closed) {
                    engineConnection.notifyInflightQos0(true);
                    engineConnection.closed = true;
                    engineConnection.channel.close(null);

                    // Pick the most specific failure to report to whoever asked for the open.
                    final ClientException failure;
                    if (engineConnection.transport.sasl().getOutcome() == Sasl.SaslOutcome.PN_SASL_AUTH) {
                        failure = new SecurityException("Failed to authenticate with server - invalid username or password", getClientException(remoteCondition));
                    } else if (remoteCondition == null || remoteCondition.getDescription() == null) {
                        failure = new NetworkException("The server closed the connection without providing any error information.");
                    } else {
                        failure = getClientException(remoteCondition);
                    }
                    openRequest.getSender().tell(new OpenResponse(openRequest, failure), this);
                }
            } else if (!engineConnection.closed) {
                engineConnection.notifyInflightQos0(true);
                engineConnection.closed = true;
                final CloseRequest closeRequest = engineConnection.closeRequest;
                engineConnection.closeRequest = null;
                engineConnection.channel.close(new NetworkClosePromiseImpl(this, closeRequest));
                if (closeRequest == null) {
                    // Nobody asked for this close, so tell the requestor about the disconnect.
                    engineConnection.requestor.tell(new DisconnectNotification(engineConnection, getClientException(remoteCondition)), this);
                }
            }
        } else if (remoteState == EndpointState.ACTIVE) {
            final EngineConnection engineConnection = (EngineConnection) protonConnection.getContext();
            final long now = System.currentTimeMillis();
            final long deadline = engineConnection.transport.tick(now);
            if (deadline > 0) {
                final TimerPromiseImpl tickTimer = new TimerPromiseImpl(this, engineConnection);
                engineConnection.timerPromise = tickTimer;
                this.timer.schedule(deadline - now, tickTimer);
            }
        }
        logger.exit(this, "processEventConnectionRemoteState");
    }

    /**
     * Maps an AMQP error condition to the client exception used to report it.
     *
     * <p>The description is matched first: text containing "_Takeover" gives a
     * {@link ReplacedException}, "_InvalidSourceTimeout" gives a
     * {@link NotPermittedException}, and "sasl " or "SSL " gives a {@link SecurityException}.
     * Anything else, including a condition that has no description, gives a
     * {@link NetworkException} whose message is the condition symbol followed, when present,
     * by the description.
     *
     * @param errorCondition the error condition to translate; may be null
     * @return the matching exception, or null if {@code errorCondition} is null or has no
     *         condition symbol
     */
    private ClientException getClientException(ErrorCondition errorCondition) {
        logger.entry(this, "getClientException", errorCondition);
        ClientException clientException = null;
        if (errorCondition != null && errorCondition.getCondition() != null) {
            // The description is optional, so it must be null-checked before it is searched.
            final String description = errorCondition.getDescription();
            if (description != null) {
                if (description.contains("_Takeover")) {
                    clientException = new ReplacedException(description);
                } else if (description.contains("_InvalidSourceTimeout")) {
                    clientException = new NotPermittedException(description);
                } else if (description.contains("sasl ") || description.contains("SSL ")) {
                    clientException = new SecurityException(description);
                }
            }
            if (clientException == null) {
                String text = errorCondition.getCondition().toString();
                if (description != null) {
                    text = text + ": " + description;
                }
                clientException = new NetworkException(text);
            }
        }
        logger.exit(this, "getClientException", clientException);
        return clientException;
    }

    /**
     * Traces a change in the local state of a link; no other action is taken.
     *
     * @param event the link event to trace
     */
    private void processEventLinkLocalState(Event event) {
        final String methodName = "processEventLinkLocalState";
        logger.entry(this, methodName, event);
        final Link affectedLink = event.getLink();
        final EndpointState localState = affectedLink.getLocalState();
        final EndpointState remoteState = affectedLink.getRemoteState();
        logger.data(this, methodName, "LINK_LOCAL {} {} {}", affectedLink, localState, remoteState);
        logger.exit(this, methodName);
    }

    /**
     * Reacts to a change in the remote state of a link.
     *
     * <p>For a receiver link, a remote open with both ends active confirms the subscription
     * to the subscriber, and a remote close or detach that leaves the remote end closed
     * removes the subscription data and sends the subscriber an unsubscribe response. For a
     * sender link, a remote close fails every delivery still tracked for that link and then
     * closes and frees the link.
     *
     * @param event the link event to process
     */
    private void processEventLinkRemoteState(Event event) {
        final String methodName = "processEventLinkRemoteState";
        logger.entry(this, methodName, event);
        final Link link = event.getLink();
        logger.data(this, methodName, "LINK_REMOTE {} {} {}", link, link.getLocalState(), link.getRemoteState());
        final Event.Type eventType = event.getType();

        if (link instanceof Receiver) {
            if (eventType == Event.Type.LINK_REMOTE_OPEN) {
                if (link.getLocalState() == EndpointState.ACTIVE && link.getRemoteState() == EndpointState.ACTIVE) {
                    final EngineConnection engineConnection = (EngineConnection) event.getConnection().getContext();
                    final SubscribeResponse confirmation = new SubscribeResponse(engineConnection, new SubscriptionTopic(link.getName()));
                    engineConnection.subscriptionData.get(link.getName()).subscriber.tell(confirmation, this);
                }
            } else if ((eventType == Event.Type.LINK_REMOTE_CLOSE || eventType == Event.Type.LINK_REMOTE_DETACH) && link.getRemoteState() == EndpointState.CLOSED) {
                ClientException failure = getClientException(link.getRemoteCondition());
                if (link.getLocalState() != EndpointState.CLOSED && !link.detached()) {
                    // The server ended the link without being asked to, so an error is always reported.
                    if (failure == null) {
                        failure = new ClientException("The server indicated that the destination was unsubscribed due to an error condition, without providing any further error information.");
                    }
                    link.close();
                }
                link.free();

                final EngineConnection engineConnection = (EngineConnection) event.getConnection().getContext();
                final EngineConnection.SubscriptionData subscription = engineConnection.subscriptionData.remove(link.getName());
                if (subscription != null) {
                    subscription.subscriber.tell(new UnsubscribeResponse(engineConnection, new SubscriptionTopic(link.getName()), failure), this);
                } else {
                    logger.ffdc(this, methodName, FFDCProbeId.PROBE_001, null, this, event);
                }
            }
        } else if ((link instanceof Sender) && eventType == Event.Type.LINK_REMOTE_CLOSE && link.getRemoteState() == EndpointState.CLOSED) {
            if (link.getLocalState() != EndpointState.CLOSED) {
                // Build the message used to fail the deliveries that were in progress on this link.
                final ErrorCondition remoteCondition = link.getRemoteCondition();
                final String reason;
                if (remoteCondition != null && remoteCondition.getCondition() != null) {
                    final String withCondition = "The server indicated that our sending link was closed due to an error condition, " + remoteCondition.getCondition().toString();
                    if (remoteCondition.getDescription() == null) {
                        reason = withCondition;
                    } else {
                        reason = withCondition + " - " + remoteCondition.getDescription();
                    }
                } else {
                    reason = "The server indicated that our sending link was closed due to an error condition, without providing any further error information.";
                }
                logger.data(this, methodName, reason, link.getTarget().getAddress(), this);

                final EngineConnection engineConnection = (EngineConnection) event.getConnection().getContext();
                for (Delivery delivery = link.head(); delivery != null; delivery = delivery.next()) {
                    final SendRequest failedSend = engineConnection.inProgressOutboundDeliveries.remove(delivery);
                    if (failedSend != null && failedSend.getSender() != null) {
                        failedSend.getSender().tell(new SendResponse(failedSend, new ClientException(reason)), this);
                    }
                }
                link.close();
            }
            link.free();
        }
        logger.exit(this, methodName);
    }

    /**
     * Reacts to a change in the remote state of a session. Nothing happens unless the remote
     * end is active. If the local end is active too, the pending open request is answered
     * with the now usable connection; otherwise the connection is closed with an error
     * condition saying that the client cannot accept the session.
     *
     * @param event the session event to process
     */
    private void processEventSessionRemoteState(Event event) {
        final String methodName = "processEventSessionRemoteState";
        logger.entry(this, methodName, event);
        final Session session = event.getSession();
        if (session.getRemoteState() == EndpointState.ACTIVE) {
            if (session.getLocalState() != EndpointState.ACTIVE) {
                final Connection protonConnection = event.getConnection();
                final ErrorCondition rejection = new ErrorCondition(Symbol.getSymbol("mqlight:session-remote-open-rejected"), "MQ Light client is unable to accept an open session request");
                protonConnection.setCondition(rejection);
                protonConnection.close();
            } else {
                final EngineConnection engineConnection = (EngineConnection) event.getConnection().getContext();
                // Take the open request off the connection before answering it.
                final OpenRequest answeredRequest = engineConnection.openRequest;
                engineConnection.openRequest = null;
                engineConnection.requestor.tell(new OpenResponse(answeredRequest, engineConnection), this);
            }
        }
        logger.exit(this, methodName);
    }

    /** Connection initialised: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onConnectionInit(Event event) {
    }

    /** Connection opened locally: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onConnectionLocalOpen(Event event) {
    }

    /** Connection opened by the remote end: handled by processEventConnectionRemoteState. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onConnectionRemoteOpen(Event event) {
        processEventConnectionRemoteState(event);
    }

    /** Connection closed locally: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onConnectionLocalClose(Event event) {
    }

    /** Connection closed by the remote end: handled by processEventConnectionRemoteState. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onConnectionRemoteClose(Event event) {
        processEventConnectionRemoteState(event);
    }

    /** Connection bound to a transport: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onConnectionBound(Event event) {
    }

    /** Connection unbound from its transport: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onConnectionUnbound(Event event) {
    }

    /** Connection finalised: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onConnectionFinal(Event event) {
    }

    /** Session initialised: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onSessionInit(Event event) {
    }

    /** Session opened locally: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onSessionLocalOpen(Event event) {
    }

    /** Session opened by the remote end: handled by processEventSessionRemoteState. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onSessionRemoteOpen(Event event) {
        processEventSessionRemoteState(event);
    }

    /** Session closed locally: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onSessionLocalClose(Event event) {
    }

    /** Session closed by the remote end: handled by processEventSessionRemoteState. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onSessionRemoteClose(Event event) {
        processEventSessionRemoteState(event);
    }

    /** Session finalised: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onSessionFinal(Event event) {
    }

    /** Link initialised: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onLinkInit(Event event) {
    }

    /** Link opened locally: handled by processEventLinkLocalState. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onLinkLocalOpen(Event event) {
        processEventLinkLocalState(event);
    }

    /** Link opened by the remote end: handled by processEventLinkRemoteState. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onLinkRemoteOpen(Event event) {
        processEventLinkRemoteState(event);
    }

    /** Link detached locally: handled by processEventLinkLocalState. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onLinkLocalDetach(Event event) {
        processEventLinkLocalState(event);
    }

    /** Link detached by the remote end: handled by processEventLinkRemoteState. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onLinkRemoteDetach(Event event) {
        processEventLinkRemoteState(event);
    }

    /** Link closed locally: handled by processEventLinkLocalState. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onLinkLocalClose(Event event) {
        processEventLinkLocalState(event);
    }

    /** Link closed by the remote end: handled by processEventLinkRemoteState. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onLinkRemoteClose(Event event) {
        processEventLinkRemoteState(event);
    }

    /** Link flow updated: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onLinkFlow(Event event) {
    }

    /** Link finalised: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onLinkFinal(Event event) {
    }

    /**
     * Handles a delivery event.
     *
     * <p>On a sender link the event reports the outcome of an outbound message: the matching
     * send request is removed from the in-progress map and its sender is told the result,
     * with an exception when the remote state is rejected, released or modified. If the
     * delivery is no longer tracked the event is traced and otherwise ignored.
     *
     * <p>On any other link a delivery that is readable and complete is read in full and
     * passed to the subscriber registered under the link's name.
     *
     * @param event the delivery event to process
     */
    @Override
    public void onDelivery(Event event) {
        logger.entry(this, "onDelivery", event);
        EngineConnection engineConnection = (EngineConnection) event.getConnection().getContext();
        Delivery delivery = event.getDelivery();
        if (event.getLink() instanceof Sender) {
            SendRequest sendRequest = engineConnection.inProgressOutboundDeliveries.remove(delivery);
            if (sendRequest != null && sendRequest.getSender() != null) {
                Exception failure = null;
                if (delivery.getRemoteState() instanceof Rejected) {
                    ErrorCondition error = ((Rejected) delivery.getRemoteState()).getError();
                    failure = (error == null || error.getDescription() == null) ? new Exception("Message was rejected") : new Exception(error.getDescription());
                } else if (delivery.getRemoteState() instanceof Released) {
                    failure = new Exception("Message was released");
                } else if (delivery.getRemoteState() instanceof Modified) {
                    failure = new Exception("Message was modified");
                }
                sendRequest.getSender().tell(new SendResponse(sendRequest, failure), this);
            } else {
                // The delivery is not (or no longer) tracked, so there is nobody to notify.
                logger.data(this, "onDelivery", "No in-progress send request for delivery: {}", delivery);
            }
        } else if (delivery.isReadable() && !delivery.isPartial()) {
            Receiver receiver = (Receiver) event.getLink();
            int pending = delivery.pending();
            byte[] payload = new byte[pending];
            receiver.recv(payload, 0, pending);
            receiver.advance();
            ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(payload);
            EngineConnection.SubscriptionData subscriptionData = engineConnection.subscriptionData.get(event.getLink().getName());
            subscriptionData.unsettled++;
            // A delivery the remote end has already settled is reported as at-most-once.
            subscriptionData.subscriber.tell(new DeliveryRequest(wrappedBuffer, delivery.remotelySettled() ? QOS.AT_MOST_ONCE : QOS.AT_LEAST_ONCE, event.getLink().getName(), delivery, event.getConnection()), this);
        }
        logger.exit(this, "onDelivery");
    }

    /** Transport event: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onTransport(Event event) {
    }

    /** Transport error: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onTransportError(Event event) {
    }

    /** Transport head closed: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onTransportHeadClosed(Event event) {
    }

    /** Transport tail closed: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onTransportTailClosed(Event event) {
    }

    /** Transport closed: no action is taken. */
    @Override // org.apache.qpid.proton.engine.Handler
    public void onTransportClosed(Event event) {
    }

    /**
     * Rejects an event that has no dedicated callback.
     *
     * @param event the event that could not be handled
     * @throws IllegalStateException always, naming the type of the event
     */
    @Override
    public void onUnhandled(Event event) {
        final String description = "Unknown event type: " + event.getType();
        final IllegalStateException unknownEvent = new IllegalStateException(description);
        logger.throwing(this, "onUnhandled", unknownEvent);
        throw unknownEvent;
    }
}
