package io.debezium.connector.mysql.signal;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.mysql.MySqlReadOnlyIncrementalSnapshotChangeEventSource;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.pipeline.signal.ExecuteSnapshot;
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Collect;
import io.debezium.util.Threads;
import java.io.IOException;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.config.ConfigDef;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/mysql/signal/KafkaSignalThread.class */
public class KafkaSignalThread<T extends DataCollectionId> {
    private final ExecutorService signalTopicListenerExecutor;
    private final String topicName;
    private final String connectorName;
    private final Duration pollTimeoutMs;
    private final MySqlReadOnlyIncrementalSnapshotChangeEventSource<T> eventSource;
    private final KafkaConsumer<String, String> signalsConsumer;
    public static final String CONFIGURATION_FIELD_PREFIX_STRING = "signal.";
    private static final String CONSUMER_PREFIX = "signal.consumer.";
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSignalThread.class);
    public static final Field SIGNAL_TOPIC = Field.create("signal.kafka.topic").withDisplayName("Signal topic name").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("The name of the topic for the signals to the connector").withValidation(Field::isRequired);
    public static final Field BOOTSTRAP_SERVERS = Field.create("signal.kafka.bootstrap.servers").withDisplayName("Kafka broker addresses").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("A list of host/port pairs that the connector will use for establishing the initial connection to the Kafka cluster for retrieving signals to the connector.This should point to the same Kafka cluster used by the Kafka Connect process.").withValidation(Field::isRequired);
    public static final Field SIGNAL_POLL_TIMEOUT_MS = Field.create("signal.signal.kafka.poll.timeout.ms").withDisplayName("Poll timeout for kafka signals (ms)").withType(ConfigDef.Type.INT).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDescription("The number of milliseconds to wait while polling signals.").withDefault(100).withValidation(Field::isNonNegativeInteger);

    public KafkaSignalThread(Class<? extends SourceConnector> cls, CommonConnectorConfig commonConnectorConfig, MySqlReadOnlyIncrementalSnapshotChangeEventSource<T> mySqlReadOnlyIncrementalSnapshotChangeEventSource) {
        this.connectorName = commonConnectorConfig.getLogicalName();
        this.signalTopicListenerExecutor = Threads.newSingleThreadExecutor(cls, this.connectorName, "kafka-signal", true);
        Configuration build = commonConnectorConfig.getConfig().subset(CONFIGURATION_FIELD_PREFIX_STRING, false).edit().withDefault(SIGNAL_TOPIC, this.connectorName + "-signal").build();
        this.eventSource = mySqlReadOnlyIncrementalSnapshotChangeEventSource;
        this.topicName = build.getString(SIGNAL_TOPIC);
        this.pollTimeoutMs = Duration.ofMillis(build.getInteger(SIGNAL_POLL_TIMEOUT_MS));
        this.signalsConsumer = new KafkaConsumer<>(build.subset(CONSUMER_PREFIX, true).edit().withDefault("bootstrap.servers", build.getString(BOOTSTRAP_SERVERS)).withDefault("client.id", UUID.randomUUID().toString()).withDefault("group.id", "kafka-signal").withDefault(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1).withDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false).withDefault("session.timeout.ms", 10000).withDefault(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class).withDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class).build().asProperties());
        LOGGER.info("Subscribing to signals topic '{}'", this.topicName);
        this.signalsConsumer.assign(Collect.arrayListOf(new TopicPartition(this.topicName, 0), new TopicPartition[0]));
    }

    public void start() {
        this.signalTopicListenerExecutor.submit(this::monitorSignals);
    }

    private void monitorSignals() {
        while (true) {
            Iterator<ConsumerRecord<String, String>> it = this.signalsConsumer.poll(this.pollTimeoutMs.toMillis()).iterator();
            while (it.hasNext()) {
                ConsumerRecord<String, String> next = it.next();
                try {
                    processSignal(next);
                } catch (InterruptedException e) {
                    LOGGER.error("Signals processing was interrupted", e);
                    this.signalsConsumer.close();
                    return;
                } catch (Exception e2) {
                    LOGGER.error("Skipped signal due to an error '{}'", next, e2);
                }
            }
        }
    }

    private void processSignal(ConsumerRecord<String, String> consumerRecord) throws IOException, InterruptedException {
        if (!this.connectorName.equals(consumerRecord.key())) {
            LOGGER.info("Signal key '{}' doesn't match the connector's name '{}'", consumerRecord.key(), this.connectorName);
            return;
        }
        String value = consumerRecord.value();
        LOGGER.trace("Processing signal: {}", value);
        Document create = (value == null || value.isEmpty()) ? Document.create() : DocumentReader.defaultReader().read(value);
        String string = create.getString("type");
        Document document = create.getDocument(CloudEventsMaker.FieldName.DATA);
        if (ExecuteSnapshot.NAME.equals(string)) {
            executeSnapshot(document, consumerRecord.offset());
        } else {
            LOGGER.warn("Unknown signal type {}", string);
        }
    }

    private void executeSnapshot(Document document, long j) {
        List<String> dataCollections = ExecuteSnapshot.getDataCollections(document);
        if (dataCollections != null) {
            ExecuteSnapshot.SnapshotType snapshotType = ExecuteSnapshot.getSnapshotType(document);
            LOGGER.info("Requested '{}' snapshot of data collections '{}'", snapshotType, dataCollections);
            if (snapshotType == ExecuteSnapshot.SnapshotType.INCREMENTAL) {
                this.eventSource.enqueueDataCollectionNamesToSnapshot(dataCollections, j);
            }
        }
    }

    public void seek(long j) {
        this.signalsConsumer.seek(new TopicPartition(this.topicName, 0), j + 1);
    }
}
