package com.datatorrent.contrib.kafka;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.lib.util.PojoUtils;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.commons.lang3.ClassUtils;

/* loaded from: input_file:com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.class */
/**
 * Kafka output operator that publishes every tuple received on {@link #inputPort} to the
 * topic returned by {@code getTopic()}.
 *
 * <p>When {@code keyField} is set to a non-empty field name, the value of that field (read
 * through a generated {@link PojoUtils.Getter}) is used as the message key; otherwise the
 * tuple itself is used as the key. Per-window message and byte rates are exposed through the
 * two {@link AutoMetric} fields.
 */
public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object, Object> {

    /** Messages sent in the last completed window divided by the window length in seconds. */
    @AutoMetric
    private long outputMessagesPerSec;

    /** Bytes sent in the last completed window divided by the window length in seconds. */
    @AutoMetric
    private long outputBytesPerSec;
    // Raw counters for the current window; reset in beginWindow().
    private long messageCount;
    private long byteCount;
    private String brokerList;
    // Window length in seconds, computed in setup() from the application window count
    // and the streaming window size.
    private double windowTimeSec;

    // NOTE(review): the default value 0 is below @Min(2) while createKafkaProducerConfig()
    // treats 0 as "not configured" - confirm how validation is expected to interact here.
    @Min(2)
    protected int batchSize;
    protected transient PojoUtils.Getter keyMethod;
    protected transient Class<?> pojoClass;
    protected final String BROKER_KEY = KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST;
    protected final String BATCH_NUM_KEY = "batch.num.messages";
    protected final String PRODUCER_KEY = "producer.type";
    protected final String QUEUE_BUFFER_KEY = "queue.buffering.max.ms";
    protected final String ASYNC_PRODUCER_TYPE = "async";
    private String keyField = "";
    protected boolean isBatchProcessing = true;

    /** Input port; records the declared tuple class (if any) and forwards tuples to {@link #processTuple}. */
    public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() {
        @Override
        public void setup(Context.PortContext portContext) {
            if (portContext.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
                POJOKafkaOutputOperator.this.pojoClass =
                        (Class<?>) portContext.getAttributes().get(Context.PortContext.TUPLE_CLASS);
            }
        }

        @Override
        public void process(Object obj) {
            POJOKafkaOutputOperator.this.processTuple(obj);
        }
    };

    /**
     * Copies the broker list and, when batch processing is enabled, the batch size and the
     * async producer type into the configuration properties before delegating to the parent.
     *
     * @return the producer configuration built by the parent class
     */
    @Override
    public ProducerConfig createKafkaProducerConfig() {
        if (this.brokerList != null) {
            getConfigProperties().setProperty(this.BROKER_KEY, this.brokerList);
        }
        if (this.isBatchProcessing) {
            // A batch size of 0 means "not configured": leave the producer default in place.
            if (this.batchSize != 0) {
                getConfigProperties().setProperty(this.BATCH_NUM_KEY, String.valueOf(this.batchSize));
            }
            getConfigProperties().setProperty(this.PRODUCER_KEY, this.ASYNC_PRODUCER_TYPE);
        }
        return super.createKafkaProducerConfig();
    }

    /**
     * Configures the queue buffering time (batch mode only), runs the parent setup, computes
     * the window length in seconds and, if both the tuple class and a key field are known,
     * creates the key getter.
     *
     * @param operatorContext the operator context supplying the window attributes
     * @throws RuntimeException if the configured key field is not declared on the tuple class
     */
    @Override
    public void setup(Context.OperatorContext operatorContext) {
        if (this.isBatchProcessing) {
            getConfigProperties().setProperty(this.QUEUE_BUFFER_KEY,
                    String.valueOf(operatorContext.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS)));
        }
        super.setup(operatorContext);

        int appWindowCount = operatorContext.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT);
        int streamingWindowMillis = operatorContext.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
        // Multiply as long so large window settings cannot overflow int before the conversion.
        this.windowTimeSec = ((long) appWindowCount * streamingWindowMillis) / 1000.0d;

        if (this.pojoClass != null && hasKeyField()) {
            this.keyMethod = createKeyGetter();
        }
    }

    /**
     * Resets the per-window counters and metrics.
     *
     * @param j the window id, passed through to the parent
     */
    @Override
    public void beginWindow(long j) {
        super.beginWindow(j);
        this.outputMessagesPerSec = 0L;
        this.outputBytesPerSec = 0L;
        this.messageCount = 0L;
        this.byteCount = 0L;
    }

    /**
     * Sends one tuple to Kafka and updates the window counters.
     *
     * <p>If a key field is configured but no getter exists yet, the getter is created from the
     * runtime class of this tuple. Bytes are only counted for {@code byte[]} tuples.
     *
     * @param obj the tuple to publish
     * @throws RuntimeException if the configured key field is not declared on the tuple class
     */
    protected void processTuple(Object obj) {
        if (this.keyMethod == null && hasKeyField()) {
            this.pojoClass = obj.getClass();
            this.keyMethod = createKeyGetter();
        }
        // Without a key getter the tuple itself doubles as the message key.
        Object key = this.keyMethod != null ? this.keyMethod.get(obj) : obj;
        getProducer().send(new KeyedMessage<Object, Object>(getTopic(), key, obj));
        this.messageCount++;
        if (obj instanceof byte[]) {
            this.byteCount += ((byte[]) obj).length;
        }
    }

    /** Converts the counters accumulated during the window into per-second metrics. */
    @Override
    public void endWindow() {
        super.endWindow();
        this.outputBytesPerSec = (long) (this.byteCount / this.windowTimeSec);
        this.outputMessagesPerSec = (long) (this.messageCount / this.windowTimeSec);
    }

    /** Returns true when a key field name has been configured (non-null and non-empty). */
    private boolean hasKeyField() {
        // Compare by content: a reference comparison against "" misclassifies empty strings
        // that are not the interned literal and lets null through.
        return this.keyField != null && !this.keyField.isEmpty();
    }

    /** Builds the key getter, converting a missing field into an unchecked exception that keeps its cause. */
    private PojoUtils.Getter createKeyGetter() {
        try {
            return generateGetterForKeyField();
        } catch (NoSuchFieldException e) {
            throw new RuntimeException("Field " + this.keyField + " is invalid: " + e, e);
        }
    }

    /** Creates a getter for {@code keyField} on {@code pojoClass}, using the wrapper type for primitive fields. */
    private PojoUtils.Getter generateGetterForKeyField() throws NoSuchFieldException, SecurityException {
        Class<?> fieldType = this.pojoClass.getDeclaredField(this.keyField).getType();
        return PojoUtils.createGetter(this.pojoClass, this.keyField, ClassUtils.primitiveToWrapper(fieldType));
    }

    /** @return the configured broker list, or {@code null} if none was set */
    public String getBrokerList() {
        return this.brokerList;
    }

    /** @param str the broker list to place into the producer configuration */
    public void setBrokerList(@NotNull String str) {
        this.brokerList = str;
    }

    /** @return whether the producer is configured for asynchronous batch sending */
    public boolean isBatchProcessing() {
        return this.isBatchProcessing;
    }

    /** @param z {@code true} to configure the producer for asynchronous batch sending */
    public void setBatchProcessing(boolean z) {
        this.isBatchProcessing = z;
    }

    /** @return the configured batch size */
    public int getBatchSize() {
        return this.batchSize;
    }

    /** @param i the number of messages per batch; 0 leaves the producer setting untouched */
    public void setBatchSize(int i) {
        this.batchSize = i;
    }

    /** @return the name of the field used as the message key; empty when the tuple itself is the key */
    public String getKeyField() {
        return this.keyField;
    }

    /** @param str the name of the tuple field to use as the message key */
    public void setKeyField(String str) {
        this.keyField = str;
    }
}
