package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.class */
public class AdjustStreamThreadCountTest {

    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);

    @Rule
    public TestName testName = new TestName();
    private final List<KafkaStreams.State> stateTransitionHistory = new ArrayList();
    private static String inputTopic;
    private static StreamsBuilder builder;
    private static Properties properties;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static String appId = "";
    public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Before
    public void setup() {
        String safeUniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        appId = "appId_" + safeUniqueTestName;
        inputTopic = "input" + safeUniqueTestName;
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
        builder = new StreamsBuilder();
        builder.stream(inputTopic);
        properties = Utils.mkObjectProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()), Utils.mkEntry("application.id", appId), Utils.mkEntry("state.dir", TestUtils.tempDirectory().getPath()), Utils.mkEntry("num.stream.threads", 2), Utils.mkEntry("default.key.serde", Serdes.StringSerde.class), Utils.mkEntry("default.value.serde", Serdes.StringSerde.class), Utils.mkEntry("session.timeout.ms", 10000)}));
    }

    private void startStreamsAndWaitForRunning(KafkaStreams kafkaStreams) throws InterruptedException {
        kafkaStreams.start();
        waitForRunning();
    }

    @After
    public void teardown() throws IOException {
        IntegrationTestUtils.purgeLocalStreamsState(properties);
    }

    private void addStreamStateChangeListener(KafkaStreams kafkaStreams) {
        kafkaStreams.setStateListener((state, state2) -> {
            this.stateTransitionHistory.add(state);
        });
    }

    private void waitForRunning() throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            return !this.stateTransitionHistory.isEmpty() && this.stateTransitionHistory.get(this.stateTransitionHistory.size() - 1).equals(KafkaStreams.State.RUNNING);
        }, DEFAULT_DURATION.toMillis(), () -> {
            return String.format("Client did not transit to state %s in %d seconds", KafkaStreams.State.RUNNING, Long.valueOf(DEFAULT_DURATION.toMillis() / 1000));
        });
    }

    private void waitForTransitionFromRebalancingToRunning() throws InterruptedException {
        waitForRunning();
        int size = this.stateTransitionHistory.size();
        MatcherAssert.assertThat("Client did not transit from REBALANCING to RUNNING. The observed state transitions are: " + this.stateTransitionHistory, Boolean.valueOf(size >= 2 && this.stateTransitionHistory.get(size - 2).equals(KafkaStreams.State.REBALANCING) && this.stateTransitionHistory.get(size - 1).equals(KafkaStreams.State.RUNNING)), CoreMatchers.is(true));
    }

    @Test
    public void shouldAddStreamThread() throws Exception {
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
        Throwable th = null;
        try {
            addStreamStateChangeListener(kafkaStreams);
            startStreamsAndWaitForRunning(kafkaStreams);
            int size = kafkaStreams.metadataForLocalThreads().size();
            MatcherAssert.assertThat(kafkaStreams.metadataForLocalThreads().stream().map(threadMetadata -> {
                return threadMetadata.threadName().split("-StreamThread-")[1];
            }).sorted().toArray(), CoreMatchers.equalTo(new String[]{"1", "2"}));
            this.stateTransitionHistory.clear();
            Optional addStreamThread = kafkaStreams.addStreamThread();
            MatcherAssert.assertThat(addStreamThread, CoreMatchers.not(Optional.empty()));
            TestUtils.waitForCondition(() -> {
                return ((Stream) kafkaStreams.metadataForLocalThreads().stream().sequential()).map((v0) -> {
                    return v0.threadName();
                }).anyMatch(str -> {
                    return str.equals(addStreamThread.orElse(""));
                });
            }, "Wait for the thread to be added");
            MatcherAssert.assertThat(Integer.valueOf(kafkaStreams.metadataForLocalThreads().size()), CoreMatchers.equalTo(Integer.valueOf(size + 1)));
            MatcherAssert.assertThat(kafkaStreams.metadataForLocalThreads().stream().map(threadMetadata2 -> {
                return threadMetadata2.threadName().split("-StreamThread-")[1];
            }).sorted().toArray(), CoreMatchers.equalTo(new String[]{"1", "2", "3"}));
            waitForTransitionFromRebalancingToRunning();
            if (kafkaStreams != null) {
                if (0 == 0) {
                    kafkaStreams.close();
                    return;
                }
                try {
                    kafkaStreams.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (kafkaStreams != null) {
                if (0 != 0) {
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaStreams.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldRemoveStreamThread() throws Exception {
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
        Throwable th = null;
        try {
            addStreamStateChangeListener(kafkaStreams);
            startStreamsAndWaitForRunning(kafkaStreams);
            int size = kafkaStreams.metadataForLocalThreads().size();
            this.stateTransitionHistory.clear();
            MatcherAssert.assertThat(((String) kafkaStreams.removeStreamThread().get()).split("-")[0], CoreMatchers.equalTo(appId));
            MatcherAssert.assertThat(Integer.valueOf(kafkaStreams.metadataForLocalThreads().size()), CoreMatchers.equalTo(Integer.valueOf(size - 1)));
            waitForTransitionFromRebalancingToRunning();
            if (kafkaStreams != null) {
                if (0 == 0) {
                    kafkaStreams.close();
                    return;
                }
                try {
                    kafkaStreams.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (kafkaStreams != null) {
                if (0 != 0) {
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaStreams.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldRemoveStreamThreadWithStaticMembership() throws Exception {
        properties.put("group.instance.id", "member-A");
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
        Throwable th = null;
        try {
            addStreamStateChangeListener(kafkaStreams);
            startStreamsAndWaitForRunning(kafkaStreams);
            int size = kafkaStreams.metadataForLocalThreads().size();
            this.stateTransitionHistory.clear();
            MatcherAssert.assertThat(((String) kafkaStreams.removeStreamThread().get()).split("-")[0], CoreMatchers.equalTo(appId));
            MatcherAssert.assertThat(Integer.valueOf(kafkaStreams.metadataForLocalThreads().size()), CoreMatchers.equalTo(Integer.valueOf(size - 1)));
            waitForTransitionFromRebalancingToRunning();
            if (kafkaStreams != null) {
                if (0 == 0) {
                    kafkaStreams.close();
                    return;
                }
                try {
                    kafkaStreams.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (kafkaStreams != null) {
                if (0 != 0) {
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaStreams.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldnNotRemoveStreamThreadWithinTimeout() throws Exception {
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
        Throwable th = null;
        try {
            addStreamStateChangeListener(kafkaStreams);
            startStreamsAndWaitForRunning(kafkaStreams);
            Assert.assertThrows(TimeoutException.class, () -> {
                kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION));
            });
            if (kafkaStreams != null) {
                if (0 == 0) {
                    kafkaStreams.close();
                    return;
                }
                try {
                    kafkaStreams.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (kafkaStreams != null) {
                if (0 != 0) {
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaStreams.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldAddAndRemoveThreadsMultipleTimes() throws InterruptedException {
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
        Throwable th = null;
        try {
            addStreamStateChangeListener(kafkaStreams);
            startStreamsAndWaitForRunning(kafkaStreams);
            int size = kafkaStreams.metadataForLocalThreads().size();
            this.stateTransitionHistory.clear();
            CountDownLatch countDownLatch = new CountDownLatch(2);
            Thread adjustCountHelperThread = adjustCountHelperThread(kafkaStreams, 4, countDownLatch);
            adjustCountHelperThread(kafkaStreams, 6, countDownLatch).start();
            adjustCountHelperThread.start();
            countDownLatch.await(30L, TimeUnit.SECONDS);
            MatcherAssert.assertThat(Integer.valueOf(kafkaStreams.metadataForLocalThreads().size()), CoreMatchers.equalTo(Integer.valueOf(size)));
            waitForTransitionFromRebalancingToRunning();
            if (kafkaStreams != null) {
                if (0 == 0) {
                    kafkaStreams.close();
                    return;
                }
                try {
                    kafkaStreams.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (kafkaStreams != null) {
                if (0 != 0) {
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaStreams.close();
                }
            }
            throw th3;
        }
    }

    private Thread adjustCountHelperThread(KafkaStreams kafkaStreams, int i, CountDownLatch countDownLatch) {
        return new Thread(() -> {
            for (int i2 = 0; i2 < i; i2++) {
                kafkaStreams.addStreamThread();
                kafkaStreams.removeStreamThread();
            }
            countDownLatch.countDown();
        });
    }

    @Test
    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
        Throwable th = null;
        try {
            addStreamStateChangeListener(kafkaStreams);
            startStreamsAndWaitForRunning(kafkaStreams);
            int size = kafkaStreams.metadataForLocalThreads().size();
            this.stateTransitionHistory.clear();
            MatcherAssert.assertThat(kafkaStreams.metadataForLocalThreads().stream().map(threadMetadata -> {
                return threadMetadata.threadName().split("-StreamThread-")[1];
            }).sorted().toArray(), CoreMatchers.equalTo(new String[]{"1", "2"}));
            Optional addStreamThread = kafkaStreams.addStreamThread();
            MatcherAssert.assertThat("New thread has index 3", "3".equals(((String) addStreamThread.get()).split("-StreamThread-")[1]));
            TestUtils.waitForCondition(() -> {
                return ((Stream) kafkaStreams.metadataForLocalThreads().stream().sequential()).map((v0) -> {
                    return v0.threadName();
                }).anyMatch(str -> {
                    return str.equals(addStreamThread.get());
                });
            }, "Stream thread has not been added");
            MatcherAssert.assertThat(Integer.valueOf(kafkaStreams.metadataForLocalThreads().size()), CoreMatchers.equalTo(Integer.valueOf(size + 1)));
            MatcherAssert.assertThat(kafkaStreams.metadataForLocalThreads().stream().map(threadMetadata2 -> {
                return threadMetadata2.threadName().split("-StreamThread-")[1];
            }).sorted().toArray(), CoreMatchers.equalTo(new String[]{"1", "2", "3"}));
            waitForTransitionFromRebalancingToRunning();
            int size2 = kafkaStreams.metadataForLocalThreads().size();
            this.stateTransitionHistory.clear();
            Optional removeStreamThread = kafkaStreams.removeStreamThread();
            MatcherAssert.assertThat(removeStreamThread, CoreMatchers.not(Optional.empty()));
            MatcherAssert.assertThat(Integer.valueOf(kafkaStreams.metadataForLocalThreads().size()), CoreMatchers.equalTo(Integer.valueOf(size2 - 1)));
            waitForTransitionFromRebalancingToRunning();
            this.stateTransitionHistory.clear();
            Optional addStreamThread2 = kafkaStreams.addStreamThread();
            MatcherAssert.assertThat(addStreamThread2, CoreMatchers.not(Optional.empty()));
            TestUtils.waitForCondition(() -> {
                return ((Stream) kafkaStreams.metadataForLocalThreads().stream().sequential()).map((v0) -> {
                    return v0.threadName();
                }).anyMatch(str -> {
                    return str.equals(addStreamThread2.orElse(""));
                });
            }, "Wait for the thread to be added");
            MatcherAssert.assertThat(Integer.valueOf(kafkaStreams.metadataForLocalThreads().size()), CoreMatchers.equalTo(Integer.valueOf(size2)));
            MatcherAssert.assertThat(kafkaStreams.metadataForLocalThreads().stream().map(threadMetadata3 -> {
                return threadMetadata3.threadName().split("-StreamThread-")[1];
            }).sorted().toArray(), CoreMatchers.equalTo(new String[]{"1", "2", "3"}));
            MatcherAssert.assertThat("the new thread should have received the old threads name", addStreamThread2.equals(removeStreamThread));
            waitForTransitionFromRebalancingToRunning();
            if (kafkaStreams != null) {
                if (0 == 0) {
                    kafkaStreams.close();
                    return;
                }
                try {
                    kafkaStreams.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (kafkaStreams != null) {
                if (0 != 0) {
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaStreams.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testConcurrentlyAccessThreads() throws InterruptedException {
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
        Throwable th = null;
        try {
            addStreamStateChangeListener(kafkaStreams);
            startStreamsAndWaitForRunning(kafkaStreams);
            int size = kafkaStreams.metadataForLocalThreads().size();
            AtomicReference atomicReference = new AtomicReference();
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 5; i++) {
                newFixedThreadPool.execute(() -> {
                    for (int i2 = 0; i2 < 4; i2++) {
                        try {
                            if (!kafkaStreams.addStreamThread().isPresent()) {
                                throw new RuntimeException("failed to create stream thread");
                            }
                            kafkaStreams.metadataForLocalThreads();
                            if (!kafkaStreams.removeStreamThread().isPresent()) {
                                throw new RuntimeException("failed to delete a stream thread");
                            }
                        } catch (Exception e) {
                            atomicReference.set(e);
                            return;
                        }
                    }
                });
            }
            newFixedThreadPool.shutdown();
            Assert.assertTrue(newFixedThreadPool.awaitTermination(60L, TimeUnit.SECONDS));
            Assert.assertNull(atomicReference.get());
            Assert.assertEquals(size, kafkaStreams.metadataForLocalThreads().size());
            if (kafkaStreams != null) {
                if (0 == 0) {
                    kafkaStreams.close();
                    return;
                }
                try {
                    kafkaStreams.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (kafkaStreams != null) {
                if (0 != 0) {
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaStreams.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldResizeCacheAfterThreadRemovalTimesOut() throws InterruptedException {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.put("num.stream.threads", 2);
        properties2.put("cache.max.bytes.buffering", 10L);
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties2);
        Throwable th = null;
        try {
            addStreamStateChangeListener(kafkaStreams);
            startStreamsAndWaitForRunning(kafkaStreams);
            LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(KafkaStreams.class);
            Throwable th2 = null;
            try {
                try {
                    Assert.assertThrows(TimeoutException.class, () -> {
                        kafkaStreams.removeStreamThread(Duration.ofSeconds(0L));
                    });
                    Iterator<String> it = createAndRegister.getMessages().iterator();
                    while (it.hasNext()) {
                        if (it.next().endsWith("Resizing thread cache due to thread removal, new cache size per thread is 10")) {
                            if (createAndRegister != null) {
                                if (0 != 0) {
                                    try {
                                        createAndRegister.close();
                                    } catch (Throwable th3) {
                                        th2.addSuppressed(th3);
                                    }
                                } else {
                                    createAndRegister.close();
                                }
                            }
                            if (kafkaStreams != null) {
                                if (0 == 0) {
                                    kafkaStreams.close();
                                    return;
                                }
                                try {
                                    kafkaStreams.close();
                                    return;
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                    return;
                                }
                            }
                            return;
                        }
                    }
                    if (createAndRegister != null) {
                        if (0 != 0) {
                            try {
                                createAndRegister.close();
                            } catch (Throwable th5) {
                                th2.addSuppressed(th5);
                            }
                        } else {
                            createAndRegister.close();
                        }
                    }
                    Assert.fail();
                } catch (Throwable th6) {
                    th2 = th6;
                    throw th6;
                }
            } catch (Throwable th7) {
                if (createAndRegister != null) {
                    if (th2 != null) {
                        try {
                            createAndRegister.close();
                        } catch (Throwable th8) {
                            th2.addSuppressed(th8);
                        }
                    } else {
                        createAndRegister.close();
                    }
                }
                throw th7;
            }
        } finally {
            if (kafkaStreams != null) {
                if (0 != 0) {
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    kafkaStreams.close();
                }
            }
        }
    }

    @Test
    public void shouldResizeCacheAfterThreadReplacement() throws InterruptedException {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.put("num.stream.threads", 2);
        properties2.put("cache.max.bytes.buffering", 10L);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(inputTopic).transform(() -> {
            return new Transformer<String, String, KeyValue<String, String>>() { // from class: org.apache.kafka.streams.integration.AdjustStreamThreadCountTest.1
                public void init(ProcessorContext processorContext) {
                    Duration ofSeconds = Duration.ofSeconds(1L);
                    PunctuationType punctuationType = PunctuationType.WALL_CLOCK_TIME;
                    AtomicBoolean atomicBoolean2 = atomicBoolean;
                    processorContext.schedule(ofSeconds, punctuationType, j -> {
                        if (Thread.currentThread().getName().endsWith("StreamThread-1") && atomicBoolean2.get()) {
                            atomicBoolean2.set(false);
                            throw new RuntimeException("BOOM");
                        }
                    });
                }

                public KeyValue<String, String> transform(String str, String str2) {
                    return new KeyValue<>(str, str2);
                }

                public void close() {
                }
            };
        }, new String[0]);
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties2);
        Throwable th = null;
        try {
            addStreamStateChangeListener(kafkaStreams);
            kafkaStreams.setUncaughtExceptionHandler(th2 -> {
                return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
            });
            startStreamsAndWaitForRunning(kafkaStreams);
            this.stateTransitionHistory.clear();
            LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister();
            Throwable th3 = null;
            try {
                atomicBoolean.set(true);
                TestUtils.waitForCondition(() -> {
                    return !atomicBoolean.get();
                }, "StreamThread did not hit and reset the injected error");
                waitForTransitionFromRebalancingToRunning();
                Iterator<String> it = createAndRegister.getMessages().iterator();
                while (it.hasNext()) {
                    if (it.next().endsWith("Adding StreamThread-3, there will now be 2 live threads and the new cache size per thread is 5")) {
                        if (kafkaStreams != null) {
                            if (0 == 0) {
                                kafkaStreams.close();
                                return;
                            }
                            try {
                                kafkaStreams.close();
                                return;
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                                return;
                            }
                        }
                        return;
                    }
                }
                if (createAndRegister != null) {
                    if (0 != 0) {
                        try {
                            createAndRegister.close();
                        } catch (Throwable th5) {
                            th3.addSuppressed(th5);
                        }
                    } else {
                        createAndRegister.close();
                    }
                }
                Assert.fail();
            } finally {
                if (createAndRegister != null) {
                    if (0 != 0) {
                        try {
                            createAndRegister.close();
                        } catch (Throwable th6) {
                            th3.addSuppressed(th6);
                        }
                    } else {
                        createAndRegister.close();
                    }
                }
            }
        } finally {
            if (kafkaStreams != null) {
                if (0 != 0) {
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th7) {
                        th.addSuppressed(th7);
                    }
                } else {
                    kafkaStreams.close();
                }
            }
        }
    }
}
