package net.uncontended.precipice.samples.kafka;

import java.util.concurrent.atomic.AtomicBoolean;
import net.uncontended.precipice.AbstractService;
import net.uncontended.precipice.AsyncService;
import net.uncontended.precipice.ResilientAction;
import net.uncontended.precipice.ServiceProperties;
import net.uncontended.precipice.concurrent.Eventual;
import net.uncontended.precipice.concurrent.PrecipiceFuture;
import net.uncontended.precipice.concurrent.PrecipicePromise;
import net.uncontended.precipice.metrics.Metric;
import net.uncontended.precipice.timeout.ActionTimeoutException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;

/* loaded from: input_file:net/uncontended/precipice/samples/kafka/KafkaService.class */
public class KafkaService<K, V> extends AbstractService implements AsyncService {
    private final KafkaProducer<K, V> producer;

    /* loaded from: input_file:net/uncontended/precipice/samples/kafka/KafkaService$RecordMetadataAction.class */
    private static class RecordMetadataAction<K, V> extends KafkaAction<RecordMetadata, K, V> {
        public RecordMetadataAction(ProducerRecord<K, V> producerRecord) {
            super(producerRecord);
        }

        /* renamed from: run, reason: merged with bridge method [inline-methods] */
        public RecordMetadata m6run() throws Exception {
            return this.recordMetadata;
        }
    }

    public KafkaService(String str, ServiceProperties serviceProperties, KafkaProducer<K, V> kafkaProducer) {
        super(str, serviceProperties.circuitBreaker(), serviceProperties.actionMetrics(), serviceProperties.latencyMetrics(), serviceProperties.semaphore());
        this.producer = kafkaProducer;
    }

    public PrecipiceFuture<RecordMetadata> sendRecordAction(ProducerRecord<K, V> producerRecord) {
        return submit(new RecordMetadataAction(producerRecord), -1L);
    }

    public <T> PrecipiceFuture<T> submit(ResilientAction<T> resilientAction, long j) {
        Eventual eventual = new Eventual();
        complete(resilientAction, eventual, j);
        return eventual;
    }

    public <T> void complete(ResilientAction<T> resilientAction, final PrecipicePromise<T> precipicePromise, long j) {
        acquirePermitOrRejectIfActionNotAllowed();
        final KafkaAction kafkaAction = (KafkaAction) resilientAction;
        this.producer.send(kafkaAction.getRecord(), new Callback() { // from class: net.uncontended.precipice.samples.kafka.KafkaService.1
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                try {
                    KafkaService.this.handleResult(precipicePromise, kafkaAction, recordMetadata, exc);
                } finally {
                    KafkaService.this.semaphore.releasePermit();
                }
            }
        });
    }

    public void shutdown() {
        this.isShutdown = true;
        this.producer.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> void handleResult(PrecipicePromise<T> precipicePromise, KafkaAction<T, K, V> kafkaAction, RecordMetadata recordMetadata, Exception exc) {
        if (exc != null) {
            if (exc instanceof TimeoutException) {
                this.actionMetrics.incrementMetricCount(Metric.TIMEOUT);
                precipicePromise.completeWithTimeout();
                return;
            } else {
                this.actionMetrics.incrementMetricCount(Metric.ERROR);
                precipicePromise.completeExceptionally(exc);
                return;
            }
        }
        kafkaAction.setRecordMetadata(recordMetadata);
        try {
            Object run = kafkaAction.run();
            this.actionMetrics.incrementMetricCount(Metric.SUCCESS);
            precipicePromise.complete(run);
        } catch (Exception e) {
            this.actionMetrics.incrementMetricCount(Metric.ERROR);
            precipicePromise.completeExceptionally(e);
        } catch (ActionTimeoutException e2) {
            this.actionMetrics.incrementMetricCount(Metric.TIMEOUT);
            precipicePromise.completeWithTimeout();
        }
    }
}
