package org.apache.kafka.streams;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.ThreadMetadataImpl;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
/* loaded from: input_file:org/apache/kafka/streams/KafkaStreamsTest.class */
public class KafkaStreamsTest {
    // Value written to "num.stream.threads" in before(); also the id given to the second thread mock.
    private static final int NUM_THREADS = 2;
    // Value written to "application.id" in before() and matched in the ClientMetrics call in prepareStreams().
    private static final String APPLICATION_ID = "appId";
    // Value written to "client.id" in before().
    private static final String CLIENT_ID = "test-client";
    // Not referenced in the visible part of this file; presumably used by tests further down.
    private static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
    // Mock client supplier handed to every KafkaStreams instance built by the tests.
    private MockClientSupplier supplier;
    // Mock clock handed to every KafkaStreams instance built by the tests.
    private MockTime time;
    // Streams configuration assembled in before().
    private Properties props;
    // Admin client obtained from the supplier in before(); closed in tearDown().
    private MockAdminClient adminClient;
    // Listener stub that tests register on KafkaStreams to count state changes.
    private StateListenerStub streamsStateListener;

    // First mock returned by the mocked StreamThread.create(...) (see prepareStreams()).
    @Mock
    private StreamThread streamThreadOne;

    // Second mock returned by the mocked StreamThread.create(...) (see prepareStreams()).
    @Mock
    private StreamThread streamThreadTwo;

    // Captures the listener passed to setStateListener(...) on the stream-thread and global-thread mocks.
    @Captor
    private ArgumentCaptor<StreamThread.StateListener> threadStateListenerCapture;
    // Static and construction mocks opened in prepareStreams() and closed in tearDown().
    private MockedStatic<ClientMetrics> clientMetricsMockedStatic;
    private MockedStatic<StreamThread> streamThreadMockedStatic;
    private MockedStatic<StreamsConfigUtils> streamsConfigUtils;
    private MockedConstruction<GlobalStreamThread> globalStreamThreadMockedConstruction;
    private MockedConstruction<Metrics> metricsMockedConstruction;

    // Per-test timeout of 600 seconds.
    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);

    // JUnit rule exposing the name of the currently running test.
    @Rule
    public TestName testName = new TestName();

    /* loaded from: input_file:org/apache/kafka/streams/KafkaStreamsTest$StateListenerStub.class */
    public static class StateListenerStub implements KafkaStreams.StateListener {
        KafkaStreams.State oldState;
        KafkaStreams.State newState;
        int numChanges = 0;
        public Map<KafkaStreams.State, Long> mapStates = new HashMap();

        public void onChange(KafkaStreams.State state, KafkaStreams.State state2) {
            long longValue = this.mapStates.containsKey(state) ? this.mapStates.get(state).longValue() : 0L;
            this.numChanges++;
            this.oldState = state2;
            this.newState = state;
            this.mapStates.put(state, Long.valueOf(longValue + 1));
        }
    }

    /**
     * Builds a fresh fixture before each test: the mock clock, a mock client supplier bound to a
     * bootstrap cluster at localhost:9999, the admin client, the state-listener stub and the Streams
     * configuration. Finishes by installing the Mockito mocks through {@code prepareStreams()}.
     *
     * @throws Exception if {@code prepareStreams()} fails
     */
    @Before
    public void before() throws Exception {
        time = new MockTime();

        supplier = new MockClientSupplier();
        final InetSocketAddress bootstrapAddress = new InetSocketAddress("localhost", 9999);
        supplier.setCluster(Cluster.bootstrap(Collections.singletonList(bootstrapAddress)));
        // The admin client is requested with a null config.
        adminClient = supplier.getAdmin(null);

        streamsStateListener = new StateListenerStub();

        props = new Properties();
        props.put("application.id", APPLICATION_ID);
        props.put("client.id", CLIENT_ID);
        props.put("bootstrap.servers", "localhost:2018");
        final String reporterClassName = MockMetricsReporter.class.getName();
        props.put("metric.reporters", reporterClassName);
        final String stateDirPath = TestUtils.tempDirectory().getPath();
        props.put("state.dir", stateDirPath);
        props.put("num.stream.threads", NUM_THREADS);

        prepareStreams();
    }

    /**
     * Closes the Mockito static/construction mocks and the admin client after each test.
     * A handle is closed only when it is non-null; the close order is unchanged.
     */
    @After
    public void tearDown() {
        Optional.ofNullable(clientMetricsMockedStatic).ifPresent(MockedStatic::close);
        Optional.ofNullable(streamThreadMockedStatic).ifPresent(MockedStatic::close);
        Optional.ofNullable(globalStreamThreadMockedConstruction).ifPresent(MockedConstruction::close);
        Optional.ofNullable(metricsMockedConstruction).ifPresent(MockedConstruction::close);
        Optional.ofNullable(streamsConfigUtils).ifPresent(MockedStatic::close);
        Optional.ofNullable(adminClient).ifPresent(MockAdminClient::close);
    }

    /**
     * Installs every Mockito static and construction mock the tests rely on and stubs the two
     * stream-thread mocks.
     *
     * <p>In order, this method:
     * <ul>
     *   <li>mocks construction of {@link Metrics} so that reporters passed to the constructor are
     *       initialised, and closed again when the mocked {@code Metrics} is closed;</li>
     *   <li>mocks the static {@link ClientMetrics} methods ({@code version()} and {@code commitId()}
     *       return fixed strings);</li>
     *   <li>mocks the static {@code StreamThread.create(...)} to hand out {@code streamThreadOne}
     *       and then {@code streamThreadTwo};</li>
     *   <li>mocks three static {@link StreamsConfigUtils} methods with fixed return values;</li>
     *   <li>stubs both stream-thread mocks via {@code prepareStreamThread};</li>
     *   <li>mocks construction of {@link GlobalStreamThread} with a small state machine driven by
     *       {@code start()} and {@code shutdown()}.</li>
     * </ul>
     *
     * <p>The mocks opened here are stored in fields and closed in {@code tearDown()}.
     *
     * @throws Exception declared because {@code prepareStreamThread} declares it
     */
    private void prepareStreams() throws Exception {
        this.metricsMockedConstruction = Mockito.mockConstruction(Metrics.class, (metrics, context) -> {
            // The constructor is expected to be called with four arguments; the one at index 1 is
            // treated as the list of metrics reporters.
            Assert.assertEquals(4L, context.arguments().size());
            List list = (List) context.arguments().get(1);
            Iterator it = list.iterator();
            while (it.hasNext()) {
                ((MetricsReporter) it.next()).init(Collections.emptyList());
            }
            // Closing the mocked Metrics closes each reporter that was passed to the constructor.
            ((Metrics) Mockito.doAnswer(invocationOnMock -> {
                Iterator it2 = list.iterator();
                while (it2.hasNext()) {
                    ((MetricsReporter) it2.next()).close();
                }
                return null;
            }).when(metrics)).close();
        });
        this.clientMetricsMockedStatic = Mockito.mockStatic(ClientMetrics.class);
        this.clientMetricsMockedStatic.when(ClientMetrics::version).thenReturn("1.56");
        this.clientMetricsMockedStatic.when(ClientMetrics::commitId).thenReturn("1a2b3c4d5e");
        // NOTE(review): the following calls invoke the mocked static methods directly with argument
        // matchers and are not wrapped in when()/verify(); they do not appear to register any
        // stubbing or verification -- confirm whether they are still needed.
        ClientMetrics.addVersionMetric((StreamsMetricsImpl) ArgumentMatchers.any(StreamsMetricsImpl.class));
        ClientMetrics.addCommitIdMetric((StreamsMetricsImpl) ArgumentMatchers.any(StreamsMetricsImpl.class));
        ClientMetrics.addApplicationIdMetric((StreamsMetricsImpl) ArgumentMatchers.any(StreamsMetricsImpl.class), (String) Mockito.eq(APPLICATION_ID));
        ClientMetrics.addTopologyDescriptionMetric((StreamsMetricsImpl) ArgumentMatchers.any(StreamsMetricsImpl.class), (Gauge) ArgumentMatchers.any());
        ClientMetrics.addStateMetric((StreamsMetricsImpl) ArgumentMatchers.any(StreamsMetricsImpl.class), (Gauge) ArgumentMatchers.any());
        ClientMetrics.addNumAliveStreamThreadMetric((StreamsMetricsImpl) ArgumentMatchers.any(StreamsMetricsImpl.class), (Gauge) ArgumentMatchers.any());
        this.streamThreadMockedStatic = Mockito.mockStatic(StreamThread.class);
        // The first create(...) call yields streamThreadOne, every later call yields streamThreadTwo.
        this.streamThreadMockedStatic.when(() -> {
            StreamThread.create((TopologyMetadata) ArgumentMatchers.any(TopologyMetadata.class), (StreamsConfig) ArgumentMatchers.any(StreamsConfig.class), (KafkaClientSupplier) ArgumentMatchers.any(KafkaClientSupplier.class), (Admin) ArgumentMatchers.any(Admin.class), (UUID) ArgumentMatchers.any(UUID.class), (String) ArgumentMatchers.any(String.class), (StreamsMetricsImpl) ArgumentMatchers.any(StreamsMetricsImpl.class), (Time) ArgumentMatchers.any(Time.class), (StreamsMetadataState) ArgumentMatchers.any(StreamsMetadataState.class), Mockito.anyLong(), (StateDirectory) ArgumentMatchers.any(StateDirectory.class), (StateRestoreListener) ArgumentMatchers.any(StateRestoreListener.class), (StandbyUpdateListener) ArgumentMatchers.any(StandbyUpdateListener.class), Mockito.anyInt(), (Runnable) ArgumentMatchers.any(Runnable.class), (BiConsumer) ArgumentMatchers.any());
        }).thenReturn(this.streamThreadOne).thenReturn(this.streamThreadTwo);
        this.streamsConfigUtils = Mockito.mockStatic(StreamsConfigUtils.class);
        this.streamsConfigUtils.when(() -> {
            StreamsConfigUtils.processingMode((StreamsConfig) ArgumentMatchers.any(StreamsConfig.class));
        }).thenReturn(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE);
        this.streamsConfigUtils.when(() -> {
            StreamsConfigUtils.eosEnabled((StreamsConfig) ArgumentMatchers.any(StreamsConfig.class));
        }).thenReturn(false);
        // 10485760 = 10 * 1024 * 1024.
        this.streamsConfigUtils.when(() -> {
            StreamsConfigUtils.getTotalCacheSize((StreamsConfig) ArgumentMatchers.any(StreamsConfig.class));
        }).thenReturn(10485760L);
        Mockito.when(Long.valueOf(this.streamThreadOne.getId())).thenReturn(1L);
        Mockito.when(Long.valueOf(this.streamThreadTwo.getId())).thenReturn(2L);
        // Only the second thread gets the slow join() stub (third argument false).
        prepareStreamThread(this.streamThreadOne, 1, true);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS, false);
        // State holder declared outside the initializer, so it is shared by every GlobalStreamThread
        // mock constructed while this construction mock is open.
        AtomicReference atomicReference = new AtomicReference(GlobalStreamThread.State.CREATED);
        this.globalStreamThreadMockedConstruction = Mockito.mockConstruction(GlobalStreamThread.class, (globalStreamThread, context2) -> {
            Mockito.when(globalStreamThread.state()).thenAnswer(invocationOnMock -> {
                return (GlobalStreamThread.State) atomicReference.get();
            });
            ((GlobalStreamThread) Mockito.doNothing().when(globalStreamThread)).setStateListener((StreamThread.StateListener) this.threadStateListenerCapture.capture());
            // start(): move to RUNNING and notify the captured listener.
            ((GlobalStreamThread) Mockito.doAnswer(invocationOnMock2 -> {
                atomicReference.set(GlobalStreamThread.State.RUNNING);
                ((StreamThread.StateListener) this.threadStateListenerCapture.getValue()).onChange(globalStreamThread, GlobalStreamThread.State.RUNNING, GlobalStreamThread.State.CREATED);
                return null;
            }).when(globalStreamThread)).start();
            // shutdown(): close the restore consumer and all producers, record DEAD, then notify the
            // captured listener of PENDING_SHUTDOWN followed by DEAD.
            ((GlobalStreamThread) Mockito.doAnswer(invocationOnMock3 -> {
                this.supplier.restoreConsumer.close();
                Iterator<MockProducer<byte[], byte[]>> it = this.supplier.producers.iterator();
                while (it.hasNext()) {
                    it.next().close();
                }
                atomicReference.set(GlobalStreamThread.State.DEAD);
                ((StreamThread.StateListener) this.threadStateListenerCapture.getValue()).onChange(globalStreamThread, GlobalStreamThread.State.PENDING_SHUTDOWN, GlobalStreamThread.State.RUNNING);
                ((StreamThread.StateListener) this.threadStateListenerCapture.getValue()).onChange(globalStreamThread, GlobalStreamThread.State.DEAD, GlobalStreamThread.State.PENDING_SHUTDOWN);
                return null;
            }).when(globalStreamThread)).shutdown();
            // NOTE(review): the argument to thenReturn is evaluated once, when this stub is registered,
            // so stillRunning() returns that fixed value and does not track later state changes --
            // confirm this is intended.
            Mockito.when(Boolean.valueOf(globalStreamThread.stillRunning())).thenReturn(Boolean.valueOf(atomicReference.get() == GlobalStreamThread.State.RUNNING));
        });
    }

    /**
     * Stubs a {@link StreamThread} mock so it behaves like a minimal thread for the tests.
     *
     * @param streamThread the mock to stub
     * @param i            number appended to "processId-StreamThread-" to form the thread name and the
     *                     name used in the stubbed thread metadata
     * @param z            when {@code false}, {@code join()} is additionally stubbed to sleep for
     *                     2000 ms; when {@code true}, {@code join()} is left unstubbed
     * @throws Exception declared because the stubbed methods ({@code join()}) declare checked exceptions
     */
    private void prepareStreamThread(StreamThread streamThread, int i, boolean z) throws Exception {
        // state() reads from this holder, which start() and shutdown() below update.
        AtomicReference atomicReference = new AtomicReference(StreamThread.State.CREATED);
        Mockito.when(streamThread.state()).thenAnswer(invocationOnMock -> {
            return (StreamThread.State) atomicReference.get();
        });
        ((StreamThread) Mockito.doNothing().when(streamThread)).setStateListener((StreamThread.StateListener) this.threadStateListenerCapture.capture());
        Mockito.when(streamThread.getStateLock()).thenReturn(new Object());
        // start(): the recorded state becomes STARTING (and stays there), while the captured listener
        // is walked through STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED and RUNNING.
        ((StreamThread) Mockito.doAnswer(invocationOnMock2 -> {
            atomicReference.set(StreamThread.State.STARTING);
            ((StreamThread.StateListener) this.threadStateListenerCapture.getValue()).onChange(streamThread, StreamThread.State.STARTING, StreamThread.State.CREATED);
            ((StreamThread.StateListener) this.threadStateListenerCapture.getValue()).onChange(streamThread, StreamThread.State.PARTITIONS_REVOKED, StreamThread.State.STARTING);
            ((StreamThread.StateListener) this.threadStateListenerCapture.getValue()).onChange(streamThread, StreamThread.State.PARTITIONS_ASSIGNED, StreamThread.State.PARTITIONS_REVOKED);
            ((StreamThread.StateListener) this.threadStateListenerCapture.getValue()).onChange(streamThread, StreamThread.State.RUNNING, StreamThread.State.PARTITIONS_ASSIGNED);
            return null;
        }).when(streamThread)).start();
        Mockito.when(streamThread.getGroupInstanceID()).thenReturn(Optional.empty());
        // The stubbed metadata always reports the thread state string "DEAD".
        Mockito.when(streamThread.threadMetadata()).thenReturn(new ThreadMetadataImpl("processId-StreamThread-" + i, "DEAD", "", "", Collections.emptySet(), "", Collections.emptySet(), Collections.emptySet()));
        Mockito.when(Boolean.valueOf(streamThread.waitOnThreadState((StreamThread.State) Mockito.isA(StreamThread.State.class), Mockito.anyLong()))).thenReturn(true);
        Mockito.when(Boolean.valueOf(streamThread.isThreadAlive())).thenReturn(true);
        Mockito.when(streamThread.getName()).thenReturn("processId-StreamThread-" + i);
        // shutdown(): close the supplier's consumer, restore consumer and producers, record DEAD, then
        // notify the captured listener of PENDING_SHUTDOWN followed by DEAD.
        ((StreamThread) Mockito.doAnswer(invocationOnMock3 -> {
            this.supplier.consumer.close();
            this.supplier.restoreConsumer.close();
            Iterator<MockProducer<byte[], byte[]>> it = this.supplier.producers.iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            atomicReference.set(StreamThread.State.DEAD);
            ((StreamThread.StateListener) this.threadStateListenerCapture.getValue()).onChange(streamThread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
            ((StreamThread.StateListener) this.threadStateListenerCapture.getValue()).onChange(streamThread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
            return null;
        }).when(streamThread)).shutdown();
        if (!z) {
            // Make join() take two seconds for this thread.
            ((StreamThread) Mockito.doAnswer(invocationOnMock4 -> {
                Thread.sleep(2000L);
                return null;
            }).when(streamThread)).join();
        }
        Mockito.when(streamThread.readOnlyActiveTasks()).thenReturn(Collections.emptySet());
        Mockito.when(streamThread.readyOnlyAllTasks()).thenReturn(Collections.emptySet());
    }

    /**
     * Closing a {@link KafkaStreams} instance that was never started must leave it in
     * {@link KafkaStreams.State#NOT_RUNNING}.
     */
    @Test
    public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
        // try-with-resources closes the instance again on exit, including when the assertion fails.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.close();

            Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
        }
    }

    /**
     * Drives the stream-thread state listener by hand and checks the resulting client state:
     * the client goes to REBALANCING when threads report PARTITIONS_REVOKED, stays there while one
     * thread dies, and returns to RUNNING once every other thread reports RUNNING again.
     *
     * @throws InterruptedException if waiting for a condition is interrupted
     */
    @Test
    public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws InterruptedException {
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.setStateListener(streamsStateListener);

            Assert.assertEquals(0, streamsStateListener.numChanges);
            Assert.assertEquals(KafkaStreams.State.CREATED, streams.state());

            streams.start();

            // Start-up is expected to produce two client-level state changes. This is a transition
            // count, not the thread count, hence the literal rather than NUM_THREADS.
            TestUtils.waitForCondition(
                () -> streamsStateListener.numChanges == 2,
                "Streams never started.");
            Assert.assertEquals(KafkaStreams.State.RUNNING, streams.state());

            // Every thread reports a revocation: one more client-level change, to REBALANCING.
            for (final StreamThread thread : streams.threads) {
                threadStateListenerCapture.getValue().onChange(
                    thread, StreamThread.State.PARTITIONS_REVOKED, StreamThread.State.RUNNING);
            }
            Assert.assertEquals(3, streamsStateListener.numChanges);
            Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state());

            // Assignment alone does not change the client state.
            for (final StreamThread thread : streams.threads) {
                threadStateListenerCapture.getValue().onChange(
                    thread, StreamThread.State.PARTITIONS_ASSIGNED, StreamThread.State.PARTITIONS_REVOKED);
            }
            Assert.assertEquals(3, streamsStateListener.numChanges);
            Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state());

            // The last thread dies; the client state is still unchanged.
            final StreamThread dyingThread = streams.threads.get(NUM_THREADS - 1);
            threadStateListenerCapture.getValue().onChange(
                dyingThread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.PARTITIONS_ASSIGNED);
            threadStateListenerCapture.getValue().onChange(
                dyingThread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
            Assert.assertEquals(3, streamsStateListener.numChanges);
            Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state());

            // All surviving threads go back to RUNNING, and so does the client.
            for (final StreamThread thread : streams.threads) {
                if (thread != dyingThread) {
                    threadStateListenerCapture.getValue().onChange(
                        thread, StreamThread.State.RUNNING, StreamThread.State.PARTITIONS_ASSIGNED);
                }
            }
            Assert.assertEquals(4, streamsStateListener.numChanges);
            Assert.assertEquals(KafkaStreams.State.RUNNING, streams.state());

            streams.close();

            TestUtils.waitForCondition(
                () -> streamsStateListener.numChanges == 6,
                "Streams never closed.");
            Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
        }
    }

    /**
     * Closing a never-started {@link KafkaStreams} instance (built from a topology that includes a
     * global table) must reach NOT_RUNNING without logging a message containing "ERROR", and must
     * leave the supplier's consumer, restore consumer and producers closed.
     *
     * @throws Exception if waiting for the state change fails
     */
    @Test
    public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
        final StreamsBuilder builder = getBuilderWithSource();
        builder.globalTable("anyTopic");

        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KafkaStreams.class);
             KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
            streams.close();

            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
                "Streams never stopped.");
            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.not(CoreMatchers.hasItem(Matchers.containsString("ERROR"))));
        }

        // Checked after the streams instance has been closed by the try-with-resources block.
        Assert.assertTrue(supplier.consumer.closed());
        Assert.assertTrue(supplier.restoreConsumer.closed());
        for (final MockProducer<byte[], byte[]> producer : supplier.producers) {
            Assert.assertTrue(producer.closed());
        }
    }

    /**
     * Shuts every stream thread down individually, then closes the client and verifies that it
     * ends in NOT_RUNNING with its global stream thread reference cleared.
     *
     * @throws Exception if waiting for a condition or joining a thread fails
     */
    @Test
    public void testStateThreadClose() throws Exception {
        final StreamsBuilder builder = getBuilderWithSource();
        builder.globalTable("anyTopic");

        try (KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
            Assert.assertEquals(NUM_THREADS, streams.threads.size());
            Assert.assertEquals(KafkaStreams.State.CREATED, streams.state());

            streams.start();
            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                "Streams never started.");

            for (int i = 0; i < NUM_THREADS; i++) {
                final StreamThread thread = streams.threads.get(i);
                thread.shutdown();
                TestUtils.waitForCondition(
                    () -> thread.state() == StreamThread.State.DEAD,
                    "Thread never stopped.");
                thread.join();
            }
            TestUtils.waitForCondition(
                () -> streams.metadataForLocalThreads().stream()
                    .allMatch(metadata -> metadata.threadState().equals("DEAD")),
                "Streams never stopped");

            streams.close();

            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
                "Streams never stopped.");
            Assert.assertNull(streams.globalStreamThread);
        }
    }

    /**
     * Shuts the global stream thread down while the client is running and verifies that the
     * client moves to PENDING_ERROR, ends in ERROR after {@code close()}, and that a message
     * containing "ERROR" was logged.
     *
     * @throws Exception if waiting for a condition or joining the thread fails
     */
    @Test
    public void testStateGlobalThreadClose() throws Exception {
        final StreamsBuilder builder = getBuilderWithSource();
        builder.globalTable("anyTopic");

        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KafkaStreams.class);
             KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
            streams.start();
            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                "Streams never started.");

            final GlobalStreamThread globalThread = streams.globalStreamThread;
            globalThread.shutdown();
            TestUtils.waitForCondition(
                () -> globalThread.state() == GlobalStreamThread.State.DEAD,
                "Thread never stopped.");
            globalThread.join();

            // Losing the global thread puts the client into PENDING_ERROR ...
            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.PENDING_ERROR,
                "Thread never stopped.");

            streams.close();

            // ... and closing it afterwards ends in ERROR rather than NOT_RUNNING.
            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.ERROR,
                "Thread never stopped.");
            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem(Matchers.containsString("ERROR")));
        }
    }

    /**
     * Constructing a {@link KafkaStreams} instance must initialise exactly one
     * {@link MockMetricsReporter}, and closing the instance must close as many reporters as were
     * initialised.
     */
    @Test
    public void testInitializesAndDestroysMetricsReporters() {
        final int initCountBefore = MockMetricsReporter.INIT_COUNT.get();

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            // Number of reporters initialised by the constructor; reused below as the expected
            // number of reporters closed by close().
            final int initDiff = MockMetricsReporter.INIT_COUNT.get() - initCountBefore;
            Assert.assertEquals(
                "some reporters including MockMetricsReporter should be initialized by calling on construction",
                1, initDiff);

            streams.start();
            final int closeCountBefore = MockMetricsReporter.CLOSE_COUNT.get();
            streams.close();

            Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
            Assert.assertEquals(closeCountBefore + initDiff, MockMetricsReporter.CLOSE_COUNT.get());
        }
    }

    /**
     * Checks that calling {@code close()} a second time does not close the metrics reporters
     * again: {@code MockMetricsReporter.CLOSE_COUNT} sampled after the first close must be
     * unchanged after the second.
     */
    @Test
    public void testCloseIsIdempotent() {
        // The implicit close() at the end of the block is a further close on an already closed client.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.close();
            final int closeCountAfterFirstClose = MockMetricsReporter.CLOSE_COUNT.get();

            streams.close();
            Assert.assertEquals("subsequent close() calls should do nothing", closeCountAfterFirstClose, MockMetricsReporter.CLOSE_COUNT.get());
        }
    }

    /**
     * Starts a client, pauses it and expects {@code isPaused()} to be true, then resumes it and
     * expects {@code isPaused()} to be false.
     */
    @Test
    public void testPauseResume() {
        // try-with-resources closes the client even if an assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();

            streams.pause();
            Assert.assertTrue(streams.isPaused());

            streams.resume();
            Assert.assertFalse(streams.isPaused());
        }
    }

    /**
     * Pauses a client before it is started and expects it to still report paused after
     * {@code start()}; a subsequent {@code resume()} must clear the paused flag.
     */
    @Test
    public void testStartingPaused() {
        // try-with-resources closes the client even if an assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            // Note the order: pause() is issued before start().
            streams.pause();
            streams.start();
            Assert.assertTrue(streams.isPaused());

            streams.resume();
            Assert.assertFalse(streams.isPaused());
        }
    }

    /**
     * Checks that repeating {@code pause()} or {@code resume()} does not change the outcome:
     * the client reports paused after each of two consecutive pauses and not paused after each of
     * two consecutive resumes.
     */
    @Test
    public void testShowPauseResumeAreIdempotent() {
        // try-with-resources closes the client even if an assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();

            streams.pause();
            Assert.assertTrue(streams.isPaused());
            streams.pause();
            Assert.assertTrue(streams.isPaused());

            streams.resume();
            Assert.assertFalse(streams.isPaused());
            streams.resume();
            Assert.assertFalse(streams.isPaused());
        }
    }

    /**
     * Checks that a stream thread can be added once the client has reached {@code RUNNING}.
     *
     * <p>With {@code num.stream.threads} set to 1, {@code addStreamThread()} is expected to return
     * the name {@code "processId-StreamThread-2"} and the client's thread list must grow by one.
     *
     * @throws InterruptedException if waiting for the {@code RUNNING} state is interrupted
     */
    @Test
    public void shouldAddThreadWhenRunning() throws InterruptedException {
        this.props.put("num.stream.threads", 1);

        // try-with-resources closes the client even if an assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();
            final int threadCountBefore = streams.threads.size();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");

            MatcherAssert.assertThat(streams.addStreamThread(), Matchers.equalTo(Optional.of("processId-StreamThread-2")));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(threadCountBefore + 1));
        }
    }

    /**
     * Checks that {@code addStreamThread()} on a client that was constructed but never started
     * returns {@code Optional.empty()} and leaves the thread list size unchanged.
     */
    @Test
    public void shouldNotAddThreadWhenCreated() {
        // try-with-resources closes the client even if an assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            final int threadCountBefore = streams.threads.size();

            MatcherAssert.assertThat(streams.addStreamThread(), Matchers.equalTo(Optional.empty()));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(threadCountBefore));
        }
    }

    /**
     * Checks that {@code addStreamThread()} on a client that has already been closed returns
     * {@code Optional.empty()} and leaves the thread list size unchanged.
     */
    @Test
    public void shouldNotAddThreadWhenClosed() {
        // The implicit close() at the end of the block is a second close on an already closed client.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            final int threadCountBefore = streams.threads.size();
            streams.close();

            MatcherAssert.assertThat(streams.addStreamThread(), Matchers.equalTo(Optional.empty()));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(threadCountBefore));
        }
    }

    /**
     * Checks that no stream thread is added after the global stream thread has been shut down.
     *
     * <p>The topology gets an additional global table on {@code "anyTopic"} so that the client has
     * a global stream thread. After starting the client and shutting that thread down,
     * {@code addStreamThread()} must return {@code Optional.empty()} and the thread list size must
     * be unchanged.
     */
    @Test
    public void shouldNotAddThreadWhenError() {
        final StreamsBuilder builder = getBuilderWithSource();
        builder.globalTable("anyTopic");

        // try-with-resources closes the client even if an assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(builder.build(), this.props, this.supplier, this.time)) {
            final int threadCountBefore = streams.threads.size();
            streams.start();
            // NOTE(review): presumably this drives the client into its error state, as the test
            // name suggests -- confirm against KafkaStreams' state handling.
            streams.globalStreamThread.shutdown();

            MatcherAssert.assertThat(streams.addStreamThread(), Matchers.equalTo(Optional.empty()));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(threadCountBefore));
        }
    }

    /**
     * Checks that {@code metadataForLocalThreads()} leaves out a stream thread that has been shut
     * down: after shutting down {@code streamThreadOne}, exactly one entry is expected and it must
     * be the metadata of {@code streamThreadTwo}.
     */
    @Test
    public void shouldNotReturnDeadThreads() {
        // try-with-resources closes the client even if an assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();
            this.streamThreadOne.shutdown();

            // NOTE(review): raw Set kept from the original; the element type is not visible here.
            final Set localThreadMetadata = streams.metadataForLocalThreads();
            MatcherAssert.assertThat(Integer.valueOf(localThreadMetadata.size()), Matchers.equalTo(1));
            MatcherAssert.assertThat(localThreadMetadata, CoreMatchers.hasItem(this.streamThreadTwo.threadMetadata()));
        }
    }

    /**
     * Checks that a stream thread can be removed once the client has reached {@code RUNNING}.
     *
     * <p>With {@code num.stream.threads} set to {@code NUM_THREADS},
     * {@code removeStreamThread()} is expected to return the name
     * {@code "processId-StreamThread-1"} and the client's thread list must shrink by one.
     *
     * @throws InterruptedException if waiting for the {@code RUNNING} state is interrupted
     */
    @Test
    public void shouldRemoveThread() throws InterruptedException {
        this.props.put("num.stream.threads", Integer.valueOf(NUM_THREADS));

        // try-with-resources closes the client even if an assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();
            final int threadCountBefore = streams.threads.size();
            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                15L,
                "Kafka Streams client did not reach state RUNNING");

            MatcherAssert.assertThat(streams.removeStreamThread(), Matchers.equalTo(Optional.of("processId-StreamThread-1")));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(threadCountBefore - 1));
        }
    }

    /**
     * Checks that {@code removeStreamThread()} on a client that was never started returns
     * {@code Optional.empty()} and that the thread list still holds the single configured thread.
     */
    @Test
    public void shouldNotRemoveThreadWhenNotRunning() {
        this.props.put("num.stream.threads", 1);

        // try-with-resources closes the client even if an assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            MatcherAssert.assertThat(streams.removeStreamThread(), Matchers.equalTo(Optional.empty()));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(1));
        }
    }

    /**
     * Checks that a client which has been started and then closed rejects a second
     * {@code start()} with an {@link IllegalStateException}.
     */
    @Test
    public void testCannotStartOnceClosed() {
        // The implicit close() at the end of the block is a second close on an already closed client.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();
            streams.close();

            // The leading string is only the failure message used when nothing is thrown.
            Assert.assertThrows("Should have throw IllegalStateException", IllegalStateException.class, streams::start);
        }
    }

    /**
     * Checks that {@code setGlobalStateRestoreListener} throws an {@link IllegalStateException}
     * when it is called after the client has been started.
     */
    @Test
    public void shouldNotSetGlobalRestoreListenerAfterStarting() {
        // try-with-resources closes the client even if an assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();

            // The leading string is only the failure message used when nothing is thrown.
            Assert.assertThrows(
                "Should throw an IllegalStateException",
                IllegalStateException.class,
                () -> streams.setGlobalStateRestoreListener((StateRestoreListener) null));
        }
    }

    /**
     * Checks that {@code setUncaughtExceptionHandler} throws an {@link IllegalStateException}
     * when it is called after the client has been started.
     *
     * <p>NOTE(review): the body is identical to
     * {@code shouldThrowExceptionSettingStreamsUncaughtExceptionHandlerNotInCreateState}; one of
     * the two may have been meant to cover a different handler overload -- confirm.
     */
    @Test
    public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
        // try-with-resources closes the client even if an assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();

            Assert.assertThrows(
                IllegalStateException.class,
                () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null));
        }
    }

    /**
     * Checks that {@code setUncaughtExceptionHandler}, called with a
     * {@code StreamsUncaughtExceptionHandler}-typed argument, throws an
     * {@link IllegalStateException} once the client has been started.
     */
    @Test
    public void shouldThrowExceptionSettingStreamsUncaughtExceptionHandlerNotInCreateState() {
        // try-with-resources closes the client even if an assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();

            Assert.assertThrows(
                IllegalStateException.class,
                () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null));
        }
    }

    /**
     * Checks that passing {@code null} to {@code setUncaughtExceptionHandler} on a client that
     * has not been started throws a {@link NullPointerException}.
     */
    @Test
    public void shouldThrowNullPointerExceptionSettingStreamsUncaughtExceptionHandlerIfNull() {
        // try-with-resources closes the client even if the assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            Assert.assertThrows(
                NullPointerException.class,
                () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null));
        }
    }

    /**
     * Checks that {@code setStateListener} throws an {@link IllegalStateException} when it is
     * called after the client has been started.
     */
    @Test
    public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
        // try-with-resources closes the client even if an assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();

            // The leading string is only the failure message used when nothing is thrown.
            Assert.assertThrows(
                "Should throw IllegalStateException",
                IllegalStateException.class,
                () -> streams.setStateListener((KafkaStreams.StateListener) null));
        }
    }

    /**
     * Exercises {@code cleanUp()} in the two places where the test expects it to be permitted:
     * before the client is started and after it has been closed. The test passes if neither call
     * throws.
     */
    @Test
    public void shouldAllowCleanupBeforeStartAndAfterClose() {
        final KafkaStreams streams =
            new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time);
        try {
            // First cleanup: the client exists but has not been started yet.
            streams.cleanUp();
            streams.start();
        } finally {
            // Second cleanup: must come after close(), so try-with-resources is not usable here.
            streams.close();
            streams.cleanUp();
        }
    }

    /**
     * Checks that {@code cleanUp()} on a client in state {@code RUNNING} throws an
     * {@link IllegalStateException} whose message is {@code "Cannot clean up while running."}.
     *
     * @throws InterruptedException if waiting for the {@code RUNNING} state is interrupted
     */
    @Test
    public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
        // try-with-resources closes the client even if an assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started.");

            // The leading string is only the failure message used when nothing is thrown.
            final IllegalStateException thrown = Assert.assertThrows(
                "Should have thrown IllegalStateException",
                IllegalStateException.class,
                streams::cleanUp);
            Assert.assertEquals("Cannot clean up while running.", thrown.getMessage());
        }
    }

    /**
     * Checks that {@code cleanUp()} throws an {@link IllegalStateException} when the client has
     * been started, has reached {@code RUNNING} and has then been paused.
     *
     * @throws InterruptedException if waiting for the {@code RUNNING} or paused state is interrupted
     */
    @Test
    public void shouldThrowOnCleanupWhilePaused() throws InterruptedException {
        // try-with-resources closes the client even if an assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started.");

            streams.pause();
            TestUtils.waitForCondition(streams::isPaused, "Streams did not pause.");

            // NOTE(review): the string below is the assertion's failure message, not a check on the
            // exception's message; if the exception text is meant to be verified, assert on the
            // value returned by assertThrows instead.
            Assert.assertThrows("Cannot clean up while running.", IllegalStateException.class, streams::cleanUp);
        }
    }

    /**
     * Checks that {@code cleanUp()} throws an {@link IllegalStateException} while the client is
     * in {@code PENDING_SHUTDOWN}, and that the failed cleanup leaves it in that state.
     *
     * <p>The state is reached by calling {@code close(Duration.ZERO)} on a running client.
     *
     * @throws InterruptedException if waiting for the {@code RUNNING} state is interrupted
     */
    @Test
    public void shouldThrowOnCleanupWhileShuttingDown() throws InterruptedException {
        final KafkaStreams streams =
            new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time);
        streams.start();
        TestUtils.waitForCondition(() -> KafkaStreams.State.RUNNING == streams.state(), "Streams never started.");

        streams.close(Duration.ZERO);
        MatcherAssert.assertThat(KafkaStreams.State.PENDING_SHUTDOWN == streams.state(), Matchers.equalTo(true));

        Assert.assertThrows(IllegalStateException.class, streams::cleanUp);
        // The rejected cleanup must not have moved the client out of PENDING_SHUTDOWN.
        MatcherAssert.assertThat(KafkaStreams.State.PENDING_SHUTDOWN == streams.state(), Matchers.equalTo(true));
    }

    /**
     * Checks that {@code cleanUp()} throws an {@link IllegalStateException} while the client is
     * in {@code PENDING_SHUTDOWN} after being closed through {@code KafkaStreams.CloseOptions}
     * with a zero timeout, and that the failed cleanup leaves it in that state.
     *
     * <p>The client is built on a spied {@code MockClientSupplier} that hands out the test's admin
     * client and a mocked consumer whose group metadata reports the group instance id
     * {@code "test-instance-id"}.
     *
     * <p>NOTE(review): the method name says "LeaveGroupFalse" but the options are configured with
     * {@code leaveGroup(true)}, while the sibling "...LeaveGroupTrue" test never calls
     * {@code leaveGroup}. The two names look swapped -- confirm which behaviour each is meant to
     * cover before renaming or changing either.
     *
     * @throws InterruptedException if waiting for the {@code RUNNING} state is interrupted
     */
    @Test
    public void shouldThrowOnCleanupWhileShuttingDownStreamClosedWithCloseOptionLeaveGroupFalse() throws InterruptedException {
        final MockConsumer mockConsumer =
            Mockito.mock(MockConsumer.class, Mockito.withSettings().useConstructor(OffsetResetStrategy.EARLIEST));
        final MockClientSupplier mockClientSupplier = Mockito.spy(MockClientSupplier.class);
        final ConsumerGroupMetadata consumerGroupMetadata = Mockito.mock(ConsumerGroupMetadata.class);

        Mockito.when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.of("test-instance-id"));
        Mockito.when(mockConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
        Mockito.when(mockClientSupplier.getAdmin(ArgumentMatchers.any())).thenReturn(this.adminClient);
        Mockito.when(mockClientSupplier.getConsumer(ArgumentMatchers.any())).thenReturn(mockConsumer);

        // try-with-resources closes the client even if an assertion fails and keeps a failure in
        // close() from replacing the assertion error.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, mockClientSupplier, this.time)) {
            streams.start();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started.");

            final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
            closeOptions.timeout(Duration.ZERO);
            closeOptions.leaveGroup(true);

            streams.close(closeOptions);
            MatcherAssert.assertThat(streams.state() == KafkaStreams.State.PENDING_SHUTDOWN, Matchers.equalTo(true));

            Assert.assertThrows(IllegalStateException.class, streams::cleanUp);
            // The rejected cleanup must not have moved the client out of PENDING_SHUTDOWN.
            MatcherAssert.assertThat(streams.state() == KafkaStreams.State.PENDING_SHUTDOWN, Matchers.equalTo(true));
        }
    }

    /**
     * Checks that {@code cleanUp()} throws an {@link IllegalStateException} while the client is
     * in {@code PENDING_SHUTDOWN} after being closed through {@code KafkaStreams.CloseOptions}
     * with a zero timeout, and that the failed cleanup leaves it in that state.
     *
     * <p>NOTE(review): the method name says "LeaveGroupTrue" but {@code leaveGroup} is never set
     * here, so whatever default {@code CloseOptions} uses applies; the sibling
     * "...LeaveGroupFalse" test is the one that calls {@code leaveGroup(true)}. The two names look
     * swapped -- confirm.
     *
     * @throws InterruptedException if waiting for the {@code RUNNING} state is interrupted
     */
    @Test
    public void shouldThrowOnCleanupWhileShuttingDownStreamClosedWithCloseOptionLeaveGroupTrue() throws InterruptedException {
        final KafkaStreams streams =
            new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time);
        streams.start();
        TestUtils.waitForCondition(() -> KafkaStreams.State.RUNNING == streams.state(), "Streams never started.");

        final KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions();
        options.timeout(Duration.ZERO);
        streams.close(options);
        MatcherAssert.assertThat(KafkaStreams.State.PENDING_SHUTDOWN == streams.state(), Matchers.equalTo(true));

        Assert.assertThrows(IllegalStateException.class, streams::cleanUp);
        // The rejected cleanup must not have moved the client out of PENDING_SHUTDOWN.
        MatcherAssert.assertThat(KafkaStreams.State.PENDING_SHUTDOWN == streams.state(), Matchers.equalTo(true));
    }

    /**
     * Verifies that {@code metadataForAllStreamsClients()} is rejected with
     * {@link StreamsNotStartedException} before the client is started and with
     * {@link IllegalStateException} once it has reached {@code NOT_RUNNING}.
     *
     * @throws InterruptedException if interrupted while waiting for a state transition
     */
    @Test
    public void shouldNotGetAllTasksWhenNotRunning() throws InterruptedException {
        // try-with-resources performs exactly one final close() and records a close() failure as
        // suppressed instead of letting it replace the test failure.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            Assert.assertThrows(StreamsNotStartedException.class, streams::metadataForAllStreamsClients);

            streams.start();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

            streams.close();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

            Assert.assertThrows(IllegalStateException.class, streams::metadataForAllStreamsClients);
        }
    }

    /**
     * Verifies that {@code streamsMetadataForStore("store")} is rejected with
     * {@link StreamsNotStartedException} before the client is started and with
     * {@link IllegalStateException} once it has reached {@code NOT_RUNNING}.
     *
     * @throws InterruptedException if interrupted while waiting for a state transition
     */
    @Test
    public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws InterruptedException {
        // try-with-resources performs exactly one final close() and records a close() failure as
        // suppressed instead of letting it replace the test failure.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            Assert.assertThrows(StreamsNotStartedException.class, () -> streams.streamsMetadataForStore("store"));

            streams.start();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

            streams.close();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

            Assert.assertThrows(IllegalStateException.class, () -> streams.streamsMetadataForStore("store"));
        }
    }

    /**
     * Verifies that the serializer-based {@code queryMetadataForKey} overload is rejected with
     * {@link StreamsNotStartedException} before the client is started and with
     * {@link IllegalStateException} once it has reached {@code NOT_RUNNING}.
     *
     * @throws InterruptedException if interrupted while waiting for a state transition
     */
    @Test
    public void shouldNotGetQueryMetadataWithSerializerWhenNotRunningOrRebalancing() throws InterruptedException {
        // try-with-resources performs exactly one final close() and records a close() failure as
        // suppressed instead of letting it replace the test failure.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            Assert.assertThrows(
                StreamsNotStartedException.class,
                () -> streams.queryMetadataForKey("store", "key", Serdes.String().serializer()));

            streams.start();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

            streams.close();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

            Assert.assertThrows(
                IllegalStateException.class,
                () -> streams.queryMetadataForKey("store", "key", Serdes.String().serializer()));
        }
    }

    /**
     * Verifies that, right after {@code start()}, the serializer-based {@code queryMetadataForKey}
     * overload returns {@code KeyQueryMetadata.NOT_AVAILABLE} for store "store" and key "key".
     */
    @Test
    public void shouldGetQueryMetadataWithSerializerWhenRunningOrRebalancing() {
        // try-with-resources performs exactly one final close() and records a close() failure as
        // suppressed instead of letting it replace the test failure.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();
            Assert.assertEquals(
                KeyQueryMetadata.NOT_AVAILABLE,
                streams.queryMetadataForKey("store", "key", Serdes.String().serializer()));
        }
    }

    /**
     * Verifies that the partitioner-based {@code queryMetadataForKey} overload is rejected with
     * {@link StreamsNotStartedException} before the client is started and with
     * {@link IllegalStateException} once it has reached {@code NOT_RUNNING}.
     *
     * @throws InterruptedException if interrupted while waiting for a state transition
     */
    @Test
    public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing() throws InterruptedException {
        // try-with-resources performs exactly one final close() and records a close() failure as
        // suppressed instead of letting it replace the test failure.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            // The four-argument lambda is a partitioner that always answers 0.
            Assert.assertThrows(
                StreamsNotStartedException.class,
                () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0));

            streams.start();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

            streams.close();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

            Assert.assertThrows(
                IllegalStateException.class,
                () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0));
        }
    }

    /**
     * Intended to verify that looking up a non-existent state store on a running client raises
     * {@code UnknownStateStoreException}.
     *
     * @throws InterruptedException if interrupted while waiting for the client to reach RUNNING
     */
    @Test
    public void shouldThrowUnknownStateStoreExceptionWhenStoreNotExist() throws InterruptedException {
        // try-with-resources performs exactly one final close() and records a close() failure as
        // suppressed instead of letting it replace the test failure.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

            // NOTE(review): the executable below is empty, so nothing can throw and this assertion
            // cannot pass. The call under test (presumably a state-store lookup on `streams` for an
            // unknown store name) is missing and must be restored — confirm against the intended test.
            Assert.assertThrows(UnknownStateStoreException.class, () -> {
            });
        }
    }

    /**
     * Intended to verify that a state-store lookup is rejected with
     * {@link StreamsNotStartedException} before the client is started and with
     * {@link IllegalStateException} once it has reached {@code NOT_RUNNING}.
     *
     * @throws InterruptedException if interrupted while waiting for a state transition
     */
    @Test
    public void shouldNotGetStoreWhenWhenNotRunningOrRebalancing() throws InterruptedException {
        // try-with-resources performs exactly one final close() and records a close() failure as
        // suppressed instead of letting it replace the test failure.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            // NOTE(review): both executables in this test are empty, so nothing can throw and the
            // assertions cannot pass. The call under test (presumably a state-store lookup on
            // `streams`) is missing and must be restored — confirm against the intended test.
            Assert.assertThrows(StreamsNotStartedException.class, () -> {
            });

            streams.start();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

            streams.close();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

            Assert.assertThrows(IllegalStateException.class, () -> {
            });
        }
    }

    /**
     * Verifies that {@code allLocalStorePartitionLags()} is empty when the admin client's
     * {@code listOffsets} result completes with an empty map.
     */
    @Test
    public void shouldReturnEmptyLocalStorePartitionLags() {
        // Admin client whose listOffsets(...) yields an already-completed, empty result.
        final ListOffsetsResult emptyOffsets = Mockito.mock(ListOffsetsResult.class);
        final KafkaFutureImpl completedFuture = new KafkaFutureImpl();
        completedFuture.complete(Collections.emptyMap());
        Mockito.when(emptyOffsets.all()).thenReturn(completedFuture);

        final MockAdminClient spiedAdmin = Mockito.spy(MockAdminClient.class);
        Mockito.when(spiedAdmin.listOffsets(ArgumentMatchers.anyMap())).thenReturn(emptyOffsets);

        // Client supplier that hands out the stubbed admin client.
        final MockClientSupplier spiedSupplier = Mockito.spy(MockClientSupplier.class);
        Mockito.when(spiedSupplier.getAdmin(ArgumentMatchers.any())).thenReturn(spiedAdmin);

        // try-with-resources performs exactly one final close() and records a close() failure as
        // suppressed instead of letting it replace the test failure.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, spiedSupplier, this.time)) {
            streams.start();
            Assert.assertEquals(0L, streams.allLocalStorePartitionLags().size());
        }
    }

    /**
     * Verifies that {@code close(Duration)} with a 10 ms timeout returns {@code false} on a client
     * that was never started.
     */
    @Test
    public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() {
        // No mock time is passed to the constructor here — presumably so the timeout elapses on the
        // real clock (confirm).
        // try-with-resources performs exactly one final close() and records a close() failure as
        // suppressed instead of letting it replace the test failure.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier)) {
            Assert.assertFalse(streams.close(Duration.ofMillis(10L)));
        }
    }

    /**
     * Verifies that {@code close(Duration)} rejects a negative timeout (-1 ms) with an
     * {@link IllegalArgumentException}.
     */
    @Test
    public void shouldThrowOnNegativeTimeoutForClose() {
        // try-with-resources performs exactly one final close() and records a close() failure as
        // suppressed instead of letting it replace the test failure.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            Assert.assertThrows(IllegalArgumentException.class, () -> streams.close(Duration.ofMillis(-1L)));
        }
    }

    /**
     * Verifies that {@code close(Duration.ZERO)} returns {@code false} on a client that was never
     * started.
     */
    @Test
    public void shouldNotBlockInCloseForZeroDuration() {
        // try-with-resources performs exactly one final close() and records a close() failure as
        // suppressed instead of letting it replace the test failure.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            Assert.assertFalse(streams.close(Duration.ZERO));
        }
    }

    /**
     * Verifies that {@code close(CloseOptions)} with a 10 ms timeout (leave-group option left
     * untouched) returns {@code false} on a client that was never started.
     */
    @Test
    public void shouldReturnFalseOnCloseWithCloseOptionWithLeaveGroupFalseWhenThreadsHaventTerminated() {
        final KafkaStreams.CloseOptions shortTimeout = new KafkaStreams.CloseOptions();
        shortTimeout.timeout(Duration.ofMillis(10L));

        // No mock time is passed to the constructor here — presumably so the timeout elapses on the
        // real clock (confirm).
        // try-with-resources performs exactly one final close(); previously a failing close() on the
        // success path was caught and close() was invoked a second time.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier)) {
            Assert.assertFalse(streams.close(shortTimeout));
        }
    }

    /**
     * Verifies that {@code close(CloseOptions)} rejects options carrying a negative timeout (-1 ms)
     * with an {@link IllegalArgumentException}; the leave-group option is left untouched.
     */
    @Test
    public void shouldThrowOnNegativeTimeoutForCloseWithCloseOptionLeaveGroupFalse() {
        final KafkaStreams.CloseOptions negativeTimeout = new KafkaStreams.CloseOptions();
        negativeTimeout.timeout(Duration.ofMillis(-1L));

        // try-with-resources performs exactly one final close(); previously a failing close() on the
        // success path was caught and close() was invoked a second time.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            Assert.assertThrows(IllegalArgumentException.class, () -> streams.close(negativeTimeout));
        }
    }

    /**
     * Verifies that {@code close(CloseOptions)} with a zero timeout (leave-group option left
     * untouched) returns {@code false} on a client that was never started.
     */
    @Test
    public void shouldNotBlockInCloseWithCloseOptionLeaveGroupFalseForZeroDuration() {
        final KafkaStreams.CloseOptions zeroTimeout = new KafkaStreams.CloseOptions();
        zeroTimeout.timeout(Duration.ZERO);

        // try-with-resources performs exactly one final close(); previously a failing close() on the
        // success path was caught and close() was invoked a second time.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier)) {
            Assert.assertFalse(streams.close(zeroTimeout));
        }
    }

    /**
     * Verifies that {@code close(CloseOptions)} with a 10 ms timeout and {@code leaveGroup(true)}
     * returns {@code false} on a client that was never started. The consumer handed out by the
     * client supplier is a mock whose group metadata reports the instance id "test-instance-id".
     */
    @Test
    public void shouldReturnFalseOnCloseWithCloseOptionWithLeaveGroupTrueWhenThreadsHaventTerminated() {
        // Mocked consumer reporting a group instance id through its group metadata.
        final MockConsumer mockConsumer = Mockito.mock(MockConsumer.class, Mockito.withSettings().useConstructor(OffsetResetStrategy.EARLIEST));
        final ConsumerGroupMetadata groupMetadata = Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(groupMetadata.groupInstanceId()).thenReturn(Optional.of("test-instance-id"));
        Mockito.when(mockConsumer.groupMetadata()).thenReturn(groupMetadata);

        // Supplier wired to the shared admin client and the mocked consumer.
        final MockClientSupplier spiedSupplier = Mockito.spy(MockClientSupplier.class);
        Mockito.when(spiedSupplier.getAdmin(ArgumentMatchers.any())).thenReturn(this.adminClient);
        Mockito.when(spiedSupplier.getConsumer(ArgumentMatchers.any())).thenReturn(mockConsumer);

        final KafkaStreams.CloseOptions leaveGroupOptions = new KafkaStreams.CloseOptions();
        leaveGroupOptions.timeout(Duration.ofMillis(10L));
        leaveGroupOptions.leaveGroup(true);

        // No mock time is passed to the constructor here — presumably so the timeout elapses on the
        // real clock (confirm).
        // try-with-resources performs exactly one final close(); previously a failing close() on the
        // success path was caught and close() was invoked a second time.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, spiedSupplier)) {
            Assert.assertFalse(streams.close(leaveGroupOptions));
        }
    }

    /**
     * Verifies that {@code close(CloseOptions)} rejects options carrying a negative timeout (-1 ms)
     * and {@code leaveGroup(true)} with an {@link IllegalArgumentException}.
     */
    @Test
    public void shouldThrowOnNegativeTimeoutForCloseWithCloseOptionLeaveGroupTrue() {
        // Supplier wired to the shared admin client.
        final MockClientSupplier spiedSupplier = Mockito.spy(MockClientSupplier.class);
        Mockito.when(spiedSupplier.getAdmin(ArgumentMatchers.any())).thenReturn(this.adminClient);

        final KafkaStreams.CloseOptions negativeTimeout = new KafkaStreams.CloseOptions();
        negativeTimeout.timeout(Duration.ofMillis(-1L));
        negativeTimeout.leaveGroup(true);

        // try-with-resources performs exactly one final close(); previously a failing close() on the
        // success path was caught and close() was invoked a second time.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, spiedSupplier, this.time)) {
            Assert.assertThrows(IllegalArgumentException.class, () -> streams.close(negativeTimeout));
        }
    }

    /**
     * Verifies that {@code close(CloseOptions)} with a zero timeout and {@code leaveGroup(true)}
     * returns {@code false} on a client that was never started. The consumer handed out by the
     * client supplier is a mock whose group metadata reports the instance id "test-instance-id".
     */
    @Test
    public void shouldNotBlockInCloseWithCloseOptionLeaveGroupTrueForZeroDuration() {
        // Mocked consumer reporting a group instance id through its group metadata.
        final MockConsumer mockConsumer = Mockito.mock(MockConsumer.class, Mockito.withSettings().useConstructor(OffsetResetStrategy.EARLIEST));
        final ConsumerGroupMetadata groupMetadata = Mockito.mock(ConsumerGroupMetadata.class);
        Mockito.when(groupMetadata.groupInstanceId()).thenReturn(Optional.of("test-instance-id"));
        Mockito.when(mockConsumer.groupMetadata()).thenReturn(groupMetadata);

        // Supplier wired to the shared admin client and the mocked consumer.
        final MockClientSupplier spiedSupplier = Mockito.spy(MockClientSupplier.class);
        Mockito.when(spiedSupplier.getAdmin(ArgumentMatchers.any())).thenReturn(this.adminClient);
        Mockito.when(spiedSupplier.getConsumer(ArgumentMatchers.any())).thenReturn(mockConsumer);

        final KafkaStreams.CloseOptions leaveGroupOptions = new KafkaStreams.CloseOptions();
        leaveGroupOptions.timeout(Duration.ZERO);
        leaveGroupOptions.leaveGroup(true);

        // try-with-resources performs exactly one final close(); previously a failing close() on the
        // success path was caught and close() was invoked a second time.
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, spiedSupplier)) {
            Assert.assertFalse(streams.close(leaveGroupOptions));
        }
    }

    /**
     * Verifies that, with "metrics.recording.level" set to DEBUG, starting and closing the client
     * schedules a {@code RocksDBMetricsRecordingTrigger} at a fixed one-minute rate (no initial
     * delay) on the second executor returned by the mocked
     * {@code Executors.newSingleThreadScheduledExecutor(ThreadFactory)}, and shuts that executor down.
     */
    @Test
    public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
        // Both resources are managed by try-with-resources so each is closed exactly once and a
        // close() failure is recorded as suppressed instead of replacing the test failure.
        try (final MockedStatic<Executors> executorsMock = Mockito.mockStatic(Executors.class)) {
            // The static factory hands out these two executors, in this order.
            final ScheduledExecutorService firstExecutor = Mockito.mock(ScheduledExecutorService.class);
            final ScheduledExecutorService metricsExecutor = Mockito.mock(ScheduledExecutorService.class);
            executorsMock.when(() -> Executors.newSingleThreadScheduledExecutor(ArgumentMatchers.any(ThreadFactory.class)))
                .thenReturn(firstExecutor, metricsExecutor);

            // NOTE(review): this builder is discarded; the topology actually run below comes from
            // getBuilderWithSource().
            new StreamsBuilder().table(AssignmentTestUtils.TOPIC_PREFIX, Materialized.as("store"));
            this.props.setProperty("metrics.recording.level", Sensor.RecordingLevel.DEBUG.name());

            // Start and immediately close the client before verifying the interactions.
            try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
                streams.start();
            }

            // NOTE(review): NUM_THREADS is reused as the expected number of factory calls;
            // presumably it equals the two executors stubbed above — confirm.
            executorsMock.verify(
                () -> Executors.newSingleThreadScheduledExecutor(ArgumentMatchers.any(ThreadFactory.class)),
                Mockito.times(NUM_THREADS));
            Mockito.verify(metricsExecutor).scheduleAtFixedRate(
                ArgumentMatchers.any(RocksDBMetricsRecordingTrigger.class),
                Mockito.eq(0L),
                Mockito.eq(1L),
                Mockito.eq(TimeUnit.MINUTES));
            Mockito.verify(metricsExecutor).shutdownNow();
        }
    }

    /**
     * Verifies that the {@code KafkaStreams(Topology, StreamsConfig)} constructor obtains its client
     * supplier from the supplied config.
     */
    @Test
    public void shouldGetClientSupplierFromConfigForConstructor() {
        final StreamsConfig spiedConfig = Mockito.spy(new StreamsConfig(this.props));
        Mockito.when(spiedConfig.getKafkaClientSupplier()).thenReturn(this.supplier);

        // Only construction is under test; the instance is still closed so it does not leak.
        try (final KafkaStreams ignored = new KafkaStreams(getBuilderWithSource().build(), spiedConfig)) {
            // nothing to do
        }

        // NOTE(review): NUM_THREADS is reused as the expected invocation count; presumably 2 (the
        // when(...) stubbing above plus the constructor) — confirm and consider a literal.
        Mockito.verify(spiedConfig, Mockito.times(NUM_THREADS)).getKafkaClientSupplier();
    }

    /**
     * Verifies that the {@code KafkaStreams(Topology, StreamsConfig, Time)} constructor obtains its
     * client supplier from the supplied config.
     */
    @Test
    public void shouldGetClientSupplierFromConfigForConstructorWithTime() {
        final StreamsConfig spiedConfig = Mockito.spy(new StreamsConfig(this.props));
        Mockito.when(spiedConfig.getKafkaClientSupplier()).thenReturn(this.supplier);

        // Only construction is under test; the instance is still closed so it does not leak.
        try (final KafkaStreams ignored = new KafkaStreams(getBuilderWithSource().build(), spiedConfig, this.time)) {
            // nothing to do
        }

        // NOTE(review): NUM_THREADS is reused as the expected invocation count; presumably 2 (the
        // when(...) stubbing above plus the constructor) — confirm and consider a literal.
        Mockito.verify(spiedConfig, Mockito.times(NUM_THREADS)).getKafkaClientSupplier();
    }

    /**
     * Verifies that, when a client supplier is passed explicitly to the constructor, the config's
     * {@code getKafkaClientSupplier()} is never consulted.
     */
    @Test
    public void shouldUseProvidedClientSupplier() {
        final StreamsConfig spiedConfig = Mockito.spy(new StreamsConfig(this.props));

        // Only construction is under test; the instance is still closed so it does not leak.
        try (final KafkaStreams ignored = new KafkaStreams(getBuilderWithSource().build(), spiedConfig, this.supplier)) {
            // nothing to do
        }

        Mockito.verify(spiedConfig, Mockito.times(0)).getKafkaClientSupplier();
    }

    /**
     * Starts and closes a {@code KafkaStreams} instance with {@code metrics.recording.level} set
     * to INFO while {@code Executors.newSingleThreadScheduledExecutor(ThreadFactory)} is mocked
     * statically, then verifies how often that factory method was requested.
     *
     * <p>Both the static mock and the streams instance are managed by try-with-resources so a
     * failure while closing never hides the failure that caused the test to abort.
     */
    @Test
    public void shouldNotTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsInfo() {
        try (MockedStatic<Executors> executorsMock = Mockito.mockStatic(Executors.class)) {
            executorsMock
                .when(() -> Executors.newSingleThreadScheduledExecutor(ArgumentMatchers.any(ThreadFactory.class)))
                .thenReturn(Mockito.mock(ScheduledExecutorService.class));

            // NOTE(review): this builder is discarded; the topology actually started below comes
            // from getBuilderWithSource(). Kept to preserve the original test's statements.
            new StreamsBuilder().table(AssignmentTestUtils.TOPIC_PREFIX, Materialized.as("store"));
            this.props.setProperty("metrics.recording.level", Sensor.RecordingLevel.INFO.name());

            try (KafkaStreams streams =
                     new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
                streams.start();
            }

            // MockedStatic#verify without a mode uses Mockito's default verification mode.
            executorsMock.verify(
                () -> Executors.newSingleThreadScheduledExecutor(ArgumentMatchers.any(ThreadFactory.class)));
        }
    }

    /* JADX WARN: Failed to calculate best type for var: r12v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r12v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r13v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r13v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 12, insn: 0x0109: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r12 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:68:0x0109 */
    /* JADX WARN: Not initialized variable reg: 13, insn: 0x010e: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r13 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:70:0x010e */
    /* JADX WARN: Type inference failed for: r12v0, types: [org.mockito.MockedConstruction] */
    /* JADX WARN: Type inference failed for: r13v0, types: [java.lang.Throwable] */
    @Test
    public void shouldCleanupOldStateDirs() {
        ?? r12;
        ?? r13;
        MockedStatic mockStatic = Mockito.mockStatic(Executors.class);
        Throwable th = null;
        try {
            try {
                ScheduledExecutorService scheduledExecutorService = (ScheduledExecutorService) Mockito.mock(ScheduledExecutorService.class);
                mockStatic.when(() -> {
                    Executors.newSingleThreadScheduledExecutor((ThreadFactory) ArgumentMatchers.any(ThreadFactory.class));
                }).thenReturn(scheduledExecutorService);
                MockedConstruction mockConstruction = Mockito.mockConstruction(StateDirectory.class, (stateDirectory, context) -> {
                    Mockito.when(stateDirectory.initializeProcessId()).thenReturn(UUID.randomUUID());
                });
                Throwable th2 = null;
                this.props.setProperty("state.cleanup.delay.ms", "1");
                StreamsBuilder streamsBuilder = new StreamsBuilder();
                streamsBuilder.table(AssignmentTestUtils.TOPIC_PREFIX, Materialized.as("store"));
                KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), this.props, this.supplier, this.time);
                Throwable th3 = null;
                try {
                    try {
                        kafkaStreams.start();
                        if (kafkaStreams != null) {
                            if (0 != 0) {
                                try {
                                    kafkaStreams.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                kafkaStreams.close();
                            }
                        }
                        if (mockConstruction != null) {
                            if (0 != 0) {
                                try {
                                    mockConstruction.close();
                                } catch (Throwable th5) {
                                    th2.addSuppressed(th5);
                                }
                            } else {
                                mockConstruction.close();
                            }
                        }
                        ((ScheduledExecutorService) Mockito.verify(scheduledExecutorService)).scheduleAtFixedRate((Runnable) ArgumentMatchers.any(Runnable.class), Mockito.eq(1L), Mockito.eq(1L), (TimeUnit) Mockito.eq(TimeUnit.MILLISECONDS));
                        ((ScheduledExecutorService) Mockito.verify(scheduledExecutorService)).shutdownNow();
                        if (mockStatic != null) {
                            if (0 == 0) {
                                mockStatic.close();
                                return;
                            }
                            try {
                                mockStatic.close();
                            } catch (Throwable th6) {
                                th.addSuppressed(th6);
                            }
                        }
                    } catch (Throwable th7) {
                        th3 = th7;
                        throw th7;
                    }
                } catch (Throwable th8) {
                    if (kafkaStreams != null) {
                        if (th3 != null) {
                            try {
                                kafkaStreams.close();
                            } catch (Throwable th9) {
                                th3.addSuppressed(th9);
                            }
                        } else {
                            kafkaStreams.close();
                        }
                    }
                    throw th8;
                }
            } catch (Throwable th10) {
                if (r12 != 0) {
                    if (r13 != 0) {
                        try {
                            r12.close();
                        } catch (Throwable th11) {
                            r13.addSuppressed(th11);
                        }
                    } else {
                        r12.close();
                    }
                }
                throw th10;
            }
        } catch (Throwable th12) {
            if (mockStatic != null) {
                if (0 != 0) {
                    try {
                        mockStatic.close();
                    } catch (Throwable th13) {
                        th.addSuppressed(th13);
                    }
                } else {
                    mockStatic.close();
                }
            }
            throw th12;
        }
    }

    /**
     * Builds a source -> processor -> sink topology that registers no state store and hands it to
     * {@code startStreamsAndCheckDirExists} with {@code false} as the second argument.
     */
    @Test
    public void statelessTopologyShouldNotCreateStateDirectory() {
        final String testId = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        final String inputTopic = testId + "-input";
        final String outputTopic = testId + "-output";

        final Topology statelessTopology = new Topology();
        statelessTopology
            .addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic)
            .addProcessor("process", () -> new Processor<String, String, String, String>() {
                private ProcessorContext<String, String> context;

                @Override
                public void init(final ProcessorContext<String, String> processorContext) {
                    this.context = processorContext;
                }

                @Override
                public void process(final Record<String, String> record) {
                    final String value = record.value();
                    // Only values whose length is a multiple of NUM_THREADS are forwarded.
                    if (value.length() % KafkaStreamsTest.NUM_THREADS != 0) {
                        return;
                    }
                    this.context.forward(record.withValue(record.key() + value));
                }
            }, "source")
            .addSink("sink", outputTopic, new StringSerializer(), new StringSerializer(), "process");

        startStreamsAndCheckDirExists(statelessTopology, false);
    }

    /**
     * Obtains a topology from {@code getStatefulTopology} with its final flag set to
     * {@code false} and passes it to {@code startStreamsAndCheckDirExists}, also with {@code false}.
     */
    @Test
    public void inMemoryStatefulTopologyShouldNotCreateStateDirectory() {
        final String testId = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        final String inputName = testId + "-input";
        final String outputName = testId + "-output";
        final String globalName = testId + "-global";
        final String countsName = testId + "-counts";
        final String globalStoreName = testId + "-globalStore";

        // NOTE(review): the trailing `false` presumably selects in-memory stores - confirm against getStatefulTopology.
        final Topology topology =
            getStatefulTopology(inputName, outputName, globalName, countsName, globalStoreName, false);
        startStreamsAndCheckDirExists(topology, false);
    }

    /**
     * Obtains a topology from {@code getStatefulTopology} with its final flag set to
     * {@code true} and passes it to {@code startStreamsAndCheckDirExists}, also with {@code true}.
     */
    @Test
    public void statefulTopologyShouldCreateStateDirectory() {
        final String testId = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        final String inputName = testId + "-input";
        final String outputName = testId + "-output";
        final String globalName = testId + "-global";
        final String countsName = testId + "-counts";
        final String globalStoreName = testId + "-globalStore";

        // NOTE(review): the trailing `true` presumably selects persistent stores - confirm against getStatefulTopology.
        final Topology topology =
            getStatefulTopology(inputName, outputName, globalName, countsName, globalStoreName, true);
        startStreamsAndCheckDirExists(topology, true);
    }

    /**
     * Verifies that constructing {@code KafkaStreams} from an empty topology fails with a
     * {@code TopologyException} carrying the expected message.
     *
     * <p>Uses {@code Assert.assertThrows}, like the other exception tests in this class, instead
     * of a hand-written try/fail/catch.
     */
    @Test
    public void shouldThrowTopologyExceptionOnEmptyTopology() {
        final TopologyException thrown = Assert.assertThrows(
            TopologyException.class,
            () -> new KafkaStreams(new StreamsBuilder().build(), this.props, this.supplier, this.time));

        MatcherAssert.assertThat(
            thrown.getMessage(),
            Matchers.equalTo("Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table."));
    }

    /**
     * Verifies that a topology consisting only of a global table results in zero entries in
     * {@code KafkaStreams.threads}.
     *
     * <p>The instance is managed by try-with-resources so it is closed exactly once, and a
     * failure in {@code close()} is attached as suppressed to any assertion failure.
     */
    @Test
    public void shouldNotCreateStreamThreadsForGlobalOnlyTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable("anyTopic");

        try (KafkaStreams streams = new KafkaStreams(builder.build(), this.props, this.supplier, this.time)) {
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(0));
        }
    }

    /**
     * Verifies the lifecycle of a global-table-only topology: no stream threads are created, the
     * instance starts in CREATED, reaches RUNNING after {@code start()} and NOT_RUNNING after
     * {@code close()}.
     *
     * @throws InterruptedException if waiting for a state transition is interrupted
     */
    @Test
    public void shouldTransitToRunningWithGlobalOnlyTopology() throws InterruptedException {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable("anyTopic");

        try (KafkaStreams streams = new KafkaStreams(builder.build(), this.props, this.supplier, this.time)) {
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(0));
            // JUnit's contract is assertEquals(expected, actual); the arguments were previously swapped.
            Assert.assertEquals(KafkaStreams.State.CREATED, streams.state());

            streams.start();
            // NOTE(review): the failure message is built before waiting, so it reports the state
            // at this point rather than the state at timeout.
            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                "Streams never started, state is " + streams.state());

            // Explicit close is part of the scenario; the try-with-resources close afterwards is a repeat call.
            streams.close();
            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
                "Streams never stopped.");
        }
    }

    /**
     * Verifies that {@code clientInstanceIds} rejects a negative timeout with an
     * {@code IllegalArgumentException} and the expected message.
     *
     * <p>The instance is managed by try-with-resources so a failure in {@code close()} cannot
     * replace an assertion failure.
     */
    @Test
    public void shouldThrowOnClientInstanceIdsWithNegativeTimeout() {
        try (KafkaStreams streams =
                 new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            final IllegalArgumentException error = Assert.assertThrows(
                IllegalArgumentException.class,
                () -> streams.clientInstanceIds(Duration.ofMillis(-1L)));

            MatcherAssert.assertThat(error.getMessage(), Matchers.equalTo("The timeout cannot be negative."));
        }
    }

    /**
     * Verifies that {@code clientInstanceIds} throws an {@code IllegalStateException} with the
     * expected message when {@code start()} has not been called.
     *
     * <p>The instance is managed by try-with-resources so a failure in {@code close()} cannot
     * replace an assertion failure.
     */
    @Test
    public void shouldThrowOnClientInstanceIdsWhenNotStarted() {
        try (KafkaStreams streams =
                 new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            final IllegalStateException error = Assert.assertThrows(
                IllegalStateException.class,
                () -> streams.clientInstanceIds(Duration.ZERO));

            MatcherAssert.assertThat(
                error.getMessage(),
                Matchers.equalTo("KafkaStreams has not been started, you can retry after calling start()."));
        }
    }

    /**
     * Verifies that {@code clientInstanceIds} throws an {@code IllegalStateException} with the
     * expected message once the instance has been closed.
     *
     * <p>The instance is managed by try-with-resources so a failure in {@code close()} cannot
     * replace an assertion failure.
     */
    @Test
    public void shouldThrowOnClientInstanceIdsWhenClosed() {
        try (KafkaStreams streams =
                 new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            // Closing up front is the scenario under test; the implicit close afterwards is a repeat call.
            streams.close();

            final IllegalStateException error = Assert.assertThrows(
                IllegalStateException.class,
                () -> streams.clientInstanceIds(Duration.ZERO));

            MatcherAssert.assertThat(
                error.getMessage(),
                Matchers.equalTo("KafkaStreams has been stopped (NOT_RUNNING)."));
        }
    }

    /**
     * Verifies that {@code clientInstanceIds} on a started instance throws a
     * {@code StreamsException} whose cause is an {@code UnsupportedOperationException} with the
     * message "clientInstanceId not set" when no client instance id was configured on the admin
     * client in this test.
     *
     * <p>The instance is managed by try-with-resources so a failure in {@code close()} cannot
     * replace an assertion failure.
     */
    @Test
    public void shouldThrowStreamsExceptionWhenAdminNotInitialized() {
        try (KafkaStreams streams =
                 new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();

            final StreamsException error = Assert.assertThrows(
                StreamsException.class,
                () -> streams.clientInstanceIds(Duration.ZERO));

            MatcherAssert.assertThat(
                error.getMessage(),
                Matchers.equalTo("Could not retrieve admin client instance id."));

            final Throwable cause = error.getCause();
            MatcherAssert.assertThat(cause, Matchers.instanceOf(UnsupportedOperationException.class));
            MatcherAssert.assertThat(cause.getMessage(), Matchers.equalTo("clientInstanceId not set"));
        }
    }

    /**
     * Verifies that with telemetry disabled on the admin client, {@code clientInstanceIds}
     * itself succeeds and the failure is deferred to {@code adminInstanceId()}, which throws an
     * {@code IllegalStateException} with the expected message.
     *
     * <p>{@code num.stream.threads} is set to 0 for this test. The instance is managed by
     * try-with-resources so a failure in {@code close()} cannot replace an assertion failure.
     */
    @Test
    public void shouldNotCrashButThrowLaterIfAdminTelemetryDisabled() {
        this.adminClient.disableTelemetry();
        this.props.put("num.stream.threads", 0);

        try (KafkaStreams streams =
                 new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();

            final ClientInstanceIds instanceIds = streams.clientInstanceIds(Duration.ZERO);
            final IllegalStateException error = Assert.assertThrows(
                IllegalStateException.class,
                instanceIds::adminInstanceId);

            MatcherAssert.assertThat(
                error.getMessage(),
                Matchers.equalTo("Telemetry is not enabled on the admin client. Set config `enable.metrics.push` to `true`."));
        }
    }

    /**
     * Verifies that {@code clientInstanceIds} throws a Kafka {@code TimeoutException} when the
     * admin client has a timeout injected via {@code injectTimeoutException(1)}.
     *
     * <p>The instance is managed by try-with-resources so a failure in {@code close()} cannot
     * replace an assertion failure.
     */
    @Test
    public void shouldThrowTimeExceptionWhenAdminTimesOut() {
        this.adminClient.setClientInstanceId(Uuid.randomUuid());
        this.adminClient.injectTimeoutException(1);

        try (KafkaStreams streams =
                 new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();

            Assert.assertThrows(
                TimeoutException.class,
                () -> streams.clientInstanceIds(Duration.ZERO));
        }
    }

    /**
     * Verifies that the id set on the admin client via {@code setClientInstanceId} is the one
     * reported by {@code clientInstanceIds(...).adminInstanceId()}.
     *
     * <p>{@code num.stream.threads} is set to 0 for this test. The instance is managed by
     * try-with-resources so it is closed exactly once.
     */
    @Test
    public void shouldReturnAdminInstanceID() {
        final Uuid expectedId = Uuid.randomUuid();
        this.adminClient.setClientInstanceId(expectedId);
        this.props.put("num.stream.threads", 0);

        try (KafkaStreams streams =
                 new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();

            MatcherAssert.assertThat(
                streams.clientInstanceIds(Duration.ZERO).adminInstanceId(),
                Matchers.equalTo(expectedId));
        }
    }

    /**
     * Verifies that {@code clientInstanceIds} throws a Kafka {@code TimeoutException}, caused by a
     * {@code java.util.concurrent.TimeoutException}, when the first stream thread reports a
     * consumer instance-id future that is never completed.
     *
     * <p>The instance is managed by try-with-resources so a failure in {@code close()} cannot
     * replace an assertion failure.
     */
    @Test
    public void shouldThrowTimeoutExceptionWhenMainConsumerFutureDoesNotComplete() {
        // The future handed out for "consumer" is created here and never completed.
        Mockito.when(this.streamThreadOne.consumerClientInstanceIds((Duration) ArgumentMatchers.any()))
            .thenReturn(Collections.singletonMap("consumer", new KafkaFutureImpl<>()));
        this.adminClient.setClientInstanceId(Uuid.randomUuid());

        try (KafkaStreams streams =
                 new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            streams.start();

            final TimeoutException error = Assert.assertThrows(
                TimeoutException.class,
                () -> streams.clientInstanceIds(Duration.ZERO));

            MatcherAssert.assertThat(
                error.getMessage(),
                Matchers.equalTo("Could not retrieve consumer instance id for consumer."));
            MatcherAssert.assertThat(
                error.getCause(),
                Matchers.instanceOf(java.util.concurrent.TimeoutException.class));
        }
    }

    /**
     * Verifies that {@code clientInstanceIds} fails with a Kafka {@link TimeoutException} when the
     * global consumer's client-instance-id future is never completed.
     *
     * <p>Both stream threads are stubbed to return an already-completed, empty producer map. The
     * topology includes a global table, and the first constructed {@code GlobalStreamThread} mock is
     * stubbed (after {@code start()}) to return a future that nobody completes. The lookup is made
     * with {@link Duration#ZERO}; the test checks the error message and that the cause is a
     * {@link java.util.concurrent.TimeoutException}.
     *
     * @throws Exception declared for signature compatibility; the body has no checked-exception call of its own
     */
    @Test
    public void shouldThrowTimeoutExceptionWhenGlobalConsumerFutureDoesNotComplete() throws Exception {
        final KafkaFutureImpl<Map<String, KafkaFuture<Uuid>>> completedProducerFuture = new KafkaFutureImpl<>();
        completedProducerFuture.complete(Collections.emptyMap());
        Mockito.when(this.streamThreadOne.producersClientInstanceIds((Duration) ArgumentMatchers.any()))
            .thenReturn(completedProducerFuture);
        Mockito.when(this.streamThreadTwo.producersClientInstanceIds((Duration) ArgumentMatchers.any()))
            .thenReturn(completedProducerFuture);
        this.adminClient.setClientInstanceId(Uuid.randomUuid());

        final StreamsBuilder builderWithSource = getBuilderWithSource();
        builderWithSource.globalTable("anyTopic");

        // try-with-resources closes the instance on every path and keeps the primary failure if
        // close() itself throws (the close failure is recorded as suppressed).
        try (KafkaStreams kafkaStreams = new KafkaStreams(builderWithSource.build(), this.props, this.supplier, this.time)) {
            kafkaStreams.start();

            // The global thread mock is stubbed only after start(), as in the original statement order.
            final GlobalStreamThread globalThread =
                (GlobalStreamThread) this.globalStreamThreadMockedConstruction.constructed().get(0);
            Mockito.when(globalThread.globalConsumerInstanceId((Duration) ArgumentMatchers.any()))
                .thenReturn(new KafkaFutureImpl<>());

            final TimeoutException error = Assert.assertThrows(
                TimeoutException.class,
                () -> kafkaStreams.clientInstanceIds(Duration.ZERO)
            );

            MatcherAssert.assertThat(error.getMessage(), Matchers.equalTo("Could not retrieve global consumer client instance id."));
            MatcherAssert.assertThat(error.getCause(), Matchers.instanceOf(java.util.concurrent.TimeoutException.class));
        }
    }

    /**
     * Verifies that {@code clientInstanceIds} fails with a Kafka {@link TimeoutException} when a
     * stream thread's producer client-instance-id future is never completed.
     *
     * <p>Stream thread one is stubbed to return a producer future that nobody completes, and the
     * lookup is made with {@link Duration#ZERO}. The test then checks the error message and that
     * the cause is a {@link java.util.concurrent.TimeoutException}.
     */
    @Test
    public void shouldThrowTimeoutExceptionWhenThreadProducerFutureDoesNotComplete() {
        Mockito.when(this.streamThreadOne.producersClientInstanceIds((Duration) ArgumentMatchers.any()))
            .thenReturn(new KafkaFutureImpl<>());
        this.adminClient.setClientInstanceId(Uuid.randomUuid());

        // try-with-resources closes the instance on every path and keeps the primary failure if
        // close() itself throws (the close failure is recorded as suppressed).
        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            kafkaStreams.start();

            final TimeoutException error = Assert.assertThrows(
                TimeoutException.class,
                () -> kafkaStreams.clientInstanceIds(Duration.ZERO)
            );

            MatcherAssert.assertThat(error.getMessage(), Matchers.equalTo("Could not retrieve producer instance id for processId-StreamThread-1."));
            MatcherAssert.assertThat(error.getCause(), Matchers.instanceOf(java.util.concurrent.TimeoutException.class));
        }
    }

    /**
     * Verifies that the timeout passed to {@code clientInstanceIds} is counted down across all the
     * per-client futures it waits on.
     *
     * <p>Every stubbed future overrides {@code get(long, TimeUnit)} to (1) record that it was
     * called, (2) assert that the timeout it received equals the current value of a shared
     * "remaining budget" counter, and (3) advance the mock clock and decrement the counter by the
     * same number of milliseconds. Because the counter is shared, the check does not depend on the
     * order in which the futures are queried.
     *
     * <p>The call is made with a 60 ms timeout while the counter starts at 50; presumably the
     * 10 ms the admin client is configured to advance accounts for the difference (the admin
     * client's behavior is not visible in this method).
     *
     * <p>The overrides must be named {@code get}: a method with any other name would not be invoked
     * through the {@code KafkaFuture} API, leaving the assertions unexecuted and the flags false.
     */
    @Test
    public void shouldCountDownTimeoutAcrossClient() {
        this.adminClient.setClientInstanceId(Uuid.randomUuid());
        this.adminClient.advanceTimeOnClientInstanceId(this.time, Duration.ofMillis(10L).toMillis());

        final MockTime mockTime = this.time;
        // Remaining timeout budget (ms) each future expects to be handed.
        final AtomicLong expectedTimeout = new AtomicLong(50L);
        final AtomicBoolean didAssertThreadOneConsumer = new AtomicBoolean(false);
        final AtomicBoolean didAssertThreadTwoConsumer = new AtomicBoolean(false);
        final AtomicBoolean didAssertThreadOneProducer = new AtomicBoolean(false);
        final AtomicBoolean didAssertThreadTwoProducers = new AtomicBoolean(false);
        final AtomicBoolean didAssertTaskOneProducer = new AtomicBoolean(false);
        final AtomicBoolean didAssertTaskTwoProducer = new AtomicBoolean(false);
        final AtomicBoolean didAssertGlobalConsumer = new AtomicBoolean(false);

        Mockito.when(this.streamThreadOne.consumerClientInstanceIds((Duration) ArgumentMatchers.any()))
            .thenReturn(Collections.singletonMap("consumer1", new KafkaFutureImpl<Uuid>() {
                @Override
                public Uuid get(final long timeout, final TimeUnit unit) {
                    didAssertThreadOneConsumer.set(true);
                    MatcherAssert.assertThat(timeout, Matchers.equalTo(expectedTimeout.getAndAdd(-10L)));
                    mockTime.sleep(10L);
                    return null;
                }
            }));

        Mockito.when(this.streamThreadTwo.consumerClientInstanceIds((Duration) ArgumentMatchers.any()))
            .thenReturn(Collections.singletonMap("consumer2", new KafkaFutureImpl<Uuid>() {
                @Override
                public Uuid get(final long timeout, final TimeUnit unit) {
                    didAssertThreadTwoConsumer.set(true);
                    MatcherAssert.assertThat(timeout, Matchers.equalTo(expectedTimeout.getAndAdd(-5L)));
                    mockTime.sleep(5L);
                    return null;
                }
            }));

        // Thread one: the outer future is already complete; only the inner producer future checks the timeout.
        final KafkaFutureImpl<Map<String, KafkaFuture<Uuid>>> threadOneProducers = new KafkaFutureImpl<>();
        threadOneProducers.complete(Collections.singletonMap("threadProducer", new KafkaFutureImpl<Uuid>() {
            @Override
            public Uuid get(final long timeout, final TimeUnit unit) {
                didAssertThreadOneProducer.set(true);
                MatcherAssert.assertThat(timeout, Matchers.equalTo(expectedTimeout.getAndAdd(-9L)));
                mockTime.sleep(9L);
                return null;
            }
        }));
        Mockito.when(this.streamThreadOne.producersClientInstanceIds((Duration) ArgumentMatchers.any()))
            .thenReturn(threadOneProducers);

        // Thread two: the outer future checks the timeout itself, then delegates to the real get()
        // so the completed per-task map below is returned.
        final KafkaFutureImpl<Map<String, KafkaFuture<Uuid>>> threadTwoProducers =
            new KafkaFutureImpl<Map<String, KafkaFuture<Uuid>>>() {
                @Override
                public Map<String, KafkaFuture<Uuid>> get(final long timeout, final TimeUnit unit)
                        throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
                    didAssertThreadTwoProducers.set(true);
                    MatcherAssert.assertThat(timeout, Matchers.equalTo(expectedTimeout.getAndAdd(-7L)));
                    mockTime.sleep(7L);
                    return super.get(timeout, unit);
                }
            };
        threadTwoProducers.complete(Utils.mkMap(
            Utils.mkEntry("task1", new KafkaFutureImpl<Uuid>() {
                @Override
                public Uuid get(final long timeout, final TimeUnit unit) {
                    didAssertTaskOneProducer.set(true);
                    MatcherAssert.assertThat(timeout, Matchers.equalTo(expectedTimeout.getAndAdd(-4L)));
                    mockTime.sleep(4L);
                    return null;
                }
            }),
            Utils.mkEntry("task2", new KafkaFutureImpl<Uuid>() {
                @Override
                public Uuid get(final long timeout, final TimeUnit unit) {
                    didAssertTaskTwoProducer.set(true);
                    MatcherAssert.assertThat(timeout, Matchers.equalTo(expectedTimeout.getAndAdd(-6L)));
                    mockTime.sleep(6L);
                    return null;
                }
            })
        ));
        Mockito.when(this.streamThreadTwo.producersClientInstanceIds((Duration) ArgumentMatchers.any()))
            .thenReturn(threadTwoProducers);

        final StreamsBuilder builderWithSource = getBuilderWithSource();
        builderWithSource.globalTable("anyTopic");

        // try-with-resources closes the instance exactly once on every path and keeps the primary
        // failure if close() itself throws.
        try (KafkaStreams kafkaStreams = new KafkaStreams(builderWithSource.build(), this.props, this.supplier, this.time)) {
            kafkaStreams.start();

            // The global thread mock is stubbed only after start(), as in the original statement order.
            final GlobalStreamThread globalThread =
                (GlobalStreamThread) this.globalStreamThreadMockedConstruction.constructed().get(0);
            Mockito.when(globalThread.globalConsumerInstanceId((Duration) ArgumentMatchers.any()))
                .thenReturn(new KafkaFutureImpl<Uuid>() {
                    @Override
                    public Uuid get(final long timeout, final TimeUnit unit) {
                        didAssertGlobalConsumer.set(true);
                        MatcherAssert.assertThat(timeout, Matchers.equalTo(expectedTimeout.getAndAdd(-8L)));
                        mockTime.sleep(8L);
                        return null;
                    }
                });

            kafkaStreams.clientInstanceIds(Duration.ofMillis(60L));
        }

        // Every stubbed future must have been consulted (and therefore have run its timeout check).
        MatcherAssert.assertThat(didAssertThreadOneConsumer.get(), Matchers.equalTo(true));
        MatcherAssert.assertThat(didAssertThreadTwoConsumer.get(), Matchers.equalTo(true));
        MatcherAssert.assertThat(didAssertThreadOneProducer.get(), Matchers.equalTo(true));
        MatcherAssert.assertThat(didAssertThreadTwoProducers.get(), Matchers.equalTo(true));
        MatcherAssert.assertThat(didAssertTaskOneProducer.get(), Matchers.equalTo(true));
        MatcherAssert.assertThat(didAssertTaskTwoProducer.get(), Matchers.equalTo(true));
        MatcherAssert.assertThat(didAssertGlobalConsumer.get(), Matchers.equalTo(true));
    }

    /**
     * Builds a topology with one source, one stateful processor, one sink, and one global store.
     *
     * <p>The processor writes {@code 5L} under each record's key into the local key-value store,
     * forwards the record with its value replaced by {@code "5"}, and requests a commit.
     *
     * @param str  topic the source node reads from
     * @param str2 topic the sink node writes to
     * @param str3 topic backing the global store; also used to name its processor ({@code str3 + "-processor"})
     * @param str4 name of the local key-value store attached to the processor
     * @param str5 name of the global key-value store
     * @param z    {@code true} for persistent stores, {@code false} for in-memory stores (applies to both stores)
     * @return the assembled topology
     */
    @Deprecated
    private Topology getStatefulTopology(String str, String str2, String str3, String str4, String str5, boolean z) {
        final StoreBuilder<?> localStoreBuilder = Stores.keyValueStoreBuilder(
            z ? Stores.persistentKeyValueStore(str4) : Stores.inMemoryKeyValueStore(str4),
            Serdes.String(),
            Serdes.Long());

        final Topology topology = new Topology();
        topology
            .addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), str)
            .addProcessor("process", () -> new Processor<String, String, String, String>() {
                // Parameterized (not raw) so getStateStore() and forward() are type-checked.
                private ProcessorContext<String, String> context;

                @Override
                public void init(final ProcessorContext<String, String> processorContext) {
                    this.context = processorContext;
                }

                @Override
                public void process(final Record<String, String> record) {
                    // Fully qualified to avoid depending on an import that may not be present in this file.
                    final org.apache.kafka.streams.state.KeyValueStore<String, Long> store =
                        this.context.getStateStore(str4);
                    store.put(record.key(), 5L);
                    this.context.forward(record.withValue("5"));
                    this.context.commit();
                }
            }, "source")
            .addStateStore(localStoreBuilder, "process")
            .addSink("sink", str2, new StringSerializer(), new StringSerializer(), "process");

        topology.addGlobalStore(
            Stores.keyValueStoreBuilder(
                z ? Stores.persistentKeyValueStore(str5) : Stores.inMemoryKeyValueStore(str5),
                Serdes.String(),
                Serdes.String()).withLoggingDisabled(),
            "global",
            Serdes.String().deserializer(),
            Serdes.String().deserializer(),
            str3,
            str3 + "-processor",
            new MockProcessorSupplier<>());
        return topology;
    }

    /**
     * Creates a {@link StreamsBuilder} on which a single stream reading from
     * {@code "source-topic"} has been registered.
     *
     * @return the new builder; callers may add further nodes before building
     */
    private StreamsBuilder getBuilderWithSource() {
        final StreamsBuilder builder = new StreamsBuilder();
        // The returned KStream handle is not needed; only the registration on the builder matters.
        builder.stream("source-topic");
        return builder;
    }

    /**
     * Constructs a {@link KafkaStreams} instance while {@code StateDirectory} construction is
     * mocked, and checks the arguments the {@code StateDirectory} constructor received.
     *
     * <p>The mock initializer asserts that the constructor was called with four arguments and that
     * the argument at index {@code NUM_THREADS} equals {@code z}. The method then asserts that at
     * least one {@code StateDirectory} was constructed.
     *
     * @param topology topology used to construct the {@link KafkaStreams} instance
     * @param z        expected value of the checked {@code StateDirectory} constructor argument
     */
    private void startStreamsAndCheckDirExists(Topology topology, boolean z) {
        // Resources are closed in reverse order: the KafkaStreams instance first, then the
        // construction mock. A failure in close() is recorded as suppressed, not thrown in place
        // of the primary failure.
        try (MockedConstruction<StateDirectory> stateDirectoryConstruction = Mockito.mockConstruction(
                 StateDirectory.class,
                 (stateDirectory, context) -> {
                     Mockito.when(stateDirectory.initializeProcessId()).thenReturn(UUID.randomUUID());
                     Assert.assertEquals(4L, context.arguments().size());
                     // NOTE(review): NUM_THREADS is used here as a constructor-argument index;
                     // confirm this is the intended position and not a coincidentally equal constant.
                     Assert.assertEquals(Boolean.valueOf(z), context.arguments().get(NUM_THREADS));
                 });
             KafkaStreams ignored = new KafkaStreams(topology, this.props, this.supplier, this.time)) {
            // Constructing KafkaStreams must have instantiated a StateDirectory.
            Assert.assertFalse(stateDirectoryConstruction.constructed().isEmpty());
        }
    }
}
