/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.kafka;

import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.execution.QueryInfo;
import io.trino.plugin.kafka.KafkaQueryRunner;
import io.trino.plugin.kafka.KafkaTopicDescription;
import io.trino.plugin.kafka.util.TestUtils;
import io.trino.spi.connector.SchemaTableName;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import io.trino.testing.ResultWithQueryId;
import io.trino.testing.assertions.Assert;
import io.trino.testing.kafka.TestingKafka;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.assertj.core.api.Assertions;
import org.testng.annotations.Test;

@Test(singleThreaded=true)
public class TestKafkaIntegrationPushDown
extends AbstractTestQueryFramework {
    private static final int MESSAGE_NUM = 1000;
    private static final int TIMESTAMP_TEST_COUNT = 6;
    private static final int TIMESTAMP_TEST_START_INDEX = 2;
    private static final int TIMESTAMP_TEST_END_INDEX = 4;
    private TestingKafka testingKafka;
    private String topicNamePartition;
    private String topicNameOffset;
    private String topicNameCreateTime;
    private String topicNameLogAppend;

    protected QueryRunner createQueryRunner() throws Exception {
        this.testingKafka = (TestingKafka)this.closeAfterClass((AutoCloseable)TestingKafka.create());
        this.topicNamePartition = "test_push_down_partition_" + UUID.randomUUID().toString().replaceAll("-", "_");
        this.topicNameOffset = "test_push_down_offset_" + UUID.randomUUID().toString().replaceAll("-", "_");
        this.topicNameCreateTime = "test_push_down_create_time_" + UUID.randomUUID().toString().replaceAll("-", "_");
        this.topicNameLogAppend = "test_push_down_log_append_" + UUID.randomUUID().toString().replaceAll("-", "_");
        DistributedQueryRunner queryRunner = KafkaQueryRunner.builder(this.testingKafka).setExtraTopicDescription((Map<SchemaTableName, KafkaTopicDescription>)ImmutableMap.builder().put(TestUtils.createEmptyTopicDescription(this.topicNamePartition, new SchemaTableName("default", this.topicNamePartition))).put(TestUtils.createEmptyTopicDescription(this.topicNameOffset, new SchemaTableName("default", this.topicNameOffset))).put(TestUtils.createEmptyTopicDescription(this.topicNameCreateTime, new SchemaTableName("default", this.topicNameCreateTime))).put(TestUtils.createEmptyTopicDescription(this.topicNameLogAppend, new SchemaTableName("default", this.topicNameLogAppend))).buildOrThrow()).setExtraKafkaProperties((Map<String, String>)ImmutableMap.builder().put((Object)"kafka.messages-per-split", (Object)"100").buildOrThrow()).build();
        this.testingKafka.createTopicWithConfig(2, 1, this.topicNamePartition, false);
        this.testingKafka.createTopicWithConfig(2, 1, this.topicNameOffset, false);
        this.testingKafka.createTopicWithConfig(1, 1, this.topicNameCreateTime, false);
        this.testingKafka.createTopicWithConfig(1, 1, this.topicNameLogAppend, true);
        return queryRunner;
    }

    @Test
    public void testPartitionPushDown() {
        this.createMessages(this.topicNamePartition);
        String sql = String.format("SELECT count(*) FROM default.%s WHERE _partition_id=1", this.topicNamePartition);
        Assert.assertEventually(() -> {
            ResultWithQueryId queryResult = this.getDistributedQueryRunner().executeWithQueryId(this.getSession(), sql);
            org.testng.Assert.assertEquals((long)TestKafkaIntegrationPushDown.getQueryInfo(this.getDistributedQueryRunner(), (ResultWithQueryId<MaterializedResult>)queryResult).getQueryStats().getProcessedInputPositions(), (long)500L);
        });
    }

    @Test
    public void testOffsetPushDown() {
        this.createMessages(this.topicNameOffset);
        this.assertProcessedInputPossitions(String.format("SELECT count(*) FROM default.%s WHERE _partition_offset between 2 and 10", this.topicNameOffset), 18L);
        this.assertProcessedInputPossitions(String.format("SELECT count(*) FROM default.%s WHERE _partition_offset > 2 and _partition_offset < 10", this.topicNameOffset), 14L);
        this.assertProcessedInputPossitions(String.format("SELECT count(*) FROM default.%s WHERE _partition_offset = 3", this.topicNameOffset), 2L);
    }

    private void assertProcessedInputPossitions(String sql, long expectedProcessedInputPositions) {
        DistributedQueryRunner queryRunner = this.getDistributedQueryRunner();
        Assert.assertEventually(() -> {
            ResultWithQueryId queryResult = queryRunner.executeWithQueryId(this.getSession(), sql);
            org.testng.Assert.assertEquals((long)TestKafkaIntegrationPushDown.getQueryInfo(queryRunner, (ResultWithQueryId<MaterializedResult>)queryResult).getQueryStats().getProcessedInputPositions(), (long)expectedProcessedInputPositions);
        });
    }

    @Test
    public void testTimestampCreateTimeModePushDown() throws Exception {
        RecordMessage recordMessage = this.createTimestampTestMessages(this.topicNameCreateTime);
        DistributedQueryRunner queryRunner = this.getDistributedQueryRunner();
        String sql = String.format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s' and _timestamp < timestamp '%s'", this.topicNameCreateTime, recordMessage.getStartTime(), recordMessage.getEndTime());
        Assert.assertEventually(() -> {
            ResultWithQueryId queryResult = queryRunner.executeWithQueryId(this.getSession(), sql);
            Assertions.assertThat((long)TestKafkaIntegrationPushDown.getQueryInfo(queryRunner, (ResultWithQueryId<MaterializedResult>)queryResult).getQueryStats().getProcessedInputPositions()).isEqualTo(998L);
        });
        Assert.assertEventually(() -> {
            Session sessionWithUpperBoundPushDownEnabled = Session.builder((Session)this.getSession()).setSystemProperty("kafka.timestamp_upper_bound_force_push_down_enabled", "true").build();
            ResultWithQueryId queryResult = queryRunner.executeWithQueryId(sessionWithUpperBoundPushDownEnabled, sql);
            Assertions.assertThat((long)TestKafkaIntegrationPushDown.getQueryInfo(queryRunner, (ResultWithQueryId<MaterializedResult>)queryResult).getQueryStats().getProcessedInputPositions()).isEqualTo(2L);
        });
    }

    @Test
    public void testTimestampLogAppendModePushDown() throws Exception {
        RecordMessage recordMessage = this.createTimestampTestMessages(this.topicNameLogAppend);
        DistributedQueryRunner queryRunner = this.getDistributedQueryRunner();
        String sql = String.format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s' and _timestamp < timestamp '%s'", this.topicNameLogAppend, recordMessage.getStartTime(), recordMessage.getEndTime());
        Assert.assertEventually(() -> {
            ResultWithQueryId queryResult = queryRunner.executeWithQueryId(this.getSession(), sql);
            Assertions.assertThat((long)TestKafkaIntegrationPushDown.getQueryInfo(queryRunner, (ResultWithQueryId<MaterializedResult>)queryResult).getQueryStats().getProcessedInputPositions()).isEqualTo(2L);
        });
    }

    private static QueryInfo getQueryInfo(DistributedQueryRunner queryRunner, ResultWithQueryId<MaterializedResult> queryResult) {
        return queryRunner.getCoordinator().getQueryManager().getFullQueryInfo(queryResult.getQueryId());
    }

    private RecordMessage createTimestampTestMessages(String topicName) throws Exception {
        String startTime = null;
        String endTime = null;
        for (int messageNum = 0; messageNum < 6; ++messageNum) {
            long key = messageNum;
            long value = messageNum;
            RecordMetadata recordMetadata = this.testingKafka.sendMessages(Stream.of(new ProducerRecord(topicName, (Object)key, (Object)value)));
            if (messageNum == 2) {
                startTime = TestKafkaIntegrationPushDown.getTimestamp(recordMetadata);
            } else if (messageNum == 4) {
                endTime = TestKafkaIntegrationPushDown.getTimestamp(recordMetadata);
            }
            Thread.sleep(100L);
        }
        this.testingKafka.sendMessages(LongStream.range(6L, 1000L).mapToObj(id -> new ProducerRecord(topicName, (Object)id, (Object)id)));
        return new RecordMessage(startTime, endTime);
    }

    private static String getTimestamp(RecordMetadata recordMetadata) {
        return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.ofInstant(Instant.ofEpochMilli(recordMetadata.timestamp()), ZoneId.of("UTC")));
    }

    private void createMessages(String topicName) {
        this.testingKafka.sendMessages(IntStream.range(0, 1000).mapToObj(id -> new ProducerRecord(topicName, (Object)id, (Object)id)));
    }

    private static class RecordMessage {
        private final String startTime;
        private final String endTime;

        public RecordMessage(String startTime, String endTime) {
            this.startTime = Objects.requireNonNull(startTime, "startTime result is none");
            this.endTime = Objects.requireNonNull(endTime, "endTime result is none");
        }

        public String getStartTime() {
            return this.startTime;
        }

        public String getEndTime() {
            return this.endTime;
        }
    }
}

