package org.apache.druid.indexing.worker.shuffle;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSource;
import com.google.common.io.FileWriteMode;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import com.google.inject.Injector;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.guice.GuiceAnnotationIntrospector;
import org.apache.druid.guice.GuiceInjectableValues;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.loading.LoadSpec;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.LocalLoadSpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.apache.druid.utils.CompressionUtils;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.class */
public class ShuffleDataSegmentPusherTest {
    private static final String LOCAL = "local";
    private static final String DEEPSTORE = "deepstore";

    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    private IntermediaryDataManager intermediaryDataManager;
    private ShuffleDataSegmentPusher segmentPusher;
    private ObjectMapper mapper;
    private final String intermediateDataStore;
    private File localDeepStore;

    @Parameterized.Parameters(name = "intermediateDataManager={0}")
    public static Collection<Object[]> data() {
        return ImmutableList.of(new Object[]{LOCAL}, new Object[]{DEEPSTORE});
    }

    public ShuffleDataSegmentPusherTest(String str) {
        this.intermediateDataStore = str;
    }

    @Before
    public void setup() throws IOException {
        WorkerConfig workerConfig = new WorkerConfig();
        TaskConfig taskConfig = new TaskConfig((String) null, (String) null, (String) null, (Integer) null, (List) null, false, (Period) null, (Period) null, ImmutableList.of(new StorageLocationConfig(this.temporaryFolder.newFolder(), (HumanReadableBytes) null, (Double) null)), false, false, TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), (Boolean) null);
        NoopIndexingServiceClient noopIndexingServiceClient = new NoopIndexingServiceClient();
        if (LOCAL.equals(this.intermediateDataStore)) {
            this.intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, noopIndexingServiceClient);
        } else if (DEEPSTORE.equals(this.intermediateDataStore)) {
            this.localDeepStore = this.temporaryFolder.newFolder("localStorage");
            this.intermediaryDataManager = new DeepStorageIntermediaryDataManager(new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig() { // from class: org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusherTest.1
                public File getStorageDirectory() {
                    return ShuffleDataSegmentPusherTest.this.localDeepStore;
                }
            }));
        }
        this.intermediaryDataManager.start();
        this.segmentPusher = new ShuffleDataSegmentPusher("supervisorTaskId", "subTaskId", this.intermediaryDataManager);
        Injector makeStartupInjectorWithModules = GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.of(binder -> {
            binder.bind(LocalDataSegmentPuller.class);
        }));
        this.mapper = new DefaultObjectMapper();
        this.mapper.registerModule(new SimpleModule("loadSpecTest").registerSubtypes(new Class[]{LocalLoadSpec.class}));
        this.mapper.setInjectableValues(new GuiceInjectableValues(makeStartupInjectorWithModules));
        GuiceAnnotationIntrospector guiceAnnotationIntrospector = new GuiceAnnotationIntrospector();
        this.mapper.setAnnotationIntrospectors(new AnnotationIntrospectorPair(guiceAnnotationIntrospector, this.mapper.getSerializationConfig().getAnnotationIntrospector()), new AnnotationIntrospectorPair(guiceAnnotationIntrospector, this.mapper.getDeserializationConfig().getAnnotationIntrospector()));
    }

    @After
    public void teardown() {
        this.intermediaryDataManager.stop();
    }

    @Test
    public void testPush() throws IOException, SegmentLoadingException {
        File generateSegmentDir = generateSegmentDir();
        DataSegment newSegment = newSegment(Intervals.of("2018/2019"));
        DataSegment push = this.segmentPusher.push(generateSegmentDir, newSegment, true);
        Assert.assertEquals(9L, push.getBinaryVersion().intValue());
        Assert.assertEquals(14L, push.getSize());
        File newFolder = this.temporaryFolder.newFolder();
        if (this.intermediaryDataManager instanceof LocalIntermediaryDataManager) {
            Optional findPartitionFile = this.intermediaryDataManager.findPartitionFile("supervisorTaskId", "subTaskId", newSegment.getInterval(), newSegment.getShardSpec().getPartitionNum());
            Assert.assertTrue(findPartitionFile.isPresent());
            CompressionUtils.unzip((ByteSource) findPartitionFile.get(), newFolder, FileUtils.IS_EXCEPTION, false);
        } else if (this.intermediaryDataManager instanceof DeepStorageIntermediaryDataManager) {
            LoadSpec loadSpec = (LoadSpec) this.mapper.convertValue(push.getLoadSpec(), LoadSpec.class);
            Assert.assertTrue(push.getLoadSpec().get("path").toString().startsWith(this.localDeepStore.getAbsolutePath() + "/shuffle-data"));
            loadSpec.loadSegment(newFolder);
        }
        List asList = Arrays.asList(newFolder.listFiles());
        asList.sort(Comparator.comparing((v0) -> {
            return v0.getName();
        }));
        File file = (File) asList.get(0);
        Assert.assertEquals("test", file.getName());
        Assert.assertEquals("test data.", Files.readFirstLine(file, StandardCharsets.UTF_8));
        File file2 = (File) asList.get(1);
        Assert.assertEquals("version.bin", file2.getName());
        Assert.assertArrayEquals(Ints.toByteArray(9), Files.toByteArray(file2));
    }

    private File generateSegmentDir() throws IOException {
        File newFolder = this.temporaryFolder.newFolder();
        Files.asByteSink(new File(newFolder, "version.bin"), new FileWriteMode[0]).write(Ints.toByteArray(9));
        org.apache.commons.io.FileUtils.write(new File(newFolder, "test"), "test data.", StandardCharsets.UTF_8);
        return newFolder;
    }

    private DataSegment newSegment(Interval interval) {
        BucketNumberedShardSpec bucketNumberedShardSpec = (BucketNumberedShardSpec) Mockito.mock(BucketNumberedShardSpec.class);
        Mockito.when(Integer.valueOf(bucketNumberedShardSpec.getBucketId())).thenReturn(0);
        return new DataSegment("dataSource", interval, "version", (Map) null, (List) null, (List) null, bucketNumberedShardSpec, 9, 0L);
    }
}
