package org.springframework.data.mongodb.core.messaging;

import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;

/* loaded from: input_file:BOOT-INF/lib/spring-data-mongodb-2.1.2.RELEASE.jar:org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.class */
/**
 * {@link MessageListenerContainer} implementation that keeps one {@link Subscription} per
 * {@link SubscriptionRequest} and hands the backing {@link Task} of each subscription to an
 * {@link Executor}.
 * <p>
 * All access to the subscription registry and to the {@code running} flag is guarded by a single
 * internal monitor.
 */
public class DefaultMessageListenerContainer implements MessageListenerContainer {
    private final Executor taskExecutor;
    private final TaskFactory taskFactory;
    private final Optional<ErrorHandler> errorHandler;

    /** Monitor guarding {@link #subscriptions} and {@link #running}. */
    private final Object lifecycleMonitor = new Object();

    /** Registered subscriptions in registration order, keyed by the request that created them. */
    private final Map<SubscriptionRequest, Subscription> subscriptions = new LinkedHashMap<>();

    private boolean running = false;

    /**
     * {@link ErrorHandler} that logs the error at error level and then forwards it to a delegate.
     */
    private static class DecoratingLoggingErrorHandler implements ErrorHandler {
        private final Log logger = LogFactory.getLog(DecoratingLoggingErrorHandler.class);
        private final ErrorHandler delegate;

        /**
         * @param errorHandler the handler invoked after the error has been logged.
         */
        DecoratingLoggingErrorHandler(ErrorHandler errorHandler) {
            this.delegate = errorHandler;
        }

        @Override
        public void handleError(Throwable th) {
            if (this.logger.isErrorEnabled()) {
                this.logger.error("Unexpected error occurred while listening to MongoDB.", th);
            }
            this.delegate.handleError(th);
        }
    }

    /**
     * {@link Subscription} that delegates every operation to the wrapped {@link Task}.
     * Two instances are equal when their tasks are equal.
     */
    public static class TaskSubscription implements Subscription {
        private final Task task;

        TaskSubscription(Task task) {
            this.task = task;
        }

        Task getTask() {
            return this.task;
        }

        @Override
        public boolean isActive() {
            return this.task.isActive();
        }

        @Override
        public boolean await(Duration duration) throws InterruptedException {
            return this.task.awaitStart(duration);
        }

        @Override
        public void cancel() throws DataAccessResourceFailureException {
            this.task.cancel();
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof TaskSubscription)) {
                return false;
            }
            TaskSubscription other = (TaskSubscription) obj;
            // Give the other side a chance to veto equality (subclass-safe equals).
            if (!other.canEqual(this)) {
                return false;
            }
            Task mine = getTask();
            Task theirs = other.getTask();
            return mine == null ? theirs == null : mine.equals(theirs);
        }

        /**
         * @param obj the candidate for an equality check.
         * @return {@code true} if {@code obj} is a {@link TaskSubscription}.
         */
        protected boolean canEqual(Object obj) {
            return obj instanceof TaskSubscription;
        }

        @Override
        public int hashCode() {
            Task current = getTask();
            return 59 + (current == null ? 43 : current.hashCode());
        }
    }

    /**
     * Creates a container that runs its tasks on a new {@link SimpleAsyncTaskExecutor}.
     *
     * @param mongoTemplate must not be {@literal null}.
     */
    public DefaultMessageListenerContainer(MongoTemplate mongoTemplate) {
        this(mongoTemplate, new SimpleAsyncTaskExecutor());
    }

    /**
     * Creates a container that runs its tasks on the given {@link Executor} and has no
     * container-wide {@link ErrorHandler}.
     *
     * @param mongoTemplate must not be {@literal null}.
     * @param executor must not be {@literal null}.
     */
    public DefaultMessageListenerContainer(MongoTemplate mongoTemplate, Executor executor) {
        this(mongoTemplate, executor, null);
    }

    /**
     * Creates a container that runs its tasks on the given {@link Executor}.
     *
     * @param mongoTemplate must not be {@literal null}.
     * @param executor must not be {@literal null}.
     * @param errorHandler handler used by {@link #register(SubscriptionRequest, Class)}; when
     *        {@literal null}, a logging handler that cancels the failing subscription is used
     *        instead.
     * @throws IllegalArgumentException if {@code mongoTemplate} or {@code executor} is
     *         {@literal null}.
     */
    public DefaultMessageListenerContainer(MongoTemplate mongoTemplate, Executor executor, @Nullable ErrorHandler errorHandler) {
        Assert.notNull(mongoTemplate, "Template must not be null!");
        Assert.notNull(executor, "TaskExecutor must not be null!");
        this.taskExecutor = executor;
        this.taskFactory = new TaskFactory(mongoTemplate);
        this.errorHandler = Optional.ofNullable(errorHandler);
    }

    /**
     * @return always {@code false}.
     */
    @Override
    public boolean isAutoStartup() {
        return false;
    }

    /**
     * Stops the container and then runs the given callback.
     *
     * @param runnable callback invoked after {@link #stop()} has returned.
     */
    @Override
    public void stop(Runnable runnable) {
        stop();
        runnable.run();
    }

    /**
     * Marks the container as running and submits the task of every registered, currently
     * inactive {@link TaskSubscription} to the executor. Does nothing if already running.
     */
    @Override
    public void start() {
        synchronized (this.lifecycleMonitor) {
            if (this.running) {
                return;
            }
            this.subscriptions.values().stream()
                    .filter(subscription -> !subscription.isActive())
                    // Only TaskSubscription exposes the Task to hand to the executor.
                    .filter(subscription -> subscription instanceof TaskSubscription)
                    .map(TaskSubscription.class::cast)
                    .map(TaskSubscription::getTask)
                    .forEach(this.taskExecutor::execute);
            this.running = true;
        }
    }

    /**
     * Cancels every registered subscription and marks the container as stopped. Does nothing if
     * the container is not running. Subscriptions stay registered.
     */
    @Override
    public void stop() {
        synchronized (this.lifecycleMonitor) {
            if (this.running) {
                this.subscriptions.values().forEach(Subscription::cancel);
                this.running = false;
            }
        }
    }

    /**
     * @return {@code true} between a call to {@link #start()} and the next {@link #stop()}.
     */
    @Override
    public boolean isRunning() {
        synchronized (this.lifecycleMonitor) {
            return this.running;
        }
    }

    /**
     * @return always {@link Integer#MAX_VALUE}.
     */
    @Override
    public int getPhase() {
        return Integer.MAX_VALUE;
    }

    /**
     * Registers the request using the container's {@link ErrorHandler}. If none was configured, a
     * handler is used that logs the error and cancels the subscription found for
     * {@code subscriptionRequest}.
     *
     * @param subscriptionRequest the request describing what to listen to.
     * @param cls the target body type passed on to the {@link TaskFactory}.
     * @return the subscription registered for the request.
     */
    @Override
    public <S, T> Subscription register(SubscriptionRequest<S, ? super T, ? extends SubscriptionRequest.RequestOptions> subscriptionRequest, Class<T> cls) {
        ErrorHandler effectiveHandler = this.errorHandler.orElseGet(
                () -> new DecoratingLoggingErrorHandler(
                        th -> lookup(subscriptionRequest).ifPresent(Subscription::cancel)));
        return register(subscriptionRequest, cls, effectiveHandler);
    }

    /**
     * Creates a {@link Task} for the request via the {@link TaskFactory} and registers it.
     *
     * @param subscriptionRequest the request describing what to listen to.
     * @param cls the target body type passed on to the {@link TaskFactory}.
     * @param errorHandler the handler passed on to the {@link TaskFactory}.
     * @return the subscription registered for the request.
     */
    @Override
    public <S, T> Subscription register(SubscriptionRequest<S, ? super T, ? extends SubscriptionRequest.RequestOptions> subscriptionRequest, Class<T> cls, ErrorHandler errorHandler) {
        return register(subscriptionRequest, this.taskFactory.forRequest(subscriptionRequest, cls, errorHandler));
    }

    /**
     * @param subscriptionRequest the request to look up.
     * @return the subscription registered for the request, or {@link Optional#empty()} if none.
     */
    @Override
    public Optional<Subscription> lookup(SubscriptionRequest<?, ?, ?> subscriptionRequest) {
        synchronized (this.lifecycleMonitor) {
            return Optional.ofNullable(this.subscriptions.get(subscriptionRequest));
        }
    }

    /**
     * Registers {@code task} under {@code subscriptionRequest} unless a subscription already
     * exists for that request, in which case the existing subscription is returned and
     * {@code task} is ignored. A newly registered task is submitted to the executor right away
     * when the container is running.
     *
     * @param subscriptionRequest the key under which the subscription is stored.
     * @param task the task backing a newly created subscription.
     * @return the existing or the newly created subscription.
     */
    public Subscription register(SubscriptionRequest subscriptionRequest, Task task) {
        synchronized (this.lifecycleMonitor) {
            // The map never holds null values, so a null result means "not registered yet".
            Subscription existing = this.subscriptions.get(subscriptionRequest);
            if (existing != null) {
                return existing;
            }
            TaskSubscription created = new TaskSubscription(task);
            this.subscriptions.put(subscriptionRequest, created);
            if (this.running) {
                this.taskExecutor.execute(task);
            }
            return created;
        }
    }

    /**
     * Removes the given subscription from the container, cancelling it first if it is active.
     * Does nothing if the subscription is not registered.
     *
     * @param subscription the subscription to remove.
     */
    @Override
    public void remove(Subscription subscription) {
        synchronized (this.lifecycleMonitor) {
            if (!this.subscriptions.containsValue(subscription)) {
                return;
            }
            if (subscription.isActive()) {
                subscription.cancel();
            }
            this.subscriptions.values().remove(subscription);
        }
    }
}
