/*
 * Decompiled with CFR 0.152.
 */
package io.castled.kafka.producer;

import com.google.common.collect.Maps;
import io.castled.exceptions.CastledException;
import io.castled.kafka.producer.CastledProducerCallback;
import io.castled.kafka.producer.KafkaProducerConfiguration;
import io.castled.utils.TimeUtils;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CastledKafkaProducer
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(CastledKafkaProducer.class);
    private final KafkaProducer<byte[], byte[]> kafkaProducer;

    public CastledKafkaProducer(KafkaProducerConfiguration producerConfiguration) {
        HashMap producerProps = Maps.newHashMap();
        producerProps.put("bootstrap.servers", producerConfiguration.getBootstrapServers());
        producerProps.put("key.serializer", ByteArraySerializer.class.getCanonicalName());
        producerProps.put("value.serializer", ByteArraySerializer.class.getCanonicalName());
        producerProps.put("retries", Integer.MAX_VALUE);
        producerProps.put("acks", "all");
        producerProps.put("max.in.flight.requests.per.connection", 1);
        producerProps.put("compression.type", "gzip");
        producerProps.put("max.request.size", 0x8000000);
        producerProps.put("buffer.memory", 0x2000000L);
        producerProps.put("enable.idempotence", true);
        producerProps.put("batch.size", 524288);
        producerProps.put("request.timeout.ms", (int)TimeUtils.minutesToMillis(15L));
        producerProps.put("max.block.ms", Long.MAX_VALUE);
        producerProps.put("delivery.timeout.ms", (int)TimeUtils.minutesToMillis(20L));
        producerProps.putAll(producerConfiguration.getProps());
        this.kafkaProducer = new KafkaProducer((Map)producerProps);
    }

    public void publish(ProducerRecord<byte[], byte[]> producerRecord) {
        this.publish(producerRecord, null);
    }

    public void publish(ProducerRecord<byte[], byte[]> producerRecord, CastledProducerCallback castledProducerCallback) {
        this.kafkaProducer.send(producerRecord, (Callback)new KafkaCallbackWrapper(castledProducerCallback));
    }

    public void publishSync(ProducerRecord<byte[], byte[]> producerRecord) throws CastledException {
        try {
            this.kafkaProducer.send(producerRecord).get();
        }
        catch (Exception e) {
            log.error("Publish sync failed for topic {}", (Object)producerRecord.topic(), (Object)e);
            throw new CastledException(e.getMessage());
        }
    }

    public void flush() {
        this.kafkaProducer.flush();
    }

    @Override
    public void close() {
        this.kafkaProducer.close();
    }

    private static class KafkaCallbackWrapper
    implements Callback {
        private static final Logger log = LoggerFactory.getLogger(KafkaCallbackWrapper.class);
        private final CastledProducerCallback castledProducerCallback;

        public KafkaCallbackWrapper(CastledProducerCallback castledProducerCallback) {
            this.castledProducerCallback = castledProducerCallback;
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (this.castledProducerCallback == null) {
                return;
            }
            if (exception != null) {
                this.castledProducerCallback.onFailure(metadata, exception);
                log.error("Failed to publish records for topic {} and partition {}", new Object[]{metadata.topic(), metadata.partition(), exception});
                return;
            }
            this.castledProducerCallback.onSuccess(metadata);
        }
    }
}

