package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.fetch;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
import java.util.ArrayList;
import java.util.Collections;
import javax.annotation.Nonnull;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.dialect.MongodbDialect;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.RawBsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.class */
public class MongodbScanFetchTask implements FetchTask<SourceSplitBase> {
    private static final Logger log = LoggerFactory.getLogger(MongodbScanFetchTask.class);
    private final SnapshotSplit snapshotSplit;
    private volatile boolean taskRunning = false;

    public MongodbScanFetchTask(SnapshotSplit snapshotSplit) {
        this.snapshotSplit = snapshotSplit;
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask
    public void execute(FetchTask.Context context) throws Exception {
        MongodbFetchTaskContext mongodbFetchTaskContext = (MongodbFetchTaskContext) context;
        MongodbSourceConfig sourceConfig = mongodbFetchTaskContext.getSourceConfig();
        MongodbDialect dialect = mongodbFetchTaskContext.getDialect();
        ChangeEventQueue<DataChangeEvent> queue = mongodbFetchTaskContext.getQueue();
        this.taskRunning = true;
        TableId tableId = this.snapshotSplit.getTableId();
        ChangeStreamOffset displayCurrentOffset = dialect.displayCurrentOffset(sourceConfig);
        log.info("Snapshot step 1 - Determining low watermark {} for split {}", displayCurrentOffset, this.snapshotSplit);
        queue.enqueue(new DataChangeEvent(WatermarkEvent.create(MongodbRecordUtils.createWatermarkPartitionMap(tableId.identifier()), "__mongodb_watermarks", this.snapshotSplit.splitId(), WatermarkKind.LOW, displayCurrentOffset)));
        log.info("Snapshot step 2 - Snapshotting data");
        try {
            try {
                MongoCursor<RawBsonDocument> snapshotCursor = getSnapshotCursor(this.snapshotSplit, sourceConfig);
                while (snapshotCursor.hasNext()) {
                    try {
                        checkTaskRunning();
                        BsonDocument normalizeSnapshotDocument = normalizeSnapshotDocument(tableId, snapshotCursor.next());
                        queue.enqueue(new DataChangeEvent(buildSourceRecord(sourceConfig, tableId, new BsonDocument("_id", normalizeSnapshotDocument.get("_id")), normalizeSnapshotDocument)));
                    } catch (Throwable th) {
                        if (snapshotCursor != null) {
                            try {
                                snapshotCursor.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                }
                ChangeStreamOffset displayCurrentOffset2 = dialect.displayCurrentOffset(sourceConfig);
                log.info("Snapshot step 3 - Determining high watermark {} for split {}", displayCurrentOffset2, this.snapshotSplit);
                queue.enqueue(new DataChangeEvent(WatermarkEvent.create(MongodbRecordUtils.createWatermarkPartitionMap(tableId.identifier()), "__mongodb_watermarks", this.snapshotSplit.splitId(), WatermarkKind.HIGH, displayCurrentOffset2)));
                log.info("Snapshot step 4 - Back fill stream split for snapshot split {}", this.snapshotSplit);
                IncrementalSplit createBackfillStreamSplit = createBackfillStreamSplit(displayCurrentOffset, displayCurrentOffset2);
                if (createBackfillStreamSplit.getStopOffset().isAfter(createBackfillStreamSplit.getStartupOffset())) {
                    new MongodbStreamFetchTask(createBackfillStreamSplit).execute(mongodbFetchTaskContext);
                } else {
                    queue.enqueue(new DataChangeEvent(WatermarkEvent.create(MongodbRecordUtils.createWatermarkPartitionMap(tableId.identifier()), "__mongodb_watermarks", createBackfillStreamSplit.splitId(), WatermarkKind.END, createBackfillStreamSplit.getStopOffset())));
                }
                if (snapshotCursor != null) {
                    snapshotCursor.close();
                }
            } catch (Exception e) {
                throw new MongodbConnectorException(CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, String.format("Execute snapshot read subtask for mongodb split %s fail", this.snapshotSplit));
            }
        } finally {
            this.taskRunning = false;
        }
    }

    @Nonnull
    private MongoCursor<RawBsonDocument> getSnapshotCursor(@Nonnull SnapshotSplit snapshotSplit, MongodbSourceConfig mongodbSourceConfig) {
        MongoCollection mongoCollection = MongodbUtils.getMongoCollection(MongodbUtils.createMongoClient(mongodbSourceConfig), snapshotSplit.getTableId(), RawBsonDocument.class);
        BsonDocument bsonDocument = (BsonDocument) snapshotSplit.getSplitStart()[1];
        BsonDocument bsonDocument2 = (BsonDocument) snapshotSplit.getSplitEnd()[1];
        BsonDocument bsonDocument3 = (BsonDocument) snapshotSplit.getSplitStart()[0];
        log.info("Initializing snapshot split processing: TableId={}, StartKey={}, EndKey={}, Hint={}", new Object[]{snapshotSplit.getTableId(), bsonDocument, bsonDocument2, bsonDocument3});
        return mongoCollection.find().min(bsonDocument).max(bsonDocument2).hint(bsonDocument3).batchSize2(mongodbSourceConfig.getBatchSize()).noCursorTimeout(true).cursor();
    }

    @Nonnull
    private SourceRecord buildSourceRecord(@Nonnull MongodbSourceConfig mongodbSourceConfig, @Nonnull TableId tableId, BsonDocument bsonDocument, BsonDocument bsonDocument2) {
        return MongodbRecordUtils.buildSourceRecord(MongodbRecordUtils.createPartitionMap(mongodbSourceConfig.getHosts(), tableId.catalog(), tableId.table()), MongodbRecordUtils.createSourceOffsetMap(bsonDocument.getDocument("_id"), true), tableId.identifier(), bsonDocument, bsonDocument2);
    }

    private void checkTaskRunning() {
        if (!this.taskRunning) {
            throw new MongodbConnectorException(CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Interrupted while snapshotting collection");
        }
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask
    public boolean isRunning() {
        return this.taskRunning;
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask
    public void shutdown() {
        this.taskRunning = false;
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask
    /* renamed from: getSplit, reason: merged with bridge method [inline-methods] */
    public SourceSplitBase getSplit2() {
        return this.snapshotSplit;
    }

    private IncrementalSplit createBackfillStreamSplit(ChangeStreamOffset changeStreamOffset, ChangeStreamOffset changeStreamOffset2) {
        return new IncrementalSplit(this.snapshotSplit.splitId(), Collections.singletonList(this.snapshotSplit.getTableId()), changeStreamOffset, changeStreamOffset2, new ArrayList());
    }

    private BsonDocument normalizeSnapshotDocument(@Nonnull TableId tableId, @Nonnull BsonDocument bsonDocument) {
        return new BsonDocument().append("_id", new BsonDocument("_id", bsonDocument.get("_id"))).append(MongodbSourceOptions.OPERATION_TYPE, new BsonString(MongodbSourceOptions.OPERATION_TYPE_INSERT)).append("ns", new BsonDocument("db", new BsonString(tableId.catalog())).append(MongodbSourceOptions.COLL_FIELD, new BsonString(tableId.table()))).append(MongodbSourceOptions.DOCUMENT_KEY, new BsonDocument("_id", bsonDocument.get("_id"))).append(MongodbSourceOptions.FULL_DOCUMENT, bsonDocument).append("ts_ms", new BsonInt64(System.currentTimeMillis())).append("source", new BsonDocument("snapshot", new BsonString(MongodbSourceOptions.SNAPSHOT_TRUE)).append("ts_ms", new BsonInt64(0L)));
    }
}
