package io.confluent.kafkarest.resources.v1;

import io.confluent.kafkarest.AdminClientWrapper;
import io.confluent.kafkarest.ConsumerManager;
import io.confluent.kafkarest.ConsumerReadCallback;
import io.confluent.kafkarest.DefaultKafkaRestContext;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.KafkaRestContext;
import io.confluent.kafkarest.MetadataObserver;
import io.confluent.kafkarest.ProducerPool;
import io.confluent.kafkarest.ScalaConsumersContext;
import io.confluent.kafkarest.SimpleConsumerManager;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.EntityUtils;
import io.confluent.kafkarest.extension.InstantConverterProvider;
import io.confluent.kafkarest.v2.KafkaConsumerManager;
import io.confluent.rest.RestConfigException;
import io.confluent.rest.validation.JacksonMessageBodyProvider;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.ws.rs.core.Application;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/confluent/kafkarest/resources/v1/PartitionsResourceConsumeTest.class */
public class PartitionsResourceConsumeTest extends JerseyTest {
    private static final int PARTITION = 1;
    private static final long COUNT = 10;
    private KafkaConsumerManager kafkaConsumerManager;
    private SimpleConsumerManager simpleConsumerManager;
    private static final Instant TIMESTAMP = Instant.ofEpochMilli(1000);
    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ISO_INSTANT;
    private static final byte[] KEY = "key".getBytes(StandardCharsets.UTF_8);
    private static final byte[] VALUE = "value".getBytes(StandardCharsets.UTF_8);
    private static final String TOPIC = "topic";
    private static final long OFFSET = 30;
    private static final List<ConsumerRecord<byte[], byte[]>> RECORDS = Collections.unmodifiableList(Collections.singletonList(new ConsumerRecord(TOPIC, KEY, VALUE, 1, OFFSET)));

    protected Application configure() {
        ResourceConfig resourceConfig = new ResourceConfig();
        resourceConfig.register(new PartitionsResource(createKafkaRestContext()));
        resourceConfig.register(InstantConverterProvider.class);
        resourceConfig.register(new JacksonMessageBodyProvider());
        return resourceConfig;
    }

    private KafkaRestContext createKafkaRestContext() {
        this.kafkaConsumerManager = (KafkaConsumerManager) EasyMock.createMock(KafkaConsumerManager.class);
        this.simpleConsumerManager = (SimpleConsumerManager) EasyMock.createMock(SimpleConsumerManager.class);
        try {
            return new DefaultKafkaRestContext(new KafkaRestConfig(), (ProducerPool) null, this.kafkaConsumerManager, (AdminClientWrapper) null, new ScalaConsumersContext((MetadataObserver) null, (ConsumerManager) null, this.simpleConsumerManager));
        } catch (RestConfigException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    private <K, V> void expectConsume(String str, int i, long j, long j2, EmbeddedFormat embeddedFormat, List<ConsumerRecord<K, V>> list) {
        Capture newInstance = Capture.newInstance();
        this.simpleConsumerManager.consume((String) EasyMock.eq(str), EasyMock.eq(i), EasyMock.eq(j), EasyMock.eq(j2), (EmbeddedFormat) EasyMock.eq(embeddedFormat), (ConsumerReadCallback) EasyMock.capture(newInstance));
        EasyMock.expectLastCall().andAnswer(() -> {
            ((ConsumerReadCallback) newInstance.getValue()).onCompletion(list, (Exception) null);
            return null;
        });
    }

    @Before
    public void setUpMocks() {
        EasyMock.reset(new Object[]{this.kafkaConsumerManager, this.simpleConsumerManager});
    }

    @Test
    public void getMessages_withTimestamp_returnsMessagesAtTimestamp() {
        EasyMock.expect(this.kafkaConsumerManager.getOffsetForTime(TOPIC, 1, TIMESTAMP)).andReturn(Optional.of(Long.valueOf(OFFSET)));
        expectConsume(TOPIC, 1, OFFSET, COUNT, EmbeddedFormat.BINARY, RECORDS);
        EasyMock.replay(new Object[]{this.kafkaConsumerManager, this.simpleConsumerManager});
        Assert.assertEquals("[{\"topic\":\"topic\",\"key\":\"" + EntityUtils.encodeBase64Binary(KEY) + "\",\"value\":\"" + EntityUtils.encodeBase64Binary(VALUE) + "\",\"partition\":1,\"offset\":" + OFFSET + "}]", (String) target("/topics/{topic}/partitions/{partition}/messages").resolveTemplate(TOPIC, TOPIC).resolveTemplate("partition", 1).queryParam("timestamp", new Object[]{DATE_TIME_FORMATTER.format(TIMESTAMP)}).queryParam("count", new Object[]{Long.valueOf(COUNT)}).request(new String[]{"application/vnd.kafka.binary.v1+json"}).get(String.class));
        EasyMock.verify(new Object[]{this.kafkaConsumerManager, this.simpleConsumerManager});
    }

    @Test
    public void getMessages_withTimestamp_noSuchOffset_returnsEmpty() {
        EasyMock.expect(this.kafkaConsumerManager.getOffsetForTime(TOPIC, 1, TIMESTAMP)).andReturn(Optional.empty());
        EasyMock.replay(new Object[]{this.kafkaConsumerManager, this.simpleConsumerManager});
        Assert.assertEquals("[]", (String) target("/topics/{topic}/partitions/{partition}/messages").resolveTemplate(TOPIC, TOPIC).resolveTemplate("partition", 1).queryParam("timestamp", new Object[]{DATE_TIME_FORMATTER.format(TIMESTAMP)}).request(new String[]{"application/vnd.kafka.binary.v1+json"}).get(String.class));
        EasyMock.verify(new Object[]{this.kafkaConsumerManager, this.simpleConsumerManager});
    }
}
