package org.apache.flink.metrics.influxdb.shaded.org.influxdb.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDB;
import org.apache.flink.metrics.influxdb.shaded.org.influxdb.dto.BatchPoints;
import org.apache.flink.metrics.influxdb.shaded.org.influxdb.dto.Point;

/* loaded from: input_file:org/apache/flink/metrics/influxdb/shaded/org/influxdb/impl/BatchProcessor.class */
/**
 * Buffers points in a queue and hands them to a {@link BatchWriter} in batches.
 *
 * <p>A flush is triggered either by a single-threaded scheduler after each flush interval
 * (plus optional random jitter), or as soon as {@link #put(AbstractBatchEntry)} observes that
 * the queue holds at least {@code actions} entries. HTTP entries are grouped into one
 * {@link BatchPoints} per database/retention-policy pair; UDP entries are written line by line
 * to their port.
 */
public final class BatchProcessor {
    private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName());
    protected final BlockingQueue<AbstractBatchEntry> queue;
    private final ScheduledExecutorService scheduler;
    private final BiConsumer<Iterable<Point>, Throwable> exceptionHandler;
    final InfluxDBImpl influxDB;
    final int actions;
    private final TimeUnit flushIntervalUnit;
    private final int flushInterval;
    private final InfluxDB.ConsistencyLevel consistencyLevel;
    private final int jitterInterval;
    private final BatchWriter batchWriter;

    /** Base type of everything that can be queued: a point plus transport-specific routing data. */
    public static abstract class AbstractBatchEntry {
        private final Point point;

        public AbstractBatchEntry(Point point) {
            this.point = point;
        }

        public Point getPoint() {
            return this.point;
        }
    }

    /** Fluent builder that validates its settings in {@link #build()}. */
    public static final class Builder {
        private final InfluxDBImpl influxDB;
        private int actions;
        private TimeUnit flushIntervalUnit;
        private int flushInterval;
        private int jitterInterval;
        private InfluxDB.ConsistencyLevel consistencyLevel;
        private ThreadFactory threadFactory = Executors.defaultThreadFactory();
        private int bufferLimit = 0;
        // Default handler deliberately ignores failures; write() still logs them.
        private BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (points, throwable) -> {
        };

        /**
         * @param threadFactory factory used to create the single flush thread
         * @return this builder
         */
        public Builder threadFactory(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
            return this;
        }

        /**
         * @param influxDB client to write through; cast to {@link InfluxDBImpl}, so any other
         *     implementation fails with a {@link ClassCastException}
         */
        public Builder(InfluxDB influxDB) {
            this.influxDB = (InfluxDBImpl) influxDB;
        }

        /**
         * @param maxActions queue size at which {@code put} requests an immediate flush
         * @return this builder
         */
        public Builder actions(int maxActions) {
            this.actions = maxActions;
            return this;
        }

        /**
         * @param interval delay between scheduled flushes
         * @param unit unit of {@code interval}
         * @return this builder
         */
        public Builder interval(int interval, TimeUnit unit) {
            this.flushInterval = interval;
            this.flushIntervalUnit = unit;
            return this;
        }

        /**
         * @param interval delay between scheduled flushes
         * @param jitter upper bound (exclusive) of the random extra delay added to each flush
         * @param unit unit of both {@code interval} and {@code jitter}
         * @return this builder
         */
        public Builder interval(int interval, int jitter, TimeUnit unit) {
            this.flushInterval = interval;
            this.jitterInterval = jitter;
            this.flushIntervalUnit = unit;
            return this;
        }

        /**
         * @param limit buffer limit; a {@link RetryCapableBatchWriter} is used only when this
         *     exceeds {@code actions}
         * @return this builder
         */
        public Builder bufferLimit(int limit) {
            this.bufferLimit = limit;
            return this;
        }

        /**
         * @param handler callback invoked with the affected points and the failure when a
         *     flush fails
         * @return this builder
         */
        public Builder exceptionHandler(BiConsumer<Iterable<Point>, Throwable> handler) {
            this.exceptionHandler = handler;
            return this;
        }

        /**
         * @param consistencyLevel consistency level applied to every HTTP batch; may be left unset
         * @return this builder
         */
        public Builder consistencyLevel(InfluxDB.ConsistencyLevel consistencyLevel) {
            this.consistencyLevel = consistencyLevel;
            return this;
        }

        /**
         * Validates the configuration and creates the processor, which starts its flush
         * schedule immediately.
         *
         * @return a new processor
         * @throws NullPointerException if a required reference setting is missing
         */
        public BatchProcessor build() {
            Objects.requireNonNull(this.influxDB, "influxDB");
            Preconditions.checkPositiveNumber(this.actions, "actions");
            Preconditions.checkPositiveNumber(this.flushInterval, "flushInterval");
            Preconditions.checkNotNegativeNumber(this.jitterInterval, "jitterInterval");
            Preconditions.checkNotNegativeNumber(this.bufferLimit, "bufferLimit");
            Objects.requireNonNull(this.flushIntervalUnit, "flushIntervalUnit");
            Objects.requireNonNull(this.threadFactory, "threadFactory");
            Objects.requireNonNull(this.exceptionHandler, "exceptionHandler");

            // A buffer no larger than one batch leaves no room to hold failed writes,
            // so fall back to the one-shot writer in that case.
            BatchWriter writer;
            if (this.bufferLimit > this.actions) {
                writer = new RetryCapableBatchWriter(
                        this.influxDB, this.exceptionHandler, this.bufferLimit, this.actions);
            } else {
                writer = new OneShotBatchWriter(this.influxDB);
            }
            return new BatchProcessor(
                    this.influxDB,
                    writer,
                    this.threadFactory,
                    this.actions,
                    this.flushIntervalUnit,
                    this.flushInterval,
                    this.jitterInterval,
                    this.exceptionHandler,
                    this.consistencyLevel);
        }
    }

    /** Queue entry for a point written over HTTP to a database and retention policy. */
    public static class HttpBatchEntry extends AbstractBatchEntry {
        private final String db;
        private final String rp;

        public HttpBatchEntry(Point point, String db, String rp) {
            super(point);
            this.db = db;
            this.rp = rp;
        }

        public String getDb() {
            return this.db;
        }

        public String getRp() {
            return this.rp;
        }
    }

    /** Queue entry for a point written over UDP to a port. */
    public static class UdpBatchEntry extends AbstractBatchEntry {
        private final int udpPort;

        public UdpBatchEntry(Point point, int udpPort) {
            super(point);
            this.udpPort = udpPort;
        }

        public int getUdpPort() {
            return this.udpPort;
        }
    }

    /**
     * @param influxDB client the processor writes through
     * @return a builder bound to {@code influxDB}
     */
    public static Builder builder(InfluxDB influxDB) {
        return new Builder(influxDB);
    }

    BatchProcessor(
            InfluxDBImpl influxDB,
            BatchWriter batchWriter,
            ThreadFactory threadFactory,
            int actions,
            TimeUnit flushIntervalUnit,
            int flushInterval,
            int jitterInterval,
            BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
            InfluxDB.ConsistencyLevel consistencyLevel) {
        this.influxDB = influxDB;
        this.batchWriter = batchWriter;
        this.actions = actions;
        this.flushIntervalUnit = flushIntervalUnit;
        this.flushInterval = flushInterval;
        this.jitterInterval = jitterInterval;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
        this.exceptionHandler = exceptionHandler;
        this.consistencyLevel = consistencyLevel;
        // Bound the queue to one batch so producers block in put() until a flush drains it;
        // for a batch size of 1 (or an effectively unlimited one) leave the queue unbounded.
        if (actions > 1 && actions < Integer.MAX_VALUE) {
            this.queue = new LinkedBlockingQueue<>(actions);
        } else {
            this.queue = new LinkedBlockingQueue<>();
        }
        // Self-rescheduling task rather than a fixed-rate schedule, so that a fresh random
        // jitter is drawn for every flush.
        this.scheduler.schedule(new Runnable() {
            @Override
            public void run() {
                BatchProcessor.this.write();
                BatchProcessor.this.scheduler.schedule(
                        this,
                        BatchProcessor.this.nextFlushDelay(),
                        BatchProcessor.this.flushIntervalUnit);
            }
        }, nextFlushDelay(), this.flushIntervalUnit);
    }

    /** Returns the flush interval plus a random jitter in {@code [0, jitterInterval)}. */
    private int nextFlushDelay() {
        return this.flushInterval + ((int) (Math.random() * this.jitterInterval));
    }

    /**
     * Drains the queue and writes everything that was pending.
     *
     * <p>With an empty queue the batch writer is still invoked with an empty collection.
     * Any failure is reported to the exception handler together with the points drained for
     * this flush (an empty list if the failure happened before anything was drained) and is
     * then logged.
     */
    void write() {
        List<Point> currentBatch = Collections.emptyList();
        try {
            if (this.queue.isEmpty()) {
                this.batchWriter.write(Collections.emptyList());
                return;
            }
            Map<String, BatchPoints> httpBatches = new HashMap<>();
            Map<Integer, List<String>> udpLinesByPort = new HashMap<>();
            List<AbstractBatchEntry> entries = new ArrayList<>(this.queue.size());
            this.queue.drainTo(entries);
            currentBatch = new ArrayList<>(entries.size());
            for (AbstractBatchEntry entry : entries) {
                Point point = entry.getPoint();
                currentBatch.add(point);
                if (entry instanceof HttpBatchEntry) {
                    HttpBatchEntry httpEntry = (HttpBatchEntry) entry;
                    String db = httpEntry.getDb();
                    String rp = httpEntry.getRp();
                    // NOTE(review): this key is ambiguous when names contain '_'
                    // ("a_b"/"c" vs "a"/"b_c"); kept as-is to preserve existing grouping.
                    String batchKey = db + "_" + rp;
                    httpBatches
                            .computeIfAbsent(batchKey, key -> BatchPoints.database(db)
                                    .retentionPolicy(rp)
                                    .consistency(getConsistencyLevel())
                                    .build())
                            .point(point);
                } else if (entry instanceof UdpBatchEntry) {
                    int udpPort = ((UdpBatchEntry) entry).getUdpPort();
                    udpLinesByPort
                            .computeIfAbsent(udpPort, port -> new ArrayList<>())
                            .add(point.lineProtocol());
                }
            }
            this.batchWriter.write(httpBatches.values());
            for (Map.Entry<Integer, List<String>> portLines : udpLinesByPort.entrySet()) {
                int udpPort = portLines.getKey();
                for (String line : portLines.getValue()) {
                    this.influxDB.write(udpPort, line);
                }
            }
        } catch (Throwable th) {
            // Catch everything so that a failed flush does not stop the scheduled task.
            this.exceptionHandler.accept(currentBatch, th);
            LOG.log(Level.SEVERE, "Batch could not be sent. Data will be lost", th);
        }
    }

    /**
     * Enqueues an entry, blocking while a bounded queue is full, and requests an immediate
     * flush on the scheduler thread once the queue holds at least {@code actions} entries.
     *
     * @param abstractBatchEntry entry to enqueue
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread
     *     is interrupted while waiting; the interrupt flag is restored before throwing
     */
    public void put(AbstractBatchEntry abstractBatchEntry) {
        try {
            this.queue.put(abstractBatchEntry);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (this.queue.size() >= this.actions) {
            this.scheduler.submit(this::write);
        }
    }

    /**
     * Flushes pending entries on the calling thread, then shuts down the scheduler and closes
     * the batch writer.
     */
    public void flushAndShutdown() {
        write();
        this.scheduler.shutdown();
        this.batchWriter.close();
    }

    /** Flushes pending entries on the calling thread. */
    public void flush() {
        write();
    }

    public InfluxDB.ConsistencyLevel getConsistencyLevel() {
        return this.consistencyLevel;
    }

    public BatchWriter getBatchWriter() {
        return this.batchWriter;
    }
}
