package nstream.adapter.mongodb;

import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import nstream.adapter.common.ingress.IngestorMetricsAgent;
import nstream.adapter.common.provision.ProvisionLoader;
import org.bson.Document;
import swim.structure.Value;

/* loaded from: input_file:nstream/adapter/mongodb/MongoDbChangeStreamIngestingAgent.class */
public abstract class MongoDbChangeStreamIngestingAgent extends IngestorMetricsAgent<MongoDbIngressSettings, ChangeStreamDocument<Document>> {
    protected MongoClient client;
    protected volatile MongoCursor<ChangeStreamDocument<Document>> cursor;

    protected void assignClient(MongoClient mongoClient) {
        this.client = mongoClient;
    }

    protected void subscribe(Runnable runnable) {
        try {
            this.cursor = changeStream().cursor();
            runnable.run();
            while (true) {
                ingestOrCancel((ChangeStreamDocument) this.cursor.next());
            }
        } catch (Exception e) {
            didFail(new RuntimeException(nodeUri() + ": exception reading from cursor; stopping", e));
        }
    }

    protected abstract ChangeStreamIterable<Document> changeStream();

    protected void cancel() {
        if (this.cursor != null) {
            this.cursor.close();
            this.cursor = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void didFailIngest(ChangeStreamDocument<Document> changeStreamDocument, Exception exc) {
        didFail(new RuntimeException(nodeUri() + ": " + changeStreamDocument + " triggered fatal exception; stopping ", exc));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: parseIngressSettings, reason: merged with bridge method [inline-methods] */
    public MongoDbIngressSettings m0parseIngressSettings(Value value) {
        MongoDbIngressSettings mongoDbIngressSettings = (MongoDbIngressSettings) MongoDbIngressSettings.form().cast(value);
        return mongoDbIngressSettings == null ? MongoDbIngressSettings.defaultSettings() : mongoDbIngressSettings;
    }

    protected void didStageReception() {
        super.didStageReception();
        info(nodeUri() + ": successfully staged change stream for reception");
    }

    protected void stageReception() {
        loadSettings("mongoDbIngressConf");
        assignClient((MongoClient) ProvisionLoader.getProvision(((MongoDbIngressSettings) this.ingressSettings).clientProvisionName()).value());
        subscribe(this::didStageReception);
    }

    public void didStart() {
        info(nodeUri() + ": didStart");
        execute(this::stageReception);
    }
}
