package nstream.adapter.mongodb;

import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCursor;
import nstream.adapter.common.ingress.IngestorMetricsAgent;
import nstream.adapter.common.provision.ProvisionLoader;
import org.bson.Document;
import swim.concurrent.TimerRef;
import swim.structure.Value;

/* loaded from: input_file:nstream/adapter/mongodb/MongoDbIngestingAgent.class */
public abstract class MongoDbIngestingAgent extends IngestorMetricsAgent<MongoDbIngressSettings, Document> {
    protected MongoClient client;
    protected TimerRef pollTimer;

    protected void assignClient(MongoClient mongoClient) {
        this.client = mongoClient;
    }

    protected void fetchAndIngest() {
        MongoCursor cursor = find().cursor();
        while (cursor.hasNext()) {
            try {
                ingestOrCancel((Document) cursor.next());
            } catch (Throwable th) {
                if (cursor != null) {
                    try {
                        cursor.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (cursor != null) {
            cursor.close();
        }
    }

    protected abstract FindIterable<Document> find();

    protected void cancel() {
        if (this.pollTimer != null) {
            this.pollTimer.cancel();
        }
    }

    public void didFailIngest(Document document, Exception exc) {
        didFail(new RuntimeException(nodeUri() + ": " + document.toJson() + " triggered fatal exception; stopping", exc));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: parseIngressSettings, reason: merged with bridge method [inline-methods] */
    public MongoDbIngressSettings m3parseIngressSettings(Value value) {
        MongoDbIngressSettings mongoDbIngressSettings = (MongoDbIngressSettings) MongoDbIngressSettings.form().cast(value);
        return mongoDbIngressSettings == null ? MongoDbIngressSettings.defaultSettings() : mongoDbIngressSettings;
    }

    protected void stageReception() {
        loadSettings("mongoDbIngressConf");
        assignClient((MongoClient) ProvisionLoader.getProvision(((MongoDbIngressSettings) this.ingressSettings).clientProvisionName()).value());
        this.pollTimer = scheduleWithFixedDelay(() -> {
            return this.pollTimer;
        }, ((MongoDbIngressSettings) this.ingressSettings).firstFetchDelayMillis(), ((MongoDbIngressSettings) this.ingressSettings).fetchIntervalMillis(), this::fetchAndIngest);
    }
}
