package org.apache.beam.sdk.io.jms;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.amqp.AmqpTransportFactory;
import org.apache.beam.sdk.io.jms.JmsIO;

/* loaded from: input_file:org/apache/beam/sdk/io/jms/CommonJms.class */
public class CommonJms implements Serializable {
    private static final String BROKER_WITHOUT_PREFETCH_PARAM = "?jms.prefetchPolicy.all=0&";
    static final String USERNAME = "test_user";
    static final String PASSWORD = "test_password";
    static final String QUEUE = "test_queue";
    static final String TOPIC = "test_topic";
    private final String brokerUrl;
    private final Integer brokerPort;
    private final String forceAsyncAcksParam;
    private transient BrokerService broker;
    protected ConnectionFactory connectionFactory;
    protected final Class<? extends ConnectionFactory> connectionFactoryClass;
    protected ConnectionFactory connectionFactoryWithSyncAcksAndWithoutPrefetch;

    /* loaded from: input_file:org/apache/beam/sdk/io/jms/CommonJms$BytesMessageToStringMessageMapper.class */
    public static class BytesMessageToStringMessageMapper implements JmsIO.MessageMapper<String> {
        /* renamed from: mapMessage, reason: merged with bridge method [inline-methods] */
        public String m0mapMessage(Message message) throws Exception {
            return new String(new byte[(int) ((BytesMessage) message).getBodyLength()], StandardCharsets.UTF_8);
        }
    }

    public CommonJms(String str, Integer num, String str2, Class<? extends ConnectionFactory> cls) {
        this.brokerUrl = str;
        this.brokerPort = num;
        this.forceAsyncAcksParam = str2;
        this.connectionFactoryClass = cls;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setUseJmx(false);
        this.broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
        TransportFactory.registerTransportFactory("amqp", new AmqpTransportFactory());
        if (this.connectionFactoryClass != ActiveMQConnectionFactory.class) {
            this.broker.addConnector(String.format("%s:%d?transport.transformer=jms", this.brokerUrl, this.brokerPort));
        } else {
            this.broker.addConnector(this.brokerUrl);
        }
        this.broker.setBrokerName("localhost");
        this.broker.setPopulateJMSXUserID(true);
        this.broker.setUseAuthenticatedPrincipalForJMSXUserID(true);
        this.broker.getManagementContext().setCreateConnector(false);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new AuthenticationUser(USERNAME, PASSWORD, "users"));
        this.broker.setPlugins(new BrokerPlugin[]{new SimpleAuthenticationPlugin(arrayList)});
        this.broker.start();
        this.broker.waitUntilStarted();
        this.connectionFactory = this.connectionFactoryClass.getConstructor(String.class).newInstance(this.brokerUrl);
        this.connectionFactoryWithSyncAcksAndWithoutPrefetch = this.connectionFactoryClass.getConstructor(String.class).newInstance(this.brokerUrl + BROKER_WITHOUT_PREFETCH_PARAM + this.forceAsyncAcksParam);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stopBroker() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
        this.broker = null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Class<? extends ConnectionFactory> getConnectionFactoryClass() {
        return this.connectionFactoryClass;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConnectionFactory getConnectionFactoryWithSyncAcksAndWithoutPrefetch() {
        return this.connectionFactoryWithSyncAcksAndWithoutPrefetch;
    }
}
