package kafka.tier.topic;

import io.confluent.rest.TierTopicHeadDataLossDetectionRequest;
import io.confluent.rest.TierTopicHeadDataLossDetectionResponse;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import kafka.log.LogManager;
import kafka.server.LeaderEndpointSupplier;
import kafka.server.ReplicaManager;
import kafka.tier.store.TierObjectStore;
import kafka.tier.topic.recovery.ValidationSource;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import scala.jdk.javaapi.CollectionConverters;

/* loaded from: input_file:kafka/tier/topic/SoloTierTopicDataLossValidatorTest.class */
/**
 * Tests for {@link SoloTierTopicDataLossValidator}: a plain single invocation, and an invocation
 * issued while another one is still in progress (which is expected to be rejected).
 */
public class SoloTierTopicDataLossValidatorTest {
    /** Upper bound for every wait on the background caller, so a broken run fails instead of hanging. */
    private static final long THREAD_WAIT_TIMEOUT_SECONDS = 30L;

    private static final TierTopicHeadDataLossDetectionRequest DUMMY_REQUEST =
        new TierTopicHeadDataLossDetectionRequest("identifier", new HashSet<>());

    /** Builds a validator whose stubbed config never blocks. */
    private SoloTierTopicDataLossValidator newInstance() {
        return newInstance(Optional.empty(), Optional.empty());
    }

    /**
     * Builds a validator wired entirely to mocks.
     *
     * <p>The first call to the stubbed {@code enableTierTopicDataLossDetection()} releases
     * {@code enteredSignal} (if present) and then blocks on {@code proceedGate} (if present);
     * every later call returns {@code true} immediately. This lets a test hold one invocation
     * in flight while it issues another.
     *
     * @param proceedGate   semaphore the first stubbed config call acquires before returning
     * @param enteredSignal semaphore the first stubbed config call releases on entry
     * @return a validator backed by mocks and a {@link MockTime}
     */
    // The lists handed to the Scala converter stay raw: their element types are project types that
    // are not named in this file, and the raw form is what keeps the stubbed return types assignable.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private SoloTierTopicDataLossValidator newInstance(Optional<Semaphore> proceedGate, Optional<Semaphore> enteredSignal) {
        TierTopicManagerConfig config = Mockito.mock(TierTopicManagerConfig.class);
        LogManager logManager = Mockito.mock(LogManager.class);
        Mockito.when(logManager.allLogs()).thenReturn(CollectionConverters.asScala(new ArrayList()));
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.leaderPartitionsIterator()).thenReturn(CollectionConverters.asScala(new ArrayList()).iterator());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(config.enableTierTopicDataLossDetection()).thenAnswer(invocation -> {
            // Announce entry first, then park until the test lets this call continue.
            enteredSignal.ifPresent(Semaphore::release);
            proceedGate.ifPresent(gate -> {
                try {
                    gate.acquire();
                } catch (InterruptedException e) {
                    // Restore the flag so callers further up can still observe the interruption.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            });
            return true;
        }).thenReturn(true);
        Mockito.when(config.tierTopicDataLossDetectionMaxTimeoutMs()).thenReturn(900000L);
        return new SoloTierTopicDataLossValidator(
            config,
            Mockito.mock(TierTopic.class),
            Mockito.mock(TierObjectStore.class),
            replicaManager,
            () -> Mockito.mock(ConfluentAdmin.class),
            Mockito.mock(LeaderEndpointSupplier.class),
            new MockTime(),
            Mockito.mock(Metrics.class));
    }

    /** Creates a mocked byte-array producer; the unchecked conversion is confined to this helper. */
    @SuppressWarnings("unchecked")
    private static Producer<byte[], byte[]> mockProducer() {
        return Mockito.mock(Producer.class);
    }

    /**
     * Runs an on-demand detection with {@link #DUMMY_REQUEST}.
     *
     * <p>Every exception, checked or unchecked, is wrapped in a {@link RuntimeException}; the
     * concurrent test relies on this by inspecting {@code getCause()}.
     */
    private TierTopicHeadDataLossDetectionResponse detectDataLoss(SoloTierTopicDataLossValidator soloTierTopicDataLossValidator, Producer<byte[], byte[]> producer) {
        try {
            return soloTierTopicDataLossValidator.detectDataLossInTierTopicHead(DUMMY_REQUEST, ValidationSource.ON_DEMAND_VALIDATION, producer, 30000L);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Asserts that a response reports success with no report path and no error messages. */
    private static void assertSuccessful(TierTopicHeadDataLossDetectionResponse response) {
        Assertions.assertNotNull(response);
        Assertions.assertTrue(response.dataLossReportPath().isEmpty());
        Assertions.assertEquals(TierTopicHeadDataLossDetectionResponse.CompletionStatus.SUCCESS, response.completionStatus());
        Assertions.assertTrue(response.errorMessages().isEmpty());
    }

    /** A single invocation against an all-mock setup must complete without throwing. */
    @Test
    public void testSoloInvocation() {
        detectDataLoss(newInstance(), mockProducer());
    }

    /**
     * Holds one invocation in flight on a background thread, verifies that an overlapping
     * invocation fails with an {@link UnsupportedOperationException} cause, then verifies that
     * both the held invocation and a subsequent one succeed.
     */
    @Test
    public void testConcurrentInvocation() throws Exception {
        // Both start with zero permits: the background caller signals "entered" and then waits on "proceed".
        Semaphore proceedGate = new Semaphore(0);
        Semaphore enteredSignal = new Semaphore(0);
        SoloTierTopicDataLossValidator validator = newInstance(Optional.of(proceedGate), Optional.of(enteredSignal));
        Producer<byte[], byte[]> producer = mockProducer();

        AtomicReference<TierTopicHeadDataLossDetectionResponse> backgroundResponse = new AtomicReference<>();
        AtomicReference<Throwable> backgroundFailure = new AtomicReference<>();
        Thread blockedCaller = new Thread(() -> {
            try {
                backgroundResponse.set(detectDataLoss(validator, producer));
            } catch (Throwable t) {
                // Captured so the test thread can report the real cause instead of a bare null check.
                backgroundFailure.set(t);
            }
        });
        // Daemon so that a stuck caller can never keep the test JVM alive.
        blockedCaller.setDaemon(true);
        blockedCaller.start();

        try {
            Assertions.assertTrue(
                enteredSignal.tryAcquire(THREAD_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
                "background invocation never reached the stubbed config call");
            RuntimeException rejection = Assertions.assertThrows(RuntimeException.class, () -> {
                detectDataLoss(validator, producer);
            });
            Assertions.assertTrue(
                rejection.getCause() instanceof UnsupportedOperationException,
                "unexpected cause for the overlapping invocation: " + rejection.getCause());
        } finally {
            // Always unblock the background caller, even when an assertion above has failed.
            proceedGate.release();
        }

        blockedCaller.join(TimeUnit.SECONDS.toMillis(THREAD_WAIT_TIMEOUT_SECONDS));
        Assertions.assertFalse(blockedCaller.isAlive(), "background invocation did not finish in time");
        if (backgroundFailure.get() != null) {
            Assertions.fail("background invocation failed", backgroundFailure.get());
        }
        assertSuccessful(backgroundResponse.get());

        // With nothing in flight any more, a fresh invocation must be accepted again.
        assertSuccessful(detectDataLoss(validator, producer));
    }
}
