/*
 * Decompiled with CFR 0.152.
 */
package io.vena.bosk.drivers.mongo;

import com.mongodb.MongoException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.UpdateDescription;
import com.mongodb.lang.Nullable;
import io.vena.bosk.BoskDriver;
import io.vena.bosk.Reference;
import io.vena.bosk.StateTreeNode;
import io.vena.bosk.drivers.mongo.Formatter;
import io.vena.bosk.drivers.mongo.MongoDriverSettings;
import io.vena.bosk.drivers.mongo.MongoReceiver;
import io.vena.bosk.exceptions.FlushFailureException;
import io.vena.bosk.exceptions.InvalidTypeException;
import io.vena.bosk.exceptions.NotYetImplementedException;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class SingleDocumentMongoChangeStreamReceiver<R extends StateTreeNode>
implements MongoReceiver<R> {
    /** Converts BSON values from change events into state-tree objects, and dotted field names into references. */
    private final Formatter formatter;
    /** Driver that receives the replacements, deletions and flushes derived from change events. */
    private final BoskDriver<R> downstream;
    /** Root reference against which dotted field names are resolved. */
    private final Reference<R> rootRef;
    /** Supplies the flush timeout and the testing-only event delay. */
    private final MongoDriverSettings settings;
    /** Single-thread executor that runs the event processing loop. */
    private final ExecutorService ex = Executors.newFixedThreadPool(1);
    /** Echo token -> queue that is handed the resume token of the event carrying that echo token. */
    private final ConcurrentHashMap<String, BlockingQueue<BsonDocument>> echoListeners = new ConcurrentHashMap();
    /** Revision -> callback to run once that revision has been processed; accessed while synchronized on the map itself. */
    private final Map<BsonInt64, Runnable> updateListeners = new TreeMap<BsonInt64, Runnable>();
    /** Collection that is watched for changes and queried for the current revision. */
    private final MongoCollection<Document> collection;
    /** Hex identity hash of this receiver; used in log messages and in the worker thread name. */
    private final String identityString = String.format("%08x", System.identityHashCode(this));
    /** Current change-stream cursor; replaced when the cursor is reconnected. */
    private volatile MongoCursor<ChangeStreamDocument<Document>> eventCursor;
    /** Resume token of the most recent event that was processed without throwing; null until then. */
    private volatile BsonDocument lastProcessedResumeToken = null;
    /** Revision carried by the most recent processed update that had a revision field; null until then. */
    private volatile BsonInt64 lastProcessedRevision = null;
    /** Set to true by the first call to close(). */
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    /** Field names for which the "nonexistent field" warning has already been logged (shared across instances). */
    private static final Set<String> ALREADY_WARNED = Collections.newSetFromMap(new ConcurrentHashMap());
    /** Query filter selecting the document whose _id is "boskDocument". */
    private static final BsonDocument DOCUMENT_FILTER = new BsonDocument("_id", (BsonValue)new BsonString("boskDocument"));
    private static final Logger LOGGER = LoggerFactory.getLogger(SingleDocumentMongoChangeStreamReceiver.class);

    SingleDocumentMongoChangeStreamReceiver(BoskDriver<R> downstream, Reference<R> rootRef, MongoCollection<Document> collection, Formatter formatter, MongoDriverSettings settings) {
        this.downstream = downstream;
        this.rootRef = rootRef;
        this.formatter = formatter;
        this.settings = settings;
        this.collection = collection;
        this.eventCursor = collection.watch().iterator();
        LOGGER.debug("Initiate event processing loop for mcsr-{}: collection=\"{}\"", (Object)this.identityString, (Object)collection.getNamespace().getCollectionName());
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("mcsr-{} initialization", (Object)this.identityString, (Object)new Exception("Stack trace"));
        }
        this.ex.submit(this::eventProcessingLoop);
    }

    /**
     * Delegates to the downstream driver to obtain the initial root.
     *
     * @param rootType type of the root object, passed through unchanged
     * @return whatever the downstream driver returns
     */
    @Override
    public R initialRoot(Type rootType) throws InvalidTypeException, IOException, InterruptedException {
        R root = this.downstream.initialRoot(rootType);
        return root;
    }

    /**
     * Waits until this receiver has processed the revision currently stored in
     * the database document.
     *
     * <p>Returns immediately if that revision has already been processed.
     * Otherwise a listener is registered for the revision and the caller
     * blocks until it fires or the configured flush timeout elapses.
     *
     * @throws FlushFailureException if the revision is not reached within the flush timeout
     * @throws InterruptedException  if the waiting thread is interrupted
     */
    @Override
    public void awaitLatestRevision() throws InterruptedException, IOException {
        BsonInt64 target = this.readRevisionNumber();
        BsonInt64 seen = this.lastProcessedRevision;
        boolean alreadySeen = seen != null && seen.compareTo(target) >= 0;
        if (alreadySeen) {
            LOGGER.debug("| Already seen {}", target);
            return;
        }

        Semaphore done = new Semaphore(0);
        LOGGER.debug("| Waiting for {}", target);
        synchronized (this.updateListeners) {
            // Chain in front of any listener already registered for this revision.
            this.updateListeners.compute(target, (revision, previous) -> () -> {
                done.release();
                if (previous != null) {
                    previous.run();
                } else {
                    LOGGER.debug("| Done waiting for {}", target);
                }
            });
        }

        // The revision may have been processed between the check above and the registration.
        this.runUpdateListeners();

        boolean reached = done.tryAcquire(this.settings.flushTimeoutMS(), TimeUnit.MILLISECONDS);
        if (reached) {
            return;
        }
        LOGGER.debug("| Flush timeout on mcsr-{} awaiting revision {}", this.identityString, target);
        throw new FlushFailureException("Flush timeout on after " + this.settings.flushTimeoutMS() + "ms on receiver " + this.identityString + " awaiting revision " + target);
    }

    /**
     * Reads the revision field of the document selected by
     * {@code DOCUMENT_FILTER}.
     *
     * @return the stored revision, or {@code Formatter.REVISION_ZERO} when the
     *         document is missing or has no revision field
     */
    private BsonInt64 readRevisionNumber() {
        String revisionField = Formatter.DocumentFields.revision.name();
        Bson revisionOnly = Projections.fields(Projections.include(revisionField));
        try (MongoCursor<Document> cursor = this.collection.find(DOCUMENT_FILTER).limit(1).projection(revisionOnly).cursor()) {
            Document found = cursor.next();
            Long stored = found.get(revisionField, Long.class);
            if (stored != null) {
                return new BsonInt64(stored.longValue());
            }
            return Formatter.REVISION_ZERO;
        } catch (NoSuchElementException e) {
            // No matching document.
            return Formatter.REVISION_ZERO;
        }
    }

    /**
     * Records the revision carried by an update event, if any, and then runs
     * the listeners that were waiting for it.
     *
     * @param updatedFields the event's updated fields; ignored when null
     */
    private void bumpLastProcessedRevision(@Nullable BsonDocument updatedFields) {
        if (updatedFields == null) {
            return;
        }
        BsonInt64 revision = updatedFields.getInt64(Formatter.DocumentFields.revision.name(), null);
        if (revision == null) {
            LOGGER.warn("| No revision field");
            return;
        }
        LOGGER.debug("| Revision {}", revision);
        this.lastProcessedRevision = revision;
        this.runUpdateListeners();
    }

    /**
     * Runs and removes every registered update listener whose revision is at
     * or below the last processed revision. Does nothing when no revision has
     * been processed yet.
     */
    private void runUpdateListeners() {
        BsonInt64 reached = this.lastProcessedRevision;
        if (reached == null) {
            return;
        }
        synchronized (this.updateListeners) {
            Iterator<Map.Entry<BsonInt64, Runnable>> pending = this.updateListeners.entrySet().iterator();
            while (pending.hasNext()) {
                Map.Entry<BsonInt64, Runnable> candidate = pending.next();
                if (candidate.getKey().compareTo(reached) > 0) {
                    // Stop at the first entry beyond the reached revision.
                    break;
                }
                candidate.getValue().run();
                pending.remove();
            }
        }
    }

    /**
     * Logs the request and asks the downstream driver to flush.
     */
    @Override
    public void flushDownstream() throws InterruptedException, IOException {
        LOGGER.debug("| Downstream flush");
        BoskDriver<R> target = this.downstream;
        target.flush();
    }

    /**
     * Registers the queue that should receive the resume token of the event
     * carrying {@code echoToken}.
     *
     * <p>Registration is atomic: if a listener already exists for the token,
     * it is left in place and this call fails, so the earlier registrant is
     * not silently replaced.
     *
     * @param echoToken token identifying the expected echo event
     * @param listener  queue to hand the resume token to
     * @throws IllegalStateException if a listener is already registered for {@code echoToken}
     */
    @Override
    public void putEchoListener(String echoToken, BlockingQueue<BsonDocument> listener) {
        // putIfAbsent (rather than put) keeps the existing listener intact on a duplicate.
        BlockingQueue<BsonDocument> existing = this.echoListeners.putIfAbsent(echoToken, listener);
        if (existing != null) {
            throw new IllegalStateException("Cannot have two listeners for the same echo token");
        }
    }

    /**
     * Unregisters the listener for {@code echoToken}.
     *
     * @param echoToken token whose listener should be removed
     * @return the removed listener, or null if none was registered
     */
    @Override
    public BlockingQueue<BsonDocument> removeEchoListener(String echoToken) {
        BlockingQueue<BsonDocument> removed = this.echoListeners.remove(echoToken);
        return removed;
    }

    private void eventProcessingLoop() {
        block15: {
            String oldName = Thread.currentThread().getName();
            Thread.currentThread().setName("mcsr-" + this.identityString);
            block11: while (true) {
                try {
                    while (!this.ex.isShutdown()) {
                        ChangeStreamDocument event;
                        try {
                            if (this.settings.testing().eventDelayMS() > 0L) {
                                LOGGER.debug("- Sleeping");
                                try {
                                    Thread.sleep(this.settings.testing().eventDelayMS());
                                }
                                catch (InterruptedException e) {
                                    LOGGER.debug("| Interrupted");
                                }
                            }
                            LOGGER.debug("- Awaiting event");
                            event = (ChangeStreamDocument)this.eventCursor.next();
                        }
                        catch (MongoException e) {
                            if (this.isClosed.get()) {
                                LOGGER.trace("Receiver is closed. Exiting event processing loop", (Throwable)e);
                                break block15;
                            }
                            LOGGER.warn("Lost change stream cursor; reconnecting", (Throwable)e);
                            this.reconnectCursor();
                            continue;
                        }
                        try {
                            this.processEvent((ChangeStreamDocument<Document>)event);
                            continue block11;
                        }
                        catch (Throwable e) {
                            LOGGER.error("Unable to process event: " + event, e);
                        }
                    }
                    break block15;
                }
                catch (Throwable e) {
                    LOGGER.error("Fatal error on MongoDB event processing thread", e);
                    throw e;
                }
            }
            finally {
                LOGGER.debug("Terminating MongoDB event processing thread");
                Thread.currentThread().setName(oldName);
            }
        }
    }

    /**
     * Replaces the current change-stream cursor with a fresh one.
     *
     * <p>The old cursor is closed on a best-effort basis. The new cursor
     * resumes after the last processed resume token when one is available;
     * otherwise it starts from the current position and an error is logged.
     */
    private void reconnectCursor() {
        try {
            this.eventCursor.close();
        } catch (Exception e) {
            LOGGER.warn("Unable to close event stream cursor", e);
        }

        ChangeStreamIterable<Document> stream = this.collection.watch();
        BsonDocument resumeToken = this.lastProcessedResumeToken;
        if (resumeToken != null) {
            LOGGER.debug("Attempting to reconnect cursor with resume token {}", resumeToken);
            stream = stream.resumeAfter(resumeToken);
        } else {
            LOGGER.error("No resume token available. Reconnecting cursor from current location. Some update events could be missed.");
        }

        this.eventCursor = stream.iterator();
        LOGGER.debug("Finished reconnecting");
    }

    /**
     * Closes the change-stream cursor and shuts down the worker executor.
     *
     * <p>Only the first call has any effect; later calls just log. Failures
     * are logged rather than thrown. The executor is shut down even if closing
     * the cursor fails, so the worker thread is not left running.
     */
    @Override
    public void close() {
        if (!this.isClosed.compareAndSet(false, true)) {
            LOGGER.debug("Already closed: {}", this.identityString);
            return;
        }
        LOGGER.debug("Closing {}", this.identityString);
        try {
            try {
                this.eventCursor.close();
            } finally {
                // Must run even when the cursor fails to close.
                this.ex.shutdownNow();
            }
        } catch (Throwable t) {
            LOGGER.error("Exception attempting to close {}", this.identityString, t);
        }
    }

    /**
     * Applies a single change-stream event.
     *
     * <p>Insert and replace events are ignored. Update events are turned into
     * downstream replacements and deletions, any echo listener is notified,
     * and the last processed revision is advanced. On normal completion the
     * event's resume token is recorded.
     *
     * @param event the event to apply
     * @throws NotYetImplementedException for any other operation type
     */
    private void processEvent(ChangeStreamDocument<Document> event) {
        LOGGER.debug("# EVENT: {}", event);
        switch (event.getOperationType()) {
            case INSERT:
            case REPLACE: {
                LOGGER.debug("| Replace document - IGNORE");
                break;
            }
            case UPDATE: {
                UpdateDescription description = event.getUpdateDescription();
                if (description != null) {
                    BsonDocument updatedFields = description.getUpdatedFields();
                    this.replaceUpdatedFields(updatedFields);
                    this.deleteRemovedFields(description.getRemovedFields());
                    this.notifyIfEcho(updatedFields, event.getResumeToken());
                    this.bumpLastProcessedRevision(updatedFields);
                }
                break;
            }
            default: {
                throw new NotYetImplementedException("Unknown change stream event: " + event);
            }
        }
        // Reached only when the event was handled without throwing.
        this.lastProcessedResumeToken = event.getResumeToken();
    }

    /**
     * Submits a downstream replacement for every updated field whose dotted
     * name starts with the state field name.
     *
     * <p>Names that cannot be resolved to a reference are logged and skipped.
     *
     * @param updatedFields the event's updated fields; ignored when null
     */
    private void replaceUpdatedFields(@Nullable BsonDocument updatedFields) {
        if (updatedFields == null) {
            return;
        }
        String statePrefix = Formatter.DocumentFields.state.name();
        for (Map.Entry<String, BsonValue> field : updatedFields.entrySet()) {
            String dottedName = field.getKey();
            if (!dottedName.startsWith(statePrefix)) {
                continue;
            }
            Reference ref;
            try {
                ref = Formatter.referenceTo(dottedName, this.rootRef);
            } catch (InvalidTypeException e) {
                this.logNonexistentField(dottedName, e);
                continue;
            }
            LOGGER.debug("| Replace {}", ref);
            Object replacement = this.formatter.bsonValue2object(field.getValue(), ref);
            this.downstream.submitReplacement(ref, replacement);
        }
    }

    /**
     * Submits a downstream deletion for every removed field whose dotted name
     * starts with the state field name.
     *
     * <p>Names that cannot be resolved to a reference are logged and skipped.
     *
     * @param removedFields the event's removed field names; ignored when null
     */
    private void deleteRemovedFields(@Nullable List<String> removedFields) {
        if (removedFields == null) {
            return;
        }
        String statePrefix = Formatter.DocumentFields.state.name();
        for (String dottedName : removedFields) {
            if (!dottedName.startsWith(statePrefix)) {
                continue;
            }
            Reference ref;
            try {
                ref = Formatter.referenceTo(dottedName, this.rootRef);
            } catch (InvalidTypeException e) {
                this.logNonexistentField(dottedName, e);
                continue;
            }
            LOGGER.debug("| Delete {}", ref);
            this.downstream.submitDeletion(ref);
        }
    }

    /**
     * If the update carries an echo token with a registered listener, removes
     * that listener and hands it the event's resume token.
     *
     * @param updatedFields the event's updated fields; ignored when null
     * @param resumeToken   resume token of the event
     */
    private void notifyIfEcho(@Nullable BsonDocument updatedFields, BsonDocument resumeToken) {
        if (updatedFields == null) {
            return;
        }
        BsonValue echoValue = updatedFields.get(Formatter.DocumentFields.echo.name());
        if (echoValue == null) {
            return;
        }
        String echoToken = echoValue.asString().getValue();
        BlockingQueue<BsonDocument> listener = this.removeEchoListener(echoToken);
        if (listener == null) {
            return;
        }
        LOGGER.debug("| Echo {}: {}", echoToken, resumeToken);
        listener.add(resumeToken);
    }

    /**
     * Logs that an update referred to a field that could not be resolved.
     *
     * <p>Every occurrence is traced with the exception; the warning is emitted
     * only the first time a given field name is seen while warnings are enabled.
     *
     * @param dottedName the unresolved dotted field name
     * @param e          the resolution failure
     */
    private void logNonexistentField(String dottedName, InvalidTypeException e) {
        LOGGER.trace("Nonexistent field {}", dottedName, e);
        if (!LOGGER.isWarnEnabled()) {
            // Leave the name unrecorded so it can still be warned about later.
            return;
        }
        boolean firstSighting = ALREADY_WARNED.add(dottedName);
        if (firstSighting) {
            LOGGER.warn("Ignoring updates of nonexistent field {}", dottedName);
        }
    }
}

