package com.github.hackerwin7.mysql.tracker.kafka.driver.producer;

import com.github.hackerwin7.mysql.tracker.kafka.utils.KafkaConf;
import com.github.hackerwin7.mysql.tracker.monitor.TrackerMonitor;
import com.github.hackerwin7.mysql.tracker.protocol.json.JSONConvert;
import com.github.hackerwin7.mysql.tracker.tracker.utils.TrackerConf;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/hackerwin7/mysql/tracker/kafka/driver/producer/KafkaSender.class */
public class KafkaSender {
    private KafkaConf conf;
    private Producer<String, byte[]> producer;
    private Logger logger = LoggerFactory.getLogger(KafkaSender.class);
    private int retrys = 10;
    private int reconns = 5;

    public KafkaSender(KafkaConf kafkaConf) {
        this.conf = kafkaConf;
    }

    public void connect() {
        Properties properties = new Properties();
        KafkaConf kafkaConf = this.conf;
        properties.put("metadata.broker.list", KafkaConf.brokerList);
        KafkaConf kafkaConf2 = this.conf;
        properties.put("serializer.class", KafkaConf.serializer);
        KafkaConf kafkaConf3 = this.conf;
        properties.put("key.serializer.class", KafkaConf.keySerializer);
        KafkaConf kafkaConf4 = this.conf;
        properties.put("partitioner.class", KafkaConf.partitioner);
        KafkaConf kafkaConf5 = this.conf;
        properties.put("request.required.acks", KafkaConf.acks);
        this.producer = new Producer<>(new ProducerConfig(properties));
    }

    public void send(byte[] bArr) {
        KafkaConf kafkaConf = this.conf;
        blockSend(new KeyedMessage<>(KafkaConf.topic, null, bArr));
    }

    public void send(String str, byte[] bArr) {
        blockSend(new KeyedMessage<>(str, null, bArr));
    }

    public void send(List<byte[]> list) {
        ArrayList arrayList = new ArrayList();
        for (byte[] bArr : list) {
            KafkaConf kafkaConf = this.conf;
            arrayList.add(new KeyedMessage<>(KafkaConf.topic, null, bArr));
        }
        blockSend(arrayList);
    }

    public void send(String str, List<byte[]> list) {
        ArrayList arrayList = new ArrayList();
        Iterator<byte[]> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(new KeyedMessage<>(str, null, it.next()));
        }
        blockSend(arrayList);
    }

    public int sendKeyMsg(List<KeyedMessage<String, byte[]>> list) {
        return blockSend(list);
    }

    public int sendKeyMsg(List<KeyedMessage<String, byte[]>> list, KafkaSender kafkaSender, TrackerConf trackerConf) {
        return blockSend(list, kafkaSender, trackerConf);
    }

    public int sendKeyMsg(KeyedMessage<String, byte[]> keyedMessage) {
        return blockSend(keyedMessage);
    }

    public int sendKeyMsg(KeyedMessage<String, byte[]> keyedMessage, KafkaSender kafkaSender, TrackerConf trackerConf) {
        return blockSend(keyedMessage, kafkaSender, trackerConf);
    }

    public int blockSend(List<KeyedMessage<String, byte[]>> list) {
        boolean z = false;
        int i = 0;
        int i2 = 0;
        while (!z) {
            if (i >= this.retrys) {
                reconnect();
                i2++;
                if (i2 > this.reconns) {
                    return -1;
                }
                this.logger.warn("retry times out, reconnect the kafka server......");
                i = 0;
            }
            i++;
            try {
                this.producer.send(list);
                z = true;
            } catch (Exception e) {
                this.logger.warn("retrying sending... Exception:" + e.getMessage());
                delay(3);
            }
        }
        return 0;
    }

    public int blockSend(List<KeyedMessage<String, byte[]>> list, KafkaSender kafkaSender, TrackerConf trackerConf) {
        boolean z = false;
        int i = 0;
        int i2 = 0;
        while (!z) {
            if (i >= this.retrys) {
                reconnect();
                i2++;
                if (i2 > this.reconns) {
                    return -1;
                }
                this.logger.warn("retry times out, reconnect the kafka server......");
                i = 0;
            }
            i++;
            try {
                this.producer.send(list);
                z = true;
            } catch (Exception e) {
                try {
                    TrackerMonitor trackerMonitor = new TrackerMonitor();
                    trackerMonitor.exMsg = e.getMessage();
                    kafkaSender.sendKeyMsg(new KeyedMessage<>(trackerConf.phKaTopic, null, JSONConvert.JrdwMonitorVoToJson(trackerMonitor.toJrdwMonitorOnline(20003, trackerConf.jobId)).toString().getBytes("UTF-8")));
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
                this.logger.warn("retrying sending... Exception:" + e.getMessage());
                delay(3);
            }
        }
        return 0;
    }

    public int blockSend(KeyedMessage<String, byte[]> keyedMessage) {
        boolean z = false;
        int i = 0;
        int i2 = 0;
        while (!z) {
            if (i >= this.retrys) {
                reconnect();
                i2++;
                if (i2 > this.reconns) {
                    return -1;
                }
                this.logger.warn("retry times out, reconnect the kafka server......");
                i = 0;
            }
            i++;
            try {
                this.producer.send(keyedMessage);
                z = true;
            } catch (Exception e) {
                this.logger.warn("retrying sending... Exception:" + e.getMessage());
                delay(3);
            }
        }
        return 0;
    }

    public int blockSend(KeyedMessage<String, byte[]> keyedMessage, KafkaSender kafkaSender, TrackerConf trackerConf) {
        boolean z = false;
        int i = 0;
        int i2 = 0;
        while (!z) {
            if (i >= this.retrys) {
                reconnect();
                i2++;
                if (i2 > this.reconns) {
                    return -1;
                }
                this.logger.warn("retry times out, reconnect the kafka server......");
                i = 0;
            }
            i++;
            try {
                this.producer.send(keyedMessage);
                z = true;
            } catch (Exception e) {
                try {
                    TrackerMonitor trackerMonitor = new TrackerMonitor();
                    trackerMonitor.exMsg = e.getMessage();
                    kafkaSender.sendKeyMsg(new KeyedMessage<>(trackerConf.phKaTopic, null, JSONConvert.JrdwMonitorVoToJson(trackerMonitor.toJrdwMonitorOnline(20003, trackerConf.jobId)).toString().getBytes("UTF-8")));
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
                this.logger.warn("retrying sending... Exception:" + e.getMessage());
                delay(3);
            }
        }
        return 0;
    }

    private void delay(int i) {
        try {
            Thread.sleep(i * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void close() {
        if (this.producer != null) {
            this.producer.close();
        }
    }

    public void reconnect() {
        close();
        connect();
    }

    public boolean isConnected() {
        Properties properties = new Properties();
        KafkaConf kafkaConf = this.conf;
        properties.put("metadata.broker.list", KafkaConf.brokerList);
        KafkaConf kafkaConf2 = this.conf;
        properties.put("serializer.class", KafkaConf.serializer);
        KafkaConf kafkaConf3 = this.conf;
        properties.put("key.serializer.class", KafkaConf.keySerializer);
        KafkaConf kafkaConf4 = this.conf;
        properties.put("partitioner.class", KafkaConf.partitioner);
        KafkaConf kafkaConf5 = this.conf;
        properties.put("request.required.acks", KafkaConf.acks);
        KafkaConf kafkaConf6 = this.conf;
        properties.put("send.buffer.bytes", KafkaConf.sendBufferSize);
        try {
            Producer producer = new Producer(new ProducerConfig(properties));
            if (producer != null) {
                producer.close();
            }
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
