package org.apache.activemq.apollo.stomp.perf;

import java.util.concurrent.TimeUnit;
import org.apache.activemq.apollo.broker.OverflowSink;
import org.apache.activemq.apollo.broker.perf.RemoteConsumer;
import org.apache.activemq.apollo.dto.QueueDestinationDTO;
import org.apache.activemq.apollo.dto.TopicDestinationDTO;
import org.apache.activemq.apollo.stomp.Stomp$;
import org.apache.activemq.apollo.stomp.StompFrame;
import org.apache.activemq.apollo.stomp.StompFrame$;
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtdispatch.package$;
import scala.MatchError;
import scala.Predef$;
import scala.ScalaObject;
import scala.Tuple2;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;

/* compiled from: StompRemoteClients.scala */
@ScalaSignature(bytes = "\u0006\u0001%3A!\u0001\u0002\u0001\u001f\t\u00192\u000b^8naJ+Wn\u001c;f\u0007>t7/^7fe*\u00111\u0001B\u0001\u0005a\u0016\u0014hM\u0003\u0002\u0006\r\u0005)1\u000f^8na*\u0011q\u0001C\u0001\u0007CB|G\u000e\\8\u000b\u0005%Q\u0011\u0001C1di&4X-\\9\u000b\u0005-a\u0011AB1qC\u000eDWMC\u0001\u000e\u0003\ry'oZ\u0002\u0001'\r\u0001\u0001c\u0006\t\u0003#Ui\u0011A\u0005\u0006\u0003\u0007MQ!\u0001\u0006\u0004\u0002\r\t\u0014xn[3s\u0013\t1\"C\u0001\bSK6|G/Z\"p]N,X.\u001a:\u0011\u0005aYR\"A\r\u000b\u0003i\tQa]2bY\u0006L!\u0001H\r\u0003\u0017M\u001b\u0017\r\\1PE*,7\r\u001e\u0005\u0006=\u0001!\taH\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003\u0001\u0002\"!\t\u0001\u000e\u0003\tAqa\t\u0001A\u0002\u0013\u0005A%\u0001\u0007pkR\u0014w.\u001e8e'&t7.F\u0001&!\r1s%K\u0007\u0002'%\u0011\u0001f\u0005\u0002\r\u001fZ,'O\u001a7poNKgn\u001b\t\u0003U-j\u0011\u0001B\u0005\u0003Y\u0011\u0011!b\u0015;p[B4%/Y7f\u0011\u001dq\u0003\u00011A\u0005\u0002=\n\u0001c\\;uE>,h\u000eZ*j].|F%Z9\u0015\u0005A\u001a\u0004C\u0001\r2\u0013\t\u0011\u0014D\u0001\u0003V]&$\bb\u0002\u001b.\u0003\u0003\u0005\r!J\u0001\u0004q\u0012\n\u0004B\u0002\u001c\u0001A\u0003&Q%A\u0007pkR\u0014w.\u001e8e'&t7\u000e\t\u0005\u0006q\u0001!\t!O\u0001\f_:\u001cuN\u001c8fGR,G\rF\u00011\u0011\u0015Y\u0004\u0001\"\u0011=\u0003Qygn\u0018;sC:\u001c\bo\u001c:u?\u000e|W.\\1oIR\u0011\u0001'\u0010\u0005\u0006}i\u0002\raP\u0001\bG>lW.\u00198e!\t\u0001U)D\u0001B\u0015\t\u00115)\u0001\u0003mC:<'\"\u0001#\u0002\t)\fg/Y\u0005\u0003\r\u0006\u0013aa\u00142kK\u000e$\b\"\u0002%\u0001\t#I\u0014aD7fgN\fw-\u001a*fG\u0016Lg/\u001a3")
/* loaded from: input_file:org/apache/activemq/apollo/stomp/perf/StompRemoteConsumer.class */
public class StompRemoteConsumer extends RemoteConsumer implements ScalaObject {
    private OverflowSink<StompFrame> outboundSink = null;

    public OverflowSink<StompFrame> outboundSink() {
        return this.outboundSink;
    }

    public void outboundSink_$eq(OverflowSink<StompFrame> overflowSink) {
        this.outboundSink = overflowSink;
    }

    public void onConnected() {
        AsciiBuffer ascii;
        outboundSink_$eq(new OverflowSink<>(transport_sink().map(new StompRemoteConsumer$$anonfun$onConnected$3(this))));
        outboundSink().refiller_$eq(package$.MODULE$.$up(new StompRemoteConsumer$$anonfun$onConnected$1(this)));
        QueueDestinationDTO destination = destination();
        if (destination instanceof QueueDestinationDTO) {
            ascii = AsciiBuffer.ascii(new StringBuilder().append("/queue/").append(destination.path).toString());
        } else {
            if (!(destination instanceof TopicDestinationDTO)) {
                throw new MatchError(destination);
            }
            ascii = AsciiBuffer.ascii(new StringBuilder().append("/topic/").append(((TopicDestinationDTO) destination).path).toString());
        }
        AsciiBuffer asciiBuffer = ascii;
        outboundSink().offer(new StompFrame(Stomp$.MODULE$.CONNECT(), StompFrame$.MODULE$.apply$default$2(), StompFrame$.MODULE$.apply$default$3(), StompFrame$.MODULE$.apply$default$4()));
        List $colon$colon = Nil$.MODULE$.$colon$colon(new Tuple2(Stomp$.MODULE$.DESTINATION(), asciiBuffer)).$colon$colon(new Tuple2(Stomp$.MODULE$.ID(), AsciiBuffer.ascii(new StringBuilder().append("stomp-sub-").append(name()).toString())));
        if (persistent()) {
            $colon$colon = $colon$colon.$colon$colon(new Tuple2(Stomp$.MODULE$.ACK_MODE(), Stomp$.MODULE$.ACK_MODE_CLIENT_INDIVIDUAL()));
        }
        outboundSink().offer(new StompFrame(Stomp$.MODULE$.SUBSCRIBE(), $colon$colon, StompFrame$.MODULE$.apply$default$3(), StompFrame$.MODULE$.apply$default$4()));
    }

    public void on_transport_command(Object obj) {
        StompFrame stompFrame = (StompFrame) obj;
        if (stompFrame != null) {
            AsciiBuffer action = stompFrame.action();
            AsciiBuffer CONNECTED = Stomp$.MODULE$.CONNECTED();
            if (CONNECTED == null) {
                if (action == null) {
                    return;
                }
            } else if (CONNECTED.equals(action)) {
                return;
            }
            AsciiBuffer MESSAGE = Stomp$.MODULE$.MESSAGE();
            if (MESSAGE != null ? MESSAGE.equals(action) : action == null) {
                messageReceived();
                if (persistent()) {
                    outboundSink().offer(new StompFrame(Stomp$.MODULE$.ACK(), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2(Stomp$.MODULE$.MESSAGE_ID(), stompFrame.header(Stomp$.MODULE$.MESSAGE_ID()))})), StompFrame$.MODULE$.apply$default$3(), StompFrame$.MODULE$.apply$default$4()));
                    return;
                }
                return;
            }
            AsciiBuffer ERROR = Stomp$.MODULE$.ERROR();
            if (ERROR != null ? ERROR.equals(action) : action == null) {
                on_failure(new Exception(new StringBuilder().append("Server reported an error: ").append(stompFrame.content()).toString()));
                return;
            }
        }
        on_failure(new Exception(new StringBuilder().append("Unexpected stomp command: ").append(stompFrame.action()).toString()));
    }

    public void messageReceived() {
        if (thinkTime() <= 0) {
            rate().increment();
        } else {
            transport().suspendRead();
            dispatch_queue().executeAfter(thinkTime(), TimeUnit.MILLISECONDS, package$.MODULE$.$up(new StompRemoteConsumer$$anonfun$messageReceived$1(this)));
        }
    }
}
