/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.cli.commands;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.CompactionAdminClient;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.commands.SparkMain;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
import scala.collection.JavaConversions;
import scala.collection.Map;

@Component
public class CompactionCommand
implements CommandMarker {
    /** Class-wide log4j logger; declared final so the shared reference cannot be reassigned. */
    private static final Logger log = LogManager.getLogger(CompactionCommand.class);
    /** Directory prefix (including the trailing slash) used when naming temporary result files. */
    private static final String TMP_DIR = "/tmp/";

    /**
     * Availability indicator for the commands listed in the annotation.
     *
     * @return {@code true} only when {@code HoodieCLI.tableMetadata} is set and its table type is
     *         {@code MERGE_ON_READ}; {@code false} otherwise
     */
    @CliAvailabilityIndicator(value={"compactions show all", "compaction show", "compaction run", "compaction schedule"})
    public boolean isAvailable() {
        if (HoodieCLI.tableMetadata == null) {
            // No table has been loaded into the CLI yet.
            return false;
        }
        return HoodieTableType.MERGE_ON_READ == HoodieCLI.tableMetadata.getTableType();
    }

    /**
     * Prints one row per instant of the active commits-and-compaction timeline for which a
     * compaction plan can be read, iterating the timeline in reverse order.
     *
     * <p>For instants whose action is not {@code "compaction"}, the plan lookup is best-effort:
     * a {@link HoodieIOException} while reading it causes the instant to be skipped. For
     * {@code "compaction"} instants the same exception is propagated.
     *
     * @param includeExtraMetadata when {@code true}, adds an "Extra Metadata" column
     * @param limit passed through to {@code HoodiePrintHelper.print}
     * @param sortByField passed through to {@code HoodiePrintHelper.print}
     * @param descending passed through to {@code HoodiePrintHelper.print}
     * @param headerOnly passed through to {@code HoodiePrintHelper.print}
     * @return the table rendered by {@code HoodiePrintHelper.print}
     * @throws IOException if a compaction plan cannot be deserialized
     */
    @CliCommand(value={"compactions show all"}, help="Shows all compactions that are in active timeline")
    public String compactionsAll(
            @CliOption(key={"includeExtraMetadata"}, help="Include extra metadata", unspecifiedDefaultValue="false") boolean includeExtraMetadata,
            @CliOption(key={"limit"}, mandatory=false, help="Limit commits", unspecifiedDefaultValue="-1") Integer limit,
            @CliOption(key={"sortBy"}, help="Sorting Field", unspecifiedDefaultValue="") String sortByField,
            @CliOption(key={"desc"}, help="Ordering", unspecifiedDefaultValue="false") boolean descending,
            @CliOption(key={"headeronly"}, help="Print Header Only", unspecifiedDefaultValue="false") boolean headerOnly) throws IOException {
        HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
        HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionTimeline();
        HoodieTimeline commitTimeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
        Set<String> committed = commitTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
        List<HoodieInstant> instants = timeline.getInstants().collect(Collectors.toList());
        Collections.reverse(instants);

        List<Comparable[]> rows = new ArrayList<Comparable[]>();
        for (HoodieInstant instant : instants) {
            boolean isCompactionInstant = "compaction".equals(instant.getAction());
            HoodieCompactionPlan workload = null;
            try {
                workload = AvroUtils.deserializeCompactionPlan(
                        activeTimeline.getInstantAuxiliaryDetails(
                                HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
            } catch (HoodieIOException e) {
                if (isCompactionInstant) {
                    // A compaction instant without a readable plan is an error, as before.
                    throw e;
                }
                // Best-effort lookup for non-compaction instants: skip, but leave a trace.
                log.debug("No readable compaction plan for instant " + instant.getTimestamp() + ", skipping", e);
            }
            if (workload == null) {
                continue;
            }

            HoodieInstant.State state = instant.getState();
            if (committed.contains(instant.getTimestamp())) {
                state = HoodieInstant.State.COMPLETED;
            }
            Integer operationCount = Integer.valueOf(workload.getOperations() == null ? 0 : workload.getOperations().size());
            if (includeExtraMetadata) {
                // The extra metadata may be absent on a plan; render it as empty instead of throwing NPE.
                Object extraMetadata = workload.getExtraMetadata();
                String extraMetadataText = extraMetadata == null ? "" : extraMetadata.toString();
                rows.add(new Comparable[]{instant.getTimestamp(), state.toString(), operationCount, extraMetadataText});
            } else {
                rows.add(new Comparable[]{instant.getTimestamp(), state.toString(), operationCount});
            }
        }

        HashMap<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<String, Function<Object, String>>();
        TableHeader header = new TableHeader()
                .addTableHeaderField("Compaction Instant Time")
                .addTableHeaderField("State")
                .addTableHeaderField("Total FileIds to be Compacted");
        if (includeExtraMetadata) {
            header = header.addTableHeaderField("Extra Metadata");
        }
        return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
    }

    /**
     * Prints the operations of the compaction plan stored for one compaction instant.
     *
     * <p>One row is emitted per operation in the plan: partition path, file id, base instant,
     * data file path, number of delta files and the operation metrics. A plan without
     * operations yields a table with no rows.
     *
     * @param compactionInstantTime timestamp of the compaction instant whose plan is shown
     * @param limit passed through to {@code HoodiePrintHelper.print}
     * @param sortByField passed through to {@code HoodiePrintHelper.print}
     * @param descending passed through to {@code HoodiePrintHelper.print}
     * @param headerOnly passed through to {@code HoodiePrintHelper.print}
     * @return the table rendered by {@code HoodiePrintHelper.print}
     * @throws Exception if the plan cannot be read or deserialized
     */
    @CliCommand(value={"compaction show"}, help="Shows compaction details for a specific compaction instant")
    public String compactionShow(
            @CliOption(key={"instant"}, mandatory=true, help="Compaction instant time") String compactionInstantTime,
            @CliOption(key={"limit"}, mandatory=false, help="Limit commits", unspecifiedDefaultValue="-1") Integer limit,
            @CliOption(key={"sortBy"}, help="Sorting Field", unspecifiedDefaultValue="") String sortByField,
            @CliOption(key={"desc"}, help="Ordering", unspecifiedDefaultValue="false") boolean descending,
            @CliOption(key={"headeronly"}, help="Print Header Only", unspecifiedDefaultValue="false") boolean headerOnly) throws Exception {
        HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
        HoodieCompactionPlan workload = AvroUtils.deserializeCompactionPlan(
                activeTimeline.getInstantAuxiliaryDetails(
                        HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());

        List<Comparable[]> rows = new ArrayList<Comparable[]>();
        if (null != workload && null != workload.getOperations()) {
            for (HoodieCompactionOperation op : workload.getOperations()) {
                // Guard the delta-file list the same way the metrics are guarded below.
                int deltaFileCount = op.getDeltaFilePaths() == null ? 0 : op.getDeltaFilePaths().size();
                String metrics = op.getMetrics() == null ? "" : op.getMetrics().toString();
                rows.add(new Comparable[]{op.getPartitionPath(), op.getFileId(), op.getBaseInstantTime(), op.getDataFilePath(), Integer.valueOf(deltaFileCount), metrics});
            }
        }

        HashMap<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<String, Function<Object, String>>();
        TableHeader header = new TableHeader()
                .addTableHeaderField("Partition Path")
                .addTableHeaderField("File Id")
                .addTableHeaderField("Base Instant")
                .addTableHeaderField("Data File Path")
                .addTableHeaderField("Total Delta Files")
                .addTableHeaderField("getMetrics");
        return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
    }

    /**
     * Launches a Spark job with the {@code COMPACT_SCHEDULE} command for a newly generated
     * instant time and waits for it to finish.
     *
     * @param sparkMemory Spark executor memory, forwarded as an application argument
     * @return a message naming the instant time and whether the launched process exited with 0
     * @throws Exception if the table is not MERGE_ON_READ, or if launching or waiting for the
     *         Spark process fails
     */
    @CliCommand(value={"compaction schedule"}, help="Schedule Compaction")
    public String scheduleCompact(@CliOption(key={"sparkMemory"}, unspecifiedDefaultValue="1G", help="Spark executor memory") String sparkMemory) throws Exception {
        boolean initialized = HoodieCLI.initConf();
        HoodieCLI.initFS(initialized);
        if (HoodieCLI.tableMetadata.getTableType() != HoodieTableType.MERGE_ON_READ) {
            throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
        }

        String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();
        String sparkPropertiesPath = Utils.getDefaultPropertiesFile(JavaConversions.propertiesAsScalaMap(System.getProperties()));
        SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
        sparkLauncher.addAppArgs(new String[]{
                SparkMain.SparkCommand.COMPACT_SCHEDULE.toString(),
                HoodieCLI.tableMetadata.getBasePath(),
                HoodieCLI.tableMetadata.getTableConfig().getTableName(),
                compactionInstantTime,
                sparkMemory});
        Process process = sparkLauncher.launch();
        InputStreamConsumer.captureOutput(process);
        int exitCode = process.waitFor();
        // This command only schedules; the messages must not claim the compaction itself ran.
        if (exitCode != 0) {
            return "Failed to schedule compaction for " + compactionInstantTime;
        }
        return "Attempted to schedule compaction for " + compactionInstantTime;
    }

    /**
     * Launches a Spark job with the {@code COMPACT_RUN} command for a compaction instant and
     * waits for it to finish.
     *
     * <p>When no instant is given, the first instant with action {@code "compaction"} on the
     * reloaded active timeline is used; if there is none, nothing is launched.
     *
     * @param parallelism forwarded to the Spark job
     * @param schemaFilePath forwarded to the Spark job
     * @param sparkMemory forwarded to the Spark job
     * @param retry forwarded to the Spark job
     * @param compactionInstantTime instant to compact, or {@code null} to pick the first pending one
     * @return {@code "NO PENDING COMPACTION TO RUN"} when nothing is pending, otherwise a
     *         message naming the instant and whether the launched process exited with 0
     * @throws Exception if the table is not MERGE_ON_READ, or if launching or waiting for the
     *         Spark process fails
     */
    @CliCommand(value={"compaction run"}, help="Run Compaction for given instant time")
    public String compact(
            @CliOption(key={"parallelism"}, mandatory=true, help="Parallelism for hoodie compaction") String parallelism,
            @CliOption(key={"schemaFilePath"}, mandatory=true, help="Path for Avro schema file") String schemaFilePath,
            @CliOption(key={"sparkMemory"}, unspecifiedDefaultValue="4G", help="Spark executor memory") String sparkMemory,
            @CliOption(key={"retry"}, unspecifiedDefaultValue="1", help="Number of retries") String retry,
            @CliOption(key={"compactionInstant"}, mandatory=false, help="Compaction instant time (defaults to the first pending compaction)") String compactionInstantTime) throws Exception {
        boolean initialized = HoodieCLI.initConf();
        HoodieCLI.initFS(initialized);
        if (HoodieCLI.tableMetadata.getTableType() != HoodieTableType.MERGE_ON_READ) {
            throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
        }

        if (null == compactionInstantTime) {
            // No instant supplied: fall back to the first compaction instant on the timeline.
            Option<String> firstPendingInstant = HoodieCLI.tableMetadata.reloadActiveTimeline()
                    .filterCompletedAndCompactionInstants()
                    .filter(instant -> instant.getAction().equals("compaction"))
                    .firstInstant()
                    .map(HoodieInstant::getTimestamp);
            if (!firstPendingInstant.isPresent()) {
                return "NO PENDING COMPACTION TO RUN";
            }
            compactionInstantTime = firstPendingInstant.get();
        }

        String sparkPropertiesPath = Utils.getDefaultPropertiesFile(JavaConversions.propertiesAsScalaMap(System.getProperties()));
        SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
        sparkLauncher.addAppArgs(new String[]{
                SparkMain.SparkCommand.COMPACT_RUN.toString(),
                HoodieCLI.tableMetadata.getBasePath(),
                HoodieCLI.tableMetadata.getTableConfig().getTableName(),
                compactionInstantTime,
                parallelism,
                schemaFilePath,
                sparkMemory,
                retry});
        Process process = sparkLauncher.launch();
        InputStreamConsumer.captureOutput(process);
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            return "Failed to run compaction for " + compactionInstantTime;
        }
        return "Compaction successfully completed for " + compactionInstantTime;
    }

    /**
     * Builds a fresh temporary file name: {@code TMP_DIR}, a random UUID, then {@code ".ser"}.
     *
     * @return the generated path string; no file is created by this method
     */
    private static String getTmpSerializerFile() {
        String uniqueName = UUID.randomUUID().toString();
        return new StringBuilder(TMP_DIR).append(uniqueName).append(".ser").toString();
    }

    /**
     * Reads one Java-serialized object from a file and returns it cast to the caller's type.
     *
     * <p>Both streams are managed by try-with-resources, so the underlying file stream is
     * closed even if constructing the {@link ObjectInputStream} or closing it throws.
     *
     * @param inputP path of the file to read
     * @param fs file system used to open {@code inputP}
     * @param <T> type the caller expects; the cast is unchecked
     * @return the deserialized object
     * @throws Exception if the file cannot be opened or read, or the object's class cannot be resolved
     */
    @SuppressWarnings("unchecked")
    private <T> T deSerializeOperationResult(String inputP, FileSystem fs) throws Exception {
        // NOTE(review): this is Java-native deserialization; it is only safe if the file is
        // written by a trusted producer — confirm that nothing else can write to this path.
        try (FSDataInputStream fsDataInputStream = fs.open(new Path(inputP));
                ObjectInputStream in = new ObjectInputStream((InputStream) fsDataInputStream)) {
            Object result = in.readObject();
            log.info("Result : " + result);
            return (T) result;
        }
    }

    /**
     * Launches a Spark job with the {@code COMPACT_VALIDATE} command, reads the results it wrote
     * to a temporary file and prints one row per validated operation.
     *
     * <p>The temporary file is deleted in a {@code finally} block whether or not the job succeeds.
     *
     * @param compactionInstant compaction instant to validate
     * @param parallelism forwarded to the Spark job
     * @param master forwarded to the Spark job
     * @param sparkMemory forwarded to the Spark job
     * @param limit passed through to {@code HoodiePrintHelper.print}
     * @param sortByField passed through to {@code HoodiePrintHelper.print}
     * @param descending passed through to {@code HoodiePrintHelper.print}
     * @param headerOnly passed through to {@code HoodiePrintHelper.print}
     * @return a failure message when the process exits non-zero, otherwise a VALID/INVALID
     *         banner followed by the rendered table
     * @throws Exception if the table is not MERGE_ON_READ, or on launch, wait, read or cleanup failure
     */
    @CliCommand(value={"compaction validate"}, help="Validate Compaction")
    public String validateCompaction(
            @CliOption(key={"instant"}, mandatory=true, help="Compaction Instant") String compactionInstant,
            @CliOption(key={"parallelism"}, unspecifiedDefaultValue="3", help="Parallelism") String parallelism,
            @CliOption(key={"sparkMaster"}, unspecifiedDefaultValue="", help="Spark Master ") String master,
            @CliOption(key={"sparkMemory"}, unspecifiedDefaultValue="2G", help="executor memory") String sparkMemory,
            @CliOption(key={"limit"}, help="Limit commits", unspecifiedDefaultValue="-1") Integer limit,
            @CliOption(key={"sortBy"}, help="Sorting Field", unspecifiedDefaultValue="") String sortByField,
            @CliOption(key={"desc"}, help="Ordering", unspecifiedDefaultValue="false") boolean descending,
            @CliOption(key={"headeronly"}, help="Print Header Only", unspecifiedDefaultValue="false") boolean headerOnly) throws Exception {
        boolean initialized = HoodieCLI.initConf();
        HoodieCLI.initFS(initialized);
        if (HoodieCLI.tableMetadata.getTableType() != HoodieTableType.MERGE_ON_READ) {
            throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
        }

        String outputPathStr = CompactionCommand.getTmpSerializerFile();
        Path outputPath = new Path(outputPathStr);
        try {
            String sparkPropertiesPath = Utils.getDefaultPropertiesFile(JavaConversions.propertiesAsScalaMap(System.getProperties()));
            SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
            sparkLauncher.addAppArgs(new String[]{
                    SparkMain.SparkCommand.COMPACT_VALIDATE.toString(),
                    HoodieCLI.tableMetadata.getBasePath(),
                    compactionInstant,
                    outputPathStr,
                    parallelism,
                    master,
                    sparkMemory});
            Process process = sparkLauncher.launch();
            InputStreamConsumer.captureOutput(process);
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                return "Failed to validate compaction for " + compactionInstant;
            }

            // Typed element access: with a raw List the lambda parameter would be Object.
            List<CompactionAdminClient.ValidationOpResult> res = this.deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
            // An empty result list counts as valid, matching reduce(...).orElse(true).
            boolean valid = res.stream().allMatch(r -> r.isSuccess());
            String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n";

            List<Comparable[]> rows = new ArrayList<Comparable[]>();
            for (CompactionAdminClient.ValidationOpResult r : res) {
                CompactionOperation op = r.getOperation();
                rows.add(new Comparable[]{
                        op.getFileId(),
                        op.getBaseInstantTime(),
                        op.getDataFilePath().isPresent() ? op.getDataFilePath().get() : "",
                        Integer.valueOf(op.getDeltaFilePaths().size()),
                        Boolean.valueOf(r.isSuccess()),
                        r.getException().isPresent() ? r.getException().get().getMessage() : ""});
            }

            HashMap<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<String, Function<Object, String>>();
            TableHeader header = new TableHeader()
                    .addTableHeaderField("File Id")
                    .addTableHeaderField("Base Instant Time")
                    .addTableHeaderField("Base Data File")
                    .addTableHeaderField("Num Delta Files")
                    .addTableHeaderField("Valid")
                    .addTableHeaderField("Error");
            return message + HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
        } finally {
            // Always remove the temporary result file.
            if (HoodieCLI.fs.exists(outputPath)) {
                HoodieCLI.fs.delete(outputPath, false);
            }
        }
    }

    /**
     * Launches a Spark job with the {@code COMPACT_UNSCHEDULE_PLAN} command, reads the rename
     * results it wrote to a temporary file and renders them via {@code getRenamesToBePrinted}.
     *
     * <p>The temporary file is deleted in a {@code finally} block whether or not the job succeeds.
     *
     * @return a failure message when the process exits non-zero, otherwise the rendered summary
     * @throws Exception if the table is not MERGE_ON_READ, or on launch, wait, read or cleanup failure
     */
    @CliCommand(value={"compaction unschedule"}, help="Unschedule Compaction")
    public String unscheduleCompaction(
            @CliOption(key={"instant"}, mandatory=true, help="Compaction Instant") String compactionInstant,
            @CliOption(key={"parallelism"}, unspecifiedDefaultValue="3", help="Parallelism") String parallelism,
            @CliOption(key={"sparkMaster"}, unspecifiedDefaultValue="", help="Spark Master ") String master,
            @CliOption(key={"sparkMemory"}, unspecifiedDefaultValue="2G", help="executor memory") String sparkMemory,
            @CliOption(key={"skipValidation"}, help="skip validation", unspecifiedDefaultValue="false") boolean skipV,
            @CliOption(key={"dryRun"}, help="Dry Run Mode", unspecifiedDefaultValue="false") boolean dryRun,
            @CliOption(key={"limit"}, help="Limit commits", unspecifiedDefaultValue="-1") Integer limit,
            @CliOption(key={"sortBy"}, help="Sorting Field", unspecifiedDefaultValue="") String sortByField,
            @CliOption(key={"desc"}, help="Ordering", unspecifiedDefaultValue="false") boolean descending,
            @CliOption(key={"headeronly"}, help="Print Header Only", unspecifiedDefaultValue="false") boolean headerOnly) throws Exception {
        HoodieCLI.initFS(HoodieCLI.initConf());
        if (HoodieCLI.tableMetadata.getTableType() != HoodieTableType.MERGE_ON_READ) {
            throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
        }

        String resultFile = CompactionCommand.getTmpSerializerFile();
        Path resultPath = new Path(resultFile);
        try {
            String propertiesFile = Utils.getDefaultPropertiesFile(JavaConversions.propertiesAsScalaMap(System.getProperties()));
            SparkLauncher launcher = SparkUtil.initLauncher(propertiesFile);
            String[] appArgs = {
                    SparkMain.SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(),
                    HoodieCLI.tableMetadata.getBasePath(),
                    compactionInstant,
                    resultFile,
                    parallelism,
                    master,
                    sparkMemory,
                    String.valueOf(skipV),
                    String.valueOf(dryRun)};
            launcher.addAppArgs(appArgs);
            Process sparkJob = launcher.launch();
            InputStreamConsumer.captureOutput(sparkJob);
            if (sparkJob.waitFor() != 0) {
                return "Failed to unschedule compaction for " + compactionInstant;
            }
            List<CompactionAdminClient.RenameOpResult> renameResults = this.deSerializeOperationResult(resultFile, HoodieCLI.fs);
            return this.getRenamesToBePrinted(renameResults, limit, sortByField, descending, headerOnly, "unschedule pending compaction");
        } finally {
            // Always remove the temporary result file.
            if (HoodieCLI.fs.exists(resultPath)) {
                HoodieCLI.fs.delete(resultPath, false);
            }
        }
    }

    /**
     * Launches a Spark job with the {@code COMPACT_UNSCHEDULE_FILE} command for one file id,
     * reads the rename results it wrote to a temporary file and renders them via
     * {@code getRenamesToBePrinted}.
     *
     * <p>The temporary file is deleted in a {@code finally} block whether or not the job succeeds.
     *
     * @return a failure message when the process exits non-zero, otherwise the rendered summary
     * @throws Exception if the table is not MERGE_ON_READ, or on launch, wait, read or cleanup failure
     */
    @CliCommand(value={"compaction unscheduleFileId"}, help="UnSchedule Compaction for a fileId")
    public String unscheduleCompactFile(
            @CliOption(key={"fileId"}, mandatory=true, help="File Id") String fileId,
            @CliOption(key={"sparkMaster"}, unspecifiedDefaultValue="", help="Spark Master ") String master,
            @CliOption(key={"sparkMemory"}, unspecifiedDefaultValue="2G", help="executor memory") String sparkMemory,
            @CliOption(key={"skipValidation"}, help="skip validation", unspecifiedDefaultValue="false") boolean skipV,
            @CliOption(key={"dryRun"}, help="Dry Run Mode", unspecifiedDefaultValue="false") boolean dryRun,
            @CliOption(key={"limit"}, help="Limit commits", unspecifiedDefaultValue="-1") Integer limit,
            @CliOption(key={"sortBy"}, help="Sorting Field", unspecifiedDefaultValue="") String sortByField,
            @CliOption(key={"desc"}, help="Ordering", unspecifiedDefaultValue="false") boolean descending,
            @CliOption(key={"headeronly"}, help="Header Only", unspecifiedDefaultValue="false") boolean headerOnly) throws Exception {
        HoodieCLI.initFS(HoodieCLI.initConf());
        if (HoodieCLI.tableMetadata.getTableType() != HoodieTableType.MERGE_ON_READ) {
            throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
        }

        String resultFile = CompactionCommand.getTmpSerializerFile();
        Path resultPath = new Path(resultFile);
        try {
            String propertiesFile = Utils.getDefaultPropertiesFile(JavaConversions.propertiesAsScalaMap(System.getProperties()));
            SparkLauncher launcher = SparkUtil.initLauncher(propertiesFile);
            String[] appArgs = {
                    SparkMain.SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(),
                    HoodieCLI.tableMetadata.getBasePath(),
                    fileId,
                    resultFile,
                    "1",
                    master,
                    sparkMemory,
                    String.valueOf(skipV),
                    String.valueOf(dryRun)};
            launcher.addAppArgs(appArgs);
            Process sparkJob = launcher.launch();
            InputStreamConsumer.captureOutput(sparkJob);
            if (sparkJob.waitFor() != 0) {
                return "Failed to unschedule compaction for file " + fileId;
            }
            List<CompactionAdminClient.RenameOpResult> renameResults = this.deSerializeOperationResult(resultFile, HoodieCLI.fs);
            return this.getRenamesToBePrinted(renameResults, limit, sortByField, descending, headerOnly, "unschedule file from pending compaction");
        } finally {
            // Always remove the temporary result file.
            if (HoodieCLI.fs.exists(resultPath)) {
                HoodieCLI.fs.delete(resultPath, false);
            }
        }
    }

    /**
     * Launches a Spark job with the {@code COMPACT_REPAIR} command, reads the rename results it
     * wrote to a temporary file and renders them via {@code getRenamesToBePrinted}.
     *
     * <p>The temporary file is deleted in a {@code finally} block whether or not the job succeeds.
     *
     * @param compactionInstant compaction instant to repair
     * @param parallelism forwarded to the Spark job
     * @param master forwarded to the Spark job
     * @param sparkMemory forwarded to the Spark job
     * @param dryRun forwarded to the Spark job as {@code "true"} or {@code "false"}
     * @param limit passed through to the table printer
     * @param sortByField passed through to the table printer
     * @param descending passed through to the table printer
     * @param headerOnly passed through to the table printer
     * @return a failure message when the process exits non-zero, otherwise the rendered summary
     * @throws Exception if the table is not MERGE_ON_READ, or on launch, wait, read or cleanup failure
     */
    @CliCommand(value={"compaction repair"}, help="Renames the files to make them consistent with the timeline as dictated by Hoodie metadata. Use when compaction unschedule fails partially.")
    public String repairCompaction(
            @CliOption(key={"instant"}, mandatory=true, help="Compaction Instant") String compactionInstant,
            @CliOption(key={"parallelism"}, unspecifiedDefaultValue="3", help="Parallelism") String parallelism,
            @CliOption(key={"sparkMaster"}, unspecifiedDefaultValue="", help="Spark Master ") String master,
            @CliOption(key={"sparkMemory"}, unspecifiedDefaultValue="2G", help="executor memory") String sparkMemory,
            @CliOption(key={"dryRun"}, help="Dry Run Mode", unspecifiedDefaultValue="false") boolean dryRun,
            @CliOption(key={"limit"}, help="Limit commits", unspecifiedDefaultValue="-1") Integer limit,
            @CliOption(key={"sortBy"}, help="Sorting Field", unspecifiedDefaultValue="") String sortByField,
            @CliOption(key={"desc"}, help="Ordering", unspecifiedDefaultValue="false") boolean descending,
            @CliOption(key={"headeronly"}, help="Print Header Only", unspecifiedDefaultValue="false") boolean headerOnly) throws Exception {
        boolean initialized = HoodieCLI.initConf();
        HoodieCLI.initFS(initialized);
        if (HoodieCLI.tableMetadata.getTableType() != HoodieTableType.MERGE_ON_READ) {
            throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
        }

        String outputPathStr = CompactionCommand.getTmpSerializerFile();
        Path outputPath = new Path(outputPathStr);
        try {
            String sparkPropertiesPath = Utils.getDefaultPropertiesFile(JavaConversions.propertiesAsScalaMap(System.getProperties()));
            SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
            sparkLauncher.addAppArgs(new String[]{
                    SparkMain.SparkCommand.COMPACT_REPAIR.toString(),
                    HoodieCLI.tableMetadata.getBasePath(),
                    compactionInstant,
                    outputPathStr,
                    parallelism,
                    master,
                    sparkMemory,
                    Boolean.valueOf(dryRun).toString()});
            Process process = sparkLauncher.launch();
            InputStreamConsumer.captureOutput(process);
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                // This is the repair command; report the failure as a repair failure.
                return "Failed to repair compaction for " + compactionInstant;
            }
            List<CompactionAdminClient.RenameOpResult> res = this.deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
            return this.getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "repair compaction");
        } finally {
            // Always remove the temporary result file.
            if (HoodieCLI.fs.exists(outputPath)) {
                HoodieCLI.fs.delete(outputPath, false);
            }
        }
    }

    /**
     * Summarizes rename results on standard output and renders them as a table.
     *
     * <p>An empty result list produces only the "no renames needed" message. Otherwise a status
     * line is printed saying whether every result was both executed and successful, followed by
     * one table row per result.
     *
     * @param res rename results to report
     * @param limit passed through to {@code HoodiePrintHelper.print}
     * @param sortByField passed through to {@code HoodiePrintHelper.print}
     * @param descending passed through to {@code HoodiePrintHelper.print}
     * @param headerOnly passed through to {@code HoodiePrintHelper.print}
     * @param operation name of the operation, embedded in the printed messages
     * @return the rendered table, or the "no renames needed" message when {@code res} is empty
     */
    private String getRenamesToBePrinted(List<CompactionAdminClient.RenameOpResult> res, Integer limit, String sortByField, boolean descending, boolean headerOnly, String operation) {
        if (res.isEmpty()) {
            return "No File renames needed to " + operation + ". Operation successful.";
        }

        // Fold over every result (no short-circuit across elements), as the original reduce did.
        boolean everyRenameDone = true;
        for (CompactionAdminClient.RenameOpResult outcome : res) {
            everyRenameDone &= outcome.isExecuted() && outcome.isSuccess();
        }

        System.out.println("There were some file renames that needed to be done to " + operation);
        if (everyRenameDone) {
            System.out.println("All renames successfully completed to " + operation + " done !!");
        } else {
            System.out.println("Some renames failed. DataSet could be in inconsistent-state. Try running compaction repair");
        }

        ArrayList<Comparable[]> tableRows = new ArrayList<Comparable[]>();
        for (CompactionAdminClient.RenameOpResult outcome : res) {
            CompactionAdminClient.RenameInfo info = (CompactionAdminClient.RenameInfo) outcome.getOperation();
            String error = outcome.getException().isPresent() ? ((Exception) outcome.getException().get()).getMessage() : "";
            tableRows.add(new Comparable[]{info.fileId, info.srcPath, info.destPath, Boolean.valueOf(outcome.isExecuted()), Boolean.valueOf(outcome.isSuccess()), error});
        }

        HashMap<String, Function<Object, String>> converters = new HashMap<String, Function<Object, String>>();
        TableHeader columns = new TableHeader()
                .addTableHeaderField("File Id")
                .addTableHeaderField("Source File Path")
                .addTableHeaderField("Destination File Path")
                .addTableHeaderField("Rename Executed?")
                .addTableHeaderField("Rename Succeeded?")
                .addTableHeaderField("Error");
        return HoodiePrintHelper.print(columns, converters, sortByField, descending, limit, headerOnly, tableRows);
    }
}

