package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Stream;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

@Timeout(600)
@Tags({@Tag("integration"), @Tag("bazel:shard_count:2")})
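/*
 * Integration tests for the HighAvailabilityTaskAssignor: each test scales a Kafka Streams
 * application from one instance to two and checks the assignment/restoration behaviour,
 * parameterized over the rack-aware assignment strategies supplied by data().
 */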
public class HighAvailabilityTaskAssignorIntegrationTest {
    /**
     * Embedded Kafka cluster shared by every test in this class; started in
     * {@link #startCluster()} and stopped in {@link #closeCluster()}.
     *
     * <p>It is constructed with {@code 3}, an empty base {@link Properties}, and a list of three
     * {@link Properties} objects that each set {@code KafkaConfig.RackProp()} to a distinct rack
     * id (RACK_0, RACK_1, RACK_2).
     * NOTE(review): presumably {@code 3} is the broker count and the list holds per-broker
     * overrides -- confirm against the EmbeddedKafkaCluster constructor.
     */
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3, new Properties(), (List<Properties>) Arrays.asList(new Properties() { // first entry: rack RACK_0
        {
            setProperty(KafkaConfig.RackProp(), AssignmentTestUtils.RACK_0);
        }
    }, new Properties() { // second entry: rack RACK_1
        {
            setProperty(KafkaConfig.RackProp(), AssignmentTestUtils.RACK_1);
        }
    }, new Properties() { // third entry: rack RACK_2
        {
            setProperty(KafkaConfig.RackProp(), AssignmentTestUtils.RACK_2);
        }
    }));

    /**
     * Supplies the arguments for the parameterized tests: one {@link Arguments} per rack-aware
     * assignment strategy name. The value is passed through to the
     * {@code rack.aware.assignment.strategy} setting by {@code streamsProperties}.
     *
     * @return a stream of single-element argument sets: "none", "min_traffic" and
     *         "balance_subtopology"
     */
    public static Stream<Arguments> data() {
        // No (Object[]) cast here: it would make Stream.of produce a Stream<Object>, which does
        // not match the declared Stream<Arguments> return type.
        return Stream.of(
            Arguments.of("none"),
            Arguments.of("min_traffic"),
            Arguments.of("balance_subtopology"));
    }

    /**
     * Starts the shared {@link #CLUSTER} once before any test in this class runs.
     *
     * @throws IOException if starting the cluster fails with an I/O error
     */
    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    /**
     * Stops the shared {@link #CLUSTER} once after all tests in this class have run.
     */
    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Runs the scale-out scenario with the table materialized in an in-memory key-value store.
     *
     * @param str      the rack-aware assignment strategy name supplied by {@link #data()}
     * @param testInfo JUnit test metadata, used to derive unique topic/store/application names
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @ParameterizedTest
    @MethodSource("data")
    public void shouldScaleOutWithWarmupTasksAndInMemoryStores(String str, TestInfo testInfo) throws InterruptedException {
        shouldScaleOutWithWarmupTasks(
            storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)),
            testInfo,
            str);
    }

    /**
     * Runs the scale-out scenario with the table materialized in a persistent key-value store.
     *
     * @param str      the rack-aware assignment strategy name supplied by {@link #data()}
     * @param testInfo JUnit test metadata, used to derive unique topic/store/application names
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @ParameterizedTest
    @MethodSource("data")
    public void shouldScaleOutWithWarmupTasksAndPersistentStores(String str, TestInfo testInfo) throws InterruptedException {
        shouldScaleOutWithWarmupTasks(
            storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)),
            testInfo,
            str);
    }

    /* JADX WARN: Failed to calculate best type for var: r29v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r29v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r30v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r30v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 29, insn: 0x030b: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r29 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:77:0x030b */
    /* JADX WARN: Not initialized variable reg: 30, insn: 0x0310: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r30 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:79:0x0310 */
    /* JADX WARN: Type inference failed for: r29v1, types: [org.apache.kafka.streams.KafkaStreams] */
    /* JADX WARN: Type inference failed for: r30v0, types: [java.lang.Throwable] */
    private void shouldScaleOutWithWarmupTasks(Function<String, Materialized<Object, Object, KeyValueStore<Bytes, byte[]>>> function, TestInfo testInfo, String str) throws InterruptedException {
        ?? r29;
        ?? r30;
        String replaceAll = IntegrationTestUtils.safeUniqueTestName(testInfo).replace("HighAvailabilityTaskAssignorIntegrationTest", "HATaskAssignorIT").replaceAll("balance_subtopology", "balance");
        String str2 = "appId_" + System.currentTimeMillis() + "_" + replaceAll;
        String str3 = "input" + replaceAll;
        Set mkSet = Utils.mkSet(new TopicPartition[]{new TopicPartition(str3, 0), new TopicPartition(str3, 1)});
        String str4 = "store" + replaceAll;
        String str5 = str2 + "-store" + replaceAll + "-changelog";
        Set mkSet2 = Utils.mkSet(new TopicPartition[]{new TopicPartition(str5, 0), new TopicPartition(str5, 1)});
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, 2, str3, str5);
        ReentrantLock reentrantLock = new ReentrantLock();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AssignorConfiguration.AssignmentListener assignmentListener = z -> {
            reentrantLock.lock();
            try {
                concurrentHashMap.put(Integer.valueOf(atomicInteger.incrementAndGet()), Boolean.valueOf(z));
                atomicBoolean.set(z);
                reentrantLock.unlock();
            } catch (Throwable th) {
                reentrantLock.unlock();
                throw th;
            }
        };
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table(str3, function.apply(str4));
        Topology build = streamsBuilder.build();
        produceTestData(str3, 500);
        KafkaStreams kafkaStreams = new KafkaStreams(build, streamsProperties(str2, assignmentListener, str, AssignmentTestUtils.RACK_0));
        Throwable th = null;
        try {
            try {
                KafkaStreams kafkaStreams2 = new KafkaStreams(build, streamsProperties(str2, assignmentListener, str, AssignmentTestUtils.RACK_1));
                Throwable th2 = null;
                KafkaConsumer kafkaConsumer = new KafkaConsumer(getConsumerProperties());
                Throwable th3 = null;
                try {
                    try {
                        kafkaStreams.start();
                        TestUtils.waitForCondition(() -> {
                            return getEndOffsetSum(mkSet, kafkaConsumer) == 500;
                        }, 120000L, () -> {
                            return "Input records haven't all been written to the input topic: " + getEndOffsetSum(mkSet, kafkaConsumer);
                        });
                        TestUtils.waitForCondition(() -> {
                            return getEndOffsetSum(mkSet2, kafkaConsumer) == 500;
                        }, 120000L, () -> {
                            return "Input records haven't all been written to the changelog: " + getEndOffsetSum(mkSet2, kafkaConsumer);
                        });
                        final AtomicLong atomicLong = new AtomicLong(-1L);
                        final AtomicLong atomicLong2 = new AtomicLong(-1L);
                        final CountDownLatch countDownLatch = new CountDownLatch(1);
                        kafkaStreams2.setGlobalStateRestoreListener(new StateRestoreListener() { // from class: org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.4
                            public void onRestoreStart(TopicPartition topicPartition, String str6, long j, long j2) {
                            }

                            public void onBatchRestored(TopicPartition topicPartition, String str6, long j, long j2) {
                                atomicLong2.accumulateAndGet(j2, (j3, j4) -> {
                                    return j3 == -1 ? j4 : j3 + j4;
                                });
                            }

                            public void onRestoreEnd(TopicPartition topicPartition, String str6, long j) {
                                atomicLong.accumulateAndGet(j, (j2, j3) -> {
                                    return j2 == -1 ? j3 : j2 + j3;
                                });
                                countDownLatch.countDown();
                            }
                        });
                        int i = atomicInteger.get();
                        kafkaStreams2.start();
                        TestUtils.waitForCondition(() -> {
                            reentrantLock.lock();
                            try {
                                if (atomicInteger.get() <= i) {
                                    return false;
                                }
                                assertFalseNoRetry(((Boolean) concurrentHashMap.get(Integer.valueOf(i + 1))).booleanValue(), "the first assignment after adding a node should be unstable while we warm up the state.");
                                reentrantLock.unlock();
                                return true;
                            } finally {
                                reentrantLock.unlock();
                            }
                        }, 120000L, "Never saw a first assignment after scale out: " + atomicInteger.get());
                        atomicBoolean.getClass();
                        TestUtils.waitForCondition(atomicBoolean::get, 120000L, "Assignment hasn't become stable: " + atomicInteger.get() + " Note, if this does fail, check and see if the new instance just failed to catch up within the probing rebalance interval. A full minute should be long enough to read ~500 records in any test environment, but you never know...");
                        countDownLatch.await();
                        MatcherAssert.assertThat(Long.valueOf(atomicLong.get()), Matchers.is(0L));
                        MatcherAssert.assertThat(Long.valueOf(atomicLong2.get()), Matchers.is(-1L));
                        if (kafkaConsumer != null) {
                            if (0 != 0) {
                                try {
                                    kafkaConsumer.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                kafkaConsumer.close();
                            }
                        }
                        if (kafkaStreams2 != null) {
                            if (0 != 0) {
                                try {
                                    kafkaStreams2.close();
                                } catch (Throwable th5) {
                                    th2.addSuppressed(th5);
                                }
                            } else {
                                kafkaStreams2.close();
                            }
                        }
                        if (kafkaStreams != null) {
                            if (0 == 0) {
                                kafkaStreams.close();
                                return;
                            }
                            try {
                                kafkaStreams.close();
                            } catch (Throwable th6) {
                                th.addSuppressed(th6);
                            }
                        }
                    } catch (Throwable th7) {
                        th3 = th7;
                        throw th7;
                    }
                } catch (Throwable th8) {
                    if (kafkaConsumer != null) {
                        if (th3 != null) {
                            try {
                                kafkaConsumer.close();
                            } catch (Throwable th9) {
                                th3.addSuppressed(th9);
                            }
                        } else {
                            kafkaConsumer.close();
                        }
                    }
                    throw th8;
                }
            } catch (Throwable th10) {
                if (kafkaStreams != null) {
                    if (0 != 0) {
                        try {
                            kafkaStreams.close();
                        } catch (Throwable th11) {
                            th.addSuppressed(th11);
                        }
                    } else {
                        kafkaStreams.close();
                    }
                }
                throw th10;
            }
        } catch (Throwable th12) {
            if (r29 != 0) {
                if (r30 != 0) {
                    try {
                        r29.close();
                    } catch (Throwable th13) {
                        r30.addSuppressed(th13);
                    }
                } else {
                    r29.close();
                }
            }
            throw th12;
        }
    }

    /**
     * Sends {@code i} records to the topic {@code str} on the shared {@link #CLUSTER}.
     *
     * <p>Record keys are the string forms of {@code 0 .. i-1}; every record carries the same
     * value from {@link #getKiloByteValue()}. The producer is configured with {@code acks=all}
     * and is closed by try-with-resources whether or not sending throws.
     *
     * @param str the topic to produce to
     * @param i   the number of records to send
     */
    private void produceTestData(String str, int i) {
        final String kiloByteValue = getKiloByteValue();
        final Properties producerProperties = Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()),
            Utils.mkEntry("acks", "all"),
            Utils.mkEntry("key.serializer", StringSerializer.class.getName()),
            Utils.mkEntry("value.serializer", StringSerializer.class.getName())));

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            for (int recordIndex = 0; recordIndex < i; recordIndex++) {
                producer.send(new ProducerRecord<>(str, String.valueOf(recordIndex), kiloByteValue));
            }
        }
    }

    /**
     * Builds the configuration for the consumer used to query end offsets: the bootstrap
     * servers of the shared {@link #CLUSTER} plus String deserializers for keys and values.
     *
     * @return the consumer configuration
     */
    private static Properties getConsumerProperties() {
        return Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()),
            Utils.mkEntry("key.deserializer", StringDeserializer.class.getName()),
            Utils.mkEntry("value.deserializer", StringDeserializer.class.getName())));
    }

    /**
     * Creates the payload used as the value of every test record.
     *
     * @return a string consisting of exactly 1000 {@code '0'} characters
     */
    private static String getKiloByteValue() {
        final char[] zeros = new char[1000];
        Arrays.fill(zeros, '0');
        return new String(zeros);
    }

    /**
     * Asserts that {@code z} is false, failing through a {@link NoRetryException} instead of a
     * bare {@link AssertionError}.
     * NOTE(review): presumably the wrapper makes {@code TestUtils.waitForCondition} fail
     * immediately rather than keep polling -- confirm against NoRetryException.
     *
     * @param z   the value that is expected to be false
     * @param str the message of the wrapped {@link AssertionError}
     * @throws NoRetryException wrapping an {@link AssertionError} when {@code z} is true
     */
    private static void assertFalseNoRetry(boolean z, String str) {
        if (!z) {
            return;
        }
        final AssertionError failure = new AssertionError(str);
        throw new NoRetryException(failure);
    }

    /**
     * Builds the configuration for one Streams instance of the test application.
     *
     * <p>Besides connection and serde settings, it sets zero standby replicas, an acceptable
     * recovery lag of 0, at most 2 warmup replicas, a 60000 ms probing rebalance interval,
     * a 100 ms commit interval, 40 stream threads, a fresh temporary state directory, and
     * {@link HighAvailabilityTaskAssignor} as the internal task assignor.
     *
     * @param str                the application id
     * @param assignmentListener listener registered under {@code __assignment.listener__}
     * @param str2               value for {@code rack.aware.assignment.strategy}
     * @param str3               value for {@code client.rack}
     * @return the Streams configuration
     */
    private static Properties streamsProperties(String str, AssignorConfiguration.AssignmentListener assignmentListener, String str2, String str3) {
        final Map<String, Object> config = Utils.mkMap(
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()),
            Utils.mkEntry("application.id", str),
            Utils.mkEntry("state.dir", TestUtils.tempDirectory().getPath()),
            Utils.mkEntry("num.standby.replicas", "0"),
            Utils.mkEntry("acceptable.recovery.lag", "0"),
            Utils.mkEntry("max.warmup.replicas", "2"),
            Utils.mkEntry("probing.rebalance.interval.ms", "60000"),
            Utils.mkEntry("__assignment.listener__", assignmentListener),
            Utils.mkEntry("commit.interval.ms", 100L),
            Utils.mkEntry("internal.task.assignor.class", HighAvailabilityTaskAssignor.class.getName()),
            Utils.mkEntry("num.stream.threads", 40),
            Utils.mkEntry("default.key.serde", Serdes.StringSerde.class.getName()),
            Utils.mkEntry("default.value.serde", Serdes.StringSerde.class.getName()),
            Utils.mkEntry("client.rack", str3),
            Utils.mkEntry("rack.aware.assignment.strategy", str2));
        return Utils.mkObjectProperties(config);
    }

    /**
     * Sums the end offsets that {@code consumer} reports for the given partitions.
     *
     * @param set      the partitions to query
     * @param consumer the consumer used to look up end offsets
     * @return the sum of the end offsets of all partitions in {@code set}
     */
    private static long getEndOffsetSum(Set<TopicPartition> set, Consumer<String, String> consumer) {
        return consumer.endOffsets(set)
            .values()
            .stream()
            .mapToLong(Long::longValue)
            .sum();
    }
}
