package org.apache.flink.cdc.connectors.mysql.source.reader;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.heartbeat.HeartbeatFactory;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import java.util.Collections;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplitState;
import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.class */
public class MySqlRecordEmitterTest {
    @Test
    public void testHeartbeatEventHandling() throws Exception {
        Configuration build = JdbcConfiguration.create().with(Heartbeat.HEARTBEAT_INTERVAL, 100).with(CommonConnectorConfig.TRANSACTION_TOPIC, "fake-topic").with(MySqlConnectorConfig.SERVER_NAME, "mysql_binlog_source").build();
        Heartbeat createHeartbeat = new HeartbeatFactory(new MySqlConnectorConfig(build), TopicSelector.defaultSelector(new MySqlConnectorConfig(build), (tableId, str, str2) -> {
            return "fake-topic";
        }), SchemaNameAdjuster.create()).createHeartbeat();
        BinlogOffset ofBinlogFilePosition = BinlogOffset.ofBinlogFilePosition("fake-file", 15213L);
        MySqlRecordEmitter<Void> createRecordEmitter = createRecordEmitter();
        MySqlBinlogSplitState createBinlogSplitState = createBinlogSplitState();
        createHeartbeat.forcedBeat(Collections.emptyMap(), ofBinlogFilePosition.getOffset(), sourceRecord -> {
            try {
                createRecordEmitter.emitRecord(SourceRecords.fromSingleRecord(sourceRecord), new TestingReaderOutput(), createBinlogSplitState);
            } catch (Exception e) {
                throw new RuntimeException("Failed to emit heartbeat record", e);
            }
        });
        createHeartbeat.close();
        Assert.assertNotNull(createBinlogSplitState.getStartingOffset());
        Assert.assertEquals(0L, createBinlogSplitState.getStartingOffset().compareTo(ofBinlogFilePosition));
    }

    private MySqlRecordEmitter<Void> createRecordEmitter() {
        return new MySqlRecordEmitter<>(new DebeziumDeserializationSchema<Void>() { // from class: org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitterTest.1
            public void deserialize(SourceRecord sourceRecord, Collector<Void> collector) {
                throw new UnsupportedOperationException();
            }

            public TypeInformation<Void> getProducedType() {
                return TypeInformation.of(Void.class);
            }
        }, new MySqlSourceReaderMetrics(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), false);
    }

    private MySqlBinlogSplitState createBinlogSplitState() {
        return new MySqlBinlogSplitState(new MySqlBinlogSplit("binlog-split", BinlogOffset.ofEarliest(), BinlogOffset.ofNonStopping(), Collections.emptyList(), Collections.emptyMap(), 0));
    }
}
