package org.apache.kafka.connect.util.clusters;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.metadata.BrokerState;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.class */
public class EmbeddedKafkaCluster {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120);
    private final KafkaServer[] brokers;
    private final Properties brokerConfig;
    private final Time time;
    private final int[] currentBrokerPorts;
    private final String[] currentBrokerLogDirs;
    private final boolean hasListenerConfig;
    final Map<String, String> clientConfigs;
    private EmbeddedZookeeper zookeeper;
    private ListenerName listenerName;
    private KafkaProducer<byte[], byte[]> producer;

    /**
     * Creates a cluster description with no additional client configuration.
     * Delegates to the three-argument constructor with an empty client config map.
     *
     * @param i          number of brokers (used to size the per-broker arrays)
     * @param properties broker configuration applied to every broker
     */
    public EmbeddedKafkaCluster(int i, Properties properties) {
        this(i, properties, Collections.<String, String>emptyMap());
    }

    /**
     * Creates a cluster description; nothing is started until {@link #start()} is called.
     *
     * @param i          number of brokers (sizes the broker, port and log-dir arrays)
     * @param properties broker configuration; stored by reference and mutated on start
     * @param map        client configuration merged into every client created by this cluster;
     *                   stored by reference
     */
    public EmbeddedKafkaCluster(int i, Properties properties, Map<String, String> map) {
        // Per-broker state, indexed by broker id.
        this.brokers = new KafkaServer[i];
        this.currentBrokerPorts = new int[i];
        this.currentBrokerLogDirs = new String[i];

        this.brokerConfig = properties;
        this.clientConfigs = map;
        // Remember whether the caller supplied listeners, so that starting the brokers
        // only generates a listener string when none was configured up front.
        this.hasListenerConfig = properties.get(KafkaConfig.ListenersProp()) != null;

        this.time = new MockTime();
        this.zookeeper = null;
        this.listenerName = new ListenerName("PLAINTEXT");
    }

    /**
     * Starts the brokers without creating a new ZooKeeper instance and without resetting
     * the remembered ports and log directories, so previously recorded values are reused.
     */
    public void startOnlyKafkaOnSamePorts() {
        this.doStart();
    }

    /**
     * Starts a fresh ZooKeeper instance and then all brokers.
     * Remembered ports are reset to 0 and remembered log directories are cleared first,
     * so new ones are chosen during broker start-up.
     */
    public void start() {
        this.zookeeper = new EmbeddedZookeeper();
        for (int idx = 0; idx < this.currentBrokerPorts.length; idx++) {
            this.currentBrokerPorts[idx] = 0;
        }
        for (int idx = 0; idx < this.currentBrokerLogDirs.length; idx++) {
            this.currentBrokerLogDirs[idx] = null;
        }
        doStart();
    }

    /**
     * Applies default broker settings, starts every broker and creates the shared producer.
     *
     * <p>The shared {@code brokerConfig} is mutated in place: broker id, log directory and
     * (when the caller configured no listeners) the listener string are overwritten for each
     * broker right before it is created, so statement order inside the loop matters.
     */
    private void doStart() {
        final Properties config = this.brokerConfig;

        config.put(KafkaConfig.ZkConnectProp(), zKConnectString());
        // Defaults that the caller may override through the supplied broker config.
        putIfAbsent(config, KafkaConfig.DeleteTopicEnableProp(), Boolean.TRUE);
        putIfAbsent(config, KafkaConfig.GroupInitialRebalanceDelayMsProp(), Integer.valueOf(0));
        putIfAbsent(config, KafkaConfig.OffsetsTopicReplicationFactorProp(), Short.valueOf((short) this.brokers.length));
        putIfAbsent(config, KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.FALSE);
        putIfAbsent(config, KafkaConfig.LogCleanerDedupeBufferSizeProp(), Long.valueOf(2097152L));

        // Listener name: inter-broker listener name, else inter-broker security protocol, else PLAINTEXT.
        Object listener = config.get(KafkaConfig.InterBrokerListenerNameProp());
        if (listener == null) {
            listener = config.get(KafkaConfig.InterBrokerSecurityProtocolProp());
        }
        this.listenerName = new ListenerName(listener == null ? "PLAINTEXT" : listener.toString());

        for (int brokerId = 0; brokerId < this.brokers.length; brokerId++) {
            config.put(KafkaConfig.BrokerIdProp(), Integer.valueOf(brokerId));

            // Reuse a remembered log directory if there is one, otherwise create a new one.
            if (this.currentBrokerLogDirs[brokerId] == null) {
                this.currentBrokerLogDirs[brokerId] = createLogDir();
            }
            config.put(KafkaConfig.LogDirProp(), this.currentBrokerLogDirs[brokerId]);

            if (!this.hasListenerConfig) {
                String listeners = this.listenerName.value() + "://localhost:" + this.currentBrokerPorts[brokerId];
                config.put(KafkaConfig.ListenersProp(), listeners);
            }

            KafkaServer server = TestUtils.createServer(new KafkaConfig(config, true), this.time);
            this.brokers[brokerId] = server;
            // Remember the port actually bound so it can be reused on a later restart.
            this.currentBrokerPorts[brokerId] = server.boundPort(this.listenerName);
        }

        Map<String, Object> producerProps = new HashMap<>(this.clientConfigs);
        producerProps.put("bootstrap.servers", bootstrapServers());
        if (sslEnabled()) {
            producerProps.put("ssl.truststore.location", config.get("ssl.truststore.location"));
            producerProps.put("ssl.truststore.password", config.get("ssl.truststore.password"));
            producerProps.put("security.protocol", "SSL");
        }
        this.producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
    }

    /**
     * Stops the producer and the brokers only: log directories are kept and
     * ZooKeeper is left running.
     */
    public void stopOnlyKafka() {
        final boolean deleteLogDirs = false;
        final boolean stopZookeeper = false;
        stop(deleteLogDirs, stopZookeeper);
    }

    /**
     * Stops the producer and the brokers, deletes the broker log directories and
     * shuts down ZooKeeper.
     */
    public void stop() {
        final boolean deleteLogDirs = true;
        final boolean stopZookeeper = true;
        stop(deleteLogDirs, stopZookeeper);
    }

    /**
     * Shuts the cluster down in order: producer, brokers, optionally the broker log
     * directories, optionally ZooKeeper.
     *
     * <p>Each stage reports its own failure. Broker slots that were never filled and a
     * ZooKeeper that was never started are skipped, so calling this after a failed or
     * missing start does not fail with a {@link NullPointerException}.
     *
     * @param z  whether to delete the log directories of every broker
     * @param z2 whether to shut down ZooKeeper
     * @throws RuntimeException if any stage fails; the cause is the original failure
     */
    private void stop(boolean z, boolean z2) {
        // Only producer failures are reported as producer failures.
        try {
            if (this.producer != null) {
                this.producer.close();
            }
        } catch (Exception e) {
            log.error("Could not shutdown producer ", e);
            throw new RuntimeException("Could not shutdown producer", e);
        }

        for (KafkaServer broker : this.brokers) {
            if (broker == null) {
                continue; // slot never populated, nothing to shut down
            }
            try {
                broker.shutdown();
            } catch (Throwable t) {
                String message = String.format("Could not shutdown broker at %s", address(broker));
                log.error(message, t);
                throw new RuntimeException(message, t);
            }
        }

        if (z) {
            for (KafkaServer broker : this.brokers) {
                if (broker == null) {
                    continue;
                }
                try {
                    log.info("Cleaning up kafka log dirs at {}", broker.config().logDirs());
                    CoreUtils.delete(broker.config().logDirs());
                } catch (Throwable t) {
                    String message = String.format("Could not clean up log dirs for broker at %s", address(broker));
                    log.error(message, t);
                    throw new RuntimeException(message, t);
                }
            }
        }

        if (z2 && this.zookeeper != null) {
            try {
                this.zookeeper.shutdown();
            } catch (Throwable t) {
                String message = String.format("Could not shutdown zookeeper at %s", zKConnectString());
                log.error(message, t);
                throw new RuntimeException(message, t);
            }
        }
    }

    /**
     * Stores {@code obj} under {@code str} unless the properties already contain that key.
     *
     * @param properties properties to update in place
     * @param str        key to look up
     * @param obj        value stored only when the key is missing
     */
    private static void putIfAbsent(Properties properties, String str, Object obj) {
        final boolean alreadyConfigured = properties.containsKey(str);
        if (!alreadyConfigured) {
            properties.put(str, obj);
        }
    }

    /**
     * Creates a new temporary directory whose name is prefixed with this class's simple name.
     *
     * @return the path of the created directory as a string
     * @throws ConnectException if the directory cannot be created
     */
    private String createLogDir() {
        final String prefix = getClass().getSimpleName();
        try {
            return Files.createTempDirectory(prefix).toString();
        } catch (IOException ioe) {
            final String message = "Unable to create temporary log directory";
            log.error(message, ioe);
            throw new ConnectException(message, ioe);
        }
    }

    /**
     * Builds the bootstrap servers string for this cluster.
     *
     * @return the {@code host:port} address of every broker, joined with commas
     */
    public String bootstrapServers() {
        List<String> addresses = new ArrayList<>(this.brokers.length);
        for (KafkaServer broker : this.brokers) {
            addresses.add(address(broker));
        }
        return String.join(",", addresses);
    }

    /**
     * Returns the address of a broker, taken from the first of its advertised listeners.
     *
     * @param kafkaServer broker to describe
     * @return the address formatted as {@code host:port}
     */
    public String address(KafkaServer kafkaServer) {
        final EndPoint firstListener = (EndPoint) kafkaServer.advertisedListeners().head();
        final String host = firstListener.host();
        final int port = firstListener.port();
        return host + ":" + port;
    }

    /**
     * Returns the ZooKeeper connection string for this cluster.
     *
     * @return {@code 127.0.0.1:<port>} using the port reported by the embedded ZooKeeper
     */
    public String zKConnectString() {
        final int zkPort = this.zookeeper.port();
        return "127.0.0.1:" + zkPort;
    }

    /**
     * Returns the brokers whose state is {@link BrokerState#RUNNING}.
     *
     * @return a set of the brokers currently reporting the running state
     */
    public Set<KafkaServer> runningBrokers() {
        return brokersInState(state -> BrokerState.RUNNING == state);
    }

    /**
     * Returns the brokers whose current state satisfies the given predicate.
     *
     * @param predicate condition evaluated against each broker's state
     * @return the brokers for which {@link #hasState} returned {@code true}
     */
    public Set<KafkaServer> brokersInState(Predicate<BrokerState> predicate) {
        Set<KafkaServer> matching = new HashSet<>();
        for (KafkaServer broker : this.brokers) {
            if (hasState(broker, predicate)) {
                matching.add(broker);
            }
        }
        return matching;
    }

    /**
     * Tests a broker's state against a predicate.
     *
     * @param kafkaServer broker to inspect
     * @param predicate   condition applied to the broker's state
     * @return the predicate result, or {@code false} if reading the state or evaluating
     *         the predicate throws anything
     */
    protected boolean hasState(KafkaServer kafkaServer, Predicate<BrokerState> predicate) {
        boolean matches;
        try {
            matches = predicate.test(kafkaServer.brokerState());
        } catch (Throwable ignored) {
            // Deliberately best-effort: any failure is treated as "not in the desired state".
            matches = false;
        }
        return matches;
    }

    /**
     * Reports whether the configured broker listeners mention SSL.
     *
     * @return {@code true} if the listeners property is a string containing {@code "SSL"}
     */
    public boolean sslEnabled() {
        final String listeners = this.brokerConfig.getProperty(KafkaConfig.ListenersProp());
        if (listeners == null) {
            return false;
        }
        return listeners.contains("SSL");
    }

    /**
     * Describes the named topics; duplicates in the argument list are collapsed.
     *
     * @param strArr topic names
     * @return a map from topic name to its description, empty for topics that do not exist
     */
    public Map<String, Optional<TopicDescription>> describeTopics(String... strArr) {
        Set<String> uniqueNames = new HashSet<>(Arrays.asList(strArr));
        return describeTopics(uniqueNames);
    }

    /**
     * Describes the given topics using a short-lived admin client.
     *
     * <p>The admin client is closed on every path, including when a lookup fails.
     *
     * @param set names of the topics to describe
     * @return a map with one entry per topic: the description when the topic exists, or an
     *         empty {@link Optional} when the lookup failed with
     *         {@link UnknownTopicOrPartitionException}
     * @throws AssertionError if a topic cannot be described for any other reason, or the
     *                        calling thread is interrupted while waiting
     */
    public Map<String, Optional<TopicDescription>> describeTopics(Set<String> set) {
        Map<String, Optional<TopicDescription>> results = new HashMap<>();
        log.info("Describing topics {}", set);
        try (Admin admin = createAdminClient()) {
            Map<String, KafkaFuture<TopicDescription>> futures = admin.describeTopics(set).topicNameValues();
            for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
                String topicName = entry.getKey();
                try {
                    TopicDescription description = entry.getValue().get();
                    results.put(topicName, Optional.of(description));
                    log.info("Found topic {} : {}", topicName, description);
                } catch (ExecutionException e) {
                    // A missing topic is an expected outcome; anything else is a real failure.
                    if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                        throw new AssertionError("Could not describe topic(s) " + set, e);
                    }
                    results.put(topicName, Optional.empty());
                    log.info("Found non-existent topic {}", topicName);
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new AssertionError("Could not describe topic(s) " + set, e);
        } catch (Exception e) {
            throw new AssertionError("Could not describe topic(s) " + set, e);
        }
        log.info("Found topics {}", results);
        return results;
    }

    /**
     * Creates a topic with a single partition.
     *
     * @param str topic name
     */
    public void createTopic(String str) {
        final int singlePartition = 1;
        createTopic(str, singlePartition);
    }

    /**
     * Creates a topic with the given number of partitions, replication factor 1 and
     * no topic-level configuration.
     *
     * @param str topic name
     * @param i   number of partitions
     */
    public void createTopic(String str, int i) {
        final int replication = 1;
        createTopic(str, i, replication, Collections.<String, String>emptyMap());
    }

    /**
     * Creates a topic using an admin client with no extra admin configuration.
     *
     * @param str topic name
     * @param i   number of partitions
     * @param i2  replication factor
     * @param map topic-level configuration
     */
    public void createTopic(String str, int i, int i2, Map<String, String> map) {
        final Map<String, Object> noAdminOverrides = Collections.emptyMap();
        createTopic(str, i, i2, map, noAdminOverrides);
    }

    /**
     * Creates a topic and waits for the creation to complete.
     *
     * <p>The admin client used for the request is closed on every path, including failures.
     *
     * @param str  topic name
     * @param i    number of partitions
     * @param i2   replication factor; must not exceed the number of brokers
     * @param map  topic-level configuration
     * @param map2 extra configuration for the admin client that issues the request
     * @throws InvalidReplicationFactorException if {@code i2} exceeds the number of brokers
     * @throws RuntimeException if the request fails or the calling thread is interrupted
     */
    public void createTopic(String str, int i, int i2, Map<String, String> map, Map<String, Object> map2) {
        if (i2 > this.brokers.length) {
            throw new InvalidReplicationFactorException("Insufficient brokers (" + this.brokers.length + ") for desired replication (" + i2 + ")");
        }
        log.info("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", str, i, i2, map);
        NewTopic newTopic = new NewTopic(str, i, (short) i2);
        newTopic.configs(map);
        try (Admin admin = createAdminClient(map2)) {
            admin.createTopics(Collections.singletonList(newTopic)).all().get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating to an unchecked exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes a topic and waits for the deletion to complete.
     *
     * <p>The admin client used for the request is closed on every path, including failures.
     *
     * @param str name of the topic to delete
     * @throws RuntimeException if the request fails or the calling thread is interrupted
     */
    public void deleteTopic(String str) {
        try (Admin admin = createAdminClient()) {
            admin.deleteTopics(Collections.singleton(str)).all().get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating to an unchecked exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Produces a record with no partition and no key.
     *
     * @param str  topic name
     * @param str2 record value
     */
    public void produce(String str, String str2) {
        produce(str, (Integer) null, (String) null, str2);
    }

    /**
     * Produces a keyed record without specifying a partition.
     *
     * @param str  topic name
     * @param str2 record key
     * @param str3 record value
     */
    public void produce(String str, String str2, String str3) {
        produce(str, (Integer) null, str2, str3);
    }

    /**
     * Produces a single record with the shared producer and waits for the send to complete.
     *
     * <p>Key and value are encoded as UTF-8 so the bytes written do not depend on the
     * platform default charset.
     *
     * @param str  topic name
     * @param num  target partition, or {@code null} to pass no partition to the record
     * @param str2 record key, or {@code null} for no key
     * @param str3 record value, or {@code null} for no value
     * @throws KafkaException if the send fails, does not complete within
     *                        {@code DEFAULT_PRODUCE_SEND_DURATION_MS}, or the thread is interrupted
     */
    public void produce(String str, Integer num, String str2, String str3) {
        byte[] keyBytes = str2 == null ? null : str2.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        byte[] valueBytes = str3 == null ? null : str3.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(str, num, keyBytes, valueBytes);
        try {
            this.producer.send(record).get(DEFAULT_PRODUCE_SEND_DURATION_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating to an unchecked exception.
            Thread.currentThread().interrupt();
            throw new KafkaException("Could not produce message: " + record, e);
        } catch (Exception e) {
            throw new KafkaException("Could not produce message: " + record, e);
        }
    }

    /**
     * Creates an admin client connected to this cluster.
     *
     * <p>Configuration is layered as: cluster-wide client configs, then the supplied
     * overrides, then the bootstrap servers and (when the broker listeners mention SSL)
     * the broker's truststore settings.
     *
     * @param map additional admin client configuration; {@code null} is treated as empty
     * @return a new admin client; the caller is responsible for closing it
     */
    public Admin createAdminClient(Map<String, Object> map) {
        Properties adminProps = Utils.mkProperties(this.clientConfigs);
        if (map != null) {
            adminProps.putAll(map);
        }
        adminProps.put("bootstrap.servers", bootstrapServers());
        Object listeners = this.brokerConfig.get(KafkaConfig.ListenersProp());
        if (listeners != null && listeners.toString().contains("SSL")) {
            adminProps.put("ssl.truststore.location", this.brokerConfig.get("ssl.truststore.location"));
            // The broker config may hold the password either as a Password or as a plain
            // value (the consumer/producer factories pass it through untouched), so only
            // unwrap it when it really is a Password instead of casting blindly.
            Object truststorePassword = this.brokerConfig.get("ssl.truststore.password");
            if (truststorePassword instanceof Password) {
                truststorePassword = ((Password) truststorePassword).value();
            }
            adminProps.put("ssl.truststore.password", truststorePassword);
            adminProps.put("security.protocol", "SSL");
        }
        return Admin.create(adminProps);
    }

    /**
     * Creates an admin client with no configuration beyond what the cluster supplies.
     *
     * @return a new admin client; the caller is responsible for closing it
     */
    public Admin createAdminClient() {
        final Map<String, Object> noOverrides = Collections.emptyMap();
        return createAdminClient(noOverrides);
    }

    /**
     * Consumes at least {@code i} records from the given topics using default consumer settings.
     *
     * @param i      minimum number of records to return
     * @param j      maximum time to spend consuming, in milliseconds
     * @param strArr topics to subscribe to
     * @return the consumed records
     */
    public ConsumerRecords<byte[], byte[]> consume(int i, long j, String... strArr) {
        final Map<String, Object> noOverrides = Collections.emptyMap();
        return consume(i, j, noOverrides, strArr);
    }

    /**
     * Subscribes a temporary consumer to the given topics and polls until at least
     * {@code i} records have been received or {@code j} milliseconds have elapsed.
     *
     * <p>The consumer is closed exactly once on every path.
     *
     * @param i      minimum number of records to return
     * @param j      maximum time to spend consuming, in milliseconds
     * @param map    additional consumer configuration
     * @param strArr topics to subscribe to
     * @return all records received, grouped by topic partition
     * @throws RuntimeException if fewer than {@code i} records arrive within the time limit
     */
    // Raw collections are used for the per-partition buffers because the element type
    // (the consumer record class) is not imported by this file.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public ConsumerRecords<byte[], byte[]> consume(int i, long j, Map<String, Object> map, String... strArr) {
        Map recordsByPartition = new HashMap();
        int consumed = 0;
        try (KafkaConsumer<byte[], byte[]> consumer = createConsumerAndSubscribeTo(map, strArr)) {
            final long startMs = System.currentTimeMillis();
            long remainingMs = j;
            while (remainingMs > 0) {
                log.debug("Consuming from {} for {} millis.", Arrays.toString(strArr), remainingMs);
                ConsumerRecords<byte[], byte[]> batch = consumer.poll(Duration.ofMillis(remainingMs));
                if (!batch.isEmpty()) {
                    for (TopicPartition partition : batch.partitions()) {
                        List partitionRecords = batch.records(partition);
                        List buffer = (List) recordsByPartition.get(partition);
                        if (buffer == null) {
                            buffer = new ArrayList();
                            recordsByPartition.put(partition, buffer);
                        }
                        buffer.addAll(partitionRecords);
                        consumed += partitionRecords.size();
                    }
                    if (consumed >= i) {
                        return new ConsumerRecords<byte[], byte[]>(recordsByPartition);
                    }
                }
                // Recompute the budget from the fixed start time so poll durations do not drift.
                remainingMs = j - (System.currentTimeMillis() - startMs);
            }
        }
        throw new RuntimeException("Could not find enough records. found " + consumed + ", expected " + i);
    }

    /**
     * Reads every record from the given topics up to the end offsets observed when the
     * call starts.
     *
     * <p>A temporary consumer and admin client are created for the call and both are
     * closed on every path.
     *
     * @param j      overall time budget in milliseconds, shared by the partition lookup,
     *               the end-offset lookup and the polling
     * @param map    consumer configuration, or {@code null} for none
     * @param map2   admin client configuration, or {@code null} for none
     * @param strArr topics to read; must not be empty
     * @return the records read, grouped by topic partition
     * @throws AssertionError       if the end offsets are not reached within the time budget
     * @throws TimeoutException     if an admin lookup does not complete in time
     * @throws InterruptedException if interrupted while waiting on an admin lookup
     * @throws ExecutionException   if an admin lookup fails
     */
    // Raw collections are used for the per-partition buffers because the element type
    // (the consumer record class) is not imported by this file.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public ConsumerRecords<byte[], byte[]> consumeAll(long j, Map<String, Object> map, Map<String, Object> map2, String... strArr) throws TimeoutException, InterruptedException, ExecutionException {
        final long deadlineMs = System.currentTimeMillis() + j;
        final Map<String, Object> consumerProps = map != null ? map : Collections.<String, Object>emptyMap();
        final Map<String, Object> adminProps = map2 != null ? map2 : Collections.<String, Object>emptyMap();
        Map recordsByPartition = new HashMap();

        try (KafkaConsumer<byte[], byte[]> consumer = createConsumer(consumerProps);
             Admin admin = createAdminClient(adminProps)) {
            Set<TopicPartition> partitions = listPartitions(deadlineMs - System.currentTimeMillis(), admin, Arrays.asList(strArr));
            Map<TopicPartition, Long> endOffsets = readEndOffsets(deadlineMs - System.currentTimeMillis(), admin, partitions);
            for (TopicPartition partition : partitions) {
                recordsByPartition.put(partition, new ArrayList());
            }
            consumer.assign(partitions);

            // Keep sweeping the outstanding partitions until each has reached its end offset.
            while (!endOffsets.isEmpty()) {
                Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<TopicPartition, Long> entry = it.next();
                    if (consumer.position(entry.getKey()) >= entry.getValue().longValue()) {
                        // This partition is fully read; stop tracking it.
                        it.remove();
                    } else {
                        long remainingMs = deadlineMs - System.currentTimeMillis();
                        if (remainingMs <= 0) {
                            throw new AssertionError("failed to read to end of topic(s) " + Arrays.asList(strArr) + " within " + j + "ms");
                        }
                        ConsumerRecords<byte[], byte[]> batch = consumer.poll(Duration.ofMillis(remainingMs));
                        for (TopicPartition polled : batch.partitions()) {
                            ((List) recordsByPartition.get(polled)).addAll(batch.records(polled));
                        }
                    }
                }
            }
        }
        return new ConsumerRecords<byte[], byte[]>(recordsByPartition);
    }

    /**
     * Looks up every partition of the given topics through the admin client.
     *
     * @param j          maximum time to wait for the topic descriptions, in milliseconds
     * @param admin      admin client used for the lookup
     * @param collection topic names; must not be empty
     * @return one {@link TopicPartition} per partition of every described topic
     * @throws TimeoutException     if the descriptions do not arrive in time
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException   if the lookup fails
     */
    private Set<TopicPartition> listPartitions(long j, Admin admin, Collection<String> collection) throws TimeoutException, InterruptedException, ExecutionException {
        Assert.assertFalse("collection of topics may not be empty", collection.isEmpty());
        Map<String, TopicDescription> descriptions =
                admin.describeTopics(collection).allTopicNames().get(j, TimeUnit.MILLISECONDS);
        Set<TopicPartition> partitions = new HashSet<>();
        for (Map.Entry<String, TopicDescription> described : descriptions.entrySet()) {
            final String topic = described.getKey();
            described.getValue().partitions().forEach(
                    info -> partitions.add(new TopicPartition(topic, info.partition())));
        }
        return partitions;
    }

    /**
     * Reads the latest offset of each given partition using the read-uncommitted isolation level.
     *
     * @param j          maximum time to wait for the offsets, in milliseconds
     * @param admin      admin client used for the lookup
     * @param collection partitions to query; must not be empty
     * @return a mutable map from partition to the offset reported for it
     * @throws TimeoutException     if the offsets do not arrive in time
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException   if the lookup fails
     */
    private Map<TopicPartition, Long> readEndOffsets(long j, Admin admin, Collection<TopicPartition> collection) throws TimeoutException, InterruptedException, ExecutionException {
        Assert.assertFalse("collection of topic partitions may not be empty", collection.isEmpty());

        // Ask for the latest offset of every requested partition.
        Map<TopicPartition, OffsetSpec> latestOffsetQuery = collection.stream()
                .collect(Collectors.toMap(Function.identity(), partition -> OffsetSpec.latest()));
        ListOffsetsOptions options = new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED);
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsetInfos =
                admin.listOffsets(latestOffsetQuery, options).all().get(j, TimeUnit.MILLISECONDS);

        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> info : offsetInfos.entrySet()) {
            endOffsets.put(info.getKey(), Long.valueOf(info.getValue().offset()));
        }
        return endOffsets;
    }

    /**
     * Creates a byte-array consumer connected to this cluster.
     *
     * <p>Cluster-wide client configs are applied first, then the supplied overrides;
     * the defaults below only fill in keys that are still missing afterwards.
     *
     * @param map additional consumer configuration
     * @return a new consumer; the caller is responsible for closing it
     * @throws ConnectException if the consumer cannot be constructed
     */
    public KafkaConsumer<byte[], byte[]> createConsumer(Map<String, Object> map) {
        Map<String, Object> consumerProps = new HashMap<>(this.clientConfigs);
        consumerProps.putAll(map);

        putIfAbsent(consumerProps, "group.id", UUID.randomUUID().toString());
        putIfAbsent(consumerProps, "bootstrap.servers", bootstrapServers());
        putIfAbsent(consumerProps, "enable.auto.commit", "false");
        putIfAbsent(consumerProps, "auto.offset.reset", "earliest");
        putIfAbsent(consumerProps, "key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        putIfAbsent(consumerProps, "value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        if (sslEnabled()) {
            putIfAbsent(consumerProps, "ssl.truststore.location", this.brokerConfig.get("ssl.truststore.location"));
            putIfAbsent(consumerProps, "ssl.truststore.password", this.brokerConfig.get("ssl.truststore.password"));
            putIfAbsent(consumerProps, "security.protocol", "SSL");
        }

        final KafkaConsumer<byte[], byte[]> consumer;
        try {
            consumer = new KafkaConsumer<>(consumerProps);
        } catch (Throwable failure) {
            throw new ConnectException("Failed to create consumer", failure);
        }
        return consumer;
    }

    /**
     * Creates a consumer and subscribes it to the given topics.
     *
     * <p>If subscribing fails, the freshly created consumer is closed before the failure
     * is rethrown, so it is not leaked.
     *
     * @param map    additional consumer configuration
     * @param strArr topics to subscribe to
     * @return the subscribed consumer; the caller is responsible for closing it
     */
    public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(Map<String, Object> map, String... strArr) {
        KafkaConsumer<byte[], byte[]> consumer = createConsumer(map);
        try {
            consumer.subscribe(Arrays.asList(strArr));
        } catch (RuntimeException | Error subscribeFailure) {
            try {
                consumer.close();
            } catch (RuntimeException closeFailure) {
                // Keep the subscribe failure as the primary error.
                subscribeFailure.addSuppressed(closeFailure);
            }
            throw subscribeFailure;
        }
        return consumer;
    }

    /**
     * Creates a byte-array producer connected to this cluster.
     *
     * <p>Cluster-wide client configs are applied first, then the supplied overrides;
     * the defaults below only fill in keys that are still missing afterwards.
     *
     * @param map additional producer configuration
     * @return a new producer; the caller is responsible for closing it
     * @throws ConnectException if the producer cannot be constructed
     */
    public KafkaProducer<byte[], byte[]> createProducer(Map<String, Object> map) {
        Map<String, Object> producerProps = new HashMap<>(this.clientConfigs);
        producerProps.putAll(map);

        putIfAbsent(producerProps, "bootstrap.servers", bootstrapServers());
        putIfAbsent(producerProps, "key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        putIfAbsent(producerProps, "value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        if (sslEnabled()) {
            putIfAbsent(producerProps, "ssl.truststore.location", this.brokerConfig.get("ssl.truststore.location"));
            putIfAbsent(producerProps, "ssl.truststore.password", this.brokerConfig.get("ssl.truststore.password"));
            putIfAbsent(producerProps, "security.protocol", "SSL");
        }

        final KafkaProducer<byte[], byte[]> created;
        try {
            created = new KafkaProducer<>(producerProps);
        } catch (Throwable failure) {
            throw new ConnectException("Failed to create producer", failure);
        }
        return created;
    }

    /**
     * Stores {@code obj} under {@code str} unless the map already contains that key.
     * A key that is present with a {@code null} value counts as present and is left alone.
     *
     * @param map map to update in place
     * @param str key to look up
     * @param obj value stored only when the key is missing
     */
    private static void putIfAbsent(Map<String, Object> map, String str, Object obj) {
        final boolean alreadyConfigured = map.containsKey(str);
        if (!alreadyConfigured) {
            map.put(str, obj);
        }
    }
}
