/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.kafka.producer;

import com.fasterxml.jackson.databind.node.NullNode;
import com.google.protobuf.ByteString;
import com.networknt.config.Config;
import com.networknt.exception.FrameworkException;
import com.networknt.kafka.common.KafkaProducerConfig;
import com.networknt.kafka.entity.AuditRecord;
import com.networknt.kafka.entity.EmbeddedFormat;
import com.networknt.kafka.entity.PartitionOffset;
import com.networknt.kafka.entity.ProduceRecord;
import com.networknt.kafka.entity.ProduceRequest;
import com.networknt.kafka.entity.ProduceResponse;
import com.networknt.kafka.producer.CompletableFutures;
import com.networknt.kafka.producer.NativeLightProducer;
import com.networknt.kafka.producer.NoSchemaRecordSerializer;
import com.networknt.kafka.producer.ProduceResult;
import com.networknt.kafka.producer.RegisteredSchema;
import com.networknt.kafka.producer.SchemaManager;
import com.networknt.kafka.producer.SchemaManagerImpl;
import com.networknt.kafka.producer.SchemaRecordSerializer;
import com.networknt.kafka.producer.SerializedKeyAndValue;
import com.networknt.status.Status;
import com.networknt.utility.ObjectUtils;
import com.networknt.utility.UuidUtil;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SidecarProducer
implements NativeLightProducer {
    private static final Logger logger = LoggerFactory.getLogger(SidecarProducer.class);
    public static final KafkaProducerConfig config = (KafkaProducerConfig)Config.getInstance().getJsonObjectConfig("kafka-producer", KafkaProducerConfig.class);
    public static Map<String, Optional<RegisteredSchema>> schemaCache = new ConcurrentHashMap<String, Optional<RegisteredSchema>>();
    private static final String FAILED_TO_GET_SCHEMA = "ERR12208";
    private SchemaManager schemaManager;
    private SchemaRecordSerializer schemaRecordSerializer;
    private NoSchemaRecordSerializer noSchemaRecordSerializer;
    public Producer<byte[], byte[]> producer;

    @Override
    public void open() {
        if (logger.isTraceEnabled()) {
            logger.trace("config properties: {}", (Object)config.getProperties());
        }
        this.producer = new KafkaProducer<byte[], byte[]>(config.getProperties());
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.putAll(config.getProperties());
        String url = (String)config.getProperties().get("schema.registry.url");
        Object cacheObj = config.getProperties().get("schema.registry.cache");
        int cache = 100;
        if (cacheObj != null && cacheObj instanceof String) {
            cache = Integer.valueOf((String)cacheObj);
        }
        CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(new RestService(Collections.singletonList(url)), cache, Arrays.asList(new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider()), configs, null);
        this.noSchemaRecordSerializer = new NoSchemaRecordSerializer(new HashMap<String, Object>());
        this.schemaRecordSerializer = new SchemaRecordSerializer(schemaRegistryClient, configs, configs, configs);
        this.schemaManager = new SchemaManagerImpl(schemaRegistryClient, new TopicNameStrategy());
        this.registerModule();
    }

    @Override
    public Producer getProducer() {
        return this.producer;
    }

    @Override
    public void close() {
        if (this.producer != null) {
            this.producer.close();
        }
    }

    public final CompletableFuture<ProduceResponse> produceWithSchema(String topicName, String serviceId, Optional<Integer> partition, ProduceRequest request, Headers headers, List<AuditRecord> auditRecords, boolean isReplay) {
        long startSchema = System.currentTimeMillis();
        Optional<Object> keySchema = Optional.empty();
        if (null != request.getKeySchemaId() && request.getKeySchemaId().isPresent()) {
            keySchema = schemaCache.get(topicName + "k" + String.valueOf(request.getKeySchemaId().get()));
        } else if (null != request.getKeySchemaVersion() && request.getKeySchemaVersion().isPresent()) {
            keySchema = null != request.getKeySchemaSubject() && request.getKeySchemaSubject().isPresent() ? schemaCache.get(request.getKeySchemaSubject().get() + String.valueOf(request.getKeySchemaVersion().get())) : schemaCache.get(topicName + "k" + String.valueOf(request.getKeySchemaVersion().get()));
        } else if (isReplay) {
            keySchema = schemaCache.get(topicName + "k");
        }
        if (keySchema == null) {
            keySchema = Optional.empty();
        }
        if (keySchema.isEmpty() && request.getKeyFormat().isPresent() && request.getKeyFormat().get().requiresSchema() && (keySchema = this.getSchema(topicName, request.getKeyFormat(), request.getKeySchemaSubject(), request.getKeySchemaId(), request.getKeySchemaVersion(), request.getKeySchema(), true)).isPresent()) {
            if (request.getKeySchemaId().isPresent()) {
                schemaCache.put(topicName + "k" + String.valueOf(request.getKeySchemaId().get()), keySchema);
            } else if (request.getKeySchemaVersion().isPresent()) {
                if (request.getKeySchemaSubject().isPresent()) {
                    schemaCache.put(request.getKeySchemaSubject().get() + String.valueOf(request.getKeySchemaVersion().get()), keySchema);
                } else {
                    schemaCache.put(topicName + "k" + String.valueOf(request.getKeySchemaVersion().get()), keySchema);
                }
            } else if (isReplay) {
                schemaCache.put(topicName + "k", keySchema);
            } else {
                logger.error("Could not put key schema into the cache. It means that neither keySchemaId nor keySchemaVersion is supplied and Kafka Schema Registry will be overloaded.");
            }
        }
        Optional<EmbeddedFormat> keyFormat = keySchema.map(schema -> Optional.of(schema.getFormat())).orElse(request.getKeyFormat());
        Optional<Object> valueSchema = Optional.empty();
        if (null != request.getValueSchemaId() && request.getValueSchemaId().isPresent()) {
            valueSchema = schemaCache.get(topicName + "v" + String.valueOf(request.getValueSchemaId().get()));
        } else if (null != request.getValueSchemaVersion() && request.getValueSchemaVersion().isPresent()) {
            valueSchema = null != request.getValueSchemaSubject() && request.getValueSchemaSubject().isPresent() ? schemaCache.get(request.getValueSchemaSubject().get() + String.valueOf(request.getValueSchemaVersion().get())) : schemaCache.get(topicName + "v" + String.valueOf(request.getValueSchemaVersion().get()));
        } else if (isReplay) {
            valueSchema = schemaCache.get(topicName + "v");
        }
        if (valueSchema == null) {
            valueSchema = Optional.empty();
        }
        if (valueSchema.isEmpty() && request.getValueFormat().isPresent() && request.getValueFormat().get().requiresSchema() && (valueSchema = this.getSchema(topicName, request.getValueFormat(), request.getValueSchemaSubject(), request.getValueSchemaId(), request.getValueSchemaVersion(), request.getValueSchema(), false)).isPresent()) {
            if (request.getValueSchemaId().isPresent()) {
                schemaCache.put(topicName + "v" + String.valueOf(request.getValueSchemaId().get()), valueSchema);
            } else if (request.getValueSchemaVersion().isPresent()) {
                if (request.getValueSchemaSubject().isPresent()) {
                    schemaCache.put(request.getValueSchemaSubject().get() + String.valueOf(request.getValueSchemaVersion().get()), valueSchema);
                } else {
                    schemaCache.put(topicName + "v" + String.valueOf(request.getValueSchemaVersion().get()), valueSchema);
                }
            } else if (isReplay) {
                schemaCache.put(topicName + "v", valueSchema);
            } else {
                logger.error("Could not put value schema into the cache. It means that neither valueSchemaId nor valueSchemaVersion is supplied and Kafka Schema Registry will be overloaded.");
            }
        }
        Optional<EmbeddedFormat> valueFormat = valueSchema.map(schema -> Optional.of(schema.getFormat())).orElse(request.getValueFormat());
        List<SerializedKeyAndValue> serialized = this.serialize(keyFormat, valueFormat, topicName, partition, keySchema, valueSchema, request.getRecords());
        if (logger.isDebugEnabled()) {
            logger.debug("Serializing key and value with schema registry takes " + (System.currentTimeMillis() - startSchema));
        }
        long startProduce = System.currentTimeMillis();
        List<CompletableFuture<ProduceResult>> resultFutures = this.doProduce(topicName, serviceId, serialized, headers, auditRecords);
        if (logger.isDebugEnabled()) {
            logger.debug("Producing the entire batch to Kafka takes " + (System.currentTimeMillis() - startProduce));
        }
        return SidecarProducer.produceResultsToResponse(keySchema, valueSchema, resultFutures);
    }

    public final CompletableFuture<ProduceResponse> produceWithSchema(String topicName, String serviceId, Optional<Integer> partition, ProduceRequest request, Headers headers, List<AuditRecord> auditRecords) {
        return this.produceWithSchema(topicName, serviceId, partition, request, headers, auditRecords, false);
    }

    private List<SerializedKeyAndValue> serialize(Optional<EmbeddedFormat> keyFormat, Optional<EmbeddedFormat> valueFormat, String topicName, Optional<Integer> partition, Optional<RegisteredSchema> keySchema, Optional<RegisteredSchema> valueSchema, List<ProduceRecord> records) {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        return records.stream().map(record -> {
            int atomicIntegerNew = atomicInteger.getAndIncrement();
            return new SerializedKeyAndValue(record.getPartition().map(Optional::of).orElse(partition), record.getTraceabilityId(), record.getCorrelationId(), keyFormat.isPresent() && ((EmbeddedFormat)((Object)((Object)keyFormat.get()))).requiresSchema() ? this.schemaRecordSerializer.serialize(atomicIntegerNew, (EmbeddedFormat)((Object)((Object)keyFormat.get())), topicName, keySchema, record.getKey().orElse(NullNode.getInstance()), true) : this.noSchemaRecordSerializer.serialize(atomicIntegerNew, keyFormat.orElse(EmbeddedFormat.valueOf(config.getKeyFormat().toUpperCase())), record.getKey().orElse(NullNode.getInstance())), valueFormat.isPresent() && ((EmbeddedFormat)((Object)((Object)valueFormat.get()))).requiresSchema() ? this.schemaRecordSerializer.serialize(atomicIntegerNew, (EmbeddedFormat)((Object)((Object)valueFormat.get())), topicName, valueSchema, record.getValue().orElse(NullNode.getInstance()), false) : this.noSchemaRecordSerializer.serialize(atomicIntegerNew, valueFormat.orElse(EmbeddedFormat.valueOf(config.getValueFormat().toUpperCase())), record.getValue().orElse(NullNode.getInstance())), record.getHeaders(), record.getTimestamp());
        }).collect(Collectors.toList());
    }

    private Optional<RegisteredSchema> getSchema(String topicName, Optional<EmbeddedFormat> format, Optional<String> subject, Optional<Integer> schemaId, Optional<Integer> schemaVersion, Optional<String> schema, boolean isKey) {
        try {
            return Optional.of(this.schemaManager.getSchema(topicName, format, subject, Optional.empty(), schemaId, schemaVersion, schema, isKey));
        }
        catch (IllegalStateException e) {
            logger.error("IllegalStateException:", e);
            Status status = new Status(FAILED_TO_GET_SCHEMA, new Object[0]);
            throw new FrameworkException(status, (Throwable)e);
        }
        catch (RuntimeException e) {
            return Optional.empty();
        }
    }

    private List<CompletableFuture<ProduceResult>> doProduce(String topicName, String serviceId, List<SerializedKeyAndValue> serialized, Headers headers, List<AuditRecord> auditRecords) {
        return serialized.stream().map(record -> this.produce(topicName, record.getPartitionId(), record.getTraceabilityId(), record.getCorrelationId().isPresent() ? record.getCorrelationId() : Optional.of(UuidUtil.uuidToBase64(UuidUtil.getUUID())), serviceId, headers, auditRecords, record.getKey(), record.getValue(), record.getHeaders(), !ObjectUtils.isEmpty(record.getTimestamp()) && record.getTimestamp().isPresent() && record.getTimestamp().get() > 0L ? Instant.ofEpochMilli(record.getTimestamp().get()) : Instant.now())).collect(Collectors.toList());
    }

    public CompletableFuture<ProduceResult> produce(String topicName, Optional<Integer> partitionId, Optional<String> traceabilityId, Optional<String> correlationId, String serviceId, Headers headers, List<AuditRecord> auditRecords, Optional<ByteString> key, Optional<ByteString> value, Optional<Map<String, String>> recordHeaders, Instant timestamp) {
        RecordHeaders newHeaders = new RecordHeaders();
        headers.forEach(header -> {
            if (!header.key().equalsIgnoreCase("X-Traceability-Id") && !header.key().equalsIgnoreCase("X-Correlation-Id")) {
                newHeaders.add((Header)header);
            }
        });
        if (traceabilityId.isPresent()) {
            newHeaders.add("X-Traceability-Id", traceabilityId.get().getBytes(StandardCharsets.UTF_8));
        }
        if (correlationId.isPresent()) {
            newHeaders.add("X-Correlation-Id", correlationId.get().getBytes(StandardCharsets.UTF_8));
        }
        if (recordHeaders.isPresent() && !recordHeaders.get().isEmpty()) {
            recordHeaders.get().entrySet().removeIf(entry -> ((String)entry.getKey()).equalsIgnoreCase("X-Traceability-Id") || ((String)entry.getKey()).equalsIgnoreCase("X-Correlation-Id"));
            recordHeaders.get().entrySet().forEach(eachEntry -> newHeaders.add((String)eachEntry.getKey(), ((String)eachEntry.getValue()).getBytes(StandardCharsets.UTF_8)));
        }
        if (traceabilityId.isPresent()) {
            logger.info("Associate traceability Id " + traceabilityId.get() + " with correlation Id " + correlationId.get());
        }
        CompletableFuture<ProduceResult> result2 = new CompletableFuture<ProduceResult>();
        this.producer.send(new ProducerRecord<byte[], byte[]>(topicName, partitionId.orElse(null), timestamp.toEpochMilli(), key.map(ByteString::toByteArray).orElse(null), value.map(ByteString::toByteArray).orElse(null), newHeaders), (metadata, exception) -> {
            if (exception != null) {
                if (config.isAuditEnabled()) {
                    List list = auditRecords;
                    synchronized (list) {
                        auditRecords.add(this.auditFromRecordMetadata(null, topicName, exception, serviceId, key, traceabilityId, correlationId, false));
                    }
                }
                result2.completeExceptionally(exception);
            } else {
                if (config.isAuditEnabled()) {
                    List list = auditRecords;
                    synchronized (list) {
                        auditRecords.add(this.auditFromRecordMetadata(metadata, topicName, null, serviceId, key, traceabilityId, correlationId, true));
                    }
                }
                result2.complete(ProduceResult.fromRecordMetadata(metadata));
            }
        });
        return result2;
    }

    private static CompletableFuture<ProduceResponse> produceResultsToResponse(Optional<RegisteredSchema> keySchema, Optional<RegisteredSchema> valueSchema, List<CompletableFuture<ProduceResult>> resultFutures) {
        CompletableFuture offsetsFuture = CompletableFutures.allAsList(resultFutures.stream().map(future -> future.thenApply(result2 -> new PartitionOffset(result2.getPartitionId(), result2.getOffset(), null, null))).map(future -> future.exceptionally(throwable -> new PartitionOffset(null, null, SidecarProducer.errorCodeFromProducerException(throwable.getCause()), throwable.getCause().getMessage()))).collect(Collectors.toList()));
        return offsetsFuture.thenApply(offsets -> new ProduceResponse((List<PartitionOffset>)offsets, keySchema.map(RegisteredSchema::getSchemaId).orElse(null), valueSchema.map(RegisteredSchema::getSchemaId).orElse(null)));
    }

    private static int errorCodeFromProducerException(Throwable e) {
        if (e instanceof AuthenticationException) {
            return 40101;
        }
        if (e instanceof AuthorizationException) {
            return 40301;
        }
        if (e instanceof RetriableException) {
            return 50003;
        }
        if (e instanceof KafkaException) {
            return 50002;
        }
        logger.error("Unexpected Producer Exception", e);
        throw new RuntimeException("Unexpected Producer Exception", e);
    }

    protected AuditRecord auditFromRecordMetadata(RecordMetadata rmd, String topicName, Exception e, String serviceId, Optional<ByteString> key, Optional<String> traceabilityId, Optional<String> correlationId, boolean produced) {
        AuditRecord auditRecord = new AuditRecord();
        auditRecord.setTopic(topicName);
        auditRecord.setId(UUID.randomUUID().toString());
        auditRecord.setServiceId(serviceId);
        auditRecord.setAuditType(AuditRecord.AuditType.PRODUCER);
        if (rmd != null) {
            auditRecord.setPartition(rmd.partition());
            auditRecord.setOffset(rmd.offset());
        } else {
            StringWriter sw = new StringWriter();
            PrintWriter pw = new PrintWriter(sw);
            e.printStackTrace(pw);
            auditRecord.setStacktrace(sw.toString());
        }
        if (correlationId.isPresent()) {
            auditRecord.setCorrelationId(correlationId.get());
        }
        if (traceabilityId.isPresent()) {
            auditRecord.setTraceabilityId(traceabilityId.get());
        }
        auditRecord.setAuditStatus(produced ? AuditRecord.AuditStatus.SUCCESS : AuditRecord.AuditStatus.FAILURE);
        auditRecord.setTimestamp(System.currentTimeMillis());
        if (key.isPresent()) {
            auditRecord.setKey(key.get().toString(StandardCharsets.UTF_8));
        }
        return auditRecord;
    }
}

