/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.vault.common;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.vault.common.Entry;
import org.apache.ignite.internal.vault.common.VaultWatch;
import org.apache.ignite.internal.vault.common.Watcher;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteLogger;
import org.jetbrains.annotations.NotNull;

public class WatcherImpl
implements Watcher {
    private static final IgniteLogger LOG = IgniteLogger.forClass(WatcherImpl.class);
    private final BlockingQueue<Entry> queue = new LinkedBlockingQueue<Entry>();
    private final Map<Long, VaultWatch> watches = new HashMap<Long, VaultWatch>();
    private volatile boolean stop;
    private AtomicLong watchIds;
    private final Object mux = new Object();
    private final ExecutorService exec;

    public WatcherImpl() {
        this.watchIds = new AtomicLong(0L);
        this.exec = Executors.newFixedThreadPool(1);
        this.exec.execute(new WatcherWorker());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Long> register(@NotNull VaultWatch vaultWatch) {
        Object object = this.mux;
        synchronized (object) {
            Long key = this.watchIds.incrementAndGet();
            this.watches.put(key, vaultWatch);
            return CompletableFuture.completedFuture(key);
        }
    }

    @Override
    public void notify(@NotNull Entry val) {
        this.queue.offer(val);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void cancel(@NotNull Long id) {
        Object object = this.mux;
        synchronized (object) {
            this.watches.remove(id);
        }
    }

    public void shutdown() {
        this.stop = true;
        if (this.exec != null) {
            List<Runnable> tasks = this.exec.shutdownNow();
            if (!tasks.isEmpty()) {
                LOG.warn("Runnable tasks outlived thread pool executor service", new Object[0]);
            }
            try {
                this.exec.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException ignored) {
                LOG.warn("Got interrupted while waiting for executor service to stop.", new Object[0]);
                this.exec.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    private class WatcherWorker
    implements Runnable {
        private WatcherWorker() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (!WatcherImpl.this.stop) {
                Object object;
                try {
                    Entry val = WatcherImpl.this.queue.take();
                    object = WatcherImpl.this.mux;
                    synchronized (object) {
                        WatcherImpl.this.watches.forEach((k, w) -> {
                            if (!w.notify(val)) {
                                WatcherImpl.this.cancel((Long)k);
                            }
                        });
                    }
                }
                catch (InterruptedException interruptedException) {
                    object = WatcherImpl.this.mux;
                    synchronized (object) {
                        WatcherImpl.this.watches.forEach((k, w) -> {
                            w.onError((Throwable)new IgniteInternalCheckedException("Error occurred during watches handling ", interruptedException.getCause()));
                            WatcherImpl.this.cancel((Long)k);
                        });
                    }
                }
            }
        }
    }
}

