package org.apache.samoa.tasks;

import com.github.javacliparser.ClassOption;
import com.github.javacliparser.Configurable;
import com.github.javacliparser.IntOption;
import com.github.javacliparser.StringOption;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import org.apache.samoa.streams.kafka.KafkaDeserializer;
import org.apache.samoa.streams.kafka.KafkaDestinationProcessor;
import org.apache.samoa.streams.kafka.KafkaEntranceProcessor;
import org.apache.samoa.streams.kafka.KafkaSerializer;
import org.apache.samoa.topology.ComponentFactory;
import org.apache.samoa.topology.Stream;
import org.apache.samoa.topology.Topology;
import org.apache.samoa.topology.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samoa/tasks/KafkaTask.class */
/**
 * SAMOA task that reads from one Kafka topic and writes to another.
 *
 * <p>{@link #init()} builds a topology consisting of a single {@link KafkaEntranceProcessor}
 * whose stream is connected, via {@code connectInputShuffleStream}, to a
 * {@link KafkaDestinationProcessor} added with the parallelism given by
 * {@link #kafkaParallelismOption}.
 *
 * <p>The Kafka settings come from one of two places:
 * <ul>
 *   <li>the command-line options declared as public fields (default constructor), or</li>
 *   <li>the arguments of the seven-argument constructor, in which case {@link #init()} leaves
 *       them untouched instead of replacing them with the option values.</li>
 * </ul>
 * The task name and the parallelism are always read from their options.
 */
public class KafkaTask implements Task, Configurable {
    private static final long serialVersionUID = 3984474041982397855L;
    private static final Logger logger = LoggerFactory.getLogger(KafkaTask.class);

    /** Properties handed to the {@link KafkaDestinationProcessor}. */
    Properties producerProps;
    /** Properties handed to the {@link KafkaEntranceProcessor}. */
    Properties consumerProps;
    /** Timeout value handed to the {@link KafkaEntranceProcessor}. */
    int timeout;
    private KafkaDeserializer deserializer;
    private KafkaSerializer serializer;
    private String inTopic;
    private String outTopic;
    private TopologyBuilder builder;
    private Topology kafkaTopology;
    /**
     * True when the Kafka settings were supplied through the seven-argument constructor;
     * {@link #init()} then must not overwrite them with the option values.
     */
    private boolean configuredByConstructor;

    public IntOption kafkaParallelismOption = new IntOption("parallelismOption", 'p', "Number of destination Processors", 1, 1, Integer.MAX_VALUE);
    public IntOption timeoutOption = new IntOption("timeout", 't', "Kafka consumer timeout", 1, 1, Integer.MAX_VALUE);
    // NOTE(review): both broker options default to "inputTopic", which is used verbatim as the
    // "bootstrap.servers" value in init(). This looks like a copy-paste of the topic default;
    // confirm the intended default broker address before changing it.
    public StringOption inputBrokerOption = new StringOption("inputBroker", 'r', "Input brokers addresses", "inputTopic");
    public StringOption outputBrokerOption = new StringOption("outputBroker", 's', "Output brokers name", "inputTopic");
    public StringOption inputTopicOption = new StringOption("inputTopic", 'i', "Input topic name", "inputTopic");
    public StringOption outputTopicOption = new StringOption("outputTopic", 'o', "Output topic name", "outputTopic");
    public ClassOption serializerOption = new ClassOption("serializer", 'w', "Serializer class name", KafkaSerializer.class, KafkaSerializer.class.getName());
    public ClassOption deserializerOption = new ClassOption("deserializer", 'd', "Deserializer class name", KafkaDeserializer.class, KafkaDeserializer.class.getName());
    public StringOption taskNameOption = new StringOption("taskName", 'n', "Identifier of the task", "KafkaTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));

    /**
     * Creates a task whose Kafka settings are given directly instead of through the options.
     *
     * @param producerProps properties for the destination (producer) side
     * @param consumerProps properties for the entrance (consumer) side
     * @param inTopic       topic passed to the entrance processor
     * @param outTopic      topic passed to the destination processor
     * @param timeout       timeout passed to the entrance processor
     * @param serializer    serializer passed to the destination processor
     * @param deserializer  deserializer passed to the entrance processor
     */
    public KafkaTask(Properties producerProps, Properties consumerProps, String inTopic, String outTopic, int timeout, KafkaSerializer serializer, KafkaDeserializer deserializer) {
        this.producerProps = producerProps;
        this.consumerProps = consumerProps;
        this.deserializer = deserializer;
        this.serializer = serializer;
        this.inTopic = inTopic;
        this.outTopic = outTopic;
        this.timeout = timeout;
        this.configuredByConstructor = true;
    }

    /** Creates a task that takes all of its settings from the option fields when {@link #init()} runs. */
    public KafkaTask() {
    }

    /**
     * Resolves the Kafka settings and builds the topology returned by {@link #getTopology()}.
     *
     * <p>If {@link #setFactory(ComponentFactory)} was not called beforehand, a
     * {@link TopologyBuilder} is created with its no-argument constructor.
     */
    @Override
    public void init() {
        // Settings passed to the constructor win; previously they were silently overwritten here.
        if (!this.configuredByConstructor) {
            loadSettingsFromOptions();
        }
        logger.info("Invoking init");
        if (this.builder == null) {
            this.builder = new TopologyBuilder();
            initNamedTopology();
        }

        KafkaEntranceProcessor entrance = new KafkaEntranceProcessor(this.consumerProps, this.inTopic, this.timeout, this.deserializer);
        this.builder.addEntranceProcessor(entrance);
        Stream entranceStream = this.builder.createStream(entrance);

        KafkaDestinationProcessor destination = new KafkaDestinationProcessor(this.producerProps, this.outTopic, this.serializer);
        this.builder.addProcessor(destination, this.kafkaParallelismOption.getValue());
        this.builder.connectInputShuffleStream(entranceStream, destination);

        this.kafkaTopology = this.builder.build();
        logger.info("Successfully built the topology");
    }

    /**
     * Returns the topology built by {@link #init()}.
     *
     * @return the built topology, or {@code null} if {@link #init()} has not run yet
     */
    @Override
    public Topology getTopology() {
        return this.kafkaTopology;
    }

    /**
     * Replaces the topology builder with one backed by the given factory and initializes a
     * topology named after {@link #taskNameOption}.
     *
     * @param componentFactory factory handed to the new {@link TopologyBuilder}
     * @throws NullPointerException if {@code componentFactory} is {@code null}
     */
    @Override
    public void setFactory(ComponentFactory componentFactory) {
        if (componentFactory == null) {
            // Keeps the fail-fast behaviour the former componentFactory.toString() call had.
            throw new NullPointerException("componentFactory must not be null");
        }
        logger.info("Invoking setFactory: {}", componentFactory);
        this.builder = new TopologyBuilder(componentFactory);
        initNamedTopology();
    }

    /** Copies broker addresses, topics, timeout and (de)serializer from the option fields. */
    private void loadSettingsFromOptions() {
        this.producerProps = new Properties();
        this.producerProps.setProperty("bootstrap.servers", this.outputBrokerOption.getValue());
        this.consumerProps = new Properties();
        this.consumerProps.setProperty("bootstrap.servers", this.inputBrokerOption.getValue());
        this.serializer = (KafkaSerializer) this.serializerOption.getValue();
        this.deserializer = (KafkaDeserializer) this.deserializerOption.getValue();
        this.inTopic = this.inputTopicOption.getValue();
        this.outTopic = this.outputTopicOption.getValue();
        this.timeout = this.timeoutOption.getValue();
    }

    /** Initializes the current builder's topology with the configured task name and logs it. */
    private void initNamedTopology() {
        logger.info("Successfully instantiating TopologyBuilder");
        String taskName = this.taskNameOption.getValue();
        this.builder.initTopology(taskName);
        logger.info("Successfully initializing SAMOA topology with name {}", taskName);
    }
}
