package org.apache.inlong.audit.service;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.inlong.audit.cache.HalfHourCache;
import org.apache.inlong.audit.cache.HourCache;
import org.apache.inlong.audit.cache.TenMinutesCache;
import org.apache.inlong.audit.channel.DataQueue;
import org.apache.inlong.audit.config.ConfigConstants;
import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.config.SqlConstants;
import org.apache.inlong.audit.entities.AuditCycle;
import org.apache.inlong.audit.entities.JdbcConfig;
import org.apache.inlong.audit.entities.SinkConfig;
import org.apache.inlong.audit.entities.SourceConfig;
import org.apache.inlong.audit.sink.CacheSink;
import org.apache.inlong.audit.sink.JdbcSink;
import org.apache.inlong.audit.source.JdbcSource;
import org.apache.inlong.audit.utils.JdbcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/org/apache/inlong/audit/service/EtlService.class */
public class EtlService {
    private static final Logger LOGGER = LoggerFactory.getLogger(EtlService.class);
    private JdbcSource mysqlSourceOfTemp;
    private JdbcSource mysqlSourceOfTenMinutesCache;
    private JdbcSource mysqlSourceOfHalfHourCache;
    private JdbcSource mysqlSourceOfHourCache;
    private JdbcSink mysqlSinkOfDay;
    private JdbcSink mysqlSinkOfTemp;
    private CacheSink cacheSinkOfTenMinutesCache;
    private CacheSink cacheSinkOfHalfHourCache;
    private CacheSink cacheSinkOfHourCache;
    private final List<JdbcSource> auditJdbcSources = new LinkedList();
    private final int queueSize = Configuration.getInstance().get(ConfigConstants.KEY_DATA_QUEUE_SIZE, ConfigConstants.DEFAULT_DATA_QUEUE_SIZE);
    private final int statBackTimes = Configuration.getInstance().get(ConfigConstants.KEY_SUMMARY_REALTIME_STAT_BACK_TIMES, 6);
    private final String serviceId = Configuration.getInstance().get(ConfigConstants.KEY_SELECTOR_SERVICE_ID, ConfigConstants.DEFAULT_SELECTOR_SERVICE_ID);

    public void start() {
        mysqlToMysqlOfDay();
        mysqlToTenMinutesCache();
        mysqlToHalfHourCache();
        mysqlToHourCache();
    }

    private void mysqlToMysqlOfDay() {
        DataQueue dataQueue = new DataQueue(this.queueSize);
        this.mysqlSourceOfTemp = new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.DAY, Configuration.getInstance().get(ConfigConstants.KEY_SUMMARY_DAILY_STAT_BACK_TIMES, 2)));
        this.mysqlSourceOfTemp.start();
        this.mysqlSinkOfDay = new JdbcSink(dataQueue, buildMysqlSinkConfig(Configuration.getInstance().get(SqlConstants.KEY_MYSQL_SINK_INSERT_DAY_SQL, SqlConstants.DEFAULT_MYSQL_SINK_INSERT_DAY_SQL)));
        this.mysqlSinkOfDay.start();
    }

    private void mysqlToTenMinutesCache() {
        DataQueue dataQueue = new DataQueue(this.queueSize);
        this.mysqlSourceOfTenMinutesCache = new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.MINUTE_10, this.statBackTimes));
        this.mysqlSourceOfTenMinutesCache.start();
        this.cacheSinkOfTenMinutesCache = new CacheSink(dataQueue, TenMinutesCache.getInstance().getCache());
        this.cacheSinkOfTenMinutesCache.start();
    }

    private void mysqlToHalfHourCache() {
        DataQueue dataQueue = new DataQueue(this.queueSize);
        this.mysqlSourceOfHalfHourCache = new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.MINUTE_30, this.statBackTimes));
        this.mysqlSourceOfHalfHourCache.start();
        this.cacheSinkOfHalfHourCache = new CacheSink(dataQueue, HalfHourCache.getInstance().getCache());
        this.cacheSinkOfHalfHourCache.start();
    }

    private void mysqlToHourCache() {
        DataQueue dataQueue = new DataQueue(this.queueSize);
        this.mysqlSourceOfHourCache = new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.HOUR, this.statBackTimes));
        this.mysqlSourceOfHourCache.start();
        this.cacheSinkOfHourCache = new CacheSink(dataQueue, HourCache.getInstance().getCache());
        this.cacheSinkOfHourCache.start();
    }

    public void auditSourceToMysql() {
        DataQueue dataQueue = new DataQueue(this.queueSize);
        for (JdbcConfig jdbcConfig : ConfigService.getInstance().getAuditSourceByServiceId(this.serviceId)) {
            JdbcSource jdbcSource = new JdbcSource(dataQueue, buildAuditJdbcSourceConfig(jdbcConfig));
            jdbcSource.start();
            this.auditJdbcSources.add(jdbcSource);
            LOGGER.info("Audit source to mysql jdbc config:{}", jdbcConfig);
        }
        this.mysqlSinkOfTemp = new JdbcSink(dataQueue, buildMysqlSinkConfig(Configuration.getInstance().get(SqlConstants.KEY_MYSQL_SINK_INSERT_TEMP_SQL, SqlConstants.DEFAULT_MYSQL_SINK_INSERT_TEMP_SQL)));
        this.mysqlSinkOfTemp.start();
    }

    private SinkConfig buildMysqlSinkConfig(String str) {
        JdbcConfig buildMysqlConfig = JdbcUtils.buildMysqlConfig();
        return new SinkConfig(str, buildMysqlConfig.getDriverClass(), buildMysqlConfig.getJdbcUrl(), buildMysqlConfig.getUserName(), buildMysqlConfig.getPassword());
    }

    private SourceConfig buildMysqlSourceConfig(AuditCycle auditCycle, int i) {
        JdbcConfig buildMysqlConfig = JdbcUtils.buildMysqlConfig();
        return new SourceConfig(auditCycle, Configuration.getInstance().get(SqlConstants.KEY_MYSQL_SOURCE_QUERY_TEMP_SQL, SqlConstants.DEFAULT_MYSQL_SOURCE_QUERY_TEMP_SQL), i, buildMysqlConfig.getDriverClass(), buildMysqlConfig.getJdbcUrl(), buildMysqlConfig.getUserName(), buildMysqlConfig.getPassword());
    }

    private SourceConfig buildAuditJdbcSourceConfig(JdbcConfig jdbcConfig) {
        return new SourceConfig(AuditCycle.MINUTE_5, Configuration.getInstance().get(SqlConstants.KEY_SOURCE_STAT_SQL, SqlConstants.DEFAULT_SOURCE_STAT_SQL), Configuration.getInstance().get(ConfigConstants.KEY_SUMMARY_REALTIME_STAT_BACK_TIMES, 6), jdbcConfig.getDriverClass(), jdbcConfig.getJdbcUrl(), jdbcConfig.getUserName(), jdbcConfig.getPassword(), true);
    }

    public void stop() {
        this.mysqlSourceOfTemp.destroy();
        this.mysqlSinkOfDay.destroy();
        Iterator<JdbcSource> it = this.auditJdbcSources.iterator();
        while (it.hasNext()) {
            it.next().destroy();
        }
        if (null != this.mysqlSinkOfTemp) {
            this.mysqlSinkOfTemp.destroy();
        }
        this.mysqlSourceOfTenMinutesCache.destroy();
        this.mysqlSourceOfHalfHourCache.destroy();
        this.mysqlSourceOfHourCache.destroy();
        this.cacheSinkOfTenMinutesCache.destroy();
        this.cacheSinkOfHalfHourCache.destroy();
        this.cacheSinkOfHourCache.destroy();
    }
}
