package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.tests.SmokeTestClient;
import org.apache.kafka.streams.tests.SmokeTestDriver;
import org.apache.kafka.test.IntegrationTest;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.class */
public class SmokeTestDriverIntegrationTest {

    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);

    /* loaded from: input_file:org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest$Driver.class */
    private static class Driver extends Thread {
        private final String bootstrapServers;
        private final int numKeys;
        private final int maxRecordsPerKey;
        private Exception exception;
        private SmokeTestDriver.VerificationResult result;

        private Driver(String str, int i, int i2) {
            this.exception = null;
            this.bootstrapServers = str;
            this.numKeys = i;
            this.maxRecordsPerKey = i2;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                this.result = SmokeTestDriver.verify(this.bootstrapServers, SmokeTestDriver.generate(this.bootstrapServers, this.numKeys, this.maxRecordsPerKey, Duration.ofSeconds(20L)), this.maxRecordsPerKey);
            } catch (Exception e) {
                this.exception = e;
            }
        }

        public Exception exception() {
            return this.exception;
        }

        SmokeTestDriver.VerificationResult result() {
            return this.result;
        }
    }

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Test
    public void shouldWorkWithRebalance() throws InterruptedException {
        Exit.setExitProcedure((i, str) -> {
            throw new AssertionError("Test called exit(). code:" + i + " message:" + str);
        });
        Exit.setHaltProcedure((i2, str2) -> {
            throw new AssertionError("Test called halt(). code:" + i2 + " message:" + str2);
        });
        int i3 = 0;
        ArrayList arrayList = new ArrayList();
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, SmokeTestDriver.topics());
        String bootstrapServers = CLUSTER.bootstrapServers();
        Driver driver = new Driver(bootstrapServers, 10, 1000);
        driver.start();
        System.out.println("started driver");
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("session.timeout.ms", 10000);
        while (driver.isAlive()) {
            Thread.sleep(1000L);
            int i4 = i3;
            i3++;
            SmokeTestClient smokeTestClient = new SmokeTestClient("streams-" + i4);
            arrayList.add(smokeTestClient);
            smokeTestClient.start(properties);
            if (arrayList.size() >= 3) {
                SmokeTestClient smokeTestClient2 = (SmokeTestClient) arrayList.remove(0);
                smokeTestClient2.closeAsync();
                while (!smokeTestClient2.closed()) {
                    Thread.sleep(100L);
                }
            }
        }
        try {
            driver.join();
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                ((SmokeTestClient) it.next()).closeAsync();
            }
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                SmokeTestClient smokeTestClient3 = (SmokeTestClient) it2.next();
                while (!smokeTestClient3.closed()) {
                    Thread.sleep(100L);
                }
            }
            if (driver.exception() != null) {
                driver.exception().printStackTrace();
                throw new AssertionError(driver.exception());
            }
            Assert.assertTrue(driver.result().result(), driver.result().passed());
        } catch (Throwable th) {
            Iterator it3 = arrayList.iterator();
            while (it3.hasNext()) {
                ((SmokeTestClient) it3.next()).closeAsync();
            }
            Iterator it4 = arrayList.iterator();
            while (it4.hasNext()) {
                SmokeTestClient smokeTestClient4 = (SmokeTestClient) it4.next();
                while (!smokeTestClient4.closed()) {
                    Thread.sleep(100L);
                }
            }
            throw th;
        }
    }
}
