package io.confluent.connect.jdbc.source;

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.dialect.DatabaseDialects;
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig;
import io.confluent.connect.jdbc.source.TableQuerier;
import io.confluent.connect.jdbc.util.CachedConnectionProvider;
import io.confluent.connect.jdbc.util.ColumnDefinition;
import io.confluent.connect.jdbc.util.TableId;
import io.confluent.connect.jdbc.util.Version;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/jdbc/source/JdbcSourceTask.class */
public class JdbcSourceTask extends SourceTask {
    // Number of consecutive empty results per querier before poll() gives up.
    // NOTE(review): poll() currently compares against the literal 3 rather than this constant.
    private static final int CONSECUTIVE_EMPTY_RESULTS_BEFORE_RETURN = 3;
    // Class logger; assigned in the static initializer at the bottom of the class.
    private static final Logger log;
    // Clock used by poll() for milliseconds()/sleep() and for querier reset timestamps.
    private Time time;
    // Task configuration; built from the property map passed to start().
    private JdbcSourceTaskConfig config;
    // Database dialect selected in start() (by name, or by best match for the connection URL).
    private DatabaseDialect dialect;
    // Connection provider created in start() and released in closeResources().
    CachedConnectionProvider cachedConnectionProvider;
    // Queriers to run; poll() always works on the head of this queue.
    PriorityQueue<TableQuerier> tableQueue;
    // True between the end of start() and the call to stop(); poll() loops while it is set.
    private final AtomicBoolean running;
    // Id of the thread that executed start(); stop() only shuts down inline on that same thread.
    private final AtomicLong taskThreadId;
    // Retry limit read from the configuration in start(); consulted by poll() on SQLException.
    int maxRetriesPerQuerier;
    // Set in the static initializer; when true the sanity check in resetAndRequeueHead is skipped.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* renamed from: io.confluent.connect.jdbc.source.JdbcSourceTask$2, reason: invalid class name */
    /* loaded from: input_file:io/confluent/connect/jdbc/source/JdbcSourceTask$2.class */
    /**
     * Compiler-generated lookup table for switching over {@link TableQuerier.QueryMode}.
     *
     * <p>The array is indexed by enum ordinal and holds the case label assigned to each
     * constant: 1 for {@code TABLE}, 2 for {@code QUERY}. Entries for constants not listed
     * here keep the array default of 0.
     */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$io$confluent$connect$jdbc$source$TableQuerier$QueryMode = new int[TableQuerier.QueryMode.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at runtime
            // (NoSuchFieldError) only leaves its own slot unmapped.
            try {
                $SwitchMap$io$confluent$connect$jdbc$source$TableQuerier$QueryMode[TableQuerier.QueryMode.TABLE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$confluent$connect$jdbc$source$TableQuerier$QueryMode[TableQuerier.QueryMode.QUERY.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /**
     * Creates a task that reads the time from a fresh {@link SystemTime}.
     */
    public JdbcSourceTask() {
        this(new SystemTime());
    }

    /**
     * Creates a task that uses the supplied clock.
     *
     * @param time clock used for poll scheduling and querier reset timestamps
     */
    public JdbcSourceTask(Time time) {
        this.time = time;
        this.taskThreadId = new AtomicLong(0L);
        this.running = new AtomicBoolean(false);
        this.tableQueue = new PriorityQueue<>();
    }

    /**
     * Reports the connector version.
     *
     * @return whatever {@link Version#getVersion()} returns
     */
    public String version() {
        final String connectorVersion = Version.getVersion();
        return connectorVersion;
    }

    /**
     * Configures the task and builds one {@link TableQuerier} per assigned table (or one for
     * the configured query).
     *
     * <p>Steps, in order: parse the configuration, check that exactly one of "tables" /
     * "query" is assigned, select the dialect, open the connection provider and apply the
     * configured isolation mode, load previously stored offsets (non-bulk modes only), and
     * finally enqueue a querier matching the configured mode for every table/query.
     *
     * @param map raw task properties
     * @throws ConnectException if neither or both of table/query are assigned, if the query
     *     mode is unknown, or if the configuration is invalid (wrapping the
     *     {@link ConfigException})
     */
    public void start(Map<String, String> map) {
        log.info("Starting JDBC source task");
        try {
            this.config = new JdbcSourceTaskConfig(map);
            final List<String> tables = this.config.getList(JdbcSourceTaskConfig.TABLES_CONFIG);
            final String query = this.config.getString("query");
            if (tables.isEmpty() && query.isEmpty()) {
                throw new ConnectException("Task is being killed because it was not assigned a table nor a query to execute. If run in table mode please make sure that the tables exist on the database. If the table does exist on the database, we recommend using the fully qualified table name.");
            }
            if (!tables.isEmpty() && !query.isEmpty()) {
                throw new ConnectException("Invalid configuration: a JdbcSourceTask cannot have both a table and a query assigned to it");
            }

            final String url = this.config.getString("connection.url");
            final int maxConnectionAttempts = this.config.getInt("connection.attempts").intValue();
            final long connectionBackoffMs = this.config.getLong("connection.backoff.ms").longValue();
            final String dialectName = this.config.getString("dialect.name");
            if (dialectName == null || dialectName.trim().isEmpty()) {
                this.dialect = DatabaseDialects.findBestFor(url, this.config);
            } else {
                this.dialect = DatabaseDialects.create(dialectName, this.config);
            }
            log.info("Using JDBC dialect {}", this.dialect.name());

            this.cachedConnectionProvider = connectionProvider(maxConnectionAttempts, connectionBackoffMs);
            this.dialect.setConnectionIsolationMode(this.cachedConnectionProvider.getConnection(), JdbcSourceConnectorConfig.TransactionIsolationMode.valueOf(this.config.getString(JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG)));

            final TableQuerier.QueryMode queryMode = query.isEmpty() ? TableQuerier.QueryMode.TABLE : TableQuerier.QueryMode.QUERY;
            final List<String> tablesOrQuery = queryMode == TableQuerier.QueryMode.QUERY ? Collections.singletonList(query) : tables;
            final String mode = this.config.getString(JdbcSourceConnectorConfig.MODE_CONFIG);

            // Stored offsets are only relevant for the incremental modes; in bulk mode both
            // the per-table partition map and the offsets stay empty/null.
            final Map<String, List<Map<String, String>>> partitionsByTable = new HashMap<>();
            Map<Map<String, String>, Map<String, Object>> offsets = null;
            if (mode.equals(JdbcSourceConnectorConfig.MODE_INCREMENTING) || mode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP) || mode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING)) {
                final List<Map<String, String>> partitions = new ArrayList<>(tables.size());
                switch (queryMode) {
                    case TABLE:
                        log.trace("Starting in TABLE mode");
                        for (String table : tables) {
                            final List<Map<String, String>> tablePartitions = possibleTablePartitions(table);
                            partitions.addAll(tablePartitions);
                            partitionsByTable.put(table, tablePartitions);
                        }
                        break;
                    case QUERY:
                        log.trace("Starting in QUERY mode");
                        partitions.add(Collections.singletonMap("query", "query"));
                        break;
                    default:
                        throw new ConnectException("Unknown query mode: " + queryMode);
                }
                offsets = this.context.offsetStorageReader().offsets(partitions);
                log.trace("The partition offsets are {}", offsets);
            }

            final String incrementingColumn = this.config.getString(JdbcSourceConnectorConfig.INCREMENTING_COLUMN_NAME_CONFIG);
            final List<String> timestampColumns = this.config.getList(JdbcSourceConnectorConfig.TIMESTAMP_COLUMN_NAME_CONFIG);
            final Long timestampDelayInterval = this.config.getLong(JdbcSourceConnectorConfig.TIMESTAMP_DELAY_INTERVAL_MS_CONFIG);
            final boolean validateNonNulls = this.config.getBoolean(JdbcSourceConnectorConfig.VALIDATE_NON_NULL_CONFIG).booleanValue();
            final TimeZone timeZone = this.config.timeZone();
            final String suffix = this.config.getString(JdbcSourceConnectorConfig.QUERY_SUFFIX_CONFIG).trim();
            // Loop-invariant configuration reads, hoisted out of the per-table loop.
            final String topicPrefix = this.config.topicPrefix();
            final JdbcSourceConnectorConfig.TimestampGranularity timestampGranularity = JdbcSourceConnectorConfig.TimestampGranularity.get(this.config);

            for (String tableOrQuery : tablesOrQuery) {
                final List<Map<String, String>> partitionsToCheck;
                switch (queryMode) {
                    case TABLE:
                        if (validateNonNulls) {
                            validateNonNullable(mode, tableOrQuery, incrementingColumn, timestampColumns);
                        }
                        partitionsToCheck = partitionsByTable.get(tableOrQuery);
                        break;
                    case QUERY:
                        partitionsToCheck = Collections.singletonList(Collections.singletonMap("query", "query"));
                        break;
                    default:
                        throw new ConnectException("Unexpected query mode: " + queryMode);
                }

                final Map<String, Object> storedOffset = findStoredOffset(offsets, partitionsToCheck);
                final Map<String, Object> initialOffset = computeInitialOffset(tableOrQuery, storedOffset, timeZone);

                if (mode.equals(JdbcSourceConnectorConfig.MODE_BULK)) {
                    this.tableQueue.add(new BulkTableQuerier(this.dialect, queryMode, tableOrQuery, topicPrefix, suffix));
                } else if (mode.equals(JdbcSourceConnectorConfig.MODE_INCREMENTING)) {
                    // Incrementing-only mode: no timestamp columns are passed.
                    this.tableQueue.add(new TimestampIncrementingTableQuerier(this.dialect, queryMode, tableOrQuery, topicPrefix, null, incrementingColumn, initialOffset, timestampDelayInterval, timeZone, suffix, timestampGranularity));
                } else if (mode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP)) {
                    this.tableQueue.add(new TimestampTableQuerier(this.dialect, queryMode, tableOrQuery, topicPrefix, timestampColumns, initialOffset, timestampDelayInterval, timeZone, suffix, timestampGranularity));
                } else if (mode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING)) {
                    this.tableQueue.add(new TimestampIncrementingTableQuerier(this.dialect, queryMode, tableOrQuery, topicPrefix, timestampColumns, incrementingColumn, initialOffset, timestampDelayInterval, timeZone, suffix, timestampGranularity));
                }
            }

            this.running.set(true);
            // Remember the starting thread so stop() can tell whether it runs on the same one.
            this.taskThreadId.set(Thread.currentThread().getId());
            log.info("Started JDBC source task");
            this.maxRetriesPerQuerier = this.config.getInt(JdbcSourceConnectorConfig.QUERY_RETRIES_CONFIG).intValue();
        } catch (ConfigException e) {
            throw new ConnectException("Couldn't start JdbcSourceTask due to configuration error", e);
        }
    }

    /**
     * Returns the first stored offset found for any of the candidate partitions.
     *
     * @param offsets stored offsets keyed by partition, or {@code null} when none were loaded
     * @param partitions candidate partitions, checked in order; not consulted when
     *     {@code offsets} is {@code null}
     * @return the first non-null offset, or {@code null} if there is none
     */
    private static Map<String, Object> findStoredOffset(Map<Map<String, String>, Map<String, Object>> offsets, List<Map<String, String>> partitions) {
        if (offsets == null) {
            return null;
        }
        for (Map<String, String> partition : partitions) {
            final Map<String, Object> stored = offsets.get(partition);
            if (stored != null) {
                log.info("Found offset {} for partition {}", stored, partition);
                return stored;
            }
        }
        return null;
    }

    /**
     * Builds the connection provider used by this task.
     *
     * <p>The returned provider overrides {@code onConnect} so that, after the superclass hook
     * has run, auto-commit is switched off on the connection.
     *
     * @param i value passed through as the second constructor argument of
     *     {@link CachedConnectionProvider} (start() supplies the "connection.attempts" setting)
     * @param j value passed through as the third constructor argument (start() supplies the
     *     "connection.backoff.ms" setting)
     * @return a new provider bound to the current dialect
     */
    protected CachedConnectionProvider connectionProvider(int i, long j) {
        final CachedConnectionProvider provider = new CachedConnectionProvider(this.dialect, i, j) {
            @Override
            public void onConnect(Connection connection) throws SQLException {
                super.onConnect(connection);
                connection.setAutoCommit(false);
            }
        };
        return provider;
    }

    /**
     * Lists the source partitions under which offsets for a table may have been stored.
     *
     * @param str table identifier, parsed by the dialect
     * @return the protocol V1 partition followed by the protocol V0 partition
     */
    private List<Map<String, String>> possibleTablePartitions(String str) {
        final TableId tableId = this.dialect.parseTableIdentifier(str);
        final Map<String, String> v1Partition = OffsetProtocols.sourcePartitionForProtocolV1(tableId);
        final Map<String, String> v0Partition = OffsetProtocols.sourcePartitionForProtocolV0(tableId);
        return Arrays.asList(v1Partition, v0Partition);
    }

    /**
     * Determines the offset a querier should start from.
     *
     * <p>A stored offset always wins. Otherwise, if an initial timestamp is configured, an
     * offset map holding that timestamp is created; the "current" sentinel is replaced by the
     * database's current time.
     *
     * @param str table name or query, used only for logging
     * @param map previously stored offset, or {@code null} if none exists
     * @param timeZone time zone for the calendar used when asking the database for its time
     * @return {@code map} if non-null; a new single-entry map if an initial timestamp is
     *     configured; otherwise {@code null}
     * @throws ConnectException if the current database time cannot be obtained
     */
    protected Map<String, Object> computeInitialOffset(String str, Map<String, Object> map, TimeZone timeZone) {
        if (map != null) {
            return map;
        }
        Long timestampInitial = this.config.getLong(JdbcSourceConnectorConfig.TIMESTAMP_INITIAL_CONFIG);
        if (timestampInitial == null) {
            return null;
        }
        // Compare by value: "==" on two boxed Longs would be an identity comparison.
        if (timestampInitial.longValue() == JdbcSourceConnectorConfig.TIMESTAMP_INITIAL_CURRENT) {
            try {
                timestampInitial = Long.valueOf(this.dialect.currentTimeOnDB(this.cachedConnectionProvider.getConnection(), Calendar.getInstance(timeZone)).getTime());
            } catch (SQLException e) {
                throw new ConnectException("Error while getting initial timestamp from database", e);
            }
        }
        final Map<String, Object> initialOffset = new HashMap<>();
        initialOffset.put(JdbcSourceConnectorConfig.MODE_TIMESTAMP, timestampInitial);
        log.info("No offsets found for '{}', so using configured timestamp {}", str, timestampInitial);
        return initialOffset;
    }

    /**
     * Signals the task to stop.
     *
     * <p>The running flag is always cleared. Resources are released here only when the caller
     * is the thread recorded by start(); otherwise the poll() loop sees the cleared flag and
     * performs the shutdown itself.
     */
    public void stop() throws ConnectException {
        log.info("Stopping JDBC source task");
        this.running.set(false);
        final long callerThreadId = Thread.currentThread().getId();
        if (callerThreadId != this.taskThreadId.get()) {
            return;
        }
        shutdown();
    }

    /**
     * Releases the connection provider and the dialect.
     *
     * <p>Failures while closing either resource are logged and suppressed so that the other
     * one is still closed. Both fields are set to {@code null} afterwards, which makes a
     * repeated call a no-op.
     */
    protected void closeResources() {
        log.info("Closing resources for JDBC source task");
        try {
            if (this.cachedConnectionProvider != null) {
                this.cachedConnectionProvider.close(true);
            }
        } catch (Throwable t) {
            log.warn("Error while closing the connections", t);
        } finally {
            this.cachedConnectionProvider = null;
            try {
                if (this.dialect != null) {
                    this.dialect.close();
                }
            } catch (Throwable t) {
                // dialect is non-null here: the exception came from dialect.close().
                log.warn("Error while closing the {} dialect: ", this.dialect.name(), t);
            } finally {
                this.dialect = null;
            }
        }
    }

    /**
     * Fetches the next batch of records from the querier at the head of the queue.
     *
     * <p>While the task is running, the head querier is polled once its poll interval has
     * elapsed. Waiting is done in slices of at most 100 ms so the running flag is re-checked
     * regularly. A batch is capped at the configured maximum number of rows; a querier whose
     * result set is exhausted is reset and re-queued.
     *
     * @return the records read, or {@code null} when every querier has returned an empty
     *     result {@code CONSECUTIVE_EMPTY_RESULTS_BEFORE_RETURN} times in a row, when a
     *     retriable SQL error occurred, or when the task was stopped
     * @throws ConnectException on a non-transient SQL error, or when the retry limit for a
     *     querier is exhausted
     * @throws InterruptedException declared by the task contract
     */
    public List<SourceRecord> poll() throws InterruptedException {
        log.trace("Polling for new data");
        final Map<TableQuerier, Integer> consecutiveEmptyResults = this.tableQueue.stream()
                .collect(Collectors.toMap(Function.identity(), q -> 0));
        while (this.running.get()) {
            final TableQuerier querier = this.tableQueue.peek();
            final List<SourceRecord> results = new ArrayList<>();
            try {
                if (!querier.querying()) {
                    // Not in the middle of a result set: wait until the next scheduled update.
                    final long nextUpdate = querier.getLastUpdate() + this.config.getInt(JdbcSourceConnectorConfig.POLL_INTERVAL_MS_CONFIG).intValue();
                    final long now = this.time.milliseconds();
                    final long sleepMs = Math.min(nextUpdate - now, 100L);
                    if (sleepMs > 0) {
                        log.trace("Waiting {} ms to poll {} next", Long.valueOf(nextUpdate - now), querier.toString());
                        this.time.sleep(sleepMs);
                        // Re-check the running flag and the remaining wait before querying.
                        continue;
                    }
                }
                log.debug("Checking for next block of results from {}", querier.toString());
                querier.maybeStartQuery(this.cachedConnectionProvider.getConnection());

                final int batchMaxRows = this.config.getInt(JdbcSourceConnectorConfig.BATCH_MAX_ROWS_CONFIG).intValue();
                boolean hadNext = true;
                while (results.size() < batchMaxRows && (hadNext = querier.next())) {
                    results.add(querier.extractRecord());
                }
                querier.resetRetryCount();

                if (!hadNext) {
                    // Result set exhausted: reset the querier and move it behind the others.
                    resetAndRequeueHead(querier, false);
                }

                if (!results.isEmpty()) {
                    consecutiveEmptyResults.put(querier, 0);
                    log.debug("Returning {} records for {}", Integer.valueOf(results.size()), querier);
                    return results;
                }

                consecutiveEmptyResults.merge(querier, 1, Integer::sum);
                log.trace("No updates for {}", querier.toString());
                if (Collections.min(consecutiveEmptyResults.values()).intValue() >= CONSECUTIVE_EMPTY_RESULTS_BEFORE_RETURN) {
                    log.trace("More than {} consecutive empty results for all queriers, returning", CONSECUTIVE_EMPTY_RESULTS_BEFORE_RETURN);
                    return null;
                }
            } catch (SQLNonTransientException e) {
                log.error("Non-transient SQL exception while running query for table: {}", querier, e);
                resetAndRequeueHead(querier, true);
                closeResources();
                throw new ConnectException(e);
            } catch (SQLException e) {
                log.error("SQL exception while running query for table: {}, {}. Attempting retry {} of {} attempts.", querier, e, Integer.valueOf(querier.getAttemptedRetryCount() + 1), Integer.valueOf(this.maxRetriesPerQuerier));
                resetAndRequeueHead(querier, true);
                // A non-positive limit means the retry count is never treated as exhausted.
                if (this.maxRetriesPerQuerier <= 0 || querier.getAttemptedRetryCount() < this.maxRetriesPerQuerier) {
                    querier.incrementRetryCount();
                    return null;
                }
                closeResources();
                throw new ConnectException("Failed to Query table after retries", e);
            } catch (Throwable t) {
                log.error("Failed to run query for table: {}", querier, t);
                resetAndRequeueHead(querier, true);
                closeResources();
                throw t;
            }
        }
        // stop() was called from another thread: release resources on the polling thread.
        shutdown();
        return null;
    }

    /**
     * Resets the querier at the head of the queue, if there is one, and then releases the
     * task's resources.
     */
    private void shutdown() {
        final TableQuerier head = this.tableQueue.peek();
        final boolean hasQuerier = head != null;
        if (hasQuerier) {
            resetAndRequeueHead(head, true);
        }
        closeResources();
    }

    /**
     * Removes the head of the queue, resets it with the current time, and adds it back.
     *
     * @param tableQuerier querier expected to be the current head of the queue
     * @param z flag forwarded unchanged to {@code TableQuerier.reset}
     * @throws AssertionError if assertions are enabled and the head is a different querier
     */
    private void resetAndRequeueHead(TableQuerier tableQuerier, boolean z) {
        log.debug("Resetting querier {}", tableQuerier.toString());
        final TableQuerier removedHead = this.tableQueue.poll();
        final boolean headMismatch = removedHead != tableQuerier;
        if (headMismatch && !$assertionsDisabled) {
            throw new AssertionError();
        }
        final long now = this.time.milliseconds();
        tableQuerier.reset(now, z);
        this.tableQueue.add(tableQuerier);
    }

    /**
     * Verifies that the columns used for offset tracking are declared NOT NULL.
     *
     * <p>The table's columns are described with auto-commit temporarily enabled; the previous
     * auto-commit setting is restored afterwards, whether or not the description succeeds.
     *
     * @param str configured mode
     * @param str2 table whose columns are described
     * @param str3 incrementing column name, matched case-insensitively
     * @param list timestamp column names, matched case-insensitively
     * @throws ConnectException if, in an incrementing mode, the incrementing column is
     *     nullable; if, in a timestamp mode, none of the timestamp columns is non-nullable;
     *     or if the database metadata cannot be read
     */
    private void validateNonNullable(String str, String str2, String str3, List<String> list) {
        try {
            final HashSet<String> lowercaseTimestampColumns = new HashSet<>();
            for (String timestampColumn : list) {
                lowercaseTimestampColumns.add(timestampColumn.toLowerCase(Locale.getDefault()));
            }

            boolean incrementingOptional = false;
            boolean atLeastOneTimestampNotOptional = false;
            final Connection connection = this.cachedConnectionProvider.getConnection();
            final boolean autoCommit = connection.getAutoCommit();
            try {
                connection.setAutoCommit(true);
                for (ColumnDefinition columnDefinition : this.dialect.describeColumns(connection, str2, null).values()) {
                    final String columnName = columnDefinition.id().name();
                    if (columnName.equalsIgnoreCase(str3)) {
                        incrementingOptional = columnDefinition.isOptional();
                    } else if (lowercaseTimestampColumns.contains(columnName.toLowerCase(Locale.getDefault())) && !columnDefinition.isOptional()) {
                        atLeastOneTimestampNotOptional = true;
                    }
                }
            } finally {
                connection.setAutoCommit(autoCommit);
            }

            final boolean usesIncrementing = str.equals(JdbcSourceConnectorConfig.MODE_INCREMENTING) || str.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING);
            if (usesIncrementing && incrementingOptional) {
                throw new ConnectException("Cannot make incremental queries using incrementing column " + str3 + " on " + str2 + " because this column is nullable.");
            }
            final boolean usesTimestamp = str.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP) || str.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING);
            if (usesTimestamp && !atLeastOneTimestampNotOptional) {
                throw new ConnectException("Cannot make incremental queries using timestamp columns " + list + " on " + str2 + " because all of these columns nullable.");
            }
        } catch (SQLException e) {
            throw new ConnectException("Failed trying to validate that columns used for offsets are NOT NULL", e);
        }
    }

    static {
        $assertionsDisabled = !JdbcSourceTask.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(JdbcSourceTask.class);
    }
}
