package org.apache.inlong.agent.plugin.sources.reader;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.sources.snapshot.PostgreSQLSnapshotBase;
import org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask;
import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
import org.apache.inlong.agent.pojo.DebeziumFormat;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.class */
/**
 * Reader that streams PostgreSQL change events through an embedded Debezium engine.
 *
 * <p>{@link #init(InstanceProfile)} builds the engine from the instance profile and runs it on a
 * single-thread executor. The engine's batch callback pushes every change event into a bounded
 * in-memory queue as a (table name, raw JSON value) pair; {@link #read()} drains that queue one
 * message at a time without blocking.
 */
public class PostgreSQLReader extends org.apache.inlong.agent.plugin.sources.reader.file.AbstractReader {
    public static final String COMPONENT_NAME = "PostgreSQLReader";

    // Keys looked up in the instance profile.
    public static final String JOB_POSTGRESQL_USER = "job.postgreSQLJob.user";
    public static final String JOB_DATABASE_PASSWORD = "job.postgreSQLJob.password";
    public static final String JOB_DATABASE_HOSTNAME = "job.postgreSQLJob.hostname";
    public static final String JOB_DATABASE_PORT = "job.postgreSQLJob.port";
    public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.postgreSQLJob.offset.intervalMs";
    public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.postgreSQLJob.history.filename";
    public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.postgreSQLJob.snapshot.mode";
    public static final String JOB_DATABASE_QUEUE_SIZE = "job.postgreSQLJob.queueSize";
    public static final String JOB_DATABASE_OFFSETS = "job.postgreSQLJob.offsets";
    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.postgreSQLJob.offset.specificOffsetFile";
    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.postgreSQLJob.offset.specificOffsetPos";
    public static final String JOB_DATABASE_DBNAME = "job.postgreSQLJob.dbname";
    public static final String JOB_DATABASE_SERVER_NAME = "job.postgreSQLJob.servername";
    public static final String JOB_DATABASE_PLUGIN_NAME = "job.postgreSQLJob.pluginname";

    private static final Gson GSON = new Gson();
    private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLReader.class);

    /** Placeholder written to the log instead of the real database password. */
    private static final String MASKED_SECRET = "******";

    private String userName;
    private String password;
    private String hostName;
    private String port;
    private String offsetFlushIntervalMs;
    private String offsetStoreFileName;
    private String snapshotMode;
    private String instanceId;
    private String offset;
    private String specificOffsetFile;
    private String specificOffsetPos;
    private String dbName;
    private String pluginName;
    private String serverName;
    private PostgreSQLSnapshotBase postgreSQLSnapshot;
    private ExecutorService executor;
    /** Buffer between the engine thread (producer) and {@link #read()} (consumer): table name -> JSON value. */
    private LinkedBlockingQueue<Pair<String, String>> postgreSQLMessageQueue;
    private InstanceProfile jobProfile;
    private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
    private boolean finished = false;
    private boolean destroyed = false;

    /**
     * Returns the next buffered change event without blocking.
     *
     * @return the next message, or {@code null} when nothing is currently buffered
     */
    public Message read() {
        return getPostgreSQLMessage();
    }

    /**
     * Takes one entry off the queue and wraps it in a {@link DefaultMessage} whose header carries
     * the table name under {@code "dataKey"} and whose body is the UTF-8 encoded JSON value.
     *
     * @return the wrapped message, or {@code null} if the queue is empty
     */
    private DefaultMessage getPostgreSQLMessage() {
        // Poll exactly once and null-check the result: a separate isEmpty() test followed by
        // poll() can still observe an empty queue and would then dereference null.
        Pair<String, String> entry = this.postgreSQLMessageQueue.poll();
        if (entry == null) {
            return null;
        }
        HashMap<String, String> header = new HashMap<>(CommonConstants.DEFAULT_MAP_CAPACITY);
        header.put("dataKey", entry.getKey());
        return new DefaultMessage(entry.getValue().getBytes(StandardCharsets.UTF_8), header);
    }

    /**
     * Reads the connection and offset settings from the profile, persists the configured offset
     * through the snapshot helper, then builds the Debezium engine and starts it on a dedicated
     * single-thread executor.
     *
     * @param instanceProfile profile holding the {@code job.postgreSQLJob.*} settings
     */
    @Override // org.apache.inlong.agent.plugin.sources.reader.file.AbstractReader
    public void init(InstanceProfile instanceProfile) {
        super.init(instanceProfile);
        this.jobProfile = instanceProfile;
        // NOTE(review): toJsonStr() may include the database password — confirm and mask if so.
        LOGGER.info("init PostgreSQL reader with jobConf {}", instanceProfile.toJsonStr());
        this.userName = instanceProfile.get(JOB_POSTGRESQL_USER);
        this.password = instanceProfile.get(JOB_DATABASE_PASSWORD);
        this.hostName = instanceProfile.get(JOB_DATABASE_HOSTNAME);
        this.port = instanceProfile.get(JOB_DATABASE_PORT);
        this.dbName = instanceProfile.get(JOB_DATABASE_DBNAME);
        this.serverName = instanceProfile.get(JOB_DATABASE_SERVER_NAME);
        this.pluginName = instanceProfile.get(JOB_DATABASE_PLUGIN_NAME, "pgoutput");
        this.instanceId = instanceProfile.getInstanceId();
        this.offsetFlushIntervalMs = instanceProfile.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
        this.offsetStoreFileName = instanceProfile.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
                tryToInitAndGetHistoryPath()) + "/offset.dat" + this.instanceId;
        this.snapshotMode = instanceProfile.get(JOB_DATABASE_SNAPSHOT_MODE, "initial");
        // The default capacity reuses a constant from LogFileCollectTask; presumably an artifact of
        // constant inlining rather than an intentional coupling — TODO confirm.
        this.postgreSQLMessageQueue = new LinkedBlockingQueue<>(
                instanceProfile.getInt(JOB_DATABASE_QUEUE_SIZE, LogFileCollectTask.CORE_THREAD_SLEEP_TIME));
        this.finished = false;
        this.offset = instanceProfile.get(JOB_DATABASE_OFFSETS, "");
        this.specificOffsetFile = instanceProfile.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
        this.specificOffsetPos = instanceProfile.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
        this.postgreSQLSnapshot = new PostgreSQLSnapshotBase(this.offsetStoreFileName);
        this.postgreSQLSnapshot.save(this.offset, this.postgreSQLSnapshot.getFile());

        Runnable engine = DebeziumEngine.create(Json.class).using(getEngineProps()).notifying((records, committer) -> {
            try {
                for (ChangeEvent<String, String> event : records) {
                    String tableName = GSON.fromJson(event.value(), DebeziumFormat.class).getSource().getTable();
                    // put() blocks while the queue is full, which back-pressures the engine thread.
                    this.postgreSQLMessageQueue.put(Pair.of(tableName, event.value()));
                    committer.markProcessed(event);
                }
                committer.markBatchFinished();
                long totalLength = records.stream().mapToLong(event -> event.value().length()).sum();
                // 3 is presumably the audit id for "agent read success" — TODO confirm.
                AuditUtils.add(3, this.inlongGroupId, this.inlongStreamId, System.currentTimeMillis(),
                        records.size(), totalLength);
                this.readerMetric.pluginReadSuccessCount.addAndGet(records.size());
                this.readerMetric.pluginReadCount.addAndGet(records.size());
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the engine thread can observe the shutdown request
                // issued by destroy(); swallowing it would hide the interruption from the engine.
                Thread.currentThread().interrupt();
                this.readerMetric.pluginReadFailCount.addAndGet(records.size());
                this.readerMetric.pluginReadCount.addAndGet(records.size());
                LOGGER.warn("interrupted while handling PostgreSQL change events of job {}", this.instanceId, e);
            } catch (Exception e) {
                this.readerMetric.pluginReadFailCount.addAndGet(records.size());
                this.readerMetric.pluginReadCount.addAndGet(records.size());
                LOGGER.error("parse binlog message error", e);
            }
        }).using((success, message, error) -> {
            // Completion callback: only failures are worth reporting.
            if (success) {
                return;
            }
            LOGGER.error("PostgreSQL job with jobConf {} has error {}", this.instanceId, message, error);
        }).build();

        this.executor = Executors.newSingleThreadExecutor();
        this.executor.execute(engine);
        LOGGER.info("get initial snapshot of job {}, snapshot {}", this.instanceId, getSnapshot());
    }

    /**
     * Resolves the agent history directory (from {@code agent.history.path} under
     * {@code agent.home}) via {@link AgentUtils#makeDirsIfNotExist} and returns its absolute path.
     */
    private String tryToInitAndGetHistoryPath() {
        String historyPath = this.agentConf.get("agent.history.path", ".history");
        String agentHome = this.agentConf.get("agent.home", AgentConstants.DEFAULT_AGENT_HOME);
        return AgentUtils.makeDirsIfNotExist(historyPath, agentHome).getAbsolutePath();
    }

    /**
     * Builds the Debezium engine configuration from the fields populated in
     * {@link #init(InstanceProfile)}.
     *
     * @return the engine properties (including the real database password)
     * @throws NullPointerException if a property value is {@code null}, or if snapshot mode is
     *         {@code custom} and the specific offset file/position is {@code null}
     */
    private Properties getEngineProps() {
        Properties properties = new Properties();
        properties.setProperty("name", "engine" + this.instanceId);
        properties.setProperty("connector.class", PostgresConnector.class.getCanonicalName());
        properties.setProperty("database.server.name", this.serverName);
        properties.setProperty("plugin.name", this.pluginName);
        properties.setProperty("slot.name", "slot" + this.instanceId);
        properties.setProperty("database.hostname", this.hostName);
        properties.setProperty("database.port", this.port);
        properties.setProperty("database.user", this.userName);
        properties.setProperty("database.dbname", this.dbName);
        properties.setProperty("database.password", this.password);
        properties.setProperty("offset.flush.interval.ms", this.offsetFlushIntervalMs);
        properties.setProperty("database.snapshot.mode", this.snapshotMode);
        properties.setProperty("key.converter.schemas.enable", "false");
        properties.setProperty("value.converter.schemas.enable", "false");
        properties.setProperty("snapshot.mode", this.snapshotMode);
        properties.setProperty("offset.storage.file.filename", this.offsetStoreFileName);
        if ("custom".equals(this.snapshotMode)) {
            // Validate the configured values; checking the key constants themselves can never fail.
            Preconditions.checkNotNull(this.specificOffsetFile,
                    "job.postgreSQLJob.offset.specificOffsetFile cannot be null");
            Preconditions.checkNotNull(this.specificOffsetPos,
                    "job.postgreSQLJob.offset.specificOffsetPos cannot be null");
            properties.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
            properties.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
                    serializeOffset(this.instanceId, this.specificOffsetFile, this.specificOffsetPos));
        } else {
            properties.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
        }
        properties.setProperty("tombstones.on.delete", "false");
        properties.setProperty("converters", "datetime");
        properties.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter");
        properties.setProperty("datetime.format.date", "yyyy-MM-dd");
        properties.setProperty("datetime.format.time", "HH:mm:ss");
        properties.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
        properties.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");

        // Log a copy with the password masked so the credential never reaches the log file.
        Properties loggable = new Properties();
        loggable.putAll(properties);
        loggable.setProperty("database.password", MASKED_SECRET);
        LOGGER.info("PostgreSQL job {} start with props {}", this.jobProfile.getInstanceId(), loggable);
        return properties;
    }

    /**
     * Stops the engine executor and closes the snapshot helper. Only the first call has an effect;
     * calling it on a reader whose {@code init} never completed is a safe no-op for the missing parts.
     */
    public void destroy() {
        synchronized (this) {
            if (this.destroyed) {
                return;
            }
            // Either field may still be null if init() was never called or failed part-way.
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
            if (this.postgreSQLSnapshot != null) {
                this.postgreSQLSnapshot.close();
            }
            this.destroyed = true;
        }
    }

    /** Returns {@code true} once {@link #finishRead()} has been called since the last {@code init}. */
    public boolean isFinished() {
        return this.finished;
    }

    /** Returns the instance id, which this reader uses as its read-source identifier. */
    public String getReadSource() {
        return this.instanceId;
    }

    /** Overrides the instance id used as the read-source identifier. */
    public void setReadSource(String str) {
        this.instanceId = str;
    }

    /** No-op: this reader does not use a read timeout. */
    public void setReadTimeout(long j) {
    }

    /** No-op: this reader does not use a wait interval. */
    public void setWaitMillisecond(long j) {
    }

    /** Returns the current snapshot from the snapshot helper, or an empty string before {@code init}. */
    public String getSnapshot() {
        return this.postgreSQLSnapshot != null ? this.postgreSQLSnapshot.getSnapshot() : "";
    }

    /** Marks this reader as finished; subsequently {@link #isFinished()} returns {@code true}. */
    public void finishRead() {
        this.finished = true;
    }

    /** Always reports the source as existing. */
    public boolean isSourceExist() {
        return true;
    }
}
