package org.apache.rocketmq.streams.db.driver.batchloader;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.cache.compress.impl.IntValueKV;
import org.apache.rocketmq.streams.db.driver.DriverBuilder;
import org.apache.rocketmq.streams.db.driver.JDBCDriver;

/* loaded from: input_file:org/apache/rocketmq/streams/db/driver/batchloader/BatchRowLoader.class */
/**
 * Loads the rows selected by a SQL statement in parallel slices and hands every row to an
 * {@link IRowOperator} callback.
 *
 * <p>{@link #startLoadData()} first queries the row count and the minimum/maximum value of
 * {@link #idFieldName}, splits the {@code (min - 1, max]} id range into slices and submits one
 * {@link FetchDataTask} per slice to a shared thread pool. The id field is parsed with
 * {@link Long#parseLong(String)}, so it must hold integral values.
 *
 * <p>The JDBC driver is created together with the instance and destroyed at the end of
 * {@link #startLoadData()}, so an instance is meant to run a single load.
 */
public class BatchRowLoader {
    /** Row count per slice used to decide how many parallel fetch tasks are started. */
    protected static final int MAX_LINE = 5000;
    /** Name of the numeric column used to partition and order the data. */
    protected String idFieldName;
    /** Base query; range predicate, {@code order by} and {@code limit} are appended by this class. */
    protected String sql;
    /** Callback that receives every loaded row. */
    protected IRowOperator dataRowProcessor;
    private static final Log LOG = LogFactory.getLog(BatchRowLoader.class);
    /** Thread pool shared by all loaders; created on first construction. */
    protected static ExecutorService executorService = null;
    /** Value appended as the {@code limit} of each task query. */
    protected int batchSize = 1000;
    private final JDBCDriver jdbcDriver = DriverBuilder.createDriver();

    /**
     * Unit of work covering one id range. The task SQL built by {@link #startLoadData()} uses
     * {@code >#{startIndex}} and {@code <=#{endIndex}}, i.e. the range is {@code (startIndex, endIndex]}.
     */
    protected class FetchDataTask implements Runnable {
        long startIndex;
        long endIndex;
        String sql;
        CountDownLatch countDownLatch;
        JDBCDriver resource;
        AtomicInteger finishedCount;
        int totalSize;

        /**
         * @param str SQL template containing the {@code #{startIndex}} / {@code #{endIndex}} placeholders
         * @param j lower bound of the id range
         * @param j2 upper bound of the id range
         * @param countDownLatch latch the caller awaits after submitting all tasks
         * @param atomicInteger counter shared by all tasks of one load
         * @param jDBCDriver driver used to run the query
         * @param i total number of rows reported by the statistics query
         */
        public FetchDataTask(String str, long j, long j2, CountDownLatch countDownLatch, AtomicInteger atomicInteger, JDBCDriver jDBCDriver, int i) {
            this.startIndex = j;
            this.endIndex = j2;
            this.countDownLatch = countDownLatch;
            this.sql = str;
            this.finishedCount = atomicInteger;
            this.resource = jDBCDriver;
            this.totalSize = i;
        }

        /*
         * NOTE(review): the decompiler (JADX) could not restructure this method, so the real body
         * is missing from this source. The surviving fragments only show that fetched rows are
         * passed to BatchRowLoader.doProcess(...). As written here the method always throws, which
         * also means the latch awaited by startLoadData() is never counted down by this stub.
         * TODO: restore the body from the original sources or the bytecode dump.
         */
        @Override // java.lang.Runnable
        public void run() {
            throw new UnsupportedOperationException("Method not decompiled: org.apache.rocketmq.streams.db.driver.batchloader.BatchRowLoader.FetchDataTask.run():void");
        }
    }

    /**
     * Creates a loader and makes sure the shared thread pool exists.
     *
     * @param idFieldName name of the numeric column used to slice and order the data
     * @param sql base query without {@code order by} / {@code limit}
     * @param dataRowProcessor callback invoked for every loaded row
     */
    public BatchRowLoader(String idFieldName, String sql, IRowOperator dataRowProcessor) {
        this.idFieldName = idFieldName;
        this.sql = sql;
        this.dataRowProcessor = dataRowProcessor;
        ensureExecutor();
    }

    /**
     * Creates the shared pool only when it is missing or has been shut down. Replacing the static
     * pool on every construction would abandon the previous pool without shutting it down.
     */
    private static synchronized void ensureExecutor() {
        if (executorService == null || executorService.isShutdown()) {
            executorService = new ThreadPoolExecutor(20, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1000));
        }
    }

    /**
     * Runs the load: queries count/min/max of the id field, submits one {@link FetchDataTask} per
     * slice plus one trailing task, and blocks until every task has counted the latch down.
     *
     * <p>Failures are logged, not rethrown. If the calling thread is interrupted while waiting,
     * its interrupt flag is restored. The JDBC driver is destroyed in every case.
     */
    public void startLoadData() {
        try {
            String statisticsSql = buildStatisticsSql();
            Map<String, Object> stats = this.jdbcDriver.queryForList(statisticsSql).get(0);
            int rowCount = Integer.parseInt(stats.get("c").toString());
            if (rowCount == 0) {
                LOG.warn("there is no data during execute sql: " + statisticsSql);
                return;
            }
            long minId = Long.parseLong(stats.get("min").toString());
            long maxId = Long.parseLong(stats.get("max").toString());
            int sliceCount = (rowCount / MAX_LINE) + 1;
            long sliceWidth = ((maxId - minId) + 1) / sliceCount;
            // sliceCount regular tasks plus one trailing task.
            CountDownLatch countDownLatch = new CountDownLatch(sliceCount + 1);
            AtomicInteger finishedCount = new AtomicInteger(0);
            String taskSql = buildTaskSql();

            // Ranges are (lowerBound, upperBound]; start just below the smallest id.
            long lowerBound = minId - 1;
            for (int slice = 0; slice < sliceCount; slice++) {
                long upperBound = lowerBound + sliceWidth;
                executorService.execute(new FetchDataTask(taskSql, lowerBound, upperBound, countDownLatch, finishedCount, this.jdbcDriver, rowCount));
                lowerBound = upperBound;
            }
            // The integer division above can leave a remainder larger than one slice width (or a
            // width of 0), so the trailing task must reach at least maxId to not skip tail rows.
            long trailingUpperBound = Math.max(maxId, lowerBound + sliceWidth);
            executorService.execute(new FetchDataTask(taskSql, lowerBound, trailingUpperBound, countDownLatch, finishedCount, this.jdbcDriver, rowCount));

            countDownLatch.await();
            LOG.info(getClass().getSimpleName() + " load data finish, load data line  size is " + rowCount);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of silently swallowing it.
            Thread.currentThread().interrupt();
            LOG.error("failed loading data batch!", e);
        } catch (Exception e) {
            LOG.error("failed loading data batch!", e);
        } finally {
            this.jdbcDriver.destroy();
        }
    }

    /** Builds the count/min/max query by reusing everything from the first {@code from} of the base query. */
    private String buildStatisticsSql() {
        int fromIndex = this.sql.toLowerCase().indexOf("from");
        if (fromIndex < 0) {
            throw new IllegalArgumentException("sql has no from clause: " + this.sql);
        }
        return "select count(1) as c, min(" + this.idFieldName + ") as min, max(" + this.idFieldName + ") as max " + this.sql.substring(fromIndex);
    }

    /** Builds the per-task query template: base query plus id range predicate, ordering and limit. */
    private String buildTaskSql() {
        // Case-insensitive, consistent with the "from" lookup; otherwise "WHERE" would get a second "where".
        String connector = this.sql.toLowerCase().contains(" where ") ? " and " : " where ";
        return this.sql + connector + this.idFieldName + ">#{startIndex} and " + this.idFieldName + "<=#{endIndex} order by " + this.idFieldName + " limit " + this.batchSize;
    }

    /**
     * Passes every row of {@code list} to the configured {@link IRowOperator}, in list order.
     *
     * @param list rows to process; each row is a column-name to value map
     */
    public void doProcess(List<Map<String, Object>> list) {
        for (Map<String, Object> row : list) {
            this.dataRowProcessor.doProcess(row);
        }
    }
}
