/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.ml.inference.builder;

import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.ml.inference.Model;
import org.apache.ignite.ml.inference.builder.AsyncModelBuilder;
import org.apache.ignite.ml.inference.parser.ModelParser;
import org.apache.ignite.ml.inference.reader.ModelReader;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;

public class IgniteDistributedModelBuilder
implements AsyncModelBuilder {
    /** Name pattern of the deployed inference service; {@code %s} is replaced with the per-model suffix. */
    private static final String INFERENCE_SERVICE_NAME_PATTERN = "inference_service_%s";
    /** Name pattern of the distributed request queue; {@code %s} is replaced with the per-model suffix. */
    private static final String INFERENCE_REQUEST_QUEUE_NAME_PATTERN = "inference_queue_req_%s";
    /** Name pattern of the distributed response queue; {@code %s} is replaced with the per-model suffix. */
    private static final String INFERENCE_RESPONSE_QUEUE_NAME_PATTERN = "inference_queue_res_%s";
    /** Capacity value (100) for the inference queues. */
    private static final int QUEUE_CAPACITY = 100;
    /** Collection configuration (created with its no-arg constructor) passed to every queue lookup in this file. */
    private static final CollectionConfiguration queueCfg = new CollectionConfiguration();
    /** Ignite instance handed to every model built by this builder. */
    private final Ignite ignite;
    /** Instance count forwarded to the service deployment call. */
    private final int instances;
    /** Per-node instance limit forwarded to the service deployment call. */
    private final int maxPerNode;

    /**
     * Creates a builder whose models run as services on the given Ignite instance.
     *
     * @param ignite Ignite instance used to obtain queues and deploy services.
     * @param instances instance count forwarded to the service deployment call.
     * @param maxPerNode per-node instance limit forwarded to the service deployment call.
     */
    public IgniteDistributedModelBuilder(Ignite ignite, int instances, int maxPerNode) {
        // Plain field capture; no validation is performed on the arguments.
        this.maxPerNode = maxPerNode;
        this.instances = instances;
        this.ignite = ignite;
    }

    /**
     * Builds a distributed inference model from the given reader and parser.
     *
     * @param reader model reader handed to the deployed services.
     * @param parser model parser handed to the deployed services.
     * @param <I> input type of the model.
     * @param <O> output type of the model.
     * @return model whose {@code predict} returns a {@link Future} of the result.
     */
    @Override
    public <I extends Serializable, O extends Serializable> Model<I, Future<O>> build(ModelReader reader, ModelParser<I, O, ?> parser) {
        // A fresh random suffix per model keeps its service and queue names distinct from other models.
        String suffix = UUID.randomUUID().toString();
        DistributedInfModel<I, O> distributedMdl =
            new DistributedInfModel<I, O>(this.ignite, suffix, reader, parser, this.instances, this.maxPerNode);
        return distributedMdl;
    }

    private static class IgniteDistributedInfModelService<I extends Serializable, O extends Serializable>
    implements Service {
        private static final long serialVersionUID = -3596084917874395597L;
        private final ModelReader reader;
        private final ModelParser<I, O, ?> parser;
        private final String suffix;
        private transient IgniteQueue<I> reqQueue;
        private transient IgniteQueue<O> resQueue;
        private transient Model<I, O> mdl;

        IgniteDistributedInfModelService(ModelReader reader, ModelParser<I, O, ?> parser, String suffix) {
            this.reader = reader;
            this.parser = parser;
            this.suffix = suffix;
        }

        public void init(ServiceContext ctx) {
            Ignite ignite = Ignition.localIgnite();
            this.reqQueue = ignite.queue(String.format(IgniteDistributedModelBuilder.INFERENCE_REQUEST_QUEUE_NAME_PATTERN, this.suffix), 100, queueCfg);
            this.resQueue = ignite.queue(String.format(IgniteDistributedModelBuilder.INFERENCE_RESPONSE_QUEUE_NAME_PATTERN, this.suffix), 100, queueCfg);
            this.mdl = this.parser.parse(this.reader.read());
        }

        public void execute(ServiceContext ctx) {
            while (!ctx.isCancelled()) {
                Serializable req;
                try {
                    req = (Serializable)this.reqQueue.take();
                }
                catch (IllegalStateException e) {
                    if (this.reqQueue.removed()) continue;
                    throw e;
                }
                Serializable res = (Serializable)this.mdl.predict(req);
                try {
                    this.resQueue.put((Object)res);
                }
                catch (IllegalStateException e) {
                    if (this.resQueue.removed()) continue;
                    throw e;
                }
            }
        }

        public void cancel(ServiceContext ctx) {
        }
    }

    private static class DistributedInfModel<I extends Serializable, O extends Serializable>
    implements Model<I, Future<O>> {
        private final Ignite ignite;
        private final String suffix;
        private final IgniteQueue<I> reqQueue;
        private final IgniteQueue<O> resQueue;
        private final BlockingQueue<CompletableFuture<O>> futures = new ArrayBlockingQueue<CompletableFuture<O>>(100);
        private final ExecutorService receiverThreadPool = Executors.newSingleThreadExecutor();
        private final AtomicBoolean running = new AtomicBoolean(false);
        private volatile Future<?> receiverFut;

        DistributedInfModel(Ignite ignite, String suffix, ModelReader reader, ModelParser<I, O, ?> parser, int instances, int maxPerNode) {
            this.ignite = ignite;
            this.suffix = suffix;
            this.reqQueue = ignite.queue(String.format(IgniteDistributedModelBuilder.INFERENCE_REQUEST_QUEUE_NAME_PATTERN, suffix), 100, queueCfg);
            this.resQueue = ignite.queue(String.format(IgniteDistributedModelBuilder.INFERENCE_RESPONSE_QUEUE_NAME_PATTERN, suffix), 100, queueCfg);
            this.startReceiver();
            this.startService(reader, parser, instances, maxPerNode);
            this.running.set(true);
        }

        @Override
        public Future<O> predict(I input) {
            if (!this.running.get()) {
                throw new IllegalStateException("Inference model is not running");
            }
            CompletableFuture fut = new CompletableFuture();
            try {
                this.futures.put(fut);
            }
            catch (InterruptedException e) {
                this.close();
                throw new RuntimeException(e);
            }
            this.reqQueue.put(input);
            return fut;
        }

        private void startService(ModelReader reader, ModelParser<I, O, ?> parser, int instances, int maxPerNode) {
            this.ignite.services().deployMultiple(String.format(IgniteDistributedModelBuilder.INFERENCE_SERVICE_NAME_PATTERN, this.suffix), new IgniteDistributedInfModelService<I, O>(reader, parser, this.suffix), instances, maxPerNode);
        }

        private void stopService() {
            this.ignite.services().cancel(String.format(IgniteDistributedModelBuilder.INFERENCE_SERVICE_NAME_PATTERN, this.suffix));
        }

        private void startReceiver() {
            this.receiverFut = this.receiverThreadPool.submit(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        Serializable res;
                        try {
                            res = (Serializable)this.resQueue.take();
                        }
                        catch (IllegalStateException e) {
                            if (this.resQueue.removed()) continue;
                            throw e;
                        }
                        CompletableFuture fut = (CompletableFuture)this.futures.remove();
                        fut.complete(res);
                    }
                }
                finally {
                    this.close();
                    while (!this.futures.isEmpty()) {
                        CompletableFuture fut = (CompletableFuture)this.futures.remove();
                        fut.cancel(true);
                    }
                }
            });
        }

        private void stopReceiver() {
            if (this.receiverFut != null && !this.receiverFut.isDone()) {
                this.receiverFut.cancel(true);
            }
            this.receiverThreadPool.shutdown();
        }

        private void removeQueues() {
            this.reqQueue.close();
            this.resQueue.close();
        }

        @Override
        public void close() {
            boolean runningBefore = this.running.getAndSet(false);
            if (runningBefore) {
                this.stopService();
                this.stopReceiver();
                this.removeQueues();
            }
        }
    }
}

