package org.apache.hudi;

import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.ResourceBundle;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.operator.InstantGenerateOperator;
import org.apache.hudi.operator.KeyedWriteProcessFunction;
import org.apache.hudi.operator.KeyedWriteProcessOperator;
import org.apache.hudi.sink.CommitSink;
import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
import org.apache.hudi.util.StreamerUtil;

/* loaded from: input_file:org/apache/hudi/HoodieFlinkStreamer.class */
public class HoodieFlinkStreamer {

    /* loaded from: input_file:org/apache/hudi/HoodieFlinkStreamer$Config.class */
    /**
     * Command-line options for the streamer.
     *
     * <p>Each public field is bound to a command-line flag through its JCommander
     * {@link Parameter} annotation; the field initializer is the value used when the
     * flag is not supplied. {@code main} fills an instance from the program arguments
     * and also hands the same instance to the execution environment via
     * {@code setGlobalJobParameters}.
     */
    public static class Config extends Configuration {

        // ------------------------------------------------------------------
        // Kafka source
        // ------------------------------------------------------------------

        @Parameter(names = {"--kafka-topic"}, description = "kafka topic", required = true)
        public String kafkaTopic;

        @Parameter(names = {"--kafka-group-id"}, description = "kafka consumer group id", required = true)
        public String kafkaGroupId;

        @Parameter(names = {"--kafka-bootstrap-servers"}, description = "kafka bootstrap.servers", required = true)
        public String kafkaBootstrapServers;

        // ------------------------------------------------------------------
        // Flink runtime
        // ------------------------------------------------------------------

        /** Optional; when {@code null}, {@code main} does not install a state backend. */
        @Parameter(names = {"--flink-checkpoint-path"}, description = "flink checkpoint path")
        public String flinkCheckPointPath;

        // ------------------------------------------------------------------
        // Target Hudi table
        // ------------------------------------------------------------------

        @Parameter(names = {"--target-base-path"}, description = "base path for the target hoodie table. (Will be created if did not exist first time around. If exists, expected to be a hoodie table)", required = true)
        public String targetBasePath;

        /** Also used by {@code main} as the name of the submitted job. */
        @Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true)
        public String targetTableName;

        @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
        public String tableType;

        // Kept as strings (not numbers), exactly as declared by the original options.
        @Parameter(names = {"--flink-block-retry-times"}, description = "Times to retry when latest instant has not completed")
        public String blockRetryTime = "10";

        @Parameter(names = {"--flink-block-retry-interval"}, description = "Seconds between two tries when latest instant has not completed")
        public String blockRetryInterval = "1";

        // ------------------------------------------------------------------
        // Hoodie / write configuration
        // ------------------------------------------------------------------

        @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, referto individual classes, for supported properties.")
        public String propsFilePath = "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";

        // Diamond instead of the raw ArrayList so the element type is checked.
        @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter.")
        public List<String> configs = new ArrayList<>();

        @Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
        public String sourceOrderingField = "ts";

        @Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
        public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();

        @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input is purely new data/inserts to gain speed)", converter = OperationConverter.class)
        public WriteOperationType operation = WriteOperationType.UPSERT;

        @Parameter(names = {"--filter-dupes"}, description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
        public Boolean filterDupes = false;

        @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
        public Boolean commitOnErrors = false;

        /** Passed by {@code main} to {@code enableCheckpointing}. */
        @Parameter(names = {"--checkpoint-interval"}, description = "Flink checkpoint interval.")
        public Long checkpointInterval = 5000L;

        /** When set, {@code main} prints the usage text and exits. */
        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;
    }

    /* loaded from: input_file:org/apache/hudi/HoodieFlinkStreamer$OperationConverter.class */
    private static class OperationConverter implements IStringConverter<WriteOperationType> {
        private OperationConverter() {
        }

        /* renamed from: convert, reason: merged with bridge method [inline-methods] */
        public WriteOperationType m0convert(String str) throws ParameterException {
            return WriteOperationType.valueOf(str);
        }
    }

    /**
     * Parses the command line into a {@link Config}, builds the Kafka-to-Hudi
     * streaming pipeline and submits it for execution.
     *
     * <p>If {@code --help} is given, or no arguments are passed at all, the usage
     * text is printed and the JVM exits with status 1.
     *
     * @param strArr program arguments, bound to {@link Config} fields by JCommander
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] strArr) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Config config = new Config();
        // The explicit cast selects the (Object, ResourceBundle, String...) constructor.
        JCommander commander = new JCommander(config, (ResourceBundle) null, strArr);
        if (config.help || strArr.length == 0) {
            commander.usage();
            System.exit(1);
        }

        env.enableCheckpointing(config.checkpointInterval);
        env.getConfig().setGlobalJobParameters(config);
        // At most one checkpoint in flight at a time.
        // NOTE(review): presumably because downstream operators act on checkpoints - confirm.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.disableOperatorChaining();

        // A state backend is only installed when a checkpoint path was supplied.
        if (config.flinkCheckPointPath != null) {
            env.setStateBackend(new FsStateBackend(config.flinkCheckPointPath));
        }

        TypedProperties props = StreamerUtil.getProps(config);
        // Kafka consumer settings.
        props.put("bootstrap.servers", config.kafkaBootstrapServers);
        props.put("group.id", config.kafkaGroupId);
        // Hoodie write settings.
        props.put("hoodie.datasource.write.payload.class", config.payloadClassName);
        props.put("hoodie.datasource.write.precombine.field", config.sourceOrderingField);

        // The diamond on FlinkKafkaConsumer keeps the element type through the chain;
        // with the raw type every stage is erased and the keyBy selector cannot see
        // HoodieRecord#getPartitionPath.
        env.addSource(new FlinkKafkaConsumer<>(config.kafkaTopic, new SimpleStringSchema(), props))
            .filter(Objects::nonNull)
            .map(new JsonStringToHoodieRecordMapFunction(props))
            .name("kafka_to_hudi_record")
            .uid("kafka_to_hudi_record_uid")

            // Instant generation runs with parallelism 1.
            .transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator())
            .name("instant_generator")
            .uid("instant_generator_id")
            .setParallelism(1)

            // Records are keyed by partition path before being written.
            .keyBy(HoodieRecord::getPartitionPath)

            // The write stage uses the environment's parallelism.
            .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint<Tuple3<String, List<WriteStatus>, Integer>>() {
            }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction()))
            .name("write_process")
            .uid("write_process_uid")
            .setParallelism(env.getParallelism())

            // The commit sink runs with parallelism 1.
            .addSink(new CommitSink())
            .name("commit_sink")
            .uid("commit_sink_uid")
            .setParallelism(1);

        // The job is named after the target table.
        env.execute(config.targetTableName);
    }

    /**
     * Re-creates one of this class's two serializable lambdas from its
     * {@link SerializedLambda} description.
     *
     * <p>The implementation method name selects a candidate ({@code nonNull} or
     * {@code getPartitionPath}); the remaining fields of the serialized form
     * (method kind, functional interface class, method name and signature,
     * implementing class and signature) must then all match before a lambda is
     * returned.
     *
     * <p>NOTE(review): this looks like decompiler output of a compiler-generated
     * method. {@code boolean z = -1}, the {@code switch} on a {@code boolean} and
     * returning an untyped lambda as {@code Object} are not valid Java source -
     * confirm whether this method should exist in hand-maintained source at all.
     *
     * @param serializedLambda the serialized form describing the lambda to rebuild
     * @return the re-created lambda
     * @throws IllegalArgumentException if the description matches neither known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // First stage: dispatch on the name's hash code, then confirm with equals().
        switch (implMethodName.hashCode()) {
            case 1481056889:
                if (implMethodName.equals("getPartitionPath")) {
                    z = true;
                    break;
                }
                break;
            case 2123019764:
                if (implMethodName.equals("nonNull")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: verify the full description of the selected candidate.
        switch (z) {
            case false:
                // Objects.nonNull used as a FilterFunction (the filter step in main).
                // NOTE(review): kind 6 presumably is MethodHandleInfo.REF_invokeStatic - confirm.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FilterFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("filter") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("java/util/Objects") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    return (v0) -> {
                        return Objects.nonNull(v0);
                    };
                }
                break;
            case true:
                // HoodieRecord.getPartitionPath used as a KeySelector (the keyBy step in main).
                // NOTE(review): kind 5 presumably is MethodHandleInfo.REF_invokeVirtual - confirm.
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/common/model/HoodieRecord") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.getPartitionPath();
                    };
                }
                break;
        }
        // Neither candidate matched in full.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
