package org.apache.activemq.apollo.stomp.perf;

import java.util.concurrent.TimeUnit;
import org.apache.activemq.apollo.broker.OverflowSink;
import org.apache.activemq.apollo.broker.perf.RemoteConsumer;
import org.apache.activemq.apollo.dto.DestinationDTO;
import org.apache.activemq.apollo.dto.QueueDestinationDTO;
import org.apache.activemq.apollo.dto.TopicDestinationDTO;
import org.apache.activemq.apollo.stomp.Stomp$;
import org.apache.activemq.apollo.stomp.StompFrame;
import org.apache.activemq.apollo.stomp.StompFrame$;
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtdispatch.package$;
import scala.MatchError;
import scala.Predef$;
import scala.ScalaObject;
import scala.Tuple2;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;

/* compiled from: StompRemoteClients.scala */
@ScalaSignature(bytes = "\u0006\u0001%3\u0001\"\u0001\u0002\u0005\u0002\u0003\u0005\ta\u0004\u0002\u0014'R|W\u000e\u001d*f[>$XmQ8ogVlWM\u001d\u0006\u0003\u0007\u0011\tA\u0001]3sM*\u0011QAB\u0001\u0006gR|W\u000e\u001d\u0006\u0003\u000f!\ta!\u00199pY2|'BA\u0005\u000b\u0003!\t7\r^5wK6\f(BA\u0006\r\u0003\u0019\t\u0007/Y2iK*\tQ\"A\u0002pe\u001e\u001c\u0001aE\u0002\u0001!]\u0001\"!E\u000b\u000e\u0003IQ!aA\n\u000b\u0005Q1\u0011A\u00022s_.,'/\u0003\u0002\u0017%\tq!+Z7pi\u0016\u001cuN\\:v[\u0016\u0014\bC\u0001\r\u001c\u001b\u0005I\"\"\u0001\u000e\u0002\u000bM\u001c\u0017\r\\1\n\u0005qI\"aC*dC2\fwJ\u00196fGRDQA\b\u0001\u0005\u0002}\ta\u0001P5oSRtD#\u0001\u0011\u0011\u0005\u0005\u0002Q\"\u0001\u0002\t\u000f\r\u0002\u0001\u0019!C\u0001I\u0005aq.\u001e;c_VtGmU5oWV\tQ\u0005E\u0002'O%j\u0011aE\u0005\u0003QM\u0011Ab\u0014<fe\u001adwn^*j].\u0004\"AK\u0016\u000e\u0003\u0011I!\u0001\f\u0003\u0003\u0015M#x.\u001c9Ge\u0006lW\rC\u0004/\u0001\u0001\u0007I\u0011A\u0018\u0002!=,HOY8v]\u0012\u001c\u0016N\\6`I\u0015\fHC\u0001\u00194!\tA\u0012'\u0003\u000233\t!QK\\5u\u0011\u001d!T&!AA\u0002\u0015\n1\u0001\u001f\u00132\u0011\u00191\u0004\u0001)Q\u0005K\u0005iq.\u001e;c_VtGmU5oW\u0002BQ\u0001\u000f\u0001\u0005\u0002e\n1b\u001c8D_:tWm\u0019;fIR\t\u0001\u0007C\u0003<\u0001\u0011\u0005C(\u0001\u000bp]~#(/\u00198ta>\u0014HoX2p[6\fg\u000e\u001a\u000b\u0003auBQA\u0010\u001eA\u0002}\nqaY8n[\u0006tG\r\u0005\u0002A\u000b6\t\u0011I\u0003\u0002C\u0007\u0006!A.\u00198h\u0015\u0005!\u0015\u0001\u00026bm\u0006L!AR!\u0003\r=\u0013'.Z2u\u0011\u0015A\u0005\u0001\"\u0005:\u0003=iWm]:bO\u0016\u0014VmY3jm\u0016$\u0007")
/* loaded from: input_file:org/apache/activemq/apollo/stomp/perf/StompRemoteConsumer.class */
/*
 * STOMP implementation of RemoteConsumer. Once connected it offers a CONNECT
 * frame followed by a SUBSCRIBE frame for destination(), then handles the
 * frames delivered to on_transport_command(): CONNECTED is ignored, MESSAGE is
 * passed to messageReceived() (and acknowledged when persistent() is true),
 * and anything else is reported through on_failure().
 */
public class StompRemoteConsumer extends RemoteConsumer implements ScalaObject {
    /** Sink that outgoing STOMP frames are offered to; null until onConnected() assigns it. */
    private OverflowSink<StompFrame> outboundSink = null;

    /**
     * Scala-style getter for the outbound frame sink.
     *
     * @return the current sink, or null if onConnected() has not run yet
     */
    public OverflowSink<StompFrame> outboundSink() {
        return this.outboundSink;
    }

    /**
     * Scala-style setter for the outbound frame sink.
     *
     * @param overflowSink the sink to use for outgoing frames
     */
    public void outboundSink_$eq(OverflowSink<StompFrame> overflowSink) {
        this.outboundSink = overflowSink;
    }

    /**
     * Creates the outbound sink on top of transport_sink() and offers the
     * CONNECT and SUBSCRIBE frames to it.
     *
     * <p>The SUBSCRIBE headers are, in order: the ACK_MODE / ACK_MODE_CLIENT pair
     * (only when persistent() is true), ID = "stomp-sub-" + name(), and
     * DESTINATION = "/queue/" or "/topic/" followed by the destination name.
     *
     * @throws MatchError if destination() is neither a QueueDestinationDTO nor a
     *         TopicDestinationDTO (this includes null)
     */
    public void onConnected() {
        outboundSink_$eq(new OverflowSink<>(transport_sink().map(new StompRemoteConsumer$$anonfun$onConnected$3(this))));
        outboundSink().refiller_$eq(package$.MODULE$.$up(new StompRemoteConsumer$$anonfun$onConnected$1(this)));

        // Typed as the common DTO base so that both the queue and the topic
        // branch are reachable. The destination is resolved before anything is
        // offered, so an unsupported destination fails before CONNECT is sent.
        DestinationDTO destination = destination();
        String prefix;
        if (destination instanceof QueueDestinationDTO) {
            prefix = "/queue/";
        } else if (destination instanceof TopicDestinationDTO) {
            prefix = "/topic/";
        } else {
            throw new MatchError(destination);
        }
        AsciiBuffer stompDestination = AsciiBuffer.ascii(prefix + destination.name);

        outboundSink().offer(new StompFrame(Stomp$.MODULE$.CONNECT(), StompFrame$.MODULE$.apply$default$2(), StompFrame$.MODULE$.apply$default$3(), StompFrame$.MODULE$.apply$default$4()));

        // `::` prepends, so the list ends up as [ID, DESTINATION].
        List headers = Nil$.MODULE$
                .$colon$colon(new Tuple2(Stomp$.MODULE$.DESTINATION(), stompDestination))
                .$colon$colon(new Tuple2(Stomp$.MODULE$.ID(), AsciiBuffer.ascii("stomp-sub-" + name())));
        if (persistent()) {
            headers = headers.$colon$colon(new Tuple2(Stomp$.MODULE$.ACK_MODE(), Stomp$.MODULE$.ACK_MODE_CLIENT()));
        }
        outboundSink().offer(new StompFrame(Stomp$.MODULE$.SUBSCRIBE(), headers, StompFrame$.MODULE$.apply$default$3(), StompFrame$.MODULE$.apply$default$4()));
    }

    /**
     * Handles one command received from the transport.
     *
     * <ul>
     *   <li>CONNECTED: ignored.</li>
     *   <li>MESSAGE: calls messageReceived(); when persistent() is true an ACK
     *       frame carrying the frame's MESSAGE_ID header is offered to the
     *       outbound sink.</li>
     *   <li>ERROR: reported through on_failure() together with the frame content.</li>
     *   <li>Any other action, or a null command: reported through on_failure()
     *       as an unexpected command.</li>
     * </ul>
     *
     * @param obj the received command; cast to StompFrame
     * @throws ClassCastException if obj is non-null and not a StompFrame
     */
    public void on_transport_command(Object obj) {
        StompFrame frame = (StompFrame) obj;
        if (frame == null) {
            // There is no action to inspect; report the failure instead of
            // dereferencing the null frame while building the message.
            on_failure(new Exception("Unexpected stomp command: null"));
            return;
        }

        AsciiBuffer action = frame.action();
        if (sameAction(action, Stomp$.MODULE$.CONNECTED())) {
            return;
        }
        if (sameAction(action, Stomp$.MODULE$.MESSAGE())) {
            messageReceived();
            if (persistent()) {
                outboundSink().offer(new StompFrame(Stomp$.MODULE$.ACK(), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2(Stomp$.MODULE$.MESSAGE_ID(), frame.header(Stomp$.MODULE$.MESSAGE_ID()))})), StompFrame$.MODULE$.apply$default$3(), StompFrame$.MODULE$.apply$default$4()));
            }
            return;
        }
        if (sameAction(action, Stomp$.MODULE$.ERROR())) {
            on_failure(new Exception("Server reported an error: " + frame.content()));
            return;
        }
        on_failure(new Exception("Unexpected stomp command: " + action));
    }

    /**
     * Records the arrival of one message.
     *
     * <p>With a non-positive thinkTime() the rate counter is incremented
     * immediately. Otherwise reads on the transport are suspended and a task is
     * scheduled on dispatch_queue() after thinkTime() milliseconds.
     * NOTE(review): the scheduled task lives in
     * StompRemoteConsumer$$anonfun$messageReceived$1, which is not visible here;
     * presumably it increments the rate and resumes reads - confirm.
     */
    public void messageReceived() {
        if (thinkTime() > 0) {
            transport().suspendRead();
            dispatch_queue().executeAfter(thinkTime(), TimeUnit.MILLISECONDS, package$.MODULE$.$up(new StompRemoteConsumer$$anonfun$messageReceived$1(this)));
            return;
        }
        rate().increment();
    }

    /** Null-safe equality between a frame's action and an expected STOMP action. */
    private static boolean sameAction(AsciiBuffer actual, AsciiBuffer expected) {
        return actual == null ? expected == null : actual.equals(expected);
    }
}
