package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
import org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource;
import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.class */
public class KafkaTopicGroupingWorkUnitPackerTest {
    private Properties props;

    @BeforeMethod
    public void setUp() {
        this.props = new Properties();
        this.props.setProperty("gobblin.kafka.streaming.containerCapacity", "2");
        this.props.setProperty("kafka.workunit.size.estimator.type", "CUSTOM");
        this.props.setProperty("kafka.workunit.size.estimator.customizedType", "org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.UnitKafkaWorkUnitSizeEstimator");
    }

    @Test
    public void testSingleTopic() {
        UniversalKafkaSource universalKafkaSource = new UniversalKafkaSource();
        SourceState sourceState = new SourceState(new State(this.props));
        sourceState.setProp("gobblin.kafka.streaming.enableIndexing", false);
        sourceState.setProp("writer.output.dir", Files.createTempDir().getAbsolutePath());
        List pack = new KafkaTopicGroupingWorkUnitPacker(universalKafkaSource, sourceState, Optional.absent()).pack(ImmutableMap.of("topic1", Lists.newArrayList(new WorkUnit[]{getWorkUnitWithTopicPartition("topic1", 1), getWorkUnitWithTopicPartition("topic1", 2), getWorkUnitWithTopicPartition("topic1", 3)})), 10);
        Assert.assertEquals(pack.size(), 2);
        Assert.assertEquals(((WorkUnit) pack.get(0)).getProp("topic.name"), "topic1");
        Assert.assertEquals(((WorkUnit) pack.get(0)).getPropAsInt(KafkaUtils.getPartitionPropName("partition.id", 0)), 1);
        Assert.assertEquals(((WorkUnit) pack.get(0)).getPropAsInt(KafkaUtils.getPartitionPropName("partition.id", 1)), 2);
        Assert.assertEquals(((WorkUnit) pack.get(0)).getPropAsDouble("gobblin.kafka.streaming.containerCapacity"), 2.0d, 0.001d);
        Assert.assertEquals(((WorkUnit) pack.get(1)).getProp("topic.name"), "topic1");
        Assert.assertEquals(((WorkUnit) pack.get(1)).getPropAsInt(KafkaUtils.getPartitionPropName("partition.id", 0)), 3);
        Assert.assertEquals(((WorkUnit) pack.get(1)).getPropAsDouble("gobblin.kafka.streaming.containerCapacity"), 2.0d, 0.001d);
    }

    @Test
    public void testMultiTopic() {
        UniversalKafkaSource universalKafkaSource = new UniversalKafkaSource();
        SourceState sourceState = new SourceState(new State(this.props));
        sourceState.setProp("gobblin.kafka.streaming.enableIndexing", false);
        sourceState.setProp("writer.output.dir", Files.createTempDir().getAbsolutePath());
        List pack = new KafkaTopicGroupingWorkUnitPacker(universalKafkaSource, sourceState, Optional.absent()).pack(ImmutableMap.of("topic1", Lists.newArrayList(new WorkUnit[]{getWorkUnitWithTopicPartition("topic1", 1), getWorkUnitWithTopicPartition("topic1", 2), getWorkUnitWithTopicPartition("topic1", 3)}), "topic2", Lists.newArrayList(new WorkUnit[]{getWorkUnitWithTopicPartition("topic2", 1), getWorkUnitWithTopicPartition("topic2", 2), getWorkUnitWithTopicPartition("topic2", 3)})), 10);
        Assert.assertEquals(pack.size(), 4);
        Assert.assertEquals(((WorkUnit) pack.get(0)).getProp("topic.name"), "topic1");
        Assert.assertEquals(((WorkUnit) pack.get(0)).getPropAsInt(KafkaUtils.getPartitionPropName("partition.id", 0)), 1);
        Assert.assertEquals(((WorkUnit) pack.get(0)).getPropAsInt(KafkaUtils.getPartitionPropName("partition.id", 1)), 2);
        Assert.assertEquals(((WorkUnit) pack.get(0)).getPropAsDouble("gobblin.kafka.streaming.containerCapacity"), 2.0d, 0.001d);
        Assert.assertEquals(((WorkUnit) pack.get(1)).getProp("topic.name"), "topic1");
        Assert.assertEquals(((WorkUnit) pack.get(1)).getPropAsInt(KafkaUtils.getPartitionPropName("partition.id", 0)), 3);
        Assert.assertEquals(((WorkUnit) pack.get(1)).getPropAsDouble("gobblin.kafka.streaming.containerCapacity"), 2.0d, 0.001d);
        Assert.assertEquals(((WorkUnit) pack.get(2)).getProp("topic.name"), "topic2");
        Assert.assertEquals(((WorkUnit) pack.get(2)).getPropAsInt(KafkaUtils.getPartitionPropName("partition.id", 0)), 1);
        Assert.assertEquals(((WorkUnit) pack.get(2)).getPropAsInt(KafkaUtils.getPartitionPropName("partition.id", 1)), 2);
        Assert.assertEquals(((WorkUnit) pack.get(2)).getPropAsDouble("gobblin.kafka.streaming.containerCapacity"), 2.0d, 0.001d);
        Assert.assertEquals(((WorkUnit) pack.get(3)).getProp("topic.name"), "topic2");
        Assert.assertEquals(((WorkUnit) pack.get(3)).getPropAsInt(KafkaUtils.getPartitionPropName("partition.id", 0)), 3);
        Assert.assertEquals(((WorkUnit) pack.get(3)).getPropAsDouble("gobblin.kafka.streaming.containerCapacity"), 2.0d, 0.001d);
    }

    public WorkUnit getWorkUnitWithTopicPartition(String str, int i) {
        WorkUnit workUnit = new WorkUnit(new Extract(Extract.TableType.APPEND_ONLY, "kafka", str));
        workUnit.setProp("topic.name", str);
        workUnit.setProp("partition.id", Integer.toString(i));
        workUnit.setProp("leader.hostandport", "host:1234");
        workUnit.setProp("leader.id", "1");
        workUnit.setProp("workunit.low.water.mark", "0");
        workUnit.setProp("workunit.high.water.mark", "0");
        workUnit.setProp("previousStartFetchEpochTime", "0");
        workUnit.setProp("previousStopFetchEpochTime", "0");
        workUnit.setProp("previousLowWatermark", "0");
        workUnit.setProp("previousHighWatermark", "0");
        workUnit.setProp("previousLatestOffset", "0");
        workUnit.setProp("offsetFetchEpochTime", "0");
        workUnit.setProp("previousOffsetFetchEpochTime", "0");
        return workUnit;
    }

    @Test
    public void testGetContainerCapacityForTopic() {
        KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy containerCapacityComputationStrategy = KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy.MIN;
        List asList = Arrays.asList(Double.valueOf(1.2d), Double.valueOf(1.4d), Double.valueOf(1.3d), Double.valueOf(1.4d), Double.valueOf(1.2d));
        Assert.assertEquals(KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(asList, containerCapacityComputationStrategy), 1.2d, 1.0E-6d);
        Assert.assertEquals(KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(asList, KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy.MAX), 1.4d, 1.0E-6d);
        Assert.assertEquals(KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(asList, KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy.MEDIAN), 1.3d, 1.0E-6d);
        Assert.assertEquals(KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(asList, KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy.MEAN), 1.3d, 1.0E-6d);
        Assert.assertEquals(KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(Arrays.asList(Double.valueOf(1.2d), Double.valueOf(1.4d), Double.valueOf(1.3d), Double.valueOf(1.4d)), KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy.MEDIAN), 1.35d, 1.0E-6d);
    }
}
