package org.apache.hudi.testutils;

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;

/**
 * Base harness for Flink client tests.
 *
 * <p>Holds the file system, write client, table view, HDFS mini-cluster handles, Flink
 * mini-cluster resource and engine context used by a test, and offers {@code init*} /
 * {@code cleanup*} helpers for them. All resource fields are {@code transient}; only
 * {@link #supplier} and the recorded test method name are regular fields.
 */
public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implements Serializable {
    protected static final Logger LOG = LogManager.getLogger(HoodieFlinkClientTestHarness.class);

    /** Message used when an initializer runs before {@code basePath} has been set. */
    private static final String BASE_PATH_NOT_INITIALIZED = "The base path has not been initialized.";

    /** Name of the currently running test method, or {@code "Unknown"} when JUnit provides none. */
    private String testMethodName;
    protected transient FileSystem fs;
    protected transient ExecutorService executorService;
    protected transient HoodieFlinkWriteClient writeClient;
    protected transient HoodieTableFileSystemView tableView;
    protected transient HdfsTestService hdfsTestService;
    protected transient MiniDFSCluster dfsCluster;
    protected transient DistributedFileSystem dfs;
    protected transient Configuration hadoopConf = null;
    protected transient MiniClusterWithClientResource flinkCluster = null;
    protected transient HoodieFlinkEngineContext context = null;
    /** Task context supplier created without a Flink runtime context ({@code null} is passed). */
    protected final FlinkTaskContextSupplier supplier = new FlinkTaskContextSupplier((RuntimeContext) null);

    /**
     * Sink that appends every received record to the JVM-wide {@link #valuesList}.
     */
    public static class SimpleTestSinkFunction implements SinkFunction<HoodieRecord> {
        /** Shared across all instances of this sink; callers are responsible for clearing it. */
        public static List<HoodieRecord> valuesList = new ArrayList<>();

        /**
         * Appends the record to {@link #valuesList}.
         *
         * <p>The list is static, so the lock is taken on the class rather than on the instance:
         * an instance-level lock would not exclude other sink instances appending to the same
         * list.
         *
         * @param hoodieRecord record to collect
         * @param context sink context (unused)
         * @throws Exception declared for signature compatibility; not thrown by this body
         */
        public void invoke(HoodieRecord hoodieRecord, SinkFunction.Context context) throws Exception {
            synchronized (SimpleTestSinkFunction.class) {
                valuesList.add(hoodieRecord);
            }
        }
    }

    /**
     * Records the name of the test method about to run.
     *
     * @param testInfo JUnit-provided information about the current test
     */
    @BeforeEach
    public void setTestMethodName(TestInfo testInfo) {
        this.testMethodName = testInfo.getTestMethod().map(Method::getName).orElse("Unknown");
    }

    /**
     * Creates the Flink mini-cluster resource with one task manager and two slots per task
     * manager. This method only constructs the resource and stores it in {@link #flinkCluster}.
     */
    protected void initFlinkMiniCluster() {
        MiniClusterResourceConfiguration clusterConfig = new MiniClusterResourceConfiguration.Builder()
            .setNumberSlotsPerTaskManager(2)
            .setNumberTaskManagers(1)
            .build();
        this.flinkCluster = new MiniClusterWithClientResource(clusterConfig);
    }

    /**
     * Creates a fresh Hadoop configuration, resolves {@link #fs} for the base path and creates
     * the Flink engine context.
     *
     * @throws IllegalStateException if the base path has not been initialized
     */
    public void initFileSystem() {
        this.hadoopConf = new Configuration();
        initFileSystemWithConfiguration(this.hadoopConf);
        this.context = new HoodieFlinkEngineContext(this.supplier);
    }

    /**
     * Resolves {@link #fs} for the base path and, for a local file system, turns checksum
     * verification on.
     *
     * @param configuration Hadoop configuration used to obtain the file system
     * @throws IllegalStateException if the base path has not been initialized
     */
    private void initFileSystemWithConfiguration(Configuration configuration) {
        ensureBasePathInitialized();
        this.fs = FSUtils.getFs(this.basePath, configuration);
        if (this.fs instanceof LocalFileSystem) {
            this.fs.setVerifyChecksum(true);
        }
    }

    /**
     * Initializes the meta client using the table type returned by {@code getTableType()}.
     *
     * @throws IOException if the underlying initialization fails
     * @throws IllegalStateException if the base path has not been initialized
     */
    public void initMetaClient() throws IOException {
        initMetaClient(getTableType());
    }

    /**
     * Initializes the meta client for the given table type at the base path, using
     * {@link #hadoopConf}.
     *
     * @param hoodieTableType table type to initialize
     * @throws IOException if the underlying initialization fails
     * @throws IllegalStateException if the base path has not been initialized
     */
    protected void initMetaClient(HoodieTableType hoodieTableType) throws IOException {
        ensureBasePathInitialized();
        this.metaClient = HoodieTestUtils.init(this.hadoopConf, this.basePath, hoodieTableType);
    }

    /**
     * Runs the index's {@code tagLocation} over the given records with this harness's engine
     * context and returns the result as a plain list.
     *
     * @param hoodieIndex index to consult
     * @param list records to tag
     * @param hoodieTable table passed through to the index
     * @return the records returned by the index
     */
    public List<HoodieRecord> tagLocation(HoodieIndex hoodieIndex, List<HoodieRecord> list, HoodieTable hoodieTable) {
        return HoodieList.getList(hoodieIndex.tagLocation(HoodieList.of(list), this.context, hoodieTable));
    }

    /**
     * Closes and drops {@link #fs} if one is set.
     *
     * @throws IOException if closing the file system fails
     */
    protected void cleanupFileSystem() throws IOException {
        if (this.fs != null) {
            LOG.warn("Closing file-system instance used in previous test-run");
            this.fs.close();
            this.fs = null;
        }
    }

    /**
     * Releases clients, engine context, test data generator, file system, DFS and executor
     * service, in that order, then requests a garbage collection.
     *
     * <p>The Flink mini-cluster is not touched here; use {@link #cleanupFlinkMiniCluster()}.
     *
     * @throws IOException if any of the cleanup steps fails
     */
    public void cleanupResources() throws IOException {
        cleanupClients();
        cleanupFlinkContexts();
        cleanupTestDataGenerator();
        cleanupFileSystem();
        cleanupDFS();
        cleanupExecutorService();
        System.gc();
    }

    /** Invokes {@code after()} on the Flink mini-cluster resource, if any, and drops it. */
    protected void cleanupFlinkMiniCluster() {
        if (this.flinkCluster != null) {
            this.flinkCluster.after();
            this.flinkCluster = null;
        }
    }

    /**
     * Drops the meta client reference and closes the write client and table view if present.
     *
     * @throws IOException declared for callers and overriders
     */
    protected void cleanupClients() throws IOException {
        this.metaClient = null;
        if (this.writeClient != null) {
            this.writeClient.close();
            this.writeClient = null;
        }
        if (this.tableView != null) {
            this.tableView.close();
            this.tableView = null;
        }
    }

    /**
     * Stops the HDFS test service and mini DFS cluster when a test service is set, then closes
     * all cached Hadoop {@link FileSystem} instances.
     *
     * @throws IOException if closing the cached file systems fails
     */
    protected void cleanupDFS() throws IOException {
        try {
            if (this.hdfsTestService != null) {
                this.hdfsTestService.stop();
                // The cluster handle is a separate field and may never have been assigned.
                if (this.dfsCluster != null) {
                    this.dfsCluster.shutdown();
                }
                this.hdfsTestService = null;
                this.dfsCluster = null;
                this.dfs = null;
            }
        } finally {
            // Always close cached file systems, even when the shutdown above failed.
            FileSystem.closeAll();
        }
    }

    /** Calls {@code shutdownNow()} on the executor service, if any, and drops it. */
    protected void cleanupExecutorService() {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
            this.executorService = null;
        }
    }

    /** Drops the engine context reference, if any; no close call is made on the context. */
    protected void cleanupFlinkContexts() {
        if (this.context != null) {
            LOG.info("Closing flink engine context used in previous test-case");
            this.context = null;
        }
    }

    /**
     * Fails fast when the inherited {@code basePath} has not been set yet.
     *
     * @throws IllegalStateException if {@code basePath} is {@code null}
     */
    private void ensureBasePathInitialized() {
        if (this.basePath == null) {
            throw new IllegalStateException(BASE_PATH_NOT_INITIALIZED);
        }
    }
}
