package org.apache.pinot.core.realtime.stream;

import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.utils.DataSize;
import org.apache.pinot.common.utils.time.TimeUtils;
import org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder;
import org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory;
import org.apache.pinot.core.realtime.stream.OffsetCriteria;
import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/core/realtime/stream/StreamConfigTest.class */
public class StreamConfigTest {
    @Test
    public void testStreamConfig() {
        boolean z = false;
        String consumerType = StreamConfig.ConsumerType.LOWLEVEL.toString();
        String name = KafkaConsumerFactory.class.getName();
        String name2 = KafkaAvroMessageDecoder.class.getName();
        try {
            new StreamConfig(new HashMap());
        } catch (NullPointerException e) {
            z = true;
        }
        Assert.assertTrue(z);
        HashMap hashMap = new HashMap();
        hashMap.put("streamType", "kafka");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "topic.name"), "aTopic");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.type"), consumerType);
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.factory.class.name"), name);
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "decoder.class.name"), name2);
        StreamConfig streamConfig = new StreamConfig(hashMap);
        hashMap.remove("streamType");
        boolean z2 = false;
        try {
            streamConfig = new StreamConfig(hashMap);
        } catch (NullPointerException e2) {
            z2 = true;
        }
        Assert.assertTrue(z2);
        hashMap.put("streamType", "kafka");
        hashMap.remove(StreamConfigProperties.constructStreamProperty("kafka", "topic.name"));
        boolean z3 = false;
        try {
            streamConfig = new StreamConfig(hashMap);
        } catch (NullPointerException e3) {
            z3 = true;
        }
        Assert.assertTrue(z3);
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "topic.name"), "aTopic");
        hashMap.remove(StreamConfigProperties.constructStreamProperty("kafka", "consumer.type"));
        boolean z4 = false;
        try {
            streamConfig = new StreamConfig(hashMap);
        } catch (NullPointerException e4) {
            z4 = true;
        }
        Assert.assertTrue(z4);
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.type"), consumerType);
        hashMap.remove(StreamConfigProperties.constructStreamProperty("kafka", "consumer.factory.class.name"));
        boolean z5 = false;
        try {
            streamConfig = new StreamConfig(hashMap);
        } catch (NullPointerException e5) {
            z5 = true;
        }
        Assert.assertFalse(z5);
        Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), StreamConfig.getDefaultConsumerFactoryClassName());
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.factory.class.name"), name);
        hashMap.remove(StreamConfigProperties.constructStreamProperty("kafka", "decoder.class.name"));
        boolean z6 = false;
        try {
            new StreamConfig(hashMap);
        } catch (NullPointerException e6) {
            z6 = true;
        }
        Assert.assertTrue(z6);
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "decoder.class.name"), name2);
        StreamConfig streamConfig2 = new StreamConfig(hashMap);
        Assert.assertEquals(streamConfig2.getType(), "kafka");
        Assert.assertEquals(streamConfig2.getTopicName(), "aTopic");
        Assert.assertEquals(streamConfig2.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL);
        Assert.assertEquals(streamConfig2.getConsumerFactoryClassName(), name);
        Assert.assertEquals(streamConfig2.getDecoderClass(), name2);
    }

    @Test
    public void testStreamConfigDefaults() {
        String name = KafkaConsumerFactory.class.getName();
        String name2 = KafkaAvroMessageDecoder.class.getName();
        HashMap hashMap = new HashMap();
        hashMap.put("streamType", "kafka");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "topic.name"), "aTopic");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.type"), "simple");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.factory.class.name"), name);
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "decoder.class.name"), name2);
        StreamConfig streamConfig = new StreamConfig(hashMap);
        Assert.assertEquals(streamConfig.getType(), "kafka");
        Assert.assertEquals(streamConfig.getTopicName(), "aTopic");
        Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL);
        Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), name);
        Assert.assertEquals(streamConfig.getDecoderClass(), name2);
        Assert.assertEquals(streamConfig.getDecoderProperties().size(), 0);
        Assert.assertEquals(streamConfig.getOffsetCriteria(), new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest());
        Assert.assertEquals(streamConfig.getConnectionTimeoutMillis(), 30000L);
        Assert.assertEquals(streamConfig.getFetchTimeoutMillis(), 5000);
        Assert.assertEquals(streamConfig.getFlushThresholdRows(), StreamConfig.getDefaultFlushThresholdRows());
        Assert.assertEquals(streamConfig.getFlushThresholdTimeMillis(), StreamConfig.getDefaultFlushThresholdTimeMillis());
        Assert.assertEquals(streamConfig.getFlushSegmentDesiredSizeBytes(), StreamConfig.getDefaultDesiredSegmentSizeBytes());
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.type"), "lowLevel,highLevel");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "decoder.prop") + ".prop1", "decoderValueString");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.prop.auto.offset.reset"), "smallest");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "connection.timeout.millis"), "10");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "fetch.timeout.millis"), "200");
        hashMap.put("realtime.segment.flush.threshold.size", "500");
        hashMap.put("realtime.segment.flush.threshold.time", "2h");
        hashMap.put("realtime.segment.flush.desired.size", "20M");
        StreamConfig streamConfig2 = new StreamConfig(hashMap);
        Assert.assertEquals(streamConfig2.getType(), "kafka");
        Assert.assertEquals(streamConfig2.getTopicName(), "aTopic");
        Assert.assertEquals(streamConfig2.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL);
        Assert.assertEquals(streamConfig2.getConsumerTypes().get(1), StreamConfig.ConsumerType.HIGHLEVEL);
        Assert.assertEquals(streamConfig2.getConsumerFactoryClassName(), name);
        Assert.assertEquals(streamConfig2.getDecoderClass(), name2);
        Assert.assertEquals(streamConfig2.getDecoderProperties().size(), 1);
        Assert.assertEquals((String) streamConfig2.getDecoderProperties().get("prop1"), "decoderValueString");
        Assert.assertEquals(streamConfig2.getOffsetCriteria().isSmallest(), true);
        Assert.assertEquals(streamConfig2.getConnectionTimeoutMillis(), Long.parseLong("10"));
        Assert.assertEquals(streamConfig2.getFetchTimeoutMillis(), Integer.parseInt("200"));
        Assert.assertEquals(streamConfig2.getFlushThresholdRows(), Integer.parseInt("500"));
        Assert.assertEquals(streamConfig2.getFlushThresholdTimeMillis(), TimeUtils.convertPeriodToMillis("2h").longValue());
        Assert.assertEquals(streamConfig2.getFlushSegmentDesiredSizeBytes(), DataSize.toBytes("20M"));
        hashMap.put("realtime.segment.flush.threshold.time", "18000000");
        Assert.assertEquals(new StreamConfig(hashMap).getFlushThresholdTimeMillis(), Long.parseLong("18000000"));
        hashMap.put("realtime.segment.flush.threshold.time", "invalid input");
        Assert.assertEquals(new StreamConfig(hashMap).getFlushThresholdTimeMillis(), StreamConfig.getDefaultFlushThresholdTimeMillis());
    }

    @Test
    public void testStreamConfigValidations() {
        String name = KafkaConsumerFactory.class.getName();
        String name2 = KafkaAvroMessageDecoder.class.getName();
        HashMap hashMap = new HashMap();
        hashMap.put("streamType", "kafka");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "topic.name"), "aTopic");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.type"), "simple");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.factory.class.name"), name);
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "decoder.class.name"), name2);
        new StreamConfig(hashMap);
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.type"), "invalidConsumerType");
        boolean z = false;
        try {
            new StreamConfig(hashMap);
        } catch (IllegalArgumentException e) {
            z = true;
        }
        Assert.assertTrue(z);
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.type"), "simple");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "fetch.timeout.millis"), "timeout");
        Assert.assertEquals(new StreamConfig(hashMap).getFetchTimeoutMillis(), 5000);
        hashMap.remove(StreamConfigProperties.constructStreamProperty("kafka", "fetch.timeout.millis"));
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "connection.timeout.millis"), "timeout");
        Assert.assertEquals(new StreamConfig(hashMap).getConnectionTimeoutMillis(), 30000L);
        hashMap.remove(StreamConfigProperties.constructStreamProperty("kafka", "connection.timeout.millis"));
        hashMap.put("realtime.segment.flush.threshold.size", "rows");
        Assert.assertEquals(new StreamConfig(hashMap).getFlushThresholdRows(), StreamConfig.getDefaultFlushThresholdRows());
        hashMap.remove("realtime.segment.flush.threshold.size");
        hashMap.put("realtime.segment.flush.threshold.time", "time");
        Assert.assertEquals(new StreamConfig(hashMap).getFlushThresholdTimeMillis(), StreamConfig.getDefaultFlushThresholdTimeMillis());
        hashMap.remove("realtime.segment.flush.threshold.time");
        hashMap.put("realtime.segment.flush.desired.size", "size");
        Assert.assertEquals(new StreamConfig(hashMap).getFlushSegmentDesiredSizeBytes(), StreamConfig.getDefaultDesiredSegmentSizeBytes());
    }

    @Test
    public void testFlushThresholdStreamConfigs() {
        String name = KafkaConsumerFactory.class.getName();
        String name2 = KafkaAvroMessageDecoder.class.getName();
        HashMap hashMap = new HashMap();
        hashMap.put("streamType", "kafka");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "topic.name"), "aTopic");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.type"), "lowlevel");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.factory.class.name"), name);
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "decoder.class.name"), name2);
        StreamConfig streamConfig = new StreamConfig(hashMap);
        Assert.assertEquals(streamConfig.getFlushThresholdRows(), StreamConfig.getDefaultFlushThresholdRows());
        Assert.assertEquals(streamConfig.getFlushThresholdTimeMillis(), StreamConfig.getDefaultFlushThresholdTimeMillis());
        hashMap.put("realtime.segment.flush.threshold.size", "200");
        hashMap.put("realtime.segment.flush.threshold.time", "2h");
        StreamConfig streamConfig2 = new StreamConfig(hashMap);
        Assert.assertEquals(streamConfig2.getFlushThresholdRows(), Integer.parseInt("200"));
        Assert.assertEquals(streamConfig2.getFlushThresholdTimeMillis(), TimeUtils.convertPeriodToMillis("2h").longValue());
        hashMap.put("realtime.segment.flush.threshold.size.llc", "400");
        hashMap.put("realtime.segment.flush.threshold.time.llc", "4h");
        StreamConfig streamConfig3 = new StreamConfig(hashMap);
        Assert.assertEquals(streamConfig3.getFlushThresholdRows(), Integer.parseInt("200"));
        Assert.assertEquals(streamConfig3.getFlushThresholdTimeMillis(), TimeUtils.convertPeriodToMillis("2h").longValue());
        hashMap.remove("realtime.segment.flush.threshold.size");
        hashMap.remove("realtime.segment.flush.threshold.time");
        StreamConfig streamConfig4 = new StreamConfig(hashMap);
        Assert.assertEquals(streamConfig4.getFlushThresholdRows(), StreamConfig.getDefaultFlushThresholdRows());
        Assert.assertEquals(streamConfig4.getFlushThresholdTimeMillis(), StreamConfig.getDefaultFlushThresholdTimeMillis());
        PartitionLevelStreamConfig partitionLevelStreamConfig = new PartitionLevelStreamConfig(hashMap);
        Assert.assertEquals(partitionLevelStreamConfig.getFlushThresholdRows(), Integer.parseInt("400"));
        Assert.assertEquals(partitionLevelStreamConfig.getFlushThresholdTimeMillis(), TimeUtils.convertPeriodToMillis("4h").longValue());
        hashMap.remove("realtime.segment.flush.threshold.size.llc");
        hashMap.remove("realtime.segment.flush.threshold.time.llc");
        hashMap.put("realtime.segment.flush.threshold.size", "200");
        hashMap.put("realtime.segment.flush.threshold.time", "2h");
        PartitionLevelStreamConfig partitionLevelStreamConfig2 = new PartitionLevelStreamConfig(hashMap);
        Assert.assertEquals(partitionLevelStreamConfig2.getFlushThresholdRows(), Integer.parseInt("200"));
        Assert.assertEquals(partitionLevelStreamConfig2.getFlushThresholdTimeMillis(), TimeUtils.convertPeriodToMillis("2h").longValue());
        hashMap.remove("realtime.segment.flush.threshold.size");
        hashMap.remove("realtime.segment.flush.threshold.time");
        PartitionLevelStreamConfig partitionLevelStreamConfig3 = new PartitionLevelStreamConfig(hashMap);
        Assert.assertEquals(partitionLevelStreamConfig3.getFlushThresholdRows(), StreamConfig.getDefaultFlushThresholdRows());
        Assert.assertEquals(partitionLevelStreamConfig3.getFlushThresholdTimeMillis(), StreamConfig.getDefaultFlushThresholdTimeMillis());
    }

    @Test
    public void testConsumerTypes() {
        String name = KafkaConsumerFactory.class.getName();
        String name2 = KafkaAvroMessageDecoder.class.getName();
        HashMap hashMap = new HashMap();
        hashMap.put("streamType", "kafka");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "topic.name"), "aTopic");
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.factory.class.name"), name);
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "decoder.class.name"), name2);
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.type"), "simple");
        StreamConfig streamConfig = new StreamConfig(hashMap);
        Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL);
        Assert.assertTrue(streamConfig.hasLowLevelConsumerType());
        Assert.assertFalse(streamConfig.hasHighLevelConsumerType());
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.type"), "lowlevel");
        StreamConfig streamConfig2 = new StreamConfig(hashMap);
        Assert.assertEquals(streamConfig2.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL);
        Assert.assertTrue(streamConfig2.hasLowLevelConsumerType());
        Assert.assertFalse(streamConfig2.hasHighLevelConsumerType());
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.type"), "highLevel");
        StreamConfig streamConfig3 = new StreamConfig(hashMap);
        Assert.assertEquals(streamConfig3.getConsumerTypes().get(0), StreamConfig.ConsumerType.HIGHLEVEL);
        Assert.assertFalse(streamConfig3.hasLowLevelConsumerType());
        Assert.assertTrue(streamConfig3.hasHighLevelConsumerType());
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.type"), "highLevel,simple");
        StreamConfig streamConfig4 = new StreamConfig(hashMap);
        Assert.assertEquals(streamConfig4.getConsumerTypes().get(0), StreamConfig.ConsumerType.HIGHLEVEL);
        Assert.assertEquals(streamConfig4.getConsumerTypes().get(1), StreamConfig.ConsumerType.LOWLEVEL);
        Assert.assertTrue(streamConfig4.hasLowLevelConsumerType());
        Assert.assertTrue(streamConfig4.hasHighLevelConsumerType());
        hashMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.type"), "highLevel,lowlevel");
        StreamConfig streamConfig5 = new StreamConfig(hashMap);
        Assert.assertEquals(streamConfig5.getConsumerTypes().get(0), StreamConfig.ConsumerType.HIGHLEVEL);
        Assert.assertEquals(streamConfig5.getConsumerTypes().get(1), StreamConfig.ConsumerType.LOWLEVEL);
        Assert.assertTrue(streamConfig5.hasLowLevelConsumerType());
        Assert.assertTrue(streamConfig5.hasHighLevelConsumerType());
    }
}
