package ru.yoomoney.tech.dbqueue.config;

import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yoomoney.tech.dbqueue.api.QueueConsumer;
import ru.yoomoney.tech.dbqueue.internal.processing.MillisTimeProvider;
import ru.yoomoney.tech.dbqueue.internal.processing.TimeLimiter;
import ru.yoomoney.tech.dbqueue.settings.QueueConfig;
import ru.yoomoney.tech.dbqueue.settings.QueueId;
import ru.yoomoney.tech.dbqueue.settings.QueueSettings;

/**
 * Registry and lifecycle manager for queues and their per-shard execution pools.
 *
 * <p>For every registered {@link QueueConsumer} one {@link QueueExecutionPool} is created per
 * configured {@link QueueShard}. The pools of a single queue, or of all registered queues,
 * can then be started, paused, resumed, woken up, shut down and awaited.
 *
 * <p>All public methods are {@code synchronized} on this instance. Note that this includes the
 * {@code awaitTermination} methods: while one of them is running, every other synchronized
 * method of the same instance blocks until it returns.
 */
@ThreadSafe
public class QueueService {
    private static final Logger log = LoggerFactory.getLogger(QueueService.class);

    /** Execution pools of every registered queue, keyed by queue id and then by shard id. */
    @Nonnull
    private final Map<QueueId, Map<QueueShardId, QueueExecutionPool>> registeredQueues = new LinkedHashMap<>();

    /** Consumer of every registered queue; its settings are the ones mutated by config updates. */
    @Nonnull
    private final Map<QueueId, QueueConsumer<?>> registeredConsumer = new LinkedHashMap<>();

    /** Shards for which a pool is created whenever a queue is registered. */
    @Nonnull
    private final List<QueueShard<?>> queueShards;

    /** Creates the execution pool for a (shard, consumer) pair. */
    @Nonnull
    private final BiFunction<QueueShard<?>, QueueConsumer<?>, QueueExecutionPool> queueExecutionPoolFactory;

    /**
     * Creates a service whose pools are plain {@link QueueExecutionPool} instances.
     *
     * @param queueShards             shards to process every registered queue on
     * @param threadLifecycleListener listener handed to every created pool
     * @param taskLifecycleListener   listener handed to every created pool
     * @throws NullPointerException if {@code queueShards} is null
     */
    public QueueService(@Nonnull List<QueueShard<?>> queueShards,
                        @Nonnull ThreadLifecycleListener threadLifecycleListener,
                        @Nonnull TaskLifecycleListener taskLifecycleListener) {
        this(queueShards, (queueShard, queueConsumer) ->
                new QueueExecutionPool(queueConsumer, queueShard, taskLifecycleListener, threadLifecycleListener));
    }

    /**
     * Creates a service with a custom pool factory.
     *
     * @param queueShards               shards to process every registered queue on
     * @param queueExecutionPoolFactory factory invoked once per shard on queue registration
     * @throws NullPointerException if any argument is null
     */
    QueueService(@Nonnull List<QueueShard<?>> queueShards,
                 @Nonnull BiFunction<QueueShard<?>, QueueConsumer<?>, QueueExecutionPool> queueExecutionPoolFactory) {
        this.queueShards = Objects.requireNonNull(queueShards, "queueShards");
        this.queueExecutionPoolFactory = Objects.requireNonNull(queueExecutionPoolFactory, "queueExecutionPoolFactory");
    }

    /**
     * Looks up the pools of a registered queue.
     *
     * @param queueId queue to look up
     * @param method  name of the calling operation, used only in the error message
     * @return pools of the queue keyed by shard id
     * @throws IllegalArgumentException if the queue is not registered
     */
    private Map<QueueShardId, QueueExecutionPool> getQueuePools(@Nonnull QueueId queueId, @Nonnull String method) {
        Objects.requireNonNull(queueId, "queueId");
        Objects.requireNonNull(method, "method");
        // Registered pool maps are never null, so a null result means "not registered".
        Map<QueueShardId, QueueExecutionPool> pools = this.registeredQueues.get(queueId);
        if (pools == null) {
            throw new IllegalArgumentException("cannot invoke " + method + ", queue is not registered: queueId=" + queueId);
        }
        return pools;
    }

    /**
     * Registers a queue and creates one execution pool per shard for it. The pools are not
     * started by this method.
     *
     * @param queueConsumer consumer of the queue to register
     * @param <PayloadT>    payload type of the queue tasks
     * @return {@code true} if the queue was registered, {@code false} if a queue with the same
     *         id was already registered (in which case nothing changes)
     * @throws NullPointerException if {@code queueConsumer} is null
     */
    public synchronized <PayloadT> boolean registerQueue(@Nonnull QueueConsumer<PayloadT> queueConsumer) {
        Objects.requireNonNull(queueConsumer);
        QueueId queueId = queueConsumer.getQueueConfig().getLocation().getQueueId();
        if (this.registeredQueues.containsKey(queueId)) {
            log.info("queue is already registered: queueId={}", queueId);
            return false;
        }
        Map<QueueShardId, QueueExecutionPool> pools = new LinkedHashMap<>();
        for (QueueShard<?> queueShard : this.queueShards) {
            pools.put(queueShard.getShardId(), this.queueExecutionPoolFactory.apply(queueShard, queueConsumer));
        }
        this.registeredQueues.put(queueId, pools);
        this.registeredConsumer.put(queueId, queueConsumer);
        return true;
    }

    /**
     * Applies new settings to the configuration objects of already registered queues.
     *
     * <p>For each given configuration the processing, poll, failure, reenqueue and ext settings
     * of the registered consumer are updated through their {@code setValue} methods. The strings
     * those calls return (when present) are joined with {@code ","}; presumably each one
     * describes the applied change - confirm against the settings classes.
     *
     * <p>Configurations are processed in iteration order. If one of them refers to an
     * unregistered queue, the exception is thrown at that point and the configurations processed
     * before it stay applied.
     *
     * @param configs new configurations
     * @return joined {@code setValue} results per queue; queues whose joined result is empty are
     *         omitted
     * @throws NullPointerException     if {@code configs} is null
     * @throws IllegalArgumentException if a configuration refers to a queue that is not registered
     */
    public synchronized Map<QueueId, String> updateQueueConfigs(@Nonnull Collection<QueueConfig> configs) {
        Objects.requireNonNull(configs);
        Map<QueueId, String> result = new LinkedHashMap<>();
        for (QueueConfig newConfig : configs) {
            QueueId queueId = newConfig.getLocation().getQueueId();
            if (!this.registeredConsumer.containsKey(queueId)) {
                throw new IllegalArgumentException("cannot update queue configuration, queue is not registered: queueId=" + queueId);
            }
            QueueSettings actualSettings = this.registeredConsumer.get(queueId).getQueueConfig().getSettings();
            QueueSettings newSettings = newConfig.getSettings();

            StringJoiner diff = new StringJoiner(",");
            actualSettings.getProcessingSettings().setValue(newSettings.getProcessingSettings()).ifPresent(diff::add);
            actualSettings.getPollSettings().setValue(newSettings.getPollSettings()).ifPresent(diff::add);
            actualSettings.getFailureSettings().setValue(newSettings.getFailureSettings()).ifPresent(diff::add);
            actualSettings.getReenqueueSettings().setValue(newSettings.getReenqueueSettings()).ifPresent(diff::add);
            actualSettings.getExtSettings().setValue(newSettings.getExtSettings()).ifPresent(diff::add);

            String diffText = diff.toString();
            if (!diffText.isEmpty()) {
                result.put(queueId, diffText);
            }
        }
        return result;
    }

    /** Starts the pools of all registered queues. */
    public synchronized void start() {
        log.info("starting all queues");
        this.registeredQueues.keySet().forEach(this::start);
    }

    /**
     * Starts all pools of the given queue.
     *
     * @param queueId queue to start
     * @throws IllegalArgumentException if the queue is not registered
     */
    public synchronized void start(@Nonnull QueueId queueId) {
        Objects.requireNonNull(queueId, "queueId");
        log.info("starting queue: queueId={}", queueId);
        getQueuePools(queueId, "start").values().forEach(QueueExecutionPool::start);
    }

    /** Shuts down the pools of all registered queues. */
    public synchronized void shutdown() {
        log.info("shutting down all queues");
        this.registeredQueues.keySet().forEach(this::shutdown);
    }

    /**
     * Shuts down all pools of the given queue.
     *
     * @param queueId queue to shut down
     * @throws IllegalArgumentException if the queue is not registered
     */
    public synchronized void shutdown(@Nonnull QueueId queueId) {
        Objects.requireNonNull(queueId, "queueId");
        log.info("shutting down queue: queueId={}", queueId);
        getQueuePools(queueId, "shutdown").values().forEach(QueueExecutionPool::shutdown);
    }

    /**
     * Tells whether every pool of the given queue reports being shut down.
     *
     * @param queueId queue to check
     * @return {@code true} if all pools of the queue are shut down
     * @throws IllegalArgumentException if the queue is not registered
     */
    public synchronized boolean isShutdown(@Nonnull QueueId queueId) {
        Objects.requireNonNull(queueId, "queueId");
        return getQueuePools(queueId, "isShutdown").values().stream().allMatch(QueueExecutionPool::isShutdown);
    }

    /**
     * Tells whether every registered queue is shut down.
     *
     * @return {@code true} if all queues are shut down; also {@code true} when no queue is registered
     */
    public synchronized boolean isShutdown() {
        return this.registeredQueues.keySet().stream().allMatch(this::isShutdown);
    }

    /**
     * Tells whether every pool of the given queue reports being terminated.
     *
     * @param queueId queue to check
     * @return {@code true} if all pools of the queue are terminated
     * @throws IllegalArgumentException if the queue is not registered
     */
    public synchronized boolean isTerminated(@Nonnull QueueId queueId) {
        Objects.requireNonNull(queueId, "queueId");
        return getQueuePools(queueId, "isTerminated").values().stream().allMatch(QueueExecutionPool::isTerminated);
    }

    /**
     * Tells whether every registered queue is terminated.
     *
     * @return {@code true} if all queues are terminated; also {@code true} when no queue is registered
     */
    public synchronized boolean isTerminated() {
        return this.registeredQueues.keySet().stream().allMatch(this::isTerminated);
    }

    /**
     * Pauses all pools of the given queue.
     *
     * @param queueId queue to pause
     * @throws IllegalArgumentException if the queue is not registered
     */
    public synchronized void pause(@Nonnull QueueId queueId) {
        Objects.requireNonNull(queueId, "queueId");
        log.info("pausing queue: queueId={}", queueId);
        getQueuePools(queueId, "pause").values().forEach(QueueExecutionPool::pause);
    }

    /** Pauses the pools of all registered queues. */
    public synchronized void pause() {
        log.info("pausing all queues");
        this.registeredQueues.keySet().forEach(this::pause);
    }

    /**
     * Resumes all pools of the given queue.
     *
     * @param queueId queue to resume
     * @throws IllegalArgumentException if the queue is not registered
     */
    public synchronized void unpause(@Nonnull QueueId queueId) {
        Objects.requireNonNull(queueId, "queueId");
        log.info("unpausing queue: queueId={}", queueId);
        getQueuePools(queueId, "unpause").values().forEach(QueueExecutionPool::unpause);
    }

    /** Resumes the pools of all registered queues. */
    public synchronized void unpause() {
        log.info("unpausing all queues");
        this.registeredQueues.keySet().forEach(this::unpause);
    }

    /**
     * Tells whether every registered queue is paused.
     *
     * @return {@code true} if all queues are paused; also {@code true} when no queue is registered
     */
    public synchronized boolean isPaused() {
        return this.registeredQueues.keySet().stream().allMatch(this::isPaused);
    }

    /**
     * Tells whether every pool of the given queue reports being paused.
     *
     * @param queueId queue to check
     * @return {@code true} if all pools of the queue are paused
     * @throws IllegalArgumentException if the queue is not registered
     */
    public synchronized boolean isPaused(@Nonnull QueueId queueId) {
        Objects.requireNonNull(queueId, "queueId");
        return getQueuePools(queueId, "isPaused").values().stream().allMatch(QueueExecutionPool::isPaused);
    }

    /**
     * Waits for all registered queues to terminate.
     *
     * <p>The waits are issued one queue after another through a single {@link TimeLimiter}
     * created with {@code timeout}; presumably it passes each wait the time still remaining of
     * the overall timeout - confirm against {@code TimeLimiter}.
     *
     * @param timeout overall wait timeout
     * @return ids of the queues that are still not terminated after waiting
     * @throws NullPointerException if {@code timeout} is null
     */
    public synchronized List<QueueId> awaitTermination(@Nonnull Duration timeout) {
        Objects.requireNonNull(timeout, "timeout");
        log.info("awaiting all queues termination: timeout={}", timeout);
        TimeLimiter timeLimiter = new TimeLimiter(new MillisTimeProvider.SystemMillisTimeProvider(), timeout);
        for (QueueId queueId : this.registeredQueues.keySet()) {
            timeLimiter.execute(remainingTimeout -> awaitTermination(queueId, remainingTimeout));
        }
        return this.registeredQueues.keySet().stream()
                .filter(queueId -> !isTerminated(queueId))
                .collect(Collectors.toList());
    }

    /**
     * Waits for all pools of the given queue to terminate.
     *
     * <p>The waits are issued one pool after another through a single {@link TimeLimiter}
     * created with {@code timeout}; presumably it passes each wait the time still remaining of
     * the overall timeout - confirm against {@code TimeLimiter}.
     *
     * @param queueId queue to wait for
     * @param timeout overall wait timeout
     * @return ids of the shards whose pools are still not terminated after waiting
     * @throws NullPointerException     if any argument is null
     * @throws IllegalArgumentException if the queue is not registered
     */
    public synchronized List<QueueShardId> awaitTermination(@Nonnull QueueId queueId, @Nonnull Duration timeout) {
        Objects.requireNonNull(queueId, "queueId");
        Objects.requireNonNull(timeout, "timeout");
        log.info("awaiting queue termination: queueId={}, timeout={}", queueId, timeout);
        TimeLimiter timeLimiter = new TimeLimiter(new MillisTimeProvider.SystemMillisTimeProvider(), timeout);
        Collection<QueueExecutionPool> pools = getQueuePools(queueId, "awaitTermination").values();
        for (QueueExecutionPool pool : pools) {
            timeLimiter.execute(pool::awaitTermination);
        }
        return pools.stream()
                .filter(pool -> !pool.isTerminated())
                .map(QueueExecutionPool::getQueueShardId)
                .collect(Collectors.toList());
    }

    /**
     * Calls {@code wakeup()} on the pool of the given queue on the given shard.
     *
     * @param queueId      queue to wake up
     * @param queueShardId shard whose pool should be woken up
     * @throws NullPointerException     if any argument is null
     * @throws IllegalArgumentException if the queue is not registered or has no pool for the shard
     */
    public synchronized void wakeup(@Nonnull QueueId queueId, @Nonnull QueueShardId queueShardId) {
        Objects.requireNonNull(queueId, "queueId");
        Objects.requireNonNull(queueShardId, "queueShardId");
        QueueExecutionPool pool = getQueuePools(queueId, "wakeup").get(queueShardId);
        if (pool == null) {
            throw new IllegalArgumentException("cannot wakeup, unknown shard: queueId=" + queueId + ", shardId=" + queueShardId);
        }
        pool.wakeup();
    }
}
