package com.github.imrafaelmerino.kafkacli;

import fun.gen.Gen;
import java.util.Map;
import java.util.Random;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.random.RandomGenerator;
import jio.IO;
import jio.cli.Command;
import jio.cli.ConsolePrinter;
import jio.cli.ConsolePrograms;
import jio.cli.State;
import jsonvalues.JsObj;
import jsonvalues.spec.JsonToAvro;
import org.apache.avro.Schema;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:com/github/imrafaelmerino/kafkacli/ProducerPublishCommand.class */
/**
 * Console command {@code producer-publish}: generates a value (and, when the channel declares a
 * key generator, a key) and sends it as a single record to the Kafka topic configured for a
 * channel.
 *
 * <p>The channel name is taken from the second command token when present; otherwise the user is
 * prompted for it. The channel's configuration object supplies the producer name, the topic and
 * the names of the generators to use. If an Avro schema is registered for the channel's key
 * and/or value, the generated data is converted with {@link JsonToAvro} before being sent.
 *
 * <p>Configuration problems (unknown channel, producer not started, missing or unknown
 * generator) are reported as a failed {@link IO} carrying an {@link IllegalArgumentException}.
 */
class ProducerPublishCommand extends Command {
    /** Random source handed to key generators. */
    private static final RandomGenerator KEY_SEED = new Random();
    /** Random source handed to value generators. */
    private static final RandomGenerator VALUE_SEED = new Random();
    private static final String COMMAND_NAME = "producer-publish";
    private static final String USAGE = """
            Usage: producer-publish [channel-name]

            Description:
            The `producer-publish` command sends a generated key-value pair or value to a specified Kafka topic using the appropriate Kafka producer.

            Parameters:
            - channel-name (optional): The name of the channel to publish to. If not provided, the user will be prompted to select from a list of available channels with an `up` status.

            Steps:
            1. Without a channel name:
               - The command will list all available channels with an `up` status.
               - The user will be prompted to type the name of one of the listed channels.
               - If the input is invalid, the user will have three attempts to provide a correct name.

            2. With a channel name:
               - The command will directly attempt to publish to the specified channel.

            Output:
            - Success: A message indicating that the record was successfully sent, along with the offset and partition details.
            - Failure: Appropriate error message if the producer is not started or if the channel name is invalid.

            Example:
            1. Interactive mode (prompt user for channel name):
               $ producer-publish
                 Name                 Producer             Status               Topic
                 --------------------------------------------------------------------------------
                 messages             producer2            up                   messageSent
                 flights              producer1            down                 flightUpdated
                 Type the channel name (choose one of the above with an `up` Status):

            2. Direct mode (provide channel name):
               $ producer-publish channel1

            Note:
            Ensure that the channel configurations and the corresponding Kafka producers are correctly set and started before attempting to publish.
            """;

    private final KafkaProducers producers;
    private final AvroSchemas avroSchemas;
    private final Map<String, Gen<?>> generators;

    /**
     * @param map            generators indexed by the name used in the channel configuration
     * @param kafkaProducers lookup of started producers by name
     * @param avroSchemas    key/value Avro schemas indexed by channel name
     */
    public ProducerPublishCommand(Map<String, Gen<?>> map, KafkaProducers kafkaProducers, AvroSchemas avroSchemas) {
        // Guard the length so an empty token array simply does not match instead of throwing.
        super(COMMAND_NAME, USAGE, tokens -> tokens.length > 0 && COMMAND_NAME.equals(tokens[0]));
        this.generators = map;
        this.producers = kafkaProducers;
        this.avroSchemas = avroSchemas;
    }

    /**
     * Builds the function that executes the command for the given configuration.
     *
     * @param jsObj the configuration; its channels object is read to resolve the channel
     * @param state the console state (not used by this command)
     * @return a function from the command tokens to the effect that publishes one record
     */
    public Function<String[], IO<String>> apply(JsObj jsObj, State state) {
        return tokens -> {
            if (tokens.length == 1) {
                // Only the command name was typed: ask for the channel interactively.
                return ConsolePrograms.ASK_FOR_INPUT(Prompts.ASK_FOR_CHANNEL.apply(jsObj, this.producers))
                        .then(channelName -> {
                            return sendGenerated(jsObj, channelName);
                        });
            }
            return sendGenerated(jsObj, tokens[1]);
        };
    }

    /**
     * Resolves the channel configuration, generates the data and publishes it.
     *
     * @param jsObj the configuration
     * @param str   the channel name
     * @return the publish effect, or a failed effect describing the configuration problem
     */
    private IO<String> sendGenerated(JsObj jsObj, String str) {
        // The null checks rely on JsObj#getObj/#getStr yielding null for an absent key, which is
        // what the original null test on KEY_GEN already assumed.
        JsObj channels = jsObj.getObj(ConfigurationFields.CHANNELS);
        JsObj channel = channels == null ? null : channels.getObj(str);
        if (channel == null) {
            return failure("Channel `%s` not found in the configuration".formatted(str));
        }

        String producerName = channel.getStr(ConfigurationFields.PRODUCER);
        String topic = channel.getStr(ConfigurationFields.TOPIC);
        KafkaProducer<Object, Object> producer = this.producers.apply(producerName);
        if (producer == null) {
            // NOTE(review): the sibling command is named `producer-publish`; confirm that the
            // start command really is `start-producer` and not `producer-start`.
            return failure("Producer `%s` not started. Use the command `start-producer %s`".formatted(producerName, producerName));
        }

        String valueGenName = channel.getStr(ConfigurationFields.VALUE_GEN);
        if (valueGenName == null) {
            return failure("Channel `%s` does not define a value generator".formatted(str));
        }
        Gen<?> valueGen = this.generators.get(valueGenName);
        if (valueGen == null) {
            return failure("Value generator `%s` of channel `%s` not found".formatted(valueGenName, str));
        }

        // The key generator is optional: without one, a value-only record is sent.
        String keyGenName = channel.getStr(ConfigurationFields.KEY_GEN);
        Gen<?> keyGen = null;
        if (keyGenName != null) {
            keyGen = this.generators.get(keyGenName);
            if (keyGen == null) {
                return failure("Key generator `%s` of channel `%s` not found".formatted(keyGenName, str));
            }
        }

        Object value = generate(valueGen, VALUE_SEED);
        if (keyGen == null) {
            return sendValue(value, str, producer, topic);
        }
        return sendKeyAndValue(generate(keyGen, KEY_SEED), value, str, producer, topic);
    }

    /**
     * Sends a keyed record, converting key and/or value to Avro when a schema is registered for
     * the channel.
     *
     * @param key      the generated key
     * @param value    the generated value
     * @param channel  the channel name, used to look up the schemas
     * @param producer the producer used to send the record
     * @param topic    the destination topic
     * @return the publish effect
     */
    IO<String> sendKeyAndValue(Object key, Object value, String channel, KafkaProducer<Object, Object> producer, String topic) {
        Object recordKey = toAvroIfSchema(key, this.avroSchemas.keySchemasPerChannel.get(channel));
        Object recordValue = toAvroIfSchema(value, this.avroSchemas.valueSchemasPerChannel.get(channel));
        return sendRecordTask(producer, new ProducerRecord<>(topic, recordKey, recordValue));
    }

    /**
     * Prints the "message sent" line for the record, then sends it and maps the returned record
     * metadata (timestamp, offset, partition) to the response message.
     */
    private IO<String> sendRecordTask(KafkaProducer<Object, Object> kafkaProducer, ProducerRecord<Object, Object> producerRecord) {
        // Block lambdas with an explicit return are kept on purpose so that overload resolution
        // on the IO combinators is exactly what it was before.
        return IO.lazy(() -> {
            return ConsolePrinter.printlnResult(Fun.getMessageSent(producerRecord));
        }).then(printed -> {
            return IO.effect(() -> {
                return kafkaProducer.send(producerRecord);
            }).map(metadata -> {
                return new KafkaResponse(metadata.timestamp(), metadata.offset(), metadata.partition())
                        .getResponseReceivedMessage(producerRecord.topic());
            });
        });
    }

    /**
     * Sends a record without key, converting the value to Avro when a value schema is registered
     * for the channel.
     *
     * @param value    the generated value
     * @param channel  the channel name, used to look up the value schema
     * @param producer the producer used to send the record
     * @param topic    the destination topic
     * @return the publish effect
     */
    IO<String> sendValue(Object value, String channel, KafkaProducer<Object, Object> producer, String topic) {
        Object recordValue = toAvroIfSchema(value, this.avroSchemas.valueSchemasPerChannel.get(channel));
        return sendRecordTask(producer, new ProducerRecord<>(topic, recordValue));
    }

    /** Returns the data unchanged when no schema is given, otherwise its Avro conversion. */
    private static Object toAvroIfSchema(Object generated, Schema schema) {
        if (schema == null) {
            return generated;
        }
        // With a schema the generated data is expected to be a JsObj; the cast fails otherwise.
        return JsonToAvro.convert((JsObj) generated, schema);
    }

    /** Draws one sample from the generator using the given random source. */
    private static Object generate(Gen<?> gen, RandomGenerator seed) {
        return ((Supplier<?>) gen.apply(seed)).get();
    }

    /** Wraps a configuration error message into a failed effect. */
    private static IO<String> failure(String message) {
        return IO.fail(new IllegalArgumentException(message));
    }
}
