/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.kafka.common;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Producer} that delegates every standard operation to an internal
 * {@link KafkaProducer} and additionally exposes transaction state (producer id, epoch,
 * coordinator id) and the ability to resume a transaction.
 *
 * <p>The extra capabilities are implemented by reading and writing private members of
 * {@code KafkaProducer} and its {@code transactionManager} through reflection. Any reflective
 * lookup that fails is reported as a {@link RuntimeException} with the message
 * {@code "Incompatible KafkaProducer version"}; the original failure is kept as the cause.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(FlinkKafkaProducer.class);

    /** Message used for every failure of a reflective lookup or call. */
    private static final String INCOMPATIBLE_VERSION = "Incompatible KafkaProducer version";

    /** Binary name of the transaction manager's state enum, followed by the separator dot. */
    private static final String STATE_ENUM_PREFIX =
            "org.apache.kafka.clients.producer.internals.TransactionManager$State.";

    private final KafkaProducer<K, V> kafkaProducer;
    private final String transactionalId;

    /**
     * Creates the wrapped {@link KafkaProducer} from the given configuration.
     *
     * @param properties producer configuration; the value stored under {@code "transactional.id"}
     *                   (possibly {@code null}) is remembered as this producer's transactional id
     */
    public FlinkKafkaProducer(Map<String, Object> properties) {
        this.transactionalId = (String) properties.get("transactional.id");
        this.kafkaProducer = new KafkaProducer<>(properties);
    }

    @Override
    public Uuid clientInstanceId(Duration timeout) {
        return this.kafkaProducer.clientInstanceId(timeout);
    }

    @Override
    public void initTransactions() {
        this.kafkaProducer.initTransactions();
    }

    @Override
    public void beginTransaction() throws ProducerFencedException {
        this.kafkaProducer.beginTransaction();
    }

    @Override
    public void commitTransaction() throws ProducerFencedException {
        this.kafkaProducer.commitTransaction();
    }

    @Override
    public void abortTransaction() throws ProducerFencedException {
        this.kafkaProducer.abortTransaction();
    }

    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
        this.kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
    }

    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata) throws ProducerFencedException {
        this.kafkaProducer.sendOffsetsToTransaction(offsets, groupMetadata);
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.kafkaProducer.send(record);
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        return this.kafkaProducer.send(record, callback);
    }

    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        return this.kafkaProducer.partitionsFor(topic);
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return this.kafkaProducer.metrics();
    }

    @Override
    public void close() {
        this.kafkaProducer.close();
    }

    @Override
    public void close(Duration timeout) {
        this.kafkaProducer.close(timeout);
    }

    /**
     * Flushes the wrapped producer and, when a transactional id was configured, additionally
     * enqueues an add-partitions request on the transaction manager and waits for its result.
     */
    @Override
    public void flush() {
        this.kafkaProducer.flush();
        if (this.transactionalId != null) {
            this.flushNewPartitions();
        }
    }

    /**
     * Overwrites the transaction manager's producer id and epoch with the supplied values and
     * drives its state through {@code INITIALIZING}, {@code READY} and {@code IN_TRANSACTION},
     * finally marking the transaction as started. All steps run while holding the transaction
     * manager's monitor.
     *
     * @param producerId producer id to write into the transaction manager
     * @param epoch      epoch to write into the transaction manager
     * @throws RuntimeException if any reflective access fails
     */
    public void resumeTransaction(long producerId, short epoch) {
        logger.info("Attempting to resume transaction {} with producerId {} and epoch {}", this.transactionalId, producerId, epoch);
        Object transactionManager = this.transactionManager();
        synchronized (transactionManager) {
            Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");

            invoke(transactionManager, "transitionTo", state("INITIALIZING"));
            invoke(sequenceNumbers, "clear");

            Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
            setValue(producerIdAndEpoch, "producerId", producerId);
            setValue(producerIdAndEpoch, "epoch", epoch);

            invoke(transactionManager, "transitionTo", state("READY"));
            invoke(transactionManager, "transitionTo", state("IN_TRANSACTION"));
            setValue(transactionManager, "transactionStarted", true);
        }
    }

    /**
     * @return the value configured under {@code "transactional.id"}, or {@code null} if none was set
     */
    public String getTransactionalId() {
        return this.transactionalId;
    }

    /**
     * @return the {@code producerId} currently held by the transaction manager
     * @throws RuntimeException if any reflective access fails
     */
    public long getProducerId() {
        Object producerIdAndEpoch = getValue(this.transactionManager(), "producerIdAndEpoch");
        return (Long) getValue(producerIdAndEpoch, "producerId");
    }

    /**
     * @return the {@code epoch} currently held by the transaction manager
     * @throws RuntimeException if any reflective access fails
     */
    public short getEpoch() {
        Object producerIdAndEpoch = getValue(this.transactionManager(), "producerIdAndEpoch");
        return (Short) getValue(producerIdAndEpoch, "epoch");
    }

    /**
     * @return the id of the node the transaction manager reports as coordinator for
     *         {@link FindCoordinatorRequest.CoordinatorType#TRANSACTION}
     * @throws RuntimeException if any reflective access fails
     */
    public int getTransactionCoordinatorId() {
        Node node = (Node) invoke(this.transactionManager(), "coordinator", FindCoordinatorRequest.CoordinatorType.TRANSACTION);
        return node.id();
    }

    /** Reads the wrapped producer's private {@code transactionManager} field. */
    private Object transactionManager() {
        return getValue(this.kafkaProducer, "transactionManager");
    }

    /**
     * Enqueues an add-partitions request, wakes the producer's sender and blocks until the
     * request's result is available.
     */
    private void flushNewPartitions() {
        logger.info("Flushing new partitions");
        TransactionalRequestResult result = this.enqueueNewPartitions();
        Object sender = getValue(this.kafkaProducer, "sender");
        invoke(sender, "wakeup");
        result.await();
    }

    /**
     * Builds the transaction manager's add-partitions handler and enqueues it, all under the
     * transaction manager's monitor.
     *
     * @return the {@code result} field declared on the handler's superclass
     */
    private TransactionalRequestResult enqueueNewPartitions() {
        Object transactionManager = this.transactionManager();
        synchronized (transactionManager) {
            Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
            // enqueueRequest and the result field are looked up against the handler's superclass,
            // not its concrete class.
            Class<?> handlerType = txnRequestHandler.getClass().getSuperclass();
            invoke(transactionManager, "enqueueRequest", new Class<?>[]{handlerType}, new Object[]{txnRequestHandler});
            return (TransactionalRequestResult) getValue(txnRequestHandler, handlerType, "result");
        }
    }

    /** Resolves a constant of the transaction manager's state enum by simple name. */
    private static Enum<?> state(String constantName) {
        return getEnum(STATE_ENUM_PREFIX + constantName);
    }

    /**
     * Resolves an enum constant from a string of the form {@code <binary class name>.<CONSTANT>}.
     *
     * @param enumFullName class name and constant name separated by the last dot
     * @return the matching enum constant, never {@code null}
     * @throws IllegalArgumentException if the name contains no dot-separated constant part, or
     *                                  the class has no constant with that name
     * @throws RuntimeException         if the enum class cannot be loaded
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static Enum<?> getEnum(String enumFullName) {
        // Split on the last dot only: everything before it is the class, the rest the constant.
        String[] parts = enumFullName.split("\\.(?=[^\\.]+$)");
        if (parts.length != 2) {
            // Fail here with a clear message rather than handing null to invoke(), which would
            // surface as an unexplained NullPointerException.
            throw new IllegalArgumentException("Not a fully qualified enum constant name: " + enumFullName);
        }
        String enumClassName = parts[0];
        String enumName = parts[1];
        try {
            // Unchecked: the class is only known by name at runtime, so its enum type cannot be
            // expressed statically.
            Class<Enum> enumClass = (Class<Enum>) Class.forName(enumClassName);
            return Enum.valueOf(enumClass, enumName);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(INCOMPATIBLE_VERSION, e);
        }
    }

    /**
     * Invokes a declared method, deriving the parameter types from the runtime classes of the
     * arguments. Arguments must therefore be non-null and their exact runtime class must match
     * the declared parameter type.
     */
    private static Object invoke(Object object, String methodName, Object... args) {
        Class<?>[] argTypes = new Class<?>[args.length];
        for (int i = 0; i < args.length; ++i) {
            argTypes[i] = args[i].getClass();
        }
        return invoke(object, methodName, argTypes, args);
    }

    /**
     * Invokes the method with the given name and parameter types declared on the object's own
     * class, bypassing access checks.
     *
     * @throws RuntimeException if the method is missing or inaccessible, or if the invoked
     *                          method itself throws (the cause is preserved)
     */
    private static Object invoke(Object object, String methodName, Class<?>[] argTypes, Object[] args) {
        try {
            Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
            method.setAccessible(true);
            return method.invoke(object, args);
        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException(INCOMPATIBLE_VERSION, e);
        }
    }

    /** Reads a field declared on the object's own runtime class. */
    private static Object getValue(Object object, String fieldName) {
        return getValue(object, object.getClass(), fieldName);
    }

    /**
     * Reads a field declared on {@code clazz} from {@code object}, bypassing access checks.
     *
     * @throws RuntimeException if the field is missing or inaccessible
     */
    private static Object getValue(Object object, Class<?> clazz, String fieldName) {
        try {
            Field field = clazz.getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(object);
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException(INCOMPATIBLE_VERSION, e);
        }
    }

    /**
     * Writes a field declared on the object's own runtime class, bypassing access checks.
     *
     * @throws RuntimeException if the field is missing or inaccessible
     */
    private static void setValue(Object object, String fieldName, Object value) {
        try {
            Field field = object.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            field.set(object, value);
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException(INCOMPATIBLE_VERSION, e);
        }
    }
}

