package net.quasardb.kinesis;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Tag;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import net.quasardb.qdb.Session;
import net.quasardb.qdb.SessionFactory;
import net.quasardb.qdb.ts.Writer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.kinesis.KinesisClient;

@CommandLine.Command(name = "start", mixinStandardHelpOptions = true, description = {"Start kinesis connector"})
/* loaded from: input_file:net/quasardb/kinesis/Connector.class */
class Connector implements Callable<Integer> {
    private static final Logger logger;

    @CommandLine.Option(names = {"--relay-queue-size"}, description = {"Size of the internal queue for the relays"})
    private int relayQueueSize;

    @CommandLine.Option(names = {"--relay-reset-interval"}, description = {"Reset inverval (in ms) to periodically clean up relay memory"})
    private long relayResetIntervalMs;

    @CommandLine.Option(names = {"--cluster-public-key"}, description = {"Filename to cluster public key"})
    private String qdbClusterPublicKeyFile;

    @CommandLine.Option(names = {"--user-security-file"}, description = {"Filename to user private key file"})
    private String qdbUserSecurityFile;

    @CommandLine.Option(names = {"--role-arn"}, description = {"AWS Profile / STS client role arn"})
    private String roleArn;

    @CommandLine.Option(names = {"--external-id"}, description = {"AWS Profile / STS ExternalId"})
    private String externalId;

    @CommandLine.Option(names = {"--stream-name"}, description = {"AWS Kinesis stream name"})
    private String streamName;

    @CommandLine.Option(names = {"--plugin-path"}, description = {"Path to a directory of plugins with additional jars to load"})
    private String pluginPath;

    @CommandLine.Option(names = {"--parser"}, description = {"Classname of the parser to use"})
    private String parserClassName;

    @CommandLine.Option(names = {"--with-parents"}, description = {"Include parent shards into subscribe list"})
    private boolean includeParentShards;

    @CommandLine.Option(names = {"--push-mode"}, description = {"QuasarDB push mode (ASYNC or FAST)"})
    private Writer.PushMode pushMode;
    static final /* synthetic */ boolean $assertionsDisabled;

    @CommandLine.Option(names = {"--threads"}, description = {"Number of Kinesis consumer threads to launch"})
    private int threadCount = 16;

    @CommandLine.Option(names = {"--relay-pool-size"}, description = {"Number of relays (batch writers) to launch"})
    private int relayPoolSize = 8;

    @CommandLine.Option(names = {"--stop-after"}, description = {"Stop process after this amount of ms. Useful when you want to periodically restart the process for housekeeping and rediscovering shards. Set to 0 to disable this functionality. Defaults to 0."})
    private long stopAfterMs = 0;

    @CommandLine.Option(names = {"--record-rejection-age"}, description = {"Age (in ms) for a record before it is rejected for insertion, use 0 to disable. Defauls to 0."})
    private long rejectionAgeMs = 0;

    @CommandLine.Option(names = {"--cluster"}, description = {"Cluster endpoint"})
    private String qdbUri = "qdb://127.0.0.1:2836";

    @CommandLine.Option(names = {"--metrics-namespace"}, description = {"Metrics namespace to use"})
    private String metricsNamespace = "quasardb.kinesis";

    @CommandLine.Option(names = {"--worker-id"}, description = {"Numeric identifier of this worker, start at 0"})
    private int workerId = 0;

    @CommandLine.Option(names = {"--worker-total"}, description = {"Total number of workers, defaults to 1"})
    private int workerTotal = 1;

    @CommandLine.Option(names = {"--blacklist-file"}, description = {"File that contains sensor ids that are blacklisted. Defaults to /etc/qdb/kinesis.blacklist"})
    private String blacklistFile = "/etc/qdb/kinesis.blacklist";

    Connector() {
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public Integer call() throws Exception {
        SessionFactory sessionFactory;
        logger.warn("Launching Kinesis connector");
        if (this.roleArn != null && this.externalId == null) {
            logger.error("For assuming roles, you must always provide an external id.");
            return 1;
        }
        if (this.parserClassName == null) {
            logger.error("No parser class provided: you must provide a Parser class implements the net.quasardb.kinesis.Parser interface.");
            return 1;
        }
        Class loadClass = PluginLoader.loadClass(this.pluginPath, this.parserClassName, Parser.class);
        String tablePrefix = ((Parser) PluginLoader.instantiate(loadClass)).tablePrefix();
        logger.info("Loaded parser class: ", loadClass.toString());
        AwsCredentialsProvider assumeRoleProvider = this.roleArn != null ? Credentials.assumeRoleProvider(this.roleArn, this.externalId, "qdb-kinesis") : Credentials.defaultProvider();
        logger.info("got credentials provider: {} ", assumeRoleProvider.toString());
        Set<String> parseBlacklist = Util.parseBlacklist(this.blacklistFile);
        logger.info("Got blacklist: {}", parseBlacklist.toString());
        if (this.qdbClusterPublicKeyFile == null || this.qdbClusterPublicKeyFile.isEmpty() || this.qdbUserSecurityFile == null || this.qdbUserSecurityFile.isEmpty()) {
            logger.info("Using insecure connections");
            sessionFactory = new SessionFactory(this.qdbUri);
        } else {
            logger.info("Using secure connections, user_security_file = {}, cluster_public_key_file = {}", this.qdbUserSecurityFile, this.qdbClusterPublicKeyFile);
            sessionFactory = new SessionFactory(this.qdbUri, Session.SecurityOptions.ofFiles(this.qdbUserSecurityFile, this.qdbClusterPublicKeyFile));
        }
        if (!$assertionsDisabled && sessionFactory == null) {
            throw new AssertionError();
        }
        Util.createKinesisAsyncClient(assumeRoleProvider);
        KinesisClient createKinesisClient = Util.createKinesisClient(assumeRoleProvider);
        List<String> filterShardsForThisWorker = Util.filterShardsForThisWorker(Util.shardIdsByStreamName(createKinesisClient, this.streamName, this.includeParentShards), this.workerId, this.workerTotal);
        logger.info("got {} shard ids", Integer.valueOf(filterShardsForThisWorker.size()));
        final ArrayList arrayList = new ArrayList();
        RelayPool relayPool = new RelayPool(new RelayFactory(sessionFactory, Util.seedTables(tablePrefix), this.pushMode), this.relayPoolSize, this.relayResetIntervalMs);
        Metrics.registry.gauge("relay_pool.available", Arrays.asList(Tag.of("prefix", tablePrefix)), relayPool, relayPool2 -> {
            return relayPool2.size();
        });
        Metrics.startReport(this.metricsNamespace);
        HashMap hashMap = new HashMap();
        int i = 0;
        while (true) {
            Integer num = i;
            if (num.intValue() >= new Integer(this.threadCount).intValue()) {
                break;
            }
            hashMap.put(num, new ArrayList());
            i = Integer.valueOf(num.intValue() + 1);
        }
        int i2 = 0;
        for (String str : filterShardsForThisWorker) {
            int i3 = i2;
            i2++;
            ((List) hashMap.get(Integer.valueOf(i3 % this.threadCount))).add(str);
        }
        Counter counter = Metrics.registry.counter("records", Arrays.asList(Tag.of("prefix", tablePrefix)));
        for (int i4 = 0; i4 < this.threadCount; i4++) {
            List list = (List) hashMap.get(new Integer(i4));
            logger.info("Consumer {} has been assigned {} shards", Integer.valueOf(i4), Integer.valueOf(list.size()));
            Consumer consumer = new Consumer(createKinesisClient, sessionFactory, relayPool, this.relayQueueSize, this.rejectionAgeMs, parseBlacklist, loadClass, list, tablePrefix, counter, this.streamName);
            consumer.start();
            consumer.setName("Consumer " + i4 + "(" + tablePrefix + ")");
            arrayList.add(consumer);
            logger.info("Sleeping 100ms between initializations of consumers");
            Thread.sleep(100L);
        }
        logger.info("Waiting for relays to finish");
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((Consumer) it2.next()).gracefulStop();
            }
            Iterator it3 = arrayList.iterator();
            while (it3.hasNext()) {
                Consumer consumer2 = (Consumer) it3.next();
                System.out.println("Waiting for consumer " + consumer2.getName() + " to stop");
                consumer2.isStopped().join();
            }
            System.out.println("All consumers stopped, now closing relay pool");
            try {
                relayPool.close();
            } catch (IOException e) {
                logger.error("An error occured while closing relay", (Throwable) e);
            }
            System.out.println("Cleanup done");
        }));
        if (this.stopAfterMs != 0) {
            logger.info("Scheduling connector to stop after {} ms", Long.valueOf(this.stopAfterMs));
            new Timer().schedule(new TimerTask() { // from class: net.quasardb.kinesis.Connector.1
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    Connector.logger.info("Stop timer triggered, gracefully stopping all consumers");
                    Iterator it2 = arrayList.iterator();
                    while (it2.hasNext()) {
                        ((Consumer) it2.next()).gracefulStop();
                    }
                }
            }, this.stopAfterMs);
        }
        try {
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((Consumer) it2.next()).isStopped().join();
            }
            return 0;
        } catch (Exception e) {
            logger.error("Caught exception: ", (Throwable) e);
            return 1;
        }
    }

    public static void main(String... strArr) {
        System.exit(new CommandLine(new Connector()).execute(strArr));
    }

    static {
        $assertionsDisabled = !Connector.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger((Class<?>) Connector.class);
    }
}
