package org.apache.samza.tools;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Properties;
import java.util.function.Function;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.samza.tools.schemas.PageViewEvent;
import org.apache.samza.tools.schemas.ProfileChangeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/tools/GenerateKafkaEvents.class */
/**
 * Command line tool that publishes randomly generated, Avro-encoded events to a Kafka topic.
 *
 * <p>Two event types are supported: {@link PageViewEvent} and {@link ProfileChangeEvent}.
 * ProfileChange events are produced unless the event type option contains "pageview"
 * (case-insensitive). When no event count is given the tool publishes one event per second
 * until it is interrupted or fails.
 */
public class GenerateKafkaEvents {
    private static final String OPT_SHORT_TOPIC_NAME = "t";
    private static final String OPT_LONG_TOPIC_NAME = "topic";
    private static final String OPT_ARG_TOPIC_NAME = "TOPIC_NAME";
    private static final String OPT_DESC_TOPIC_NAME = "Name of the topic to write events to.";
    private static final String OPT_SHORT_BROKER = "b";
    private static final String OPT_LONG_BROKER = "broker";
    private static final String OPT_ARG_BROKER = "BROKER";
    private static final String OPT_DESC_BROKER = "Kafka broker endpoint.";
    private static final String DEFAULT_BROKER = "localhost:9092";
    private static final String OPT_SHORT_NUM_EVENTS = "n";
    private static final String OPT_LONG_NUM_EVENTS = "numEvents";
    private static final String OPT_ARG_NUM_EVENTS = "NUM_EVENTS";
    private static final String OPT_DESC_NUM_EVENTS = "Number of events to be produced.";
    private static final String OPT_SHORT_EVENT_TYPE = "e";
    private static final String OPT_LONG_EVENT_TYPE = "eventtype";
    private static final String OPT_ARG_EVENT_TYPE = "EVENT_TYPE";
    private static final String OPT_DESC_EVENT_TYPE = "Type of the event (PageView|ProfileChange) Default(ProfileChange).";
    private static final String PAGEVIEW_EVENTTYPE = "pageview";
    private static final Logger LOG = LoggerFactory.getLogger(GenerateKafkaEvents.class);
    private static final String[] COMPANIES = {"Microsoft", "LinkedIn", "Google", "Facebook", "Amazon", "Apple", "Twitter", "Snap"};

    /** Source of random field values; seeded with the current time in {@link #main(String[])}. */
    private static RandomValueGenerator randValueGenerator;

    /**
     * Parses the command line and publishes the requested events.
     *
     * <p>Any failure (bad arguments or an error while producing) is reported by printing the
     * error message followed by the usage help; nothing is rethrown.
     *
     * @param strArr command line arguments: {@code -t} topic, {@code -b} broker
     *               (defaults to {@code localhost:9092}), {@code -n} number of events
     *               (defaults to {@code Long.MAX_VALUE}), {@code -e} event type
     */
    public static void main(String[] strArr) throws UnsupportedEncodingException, InterruptedException {
        randValueGenerator = new RandomValueGenerator(System.currentTimeMillis());

        // NOTE(review): the boolean argument is presumably the "required" flag — confirm
        // against CommandLineHelper.createOption.
        Options options = new Options();
        options.addOption(CommandLineHelper.createOption(OPT_SHORT_TOPIC_NAME, OPT_LONG_TOPIC_NAME, OPT_ARG_TOPIC_NAME, true, OPT_DESC_TOPIC_NAME));
        options.addOption(CommandLineHelper.createOption(OPT_SHORT_BROKER, OPT_LONG_BROKER, OPT_ARG_BROKER, false, OPT_DESC_BROKER));
        options.addOption(CommandLineHelper.createOption(OPT_SHORT_NUM_EVENTS, OPT_LONG_NUM_EVENTS, OPT_ARG_NUM_EVENTS, false, OPT_DESC_NUM_EVENTS));
        options.addOption(CommandLineHelper.createOption(OPT_SHORT_EVENT_TYPE, OPT_LONG_EVENT_TYPE, OPT_ARG_EVENT_TYPE, false, OPT_DESC_EVENT_TYPE));

        try {
            CommandLine commandLine = new BasicParser().parse(options, strArr);
            String brokers = commandLine.getOptionValue(OPT_SHORT_BROKER, DEFAULT_BROKER);
            String topicName = commandLine.getOptionValue(OPT_SHORT_TOPIC_NAME);
            // May be null when the option is absent; generateEvents treats that as ProfileChange.
            String eventType = commandLine.getOptionValue(OPT_SHORT_EVENT_TYPE);
            long numEvents = Long.parseLong(commandLine.getOptionValue(OPT_SHORT_NUM_EVENTS, String.valueOf(Long.MAX_VALUE)));
            generateEvents(brokers, topicName, eventType, numEvents);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to whoever invoked main.
                Thread.currentThread().interrupt();
            }
            new HelpFormatter().printHelp(String.format("Error: %s%ngenerate-events.sh", e.getMessage()), options);
        }
    }

    /**
     * Publishes {@code numEvents} generated events to {@code topicName}.
     *
     * <p>When {@code numEvents} is {@code Long.MAX_VALUE} (the "no count given" default) the
     * loop sleeps one second after each event. The producer is always closed, including when
     * an exception aborts the loop.
     *
     * @param brokers   value for the producer's {@code bootstrap.servers} setting
     * @param topicName topic to publish to
     * @param eventType requested event type; {@code null} or anything not containing
     *                  "pageview" selects ProfileChange events
     * @param numEvents number of events to publish
     * @throws InterruptedException if the thread is interrupted while sleeping between events
     */
    private static void generateEvents(String brokers, String topicName, String eventType, long numEvents) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("retries", 100);
        props.put("batch.size", 16384);
        props.put("key.serializer", ByteArraySerializer.class.getCanonicalName());
        props.put("value.serializer", ByteArraySerializer.class.getCanonicalName());

        Function<Long, Pair<String, byte[]>> eventGenerator = isPageViewEventType(eventType)
            ? GenerateKafkaEvents::generatePageViewEvent
            : GenerateKafkaEvents::generateProfileChangeEvent;
        boolean throttle = numEvents == Long.MAX_VALUE;

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // A long counter is required: an int would wrap before reaching bounds above
            // Integer.MAX_VALUE and the loop would never terminate.
            for (long index = 0; index < numEvents; index++) {
                final long eventIndex = index;
                Pair<String, byte[]> event = eventGenerator.apply(index);
                ProducerRecord<byte[], byte[]> message =
                    new ProducerRecord<>(topicName, event.getLeft().getBytes(StandardCharsets.UTF_8), event.getRight());
                producer.send(message, (metadata, exception) -> {
                    if (exception != null) {
                        // Log rather than throw: the callback does not run on this thread,
                        // so an exception thrown here cannot stop the publishing loop.
                        LOG.error("Failed to send message.", exception);
                        return;
                    }
                    LOG.info("send completed for event {} at offset {}", eventIndex, metadata.offset());
                });
                System.out.println(String.format("Published event %d to topic %s", index, topicName));
                if (throttle) {
                    Thread.sleep(1000L);
                }
            }
            producer.flush();
        }
    }

    /**
     * Returns whether the requested event type selects PageView events.
     *
     * @param eventType raw option value, possibly {@code null}
     * @return {@code true} if the value contains "pageview", ignoring case
     */
    private static boolean isPageViewEventType(String eventType) {
        // Locale.ROOT keeps the match independent of the default locale (e.g. Turkish dotless i).
        return eventType != null && eventType.toLowerCase(Locale.ROOT).contains(PAGEVIEW_EVENTTYPE);
    }

    /**
     * Builds a random {@link ProfileChangeEvent}.
     *
     * @param index position of the event in the generated sequence (currently unused)
     * @return pair of record key (the generated name) and Avro-encoded event bytes
     */
    private static Pair<String, byte[]> generateProfileChangeEvent(long index) {
        ProfileChangeEvent event = new ProfileChangeEvent();
        String name = randValueGenerator.getNextString(10, 20);
        event.Name = name;
        // NOTE(review): assumes getNextInt(min, max) treats max as inclusive; if it is
        // exclusive the last company is never picked — confirm against RandomValueGenerator.
        event.NewCompany = COMPANIES[randValueGenerator.getNextInt(0, COMPANIES.length - 1)];
        event.OldCompany = COMPANIES[randValueGenerator.getNextInt(0, COMPANIES.length - 1)];
        event.ProfileChangeTimestamp = System.currentTimeMillis();
        try {
            return new ImmutablePair<>(name, encodeAvroSpecificRecord(ProfileChangeEvent.class, event));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Serializes an Avro specific record with Avro's binary encoding.
     *
     * @param cls class of the record, used to create the datum writer
     * @param t   record instance to encode
     * @param <T> record type
     * @return the encoded bytes
     * @throws IOException if writing or flushing the encoder fails
     */
    public static <T> byte[] encodeAvroSpecificRecord(Class<T> cls, T t) throws IOException {
        SpecificDatumWriter<T> writer = new SpecificDatumWriter<>(cls);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // No existing encoder is passed for reuse, so the factory supplies one.
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(t, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    /**
     * Builds a random {@link PageViewEvent}.
     *
     * @param index position of the event in the generated sequence (currently unused)
     * @return pair of record key (the generated name) and Avro-encoded event bytes
     */
    private static Pair<String, byte[]> generatePageViewEvent(long index) {
        PageViewEvent event = new PageViewEvent();
        String name = randValueGenerator.getNextString(10, 20);
        event.id = randValueGenerator.getNextInt();
        event.Name = name;
        event.ViewerName = randValueGenerator.getNextString(10, 20);
        event.ProfileViewTimestamp = System.currentTimeMillis();
        try {
            return new ImmutablePair<>(name, encodeAvroSpecificRecord(PageViewEvent.class, event));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
