package org.apache.inlong.agent.plugin.sources.reader;

import com.alibaba.fastjson.JSONPath;
import com.google.common.base.Preconditions;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.sources.snapshot.MongoDBSnapshotBase;
import org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask;
import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
import org.apache.inlong.agent.pojo.DebeziumFormat;
import org.apache.inlong.agent.utils.GsonUtil;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.class */
/**
 * Reader that runs an embedded Debezium engine configured with the MongoDB connector and
 * hands the captured change events to the agent as {@link Message} instances.
 *
 * <p>Change events delivered by the engine are parsed into {@link DebeziumFormat} and placed in a
 * bounded in-memory queue; {@link #read()} drains that queue one element at a time without blocking.
 */
public class MongoDBReader extends org.apache.inlong.agent.plugin.sources.reader.file.AbstractReader {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBReader.class);
    /** Instance id taken from the profile; also reported as the read source. */
    private String instanceId;
    /** Path of the file in which the engine stores its offsets. */
    private String offsetStoreFileName;
    /** Value of {@code job.mongoJob.offset.specificOffsetFile} (defaults to an empty string). */
    private String specificOffsetFile;
    /** Value of {@code job.mongoJob.offset.specificOffsetPos} (defaults to {@code "-1"}). */
    private String specificOffsetPos;
    private boolean finished = false;
    private boolean destroyed = false;
    /** Single-thread executor on which the Debezium engine runs. */
    private ExecutorService executor;
    private MongoDBSnapshotBase snapshot;
    /** Buffer between the engine thread (producer) and {@link #read()} (consumer): collection name -> event. */
    private LinkedBlockingQueue<Pair<String, DebeziumFormat>> bufferPool;

    /**
     * Returns the next buffered change event as a message.
     *
     * @return the next message, or {@code null} when the buffer is currently empty
     */
    public Message read() {
        // poll() is non-blocking and already yields null on an empty queue, so no isEmpty() pre-check is needed.
        return pollMessage();
    }

    /**
     * @return {@code true} once {@link #finishRead()} has been called
     */
    public boolean isFinished() {
        return this.finished;
    }

    /**
     * @return the instance id this reader is working for
     */
    public String getReadSource() {
        return this.instanceId;
    }

    /**
     * Overrides the instance id reported by {@link #getReadSource()}.
     *
     * @param str the new read source / instance id
     */
    public void setReadSource(String str) {
        this.instanceId = str;
    }

    /**
     * No-op: this reader does not use a read timeout.
     *
     * @param j ignored
     */
    public void setReadTimeout(long j) {
    }

    /**
     * No-op: this reader does not use a wait interval.
     *
     * @param j ignored
     */
    public void setWaitMillisecond(long j) {
    }

    /**
     * @return the snapshot reported by the snapshot helper, or an empty string if none has been created yet
     */
    public String getSnapshot() {
        return this.snapshot != null ? this.snapshot.getSnapshot() : "";
    }

    /**
     * Marks this reader as finished; afterwards {@link #isFinished()} returns {@code true}.
     */
    public void finishRead() {
        this.finished = true;
    }

    /**
     * @return always {@code true}
     */
    public boolean isSourceExist() {
        return true;
    }

    /**
     * Shuts down the engine executor and closes the snapshot helper. Only the first call has an effect.
     *
     * <p>The null guards make this safe to call when {@link #init(InstanceProfile)} never ran or
     * failed before all resources were created.
     */
    public void destroy() {
        synchronized (this) {
            if (this.destroyed) {
                return;
            }
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
            if (this.snapshot != null) {
                this.snapshot.close();
            }
            this.destroyed = true;
        }
    }

    /**
     * Initializes the reader from the profile and starts the embedded Debezium engine.
     *
     * @param instanceProfile profile holding the {@code job.mongoJob.*} settings
     */
    @Override // org.apache.inlong.agent.plugin.sources.reader.file.AbstractReader
    public void init(InstanceProfile instanceProfile) {
        super.init(instanceProfile);
        setGlobalParamsValue(instanceProfile);
        startEmbeddedDebeziumEngine(instanceProfile);
    }

    /**
     * Takes one entry from the buffer (without blocking) and converts it to a message whose body is the
     * JSON form of the event and whose header carries the collection name under {@code dataKey}.
     *
     * @return the converted message, or {@code null} if the buffer is empty
     */
    private Message pollMessage() {
        Pair<String, DebeziumFormat> entry = this.bufferPool.poll();
        if (entry == null) {
            return null;
        }
        HashMap<String, String> header = new HashMap<>(CommonConstants.DEFAULT_MAP_CAPACITY);
        header.put("dataKey", entry.getKey());
        byte[] body = GsonUtil.toJson(entry.getValue()).getBytes(StandardCharsets.UTF_8);
        return new DefaultMessage(body, header);
    }

    /**
     * Reads the reader-level settings from the profile: buffer size, instance id, offset store file,
     * snapshot helper and the specific offset file/position.
     *
     * @param instanceProfile profile holding the {@code job.mongoJob.*} settings
     */
    private void setGlobalParamsValue(InstanceProfile instanceProfile) {
        // NOTE(review): the default queue size reuses LogFileCollectTask.CORE_THREAD_SLEEP_TIME, which looks
        // like an unrelated constant that merely shares the intended value - confirm and replace with a
        // dedicated default.
        int queueSize = instanceProfile.getInt("job.mongoJob.queueSize", LogFileCollectTask.CORE_THREAD_SLEEP_TIME);
        this.bufferPool = new LinkedBlockingQueue<>(queueSize);
        this.instanceId = instanceProfile.getInstanceId();
        String historyDir = instanceProfile.get("job.mongoJob.history.filename", MongoDBSnapshotBase.getSnapshotFilePath());
        this.offsetStoreFileName = historyDir + "/mongo-" + this.instanceId + "-offset.dat";
        this.snapshot = new MongoDBSnapshotBase(this.offsetStoreFileName);
        // Persist the offsets supplied by the profile (empty string when none) into the offset store file.
        this.snapshot.save(instanceProfile.get("job.mongoJob.offsets", ""), new File(this.offsetStoreFileName));
        this.specificOffsetFile = instanceProfile.get("job.mongoJob.offset.specificOffsetFile", "");
        this.specificOffsetPos = instanceProfile.get("job.mongoJob.offset.specificOffsetPos", "-1");
    }

    /**
     * Builds the Debezium engine (JSON format) and runs it on a dedicated single-thread executor.
     *
     * @param instanceProfile profile used to build the connector configuration
     */
    private void startEmbeddedDebeziumEngine(InstanceProfile instanceProfile) {
        Runnable engine = DebeziumEngine.create(Json.class)
                .using(buildMongoConnectorConfig(instanceProfile))
                .notifying(this::handleChangeEvent)
                .using(this::handle)
                .build();
        this.executor = Executors.newSingleThreadExecutor();
        this.executor.execute(engine);
    }

    /**
     * Engine completion callback: logs an error when the engine did not complete successfully.
     *
     * @param success whether the engine completed successfully
     * @param message message supplied by the engine
     * @param error   error supplied by the engine
     */
    private void handle(boolean success, String message, Throwable error) {
        if (success) {
            return;
        }
        LOGGER.error("MongoDB job with jobConf {} has error {}", message, error);
    }

    /**
     * Translates the {@code job.mongoJob.*} profile settings into the properties handed to the engine
     * and adds the engine-level settings (offset storage, connector class, names).
     *
     * @param instanceProfile profile holding the {@code job.mongoJob.*} settings
     * @return properties for the embedded Debezium engine
     */
    private Properties buildMongoConnectorConfig(InstanceProfile instanceProfile) {
        Configuration.Builder builder = Configuration.create();
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.hosts", MongoDbConnectorConfig.HOSTS);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.user", MongoDbConnectorConfig.USER);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.password", MongoDbConnectorConfig.PASSWORD);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.databaseIncludeList", MongoDbConnectorConfig.DATABASE_INCLUDE_LIST);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.databaseExcludeList", MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.collectionIncludeList", MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.collectionExcludeList", MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.fieldExcludeList", MongoDbConnectorConfig.FIELD_EXCLUDE_LIST);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.snapshotMode", MongoDbConnectorConfig.SNAPSHOT_MODE);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.captureMode", MongoDbConnectorConfig.CAPTURE_MODE);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.connectTimeoutInMs", MongoDbConnectorConfig.CONNECT_TIMEOUT_MS);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.cursorMaxAwaitTimeInMs", MongoDbConnectorConfig.CURSOR_MAX_AWAIT_TIME_MS);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.socketTimeoutInMs", MongoDbConnectorConfig.SOCKET_TIMEOUT_MS);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.selectionTimeoutInMs", MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.fieldRenames", MongoDbConnectorConfig.FIELD_RENAMES);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.membersAutoDiscover", MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.connectMaxAttempts", MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.connectBackoffMaxDelayInMs", MongoDbConnectorConfig.CONNECT_BACKOFF_MAX_DELAY_MS);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.connectBackoffInitialDelayInMs", MongoDbConnectorConfig.CONNECT_BACKOFF_INITIAL_DELAY_MS);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.initialSyncMaxThreads", MongoDbConnectorConfig.MAX_COPY_THREADS);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.sslInvalidHostnameAllowed", MongoDbConnectorConfig.SSL_ALLOW_INVALID_HOSTNAMES);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.sslEnabled", MongoDbConnectorConfig.SSL_ENABLED);
        setEngineConfigIfNecessary(instanceProfile, builder, "job.mongoJob.pollIntervalInMs", MongoDbConnectorConfig.MONGODB_POLL_INTERVAL_MS);

        Properties props = builder.build().asProperties();
        props.setProperty("offset.storage.file.filename", this.offsetStoreFileName);
        props.setProperty("connector.class", MongoDbConnector.class.getCanonicalName());
        props.setProperty("name", "engine-" + this.instanceId);
        props.setProperty("mongodb.name", "inlong-mongodb-" + this.instanceId);

        // NOTE(review): the snapshot mode is written above under the Debezium field name, not under the
        // InLong key queried here, so this lookup presumably always yields "" and the "initial" branch is
        // never taken. Left unchanged because enabling the branch would switch the offset store that is
        // used at runtime - confirm the intended behavior before correcting the key.
        String snapshotMode = props.getOrDefault("job.mongoJob.snapshotMode", "").toString();
        if (Objects.equals("initial", snapshotMode)) {
            // Validate the configured values; the key-name literals themselves can never be null.
            Preconditions.checkNotNull(this.specificOffsetFile, "job.mongoJob.offset.specificOffsetFile cannot be null");
            Preconditions.checkNotNull(this.specificOffsetPos, "job.mongoJob.offset.specificOffsetPos cannot be null");
            props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset(this.instanceId, this.specificOffsetFile, this.specificOffsetPos));
        } else {
            props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
        }
        LOGGER.info("mongo job {} start with props {}", instanceProfile.getInstanceId(), GsonUtil.toJson(props));
        return props;
    }

    /**
     * Copies one profile setting into the engine configuration, falling back to the field's default
     * value; nothing is set when the resulting value is blank.
     *
     * @param instanceProfile profile to read from
     * @param builder         engine configuration being assembled
     * @param key             profile key to look up
     * @param field           Debezium field to set
     */
    private void setEngineConfigIfNecessary(InstanceProfile instanceProfile, Configuration.Builder builder, String key, Field field) {
        String value = instanceProfile.get(key, field.defaultValueAsString());
        if (StringUtils.isBlank(value)) {
            return;
        }
        builder.with(field, value);
    }

    /**
     * Engine change consumer: parses every event of the batch, buffers it for {@link #read()}, commits
     * the batch and updates audit data and metrics.
     *
     * <p>{@code bufferPool.put} blocks while the buffer is full. If the thread is interrupted while
     * waiting, the whole batch is counted as failed and the interrupt status is restored so that the
     * code running the engine can observe it.
     *
     * @param list            batch of change events delivered by the engine
     * @param recordCommitter committer used to acknowledge processed events
     */
    private void handleChangeEvent(List<ChangeEvent<String, String>> list, DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> recordCommitter) {
        int batchSize = list.size();
        try {
            for (ChangeEvent<String, String> event : list) {
                // NOTE(review): no guard against a null event value or a payload without source - confirm
                // whether such events can reach this consumer.
                DebeziumFormat payload = (DebeziumFormat) JSONPath.read(event.value(), "$.payload", DebeziumFormat.class);
                this.bufferPool.put(Pair.of(payload.getSource().getCollection(), payload));
                recordCommitter.markProcessed(event);
            }
            recordCommitter.markBatchFinished();
            long totalChars = list.stream().mapToLong(event -> event.value().length()).sum();
            AuditUtils.add(3, this.inlongGroupId, this.inlongStreamId, System.currentTimeMillis(), batchSize, totalChars);
            this.readerMetric.pluginReadSuccessCount.addAndGet(batchSize);
            this.readerMetric.pluginReadCount.addAndGet(batchSize);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again instead of swallowing it.
            Thread.currentThread().interrupt();
            LOGGER.error("parse mongo message error", e);
            this.readerMetric.pluginReadFailCount.addAndGet(batchSize);
            this.readerMetric.pluginReadCount.addAndGet(batchSize);
        }
    }
}
