package org.apache.inlong.agent.plugin.sources.reader;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.relational.history.FileDatabaseHistory;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter;
import org.apache.inlong.agent.plugin.sources.snapshot.BinlogSnapshotBase;
import org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask;
import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
import org.apache.inlong.agent.pojo.DebeziumFormat;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sources/reader/BinlogReader.class */
/**
 * Reader that runs an embedded Debezium engine configured with the MySQL connector
 * and exposes the captured change events as {@link Message}s.
 *
 * <p>{@link #init(JobProfile)} builds the engine and starts it on a single-thread
 * executor. The engine's batch handler pushes each change event into a bounded
 * queue as a {@code (table name, raw JSON)} pair, and {@link #read()} drains that
 * queue one entry at a time. {@link #destroy()} stops the executor and closes the
 * offset snapshot.
 */
public class BinlogReader extends AbstractReader {
    public static final String JOB_DATABASE_USER = "job.binlogJob.user";
    public static final String JOB_DATABASE_PASSWORD = "job.binlogJob.password";
    public static final String JOB_DATABASE_HOSTNAME = "job.binlogJob.hostname";
    public static final String JOB_TABLE_WHITELIST = "job.binlogJob.tableWhiteList";
    public static final String JOB_DATABASE_WHITELIST = "job.binlogJob.databaseWhiteList";
    public static final String JOB_DATABASE_OFFSETS = "job.binlogJob.offsets";
    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.binlogJob.offset.specificOffsetFile";
    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.binlogJob.offset.specificOffsetPos";
    public static final String JOB_DATABASE_SERVER_TIME_ZONE = "job.binlogJob.serverTimezone";
    public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.binlogJob.offset.intervalMs";
    public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.binlogJob.history.filename";
    public static final String JOB_DATABASE_INCLUDE_SCHEMA_CHANGES = "job.binlogJob.schema";
    public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.binlogJob.snapshot.mode";
    public static final String JOB_DATABASE_HISTORY_MONITOR_DDL = "job.binlogJob.ddl";
    public static final String JOB_DATABASE_PORT = "job.binlogJob.port";
    public static final String JOB_DATABASE_QUEUE_SIZE = "job.binlogJob.queueSize";
    private static final Logger LOGGER = LoggerFactory.getLogger(BinlogReader.class);
    private static final Gson GSON = new Gson();
    /** Engine property that carries the database password; masked before logging. */
    private static final String PROP_DATABASE_PASSWORD = "database.password";
    /** Placeholder written to the log instead of the real password. */
    private static final String MASKED_VALUE = "******";

    /** Buffer between the engine thread (producer) and {@link #read()} (consumer). */
    private LinkedBlockingQueue<Pair<String, String>> binlogMessagesQueue;
    private String userName;
    private String password;
    private String hostName;
    private String port;
    private String tableWhiteList;
    private String databaseWhiteList;
    private String serverTimeZone;
    private String offsetStoreFileName;
    private String offsetFlushIntervalMs;
    private String databaseStoreHistoryName;
    private String includeSchemaChanges;
    private String snapshotMode;
    private String historyMonitorDdl;
    private String instanceId;
    private ExecutorService executor;
    private String specificOffsetFile;
    private String specificOffsetPos;
    private BinlogSnapshotBase binlogSnapshot;
    private JobProfile jobProfile;
    private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
    private boolean finished = false;
    private boolean destroyed = false;

    /**
     * Returns the next buffered change event, or {@code null} when nothing is buffered.
     *
     * <p>The queue is polled exactly once and the result is null-checked, so a queue
     * that drains between an emptiness check and the poll can no longer cause a
     * {@link NullPointerException}.
     *
     * @return the next message, or {@code null} if the queue is empty or the reader
     *         has not been initialised yet
     */
    public Message read() {
        if (this.binlogMessagesQueue == null) {
            return null;
        }
        Pair<String, String> entry = this.binlogMessagesQueue.poll();
        if (entry == null) {
            return null;
        }
        return toMessage(entry);
    }

    /**
     * Converts a queued {@code (table name, JSON)} pair into a message whose body is the
     * UTF-8 encoded JSON and whose header stores the table name under {@code "dataKey"}.
     */
    private DefaultMessage toMessage(Pair<String, String> entry) {
        HashMap<String, String> header = new HashMap<>(CommonConstants.DEFAULT_MAP_CAPACITY.intValue());
        header.put("dataKey", entry.getKey());
        return new DefaultMessage(entry.getValue().getBytes(StandardCharsets.UTF_8), header);
    }

    /**
     * Reads the job configuration, prepares the offset/history files and starts the
     * embedded Debezium engine on a dedicated single-thread executor.
     *
     * @param jobProfile job configuration supplying the {@code job.binlogJob.*} settings
     */
    @Override // org.apache.inlong.agent.plugin.sources.reader.AbstractReader
    public void init(JobProfile jobProfile) {
        super.init(jobProfile);
        this.jobProfile = jobProfile;
        this.instanceId = jobProfile.getInstanceId();
        // The full job profile is deliberately not logged: it holds job.binlogJob.password.
        LOGGER.info("init binlog reader for job {}", this.instanceId);
        this.userName = jobProfile.get(JOB_DATABASE_USER);
        this.password = jobProfile.get(JOB_DATABASE_PASSWORD);
        this.hostName = jobProfile.get(JOB_DATABASE_HOSTNAME);
        this.port = jobProfile.get(JOB_DATABASE_PORT);
        this.tableWhiteList = jobProfile.get(JOB_TABLE_WHITELIST, "[\\s\\S]*.*");
        this.databaseWhiteList = jobProfile.get(JOB_DATABASE_WHITELIST, "");
        this.serverTimeZone = jobProfile.get(JOB_DATABASE_SERVER_TIME_ZONE, "");
        this.offsetFlushIntervalMs = jobProfile.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
        this.snapshotMode = jobProfile.get(JOB_DATABASE_SNAPSHOT_MODE, "");
        this.includeSchemaChanges = jobProfile.get(JOB_DATABASE_INCLUDE_SCHEMA_CHANGES, "false");
        this.historyMonitorDdl = jobProfile.get(JOB_DATABASE_HISTORY_MONITOR_DDL, "false");
        this.specificOffsetFile = jobProfile.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
        this.specificOffsetPos = jobProfile.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
        // NOTE(review): the default capacity is taken from an unrelated constant
        // (LogFileCollectTask.CORE_THREAD_SLEEP_TIME); it looks like a numeric coincidence - confirm.
        this.binlogMessagesQueue = new LinkedBlockingQueue<>(
                jobProfile.getInt(JOB_DATABASE_QUEUE_SIZE, LogFileCollectTask.CORE_THREAD_SLEEP_TIME));
        this.finished = false;

        // The history file and the offset file share one base directory; resolve it once.
        String storeDir = jobProfile.get(JOB_DATABASE_STORE_HISTORY_FILENAME, tryToInitAndGetHistoryPath());
        this.databaseStoreHistoryName = storeDir + "/history.dat" + this.instanceId;
        this.offsetStoreFileName = storeDir + "/offset.dat" + this.instanceId;

        this.binlogSnapshot = new BinlogSnapshotBase(this.offsetStoreFileName);
        this.binlogSnapshot.save(jobProfile.get(JOB_DATABASE_OFFSETS, ""), this.binlogSnapshot.getFile());

        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .notifying((records, committer) -> {
                    try {
                        for (ChangeEvent<String, String> record : records) {
                            DebeziumFormat parsed = GSON.fromJson(record.value(), DebeziumFormat.class);
                            // put() blocks while the queue is full, applying back-pressure to the engine.
                            this.binlogMessagesQueue.put(Pair.of(parsed.getSource().getTable(), record.value()));
                            committer.markProcessed(record);
                        }
                        committer.markBatchFinished();
                        // NOTE(review): 3 is presumably the "agent read success" audit id - confirm against AuditUtils.
                        AuditUtils.add(3, this.inlongGroupId, this.inlongStreamId, System.currentTimeMillis(),
                                records.size(), records.stream().mapToLong(r -> r.value().length()).sum());
                        this.readerMetric.pluginReadSuccessCount.addAndGet(records.size());
                        this.readerMetric.pluginReadCount.addAndGet(records.size());
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the engine thread; destroy() relies on
                        // shutdownNow() interrupting this thread to stop it.
                        Thread.currentThread().interrupt();
                        recordReadFailure(records.size());
                        LOGGER.warn("binlog job {} interrupted while handling a batch", this.instanceId, e);
                    } catch (Exception e) {
                        recordReadFailure(records.size());
                        LOGGER.error("parse binlog message error", e);
                    }
                })
                .using(getEngineProps())
                .using((success, message, error) -> {
                    if (!success) {
                        LOGGER.error("error for binlog job: {}, msg: {}", jobProfile.getInstanceId(), message, error);
                    }
                })
                .build();
        this.executor = Executors.newSingleThreadExecutor();
        this.executor.execute(engine);
        LOGGER.info("get initial snapshot of job {}, snapshot {}", jobProfile.getInstanceId(), getSnapshot());
    }

    /** Counts a whole batch of {@code batchSize} records as failed reads in the reader metrics. */
    private void recordReadFailure(int batchSize) {
        this.readerMetric.pluginReadFailCount.addAndGet(batchSize);
        this.readerMetric.pluginReadCount.addAndGet(batchSize);
    }

    /**
     * Builds the property set handed to the Debezium engine from the values read in
     * {@link #init(JobProfile)}.
     *
     * @return the engine configuration
     */
    private Properties getEngineProps() {
        Properties properties = new Properties();
        properties.setProperty("name", "engine" + this.instanceId);
        properties.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
        properties.setProperty("database.server.name", this.instanceId);
        properties.setProperty("database.hostname", this.hostName);
        properties.setProperty("database.port", this.port);
        properties.setProperty("database.user", this.userName);
        properties.setProperty(PROP_DATABASE_PASSWORD, this.password);
        properties.setProperty("database.serverTimezone", this.serverTimeZone);
        properties.setProperty("table.whitelist", this.tableWhiteList);
        properties.setProperty("database.whitelist", this.databaseWhiteList);
        properties.setProperty("offset.flush.interval.ms", this.offsetFlushIntervalMs);
        properties.setProperty("database.snapshot.mode", this.snapshotMode);
        properties.setProperty("database.history.store.only.monitored.tables.ddl", this.historyMonitorDdl);
        // NOTE(review): the value comes from ManagerResultFormatter.SUCCESS_CODE, presumably "true" - confirm.
        properties.setProperty("database.allowPublicKeyRetrieval", ManagerResultFormatter.SUCCESS_CODE);
        properties.setProperty("key.converter.schemas.enable", "false");
        properties.setProperty("value.converter.schemas.enable", "false");
        properties.setProperty("include.schema.changes", this.includeSchemaChanges);
        properties.setProperty("snapshot.mode", this.snapshotMode);
        properties.setProperty("offset.storage.file.filename", this.offsetStoreFileName);
        properties.setProperty("database.history.file.filename", this.databaseStoreHistoryName);
        if ("schema_only_recovery".equals(this.snapshotMode)) {
            // Validate the configured values themselves; checking the constant key names can never fail.
            Preconditions.checkNotNull(this.specificOffsetFile,
                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " shouldn't be null");
            Preconditions.checkNotNull(this.specificOffsetPos,
                    JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
            properties.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
            properties.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
                    serializeOffset(this.instanceId, this.specificOffsetFile, this.specificOffsetPos));
            properties.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName());
        } else {
            properties.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
            properties.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
        }
        properties.setProperty("tombstones.on.delete", "false");
        properties.setProperty("converters", "datetime");
        properties.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter");
        properties.setProperty("datetime.format.date", "yyyy-MM-dd");
        properties.setProperty("datetime.format.time", "HH:mm:ss");
        properties.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
        properties.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");
        properties.setProperty("datetime.format.timestamp.zone", this.serverTimeZone);

        // Log a copy with the password masked so credentials never reach the log files.
        Properties loggable = new Properties();
        loggable.putAll(properties);
        loggable.setProperty(PROP_DATABASE_PASSWORD, MASKED_VALUE);
        LOGGER.info("binlog job {} start with props {}", this.jobProfile.getInstanceId(), loggable);
        return properties;
    }

    /**
     * Stops the engine executor and closes the offset snapshot. Repeated calls are no-ops,
     * and calling it before {@link #init(JobProfile)} is safe.
     */
    public void destroy() {
        synchronized (this) {
            if (this.destroyed) {
                return;
            }
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
            if (this.binlogSnapshot != null) {
                this.binlogSnapshot.close();
            }
            this.destroyed = true;
        }
    }

    /** Returns {@code true} once {@link #finishRead()} has been called since the last init. */
    public boolean isFinished() {
        return this.finished;
    }

    /** Returns the instance id this reader is bound to. */
    public String getReadSource() {
        return this.instanceId;
    }

    /** Overrides the instance id reported by {@link #getReadSource()}. */
    public void setReadSource(String str) {
        this.instanceId = str;
    }

    /** No-op: this reader has no read timeout. */
    public void setReadTimeout(long j) {
    }

    /** No-op: this reader has no wait interval. */
    public void setWaitMillisecond(long j) {
    }

    /** Returns the current offset snapshot, or an empty string when no snapshot exists yet. */
    public String getSnapshot() {
        return this.binlogSnapshot != null ? this.binlogSnapshot.getSnapshot() : "";
    }

    /** Marks this reader as finished. */
    public void finishRead() {
        this.finished = true;
    }

    /** Always reports the source as existing. */
    public boolean isSourceExist() {
        return true;
    }

    /**
     * Ensures the agent history directory exists (via {@code AgentUtils.makeDirsIfNotExist})
     * and returns its absolute path.
     */
    private String tryToInitAndGetHistoryPath() {
        String historyPath = this.agentConf.get("agent.history.path", ".history");
        String agentHome = this.agentConf.get("agent.home", AgentConstants.DEFAULT_AGENT_HOME);
        return AgentUtils.makeDirsIfNotExist(historyPath, agentHome).getAbsolutePath();
    }
}
