/*
 * Decompiled with CFR 0.152.
 */
package io.vena.bosk.drivers.mongo.v2;

import com.mongodb.MongoCommandException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.vena.bosk.drivers.mongo.MongoDriverSettings;
import io.vena.bosk.drivers.mongo.v2.ChangeEventListener;
import io.vena.bosk.drivers.mongo.v2.ReceiverInitializationException;
import io.vena.bosk.exceptions.NotYetImplementedException;
import java.beans.ConstructorProperties;
import java.io.Closeable;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Generated;
import org.bson.BsonDocument;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ChangeEventReceiver
implements Closeable {
    private final String boskName;
    private final MongoDriverSettings settings;
    private final MongoCollection<Document> collection;
    private final ExecutorService ex = Executors.newFixedThreadPool(1);
    private final Lock lock = new ReentrantLock();
    private volatile Session currentSession;
    private volatile BsonDocument lastProcessedResumeToken;
    private volatile Future<?> eventProcessingTask;
    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventReceiver.class);

    public boolean initialize(ChangeEventListener listener) throws ReceiverInitializationException {
        LOGGER.debug("Initializing receiver");
        try {
            this.lock.lock();
            this.stop();
            this.setupNewSession(listener);
            boolean bl = this.lastProcessedResumeToken != null;
            return bl;
        }
        catch (InterruptedException | RuntimeException | TimeoutException e) {
            throw new ReceiverInitializationException(e);
        }
        finally {
            this.lock.unlock();
        }
    }

    public boolean isReady() {
        return this.currentSession != null;
    }

    public void start() {
        try {
            this.lock.lock();
            if (this.currentSession == null) {
                throw new IllegalStateException("Receiver is not initialized");
            }
            if (this.eventProcessingTask == null) {
                this.eventProcessingTask = this.ex.submit(() -> this.eventProcessingLoop(this.currentSession));
            } else {
                LOGGER.debug("Already running");
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    public void stop() throws InterruptedException, TimeoutException {
        try {
            this.lock.lock();
            Future<?> task = this.eventProcessingTask;
            if (task != null) {
                LOGGER.debug("Canceling event processing task");
                task.cancel(true);
                try {
                    task.get(10L, TimeUnit.SECONDS);
                    LOGGER.warn("Normal completion of event processing task was not expected");
                }
                catch (CancellationException e) {
                    LOGGER.debug("Cancellation succeeded; event loop interrupted");
                    this.eventProcessingTask = null;
                }
            }
        }
        catch (ExecutionException e) {
            throw new NotYetImplementedException("Event processing loop isn't supposed to throw!", (Throwable)e);
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public void close() {
        try {
            this.stop();
        }
        catch (InterruptedException | TimeoutException e) {
            LOGGER.info("Ignoring exception while closing ChangeEventReceiver", (Throwable)e);
        }
        this.ex.shutdown();
    }

    private void setupNewSession(ChangeEventListener newListener) {
        assert (this.eventProcessingTask == null);
        LOGGER.debug("Setup new session");
        this.currentSession = null;
        for (int attempt = 1; attempt <= 2; ++attempt) {
            ChangeStreamDocument initialEvent;
            LOGGER.debug("Attempt #{}", (Object)attempt);
            BsonDocument resumePoint = this.lastProcessedResumeToken;
            if (resumePoint == null) {
                LOGGER.debug("Acquire initial resume token");
                try (MongoChangeStreamCursor initialCursor = this.collection.watch().maxAwaitTime(20L, TimeUnit.MILLISECONDS).cursor();){
                    initialEvent = (ChangeStreamDocument)initialCursor.tryNext();
                    if (initialEvent == null) {
                        this.lastProcessedResumeToken = resumePoint = Objects.requireNonNull(initialCursor.getResumeToken(), "Cannot proceed without an initial resume token");
                    }
                    LOGGER.debug("Received event while acquiring initial resume token; storing it for processing in event loop");
                    resumePoint = initialEvent.getResumeToken();
                }
            } else {
                LOGGER.debug("Use existing resume token");
                initialEvent = null;
            }
            try {
                MongoChangeStreamCursor cursor = this.collection.watch().resumeAfter(resumePoint).cursor();
                this.currentSession = new Session((MongoChangeStreamCursor<ChangeStreamDocument<Document>>)cursor, newListener, (ChangeStreamDocument<Document>)initialEvent);
                return;
            }
            catch (MongoCommandException e) {
                LOGGER.error("Change stream cursor command failed; discarding resume token", (Throwable)e);
                this.lastProcessedResumeToken = null;
                continue;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void eventProcessingLoop(Session session) {
        String oldThreadName = Thread.currentThread().getName();
        Thread.currentThread().setName(this.getClass().getSimpleName() + " [" + this.boskName + "]");
        try {
            if (session.initialEvent != null) {
                this.processEvent(session, session.initialEvent);
                session.initialEvent = null;
            }
            while (true) {
                if (this.settings.testing().eventDelayMS() > 0L) {
                    LOGGER.debug("- Sleeping");
                    try {
                        Thread.sleep(this.settings.testing().eventDelayMS());
                    }
                    catch (InterruptedException e) {
                        LOGGER.debug("| Interrupted");
                    }
                }
                this.processEvent(session, (ChangeStreamDocument<Document>)((ChangeStreamDocument)session.cursor.next()));
            }
        }
        catch (MongoInterruptedException e) {
            LOGGER.debug("Event loop interrupted", (Throwable)e);
            session.listener.onException((Exception)((Object)e));
            Thread.currentThread().setName(oldThreadName);
        }
        catch (RuntimeException e) {
            try {
                LOGGER.warn("Unexpected exception while processing events; event loop aborted", (Throwable)e);
                session.listener.onException(e);
            }
            catch (Throwable throwable) {
                throw throwable;
            }
            finally {
                Thread.currentThread().setName(oldThreadName);
            }
        }
    }

    private void processEvent(Session session, ChangeStreamDocument<Document> event) {
        session.listener.onEvent(event);
        this.lastProcessedResumeToken = event.getResumeToken();
    }

    @ConstructorProperties(value={"boskName", "settings", "collection"})
    @Generated
    public ChangeEventReceiver(String boskName, MongoDriverSettings settings, MongoCollection<Document> collection) {
        this.boskName = boskName;
        this.settings = settings;
        this.collection = collection;
    }

    private static final class Session {
        final MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor;
        final ChangeEventListener listener;
        ChangeStreamDocument<Document> initialEvent;

        @ConstructorProperties(value={"cursor", "listener", "initialEvent"})
        @Generated
        public Session(MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor, ChangeEventListener listener, ChangeStreamDocument<Document> initialEvent) {
            this.cursor = cursor;
            this.listener = listener;
            this.initialEvent = initialEvent;
        }
    }
}

