package org.apache.hudi.utilities.deltastreamer;

import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.ResourceBundle;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.JsonDFSSource;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

/* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.class */
public class HoodieDeltaStreamer implements Serializable {
    /** Logger used by this class and, via {@code HoodieDeltaStreamer.LOG}, by its nested service classes. */
    private static final Logger LOG = LogManager.getLogger(HoodieDeltaStreamer.class);
    /**
     * Key string "deltastreamer.checkpoint.key"; it is not read anywhere in this file.
     * NOTE(review): declared as a mutable public static although it looks like a constant;
     * it could probably be made final -- confirm that no caller reassigns it.
     */
    public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
    /** Configuration passed to the constructor; {@link #sync()} reads it to choose continuous or single-round mode. */
    private final transient Config cfg;
    /** Service wrapping the delta sync; created in the constructor. */
    private transient DeltaSyncService deltaSyncService;

    /* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer$AsyncCompactService.class */
    /**
     * Service that runs compactions on its own executor. Instants to compact are handed over
     * through a blocking queue; producers can block until the queue has drained to a given size.
     */
    public static class AsyncCompactService extends AbstractDeltaStreamerService {
        /** Longest single wait, in seconds, before a blocked producer re-checks the shutdown flag. */
        private static final long SHUTDOWN_RECHECK_INTERVAL_SECS = 10L;

        private transient Compactor compactor;
        private transient JavaSparkContext jssc;
        /** Compaction instants waiting to be picked up by the compactor thread(s). */
        private transient BlockingQueue<HoodieInstant> pendingCompactions = new LinkedBlockingQueue<>();
        /** Guards {@link #consumed}. */
        private transient ReentrantLock queueLock = new ReentrantLock();
        /** Signalled every time an instant is taken off {@link #pendingCompactions}. */
        private transient Condition consumed = this.queueLock.newCondition();
        /** Number of compactor workers (and executor threads) started by {@link #startService()}. */
        private final int maxConcurrentCompaction = 1;

        /**
         * @param javaSparkContext  Spark context on which the scheduler pool property is set
         * @param hoodieWriteClient write client handed to the {@code Compactor}
         */
        public AsyncCompactService(JavaSparkContext javaSparkContext, HoodieWriteClient hoodieWriteClient) {
            this.jssc = javaSparkContext;
            this.compactor = new Compactor(hoodieWriteClient, javaSparkContext);
        }

        /**
         * Adds an instant to the queue of pending compactions.
         *
         * @param hoodieInstant compaction instant to enqueue
         */
        public void enqueuePendingCompaction(HoodieInstant hoodieInstant) {
            this.pendingCompactions.add(hoodieInstant);
        }

        /**
         * Blocks until the number of queued compactions is at most {@code i} or the service
         * reports that it is shut down.
         *
         * @param i queue size at or below which the caller is released
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void waitTillPendingCompactionsReducesTo(int i) throws InterruptedException {
            // Acquire outside the try block so that unlock() only runs when the lock is actually held.
            this.queueLock.lock();
            try {
                while (!isShutdown() && this.pendingCompactions.size() > i) {
                    // The condition is only signalled when an instant is consumed; nothing in this class
                    // signals it on shutdown. A timed wait lets the loop re-evaluate isShutdown()
                    // instead of blocking forever once the compactor has stopped consuming.
                    this.consumed.await(SHUTDOWN_RECHECK_INTERVAL_SECS, TimeUnit.SECONDS);
                }
            } finally {
                this.queueLock.unlock();
            }
        }

        /**
         * Waits up to 60 seconds for the next queued instant and, when one is obtained, wakes a
         * thread blocked in {@link #waitTillPendingCompactionsReducesTo(int)}.
         *
         * @return the next instant, or {@code null} if none arrived within the timeout
         * @throws InterruptedException if interrupted while waiting on the queue
         */
        private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
            HoodieDeltaStreamer.LOG.info("Compactor waiting for next instant for compaction upto 60 seconds");
            HoodieInstant next = this.pendingCompactions.poll(60L, TimeUnit.SECONDS);
            if (next != null) {
                this.queueLock.lock();
                try {
                    this.consumed.signal();
                } finally {
                    this.queueLock.unlock();
                }
            }
            return next;
        }

        /**
         * Starts {@code maxConcurrentCompaction} compactor workers on a fixed-size thread pool.
         *
         * @return a future that completes when all workers have finished, paired with the executor running them
         */
        @Override // org.apache.hudi.utilities.deltastreamer.AbstractDeltaStreamerService
        protected Pair<CompletableFuture, ExecutorService> startService() {
            ExecutorService executor = Executors.newFixedThreadPool(this.maxConcurrentCompaction);
            CompletableFuture[] workers = IntStream.range(0, this.maxConcurrentCompaction)
                .mapToObj(i -> CompletableFuture.supplyAsync(this::runCompactionLoop, executor))
                .toArray(CompletableFuture[]::new);
            return Pair.of(CompletableFuture.allOf(workers), executor);
        }

        /**
         * Body of one compactor worker: compacts queued instants until shutdown is requested.
         *
         * @return always {@code true}; an I/O failure is rethrown as {@code HoodieIOException}
         */
        private boolean runCompactionLoop() {
            try {
                HoodieDeltaStreamer.LOG.info("Setting Spark Pool name for compaction to hoodiecompact");
                this.jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME);
                while (!isShutdownRequested()) {
                    HoodieInstant instant = fetchNextCompactionInstant();
                    if (instant != null) {
                        this.compactor.compact(instant);
                    }
                }
                HoodieDeltaStreamer.LOG.info("Compactor shutting down properly!!");
            } catch (IOException e) {
                HoodieDeltaStreamer.LOG.error("Compactor executor failed", e);
                throw new HoodieIOException(e.getMessage(), e);
            } catch (InterruptedException e) {
                // An interrupt ends the worker loop; the worker still reports normal completion.
                HoodieDeltaStreamer.LOG.warn("Compactor executor thread got interrupted exception. Stopping", e);
            }
            return true;
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer$Config.class */
    /**
     * Command-line configuration of the delta streamer. Every public field is bound to a
     * JCommander {@code @Parameter}; the field initialisers are the defaults used when the
     * corresponding flag is absent.
     */
    public static class Config implements Serializable {

        @Parameter(names = {"--target-base-path"}, description = "base path for the target hoodie table. (Will be created if did not exist first time around. If exists, expected to be a hoodie table)", required = true)
        public String targetBasePath;

        @Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true)
        public String targetTableName;

        @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
        public String tableType;

        @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer to individual classes, for supported properties.")
        public String propsFilePath = "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";

        // The help text names the real flag ("--props"); "--propsFilePath" is the field name, not a CLI option.
        @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter")
        public List<String> configs = new ArrayList<>();

        @Parameter(names = {"--source-class"}, description = "Subclass of org.apache.hudi.utilities.sources to read data. Built-in options: org.apache.hudi.utilities.sources.{JsonDFSSource (default), AvroDFSSource, JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}")
        public String sourceClassName = JsonDFSSource.class.getName();

        @Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
        public String sourceOrderingField = "ts";

        @Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
        public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();

        @Parameter(names = {"--schemaprovider-class"}, description = "subclass of org.apache.hudi.utilities.schema.SchemaProvider to attach schemas to input & target table data, built in options: org.apache.hudi.utilities.schema.FilebasedSchemaProvider.Source (See org.apache.hudi.utilities.sources.Source) implementation can implement their own SchemaProvider. For Sources that return Dataset<Row>, the schema is obtained implicitly. However, this CLI option allows overriding the schemaprovider returned by Source.")
        public String schemaProviderClassName = null;

        @Parameter(names = {"--transformer-class"}, description = "subclass of org.apache.hudi.utilities.transform.Transformer. Allows transforming raw source Dataset to a target Dataset (conforming to target schema) before writing. Default : Not set. E:g - org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which allows a SQL query templated to be passed as a transformation function)")
        public String transformerClassName = null;

        @Parameter(names = {"--source-limit"}, description = "Maximum amount of data to read from source. Default: No limit For e.g: DFS-Source => max bytes to read, Kafka-Source => max events to read")
        public long sourceLimit = Long.MAX_VALUE;

        @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input is purely new data/inserts to gain speed)", converter = OperationConvertor.class)
        public Operation operation = Operation.UPSERT;

        @Parameter(names = {"--filter-dupes"}, description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
        public Boolean filterDupes = false;

        @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
        public Boolean enableHiveSync = false;

        @Parameter(names = {"--max-pending-compactions"}, description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless outstanding compactions is less than this number")
        public Integer maxPendingCompactions = 5;

        @Parameter(names = {"--continuous"}, description = "Delta Streamer runs in continuous mode running source-fetch -> Transform -> Hudi Write in loop")
        public Boolean continuousMode = false;

        @Parameter(names = {"--min-sync-interval-seconds"}, description = "the min sync interval of each sync in continuous mode")
        public Integer minSyncIntervalSeconds = 0;

        @Parameter(names = {"--spark-master"}, description = "spark master to use.")
        public String sparkMaster = "local[2]";

        @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
        public Boolean commitOnErrors = false;

        @Parameter(names = {"--delta-sync-scheduling-weight"}, description = "Scheduling weight for delta sync as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer deltaSyncSchedulingWeight = 1;

        @Parameter(names = {"--compact-scheduling-weight"}, description = "Scheduling weight for compaction as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer compactSchedulingWeight = 1;

        @Parameter(names = {"--delta-sync-scheduling-minshare"}, description = "Minshare for delta sync as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer deltaSyncSchedulingMinShare = 0;

        @Parameter(names = {"--compact-scheduling-minshare"}, description = "Minshare for compaction as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer compactSchedulingMinShare = 0;

        @Parameter(names = {"--disable-compaction"}, description = "Compaction is enabled for MoR table by default. This flag disables it ")
        public Boolean forceDisableCompaction = false;

        @Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.")
        public String checkpoint = null;

        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;

        /**
         * @return {@code true} when running in continuous mode, compaction is not force-disabled
         *         and the configured table type is MERGE_ON_READ
         * @throws IllegalArgumentException if {@code tableType} is not a {@code HoodieTableType} name
         *         (only evaluated when the first two conditions hold)
         */
        public boolean isAsyncCompactionEnabled() {
            return this.continuousMode && !this.forceDisableCompaction && isMergeOnReadTable();
        }

        /**
         * @return {@code true} when NOT running in continuous mode, compaction is not force-disabled
         *         and the configured table type is MERGE_ON_READ
         * @throws IllegalArgumentException if {@code tableType} is not a {@code HoodieTableType} name
         *         (only evaluated when the first two conditions hold)
         */
        public boolean isInlineCompactionEnabled() {
            return !this.continuousMode && !this.forceDisableCompaction && isMergeOnReadTable();
        }

        /** Parses {@code tableType} and reports whether it denotes a MERGE_ON_READ table. */
        private boolean isMergeOnReadTable() {
            return HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(this.tableType));
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer$DeltaSyncService.class */
    /**
     * Service that repeatedly runs {@code DeltaSync.syncOnce()} on a single-threaded executor and,
     * when async compaction is enabled, feeds scheduled compactions to an {@link AsyncCompactService}.
     */
    public static class DeltaSyncService extends AbstractDeltaStreamerService {
        private final Config cfg;
        private transient SchemaProvider schemaProvider;
        private transient SparkSession sparkSession;
        private transient JavaSparkContext jssc;
        TypedProperties props;
        /** Created in {@link #onInitializingWriteClient} only when async compaction is enabled; otherwise null. */
        private AsyncCompactService asyncCompactService;
        private final HoodieTableType tableType;
        private transient DeltaSync deltaSync;

        /**
         * Resolves the table type, loads the properties and builds the underlying {@code DeltaSync}.
         * If the target base path already exists, the table type stored there must match
         * {@code config.tableType}. When {@code config.filterDupes} is set, an UPSERT operation in
         * {@code config} is replaced by INSERT (the passed-in config object is modified).
         *
         * @throws IOException              declared for the file-system and configuration calls made here
         * @throws IllegalArgumentException if the existing table's type differs from the configured one
         */
        public DeltaSyncService(Config config, JavaSparkContext javaSparkContext, FileSystem fileSystem, HiveConf hiveConf) throws IOException {
            this.cfg = config;
            this.jssc = javaSparkContext;
            this.sparkSession = SparkSession.builder().config(javaSparkContext.getConf()).getOrCreate();
            if (fileSystem.exists(new Path(config.targetBasePath))) {
                this.tableType = new HoodieTableMetaClient(new Configuration(fileSystem.getConf()), config.targetBasePath, false).getTableType();
                Preconditions.checkArgument(this.tableType.equals(HoodieTableType.valueOf(config.tableType)), "Hoodie table is of type " + this.tableType + " but passed in CLI argument is " + config.tableType);
            } else {
                this.tableType = HoodieTableType.valueOf(config.tableType);
            }
            this.props = UtilHelpers.readConfig(fileSystem, new Path(config.propsFilePath), config.configs).getConfig();
            HoodieDeltaStreamer.LOG.info("Creating delta streamer with configs : " + this.props.toString());
            this.schemaProvider = UtilHelpers.createSchemaProvider(config.schemaProviderClassName, this.props, javaSparkContext);
            if (config.filterDupes && config.operation == Operation.UPSERT) {
                config.operation = Operation.INSERT;
            }
            this.deltaSync = new DeltaSync(config, this.sparkSession, this.schemaProvider, this.tableType, this.props, javaSparkContext, fileSystem, hiveConf, this::onInitializingWriteClient);
        }

        public DeltaSync getDeltaSync() {
            return this.deltaSync;
        }

        /**
         * Starts the sync loop on a single-threaded executor. Each iteration runs one sync,
         * enqueues any scheduled compaction and then sleeps for whatever remains of the
         * configured minimum sync interval. The compactor is shut down when the loop ends,
         * whether normally or because of a failure.
         *
         * @return the future of the loop paired with the executor running it
         */
        @Override // org.apache.hudi.utilities.deltastreamer.AbstractDeltaStreamerService
        protected Pair<CompletableFuture, ExecutorService> startService() {
            ExecutorService executor = Executors.newFixedThreadPool(1);
            return Pair.of(CompletableFuture.supplyAsync(() -> {
                boolean error = false;
                if (this.cfg.isAsyncCompactionEnabled()) {
                    HoodieDeltaStreamer.LOG.info("Setting Spark Pool name for delta-sync to hoodiedeltasync");
                    this.jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.DELTASYNC_POOL_NAME);
                }
                try {
                    while (!isShutdownRequested()) {
                        try {
                            long startMs = System.currentTimeMillis();
                            Option<String> scheduledCompaction = this.deltaSync.syncOnce();
                            if (scheduledCompaction.isPresent()) {
                                HoodieDeltaStreamer.LOG.info("Enqueuing new pending compaction instant (" + scheduledCompaction + ")");
                                this.asyncCompactService.enqueuePendingCompaction(new HoodieInstant(HoodieInstant.State.REQUESTED, "compaction", scheduledCompaction.get()));
                                this.asyncCompactService.waitTillPendingCompactionsReducesTo(this.cfg.maxPendingCompactions);
                            }
                            // Multiply as long: an int product overflows for intervals above ~24 days,
                            // which would turn the remaining sleep time negative and skip the sleep.
                            long minIntervalMs = this.cfg.minSyncIntervalSeconds * 1000L;
                            long toSleepMs = minIntervalMs - (System.currentTimeMillis() - startMs);
                            if (toSleepMs > 0) {
                                HoodieDeltaStreamer.LOG.info("Last sync ran less than min sync interval: " + this.cfg.minSyncIntervalSeconds + " s, sleep: " + toSleepMs + " ms.");
                                Thread.sleep(toSleepMs);
                            }
                        } catch (Exception e) {
                            HoodieDeltaStreamer.LOG.error("Shutting down delta-sync due to exception", e);
                            error = true;
                            throw new HoodieException(e.getMessage(), e);
                        }
                    }
                } finally {
                    shutdownCompactor(error);
                }
                return true;
            }, executor), executor);
        }

        /**
         * Requests a non-forced shutdown of the compactor service, if one was created.
         *
         * @param z whether the sync loop ended because of an error (only logged)
         */
        private void shutdownCompactor(boolean z) {
            HoodieDeltaStreamer.LOG.info("Delta Sync shutdown. Error ?" + z);
            if (this.asyncCompactService != null) {
                HoodieDeltaStreamer.LOG.warn("Gracefully shutting down compactor");
                this.asyncCompactService.shutdown(false);
            }
        }

        /**
         * Callback passed to {@code DeltaSync}. When async compaction is enabled it creates and
         * starts the compactor service, seeds it with the table's pending compaction instants and
         * blocks until the backlog is within {@code maxPendingCompactions}.
         *
         * @param hoodieWriteClient write client handed to the compactor service
         * @return always {@code true}
         * @throws HoodieException if interrupted while waiting for the backlog to drain
         */
        protected Boolean onInitializingWriteClient(HoodieWriteClient hoodieWriteClient) {
            if (this.cfg.isAsyncCompactionEnabled()) {
                this.asyncCompactService = new AsyncCompactService(this.jssc, hoodieWriteClient);
                HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new Configuration(this.jssc.hadoopConfiguration()), this.cfg.targetBasePath, true);
                CompactionUtils.getPendingCompactionInstantTimes(metaClient)
                    .forEach(this.asyncCompactService::enqueuePendingCompaction);
                // When the compactor service shuts down, request a non-forced shutdown of this service as well.
                this.asyncCompactService.start(bool -> {
                    shutdown(false);
                    return true;
                });
                try {
                    this.asyncCompactService.waitTillPendingCompactionsReducesTo(this.cfg.maxPendingCompactions);
                } catch (InterruptedException e) {
                    throw new HoodieException(e);
                }
            }
            return true;
        }

        /** Closes the underlying {@code DeltaSync}, if present. */
        public void close() {
            if (this.deltaSync != null) {
                this.deltaSync.close();
            }
        }

        public SchemaProvider getSchemaProvider() {
            return this.schemaProvider;
        }

        public SparkSession getSparkSession() {
            return this.sparkSession;
        }

        public JavaSparkContext getJavaSparkContext() {
            return this.jssc;
        }

        /** @return the compactor service, or {@code null} if it has not been created */
        public AsyncCompactService getAsyncCompactService() {
            return this.asyncCompactService;
        }

        public TypedProperties getProps() {
            return this.props;
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer$Operation.class */
    /** Write operations selectable through the {@code --op} command-line option. */
    public enum Operation {
        /** Default value of {@code Config.operation}. */
        UPSERT,

        /** Described by {@code --op} as the choice for purely new data; also substituted for UPSERT when {@code --filter-dupes} is set. */
        INSERT,

        /** Third accepted value; NOTE(review): not mentioned in the {@code --op} help text -- confirm intended usage. */
        BULK_INSERT;
    }

    /* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer$OperationConvertor.class */
    private static class OperationConvertor implements IStringConverter<Operation> {
        private OperationConvertor() {
        }

        /* renamed from: convert, reason: merged with bridge method [inline-methods] */
        public Operation m15convert(String str) throws ParameterException {
            return Operation.valueOf(str);
        }
    }

    /**
     * Convenience constructor: obtains the file system for {@code config.targetBasePath} and a
     * default Hive configuration from the Spark context's Hadoop configuration, then delegates
     * to the four-argument constructor.
     *
     * @param config           delta streamer configuration
     * @param javaSparkContext Spark context to run on
     * @throws IOException declared by the delegated constructor
     */
    public HoodieDeltaStreamer(Config config, JavaSparkContext javaSparkContext) throws IOException {
        this(
            config,
            javaSparkContext,
            FSUtils.getFs(config.targetBasePath, javaSparkContext.hadoopConfiguration()),
            getDefaultHiveConf(javaSparkContext.hadoopConfiguration()));
    }

    /**
     * Stores the configuration and creates the {@link DeltaSyncService} that performs the syncs.
     *
     * @param config           delta streamer configuration
     * @param javaSparkContext Spark context to run on
     * @param fileSystem       file system passed through to the sync service
     * @param hiveConf         Hive configuration passed through to the sync service
     * @throws IOException if creating the sync service fails with an I/O error
     */
    public HoodieDeltaStreamer(Config config, JavaSparkContext javaSparkContext, FileSystem fileSystem, HiveConf hiveConf) throws IOException {
        cfg = config;
        deltaSyncService = new DeltaSyncService(config, javaSparkContext, fileSystem, hiveConf);
    }

    /** Requests a non-forced shutdown of the delta sync service. */
    public void shutdownGracefully() {
        final boolean force = false;
        deltaSyncService.shutdown(force);
    }

    /**
     * Builds a fresh {@code HiveConf} and adds the given Hadoop configuration to it as a resource.
     *
     * @param configuration Hadoop configuration to add
     * @return the new Hive configuration
     */
    private static HiveConf getDefaultHiveConf(Configuration configuration) {
        final HiveConf defaults = new HiveConf();
        defaults.addResource(configuration);
        return defaults;
    }

    /**
     * Runs the delta streamer. In continuous mode the sync service is started and this call
     * blocks until it has shut down. Otherwise exactly one sync round is executed and the sync
     * service is closed afterwards, whether or not the round succeeded.
     *
     * @throws Exception whatever the single sync round (or waiting for shutdown) throws
     */
    public void sync() throws Exception {
        if (this.cfg.continuousMode) {
            this.deltaSyncService.start(this::onDeltaSyncShutdown);
            this.deltaSyncService.waitForShutdown();
            LOG.info("Delta Sync shutting down");
            return;
        }
        LOG.info("Delta Streamer running only single round");
        try {
            this.deltaSyncService.getDeltaSync().syncOnce();
        } catch (Exception e) {
            LOG.error("Got error running delta sync once. Shutting down", e);
            throw e;
        } finally {
            // Single cleanup path: close() runs exactly once, and a failure inside close()
            // is no longer reported as a sync error followed by a second close().
            this.deltaSyncService.close();
            LOG.info("Shut down deltastreamer");
        }
    }

    /**
     * Shutdown callback registered with the delta sync service in continuous mode; closes the service.
     *
     * @param z whether the service reported an error (only logged)
     * @return always {@code true}
     */
    private boolean onDeltaSyncShutdown(boolean z) {
        final String message = "DeltaSync shutdown. Closing write client. Error?" + z;
        LOG.info(message);
        deltaSyncService.close();
        return true;
    }

    /**
     * Command-line entry point: parses the arguments into a {@link Config}, prints usage and
     * exits with status 1 when help is requested or no arguments are given, otherwise builds a
     * Spark context, runs {@link #sync()} and always stops the Spark context afterwards.
     *
     * @param strArr command-line arguments
     * @throws Exception whatever argument parsing or the sync throws
     */
    public static void main(String[] strArr) throws Exception {
        Config config = new Config();
        // The cast selects the (Object, ResourceBundle, String...) constructor overload.
        JCommander jCommander = new JCommander(config, (ResourceBundle) null, strArr);
        if (config.help || strArr.length == 0) {
            jCommander.usage();
            System.exit(1);
        }
        JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + config.targetTableName, config.sparkMaster, SchedulerConfGenerator.getSparkSchedulingConfigs(config));
        try {
            new HoodieDeltaStreamer(config, jssc).sync();
        } finally {
            // Single cleanup path so that stop() is invoked exactly once.
            jssc.stop();
        }
    }

    /** @return the delta sync service created by the constructor */
    public DeltaSyncService getDeltaSyncService() {
        final DeltaSyncService service = deltaSyncService;
        return service;
    }
}
