package org.apache.hudi.client.functional;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieBackedTestDelayedTableMetadata;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.class */
public class TestRemoteFileSystemViewWithMetadataTable extends HoodieClientTestHarness {
    private static final Logger LOG = LogManager.getLogger(TestRemoteFileSystemViewWithMetadataTable.class);

    /* loaded from: input_file:org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable$TestViewLookUpCallable.class */
    class TestViewLookUpCallable implements Callable<Boolean> {
        private final RemoteHoodieTableFileSystemView view;
        private final Pair<String, String> partitionFileIdPair;
        private final String expectedCommitTime;

        public TestViewLookUpCallable(RemoteHoodieTableFileSystemView remoteHoodieTableFileSystemView, Pair<String, String> pair, String str) {
            this.view = remoteHoodieTableFileSystemView;
            this.partitionFileIdPair = pair;
            this.expectedCommitTime = str;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() throws Exception {
            Option latestFileSlice = this.view.getLatestFileSlice((String) this.partitionFileIdPair.getLeft(), (String) this.partitionFileIdPair.getRight());
            boolean z = latestFileSlice.isPresent() && this.expectedCommitTime.equals(FSUtils.getCommitTime(new Path(((HoodieBaseFile) ((FileSlice) latestFileSlice.get()).getBaseFile().get()).getPath()).getName()));
            if (!z) {
                TestRemoteFileSystemViewWithMetadataTable.LOG.error("The timeline server does not return the correct result: latestFileSliceReturned=" + latestFileSlice + " expectedCommitTime=" + this.expectedCommitTime);
            }
            return Boolean.valueOf(z);
        }
    }

    @BeforeEach
    public void setUp() throws Exception {
        initPath();
        initSparkContexts();
        initFileSystem();
        initMetaClient();
        initTimelineService();
        this.dataGen = new HoodieTestDataGenerator(8070L);
    }

    @AfterEach
    public void tearDown() throws Exception {
        cleanupTimelineService();
        cleanupClients();
        cleanupSparkContexts();
        cleanupFileSystem();
        cleanupExecutorService();
        this.dataGen = null;
        System.gc();
    }

    @Override // org.apache.hudi.testutils.HoodieClientTestHarness
    public void initTimelineService() {
        HoodieLocalEngineContext hoodieLocalEngineContext = new HoodieLocalEngineContext(this.metaClient.getHadoopConf());
        try {
            HoodieWriteConfig build = HoodieWriteConfig.newBuilder().withPath(this.basePath).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(Integer.valueOf(incrementTimelineServicePortToUse())).build()).build();
            this.timelineService = new TimelineService(hoodieLocalEngineContext, new Configuration(), TimelineService.Config.builder().enableMarkerRequests(true).serverPort(build.getViewStorageConfig().getRemoteViewServerPort().intValue()).build(), FileSystem.get(new Configuration()), FileSystemViewManager.createViewManager(this.context, build.getMetadataConfig(), build.getViewStorageConfig(), build.getCommonConfig(), () -> {
                return new HoodieBackedTestDelayedTableMetadata(this.context, build.getMetadataConfig(), this.basePath, build.getViewStorageConfig().getSpillableDir(), true);
            }));
            this.timelineService.startService();
            timelineServicePort = this.timelineService.getServerPort();
            LOG.info("Started timeline server on port: " + timelineServicePort);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testMORGetLatestFileSliceWithMetadataTable(boolean z) throws IOException {
        SparkRDDWriteClient createWriteClient = createWriteClient(z ? Option.of(this.timelineService) : Option.empty());
        for (int i = 0; i < 3; i++) {
            writeToTable(i, createWriteClient);
        }
        HoodieActiveTimeline activeTimeline = HoodieTableMetaClient.builder().setConf(this.metaClient.getHadoopConf()).setBasePath(this.basePath).build().getActiveTimeline();
        HoodieInstant hoodieInstant = (HoodieInstant) activeTimeline.lastInstant().get();
        Assertions.assertTrue(((HoodieInstant) activeTimeline.lastInstant().get()).getAction().equals("commit"));
        List list = (List) ((HoodieCommitMetadata) HoodieCommitMetadata.fromBytes((byte[]) activeTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class)).getPartitionToWriteStats().entrySet().stream().flatMap(entry -> {
            String str = (String) entry.getKey();
            return ((List) entry.getValue()).stream().map(hoodieWriteStat -> {
                return Pair.of(str, hoodieWriteStat.getFileId());
            });
        }).collect(Collectors.toList());
        ArrayList arrayList = new ArrayList();
        while (arrayList.size() < 128) {
            arrayList.addAll(list);
        }
        int serverPort = z ? this.timelineService.getServerPort() : ((EmbeddedTimelineService) createWriteClient.getTimelineServer().get()).getRemoteFileSystemViewConfig().getRemoteViewServerPort().intValue();
        LOG.info("Connecting to Timeline Server: " + serverPort);
        RemoteHoodieTableFileSystemView remoteHoodieTableFileSystemView = new RemoteHoodieTableFileSystemView("localhost", serverPort, this.metaClient);
        List list2 = (List) arrayList.stream().map(pair -> {
            return new TestViewLookUpCallable(remoteHoodieTableFileSystemView, pair, hoodieInstant.getTimestamp());
        }).collect(Collectors.toList());
        ArrayList arrayList2 = new ArrayList();
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        list2.forEach(testViewLookUpCallable -> {
            arrayList2.add(newCachedThreadPool.submit(testViewLookUpCallable));
        });
        Assertions.assertTrue(((Boolean) arrayList2.stream().map(future -> {
            try {
                return (Boolean) future.get();
            } catch (Exception e) {
                LOG.error(e);
                return false;
            }
        }).reduce((bool, bool2) -> {
            return Boolean.valueOf(bool.booleanValue() && bool2.booleanValue());
        }).get()).booleanValue());
    }

    protected HoodieTableType getTableType() {
        return HoodieTableType.MERGE_ON_READ;
    }

    private SparkRDDWriteClient createWriteClient(Option<TimelineService> option) {
        return new SparkRDDWriteClient(this.context, HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2).withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue()).withMergeSmallFileGroupCandidatesLimit(0).withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(3).build()).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.REMOTE_ONLY).withRemoteServerPort(option.isPresent() ? Integer.valueOf(((TimelineService) option.get()).getServerPort()) : (Integer) FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue()).build()).withAutoCommit(false).forTable("test_mor_table").build(), option);
    }

    private void writeToTable(int i, SparkRDDWriteClient sparkRDDWriteClient) throws IOException {
        String createNewInstantTime = HoodieActiveTimeline.createNewInstantTime();
        sparkRDDWriteClient.startCommitWithTime(createNewInstantTime);
        sparkRDDWriteClient.commit(createNewInstantTime, sparkRDDWriteClient.upsert(this.jsc.parallelize(i == 0 ? this.dataGen.generateInserts(createNewInstantTime, 100) : this.dataGen.generateUpdates(createNewInstantTime, 100), 1), createNewInstantTime), Option.empty(), "deltacommit", Collections.emptyMap());
        sparkRDDWriteClient.scheduleCompaction(Option.empty());
        sparkRDDWriteClient.runAnyPendingCompactions();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -514811601:
                if (implMethodName.equals("lambda$initTimelineService$93b8362d$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/hudi/common/function/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/config/HoodieWriteConfig;)Lorg/apache/hudi/metadata/HoodieTableMetadata;")) {
                    TestRemoteFileSystemViewWithMetadataTable testRemoteFileSystemViewWithMetadataTable = (TestRemoteFileSystemViewWithMetadataTable) serializedLambda.getCapturedArg(0);
                    HoodieWriteConfig hoodieWriteConfig = (HoodieWriteConfig) serializedLambda.getCapturedArg(1);
                    return () -> {
                        return new HoodieBackedTestDelayedTableMetadata(this.context, hoodieWriteConfig.getMetadataConfig(), this.basePath, hoodieWriteConfig.getViewStorageConfig().getSpillableDir(), true);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
