/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.internal;

import com.mongodb.ConnectionString;
import com.mongodb.MongoNamespace;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.kafka.connect.source.MongoSourceTask;
import io.debezium.connector.SnapshotRecord;
import java.lang.reflect.Field;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope;
import org.apache.flink.cdc.connectors.mongodb.source.utils.CollectionDiscoveryUtils;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceTask;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceTaskContext;
import org.bson.conversions.Bson;
import org.bson.json.JsonReader;

public class MongoDBConnectorSourceTask
extends SourceTask {
    public static final String DATABASE_INCLUDE_LIST = "database.include.list";
    public static final String COLLECTION_INCLUDE_LIST = "collection.include.list";
    public static final String STARTUP_MODE_INITIAL_SNAPSHOTTING_MAX_THREADS_CONFIG = "startup.mode.copy.existing.max.threads";
    public static final String STARTUP_MODE_INITIAL_SNAPSHOTTING_QUEUE_SIZE_CONFIG = "startup.mode.copy.existing.queue.size";
    public static final String STARTUP_MODE_INITIAL_SNAPSHOTTING_PIPELINE_CONFIG = "startup.mode.copy.existing.pipeline";
    public static final String STARTUP_MODE_TIMESTAMP_START_AT_OPERATION_TIME_CONFIG = "startup.mode.timestamp.start.at.operation.time";
    private final MongoSourceTask target = new MongoSourceTask();
    private final Field startedTaskField = MongoSourceTask.class.getDeclaredField("startedTask");
    private SourceRecord currentLastSnapshotRecord;
    private boolean isInSnapshotPhase = false;

    public MongoDBConnectorSourceTask() throws NoSuchFieldException {
        this.startedTaskField.setAccessible(true);
    }

    @Override
    public String version() {
        return this.target.version();
    }

    @Override
    public void initialize(SourceTaskContext context) {
        this.target.initialize(context);
        this.context = context;
    }

    @Override
    public void start(Map<String, String> props) {
        this.initCapturedCollections(props);
        this.target.start(props);
        this.isInSnapshotPhase = this.isCopying();
    }

    @Override
    public void commit() throws InterruptedException {
        this.target.commit();
    }

    @Override
    public void commitRecord(SourceRecord record) throws InterruptedException {
        this.target.commitRecord(record);
    }

    @Override
    public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {
        this.target.commitRecord(record, metadata);
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        LinkedList<SourceRecord> outSourceRecords;
        block9: {
            List<SourceRecord> sourceRecords;
            block7: {
                block8: {
                    sourceRecords = this.target.poll();
                    outSourceRecords = new LinkedList<SourceRecord>();
                    if (!this.isInSnapshotPhase) break block7;
                    if (sourceRecords == null || sourceRecords.isEmpty()) break block8;
                    for (SourceRecord sourceRecord : sourceRecords) {
                        SourceRecord current = this.markRecordTimestamp(sourceRecord);
                        if (MongoRecordUtils.isSnapshotRecord(current)) {
                            this.markSnapshotRecord(current);
                            if (this.currentLastSnapshotRecord != null) {
                                outSourceRecords.add(this.currentLastSnapshotRecord);
                            }
                            this.currentLastSnapshotRecord = current;
                            continue;
                        }
                        if (this.currentLastSnapshotRecord != null) {
                            outSourceRecords.add(this.markLastSnapshotRecordOfAll(this.currentLastSnapshotRecord));
                            this.currentLastSnapshotRecord = null;
                            this.isInSnapshotPhase = false;
                        }
                        outSourceRecords.add(current);
                    }
                    break block9;
                }
                if (this.isCopying()) break block9;
                if (this.currentLastSnapshotRecord != null) {
                    outSourceRecords.add(this.markLastSnapshotRecordOfAll(this.currentLastSnapshotRecord));
                    this.currentLastSnapshotRecord = null;
                }
                this.isInSnapshotPhase = false;
                break block9;
            }
            if (sourceRecords != null && !sourceRecords.isEmpty()) {
                for (SourceRecord current : sourceRecords) {
                    outSourceRecords.add(this.markRecordTimestamp(current));
                }
            }
        }
        return outSourceRecords;
    }

    @Override
    public void stop() {
        this.target.stop();
    }

    private SourceRecord markRecordTimestamp(SourceRecord record) {
        if (MongoRecordUtils.isHeartbeatEvent(record)) {
            return this.markTimestampForHeartbeatRecord(record);
        }
        return this.markTimestampForDataRecord(record);
    }

    private SourceRecord markTimestampForDataRecord(SourceRecord record) {
        String clusterTime;
        Struct value = (Struct)record.value();
        value.put("ts_ms", (Object)System.currentTimeMillis());
        Struct source = new Struct(value.schema().field("source").schema());
        long timestamp = 0L;
        if (value.schema().field("clusterTime") != null && (clusterTime = value.getString("clusterTime")) != null) {
            timestamp = (long)new JsonReader(clusterTime).readTimestamp().getTime() * 1000L;
        }
        source.put("ts_ms", (Object)timestamp);
        value.put("source", (Object)source);
        return record;
    }

    private SourceRecord markTimestampForHeartbeatRecord(SourceRecord record) {
        Struct heartbeatValue = new Struct(MongoDBEnvelope.HEARTBEAT_VALUE_SCHEMA);
        heartbeatValue.put("ts_ms", (Object)Instant.now().toEpochMilli());
        return new SourceRecord(record.sourcePartition(), record.sourceOffset(), record.topic(), record.keySchema(), record.key(), MongoDBEnvelope.HEARTBEAT_VALUE_SCHEMA, heartbeatValue);
    }

    private void markSnapshotRecord(SourceRecord record) {
        Struct value = (Struct)record.value();
        Struct source = value.getStruct("source");
        SnapshotRecord.TRUE.toSource(source);
    }

    private SourceRecord markLastSnapshotRecordOfAll(SourceRecord record) {
        Struct value = (Struct)record.value();
        Struct source = value.getStruct("source");
        SnapshotRecord snapshot = SnapshotRecord.fromSource(source);
        if (snapshot == SnapshotRecord.TRUE) {
            SnapshotRecord.LAST.toSource(source);
        }
        return record;
    }

    private boolean isCopying() {
        try {
            Object startedTask = this.startedTaskField.get(this.target);
            Field isCopyingField = startedTask.getClass().getDeclaredField("isCopying");
            isCopyingField.setAccessible(true);
            return (Boolean)isCopyingField.get(startedTask);
        }
        catch (IllegalAccessException | NoSuchFieldException e) {
            throw new IllegalStateException("Cannot access isCopying field of SourceTask", e);
        }
    }

    private void initCapturedCollections(Map<String, String> props) {
        ConnectionString connectionString = new ConnectionString(props.get("connection.uri"));
        String databaseIncludeList = props.get(DATABASE_INCLUDE_LIST);
        String collectionIncludeList = props.get(COLLECTION_INCLUDE_LIST);
        List databaseList = Optional.ofNullable(databaseIncludeList).map(input -> Arrays.asList(input.split(","))).orElse(null);
        List collectionList = Optional.ofNullable(collectionIncludeList).map(input -> Arrays.asList(input.split(","))).orElse(null);
        if (collectionList != null) {
            List<String> discoveredCollections;
            List<String> discoveredDatabases;
            try (MongoClient mongoClient = MongoClients.create(connectionString);){
                discoveredDatabases = CollectionDiscoveryUtils.databaseNames(mongoClient, CollectionDiscoveryUtils.databaseFilter(databaseList));
                discoveredCollections = CollectionDiscoveryUtils.collectionNames(mongoClient, discoveredDatabases, CollectionDiscoveryUtils.collectionsFilter(collectionList));
            }
            if (CollectionDiscoveryUtils.isIncludeListExplicitlySpecified(collectionList, discoveredCollections)) {
                MongoNamespace namespace = new MongoNamespace(discoveredCollections.get(0));
                props.put("database", namespace.getDatabaseName());
                props.put("collection", namespace.getCollectionName());
            } else {
                String namespacesRegex = CollectionDiscoveryUtils.includeListAsPatterns(collectionList).stream().map(Pattern::pattern).collect(Collectors.joining("|"));
                ArrayList<Bson> pipeline = new ArrayList<Bson>();
                pipeline.add(CollectionDiscoveryUtils.ADD_NS_FIELD);
                Bson nsFilter = Filters.regex("_ns_", namespacesRegex);
                if (databaseList != null) {
                    if (CollectionDiscoveryUtils.isIncludeListExplicitlySpecified(databaseList, discoveredDatabases)) {
                        props.put("database", discoveredDatabases.get(0));
                    } else {
                        String databaseRegex = CollectionDiscoveryUtils.includeListAsPatterns(databaseList).stream().map(Pattern::pattern).collect(Collectors.joining("|"));
                        Bson dbFilter = Filters.regex("ns.db", databaseRegex);
                        nsFilter = Filters.and(dbFilter, nsFilter);
                    }
                }
                pipeline.add(Aggregates.match(nsFilter));
                props.put("pipeline", CollectionDiscoveryUtils.bsonListToJson(pipeline));
                String copyExistingNamespaceRegex = discoveredCollections.stream().map(ns -> CollectionDiscoveryUtils.completionPattern(ns).pattern()).collect(Collectors.joining("|"));
                props.put("copy.existing.namespace.regex", copyExistingNamespaceRegex);
            }
        } else if (databaseList != null) {
            List<String> discoveredDatabases;
            try (MongoClient mongoClient = MongoClients.create(connectionString);){
                discoveredDatabases = CollectionDiscoveryUtils.databaseNames(mongoClient, CollectionDiscoveryUtils.databaseFilter(databaseList));
            }
            if (CollectionDiscoveryUtils.isIncludeListExplicitlySpecified(databaseList, discoveredDatabases)) {
                props.put("database", discoveredDatabases.get(0));
            } else {
                String databaseRegex = CollectionDiscoveryUtils.includeListAsPatterns(databaseList).stream().map(Pattern::pattern).collect(Collectors.joining("|"));
                ArrayList<Bson> pipeline = new ArrayList<Bson>();
                pipeline.add(Aggregates.match(Filters.regex("ns.db", databaseRegex)));
                props.put("pipeline", CollectionDiscoveryUtils.bsonListToJson(pipeline));
                String copyExistingNamespaceRegex = discoveredDatabases.stream().map(db -> CollectionDiscoveryUtils.completionPattern(db + "\\..*").pattern()).collect(Collectors.joining("|"));
                props.put("copy.existing.namespace.regex", copyExistingNamespaceRegex);
            }
        }
    }
}

