package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.fetch;

import com.mongodb.client.model.changestream.OperationType;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.util.LoggingContext;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.dialect.MongodbDialect;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamDescriptor;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.BsonUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.BsonType;
import org.bson.BsonValue;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.class */
public class MongodbFetchTaskContext implements FetchTask.Context {
    private final MongodbDialect dialect;
    private final MongodbSourceConfig sourceConfig;
    private final ChangeStreamDescriptor changeStreamDescriptor;
    private ChangeEventQueue<DataChangeEvent> changeEventQueue;

    public MongodbFetchTaskContext(MongodbDialect mongodbDialect, MongodbSourceConfig mongodbSourceConfig, ChangeStreamDescriptor changeStreamDescriptor) {
        this.dialect = mongodbDialect;
        this.sourceConfig = mongodbSourceConfig;
        this.changeStreamDescriptor = changeStreamDescriptor;
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask.Context
    public void configure(@Nonnull SourceSplitBase sourceSplitBase) {
        this.changeEventQueue = new ChangeEventQueue.Builder().pollInterval(Duration.ofMillis(this.sourceConfig.getPollAwaitTimeMillis())).maxBatchSize(this.sourceConfig.getPollMaxBatchSize()).maxQueueSize(sourceSplitBase.isSnapshotSplit() ? Integer.MAX_VALUE : this.sourceConfig.getBatchSize()).loggingContextSupplier(() -> {
            return LoggingContext.forConnector("mongodb-cdc", "mongodb-cdc-connector", "mongodb-cdc-connector-task");
        }).build();
    }

    public MongodbSourceConfig getSourceConfig() {
        return this.sourceConfig;
    }

    public MongodbDialect getDialect() {
        return this.dialect;
    }

    public ChangeStreamDescriptor getChangeStreamDescriptor() {
        return this.changeStreamDescriptor;
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask.Context
    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return this.changeEventQueue;
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask.Context
    public TableId getTableId(SourceRecord sourceRecord) {
        return MongodbRecordUtils.getTableId(sourceRecord);
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask.Context
    public Tables.TableFilter getTableFilter() {
        return Tables.TableFilter.includeAll();
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask.Context
    public boolean isExactlyOnce() {
        return true;
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask.Context
    public Offset getStreamOffset(SourceRecord sourceRecord) {
        return new ChangeStreamOffset(MongodbRecordUtils.getResumeToken(sourceRecord));
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask.Context
    public boolean isDataChangeRecord(SourceRecord sourceRecord) {
        return MongodbRecordUtils.isDataChangeRecord(sourceRecord);
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask.Context
    public boolean isRecordBetween(SourceRecord sourceRecord, @Nonnull Object[] objArr, @Nonnull Object[] objArr2) {
        BsonDocument documentKey = MongodbRecordUtils.getDocumentKey(sourceRecord);
        String firstKey = ((BsonDocument) objArr[0]).getFirstKey();
        BsonValue bsonValue = documentKey.get((Object) firstKey);
        BsonValue bsonValue2 = ((BsonDocument) objArr[1]).get((Object) firstKey);
        BsonValue bsonValue3 = ((BsonDocument) objArr2[1]).get((Object) firstKey);
        if (isFullRange(bsonValue2, bsonValue3)) {
            return true;
        }
        return isValueInRange(bsonValue2, bsonValue, bsonValue3);
    }

    private boolean isFullRange(@Nonnull BsonValue bsonValue, BsonValue bsonValue2) {
        return bsonValue.getBsonType() == BsonType.MIN_KEY && bsonValue2.getBsonType() == BsonType.MAX_KEY;
    }

    private boolean isValueInRange(BsonValue bsonValue, BsonValue bsonValue2, BsonValue bsonValue3) {
        return BsonUtils.compareBsonValue(bsonValue, bsonValue2) <= 0 && BsonUtils.compareBsonValue(bsonValue2, bsonValue3) < 0;
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask.Context
    public void rewriteOutputBuffer(Map<Struct, SourceRecord> map, @Nonnull SourceRecord sourceRecord) {
        Struct struct = (Struct) sourceRecord.key();
        Struct struct2 = (Struct) sourceRecord.value();
        if (struct2 != null) {
            String string = struct2.getString(MongodbSourceOptions.OPERATION_TYPE);
            switch (OperationType.fromString(string)) {
                case INSERT:
                    map.put(struct, sourceRecord);
                    return;
                case UPDATE:
                case REPLACE:
                    BsonDocument extractBsonDocument = MongodbRecordUtils.extractBsonDocument(struct2, sourceRecord.valueSchema(), MongodbSourceOptions.FULL_DOCUMENT);
                    if (extractBsonDocument == null) {
                        return;
                    }
                    map.put(struct, MongodbRecordUtils.buildSourceRecord(sourceRecord.sourcePartition(), sourceRecord.sourceOffset(), sourceRecord.topic(), sourceRecord.kafkaPartition(), sourceRecord.keySchema(), sourceRecord.key(), normalizeSnapshotDocument(extractBsonDocument, struct2)));
                    return;
                case DELETE:
                    map.remove(struct);
                    return;
                default:
                    throw new MongodbConnectorException(CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Data change record meet UNKNOWN operation: " + string);
            }
        }
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask.Context
    public List<SourceRecord> formatMessageTimestamp(@Nonnull Collection<SourceRecord> collection) {
        return (List) collection.stream().peek(sourceRecord -> {
            Struct struct = (Struct) sourceRecord.value();
            Struct struct2 = new Struct(struct.schema().field("source").schema());
            struct2.put("ts_ms", (Object) 0L);
            struct2.put("snapshot", MongodbSourceOptions.SNAPSHOT_TRUE);
            struct.put("source", struct2);
        }).collect(Collectors.toList());
    }

    private BsonDocument normalizeSnapshotDocument(@Nonnull BsonDocument bsonDocument, Struct struct) {
        return new BsonDocument().append("_id", new BsonString(struct.getString(MongodbSourceOptions.DOCUMENT_KEY))).append(MongodbSourceOptions.OPERATION_TYPE, new BsonString(MongodbSourceOptions.OPERATION_TYPE_INSERT)).append("ns", new BsonDocument("db", new BsonString(struct.getStruct("ns").getString("db"))).append(MongodbSourceOptions.COLL_FIELD, new BsonString(struct.getStruct("ns").getString(MongodbSourceOptions.COLL_FIELD)))).append(MongodbSourceOptions.DOCUMENT_KEY, new BsonString(struct.getString(MongodbSourceOptions.DOCUMENT_KEY))).append(MongodbSourceOptions.FULL_DOCUMENT, bsonDocument).append("ts_ms", new BsonInt64(struct.getInt64("ts_ms").longValue())).append("source", new BsonDocument("snapshot", new BsonString(MongodbSourceOptions.SNAPSHOT_TRUE)).append("ts_ms", new BsonInt64(0L)));
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask.Context
    public void close() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            MongodbUtils.createMongoClient(this.sourceConfig).close();
        }));
    }
}
