/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.HadoopFileSystemFactory;
import org.apache.flink.connectors.hive.HiveRowDataPartitionComputer;
import org.apache.flink.connectors.hive.HiveRowPartitionComputer;
import org.apache.flink.connectors.hive.HiveTableMetaStoreFactory;
import org.apache.flink.connectors.hive.write.HiveBulkWriterFactory;
import org.apache.flink.connectors.hive.write.HiveOutputFormatFactory;
import org.apache.flink.connectors.hive.write.HiveWriterFactory;
import org.apache.flink.core.fs.Path;
import org.apache.flink.hive.shaded.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.orc.OrcSplitReaderUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.FileSystemFactory;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.filesystem.FileSystemOutputFormat;
import org.apache.flink.table.filesystem.FileSystemTableSink;
import org.apache.flink.table.filesystem.OutputFormatFactory;
import org.apache.flink.table.filesystem.PartitionComputer;
import org.apache.flink.table.filesystem.RowDataPartitionComputer;
import org.apache.flink.table.filesystem.TableMetaStoreFactory;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.OverwritableTableSink;
import org.apache.flink.table.sinks.PartitionableTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.orc.TypeDescription;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveTableSink
implements AppendStreamTableSink,
PartitionableTableSink,
OverwritableTableSink {
    private static final Logger LOG = LoggerFactory.getLogger(HiveTableSink.class);

    /** When true, the streaming path always writes through the Hive MapReduce record writer. */
    private final boolean userMrWriter;
    /** When true, the sink is built on a FileSystemOutputFormat instead of a StreamingFileSink. */
    private final boolean isBounded;
    /** Hadoop/Hive job configuration; also the source of the "hive-version" setting. */
    private final JobConf jobConf;
    /** Catalog definition of the target table (options and partition keys are read from it). */
    private final CatalogTable catalogTable;
    /** Identifier supplying the database and table name used for metastore lookups. */
    private final ObjectIdentifier identifier;
    /** Physical schema derived from the catalog table's schema. */
    private final TableSchema tableSchema;
    /** Hive version string read from the job configuration. */
    private final String hiveVersion;
    /** Version-specific Hive shim loaded for {@link #hiveVersion}. */
    private final HiveShim hiveShim;
    /** Static partition values, kept in partition-key order; replaced by setStaticPartition. */
    private LinkedHashMap<String, String> staticPartitionSpec = new LinkedHashMap<>();
    /** Whether existing data should be overwritten; set through setOverwrite. */
    private boolean overwrite = false;
    /** Whether the input is grouped by dynamic partition; set through configurePartitionGrouping. */
    private boolean dynamicGrouping = false;

    public HiveTableSink(boolean userMrWriter, boolean isBounded, JobConf jobConf, ObjectIdentifier identifier, CatalogTable table) {
        this.userMrWriter = userMrWriter;
        this.isBounded = isBounded;
        this.jobConf = jobConf;
        this.identifier = identifier;
        this.catalogTable = table;
        this.hiveVersion = (String)Preconditions.checkNotNull((Object)jobConf.get("hive-version"), (String)"Hive version is not defined");
        this.hiveShim = HiveShimLoader.loadHiveShim(this.hiveVersion);
        this.tableSchema = TableSchemaUtils.getPhysicalSchema((TableSchema)table.getSchema());
    }

    /**
     * Attaches this Hive sink to the given stream.
     *
     * <p>The table is fetched from the Hive metastore to obtain its storage descriptor. For a
     * bounded sink a {@code FileSystemOutputFormat} writing through a staging directory is
     * used, with the sink parallelism set to that of the input stream. Otherwise a streaming
     * file sink is created: it uses the MapReduce record writer when {@code userMrWriter} is
     * set or when no native bulk writer factory is available for the table's serde, and the
     * native bulk writer otherwise.
     *
     * @param dataStream the stream to write
     * @return the sink transformation added to the stream
     * @throws CatalogException if the metastore call fails with a {@code TException}
     * @throws FlinkRuntimeException if an {@code IOException} is raised while building the sink
     * @throws FlinkHiveException if the Hive output format class cannot be loaded or instantiated
     */
    public final DataStreamSink consumeDataStream(DataStream dataStream) {
        String[] partitionColumns = this.getPartitionKeys().toArray(new String[0]);
        String dbName = this.identifier.getDatabaseName();
        String tableName = this.identifier.getObjectName();
        // The metastore client is only needed while the sink is being assembled; it is closed on exit.
        try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(new HiveConf((org.apache.hadoop.conf.Configuration)this.jobConf, HiveConf.class), this.hiveVersion);){
            StreamingFileSink.BulkFormatBuilder builder;
            Table table = client.getTable(dbName, tableName);
            StorageDescriptor sd = table.getSd();
            HiveTableMetaStoreFactory msFactory = new HiveTableMetaStoreFactory(this.jobConf, this.hiveVersion, dbName, tableName);
            HadoopFileSystemFactory fsFactory = new HadoopFileSystemFactory(this.jobConf);
            Class hiveOutputFormatClz = this.hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
            boolean isCompressed = this.jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
            HiveWriterFactory recordWriterFactory = new HiveWriterFactory(this.jobConf, hiveOutputFormatClz, sd.getSerdeInfo(), this.tableSchema, partitionColumns, HiveReflectionUtils.getTableMetadata(this.hiveShim, table), this.hiveShim, isCompressed);
            String extension = Utilities.getFileExtension(this.jobConf, isCompressed, (HiveOutputFormat)hiveOutputFormatClz.newInstance());
            // Part files get a random per-sink prefix and the extension reported by Hive (empty if none).
            OutputFileConfig outputFileConfig = OutputFileConfig.builder().withPartPrefix("part-" + UUID.randomUUID().toString()).withPartSuffix(extension == null ? "" : extension).build();
            if (this.isBounded) {
                // Bounded input: write via an OutputFormat that stages files in a temporary directory.
                FileSystemOutputFormat.Builder builder2 = new FileSystemOutputFormat.Builder();
                builder2.setPartitionComputer((PartitionComputer)new HiveRowPartitionComputer(this.hiveShim, this.jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname, HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal), this.tableSchema.getFieldNames(), this.tableSchema.getFieldDataTypes(), partitionColumns));
                builder2.setDynamicGrouped(this.dynamicGrouping);
                builder2.setPartitionColumns(partitionColumns);
                builder2.setFileSystemFactory((FileSystemFactory)fsFactory);
                builder2.setFormatFactory((OutputFormatFactory)new HiveOutputFormatFactory(recordWriterFactory));
                builder2.setMetaStoreFactory((TableMetaStoreFactory)msFactory);
                builder2.setOverwrite(this.overwrite);
                builder2.setStaticPartitions(this.staticPartitionSpec);
                builder2.setTempPath(new Path(this.toStagingDir(sd.getLocation(), (org.apache.hadoop.conf.Configuration)this.jobConf)));
                builder2.setOutputFileConfig(outputFileConfig);
                DataStreamSink dataStreamSink = dataStream.writeUsingOutputFormat((OutputFormat)builder2.build()).setParallelism(dataStream.getParallelism());
                return dataStreamSink;
            }
            // Unbounded input: the table options are copied into a Flink Configuration so the
            // rolling-policy settings can be read from it.
            Configuration conf = new Configuration();
            this.catalogTable.getOptions().forEach((arg_0, arg_1) -> ((Configuration)conf).setString(arg_0, arg_1));
            HiveRowDataPartitionComputer partComputer = new HiveRowDataPartitionComputer(this.hiveShim, this.jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname, HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal), this.tableSchema.getFieldNames(), this.tableSchema.getFieldDataTypes(), partitionColumns);
            FileSystemTableSink.TableBucketAssigner assigner = new FileSystemTableSink.TableBucketAssigner((PartitionComputer)partComputer);
            HiveRollingPolicy rollingPolicy = new HiveRollingPolicy(((MemorySize)conf.get(FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE)).getBytes(), ((Duration)conf.get(FileSystemOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL)).toMillis());
            if (this.userMrWriter) {
                builder = this.bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
                LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
            } else {
                // Prefer the native writer; fall back to the MapReduce writer when none exists for this serde.
                Optional<BulkWriter.Factory<RowData>> bulkFactory = this.createBulkWriterFactory(partitionColumns, sd);
                if (bulkFactory.isPresent()) {
                    builder = ((StreamingFileSink.DefaultBulkFormatBuilder)((StreamingFileSink.DefaultBulkFormatBuilder)StreamingFileSink.forBulkFormat((Path)new Path(sd.getLocation()), (BulkWriter.Factory)new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), (RowDataPartitionComputer)partComputer)).withBucketAssigner((BucketAssigner)assigner)).withRollingPolicy((CheckpointRollingPolicy)rollingPolicy)).withOutputFileConfig(outputFileConfig);
                    LOG.info("Hive streaming sink: Use native parquet&orc writer.");
                } else {
                    builder = this.bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
                    LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
                }
            }
            DataStreamSink dataStreamSink = FileSystemTableSink.createStreamingSink((Configuration)conf, (Path)new Path(sd.getLocation()), this.getPartitionKeys(), (ObjectIdentifier)this.identifier, (boolean)this.overwrite, (DataStream)dataStream, (StreamingFileSink.BucketsBuilder)builder, (TableMetaStoreFactory)msFactory, (FileSystemFactory)fsFactory, (long)((Duration)conf.get(FileSystemOptions.SINK_ROLLING_POLICY_CHECK_INTERVAL)).toMillis());
            return dataStreamSink;
        }
        catch (TException e) {
            throw new CatalogException("Failed to query Hive metaStore", (Throwable)e);
        }
        catch (IOException e) {
            throw new FlinkRuntimeException("Failed to create staging dir", (Throwable)e);
        }
        catch (ClassNotFoundException e) {
            throw new FlinkHiveException("Failed to get output format class", e);
        }
        catch (IllegalAccessException | InstantiationException e) {
            throw new FlinkHiveException("Failed to instantiate output format instance", e);
        }
    }

    /**
     * Builds the streaming buckets builder that writes through the Hive MapReduce record writer.
     *
     * @param recordWriterFactory factory for the Hive record writers
     * @param sd storage descriptor supplying the table location
     * @param assigner bucket assigner for incoming rows
     * @param rollingPolicy rolling policy for part files
     * @param outputFileConfig prefix/suffix configuration for part files
     * @return a Hadoop-path based bulk format builder configured with the given policy and file config
     */
    private StreamingFileSink.BucketsBuilder<RowData, String, ? extends StreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilderForMRWriter(HiveWriterFactory recordWriterFactory, StorageDescriptor sd, FileSystemTableSink.TableBucketAssigner assigner, HiveRollingPolicy rollingPolicy, OutputFileConfig outputFileConfig) {
        HiveBulkWriterFactory mrBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
        org.apache.hadoop.fs.Path tableLocation = new org.apache.hadoop.fs.Path(sd.getLocation());
        return new HadoopPathBasedBulkFormatBuilder<>(tableLocation, mrBulkFactory, this.jobConf, assigner)
                .withRollingPolicy(rollingPolicy)
                .withOutputFileConfig(outputFileConfig);
    }

    /**
     * Tries to create a native bulk writer factory for the table's storage format.
     *
     * <p>The writer schema consists of the first {@code fieldCount - partitionColumns.length}
     * fields of the table schema, i.e. the code relies on partition columns being the
     * trailing fields. Serde parameters are layered on top of a copy of the job configuration.
     *
     * @param partitionColumns names of the partition columns
     * @param sd storage descriptor of the Hive table
     * @return a Parquet or ORC writer factory when the serialization library name contains
     *     "parquet" or "orc" (case-insensitively); empty otherwise, including when the
     *     descriptor carries no serialization library
     */
    private Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(String[] partitionColumns, StorageDescriptor sd) {
        String serializationLib = sd.getSerdeInfo().getSerializationLib();
        if (serializationLib == null) {
            // Without a serde name no native writer can be selected; the caller falls back to the MR writer.
            return Optional.empty();
        }
        // Locale.ROOT keeps the match independent of the JVM's default locale.
        String serLib = serializationLib.toLowerCase(java.util.Locale.ROOT);

        int formatFieldCount = this.tableSchema.getFieldCount() - partitionColumns.length;
        String[] formatNames = new String[formatFieldCount];
        LogicalType[] formatTypes = new LogicalType[formatFieldCount];
        for (int i = 0; i < formatFieldCount; ++i) {
            formatNames[i] = this.tableSchema.getFieldName(i).get();
            formatTypes[i] = this.tableSchema.getFieldDataType(i).get().getLogicalType();
        }
        RowType formatType = RowType.of(formatTypes, formatNames);

        org.apache.hadoop.conf.Configuration formatConf = new org.apache.hadoop.conf.Configuration(this.jobConf);
        // The Thrift descriptor may leave the parameter map unset.
        Map<String, String> serdeParameters = sd.getSerdeInfo().getParameters();
        if (serdeParameters != null) {
            serdeParameters.forEach((key, value) -> formatConf.set(key, value));
        }

        if (serLib.contains("parquet")) {
            return Optional.of(ParquetRowDataBuilder.createWriterFactory(formatType, formatConf, this.hiveVersion.startsWith("3.")));
        }
        if (serLib.contains("orc")) {
            TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(formatType);
            return Optional.of(this.hiveShim.createOrcBulkWriterFactory(formatConf, typeDescription.toString(), formatTypes));
        }
        return Optional.empty();
    }

    /**
     * Returns the data type this sink consumes: the row type of the table schema, bridged to
     * {@code RowData} when the sink is not bounded.
     */
    public DataType getConsumedDataType() {
        DataType rowDataType = getTableSchema().toRowDataType();
        if (isBounded) {
            return rowDataType;
        }
        return rowDataType.bridgedTo(RowData.class);
    }

    /** Returns the physical schema of the target table. */
    public TableSchema getTableSchema() {
        return tableSchema;
    }

    /**
     * Returns this sink unchanged; the supplied field names and types are not used.
     *
     * @param fieldNames ignored
     * @param fieldTypes ignored
     * @return this sink
     */
    public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) {
        return this;
    }

    /**
     * Records whether the input will be grouped by dynamic partition and echoes the value back.
     *
     * @param supportsGrouping whether grouping by partition is supported
     * @return the same value that was passed in
     */
    public boolean configurePartitionGrouping(boolean supportsGrouping) {
        dynamicGrouping = supportsGrouping;
        return dynamicGrouping;
    }

    /**
     * Creates a hidden staging directory underneath the given directory and returns its path.
     *
     * <p>The directory name combines the current timestamp with a random UUID, so sinks
     * created in the same millisecond for the same table do not end up sharing a staging
     * directory. The directory is registered for deletion when the file system is closed.
     *
     * @param finalDir the directory under which the staging directory is created
     * @param conf Hadoop configuration used to resolve the file system
     * @return the staging directory path as a string
     * @throws IOException if the file system cannot be accessed
     * @throws IllegalStateException if the directory neither exists nor could be created
     */
    private String toStagingDir(String finalDir, org.apache.hadoop.conf.Configuration conf) throws IOException {
        String parent = finalDir.endsWith("/") ? finalDir : finalDir + "/";
        // A timestamp alone can collide; the UUID suffix makes the name unique per sink.
        String res = parent + ".staging_" + System.currentTimeMillis() + "_" + UUID.randomUUID();
        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(res);
        FileSystem fs = path.getFileSystem(conf);
        Preconditions.checkState(fs.exists(path) || fs.mkdirs(path), "Failed to create staging dir " + path);
        fs.deleteOnExit(path);
        return res;
    }

    /** Returns the partition keys declared on the catalog table. */
    private List<String> getPartitionKeys() {
        return catalogTable.getPartitionKeys();
    }

    /**
     * Replaces the static partition spec with the entries of {@code partitionSpec} whose keys
     * are partition keys of the table, stored in the table's partition-key order.
     *
     * @param partitionSpec partition column name to static value
     */
    public void setStaticPartition(Map<String, String> partitionSpec) {
        staticPartitionSpec = new LinkedHashMap<>();
        // Iterate over the table's keys (not the spec) so the stored order follows the table definition.
        for (String key : getPartitionKeys()) {
            if (partitionSpec.containsKey(key)) {
                staticPartitionSpec.put(key, partitionSpec.get(key));
            }
        }
    }

    /**
     * Records whether the write should overwrite existing data; the flag is passed on to the
     * output format builder or streaming sink when the sink is assembled.
     *
     * @param overwrite whether to overwrite
     */
    public void setOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }

    /**
     * Rolling policy for the streaming Hive sink: part files roll on every checkpoint, never on
     * an individual element, and on processing time once they are old enough or large enough.
     */
    private static class HiveRollingPolicy
    extends CheckpointRollingPolicy<RowData, String> {
        /** Size threshold; a part file rolls once its size exceeds this value. */
        private final long rollingFileSize;
        /** Age threshold, in the same unit as the timestamps passed to the processing-time check. */
        private final long rollingTimeInterval;

        /**
         * @param rollingFileSize size threshold, must be positive
         * @param rollingTimeInterval age threshold, must be positive
         * @throws IllegalArgumentException if either threshold is not positive
         */
        private HiveRollingPolicy(long rollingFileSize, long rollingTimeInterval) {
            Preconditions.checkArgument(rollingFileSize > 0L, "Rolling file size must be positive, but was " + rollingFileSize);
            Preconditions.checkArgument(rollingTimeInterval > 0L, "Rolling time interval must be positive, but was " + rollingTimeInterval);
            this.rollingFileSize = rollingFileSize;
            this.rollingTimeInterval = rollingTimeInterval;
        }

        /** Always rolls on checkpoint. */
        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
            return true;
        }

        /** Never rolls in response to a single element. */
        public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element) {
            return false;
        }

        /**
         * Rolls when the part file's age has reached the time interval or its size exceeds the
         * size threshold.
         *
         * @throws UncheckedIOException if the part file size cannot be read
         */
        public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
            long age = currentTime - partFileState.getCreationTime();
            if (age >= this.rollingTimeInterval) {
                return true;
            }
            // The size is only queried when the age check did not already trigger a roll.
            try {
                return partFileState.getSize() > this.rollingFileSize;
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}

