/*
 * Decompiled with CFR 0.152.
 */
package de.caluga.morphium.server;

import de.caluga.morphium.Morphium;
import de.caluga.morphium.MorphiumConfig;
import de.caluga.morphium.Utils;
import de.caluga.morphium.changestream.ChangeStreamMonitor;
import de.caluga.morphium.driver.Doc;
import de.caluga.morphium.driver.DriverTailableIterationCallback;
import de.caluga.morphium.driver.MorphiumDriverException;
import de.caluga.morphium.driver.bson.MongoTimestamp;
import de.caluga.morphium.driver.commands.GenericCommand;
import de.caluga.morphium.driver.commands.MongoCommand;
import de.caluga.morphium.driver.commands.WatchCommand;
import de.caluga.morphium.driver.inmem.InMemoryDriver;
import de.caluga.morphium.driver.wire.HelloResult;
import de.caluga.morphium.driver.wireprotocol.OpCompressed;
import de.caluga.morphium.driver.wireprotocol.OpMsg;
import de.caluga.morphium.driver.wireprotocol.OpQuery;
import de.caluga.morphium.driver.wireprotocol.OpReply;
import de.caluga.morphium.driver.wireprotocol.WireProtocolMessage;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MorphiumServer {
    private static Logger log = LoggerFactory.getLogger(MorphiumServer.class);
    private InMemoryDriver drv;
    private int port;
    private String host;
    private AtomicInteger msgId = new AtomicInteger(1000);
    private AtomicInteger cursorId = new AtomicInteger(1000);
    private ThreadPoolExecutor executor;
    private boolean running = true;
    private ServerSocket serverSocket;
    private static int compressorId = 0;
    private static String rsName;
    private static String hostSeed;
    private static List<String> hosts;

    public MorphiumServer(int port, String host, int maxThreads, int minThreads) {
        this.drv = new InMemoryDriver();
        this.port = port;
        this.host = host;
        this.drv.connect();
        this.executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
        this.executor.setMaximumPoolSize(maxThreads);
        this.executor.setCorePoolSize(minThreads);
    }

    public MorphiumServer() {
        this(17017, "localhost", 100, 10);
    }

    public static void main(String[] args) throws Exception {
        int rsport;
        int idx = 0;
        log.info("Starting up server... parsing commandline params");
        String host = "localhost";
        int port = 17017;
        int maxThreads = 1000;
        int minThreads = 10;
        rsName = "";
        hostSeed = "";
        block24: while (idx < args.length) {
            switch (args[idx]) {
                case "-p": 
                case "--port": {
                    port = Integer.parseInt(args[idx + 1]);
                    idx += 2;
                    break;
                }
                case "-mt": 
                case "--maxThreads": {
                    maxThreads = Integer.parseInt(args[idx + 1]);
                    idx += 2;
                    break;
                }
                case "-mint": 
                case "--minThreads": {
                    minThreads = Integer.parseInt(args[idx + 1]);
                    idx += 2;
                    break;
                }
                case "-h": 
                case "--host": {
                    host = args[idx + 1];
                    idx += 2;
                    break;
                }
                case "-rs": 
                case "--replicaset": {
                    rsName = args[idx + 1];
                    hostSeed = args[idx + 2];
                    idx += 3;
                    hosts = new ArrayList<String>();
                    for (String s : hostSeed.split(",")) {
                        rsport = 27017;
                        String hst = s;
                        if (s.contains(":")) {
                            rsport = Integer.parseInt(s.split(":")[1]);
                            hst = s.split(":")[0];
                        }
                        hosts.add(hst + ":" + rsport);
                    }
                    continue block24;
                }
                case "-c": 
                case "--compressor": {
                    if (args[idx + 1].equals("snappy")) {
                        compressorId = 1;
                    } else if (args[idx + 1].equals("zstd")) {
                        compressorId = 3;
                    } else if (args[idx + 1].equals("none")) {
                        compressorId = 0;
                    } else if (args[idx + 1].equals("zlib")) {
                        compressorId = 2;
                    } else {
                        log.error("Unknown parameter for compressor {}", (Object)args[idx + 1]);
                        System.exit(1);
                    }
                    idx += 2;
                    break;
                }
                default: {
                    log.error("unknown parameter " + args[idx]);
                    System.exit(1);
                }
            }
        }
        log.info("Starting server...");
        MorphiumServer srv = new MorphiumServer(port, host, maxThreads, minThreads);
        srv.start();
        if (!rsName.isEmpty()) {
            log.info("Building replicaset with seed {}", (Object)hostSeed);
            InetAddress inetAddress = InetAddress.getLocalHost();
            String hostname = inetAddress.getHostName();
            String ipAddress = inetAddress.getHostAddress();
            block26: for (String h : hosts) {
                rsport = 17017;
                if (h.contains(":")) {
                    rsport = Integer.parseInt(h.split(":")[1]);
                    h = h.split(":")[0];
                }
                if (h.equals(ipAddress) || h.equals(hostname)) continue;
                while (true) {
                    try {
                        MorphiumConfig cfg = new MorphiumConfig("admin", 10, 10000, 1000);
                        cfg.setDriverName("SingleMongoConnectDriver");
                        cfg.setHostSeed(h + ":" + rsport);
                        Morphium morphium = new Morphium(cfg);
                        for (String db : morphium.listDatabases()) {
                            MorphiumConfig cfg2 = new MorphiumConfig(db, 10, 10000, 1000);
                            cfg2.setDriverName("SingleMongoConnectDriver");
                            cfg2.setHostSeed(h + ":" + rsport);
                            Morphium morphium2 = new Morphium(cfg2);
                            ChangeStreamMonitor mtr = new ChangeStreamMonitor(morphium2, null, true);
                            mtr.addListener(evt -> {
                                if (evt.getOperationType().equals("insert")) {
                                    try {
                                        srv.getDriver().insert(db, evt.getCollectionName(), List.of(evt.getFullDocument()), null);
                                    }
                                    catch (MorphiumDriverException e) {
                                        log.error("Exception", (Throwable)e);
                                    }
                                } else if (evt.getOperationType().equals("delete")) {
                                    try {
                                        srv.getDriver().delete(db, evt.getCollectionName(), Map.of("_id", evt.getDocumentKey()), null, false, null, null);
                                    }
                                    catch (MorphiumDriverException e) {
                                        log.error("Exception", (Throwable)e);
                                    }
                                } else if (evt.getOperationType().equals("update")) {
                                    try {
                                        srv.getDriver().insert(db, evt.getCollectionName(), List.of(evt.getFullDocument()), null);
                                    }
                                    catch (MorphiumDriverException e) {
                                        log.error("Exception", (Throwable)e);
                                    }
                                } else {
                                    log.error("Cannot process command {}", (Object)evt.getOperationType());
                                }
                                return true;
                            });
                            mtr.start();
                        }
                        morphium.close();
                        continue block26;
                    }
                    catch (Exception e) {
                        log.error("Could not setup replicaset", (Throwable)e);
                        Thread.sleep(5000L);
                        continue;
                    }
                    break;
                }
            }
        }
        while (srv.running) {
            log.info("Alive and kickin'");
            Thread.sleep(10000L);
        }
    }

    private InMemoryDriver getDriver() {
        return this.drv;
    }

    private HelloResult getHelloResult() {
        HelloResult res = new HelloResult();
        res.setHelloOk(true);
        res.setLocalTime(new Date());
        res.setOk(1.0);
        if (hosts == null || hosts.isEmpty()) {
            res.setHosts(Arrays.asList(this.host + ":" + this.port));
        } else {
            res.setHosts(hosts);
        }
        res.setConnectionId(1);
        res.setMaxWireVersion(17);
        res.setMinWireVersion(13);
        res.setMaxMessageSizeBytes(100000);
        res.setMaxBsonObjectSize(10000);
        res.setWritablePrimary(true);
        res.setMe(this.host + ":" + this.port);
        res.setMsg("MorphiumServer V0.1ALPHA");
        return res;
    }

    public int getConnectionCount() {
        return this.executor.getActiveCount();
    }

    public void start() throws IOException, InterruptedException {
        log.info("Opening port " + this.port);
        this.serverSocket = new ServerSocket(this.port);
        this.drv.setHostSeed(this.host + ":" + this.port);
        this.executor.prestartAllCoreThreads();
        log.info("Port opened, waiting for incoming connections");
        new Thread(() -> {
            while (this.running) {
                Socket s = null;
                try {
                    s = this.serverSocket.accept();
                }
                catch (IOException e) {
                    if (e.getMessage().contains("Socket closed")) {
                        log.info("Server socket closed");
                        break;
                    }
                    log.error("Serversocket error", (Throwable)e);
                    this.terminate();
                    break;
                }
                log.info("Incoming connection: " + this.executor.getPoolSize());
                Socket finalS = s;
                this.executor.execute(() -> this.incoming(finalS));
            }
        }).start();
    }

    public void incoming(Socket s) {
        log.info("handling incoming connection...{}", (Object)this.executor.getPoolSize());
        try {
            s.setSoTimeout(0);
            s.setTcpNoDelay(true);
            s.setKeepAlive(true);
            InputStream in = s.getInputStream();
            OutputStream out = s.getOutputStream();
            int id = 0;
            Map<String, Object> answer = this.getHelloResult().toMsg();
            while (s.isConnected()) {
                log.info("Thread {} waiting for incoming message", (Object)Thread.currentThread().getId());
                WireProtocolMessage msg = WireProtocolMessage.parseFromStream(in);
                log.info("---> Thread {} got message", (Object)Thread.currentThread().getId());
                if (msg == null) {
                    log.info("Null");
                    break;
                }
                Map<String, Object> doc = null;
                if (msg instanceof OpQuery) {
                    OpReply r;
                    OpQuery q = (OpQuery)msg;
                    id = q.getMessageId();
                    doc = q.getDoc();
                    if (doc.containsKey("ismaster") || doc.containsKey("isMaster")) {
                        log.info("OpQuery->isMaster");
                        r = new OpReply();
                        r.setFlags(2);
                        r.setMessageId(this.msgId.incrementAndGet());
                        r.setResponseTo(id);
                        r.setNumReturned(1);
                        HelloResult res = this.getHelloResult();
                        log.info("Sending isMaster response via OpReply: {}", res.toMsg());
                        r.setDocuments(Arrays.asList(res.toMsg()));
                        if (compressorId != 0) {
                            OpCompressed cmp = new OpCompressed();
                            cmp.setMessageId(r.getMessageId());
                            cmp.setResponseTo(id);
                            cmp.setOriginalOpCode(r.getOpCode());
                            cmp.setCompressorId(compressorId);
                            byte[] originalPayload = r.getPayload();
                            cmp.setUncompressedSize(originalPayload.length);
                            cmp.setCompressedMessage(originalPayload);
                            log.info("Sending compressed OpReply: {} bytes (uncompressed: {} bytes)", (Object)cmp.bytes().length, (Object)originalPayload.length);
                            out.write(cmp.bytes());
                        } else {
                            log.info("Sending OpReply: {} bytes", (Object)r.bytes().length);
                            out.write(r.bytes());
                        }
                        out.flush();
                        log.info("Sent isMaster result via OpQuery");
                        continue;
                    }
                    r = new OpReply();
                    Object d = Doc.of("$err", "OP_QUERY is no longer supported. The client driver may require an upgrade.", "code", (Object)5739101, "ok", (Object)0.0);
                    r.setFlags(2);
                    r.setMessageId(this.msgId.incrementAndGet());
                    r.setResponseTo(id);
                    r.setNumReturned(1);
                    r.setDocuments(Arrays.asList(d));
                    out.write(r.bytes());
                    out.flush();
                    log.info("Sent out error because OPQuery not allowed anymore!");
                    log.info(Utils.toJsonString(doc));
                    continue;
                }
                if (msg instanceof OpMsg) {
                    OpMsg m = (OpMsg)msg;
                    doc = ((OpMsg)msg).getFirstDoc();
                    log.info("Message flags: " + m.getFlags());
                    id = m.getMessageId();
                }
                log.info("Incoming " + Utils.toJsonString(doc));
                String cmd = (String)doc.keySet().stream().findFirst().get();
                log.info("Handling command " + cmd);
                OpMsg reply = new OpMsg();
                reply.setResponseTo(msg.getMessageId());
                reply.setMessageId(this.msgId.incrementAndGet());
                switch (cmd) {
                    case "getCmdLineOpts": {
                        answer = Doc.of("argv", List.of(), "parsed", Map.of());
                        break;
                    }
                    case "buildInfo": {
                        answer = Doc.of("version", "5.0.0-ALPHA", "buildEnvironment", Doc.of("distarch", "java", "targetarch", "java"));
                        answer.put("ok", 1.0);
                        reply.setFirstDoc(answer);
                        break;
                    }
                    case "ismaster": 
                    case "isMaster": 
                    case "hello": {
                        log.info("OpMsg->hello/ismaster");
                        answer = this.getHelloResult().toMsg();
                        log.info("Hello response: {}", (Object)answer);
                        break;
                    }
                    case "getFreeMonitoringStatus": {
                        answer = Doc.of("state", "disabled", "message", "", "url", "", "userReminder", "", "ok", (Object)1.0);
                        break;
                    }
                    case "ping": {
                        answer = Doc.of("ok", (Object)1.0);
                        break;
                    }
                    case "endSessions": {
                        answer = Doc.of("ok", (Object)1.0);
                        break;
                    }
                    case "startSession": {
                        answer = Doc.of("id", Map.of("id", UUID.randomUUID()), "timeoutMinutes", (Object)30, "ok", (Object)1.0);
                        break;
                    }
                    case "refreshSessions": {
                        answer = Doc.of("ok", (Object)1.0);
                        break;
                    }
                    case "abortTransaction": 
                    case "commitTransaction": {
                        answer = Doc.of("ok", (Object)1.0);
                        break;
                    }
                    case "getLog": {
                        if (doc.get(cmd).equals("startupWarnings")) {
                            answer = Doc.of("totalLinesWritten", (Object)0, "log", List.of(), "ok", (Object)1.0);
                            break;
                        }
                        log.warn("Unknown log " + String.valueOf(doc.get(cmd)));
                        answer = Doc.of("ok", (Object)0, "errmsg", "unknown logr");
                        break;
                    }
                    case "getParameter": {
                        if (Integer.valueOf(1).equals(doc.get("featureCompatibilityVersion"))) {
                            answer = Doc.of("version", "5.0", "ok", (Object)1.0);
                            break;
                        }
                        answer = Doc.of("ok", (Object)0, "errmsg", "no such parameter");
                        break;
                    }
                    default: {
                        try {
                            AtomicInteger msgid = new AtomicInteger(0);
                            if (doc.containsKey("pipeline") && ((Map)((List)doc.get("pipeline")).get(0)).containsKey("$changeStream")) {
                                MongoCommand wcmd = new WatchCommand(this.drv).fromMap((Map)doc);
                                int myCursorId = this.cursorId.incrementAndGet();
                                ((WatchCommand)wcmd).setCb(new DriverTailableIterationCallback(){
                                    private boolean first = true;
                                    private String batch = "firstBatch";
                                    final /* synthetic */ WatchCommand val$wcmd;
                                    final /* synthetic */ int val$myCursorId;
                                    final /* synthetic */ OpMsg val$reply;
                                    final /* synthetic */ OutputStream val$out;
                                    {
                                        this.val$wcmd = watchCommand;
                                        this.val$myCursorId = n;
                                        this.val$reply = opMsg;
                                        this.val$out = outputStream;
                                    }

                                    @Override
                                    public void incomingData(Map<String, Object> data, long dur) {
                                        try {
                                            Doc crs = Doc.of(this.batch, List.of(data), "ns", this.val$wcmd.getDb() + "." + this.val$wcmd.getColl(), "id", (Object)this.val$myCursorId);
                                            Doc answer = Doc.of("ok", (Object)1.0);
                                            if (crs != null) {
                                                answer.put("cursor", crs);
                                            }
                                            answer.put("$clusterTime", Doc.of("clusterTime", new MongoTimestamp(System.currentTimeMillis())));
                                            answer.put("operationTime", new MongoTimestamp(System.currentTimeMillis()));
                                            this.val$reply.setFirstDoc(answer);
                                            if (this.first) {
                                                this.first = false;
                                                this.batch = "nextBatch";
                                            }
                                            if (compressorId != 0) {
                                                OpCompressed cmp = new OpCompressed();
                                                cmp.setMessageId(this.val$reply.getMessageId());
                                                cmp.setResponseTo(this.val$reply.getResponseTo());
                                                cmp.setCompressedMessage(this.val$reply.bytes());
                                                this.val$out.write(cmp.bytes());
                                            } else {
                                                this.val$out.write(this.val$reply.bytes());
                                            }
                                            this.val$out.flush();
                                        }
                                        catch (Exception e) {
                                            log.error("Errror during watch", (Throwable)e);
                                        }
                                    }

                                    @Override
                                    public boolean isContinued() {
                                        return true;
                                    }
                                });
                                int mid = this.drv.runCommand((WatchCommand)wcmd);
                                msgid.set(mid);
                            } else {
                                msgid.set(this.drv.runCommand((GenericCommand)new GenericCommand(this.drv).fromMap((Map)doc)));
                            }
                            Map<String, Object> crs = this.drv.readSingleAnswer(msgid.get());
                            answer = Doc.of("ok", (Object)1.0);
                            if (crs == null) break;
                            answer.putAll(crs);
                            break;
                        }
                        catch (Exception e) {
                            answer = Doc.of("ok", (Object)0, "errmsg", "no such command: '{}" + cmd + "'");
                            log.error("No such command {}", (Object)cmd, (Object)e);
                        }
                    }
                }
                answer.put("$clusterTime", Doc.of("clusterTime", new MongoTimestamp(System.currentTimeMillis())));
                answer.put("operationTime", new MongoTimestamp(System.currentTimeMillis()));
                reply.setFirstDoc(answer);
                log.info("Final response being sent: {}", answer);
                if (compressorId != 0) {
                    OpCompressed cmsg = new OpCompressed();
                    cmsg.setMessageId(reply.getMessageId());
                    cmsg.setResponseTo(reply.getResponseTo());
                    cmsg.setCompressorId(compressorId);
                    cmsg.setOriginalOpCode(reply.getOpCode());
                    byte[] originalPayload = reply.getPayload();
                    cmsg.setUncompressedSize(originalPayload.length);
                    cmsg.setCompressedMessage(originalPayload);
                    byte[] b = cmsg.bytes();
                    log.info("Server sending {} bytes (compressed), uncompressed: {} bytes, responseTo: {}", new Object[]{b.length, originalPayload.length, cmsg.getResponseTo()});
                    out.write(b);
                } else {
                    byte[] b = reply.bytes();
                    log.info("Server sending {} bytes, responseTo: {}", (Object)b.length, (Object)reply.getResponseTo());
                    out.write(b);
                }
                out.flush();
                log.info("Sent answer for cmd: {}", (Object)cmd);
            }
            s.close();
            in.close();
            out.close();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        log.info("Thread finished!");
        s = null;
    }

    public void terminate() {
        this.running = false;
        if (this.serverSocket != null) {
            try {
                this.serverSocket.close();
                this.serverSocket = null;
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
        this.executor.shutdownNow();
        this.executor = null;
    }
}

