/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.mirror.integration;

import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.jupiter.api.BeforeEach;

public class MirrorConnectorsIntegrationTransactionsTest
extends MirrorConnectorsIntegrationBaseTest {
    @Override
    @BeforeEach
    public void startClusters() throws Exception {
        this.primaryBrokerProps.put("transaction.state.log.replication.factor", "1");
        this.backupBrokerProps.put("transaction.state.log.replication.factor", "1");
        this.primaryBrokerProps.put("transaction.state.log.min.isr", "1");
        this.backupBrokerProps.put("transaction.state.log.min.isr", "1");
        super.startClusters();
    }

    @Override
    protected Producer<byte[], byte[]> initializeProducer(EmbeddedConnectCluster cluster) {
        HashMap<String, Object> producerProps = new HashMap<String, Object>();
        producerProps.put("enable.idempotence", "true");
        producerProps.put("transactional.id", "embedded-kafka-0");
        producerProps.put("max.block.ms", TimeUnit.MINUTES.toMillis(5L));
        KafkaProducer producer = cluster.kafka().createProducer(producerProps);
        producer.initTransactions();
        return producer;
    }

    @Override
    protected void produceMessages(Producer<byte[], byte[]> producer, List<ProducerRecord<byte[], byte[]>> records) {
        try {
            producer.beginTransaction();
            super.produceMessages(producer, records);
            producer.commitTransaction();
        }
        catch (RuntimeException e) {
            producer.abortTransaction();
            throw e;
        }
    }
}

