package com.linkedin.kafka.cruisecontrol;

import com.linkedin.kafka.cruisecontrol.client.BlockingSendClient;
import io.confluent.common.EndPoint;
import io.confluent.kafka.test.cluster.EmbeddedKafka;
import io.confluent.kafka.test.cluster.EmbeddedKafkaCluster;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.cluster.BrokerEndPoint;
import kafka.utils.MockTime;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.Timeout;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/BrokerShutdownIntegrationTestHarness.class */
/**
 * Base class for integration tests that exercise shutting down a Kafka broker.
 *
 * <p>Before each test it installs a custom {@link Exit} procedure that records that an exit was
 * requested (instead of letting the default procedure run), starts an embedded ZooKeeper plus
 * {@link #numBrokers()} embedded brokers, and builds a {@link BlockingSendClient} pointed at the
 * broker selected for shutdown. After each test it shuts the cluster down and restores the
 * default exit procedure.
 */
public abstract class BrokerShutdownIntegrationTestHarness {
    /** Embedded cluster under test; stays {@code null} if {@link #setUp()} fails before creating it. */
    protected EmbeddedKafkaCluster kafkaCluster;
    private EmbeddedKafka brokerToShutdown;
    private BlockingSendClient blockingSendClient;
    /** Mock clock handed to the embedded cluster. */
    protected Time clusterTime;

    /** Fails any single test that runs longer than 120000 ms. */
    @Rule
    public final Timeout globalTimeout = Timeout.millis(120000);
    /** Index into {@code kafkaCluster.kafkas()} (also used as the broker id) of the broker to shut down. */
    private final int brokerToShutdownId = 0;
    /** Set to {@code true} once the installed exit procedure has been invoked. */
    private final AtomicBoolean exited = new AtomicBoolean(false);

    /**
     * Installs the exit hook, starts the embedded cluster and creates the client for the broker
     * that the test will shut down.
     *
     * @throws Exception if the cluster or the client cannot be started
     */
    @Before
    public void setUp() throws Exception {
        // Record the exit request first, then give subclasses a chance to react to it.
        Exit.setExitProcedure((statusCode, message) -> {
            this.exited.set(true);
            exitProcedure();
        });
        this.clusterTime = new MockTime(System.currentTimeMillis(), System.nanoTime());
        this.kafkaCluster = new EmbeddedKafkaCluster(this.clusterTime);
        this.kafkaCluster.startZooKeeper();
        this.kafkaCluster.startBrokers(numBrokers(), new Properties());
        this.brokerToShutdown = (EmbeddedKafka) this.kafkaCluster.kafkas().get(this.brokerToShutdownId);
        EndPoint endPoint = this.brokerToShutdown.endPoint();
        // The client runs on real system time, unlike the cluster which uses the mock clock.
        this.blockingSendClient = new BlockingSendClient.Builder(
                this.brokerToShutdown.kafkaServer().config(), new SystemTime(), "client-id", new LogContext())
                .build(new BrokerEndPoint(this.brokerToShutdownId, endPoint.host(), endPoint.port()));
    }

    /**
     * Returns the number of brokers to start in the embedded cluster.
     *
     * @return the broker count; {@code 3} unless overridden
     */
    public int numBrokers() {
        return 3;
    }

    /**
     * Hook invoked from the installed exit procedure after the exit has been recorded.
     * The default implementation does nothing.
     */
    protected void exitProcedure() {
    }

    /**
     * Shuts down the embedded cluster (if one was created) and restores the default exit procedure.
     *
     * <p>The exit procedure is reset in a {@code finally} block so that a failure while shutting
     * the cluster down cannot leave this test's hook installed for subsequent tests.
     */
    @After
    public void tearDown() {
        // NOTE(review): blockingSendClient is not released here - confirm whether
        // BlockingSendClient exposes a close method that should be called.
        try {
            if (this.kafkaCluster != null) {
                this.kafkaCluster.shutdown();
            }
        } finally {
            Exit.resetExitProcedure();
            this.exited.set(false);
        }
    }

    /**
     * Waits, via {@code TestUtils.waitForCondition}, until the exit procedure has been invoked,
     * using the failure message {@code "Expected Kafka server to exit"}.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void assertBrokerShutdown() throws InterruptedException {
        TestUtils.waitForCondition(this::brokerExitCalled, "Expected Kafka server to exit");
    }

    /**
     * Reports whether the installed exit procedure has been invoked during the current test.
     *
     * @return {@code true} if an exit was requested since {@link #setUp()}
     */
    public boolean brokerExitCalled() {
        return this.exited.get();
    }

    /**
     * Returns the embedded broker selected for shutdown.
     *
     * @return the broker chosen in {@link #setUp()}
     */
    public EmbeddedKafka brokerToShutdown() {
        return this.brokerToShutdown;
    }

    /**
     * Returns the broker epoch currently reported by the controller component of the broker
     * selected for shutdown.
     *
     * @return the broker epoch, boxed
     */
    public Long brokerToShutdownCurrentEpoch() {
        return Long.valueOf(brokerToShutdown().kafkaServer().kafkaController().brokerEpoch());
    }

    /**
     * Returns the id of the broker selected for shutdown.
     *
     * @return the broker id
     */
    public int brokerToShutdownId() {
        return this.brokerToShutdownId;
    }

    /**
     * Returns the client connected to the broker selected for shutdown.
     *
     * @return the client built in {@link #setUp()}
     */
    public BlockingSendClient blockingSendClient() {
        return this.blockingSendClient;
    }
}
