package kafka.server;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.api.IntegrationTestHarness;
import kafka.cluster.Partition;
import kafka.controller.OfflineReplica$;
import kafka.controller.PartitionAndReplica;
import kafka.utils.CoreUtils$;
import kafka.utils.Exit$;
import kafka.utils.TestUtils;
import kafka.utils.TestUtils$;
import kafka.utils.TestUtils$Checkpoint$;
import kafka.utils.TestUtils$Roll$;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerState;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.event.Level;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.ArrayOps$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.VolatileObjectRef;
import scala.runtime.java8.JFunction0;

/* compiled from: LogDirFailureTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005ee\u0001B\r\u001b\u0001}AQA\n\u0001\u0005\u0002\u001dBqA\u000b\u0001C\u0002\u0013\u00051\u0006\u0003\u00043\u0001\u0001\u0006I\u0001\f\u0005\bg\u0001\u0011\r\u0011\"\u0001,\u0011\u0019!\u0004\u0001)A\u0005Y!9Q\u0007\u0001b\u0001\n\u0003Y\u0003B\u0002\u001c\u0001A\u0003%A\u0006C\u00048\u0001\t\u0007I\u0011\u0002\u001d\t\r\u0005\u0003\u0001\u0015!\u0003:\u0011\u001d\u0011\u0005A1A\u0005\n-Baa\u0011\u0001!\u0002\u0013a\u0003b\u0002#\u0001\u0005\u0004%\te\u000b\u0005\u0007\u000b\u0002\u0001\u000b\u0011\u0002\u0017\t\u000b\u0019\u0003A\u0011I$\t\u000bq\u0003A\u0011A/\t\u000by\u0004A\u0011A@\t\u000f\u0005%\u0001\u0001\"\u0001\u0002\f!9\u0011Q\u0003\u0001\u0005\u0002\u0005]\u0001bBA\u0011\u0001\u0011\u0005\u00111\u0005\u0005\b\u0003[\u0001A\u0011AA\u0018\u0011\u001d\tI\u0004\u0001C\u0001\u0003wAq!!\u0012\u0001\t\u0003\t9\u0005C\u0004\u0002h\u0001!\t!!\u001b\t\u000f\u0005=\u0004\u0001\"\u0003\u0002r\t\tBj\\4ESJ4\u0015-\u001b7ve\u0016$Vm\u001d;\u000b\u0005ma\u0012AB:feZ,'OC\u0001\u001e\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001\u0011\u0011\u0005\u0005\"S\"\u0001\u0012\u000b\u0005\rb\u0012aA1qS&\u0011QE\t\u0002\u0017\u0013:$Xm\u001a:bi&|g\u000eV3ti\"\u000b'O\\3tg\u00061A(\u001b8jiz\"\u0012\u0001\u000b\t\u0003S\u0001i\u0011AG\u0001\u000eaJ|G-^2fe\u000e{WO\u001c;\u0016\u00031\u0002\"!\f\u0019\u000e\u00039R\u0011aL\u0001\u0006g\u000e\fG.Y\u0005\u0003c9\u00121!\u00138u\u00039\u0001(o\u001c3vG\u0016\u00148i\\;oi\u0002\nQbY8ogVlWM]\"pk:$\u0018AD2p]N,X.\u001a:D_VtG\u000fI\u0001\fEJ|7.\u001a:D_VtG/\u0001\u0007ce>\\WM]\"pk:$\b%A\u0003u_BL7-F\u0001:!\tQt(D\u0001<\u0015\taT(\u0001\u0003mC:<'\"\u0001 \u0002\t)\fg/Y\u0005\u0003\u0001n\u0012aa\u0015;sS:<\u0017A\u0002;pa&\u001c\u0007%\u0001\u0007qCJ$\u0018\u000e^5p]:+X.A\u0007qCJ$\u0018\u000e^5p]:+X\u000eI\u0001\fY><G)\u001b:D_VtG/\u0001\u0007m_\u001e$\u0015N]\"pk:$\b%A\u0003tKR,\u0006\u000f\u0006\u0002I\u0017B\u0011Q&S\u0005\u0003\u0015:\u0012A!\u00168ji\")AJ\u0004a\u0001\u001b\u0006AA/Z:u\u0013:4w\u000e\u0005\u0002O-6\tqJ\u0003\u0002$!*\u0011\u0011KU\u0001\bUV\u0004\u0018\u000e^3s\u0015\t\u0019F+A\u0003kk:LGOC\u0001V\u0003\ry'oZ\u0005\u0003/>\u0013\u0001\u0002V3ti&sgm\u001c\u0015\u0003\u001de\u0003\"A\u0014.\n\u0005m{%A\u0003\"fM>\u0014X-R1dQ\u0006!C/Z:u!J|G-^2f\u000bJ\u0014xN\u001d$s_64\u0015-\u001b7ve\u0016|e\u000eT8h%>dG\u000e\u0006\u0002I=\")ql\u0004a\u0001A\u00061\u0011/^8sk6\u0004\"!\u00195\u000f\u0005\t4\u0007CA2/\u001b\u0005!'BA3\u001f\u0003\u0019a$o\\8u}%\u0011qML\u0001\u0007!J,G-\u001a4\n\u0005\u0001K'BA4/Q\ty1\u000e\u0005\u0002m_6\tQN\u0003\u0002o!\u00061\u0001/\u0019:b[NL!\u0001]7\u0003#A\u000b'/Y7fi\u0016\u0014\u0018N_3e)\u0016\u001cH\u000f\u000b\u0003\u0010ebL\bCA:w\u001b\u0005!(BA;n\u0003!\u0001(o\u001c<jI\u0016\u0014\u0018BA<u\u0005-1\u0016\r\\;f'>,(oY3\u0002\u000fM$(/\u001b8hg2\u0012!\u0010`\u0011\u0002w\u0006\u0011!p[\u0011\u0002{\u0006)1N]1gi\u0006iB/Z:u\u0019><G)\u001b:O_RLg-[2bi&|g\u000eV5nK>,H\u000fF\u0002I\u0003\u0003AQa\u0018\tA\u0002\u0001D#\u0001E6)\u000bA\u0011\b0a\u0002-\u0003q\fA\u0004^3ti&{U\t_2faRLwN\u001c#ve&tw\rT8h%>dG\u000eF\u0002I\u0003\u001bAQaX\tA\u0002\u0001D#!E6)\u000bE\u0011\b0a\u0005-\u0005id\u0018a\u0010;fgRT6N\u0011:pW\u0016\u0014x+\u001b;i\u001f2$\u0017J\u001c;fe\n\u0013xn[3s!J|Go\\2pYNCw.\u001e7e\u0011\u0006dGo\u00148M_\u001e$\u0015N\u001d$bS2,(/\u001a\u000b\u0002\u0011\"\u001a!#a\u0007\u0011\u00079\u000bi\"C\u0002\u0002 =\u0013A\u0001V3ti\u00069C/Z:u!J|G-^2f\u000bJ\u0014xN\u001d$s_64\u0015-\u001b7ve\u0016|en\u00115fG.\u0004x.\u001b8u)\rA\u0015Q\u0005\u0005\u0006?N\u0001\r\u0001\u0019\u0015\u0003'-DSa\u0005:y\u0003Wa#A\u001f?\u0002?Q,7\u000f^%P\u000bb\u001cW\r\u001d;j_:$UO]5oO\u000eCWmY6q_&tG\u000fF\u0002I\u0003cAQa\u0018\u000bA\u0002\u0001D#\u0001F6)\u000bQ\u0011\b0a\u000e-\u0005id\u0018\u0001\u000e;fgR\u0014V\r\u001d7jG\u00064U\r^2iKJ$\u0006N]3bI\u00063G/\u001a:M_\u001e$\u0015N\u001d$bS2,(/Z(o\r>dGn\\<feR\u0019\u0001*!\u0010\t\u000b}+\u0002\u0019\u00011)\u0005UY\u0007&B\u000bsq\u0006\rCF\u0001>}\u0003)\"Xm\u001d;Qe>$WoY3FeJ|'o\u001d$s_6dun\u001a#je\u001a\u000b\u0017\u000e\\;sK>sG*Z1eKJ$2\u0001SA%\u0011\u001d\tYE\u0006a\u0001\u0003\u001b\n1BZ1jYV\u0014X\rV=qKB!\u0011qJA1\u001d\u0011\t\t&a\u0017\u000f\t\u0005M\u0013q\u000b\b\u0004G\u0006U\u0013\"A\u000f\n\u0007\u0005eC$A\u0003vi&d7/\u0003\u0003\u0002^\u0005}\u0013!\u0003+fgR,F/\u001b7t\u0015\r\tI\u0006H\u0005\u0005\u0003G\n)GA\tM_\u001e$\u0015N\u001d$bS2,(/\u001a+za\u0016TA!!\u0018\u0002`\u0005)C/Z:u!J|G-^2f\u0003\u001a$XM\u001d'pO\u0012K'OR1jYV\u0014Xm\u00148MK\u0006$WM\u001d\u000b\u0006\u0011\u0006-\u0014Q\u000e\u0005\b\u0003\u0017:\u0002\u0019AA'\u0011\u0015yv\u00031\u0001a\u0003u\u0019XOY:de&\u0014W-\u00118e/\u0006LGOR8s\u0003N\u001c\u0018n\u001a8nK:$H#\u0002%\u0002t\u0005U\u0004\"B\u001c\u0019\u0001\u0004\u0001\u0007bBA<1\u0001\u0007\u0011\u0011P\u0001\tG>t7/^7feBA\u00111PAE\u0003\u001b\u000bi)\u0004\u0002\u0002~)!\u0011qOA@\u0015\u0011\t\t)a!\u0002\u000f\rd\u0017.\u001a8ug*\u0019Q$!\"\u000b\u0007\u0005\u001dE+\u0001\u0004ba\u0006\u001c\u0007.Z\u0005\u0005\u0003\u0017\u000biH\u0001\u0005D_:\u001cX/\\3s!\u0015i\u0013qRAJ\u0013\r\t\tJ\f\u0002\u0006\u0003J\u0014\u0018-\u001f\t\u0004[\u0005U\u0015bAAL]\t!!)\u001f;f\u0001")
/* loaded from: input_file:kafka/server/LogDirFailureTest.class */
public class LogDirFailureTest extends IntegrationTestHarness {
    private final int producerCount = 1;
    private final int consumerCount = 1;
    private final int brokerCount = 2;
    private final String topic = "topic";
    private final int partitionNum = 12;
    private final int logDirCount = 3;

    public int producerCount() {
        return this.producerCount;
    }

    public int consumerCount() {
        return this.consumerCount;
    }

    @Override // kafka.api.IntegrationTestHarness
    public int brokerCount() {
        return this.brokerCount;
    }

    private String topic() {
        return this.topic;
    }

    private int partitionNum() {
        return this.partitionNum;
    }

    @Override // kafka.api.IntegrationTestHarness
    public int logDirCount() {
        return this.logDirCount;
    }

    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness, kafka.server.QuorumTestHarness
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        super.setUp(testInfo);
        createTopic(topic(), partitionNum(), brokerCount(), createTopic$default$4(), createTopic$default$5(), createTopic$default$6());
        ensureConsistentKRaftMetadata();
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testProduceErrorFromFailureOnLogRoll(String str) {
        testProduceErrorsFromLogDirFailureOnLeader(TestUtils$Roll$.MODULE$);
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest
    public void testLogDirNotificationTimeout(String str) {
        producerConfig().setProperty("retries", "0");
        producerConfig().setProperty("enable.idempotence", "false");
        KafkaProducer createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        int id = ((PartitionInfo) CollectionConverters$.MODULE$.ListHasAsScala(createProducer.partitionsFor(topic())).asScala().find(partitionInfo -> {
            return BoxesRunTime.boxToBoolean($anonfun$testLogDirNotificationTimeout$1(partitionInfo));
        }).get()).leader().id();
        KafkaBroker kafkaBroker = (KafkaBroker) brokers().find(kafkaBroker2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testLogDirNotificationTimeout$2(id, kafkaBroker2));
        }).get();
        controllerServer().shutdown();
        controllerServer().awaitShutdown();
        TestUtils$.MODULE$.causeLogDirFailure(TestUtils$Checkpoint$.MODULE$, kafkaBroker, topicPartition);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testLogDirNotificationTimeout$3(kafkaBroker)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail($anonfun$testLogDirNotificationTimeout$4(kafkaBroker));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        kafkaBroker.awaitShutdown();
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testIOExceptionDuringLogRoll(String str) {
        testProduceAfterLogDirFailureOnLeader(TestUtils$Roll$.MODULE$, str);
    }

    @Test
    public void testZkBrokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure() {
        int brokerCount = brokerCount();
        String zkConnect = zkConnect();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        Option<SecurityProtocol> option = None$.MODULE$;
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        Option<File> option2 = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        Option<Properties> option3 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        Option<String> option4 = None$.MODULE$;
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(brokerCount, zkConnect, true, true, RandomPort, option, option2, option3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, option4, 3, false, 1, (short) 1, false);
        createBrokerConfig.put("inter.broker.protocol.version", "0.11.0");
        createBrokerConfig.put(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, "0.11.0");
        VolatileObjectRef create = VolatileObjectRef.create(None$.MODULE$);
        Exit$ exit$ = Exit$.MODULE$;
        Exit.setHaltProcedure(new Exit$.anon.1((obj, option5) -> {
            return $anonfun$testZkBrokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure$1(create, BoxesRunTime.unboxToInt(obj), option5);
        }));
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaServer createServer = testUtils$15.createServer(fromProps, Time.SYSTEM, None$.MODULE$, 0, true, false);
        try {
            File file = new File((String) fromProps.logDirs().head());
            CoreUtils$ coreUtils$ = CoreUtils$.MODULE$;
            JFunction0.mcV.sp spVar = () -> {
                Utils.delete(file);
            };
            CoreUtils$ coreUtils$2 = CoreUtils$.MODULE$;
            coreUtils$.swallow(spVar, this, Level.WARN);
            Files.createFile(file.toPath(), new FileAttribute[0]);
            Assertions.assertTrue(file.isFile());
            TestUtils$ testUtils$17 = TestUtils$.MODULE$;
            TestUtils$ testUtils$18 = TestUtils$.MODULE$;
            long currentTimeMillis = System.currentTimeMillis();
            while (!((Option) create.elem).contains(BoxesRunTime.boxToInteger(1))) {
                if (System.currentTimeMillis() > currentTimeMillis + 60000) {
                    Assertions.fail("timed out waiting for broker to halt");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(60000L), 100L));
            }
        } finally {
            if (createServer != null) {
                TestUtils$ testUtils$19 = TestUtils$.MODULE$;
                $colon.colon colonVar = new $colon.colon(createServer, Nil$.MODULE$);
                TestUtils$ testUtils$20 = TestUtils$.MODULE$;
                testUtils$19.shutdownServers(colonVar, true);
            }
            Exit$ exit$2 = Exit$.MODULE$;
            Exit.resetHaltProcedure();
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testProduceErrorFromFailureOnCheckpoint(String str) {
        testProduceErrorsFromLogDirFailureOnLeader(TestUtils$Checkpoint$.MODULE$);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testIOExceptionDuringCheckpoint(String str) {
        testProduceAfterLogDirFailureOnLeader(TestUtils$Checkpoint$.MODULE$, str);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testReplicaFetcherThreadAfterLogDirFailureOnFollower(String str) {
        producerConfig().setProperty("retries", "0");
        producerConfig().setProperty("enable.idempotence", "false");
        KafkaProducer createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        PartitionInfo partitionInfo = (PartitionInfo) CollectionConverters$.MODULE$.ListHasAsScala(createProducer.partitionsFor(topic())).asScala().find(partitionInfo2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReplicaFetcherThreadAfterLogDirFailureOnFollower$1(partitionInfo2));
        }).get();
        int id = partitionInfo.leader().id();
        KafkaBroker kafkaBroker = (KafkaBroker) brokers().find(kafkaBroker2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReplicaFetcherThreadAfterLogDirFailureOnFollower$2(id, kafkaBroker2));
        }).get();
        int unboxToInt = BoxesRunTime.unboxToInt(ArrayOps$.MODULE$.find$extension(Predef$.MODULE$.intArrayOps((int[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps(partitionInfo.replicas()), node -> {
            return BoxesRunTime.boxToInteger(node.id());
        }, ClassTag$.MODULE$.Int())), i -> {
            return i != id;
        }).get());
        KafkaBroker kafkaBroker3 = (KafkaBroker) brokers().find(kafkaBroker4 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReplicaFetcherThreadAfterLogDirFailureOnFollower$5(unboxToInt, kafkaBroker4));
        }).get();
        kafkaBroker3.replicaManager().markPartitionOffline(topicPartition);
        int unboxToInt2 = BoxesRunTime.unboxToInt(RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(1), partitionNum()).find(i2 -> {
            return kafkaBroker.replicaManager().onlinePartition(new TopicPartition(this.topic(), i2)).flatMap(partition -> {
                return partition.leaderLogIfLocal();
            }).isDefined();
        }).get());
        createProducer.send(new ProducerRecord(topic(), Predef$.MODULE$.int2Integer(unboxToInt2), topic().getBytes(), "message".getBytes())).get();
        Assertions.assertEquals(brokerCount(), ((Partition) kafkaBroker.replicaManager().onlinePartition(new TopicPartition(topic(), unboxToInt2)).get()).inSyncReplicaIds().size());
        kafkaBroker3.replicaManager().replicaFetcherManager().fetcherThreadMap().values().foreach(replicaFetcherThread -> {
            $anonfun$testReplicaFetcherThreadAfterLogDirFailureOnFollower$8(replicaFetcherThread);
            return BoxedUnit.UNIT;
        });
    }

    public void testProduceErrorsFromLogDirFailureOnLeader(TestUtils.LogDirFailureType logDirFailureType) {
        producerConfig().setProperty("retries", "0");
        producerConfig().setProperty("enable.idempotence", "false");
        KafkaProducer createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        ProducerRecord producerRecord = new ProducerRecord(topic(), Predef$.MODULE$.int2Integer(0), "key".getBytes(), "value".getBytes());
        int id = ((PartitionInfo) CollectionConverters$.MODULE$.ListHasAsScala(createProducer.partitionsFor(topic())).asScala().find(partitionInfo -> {
            return BoxesRunTime.boxToBoolean($anonfun$testProduceErrorsFromLogDirFailureOnLeader$1(partitionInfo));
        }).get()).leader().id();
        TestUtils$.MODULE$.causeLogDirFailure(logDirFailureType, (KafkaBroker) brokers().find(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$testProduceErrorsFromLogDirFailureOnLeader$2(id, kafkaBroker));
        }).get(), topicPartition);
        ExecutionException executionException = (ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            createProducer.send(producerRecord).get(6000L, TimeUnit.MILLISECONDS);
        });
        Assertions.assertTrue((executionException.getCause() instanceof KafkaStorageException) || (executionException.getCause() instanceof NotLeaderOrFollowerException));
    }

    public void testProduceAfterLogDirFailureOnLeader(TestUtils.LogDirFailureType logDirFailureType, String str) {
        Consumer<byte[], byte[]> createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        subscribeAndWaitForAssignment(topic(), createConsumer);
        KafkaProducer createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        ProducerRecord producerRecord = new ProducerRecord(topic(), Predef$.MODULE$.int2Integer(0), "key".getBytes(), "value".getBytes());
        int id = ((PartitionInfo) CollectionConverters$.MODULE$.ListHasAsScala(createProducer.partitionsFor(topic())).asScala().find(partitionInfo -> {
            return BoxesRunTime.boxToBoolean($anonfun$testProduceAfterLogDirFailureOnLeader$1(partitionInfo));
        }).get()).leader().id();
        KafkaBroker kafkaBroker = (KafkaBroker) brokers().find(kafkaBroker2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testProduceAfterLogDirFailureOnLeader$2(id, kafkaBroker2));
        }).get();
        createProducer.send(producerRecord).get();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        testUtils$.consumeRecords(createConsumer, 1, 15000L);
        File causeLogDirFailure = TestUtils$.MODULE$.causeLogDirFailure(logDirFailureType, kafkaBroker, topicPartition);
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testProduceAfterLogDirFailureOnLeader$3(this, createProducer, producerRecord, id)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Expected new leader for the partition");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        createProducer.send(producerRecord).get(6000L, TimeUnit.MILLISECONDS);
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        testUtils$6.pollUntilAtLeastNumRecords(createConsumer, 1, 15000L);
        if (str == null || !str.equals("kraft")) {
            Assertions.assertTrue(zkClient().getAllLogDirEventNotifications().isEmpty());
            Assertions.assertTrue(((KafkaServer) servers().find(kafkaServer -> {
                return BoxesRunTime.boxToBoolean($anonfun$testProduceAfterLogDirFailureOnLeader$13(kafkaServer));
            }).get()).kafkaController().controllerContext().replicasInState(topic(), OfflineReplica$.MODULE$).contains(new PartitionAndReplica(new TopicPartition(topic(), 0), id)));
            return;
        }
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testProduceAfterLogDirFailureOnLeader$6(this, id, causeLogDirFailure)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                Assertions.fail("Expected to find an offline log dir");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
    }

    private void subscribeAndWaitForAssignment(String str, Consumer<byte[], byte[]> consumer) {
        consumer.subscribe(Collections.singletonList(str));
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            consumer.poll(Duration.ofMillis(100L));
            if ($anonfun$subscribeAndWaitForAssignment$1(consumer)) {
                return;
            }
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Expected non-empty assignment");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 0L));
        }
    }

    public static final /* synthetic */ boolean $anonfun$testLogDirNotificationTimeout$1(PartitionInfo partitionInfo) {
        return partitionInfo.partition() == 0;
    }

    public static final /* synthetic */ boolean $anonfun$testLogDirNotificationTimeout$2(int i, KafkaBroker kafkaBroker) {
        return kafkaBroker.config().brokerId() == i;
    }

    public static final /* synthetic */ boolean $anonfun$testLogDirNotificationTimeout$3(KafkaBroker kafkaBroker) {
        BrokerState brokerState = kafkaBroker.brokerState();
        BrokerState brokerState2 = BrokerState.SHUTTING_DOWN;
        return brokerState == null ? brokerState2 == null : brokerState.equals(brokerState2);
    }

    public static final /* synthetic */ String $anonfun$testLogDirNotificationTimeout$4(KafkaBroker kafkaBroker) {
        return new StringBuilder(51).append("Expected broker to be in NOT_RUNNING state but was ").append(kafkaBroker.brokerState()).toString();
    }

    public static final /* synthetic */ Nothing$ $anonfun$testZkBrokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure$1(VolatileObjectRef volatileObjectRef, int i, Option option) {
        volatileObjectRef.elem = new Some(BoxesRunTime.boxToInteger(i));
        throw new IllegalArgumentException();
    }

    public static final /* synthetic */ String $anonfun$testZkBrokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure$4() {
        return "timed out waiting for broker to halt";
    }

    public static final /* synthetic */ boolean $anonfun$testReplicaFetcherThreadAfterLogDirFailureOnFollower$1(PartitionInfo partitionInfo) {
        return partitionInfo.partition() == 0;
    }

    public static final /* synthetic */ boolean $anonfun$testReplicaFetcherThreadAfterLogDirFailureOnFollower$2(int i, KafkaBroker kafkaBroker) {
        return kafkaBroker.config().brokerId() == i;
    }

    public static final /* synthetic */ boolean $anonfun$testReplicaFetcherThreadAfterLogDirFailureOnFollower$5(int i, KafkaBroker kafkaBroker) {
        return kafkaBroker.config().brokerId() == i;
    }

    public static final /* synthetic */ void $anonfun$testReplicaFetcherThreadAfterLogDirFailureOnFollower$8(ReplicaFetcherThread replicaFetcherThread) {
        Assertions.assertFalse(replicaFetcherThread.isShutdownComplete(), "ReplicaFetcherThread should still be working if its partition count > 0");
    }

    public static final /* synthetic */ boolean $anonfun$testProduceErrorsFromLogDirFailureOnLeader$1(PartitionInfo partitionInfo) {
        return partitionInfo.partition() == 0;
    }

    public static final /* synthetic */ boolean $anonfun$testProduceErrorsFromLogDirFailureOnLeader$2(int i, KafkaBroker kafkaBroker) {
        return kafkaBroker.config().brokerId() == i;
    }

    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$1(PartitionInfo partitionInfo) {
        return partitionInfo.partition() == 0;
    }

    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$2(int i, KafkaBroker kafkaBroker) {
        return kafkaBroker.config().brokerId() == i;
    }

    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$4(PartitionInfo partitionInfo) {
        return partitionInfo.partition() == 0;
    }

    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$3(LogDirFailureTest logDirFailureTest, KafkaProducer kafkaProducer, ProducerRecord producerRecord, int i) {
        kafkaProducer.send(producerRecord);
        return ((PartitionInfo) CollectionConverters$.MODULE$.ListHasAsScala(kafkaProducer.partitionsFor(logDirFailureTest.topic())).asScala().find(partitionInfo -> {
            return BoxesRunTime.boxToBoolean($anonfun$testProduceAfterLogDirFailureOnLeader$4(partitionInfo));
        }).get()).leader().id() != i;
    }

    public static final /* synthetic */ String $anonfun$testProduceAfterLogDirFailureOnLeader$5() {
        return "Expected new leader for the partition";
    }

    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$7(int i, KafkaBroker kafkaBroker) {
        return kafkaBroker.config().nodeId() == i;
    }

    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$9(File file, BrokerServer brokerServer) {
        return brokerServer.logDirFailureChannel().hasOfflineLogDir(file.toPath().toString());
    }

    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$10(LogDirFailureTest logDirFailureTest, int i, BrokerServer brokerServer) {
        return ArrayOps$.MODULE$.contains$extension(Predef$.MODULE$.intArrayOps((int[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps(brokerServer.replicaManager().metadataCache().getClusterMetadata(brokerServer.clusterId(), brokerServer.config().interBrokerListenerName()).partition(new TopicPartition(logDirFailureTest.topic(), 0)).offlineReplicas()), node -> {
            return BoxesRunTime.boxToInteger(node.id());
        }, ClassTag$.MODULE$.Int())), BoxesRunTime.boxToInteger(i));
    }

    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$6(LogDirFailureTest logDirFailureTest, int i, File file) {
        Option map = logDirFailureTest.brokers().find(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$testProduceAfterLogDirFailureOnLeader$7(i, kafkaBroker));
        }).map(kafkaBroker2 -> {
            return (BrokerServer) kafkaBroker2;
        });
        return map.exists(brokerServer -> {
            return BoxesRunTime.boxToBoolean($anonfun$testProduceAfterLogDirFailureOnLeader$9(file, brokerServer));
        }) && map.exists(brokerServer2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testProduceAfterLogDirFailureOnLeader$10(logDirFailureTest, i, brokerServer2));
        });
    }

    public static final /* synthetic */ String $anonfun$testProduceAfterLogDirFailureOnLeader$12() {
        return "Expected to find an offline log dir";
    }

    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$13(KafkaServer kafkaServer) {
        return kafkaServer.kafkaController().isActive();
    }

    public static final /* synthetic */ boolean $anonfun$subscribeAndWaitForAssignment$1(Consumer consumer) {
        return !consumer.assignment().isEmpty();
    }

    public static final /* synthetic */ String $anonfun$subscribeAndWaitForAssignment$2() {
        return "Expected non-empty assignment";
    }

    public LogDirFailureTest() {
        serverConfig().setProperty("replica.high.watermark.checkpoint.interval.ms", "60000");
        serverConfig().setProperty("num.replica.fetchers", "1");
        serverConfig().setProperty("log.dir.failure.timeout.ms", "5000");
        serverConfig().setProperty("controlled.shutdown.enable", "false");
    }
}
