/*
 * Decompiled with CFR 0.152.
 */
package io.vena.bosk.drivers.mongo;

import com.mongodb.MongoException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.UpdateDescription;
import com.mongodb.lang.Nullable;
import io.vena.bosk.BoskDriver;
import io.vena.bosk.Entity;
import io.vena.bosk.Reference;
import io.vena.bosk.drivers.mongo.Formatter;
import io.vena.bosk.drivers.mongo.MongoReceiver;
import io.vena.bosk.exceptions.InvalidTypeException;
import io.vena.bosk.exceptions.NotYetImplementedException;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class SingleDocumentMongoChangeStreamReceiver<R extends Entity>
implements MongoReceiver<R> {
    private final Formatter formatter;
    private final BoskDriver<R> downstream;
    private final Reference<R> rootRef;
    private final ExecutorService ex = Executors.newFixedThreadPool(1);
    private final ConcurrentHashMap<String, BlockingQueue<BsonDocument>> echoListeners = new ConcurrentHashMap();
    private final MongoCollection<Document> collection;
    private final String identityString = String.format("%08x", System.identityHashCode(this));
    private volatile MongoCursor<ChangeStreamDocument<Document>> eventCursor;
    private volatile BsonDocument lastProcessedResumeToken = null;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private static final Set<String> ALREADY_WARNED = Collections.synchronizedSet(new HashSet());
    private static final Logger LOGGER = LoggerFactory.getLogger(SingleDocumentMongoChangeStreamReceiver.class);

    SingleDocumentMongoChangeStreamReceiver(BoskDriver<R> downstream, Reference<R> rootRef, MongoCollection<Document> collection, Formatter formatter) {
        this.downstream = downstream;
        this.rootRef = rootRef;
        this.formatter = formatter;
        this.collection = collection;
        this.eventCursor = collection.watch().iterator();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Initiate event processing loop for mcsr-{}: collection=\"{}\"", new Object[]{this.identityString, collection.getNamespace().getCollectionName(), new Exception("Here's your stack trace")});
        }
        this.ex.submit(this::eventProcessingLoop);
    }

    @Override
    public R initialRoot(Type rootType) throws InvalidTypeException, IOException, InterruptedException {
        return (R)this.downstream.initialRoot(rootType);
    }

    @Override
    public void flushDownstream() throws InterruptedException, IOException {
        this.downstream.flush();
    }

    @Override
    public void putEchoListener(String echoToken, BlockingQueue<BsonDocument> listener) {
        BlockingQueue<BsonDocument> existing = this.echoListeners.put(echoToken, listener);
        if (existing != null) {
            throw new IllegalStateException("Cannot have two listeners for the same echo token");
        }
    }

    @Override
    public BlockingQueue<BsonDocument> removeEchoListener(String echoToken) {
        return this.echoListeners.remove(echoToken);
    }

    private void eventProcessingLoop() {
        block12: {
            String oldName = Thread.currentThread().getName();
            Thread.currentThread().setName("mcsr-" + this.identityString);
            block9: while (true) {
                try {
                    while (!this.ex.isShutdown()) {
                        ChangeStreamDocument event;
                        try {
                            LOGGER.debug("- Awaiting event");
                            event = (ChangeStreamDocument)this.eventCursor.next();
                        }
                        catch (MongoException e) {
                            if (this.isClosed.get()) {
                                LOGGER.trace("Receiver is closed. Exiting event processing loop", (Throwable)e);
                                break block12;
                            }
                            LOGGER.warn("Lost change stream cursor; reconnecting", (Throwable)e);
                            this.reconnectCursor();
                            continue;
                        }
                        try {
                            this.processEvent((ChangeStreamDocument<Document>)event);
                            continue block9;
                        }
                        catch (Throwable e) {
                            LOGGER.error("Unable to process event: " + event, e);
                        }
                    }
                    break block12;
                }
                catch (Throwable e) {
                    LOGGER.error("Fatal error on MongoDB event processing thread", e);
                    throw e;
                }
            }
            finally {
                LOGGER.debug("Terminating MongoDB event processing thread");
                Thread.currentThread().setName(oldName);
            }
        }
    }

    private void reconnectCursor() {
        try {
            this.eventCursor.close();
        }
        catch (Exception e) {
            LOGGER.warn("Unable to close event stream cursor", (Throwable)e);
        }
        ChangeStreamIterable iterable = this.collection.watch();
        if (this.lastProcessedResumeToken == null) {
            LOGGER.error("No resume token available. Reconnecting cursor from current location. Some update events could be missed.");
        } else {
            LOGGER.debug("Attempting to reconnect cursor with resume token {}", (Object)this.lastProcessedResumeToken);
            iterable = iterable.resumeAfter(this.lastProcessedResumeToken);
        }
        this.eventCursor = iterable.iterator();
        LOGGER.debug("Finished reconnecting");
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public void close() {
        if (this.isClosed.compareAndSet(false, true)) {
            LOGGER.debug("Closing {}", (Object)this.identityString);
            try {
                this.eventCursor.close();
                this.ex.shutdown();
                try {
                    LOGGER.debug("Awaiting termination of {}", (Object)this.identityString);
                    boolean success = this.ex.awaitTermination(10L, TimeUnit.SECONDS);
                    if (success) return;
                    LOGGER.warn("Timeout during shutdown of {}", (Object)this.identityString);
                    return;
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOGGER.warn("Interrupted during shutdown of {}", (Object)this.identityString, (Object)e);
                }
                return;
            }
            catch (Throwable t) {
                LOGGER.error("Exception attempting to close {}", (Object)this.identityString, (Object)t);
                throw t;
            }
        } else {
            LOGGER.debug("Already closed: {}", (Object)this.identityString);
        }
    }

    private void processEvent(ChangeStreamDocument<Document> event) {
        LOGGER.debug("# EVENT: {}", event);
        switch (event.getOperationType()) {
            case INSERT: 
            case REPLACE: {
                LOGGER.debug("| Replace document - IGNORE");
                break;
            }
            case UPDATE: {
                UpdateDescription updateDescription = event.getUpdateDescription();
                if (updateDescription == null) break;
                this.replaceUpdatedFields(updateDescription.getUpdatedFields());
                this.deleteRemovedFields(updateDescription.getRemovedFields());
                this.notifyIfEcho(updateDescription.getUpdatedFields(), event.getResumeToken());
                break;
            }
            default: {
                throw new NotYetImplementedException("Unknown change stream event: " + event);
            }
        }
        this.lastProcessedResumeToken = event.getResumeToken();
    }

    private void replaceUpdatedFields(@Nullable BsonDocument updatedFields) {
        if (updatedFields != null) {
            for (Map.Entry entry : updatedFields.entrySet()) {
                Reference ref;
                String dottedName = (String)entry.getKey();
                if (!dottedName.startsWith(Formatter.DocumentFields.state.name())) continue;
                try {
                    ref = Formatter.referenceTo(dottedName, this.rootRef);
                }
                catch (InvalidTypeException e) {
                    this.logNonexistentField(dottedName, e);
                    continue;
                }
                LOGGER.debug("| Replace {}", ref);
                Object replacement = this.formatter.bsonValue2object((BsonValue)entry.getValue(), ref);
                this.downstream.submitReplacement(ref, replacement);
            }
        }
    }

    private void deleteRemovedFields(@Nullable List<String> removedFields) {
        if (removedFields != null) {
            for (String dottedName : removedFields) {
                Reference ref;
                try {
                    ref = Formatter.referenceTo(dottedName, this.rootRef);
                }
                catch (InvalidTypeException e) {
                    this.logNonexistentField(dottedName, e);
                    continue;
                }
                LOGGER.debug("| Delete {}", ref);
                this.downstream.submitDeletion(ref);
            }
        }
    }

    private void notifyIfEcho(@Nullable BsonDocument updatedFields, BsonDocument resumeToken) {
        String echoToken;
        BlockingQueue<BsonDocument> listener;
        BsonValue newValue;
        if (updatedFields != null && (newValue = updatedFields.get((Object)Formatter.DocumentFields.echo.name())) != null && (listener = this.removeEchoListener(echoToken = newValue.asString().getValue())) != null) {
            LOGGER.debug("| Echo {}: {}", (Object)echoToken, (Object)resumeToken);
            listener.add(resumeToken);
        }
    }

    private void logNonexistentField(String dottedName, InvalidTypeException e) {
        LOGGER.trace("Nonexistent field \"" + dottedName + "\"", (Throwable)e);
        if (LOGGER.isWarnEnabled() && ALREADY_WARNED.add(dottedName)) {
            LOGGER.warn("Ignoring update of nonexistent field \"" + dottedName + "\"");
        }
    }
}

