/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.services.rlserver;

import com.mongodb.ConnectionString;
import com.mongodb.client.model.Indexes;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.nustaq.kontraktor.services.ClusterCfg;
import org.nustaq.kontraktor.services.ServiceRegistry;
import org.nustaq.kontraktor.services.datacluster.DataCfg;
import org.nustaq.kontraktor.services.datacluster.DataShard;
import org.nustaq.kontraktor.services.datacluster.DataShardArgs;
import org.nustaq.kontraktor.services.rlserver.SimpleRLConfig;
import org.nustaq.kontraktor.services.rlserver.SingleProcessRLClusterArgs;
import org.nustaq.kontraktor.services.rlserver.mongodb.MongoPersistance;
import org.nustaq.reallive.api.RecordStorage;
import org.nustaq.reallive.api.TableDescription;
import org.nustaq.reallive.client.EmbeddedRealLive;
import org.nustaq.reallive.server.storage.CachedOffHeapStorage;
import org.nustaq.reallive.server.storage.HeapRecordStorage;
import org.nustaq.reallive.server.storage.RecordPersistance;

public class SingleProcessRLCluster {
    private static SingleProcessRLClusterArgs options;
    public static MongoClient mongo;
    public static MongoDatabase mongoDB;

    public static void main(String[] args) throws InterruptedException {
        if (!new File("./etc").exists()) {
            System.out.println("Please start with working dir [project]");
            System.exit(1);
        }
        options = (SingleProcessRLClusterArgs)ServiceRegistry.parseCommandLine(args, null, new SingleProcessRLClusterArgs());
        SimpleRLConfig scfg = SimpleRLConfig.read();
        if (scfg.mongoConnection != null) {
            EmbeddedRealLive.sCustomRecordStorage.put("MONGO", desc -> SingleProcessRLCluster.createOrConnectMongoDBStorage(desc, scfg));
        }
        ClusterCfg cfg = new ClusterCfg();
        DataCfg datacfg = new DataCfg();
        datacfg.schema(scfg.tables);
        String[] dirs = new String[scfg.numNodes];
        for (int i = 0; i < dirs.length; ++i) {
            dirs[i] = scfg.dataDir;
        }
        datacfg.dataDir(dirs);
        cfg.dataCluster(datacfg);
        ServiceRegistry.start(options, cfg);
        Thread.sleep(1000L);
        ExecutorService ex = Executors.newCachedThreadPool();
        int i = 0;
        while (i < cfg.getDataCluster().getNumberOfShards()) {
            int finalI = i++;
            ex.execute(() -> DataShard.start(DataShardArgs.from(options, finalI)));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected static RecordStorage createOrConnectMongoDBStorage(TableDescription desc, SimpleRLConfig scfg) {
        Class<SingleProcessRLCluster> clazz = SingleProcessRLCluster.class;
        synchronized (SingleProcessRLCluster.class) {
            if (mongoDB == null) {
                ConnectionString con = new ConnectionString(scfg.mongoConnection);
                mongo = MongoClients.create((String)scfg.mongoConnection);
                mongoDB = mongo.getDatabase(con.getDatabase());
            }
            // ** MonitorExit[var2_2] (shouldn't be in output)
            MongoCollection collection = mongoDB.getCollection(desc.getName());
            collection.createIndex(Indexes.hashed((String)"key"));
            return new CachedOffHeapStorage((RecordPersistance)new MongoPersistance(collection, desc), new HeapRecordStorage());
        }
    }
}

