package com.datastax.oss.pulsar.jms.selectors;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:filters/jms-filter.nar:com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.class
 */
/* loaded from: input_file:interceptors/jms-filter.nar:com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.class */
public class JMSPublishFilters implements BrokerInterceptor {
    private static final Logger log = LoggerFactory.getLogger(JMSPublishFilters.class);
    private static final String JMS_FILTER_METADATA = "jms-msg-metadata";
    private final JMSFilter filter = new JMSFilter();
    private boolean enabled = false;
    private static final Field dispatchMessagesThreadFieldPersistentDispatcherMultipleConsumers;
    private static final Field dispatchMessagesThreadFieldPersistentDispatcherSingleActiveConsumer;

    public void initialize(PulsarService pulsarService) {
        this.enabled = Boolean.parseBoolean(pulsarService.getConfiguration().getProperties().getProperty("jmsApplyFiltersOnPublish", "true"));
        log.info("jmsApplyFiltersOnPublish={}", Boolean.valueOf(this.enabled));
    }

    public void onMessagePublish(Producer producer, ByteBuf byteBuf, Topic.PublishContext publishContext) {
        if (this.enabled && !publishContext.isMarkerMessage() && !publishContext.isChunked() && publishContext.getNumberOfMessages() <= 1) {
            for (Subscription subscription : producer.getTopic().getSubscriptions().values()) {
                if ((subscription instanceof PersistentSubscription) && subscription.getSubscriptionProperties().containsKey("jms.selector")) {
                    publishContext.setProperty(JMS_FILTER_METADATA, new MessageMetadata().copyFrom(Commands.peekMessageMetadata(byteBuf, "jms-filter-on-publish", -1L)));
                    return;
                }
            }
        }
    }

    public void messageProduced(ServerCnx serverCnx, Producer producer, long j, long j2, long j3, Topic.PublishContext publishContext) {
        MessageMetadata messageMetadata;
        if (!this.enabled || (messageMetadata = (MessageMetadata) publishContext.getProperty(JMS_FILTER_METADATA)) == null || messageMetadata.hasNumMessagesInBatch()) {
            return;
        }
        for (Subscription subscription : producer.getTopic().getSubscriptions().values()) {
            scheduleOnDispatchThread(subscription, () -> {
                FilterContext filterContext = new FilterContext();
                filterContext.setSubscription(subscription);
                filterContext.setMsgMetadata(messageMetadata);
                filterContext.setConsumer((Consumer) null);
                if (this.filter.filterEntry(null, filterContext, true) == EntryFilter.FilterResult.REJECT) {
                    if (log.isDebugEnabled()) {
                        log.debug("Reject message {}:{} for subscription {}", new Object[]{Long.valueOf(j2), Long.valueOf(j3), subscription.getName()});
                    }
                    subscription.acknowledgeMessage(Collections.singletonList(new PositionImpl(j2, j3)), CommandAck.AckType.Individual, (Map) null);
                }
            });
        }
    }

    private static void scheduleOnDispatchThread(Subscription subscription, Runnable runnable) {
        Executor executor;
        ExecutorService executorService;
        try {
            Dispatcher dispatcher = subscription.getDispatcher();
            if ((dispatcher instanceof PersistentDispatcherMultipleConsumers) && (executorService = (ExecutorService) dispatchMessagesThreadFieldPersistentDispatcherMultipleConsumers.get(dispatcher)) != null) {
                executorService.submit(runnable);
            } else if (!(dispatcher instanceof PersistentDispatcherSingleActiveConsumer) || (executor = (Executor) dispatchMessagesThreadFieldPersistentDispatcherSingleActiveConsumer.get(dispatcher)) == null) {
                subscription.getTopic().getBrokerService().getTopicOrderedExecutor().execute(runnable);
            } else {
                executor.execute(runnable);
            }
        } catch (Throwable th) {
            log.error("Error while scheduling on dispatch thread", th);
        }
    }

    public void close() {
        this.filter.close();
    }

    public void onPulsarCommand(BaseCommand baseCommand, ServerCnx serverCnx) throws InterceptException {
    }

    public void onConnectionClosed(ServerCnx serverCnx) {
    }

    public void onWebserviceRequest(ServletRequest servletRequest) throws IOException, ServletException, InterceptException {
    }

    public void onWebserviceResponse(ServletRequest servletRequest, ServletResponse servletResponse) throws IOException, ServletException {
    }

    static {
        Field field = null;
        Field field2 = null;
        try {
            field = PersistentDispatcherMultipleConsumers.class.getDeclaredField("dispatchMessagesThread");
            field.setAccessible(true);
            field2 = PersistentDispatcherSingleActiveConsumer.class.getDeclaredField("executor");
            field2.setAccessible(true);
        } catch (NoSuchFieldException e) {
            log.error("Cannot access thread field: " + e);
        }
        dispatchMessagesThreadFieldPersistentDispatcherMultipleConsumers = field;
        dispatchMessagesThreadFieldPersistentDispatcherSingleActiveConsumer = field2;
    }
}
