package com.couchbase.kafka;

import com.couchbase.client.core.message.dcp.DCPMessage;
import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.client.deps.com.lmax.disruptor.EventHandler;
import com.couchbase.kafka.filter.Filter;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

/* loaded from: input_file:com/couchbase/kafka/KafkaWriter.class */
public class KafkaWriter implements EventHandler<DCPEvent> {
    private final Producer<String, DCPEvent> producer;
    private final String topic;
    private final Filter filter;

    public KafkaWriter(CouchbaseKafkaEnvironment couchbaseKafkaEnvironment, Producer<String, DCPEvent> producer, Filter filter) {
        this(couchbaseKafkaEnvironment.kafkaTopic(), couchbaseKafkaEnvironment, producer, filter);
    }

    public KafkaWriter(String str, CouchbaseKafkaEnvironment couchbaseKafkaEnvironment, Producer<String, DCPEvent> producer, Filter filter) {
        this.topic = str;
        this.producer = producer;
        this.filter = filter;
    }

    public void onEvent(DCPEvent dCPEvent, long j, boolean z) throws Exception {
        try {
            if (this.filter.pass(dCPEvent)) {
                this.producer.send(new KeyedMessage(this.topic, dCPEvent.key(), dCPEvent));
            }
        } finally {
            if (dCPEvent.message() instanceof MutationMessage) {
                dCPEvent.message().content().release();
            }
            if (dCPEvent.message() instanceof DCPMessage) {
                dCPEvent.connection().consumed(dCPEvent.message());
            }
        }
    }
}
