/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.metastorage.client;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import org.apache.ignite.internal.metastorage.client.Condition;
import org.apache.ignite.internal.metastorage.client.CursorImpl;
import org.apache.ignite.internal.metastorage.client.Entry;
import org.apache.ignite.internal.metastorage.client.EntryEvent;
import org.apache.ignite.internal.metastorage.client.EntryImpl;
import org.apache.ignite.internal.metastorage.client.MetaStorageService;
import org.apache.ignite.internal.metastorage.client.Operation;
import org.apache.ignite.internal.metastorage.client.WatchEvent;
import org.apache.ignite.internal.metastorage.client.WatchListener;
import org.apache.ignite.internal.metastorage.common.OperationType;
import org.apache.ignite.internal.metastorage.common.command.ConditionInfo;
import org.apache.ignite.internal.metastorage.common.command.GetAllCommand;
import org.apache.ignite.internal.metastorage.common.command.GetAndPutAllCommand;
import org.apache.ignite.internal.metastorage.common.command.GetAndPutCommand;
import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveAllCommand;
import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveCommand;
import org.apache.ignite.internal.metastorage.common.command.GetCommand;
import org.apache.ignite.internal.metastorage.common.command.InvokeCommand;
import org.apache.ignite.internal.metastorage.common.command.MultipleEntryResponse;
import org.apache.ignite.internal.metastorage.common.command.OperationInfo;
import org.apache.ignite.internal.metastorage.common.command.PutAllCommand;
import org.apache.ignite.internal.metastorage.common.command.PutCommand;
import org.apache.ignite.internal.metastorage.common.command.RangeCommand;
import org.apache.ignite.internal.metastorage.common.command.RemoveAllCommand;
import org.apache.ignite.internal.metastorage.common.command.RemoveCommand;
import org.apache.ignite.internal.metastorage.common.command.SingleEntryResponse;
import org.apache.ignite.internal.metastorage.common.command.WatchExactKeysCommand;
import org.apache.ignite.internal.metastorage.common.command.WatchRangeKeysCommand;
import org.apache.ignite.internal.metastorage.common.command.cursor.CursorsCloseCommand;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lang.IgniteUuidGenerator;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class MetaStorageServiceImpl
implements MetaStorageService {
    /** Logger of this class. */
    private static final IgniteLogger LOG = IgniteLogger.forClass(MetaStorageServiceImpl.class);
    /** Generator of the ids that are passed to the range and watch commands. */
    private static final IgniteUuidGenerator uuidGenerator = new IgniteUuidGenerator(UUID.randomUUID(), 0L);
    /** Raft group service every meta storage command of this class is run through. */
    private final RaftGroupService metaStorageRaftGrpSvc;
    /** Registry of the watchers started by the {@code watch} methods; also used to stop them. */
    private final WatchProcessor watchProcessor;
    /** Id of the local node; passed to the range and watch commands. */
    private final String localNodeId;

    /**
     * Creates the service and an empty watch processor for it.
     *
     * @param metaStorageRaftGrpSvc Raft group service the meta storage commands are run through.
     * @param localNodeId Id of the local node.
     */
    public MetaStorageServiceImpl(RaftGroupService metaStorageRaftGrpSvc, String localNodeId) {
        this.localNodeId = localNodeId;
        this.metaStorageRaftGrpSvc = metaStorageRaftGrpSvc;
        this.watchProcessor = new WatchProcessor();
    }

    /**
     * Runs a {@link GetCommand} for the given key and converts the response to an {@link Entry}.
     *
     * @param key Key to read.
     * @return Future with the entry built from the command response.
     */
    @Override
    @NotNull
    public CompletableFuture<Entry> get(@NotNull ByteArray key) {
        Command cmd = new GetCommand(key);

        return this.metaStorageRaftGrpSvc.run(cmd)
            .thenApply(MetaStorageServiceImpl::singleEntryResult);
    }

    /**
     * Runs a {@link GetCommand} for the given key and revision bound and converts the response to an {@link Entry}.
     *
     * @param key Key to read.
     * @param revUpperBound Revision bound passed to the command as is.
     * @return Future with the entry built from the command response.
     */
    @Override
    @NotNull
    public CompletableFuture<Entry> get(@NotNull ByteArray key, long revUpperBound) {
        Command cmd = new GetCommand(key, revUpperBound);

        return this.metaStorageRaftGrpSvc.run(cmd)
            .thenApply(MetaStorageServiceImpl::singleEntryResult);
    }

    /**
     * Runs a {@link GetAllCommand} for the given keys and converts the response to a map of entries.
     *
     * @param keys Keys to read.
     * @return Future with the entries of the response, keyed by entry key.
     */
    @Override
    @NotNull
    public CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys) {
        Command cmd = new GetAllCommand(keys);

        return this.metaStorageRaftGrpSvc.run(cmd)
            .thenApply(MetaStorageServiceImpl::multipleEntryResult);
    }

    /**
     * Runs a {@link GetAllCommand} for the given keys and revision bound and converts the response to a map of entries.
     *
     * @param keys Keys to read.
     * @param revUpperBound Revision bound passed to the command as is.
     * @return Future with the entries of the response, keyed by entry key.
     */
    @Override
    @NotNull
    public CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys, long revUpperBound) {
        Command cmd = new GetAllCommand(keys, revUpperBound);

        return this.metaStorageRaftGrpSvc.run(cmd)
            .thenApply(MetaStorageServiceImpl::multipleEntryResult);
    }

    /**
     * Runs a {@link PutCommand} with the given key and value.
     *
     * @param key Key to write.
     * @param value Value to write.
     * @return Future returned by the raft group service for the command.
     */
    @Override
    @NotNull
    public CompletableFuture<Void> put(@NotNull ByteArray key, @NotNull byte[] value) {
        Command cmd = new PutCommand(key, value);

        return this.metaStorageRaftGrpSvc.run(cmd);
    }

    /**
     * Runs a {@link GetAndPutCommand} with the given key and value and converts the response to an {@link Entry}.
     *
     * @param key Key to write.
     * @param value Value to write.
     * @return Future with the entry built from the command response.
     */
    @Override
    @NotNull
    public CompletableFuture<Entry> getAndPut(@NotNull ByteArray key, @NotNull byte[] value) {
        Command cmd = new GetAndPutCommand(key, value);

        return this.metaStorageRaftGrpSvc.run(cmd)
            .thenApply(MetaStorageServiceImpl::singleEntryResult);
    }

    /**
     * Runs a {@link PutAllCommand} with the given key-value pairs.
     *
     * @param vals Key-value pairs to write.
     * @return Future returned by the raft group service for the command.
     */
    @Override
    @NotNull
    public CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals) {
        Command cmd = new PutAllCommand(vals);

        return this.metaStorageRaftGrpSvc.run(cmd);
    }

    /**
     * Runs a {@link GetAndPutAllCommand} with the given key-value pairs and converts the response to a map of entries.
     *
     * @param vals Key-value pairs to write.
     * @return Future with the entries of the response, keyed by entry key.
     */
    @Override
    @NotNull
    public CompletableFuture<Map<ByteArray, Entry>> getAndPutAll(@NotNull Map<ByteArray, byte[]> vals) {
        Command cmd = new GetAndPutAllCommand(vals);

        return this.metaStorageRaftGrpSvc.run(cmd)
            .thenApply(MetaStorageServiceImpl::multipleEntryResult);
    }

    /**
     * Runs a {@link RemoveCommand} for the given key.
     *
     * @param key Key to remove.
     * @return Future returned by the raft group service for the command.
     */
    @Override
    @NotNull
    public CompletableFuture<Void> remove(@NotNull ByteArray key) {
        Command cmd = new RemoveCommand(key);

        return this.metaStorageRaftGrpSvc.run(cmd);
    }

    /**
     * Runs a {@link GetAndRemoveCommand} for the given key and converts the response to an {@link Entry}.
     *
     * @param key Key to remove.
     * @return Future with the entry built from the command response.
     */
    @Override
    @NotNull
    public CompletableFuture<Entry> getAndRemove(@NotNull ByteArray key) {
        Command cmd = new GetAndRemoveCommand(key);

        return this.metaStorageRaftGrpSvc.run(cmd)
            .thenApply(MetaStorageServiceImpl::singleEntryResult);
    }

    /**
     * Runs a {@link RemoveAllCommand} for the given keys.
     *
     * @param keys Keys to remove.
     * @return Future returned by the raft group service for the command.
     */
    @Override
    @NotNull
    public CompletableFuture<Void> removeAll(@NotNull Set<ByteArray> keys) {
        Command cmd = new RemoveAllCommand(keys);

        return this.metaStorageRaftGrpSvc.run(cmd);
    }

    /**
     * Runs a {@link GetAndRemoveAllCommand} for the given keys and converts the response to a map of entries.
     *
     * @param keys Keys to remove.
     * @return Future with the entries of the response, keyed by entry key.
     */
    @Override
    @NotNull
    public CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(@NotNull Set<ByteArray> keys) {
        Command cmd = new GetAndRemoveAllCommand(keys);

        return this.metaStorageRaftGrpSvc.run(cmd)
            .thenApply(MetaStorageServiceImpl::multipleEntryResult);
    }

    /**
     * Single-operation convenience overload: wraps each operation into a one-element list and
     * delegates to the collection-based {@code invoke}.
     *
     * @param condition Condition to convert and send.
     * @param success Operation placed in the success list.
     * @param failure Operation placed in the failure list.
     * @return Future returned by the collection-based overload.
     */
    @Override
    @NotNull
    public CompletableFuture<Boolean> invoke(@NotNull Condition condition, @NotNull Operation success, @NotNull Operation failure) {
        List<Operation> onSuccess = List.of(success);
        List<Operation> onFailure = List.of(failure);

        return this.invoke(condition, onSuccess, onFailure);
    }

    /**
     * Converts the condition and both operation collections to their command representations
     * and runs an {@link InvokeCommand} built from them.
     *
     * @param condition Condition to convert and send.
     * @param success Operations converted into the command's success list.
     * @param failure Operations converted into the command's failure list.
     * @return Future returned by the raft group service for the command.
     */
    @Override
    @NotNull
    public CompletableFuture<Boolean> invoke(@NotNull Condition condition, @NotNull Collection<Operation> success, @NotNull Collection<Operation> failure) {
        // Arguments are evaluated left to right: condition first, then success, then failure operations.
        Command cmd = new InvokeCommand(
            MetaStorageServiceImpl.toConditionInfo(condition),
            MetaStorageServiceImpl.toOperationInfos(success),
            MetaStorageServiceImpl.toOperationInfos(failure)
        );

        return this.metaStorageRaftGrpSvc.run(cmd);
    }

    /**
     * Runs a {@link RangeCommand} with a freshly generated id and wraps the resulting future in a cursor
     * whose elements are converted to {@link Entry} instances.
     *
     * @param keyFrom Start key passed to the command.
     * @param keyTo End key passed to the command; may be {@code null}.
     * @param revUpperBound Revision bound passed to the command as is.
     * @return Cursor over the entries.
     */
    @Override
    @NotNull
    public Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound) {
        IgniteUuid cursorId = uuidGenerator.randomUuid();
        Command cmd = new RangeCommand(keyFrom, keyTo, revUpperBound, this.localNodeId, cursorId);

        return new CursorImpl<Entry>(
            this.metaStorageRaftGrpSvc,
            this.metaStorageRaftGrpSvc.run(cmd),
            MetaStorageServiceImpl::singleEntryResult
        );
    }

    /**
     * Runs a {@link RangeCommand} (without a revision bound) with a freshly generated id and wraps the
     * resulting future in a cursor whose elements are converted to {@link Entry} instances.
     *
     * @param keyFrom Start key passed to the command.
     * @param keyTo End key passed to the command; may be {@code null}.
     * @return Cursor over the entries.
     */
    @Override
    @NotNull
    public Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) {
        IgniteUuid cursorId = uuidGenerator.randomUuid();
        Command cmd = new RangeCommand(keyFrom, keyTo, this.localNodeId, cursorId);

        return new CursorImpl<Entry>(
            this.metaStorageRaftGrpSvc,
            this.metaStorageRaftGrpSvc.run(cmd),
            MetaStorageServiceImpl::singleEntryResult
        );
    }

    /**
     * Runs a {@link WatchRangeKeysCommand} with a freshly generated id and, once the command future
     * completes normally, registers and starts a watcher that feeds {@code lsnr} from a cursor built on
     * that same future.
     *
     * <p>The watcher is registered from a separate stage attached to the command future, while the
     * command future itself is what is returned to the caller.
     *
     * @param keyFrom Start key passed to the command; may be {@code null}.
     * @param keyTo End key passed to the command; may be {@code null}.
     * @param revision Revision passed to the command as is.
     * @param lsnr Listener handed to the watcher.
     * @return Future of the watch command, completed with the watch id.
     */
    @Override
    @NotNull
    public CompletableFuture<IgniteUuid> watch(@Nullable ByteArray keyFrom, @Nullable ByteArray keyTo, long revision, @NotNull WatchListener lsnr) {
        Command cmd = new WatchRangeKeysCommand(keyFrom, keyTo, revision, this.localNodeId, uuidGenerator.randomUuid());

        // Typed future instead of a raw one: no unchecked cast of the watch id is needed below.
        CompletableFuture<IgniteUuid> watchRes = this.metaStorageRaftGrpSvc.run(cmd);

        watchRes.thenAccept(watchId -> this.watchProcessor.addWatch(
            watchId,
            new CursorImpl<WatchEvent>(this.metaStorageRaftGrpSvc, watchRes, MetaStorageServiceImpl::watchResponse),
            lsnr
        ));

        return watchRes;
    }

    /**
     * Single-key overload: delegates to the range-based {@code watch} with {@code key} as the start key
     * and {@code null} as the end key.
     *
     * <p>NOTE(review): the meaning of a {@code null} end key is defined by the range watch command,
     * which is not visible here - confirm that it restricts the watch to the single key.
     *
     * @param key Key passed as the start key.
     * @param revision Revision passed on as is.
     * @param lsnr Listener passed on as is.
     * @return Future returned by the range-based overload.
     */
    @Override
    @NotNull
    public CompletableFuture<IgniteUuid> watch(@NotNull ByteArray key, long revision, @NotNull WatchListener lsnr) {
        ByteArray keyTo = null;

        return this.watch(key, keyTo, revision, lsnr);
    }

    /**
     * Runs a {@link WatchExactKeysCommand} with a freshly generated id and, once the command future
     * completes normally, registers and starts a watcher that feeds {@code lsnr} from a cursor built on
     * that same future.
     *
     * <p>The watcher is registered from a separate stage attached to the command future, while the
     * command future itself is what is returned to the caller.
     *
     * @param keys Keys passed to the command.
     * @param revision Revision passed to the command as is.
     * @param lsnr Listener handed to the watcher.
     * @return Future of the watch command, completed with the watch id.
     */
    @Override
    @NotNull
    public CompletableFuture<IgniteUuid> watch(@NotNull Set<ByteArray> keys, long revision, @NotNull WatchListener lsnr) {
        Command cmd = new WatchExactKeysCommand(keys, revision, this.localNodeId, uuidGenerator.randomUuid());

        // Typed future instead of a raw one: no unchecked cast of the watch id is needed below.
        CompletableFuture<IgniteUuid> watchRes = this.metaStorageRaftGrpSvc.run(cmd);

        watchRes.thenAccept(watchId -> this.watchProcessor.addWatch(
            watchId,
            new CursorImpl<WatchEvent>(this.metaStorageRaftGrpSvc, watchRes, MetaStorageServiceImpl::watchResponse),
            lsnr
        ));

        return watchRes;
    }

    /**
     * Asynchronously asks the watch processor to stop the watcher registered under the given id.
     *
     * @param id Watch id.
     * @return Future of the asynchronous task that calls the watch processor.
     */
    @Override
    @NotNull
    public CompletableFuture<Void> stopWatch(@NotNull IgniteUuid id) {
        Runnable stopTask = () -> this.watchProcessor.stopWatch(id);

        return CompletableFuture.runAsync(stopTask);
    }

    /**
     * Compaction is not implemented by this service.
     *
     * <p>The method is declared {@code @NotNull}, so instead of {@code null} it returns a future that is
     * already completed exceptionally; callers can chain on it or join it without hitting a
     * {@link NullPointerException}.
     *
     * @return Future failed with {@link UnsupportedOperationException}.
     */
    @Override
    @NotNull
    public CompletableFuture<Void> compact() {
        return CompletableFuture.failedFuture(
            new UnsupportedOperationException("Meta storage compaction is not implemented.")
        );
    }

    /**
     * Runs a {@link CursorsCloseCommand} for the given node id.
     *
     * @param nodeId Node id passed to the command.
     * @return Future returned by the raft group service for the command.
     */
    @Override
    @NotNull
    public CompletableFuture<Void> closeCursors(@NotNull String nodeId) {
        Command cmd = new CursorsCloseCommand(nodeId);

        return this.metaStorageRaftGrpSvc.run(cmd);
    }

    /**
     * Converts client operations to their command representation, preserving iteration order.
     *
     * <p>An unknown operation type is rejected with an exception regardless of whether JVM assertions
     * are enabled, so a {@code null} element can never end up in the result.
     *
     * @param ops Operations to convert.
     * @return List with one {@link OperationInfo} per operation.
     * @throws IllegalArgumentException If an operation has a type other than NO_OP, REMOVE or PUT.
     */
    private static List<OperationInfo> toOperationInfos(Collection<Operation> ops) {
        List<OperationInfo> res = new ArrayList<>(ops.size());

        for (Operation op : ops) {
            OperationType type = op.type();

            if (type == OperationType.NO_OP) {
                res.add(new OperationInfo(null, null, OperationType.NO_OP));
            } else if (type == OperationType.REMOVE) {
                res.add(new OperationInfo(((Operation.RemoveOp)op.inner()).key(), null, OperationType.REMOVE));
            } else if (type == OperationType.PUT) {
                Operation.PutOp inner = (Operation.PutOp)op.inner();

                res.add(new OperationInfo(inner.key(), inner.value(), OperationType.PUT));
            } else {
                throw new IllegalArgumentException("Unknown operation type " + type);
            }
        }

        return res;
    }

    /**
     * Converts a client condition to its command representation.
     *
     * <p>Existence and tombstone conditions carry only key and type, a revision condition additionally
     * carries the revision, and a value condition additionally carries the value. An unknown inner
     * condition is rejected with an exception regardless of whether JVM assertions are enabled, so the
     * method never returns {@code null}.
     *
     * @param condition Condition to convert.
     * @return Command representation of the condition.
     * @throws IllegalArgumentException If the inner condition is {@code null} or of an unknown class.
     */
    private static ConditionInfo toConditionInfo(@NotNull Condition condition) {
        Condition.InnerCondition obj = condition.inner();

        if (obj instanceof Condition.ExistenceCondition) {
            Condition.ExistenceCondition inner = (Condition.ExistenceCondition)obj;

            return new ConditionInfo(inner.key(), inner.type(), null, 0L);
        }

        if (obj instanceof Condition.TombstoneCondition) {
            Condition.TombstoneCondition inner = (Condition.TombstoneCondition)obj;

            return new ConditionInfo(inner.key(), inner.type(), null, 0L);
        }

        if (obj instanceof Condition.RevisionCondition) {
            Condition.RevisionCondition inner = (Condition.RevisionCondition)obj;

            return new ConditionInfo(inner.key(), inner.type(), null, inner.revision());
        }

        if (obj instanceof Condition.ValueCondition) {
            Condition.ValueCondition inner = (Condition.ValueCondition)obj;

            return new ConditionInfo(inner.key(), inner.type(), inner.value(), 0L);
        }

        // Guard against a null inner condition so that building the message cannot itself fail.
        String typeName = obj == null ? "null" : obj.getClass().getSimpleName();

        throw new IllegalArgumentException("Unknown condition type: " + typeName);
    }

    /**
     * Converts a {@link MultipleEntryResponse} to a map of entries keyed by entry key.
     *
     * @param obj Raft response; cast to {@link MultipleEntryResponse}.
     * @return Mutable map with one {@link EntryImpl} per response entry; a later entry with an equal
     *      key replaces an earlier one.
     */
    private static Map<ByteArray, Entry> multipleEntryResult(Object obj) {
        MultipleEntryResponse response = (MultipleEntryResponse)obj;
        Map<ByteArray, Entry> entriesByKey = new HashMap<>();

        response.entries().forEach(single -> {
            ByteArray entryKey = new ByteArray(single.key());

            entriesByKey.put(entryKey, new EntryImpl(entryKey, single.value(), single.revision(), single.updateCounter()));
        });

        return entriesByKey;
    }

    /**
     * Converts a {@link SingleEntryResponse} to an {@link Entry}.
     *
     * @param obj Raft response; cast to {@link SingleEntryResponse}.
     * @return Entry built from the key, value, revision and update counter of the response.
     */
    private static Entry singleEntryResult(Object obj) {
        SingleEntryResponse single = (SingleEntryResponse)obj;
        ByteArray entryKey = new ByteArray(single.key());

        return new EntryImpl(entryKey, single.value(), single.revision(), single.updateCounter());
    }

    /**
     * Converts a {@link MultipleEntryResponse} to a {@link WatchEvent}.
     *
     * <p>The response entries are consumed in pairs: the entry at an even index is paired with the entry
     * at the following odd index, and each pair becomes one {@link EntryEvent} with the even-index entry
     * as the first constructor argument. A trailing unpaired entry produces no event.
     *
     * @param obj Raft response; cast to {@link MultipleEntryResponse}.
     * @return Watch event holding the entry events in response order.
     */
    private static WatchEvent watchResponse(Object obj) {
        List<SingleEntryResponse> responses = ((MultipleEntryResponse)obj).entries();
        List<EntryEvent> events = new ArrayList<>(responses.size() / 2);

        EntryImpl first = null;
        boolean firstOfPair = true;

        for (SingleEntryResponse single : responses) {
            EntryImpl current = new EntryImpl(new ByteArray(single.key()), single.value(), single.revision(), single.updateCounter());

            if (firstOfPair) {
                first = current;
            } else {
                events.add(new EntryEvent(first, current));
            }

            firstOfPair = !firstOfPair;
        }

        return new WatchEvent(events);
    }

    /**
     * Registry of the running watchers. Each watcher is a dedicated thread that polls a watch cursor
     * and forwards its events to a listener.
     */
    private final class WatchProcessor {
        /** Running watchers keyed by watch id. */
        private final Map<IgniteUuid, Watcher> watchers = new ConcurrentHashMap<>();

        private WatchProcessor() {
        }

        /**
         * Registers a watcher under the given id and starts its thread.
         *
         * @param watchId Watch id.
         * @param cursor Cursor the watcher polls for events.
         * @param lsnr Listener the watcher notifies.
         */
        private void addWatch(IgniteUuid watchId, CursorImpl<WatchEvent> cursor, WatchListener lsnr) {
            Watcher watcher = new Watcher(cursor, lsnr);
            this.watchers.put(watchId, watcher);
            watcher.start();
        }

        /**
         * Removes the watcher registered under the given id, if any, and asynchronously stops it:
         * the stop flag is raised and the thread interrupted, then, after a short pause, the cursor is
         * closed. Does nothing when no watcher is registered under the id.
         *
         * @param watchId Watch id.
         */
        private void stopWatch(IgniteUuid watchId) {
            this.watchers.computeIfPresent(watchId, (k, v) -> {
                CompletableFuture.runAsync(() -> {
                    v.stop = true;
                    v.interrupt();
                }).thenRun(() -> {
                    try {
                        // Pause between interrupting the watcher thread and closing its cursor.
                        Thread.sleep(100L);
                        v.cursor.close();
                    }
                    catch (InterruptedException e) {
                        // Restore the interrupt flag before propagating.
                        Thread.currentThread().interrupt();

                        throw new IgniteInternalException(e);
                    }
                    catch (Exception e) {
                        if (isRejectedByRaftExecutor(e)) {
                            LOG.warn("Cursor close command was rejected because raft executor has been already stopped.");

                            return;
                        }

                        LOG.error("Unexpected exception", e);
                    }
                });

                // Returning null removes the mapping.
                return null;
            });
        }

        /**
         * Tells whether the exception is an {@link IgniteInternalException} whose cause's cause is a
         * {@link RejectedExecutionException}. Null-safe with respect to a missing cause.
         */
        private boolean isRejectedByRaftExecutor(Exception e) {
            if (!(e instanceof IgniteInternalException)) {
                return false;
            }

            Throwable cause = e.getCause();

            return cause != null && cause.getCause() instanceof RejectedExecutionException;
        }

        /**
         * Thread that polls a watch cursor until stopped and forwards events and iteration errors to
         * the listener.
         */
        private final class Watcher
        extends Thread {
            /** Stop flag; raised by {@code stopWatch}. */
            private volatile boolean stop = false;
            /** Cursor polled for watch events. */
            private final Cursor<WatchEvent> cursor;
            /** Listener notified about events and errors. */
            private final WatchListener lsnr;

            Watcher(Cursor<WatchEvent> cursor, WatchListener lsnr) {
                this.cursor = cursor;
                this.lsnr = lsnr;
            }

            /**
             * Polls the cursor until the stop flag is raised. When the cursor has no next element the
             * thread sleeps for 10 ms before polling again. The loop ends early on a node stopping
             * exception, or on an interruption observed after the stop flag was raised; any other
             * failure is logged and polling continues.
             */
            @Override
            public void run() {
                Iterator<WatchEvent> watchEvtsIter = this.cursor.iterator();

                while (!this.stop) {
                    try {
                        if (watchEvtsIter.hasNext()) {
                            WatchEvent watchEvt;

                            try {
                                watchEvt = watchEvtsIter.next();
                            }
                            catch (Throwable e) {
                                this.lsnr.onError(e);

                                // No event was obtained, so there is nothing to pass to onUpdate.
                                continue;
                            }

                            assert watchEvt != null;

                            this.lsnr.onUpdate(watchEvt);
                        } else {
                            Thread.sleep(10L);
                        }
                    }
                    catch (Throwable e) {
                        if (e instanceof NodeStoppingException || e.getCause() instanceof NodeStoppingException) {
                            break;
                        }

                        if ((e instanceof InterruptedException || e.getCause() instanceof InterruptedException) && this.stop) {
                            LOG.debug("Watcher has been stopped during node's stop");

                            break;
                        }

                        LOG.error("Unexpected exception", e);
                    }
                }
            }
        }
    }
}

