package io.confluent.kafkarest.integration;

import com.fasterxml.jackson.databind.node.BinaryNode;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.v3.ProduceRequest;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Response;
import org.eclipse.jetty.server.RequestLog;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that drives the REST server at a fixed request rate under different
 * rate-limit configurations and checks what ends up in the request log.
 *
 * <p>The REST server is started with a {@link TestRequestLogWriter} and the log format
 * {@code "%s"}, so every request-log line is captured in memory. {@link #verifyLog} expects each
 * captured line to be either {@code "200 -"} or {@code "<status> <errorCode>"}.
 *
 * <p>The REST configuration is selected in {@link #setUp(TestInfo)} from the test's display name,
 * so every test method must carry a {@code @DisplayName} that has a matching branch there.
 */
@Tag("IntegrationTest")
public class CustomLogIntegrationTest extends ClusterTestHarness {
    private static final Logger log = LoggerFactory.getLogger(CustomLogIntegrationTest.class);

    private static final String TOPIC_NAME = "topic-1";

    /** Number of requests every test fires at the server. */
    private static final int TOTAL_REQUESTS = 100;

    /** Delay between two consecutively scheduled requests. */
    private static final Duration REQUEST_INTERVAL = Duration.ofMillis(1L);

    /** Limit value used for every rate limit a test does not intend to trigger. */
    private static final int UNLIMITED = 99999;

    /** Size of the thread pool used to fire requests. */
    private static final int EXECUTOR_THREADS = 4;

    private final ConcurrentLinkedQueue<String> logEntries = new ConcurrentLinkedQueue<>();
    private final Properties restConfigs = new Properties();

    /** Created in {@link #setUp(TestInfo)} and shut down in {@link #tearDown()}. */
    private ScheduledExecutorService executor;

    /** Request-log writer that stores every log line in {@link #logEntries}. */
    class TestRequestLogWriter implements RequestLog.Writer {
        private final AtomicInteger entryCounter = new AtomicInteger();

        TestRequestLogWriter() {
        }

        /**
         * Records one request-log line and echoes it to the test log with a running index.
         *
         * @param str the formatted request-log line
         */
        @Override
        public void write(String str) {
            try {
                Assertions.assertTrue(CustomLogIntegrationTest.this.logEntries.add(str), "Failed to add entry to log, unexpected.");
                CustomLogIntegrationTest.log.info("Log #{}:{}", this.entryCounter.getAndIncrement(), str);
            } catch (Exception e) {
                // Do not let a logging problem propagate to the caller; report it via the logger.
                CustomLogIntegrationTest.log.error("Failed to record request-log entry <{}>", str, e);
            }
        }
    }

    public CustomLogIntegrationTest() {
        super(1, false, false);
    }

    @Override // io.confluent.kafkarest.integration.ClusterTestHarness
    protected void overrideKafkaRestConfigs(Properties properties) {
        properties.putAll(this.restConfigs);
    }

    /**
     * Intentionally empty: the harness setup is invoked explicitly through {@code super.setUp()}
     * from {@link #setUp(TestInfo)}.
     */
    @Override // io.confluent.kafkarest.integration.ClusterTestHarness
    public void setUp() {
    }

    /**
     * Runs the harness setup, builds the REST configuration for the current test, starts the REST
     * server with the capturing log writer and creates the test topic.
     *
     * @param testInfo used to pick the configuration from the test's display name
     * @throws IllegalStateException if no configuration is defined for the display name
     * @throws Exception if the harness, the REST server or topic creation fails
     */
    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        if (this.executor == null || this.executor.isShutdown()) {
            this.executor = Executors.newScheduledThreadPool(EXECUTOR_THREADS);
        }
        super.setUp();
        this.restConfigs.clear();
        this.restConfigs.put("dos.filter.delay.ms", -1);

        String displayName = testInfo.getDisplayName();
        if (displayName.contains("test_whenJettyGlobalRateLimitTriggered_ThenRequestLogHasRelevantInfo")) {
            configureDosFilter(1, UNLIMITED);
        } else if (displayName.contains("test_whenNoRateLimitTriggered_ThenRequestLogHasRelevantInfo")) {
            configureDosFilter(UNLIMITED, UNLIMITED);
            configurePermitRateLimit(UNLIMITED, UNLIMITED);
        } else if (displayName.contains("test_whenCustomLoggingDisabled_ThenRequestLogDoesntHaveCustomInfo")) {
            configureDosFilter(1, UNLIMITED);
            this.restConfigs.put("use.custom.request.logging", "false");
        } else if (displayName.contains("test_whenJettyNonGlobalRateLimitTriggered_ThenRequestLogHasRelevantInfo")) {
            configureDosFilter(UNLIMITED, 1);
        } else if (displayName.contains("test_whenGlobalPermitRateLimitTriggered_ThenRequestLogHasRelevantInfo")) {
            configureDosFilter(UNLIMITED, UNLIMITED);
            configurePermitRateLimit(1, UNLIMITED);
        } else if (displayName.contains("test_whenPerClusterPermitRateLimitTriggered_ThenRequestLogHasRelevantInfo")) {
            configureDosFilter(UNLIMITED, UNLIMITED);
            configurePermitRateLimit(UNLIMITED, 1);
        } else if (displayName.contains("test_whenGlobalProduceRequestsRateLimitTriggered_ThenRequestLogHasRelevantInfo")) {
            configureDosFilter(UNLIMITED, UNLIMITED);
            configureProduceRateLimit(1, UNLIMITED, UNLIMITED, UNLIMITED);
        } else if (displayName.contains("test_whenPerTenantProduceRequestsRateLimitTriggered_ThenRequestLogHasRelevantInfo")) {
            configureDosFilter(UNLIMITED, UNLIMITED);
            configureProduceRateLimit(UNLIMITED, 1, UNLIMITED, UNLIMITED);
        } else if (displayName.contains("test_whenGlobalProduceBytesRateLimitTriggered_ThenRequestLogHasRelevantInfo")) {
            configureDosFilter(UNLIMITED, UNLIMITED);
            configureProduceRateLimit(UNLIMITED, UNLIMITED, 1, UNLIMITED);
        } else if (displayName.contains("test_whenPerTenantProduceBytesRateLimitTriggered_ThenRequestLogHasRelevantInfo")) {
            configureDosFilter(UNLIMITED, UNLIMITED);
            configureProduceRateLimit(UNLIMITED, UNLIMITED, UNLIMITED, 1);
        } else {
            throw new IllegalStateException("Invalid test, doesn't have rest-configs defined in setup: " + displayName);
        }

        this.logEntries.clear();
        startRest(new TestRequestLogWriter(), "%s");
        createTopic(TOPIC_NAME, 1, (short) 1);
    }

    /**
     * Stops the REST server and releases the request executor.
     *
     * <p>NOTE(review): this override does not call {@code super.tearDown()}; confirm against
     * {@code ClusterTestHarness} that nothing else needs to be released here.
     */
    @Override // io.confluent.kafkarest.integration.ClusterTestHarness
    @AfterEach
    public void tearDown() throws Exception {
        try {
            stopRest();
        } finally {
            if (this.executor != null) {
                this.executor.shutdownNow();
                this.executor = null;
            }
        }
    }

    @DisplayName("test_whenCustomLoggingDisabled_ThenRequestLogDoesntHaveCustomInfo")
    @Test
    public void test_whenCustomLoggingDisabled_ThenRequestLogDoesntHaveCustomInfo() {
        hammerAtConstantRate(topicsPath(), REQUEST_INTERVAL, TOTAL_REQUESTS);
        verifyLog(TOTAL_REQUESTS, 429007, TOTAL_REQUESTS - 1, false, true);
    }

    @DisplayName("test_whenNoRateLimitTriggered_ThenRequestLogHasRelevantInfo")
    @Test
    public void test_whenNoRateLimitTriggered_ThenRequestLogHasRelevantInfo() {
        hammerAtConstantRate(topicsPath(), REQUEST_INTERVAL, TOTAL_REQUESTS);
        // No request is expected to be rate-limited.
        verifyLog(TOTAL_REQUESTS, 429007, 0, false, false);
    }

    @DisplayName("test_whenJettyGlobalRateLimitTriggered_ThenRequestLogHasRelevantInfo")
    @Test
    public void test_whenJettyGlobalRateLimitTriggered_ThenRequestLogHasRelevantInfo() {
        hammerAtConstantRate(topicsPath(), REQUEST_INTERVAL, TOTAL_REQUESTS);
        verifyLog(TOTAL_REQUESTS, 429007, TOTAL_REQUESTS - 1, false, false);
    }

    @DisplayName("test_whenJettyNonGlobalRateLimitTriggered_ThenRequestLogHasRelevantInfo")
    @Test
    public void test_whenJettyNonGlobalRateLimitTriggered_ThenRequestLogHasRelevantInfo() {
        hammerAtConstantRate(topicsPath(), REQUEST_INTERVAL, TOTAL_REQUESTS);
        verifyLog(TOTAL_REQUESTS, 429008, TOTAL_REQUESTS - 1, false, false);
    }

    @DisplayName("test_whenGlobalPermitRateLimitTriggered_ThenRequestLogHasRelevantInfo")
    @Test
    public void test_whenGlobalPermitRateLimitTriggered_ThenRequestLogHasRelevantInfo() {
        hammerAtConstantRate(topicsPath(), REQUEST_INTERVAL, TOTAL_REQUESTS);
        verifyLog(TOTAL_REQUESTS, 429005, TOTAL_REQUESTS - 1, false, false);
    }

    @DisplayName("test_whenPerClusterPermitRateLimitTriggered_ThenRequestLogHasRelevantInfo")
    @Test
    public void test_whenPerClusterPermitRateLimitTriggered_ThenRequestLogHasRelevantInfo() {
        hammerAtConstantRate(topicsPath(), REQUEST_INTERVAL, TOTAL_REQUESTS);
        verifyLog(TOTAL_REQUESTS, 429006, TOTAL_REQUESTS - 1, false, false);
    }

    @DisplayName("test_whenGlobalProduceRequestsRateLimitTriggered_ThenRequestLogHasRelevantInfo")
    @Test
    public void test_whenGlobalProduceRequestsRateLimitTriggered_ThenRequestLogHasRelevantInfo() {
        hammerAtConstantRate(recordsPath(), REQUEST_INTERVAL, TOTAL_REQUESTS);
        verifyLog(TOTAL_REQUESTS, 429001, TOTAL_REQUESTS - 1, true, false);
    }

    @DisplayName("test_whenPerTenantProduceRequestsRateLimitTriggered_ThenRequestLogHasRelevantInfo")
    @Test
    public void test_whenPerTenantProduceRequestsRateLimitTriggered_ThenRequestLogHasRelevantInfo() {
        hammerAtConstantRate(recordsPath(), REQUEST_INTERVAL, TOTAL_REQUESTS);
        verifyLog(TOTAL_REQUESTS, 429002, TOTAL_REQUESTS - 1, true, false);
    }

    @DisplayName("test_whenGlobalProduceBytesRateLimitTriggered_ThenRequestLogHasRelevantInfo")
    @Test
    public void test_whenGlobalProduceBytesRateLimitTriggered_ThenRequestLogHasRelevantInfo() {
        hammerAtConstantRate(recordsPath(), REQUEST_INTERVAL, TOTAL_REQUESTS);
        // Every request is expected to be rate-limited.
        verifyLog(TOTAL_REQUESTS, 429003, TOTAL_REQUESTS, true, false);
    }

    @DisplayName("test_whenPerTenantProduceBytesRateLimitTriggered_ThenRequestLogHasRelevantInfo")
    @Test
    public void test_whenPerTenantProduceBytesRateLimitTriggered_ThenRequestLogHasRelevantInfo() {
        hammerAtConstantRate(recordsPath(), REQUEST_INTERVAL, TOTAL_REQUESTS);
        // Every request is expected to be rate-limited.
        verifyLog(TOTAL_REQUESTS, 429004, TOTAL_REQUESTS, true, false);
    }

    /** Enables the DoS filter with the given global and per-connection request limits. */
    private void configureDosFilter(int maxRequestsPerSec, int maxRequestsPerConnectionPerSec) {
        this.restConfigs.put("dos.filter.enabled", "true");
        this.restConfigs.put("dos.filter.max.requests.per.sec", maxRequestsPerSec);
        this.restConfigs.put("dos.filter.max.requests.per.connection.per.sec", maxRequestsPerConnectionPerSec);
    }

    /** Enables the resilience4j-backed permit rate limiter with the given limits. */
    private void configurePermitRateLimit(int permitsPerSec, int perClusterPermitsPerSec) {
        this.restConfigs.put("rate.limit.enable", "true");
        this.restConfigs.put("rate.limit.backend", "resilience4j");
        this.restConfigs.put("rate.limit.permits.per.sec", permitsPerSec);
        this.restConfigs.put("rate.limit.per.cluster.permits.per.sec", perClusterPermitsPerSec);
    }

    /** Enables the v3 produce rate limiter with the given request and byte limits. */
    private void configureProduceRateLimit(
            int maxRequestsGlobalPerSec, int maxRequestsPerSec, int maxBytesGlobalPerSec, int maxBytesPerSec) {
        this.restConfigs.put("rate.limit.enable", "true");
        this.restConfigs.put("api.v3.produce.rate.limit.enabled", "true");
        this.restConfigs.put("rate.limit.backend", "resilience4j");
        this.restConfigs.put("api.v3.produce.rate.limit.max.requests.global.per.sec", maxRequestsGlobalPerSec);
        this.restConfigs.put("api.v3.produce.rate.limit.max.requests.per.sec", maxRequestsPerSec);
        this.restConfigs.put("api.v3.produce.rate.limit.max.bytes.global.per.sec", maxBytesGlobalPerSec);
        this.restConfigs.put("api.v3.produce.rate.limit.max.bytes.per.sec", maxBytesPerSec);
    }

    /** Path of the topics listing for the test cluster. */
    private String topicsPath() {
        return "/v3/clusters/" + getClusterId() + "/topics/";
    }

    /** Path of the produce endpoint for the test topic. */
    private String recordsPath() {
        return topicsPath() + TOPIC_NAME + "/records/";
    }

    /**
     * Schedules {@code i} requests against {@code str}, spaced {@code duration} apart, waits for
     * all of them and asserts that each one returned HTTP 200 or HTTP 429 and carries no
     * {@code REST_ERROR_CODE} header.
     *
     * @param str request path; a path containing {@code "/records"} is sent as a produce POST,
     *     any other path as a GET
     * @param duration delay between two consecutive requests; must not be negative
     * @param i number of requests to send
     */
    private void hammerAtConstantRate(String str, Duration duration, int i) {
        Preconditions.checkArgument(!duration.isNegative(), "rate must be non-negative");
        long intervalMillis = duration.toMillis();

        List<Response> responses = IntStream.range(0, i)
                .mapToObj(index -> this.executor.schedule(() -> sendRequest(str), index * intervalMillis, TimeUnit.MILLISECONDS))
                // Materialize first so that every request is scheduled before we block on any result.
                .collect(Collectors.toList())
                .stream()
                .map(future -> {
                    try {
                        return future.get();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    } catch (ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                })
                .collect(Collectors.toList());

        for (Response response : responses) {
            try {
                Assertions.assertFalse(response.getHeaders().containsKey("REST_ERROR_CODE"), "Unexpected header in headers " + response.getHeaders());
                int status = response.getStatus();
                if (status != 200 && status != 429) {
                    Assertions.fail(String.format("Expected HTTP 200 or HTTP 429, but got HTTP %d instead: %s", status, response.readEntity(String.class)));
                }
            } finally {
                // Release the underlying connection whether or not the checks passed.
                response.close();
            }
        }
    }

    /** Sends one request to {@code path}: a binary produce POST for record paths, else a GET. */
    private Response sendRequest(String path) {
        if (!path.contains("/records")) {
            return request(path).accept("application/json").get();
        }
        ProduceRequest produceRequest = ProduceRequest.builder()
                .setKey(ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(ByteString.copyFromUtf8("foo").toByteArray()))
                        .build())
                .setValue(ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(ByteString.copyFromUtf8("bar").toByteArray()))
                        .build())
                .setOriginalSize(0L)
                .build();
        return request(path).accept("application/json").post(Entity.entity(produceRequest, "application/json"));
    }

    /**
     * Drains the captured request-log lines and checks them against the expectations.
     *
     * <p>Every line must be either {@code "200 -"} or the rate-limited line, which is
     * {@code "429 <errorCode>"}, or {@code "200 <errorCode>"} when
     * {@code rateLimitLoggedWithHttp200} is set.
     *
     * @param totalRequests expected number of captured log lines
     * @param rateLimitErrorCode error code expected in rate-limited log lines
     * @param expectedRateLimited nominal number of rate-limited requests; at least 95% of it must
     *     be observed
     * @param rateLimitLoggedWithHttp200 whether rate-limited lines carry status 200 instead of 429
     *     (the tests set this for the produce endpoint)
     * @param expectNoLogEntries when set, only asserts that no log line was captured at all
     */
    private void verifyLog(
            int totalRequests,
            int rateLimitErrorCode,
            int expectedRateLimited,
            boolean rateLimitLoggedWithHttp200,
            boolean expectNoLogEntries) {
        try {
            // Give the server time to flush the log lines of the last requests.
            TimeUnit.SECONDS.sleep(2L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            Assertions.fail("Unexpectedly failed to sleep.", e);
        }

        String rateLimitedEntry = (rateLimitLoggedWithHttp200 ? "200 " : "429 ") + rateLimitErrorCode;
        int rateLimitedCount = 0;
        int totalCount = 0;
        for (String entry = this.logEntries.poll(); entry != null; entry = this.logEntries.poll()) {
            boolean rateLimited = rateLimitedEntry.equals(entry);
            Assertions.assertTrue("200 -".equals(entry) || rateLimited, "Log entry is <" + entry + "> Vs it should be either of <200 ->, or <" + rateLimitedEntry + ">");
            if (rateLimited) {
                rateLimitedCount++;
            }
            totalCount++;
        }

        if (expectNoLogEntries) {
            Assertions.assertEquals(0, totalCount);
            return;
        }
        // Allow 5% slack because rate limiting depends on timing.
        int minRateLimited = (int) (0.95d * expectedRateLimited);
        Assertions.assertTrue(rateLimitedCount >= minRateLimited, "Expected # of rate-limited requests to be >= " + minRateLimited + ", but was " + rateLimitedCount);
        Assertions.assertEquals(totalRequests, totalCount);
    }
}
