package org.apache.druid.testing.utils;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:org/apache/druid/testing/utils/KafkaEventWriter.class */
/**
 * {@link StreamEventWriter} implementation that publishes events through a {@link KafkaProducer}.
 *
 * <p>Every call to {@link #write(String, byte[])} hands the record to the producer and remembers the
 * returned {@link Future}; {@link #flush()} later waits on all remembered futures and reports any
 * failures. When the writer is created with transactions enabled, the producer is configured with a
 * random transactional id and {@code initTransactions()} is called during construction.
 *
 * <p>The list of pending futures is a plain {@link ArrayList} with no synchronization in this class.
 */
public class KafkaEventWriter implements StreamEventWriter {
    private final KafkaProducer<String, byte[]> producer;
    private final boolean txnEnabled;
    /** Futures returned by {@code producer.send(...)} that have not yet been awaited by {@link #flush()}. */
    private final List<Future<RecordMetadata>> pendingWriteRecords = new ArrayList<>();

    /**
     * Builds the underlying producer from the test configuration.
     *
     * @param integrationTestingConfig supplies the Kafka host used for {@code bootstrap.servers} and is
     *                                 passed to {@code KafkaUtil.addPropertiesFromTestConfig}
     * @param z                        {@code true} to enable idempotence, assign a random
     *                                 {@code transactional.id} and initialize transactions on the producer
     */
    public KafkaEventWriter(IntegrationTestingConfig integrationTestingConfig, boolean z) {
        Properties properties = new Properties();
        KafkaUtil.addPropertiesFromTestConfig(integrationTestingConfig, properties);
        properties.setProperty("bootstrap.servers", integrationTestingConfig.getKafkaHost());
        properties.setProperty("acks", "all");
        properties.setProperty("retries", "3");
        // NOTE(review): key.serializer names ByteArraySerializer, but a StringSerializer instance is
        // handed to the KafkaProducer constructor below; presumably the explicit instances take
        // precedence over these properties - confirm against the KafkaProducer documentation.
        properties.setProperty("key.serializer", ByteArraySerializer.class.getName());
        properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
        this.txnEnabled = z;
        if (z) {
            properties.setProperty("enable.idempotence", "true");
            properties.setProperty("transactional.id", IdUtils.getRandomId());
        }
        this.producer = new KafkaProducer<>(properties, new StringSerializer(), new ByteArraySerializer());
        if (z) {
            this.producer.initTransactions();
        }
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean supportTransaction() {
        return true;
    }

    /**
     * @return the transaction flag this writer was constructed with
     */
    @Override
    public boolean isTransactionEnabled() {
        return this.txnEnabled;
    }

    /**
     * Begins a new producer transaction.
     *
     * @throws IllegalStateException if this writer was constructed with transactions disabled
     */
    @Override
    public void initTransaction() {
        requireTransactionEnabled();
        this.producer.beginTransaction();
    }

    /**
     * Commits the current producer transaction.
     *
     * @throws IllegalStateException if this writer was constructed with transactions disabled
     */
    @Override
    public void commitTransaction() {
        requireTransactionEnabled();
        this.producer.commitTransaction();
    }

    /**
     * Hands one record to the producer and remembers its future so that {@link #flush()} can await it.
     * This method itself does not wait for the send to complete.
     *
     * @param str  first argument of the two-argument {@link ProducerRecord} constructor
     *             (the topic, per the Kafka API)
     * @param bArr second argument of that constructor (the record value)
     */
    @Override
    public void write(String str, byte[] bArr) {
        this.pendingWriteRecords.add(this.producer.send(new ProducerRecord<>(str, bArr)));
    }

    /**
     * Flushes pending writes and closes the producer.
     *
     * <p>The producer is closed even when {@link #flush()} fails, so a failed send does not leak the
     * producer. The flush failure (if any) is rethrown after closing; a failure from closing is
     * attached to it as suppressed, or thrown on its own when flushing succeeded.
     */
    @Override
    public void close() {
        RuntimeException failure = null;
        try {
            flush();
        } catch (RuntimeException e) {
            failure = e;
        }
        try {
            this.producer.close();
        } catch (RuntimeException e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Waits for every pending send to finish, then forgets all of them.
     *
     * <p>All futures are awaited even after one fails. The first failure becomes the cause of the
     * thrown exception and later failures are added to it as suppressed. If a wait is interrupted,
     * the thread's interrupt flag is restored before continuing.
     *
     * @throws RuntimeException wrapping the first {@link InterruptedException} or
     *                          {@link ExecutionException} encountered, if any
     */
    @Override
    public void flush() {
        Exception firstFailure = null;
        for (Future<RecordMetadata> pending : this.pendingWriteRecords) {
            try {
                pending.get();
            } catch (InterruptedException e) {
                // Restore the flag so callers further up can still observe the interruption.
                Thread.currentThread().interrupt();
                firstFailure = accumulateFailure(firstFailure, e);
            } catch (ExecutionException e) {
                firstFailure = accumulateFailure(firstFailure, e);
            }
        }
        // Cleared regardless of outcome so a later flush does not re-report the same failures.
        this.pendingWriteRecords.clear();
        if (firstFailure != null) {
            throw new RuntimeException(firstFailure);
        }
    }

    /** Throws if this writer was constructed with transactions disabled. */
    private void requireTransactionEnabled() {
        if (!this.txnEnabled) {
            throw new IllegalStateException("Kafka writer was initialized with transaction disabled");
        }
    }

    /** Returns {@code next} when there is no earlier failure; otherwise suppresses it into {@code first}. */
    private static Exception accumulateFailure(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }
}
