/*
 * Decompiled with CFR 0.152.
 */
package io.vena.bosk.drivers.mongo.v2;

import com.mongodb.MongoCommandException;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.vena.bosk.drivers.mongo.MongoDriverSettings;
import io.vena.bosk.drivers.mongo.v2.ChangeEventListener;
import io.vena.bosk.drivers.mongo.v2.ReceiverInitializationException;
import io.vena.bosk.drivers.mongo.v2.UnprocessableEventException;
import io.vena.bosk.exceptions.NotYetImplementedException;
import java.beans.ConstructorProperties;
import java.io.Closeable;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Generated;
import org.bson.BsonDocument;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ChangeEventReceiver
implements Closeable {
    private final String boskName;
    private final MongoDriverSettings settings;
    private final MongoCollection<Document> collection;
    private final ExecutorService ex = Executors.newFixedThreadPool(1);
    private final Lock lock = new ReentrantLock();
    private volatile Session currentSession;
    private volatile BsonDocument lastProcessedResumeToken;
    private volatile Future<?> eventProcessingTask;
    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventReceiver.class);

    public boolean initialize(ChangeEventListener listener) throws ReceiverInitializationException {
        LOGGER.debug("Initializing receiver");
        try {
            this.lock.lock();
            this.stop();
            boolean bl = this.setupNewSession(listener);
            return bl;
        }
        catch (InterruptedException | RuntimeException | TimeoutException e) {
            throw new ReceiverInitializationException(e);
        }
        finally {
            this.lock.unlock();
        }
    }

    public boolean isReady() {
        return this.currentSession != null;
    }

    public void start() {
        try {
            this.lock.lock();
            if (this.currentSession == null) {
                throw new IllegalStateException("Receiver is not initialized");
            }
            if (this.eventProcessingTask == null) {
                this.eventProcessingTask = this.ex.submit(() -> this.eventProcessingLoop(this.currentSession));
            } else {
                LOGGER.debug("Already running");
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    public void stop() throws InterruptedException, TimeoutException {
        try {
            Future<?> task;
            this.lock.lock();
            Session session = this.currentSession;
            if (session != null) {
                session.isClosed = true;
                session.cursor.close();
            }
            if ((task = this.eventProcessingTask) != null) {
                LOGGER.debug("Canceling event processing task");
                task.cancel(false);
                try {
                    task.get(this.settings.flushTimeoutMS(), TimeUnit.MILLISECONDS);
                    LOGGER.debug("Cancellation succeeded; event loop exited normally");
                    this.eventProcessingTask = null;
                }
                catch (CancellationException e) {
                    LOGGER.debug("Cancellation succeeded; event loop interrupted");
                    this.eventProcessingTask = null;
                }
            }
        }
        catch (ExecutionException e) {
            throw new NotYetImplementedException("Event processing loop isn't supposed to throw!", (Throwable)e);
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public void close() {
        try {
            this.stop();
        }
        catch (InterruptedException | TimeoutException e) {
            LOGGER.info("Ignoring exception while closing ChangeEventReceiver", (Throwable)e);
        }
        this.ex.shutdown();
    }

    private boolean setupNewSession(ChangeEventListener newListener) {
        int attempt;
        assert (this.eventProcessingTask == null);
        LOGGER.debug("Setup new session");
        this.currentSession = null;
        for (attempt = 1; attempt <= 2; ++attempt) {
            ChangeStreamDocument initialEvent;
            LOGGER.debug("Attempt #{}", (Object)attempt);
            BsonDocument resumePoint = null;
            if (resumePoint == null) {
                if (this.settings.testing().eventDelayMS() < 0L) {
                    LOGGER.debug("- Sleeping");
                    try {
                        Thread.sleep(-this.settings.testing().eventDelayMS());
                    }
                    catch (InterruptedException e) {
                        LOGGER.debug("Sleep interrupted; continuing", (Throwable)e);
                        Thread.interrupted();
                    }
                }
                LOGGER.debug("Acquire initial resume token");
                try (MongoChangeStreamCursor initialCursor = this.collection.watch().maxAwaitTime(this.settings.experimental().changeStreamInitialWaitMS(), TimeUnit.MILLISECONDS).cursor();){
                    initialEvent = (ChangeStreamDocument)initialCursor.tryNext();
                    if (initialEvent == null) {
                        this.lastProcessedResumeToken = resumePoint = Objects.requireNonNull(initialCursor.getResumeToken(), "Cannot proceed without an initial resume token");
                    }
                    LOGGER.debug("Received event while acquiring initial resume token; storing it for processing in event loop");
                    resumePoint = initialEvent.getResumeToken();
                }
            } else {
                LOGGER.debug("Use existing resume token");
                initialEvent = null;
            }
            try {
                MongoChangeStreamCursor cursor = this.collection.watch().resumeAfter(resumePoint).cursor();
                this.currentSession = new Session((MongoChangeStreamCursor<ChangeStreamDocument<Document>>)cursor, newListener, initialEvent, false);
                return true;
            }
            catch (MongoCommandException e) {
                LOGGER.error("Change stream cursor command failed; discarding resume token", (Throwable)e);
                this.lastProcessedResumeToken = null;
                continue;
            }
        }
        LOGGER.debug("Giving up initializing session after attempt #{}", (Object)(attempt - 1));
        return false;
    }

    /*
     * Exception decompiling
     */
    private void eventProcessingLoop(Session session) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 3 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private void processEvent(Session session, ChangeStreamDocument<Document> event) throws UnprocessableEventException {
        session.listener.onEvent(event);
        this.lastProcessedResumeToken = event.getResumeToken();
    }

    @ConstructorProperties(value={"boskName", "settings", "collection"})
    @Generated
    public ChangeEventReceiver(String boskName, MongoDriverSettings settings, MongoCollection<Document> collection) {
        this.boskName = boskName;
        this.settings = settings;
        this.collection = collection;
    }

    private static final class Session {
        final MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor;
        final ChangeEventListener listener;
        ChangeStreamDocument<Document> initialEvent;
        volatile boolean isClosed;

        @ConstructorProperties(value={"cursor", "listener", "initialEvent", "isClosed"})
        @Generated
        public Session(MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor, ChangeEventListener listener, ChangeStreamDocument<Document> initialEvent, boolean isClosed) {
            this.cursor = cursor;
            this.listener = listener;
            this.initialEvent = initialEvent;
            this.isClosed = isClosed;
        }
    }
}

