package org.apache.apex.malhar.sql.table;

import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.Map;
import java.util.Properties;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator;
import org.apache.apex.malhar.sql.operators.OperatorUtils;
import org.apache.apex.malhar.sql.planner.RelInfo;
import org.apache.apex.malhar.sql.table.Endpoint;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * {@link Endpoint} implementation that wires Kafka operators into a DAG.
 *
 * <p>The endpoint is configured through two operands, {@link #KAFKA_SERVERS} and
 * {@link #KAFKA_TOPICS}, plus a {@link MessageFormat} that supplies the parser/formatter
 * side of the pipeline and the row type. All three can be given to the constructor or set
 * later through {@link #setEndpointOperands(Map)} and {@link #setMessageFormat(MessageFormat)}.
 *
 * <p>The {@code populate*DAG} methods and {@link #getRowType(RelDataTypeFactory)} throw a
 * {@link NullPointerException} with a descriptive message when required configuration is
 * missing; the DAG is not modified in that case.
 */
@InterfaceStability.Evolving
public class KafkaEndpoint implements Endpoint {
    /** Class name used as the consumer's {@code key.deserializer}. */
    public static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
    /** Class name used as the consumer's {@code value.deserializer}. */
    public static final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
    /** Class name used as the producer's {@code key.serializer}. */
    public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    /** Class name used as the producer's {@code value.serializer}. */
    public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    /** Operand key under which the Kafka server list is stored. */
    public static final String KAFKA_SERVERS = "servers";
    /** Operand key under which the Kafka topic(s) are stored. */
    public static final String KAFKA_TOPICS = "topics";

    private MessageFormat messageFormat;
    private Map<String, Object> operands;

    /**
     * Creates an unconfigured endpoint. Operands and message format must be supplied through
     * the setters before the endpoint is used.
     */
    public KafkaEndpoint() {
    }

    /**
     * Creates a fully configured endpoint.
     *
     * @param kafkaServers value stored under {@link #KAFKA_SERVERS}
     * @param topics value stored under {@link #KAFKA_TOPICS}
     * @param messageFormat format used to build the parser/formatter and the row type
     */
    public KafkaEndpoint(String kafkaServers, String topics, MessageFormat messageFormat) {
        this.messageFormat = messageFormat;
        // Explicit type witness so the value type does not depend on target-type inference.
        this.operands = ImmutableMap.<String, Object>of(KAFKA_SERVERS, kafkaServers, KAFKA_TOPICS, topics);
    }

    /**
     * @return always {@link Endpoint.EndpointType#KAFKA}
     */
    @Override // org.apache.apex.malhar.sql.table.Endpoint
    public Endpoint.EndpointType getTargetType() {
        return Endpoint.EndpointType.KAFKA;
    }

    /**
     * Replaces the operand map. The map is stored by reference, not copied.
     *
     * @param map operands; expected to contain {@link #KAFKA_SERVERS} and {@link #KAFKA_TOPICS}
     */
    @Override // org.apache.apex.malhar.sql.table.Endpoint
    public void setEndpointOperands(Map<String, Object> map) {
        this.operands = map;
    }

    /**
     * Replaces the message format.
     *
     * @param messageFormat format used to build the parser/formatter and the row type
     */
    @Override // org.apache.apex.malhar.sql.table.Endpoint
    public void setMessageFormat(MessageFormat messageFormat) {
        this.messageFormat = messageFormat;
    }

    /**
     * Adds a {@link KafkaSinglePortInputOperator} to {@code dag}, lets the message format add
     * its input side, and connects the Kafka output port to the first input port reported by
     * the message format.
     *
     * @param dag DAG to populate
     * @param javaTypeFactory type factory handed to the message format
     * @return a {@code RelInfo} named {@code "Input"} with no input ports, exposing the
     *         operator and output port returned by the message format
     * @throws NullPointerException if the message format or a required operand is missing
     */
    @Override // org.apache.apex.malhar.sql.table.Endpoint
    public RelInfo populateInputDAG(DAG dag, JavaTypeFactory javaTypeFactory) {
        // Validate everything up front so a misconfigured endpoint never leaves a half-built DAG.
        MessageFormat format = requireMessageFormat();
        Object servers = requireOperand(KAFKA_SERVERS);
        Object topics = requireOperand(KAFKA_TOPICS);

        KafkaSinglePortInputOperator kafkaInput = dag.addOperator(OperatorUtils.getUniqueOperatorName("KafkaInput"), KafkaSinglePortInputOperator.class);
        kafkaInput.setTopics((String) topics);
        kafkaInput.setInitialOffset("EARLIEST");

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", servers);
        consumerProps.put("key.deserializer", KEY_DESERIALIZER);
        consumerProps.put("value.deserializer", VALUE_DESERIALIZER);
        kafkaInput.setConsumerProps(consumerProps);
        kafkaInput.setClusters((String) servers);

        RelInfo parserInfo = format.populateInputDAG(dag, javaTypeFactory);
        dag.addStream(OperatorUtils.getUniqueStreamName("Kafka", "Parser"), kafkaInput.outputPort, parserInfo.getInputPorts().get(0));
        return new RelInfo("Input", Lists.newArrayList(), parserInfo.getOperator(), parserInfo.getOutPort(), format.getRowType(javaTypeFactory));
    }

    /**
     * Lets the message format add its output side to {@code dag}, adds a
     * {@link KafkaSinglePortOutputOperator}, and connects the output port reported by the
     * message format to the Kafka operator's input port.
     *
     * @param dag DAG to populate
     * @param javaTypeFactory type factory handed to the message format
     * @return a {@code RelInfo} named {@code "Output"} exposing the input ports and operator
     *         returned by the message format, with a {@code null} output port
     * @throws NullPointerException if the message format or a required operand is missing
     */
    @Override // org.apache.apex.malhar.sql.table.Endpoint
    public RelInfo populateOutputDAG(DAG dag, JavaTypeFactory javaTypeFactory) {
        // Validate everything up front so a misconfigured endpoint never leaves a half-built DAG.
        MessageFormat format = requireMessageFormat();
        Object servers = requireOperand(KAFKA_SERVERS);
        Object topic = requireOperand(KAFKA_TOPICS);

        RelInfo formatterInfo = format.populateOutputDAG(dag, javaTypeFactory);
        KafkaSinglePortOutputOperator kafkaOutput = dag.addOperator(OperatorUtils.getUniqueOperatorName("KafkaOutput"), KafkaSinglePortOutputOperator.class);
        kafkaOutput.setTopic((String) topic);

        Properties producerProps = new Properties();
        producerProps.put("value.serializer", VALUE_SERIALIZER);
        producerProps.put("key.serializer", KEY_SERIALIZER);
        producerProps.put("bootstrap.servers", servers);
        kafkaOutput.setProperties(producerProps);

        dag.addStream(OperatorUtils.getUniqueStreamName("Formatter", "Kafka"), formatterInfo.getOutPort(), kafkaOutput.inputPort);
        return new RelInfo("Output", formatterInfo.getInputPorts(), formatterInfo.getOperator(), (Operator.OutputPort) null, format.getRowType(javaTypeFactory));
    }

    /**
     * Delegates to the configured message format.
     *
     * @param relDataTypeFactory factory handed to the message format
     * @return the row type reported by the message format
     * @throws NullPointerException if no message format has been set
     */
    @Override // org.apache.apex.malhar.sql.table.Endpoint
    public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
        return requireMessageFormat().getRowType(relDataTypeFactory);
    }

    /** Returns the configured message format, failing with a clear message when it is unset. */
    private MessageFormat requireMessageFormat() {
        if (this.messageFormat == null) {
            throw new NullPointerException("KafkaEndpoint: message format has not been set");
        }
        return this.messageFormat;
    }

    /** Returns the operand stored under {@code key}, failing with a clear message when absent. */
    private Object requireOperand(String key) {
        if (this.operands == null) {
            throw new NullPointerException("KafkaEndpoint: endpoint operands have not been set");
        }
        Object value = this.operands.get(key);
        if (value == null) {
            throw new NullPointerException("KafkaEndpoint: required operand '" + key + "' is missing");
        }
        return value;
    }
}
