package org.apache.beam.sdk.io.hcatalog;

import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.hadoop.WritableCoder;
import org.apache.beam.sdk.io.hcatalog.AutoValue_HCatalogIO_Read;
import org.apache.beam.sdk.io.hcatalog.AutoValue_HCatalogIO_Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hive.hcatalog.data.transfer.HCatWriter;
import org.apache.hive.hcatalog.data.transfer.ReadEntity;
import org.apache.hive.hcatalog.data.transfer.ReaderContext;
import org.apache.hive.hcatalog.data.transfer.WriteEntity;
import org.apache.hive.hcatalog.data.transfer.WriterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/hcatalog/HCatalogIO.class */
public class HCatalogIO {
    private static final Logger LOG = LoggerFactory.getLogger(HCatalogIO.class);
    private static final long BATCH_SIZE = 1024;
    private static final String DEFAULT_DATABASE = "default";

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/sdk/io/hcatalog/HCatalogIO$BoundedHCatalogSource.class */
    public static class BoundedHCatalogSource extends BoundedSource<HCatRecord> {
        private final Read spec;

        /* loaded from: input_file:org/apache/beam/sdk/io/hcatalog/HCatalogIO$BoundedHCatalogSource$BoundedHCatalogReader.class */
        static class BoundedHCatalogReader extends BoundedSource.BoundedReader<HCatRecord> {
            private final BoundedHCatalogSource source;
            private HCatRecord current;
            private Iterator<HCatRecord> hcatIterator;

            public BoundedHCatalogReader(BoundedHCatalogSource boundedHCatalogSource) {
                this.source = boundedHCatalogSource;
            }

            public boolean start() throws HCatException {
                this.hcatIterator = DataTransferFactory.getHCatReader(this.source.spec.getContext(), this.source.spec.getSplitId().intValue()).read();
                return advance();
            }

            public boolean advance() {
                if (this.hcatIterator.hasNext()) {
                    this.current = this.hcatIterator.next();
                    return true;
                }
                this.current = null;
                return false;
            }

            /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
            public BoundedHCatalogSource m2getCurrentSource() {
                return this.source;
            }

            /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
            public HCatRecord m3getCurrent() {
                if (this.current == null) {
                    throw new NoSuchElementException("Current element is null");
                }
                return this.current;
            }

            public void close() {
            }
        }

        BoundedHCatalogSource(Read read) {
            this.spec = read;
        }

        public Coder<HCatRecord> getOutputCoder() {
            return WritableCoder.of(DefaultHCatRecord.class);
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            this.spec.populateDisplayData(builder);
        }

        public BoundedSource.BoundedReader<HCatRecord> createReader(PipelineOptions pipelineOptions) {
            return new BoundedHCatalogReader(this);
        }

        public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
            Configuration configuration = new Configuration();
            for (Map.Entry<String, String> entry : this.spec.getConfigProperties().entrySet()) {
                configuration.set(entry.getKey(), entry.getValue());
            }
            IMetaStoreClient iMetaStoreClient = null;
            try {
                HiveConf hiveConf = HCatUtil.getHiveConf(configuration);
                iMetaStoreClient = HCatUtil.getHiveMetastoreClient(hiveConf);
                long fileSizeForTable = StatsUtils.getFileSizeForTable(hiveConf, HCatUtil.getTable(iMetaStoreClient, this.spec.getDatabase(), this.spec.getTable()));
                if (iMetaStoreClient != null) {
                    iMetaStoreClient.close();
                }
                return fileSizeForTable;
            } catch (Throwable th) {
                if (iMetaStoreClient != null) {
                    iMetaStoreClient.close();
                }
                throw th;
            }
        }

        public List<BoundedSource<HCatRecord>> split(long j, PipelineOptions pipelineOptions) throws Exception {
            int i = 1;
            long estimatedSizeBytes = getEstimatedSizeBytes(pipelineOptions);
            if (j > 0 && estimatedSizeBytes > 0) {
                i = (int) Math.ceil(estimatedSizeBytes / j);
            }
            ReaderContext readerContext = getReaderContext(i);
            HCatalogIO.LOG.info("Splitting into bundles of {} bytes: estimated size {}, desired split count {}, actual split count {}", new Object[]{Long.valueOf(j), Long.valueOf(estimatedSizeBytes), Integer.valueOf(i), Integer.valueOf(readerContext.numSplits())});
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < readerContext.numSplits(); i2++) {
                arrayList.add(new BoundedHCatalogSource(this.spec.withContext(readerContext).withSplitId(i2)));
            }
            return arrayList;
        }

        private ReaderContext getReaderContext(long j) throws HCatException {
            ReadEntity build = new ReadEntity.Builder().withDatabase(this.spec.getDatabase()).withTable(this.spec.getTable()).withFilter(this.spec.getFilter()).build();
            HashMap hashMap = new HashMap(this.spec.getConfigProperties());
            hashMap.put("hcat.desired.partition.num.splits", String.valueOf(j));
            return DataTransferFactory.getHCatReader(build, hashMap).prepareRead();
        }
    }

    @VisibleForTesting
    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/hcatalog/HCatalogIO$Read.class */
    public static abstract class Read extends PTransform<PBegin, PCollection<HCatRecord>> {

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/hcatalog/HCatalogIO$Read$Builder.class */
        public static abstract class Builder {
            abstract Builder setConfigProperties(Map<String, String> map);

            abstract Builder setDatabase(String str);

            abstract Builder setTable(String str);

            abstract Builder setFilter(String str);

            abstract Builder setSplitId(Integer num);

            abstract Builder setContext(ReaderContext readerContext);

            abstract Read build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract Map<String, String> getConfigProperties();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract String getDatabase();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract String getTable();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract String getFilter();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract ReaderContext getContext();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract Integer getSplitId();

        abstract Builder toBuilder();

        public Read withConfigProperties(Map<String, String> map) {
            return toBuilder().setConfigProperties(new HashMap(map)).build();
        }

        public Read withDatabase(String str) {
            return toBuilder().setDatabase(str).build();
        }

        public Read withTable(String str) {
            return toBuilder().setTable(str).build();
        }

        public Read withFilter(String str) {
            return toBuilder().setFilter(str).build();
        }

        Read withSplitId(int i) {
            Preconditions.checkArgument(i >= 0, "Invalid split id-" + i);
            return toBuilder().setSplitId(Integer.valueOf(i)).build();
        }

        Read withContext(ReaderContext readerContext) {
            return toBuilder().setContext(readerContext).build();
        }

        public PCollection<HCatRecord> expand(PBegin pBegin) {
            Preconditions.checkArgument(getTable() != null, "withTable() is required");
            Preconditions.checkArgument(getConfigProperties() != null, "withConfigProperties() is required");
            return pBegin.apply(org.apache.beam.sdk.io.Read.from(new BoundedHCatalogSource(this)));
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("configProperties", getConfigProperties().toString()));
            builder.add(DisplayData.item("table", getTable()));
            builder.addIfNotNull(DisplayData.item("database", getDatabase()));
            builder.addIfNotNull(DisplayData.item("filter", getFilter()));
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/hcatalog/HCatalogIO$Write.class */
    public static abstract class Write extends PTransform<PCollection<HCatRecord>, PDone> {

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/hcatalog/HCatalogIO$Write$Builder.class */
        public static abstract class Builder {
            abstract Builder setConfigProperties(Map<String, String> map);

            abstract Builder setDatabase(String str);

            abstract Builder setTable(String str);

            abstract Builder setPartition(Map<String, String> map);

            abstract Builder setBatchSize(long j);

            abstract Write build();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/hcatalog/HCatalogIO$Write$WriteFn.class */
        public static class WriteFn extends DoFn<HCatRecord, Void> {
            private final Write spec;
            private WriterContext writerContext;
            private HCatWriter slaveWriter;
            private HCatWriter masterWriter;
            private List<HCatRecord> hCatRecordsBatch;

            public WriteFn(Write write) {
                this.spec = write;
            }

            public void populateDisplayData(DisplayData.Builder builder) {
                super.populateDisplayData(builder);
                builder.addIfNotNull(DisplayData.item("database", this.spec.getDatabase()));
                builder.add(DisplayData.item("table", this.spec.getTable()));
                builder.addIfNotNull(DisplayData.item("partition", String.valueOf(this.spec.getPartition())));
                builder.add(DisplayData.item("configProperties", this.spec.getConfigProperties().toString()));
                builder.add(DisplayData.item("batchSize", Long.valueOf(this.spec.getBatchSize())));
            }

            @DoFn.Setup
            public void initiateWrite() throws HCatException {
                this.masterWriter = DataTransferFactory.getHCatWriter(new WriteEntity.Builder().withDatabase(this.spec.getDatabase()).withTable(this.spec.getTable()).withPartition(this.spec.getPartition()).build(), this.spec.getConfigProperties());
                this.writerContext = this.masterWriter.prepareWrite();
                this.slaveWriter = DataTransferFactory.getHCatWriter(this.writerContext);
            }

            @DoFn.StartBundle
            public void startBundle() {
                this.hCatRecordsBatch = new ArrayList();
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<HCatRecord, Void>.ProcessContext processContext) throws HCatException {
                this.hCatRecordsBatch.add((HCatRecord) processContext.element());
                if (this.hCatRecordsBatch.size() >= this.spec.getBatchSize()) {
                    flush();
                }
            }

            @DoFn.FinishBundle
            public void finishBundle() throws HCatException {
                flush();
            }

            private void flush() throws HCatException {
                if (this.hCatRecordsBatch.isEmpty()) {
                    return;
                }
                try {
                    try {
                        this.slaveWriter.write(this.hCatRecordsBatch.iterator());
                        this.masterWriter.commit(this.writerContext);
                        this.hCatRecordsBatch.clear();
                    } catch (HCatException e) {
                        HCatalogIO.LOG.error("Exception in flush - write/commit data to Hive", e);
                        this.masterWriter.abort(this.writerContext);
                        throw e;
                    }
                } catch (Throwable th) {
                    this.hCatRecordsBatch.clear();
                    throw th;
                }
            }

            @DoFn.Teardown
            public void tearDown() throws Exception {
                if (this.slaveWriter != null) {
                    this.slaveWriter = null;
                }
                if (this.masterWriter != null) {
                    this.masterWriter = null;
                }
                if (this.writerContext != null) {
                    this.writerContext = null;
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract Map<String, String> getConfigProperties();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract String getDatabase();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract String getTable();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract Map<String, String> getPartition();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract long getBatchSize();

        abstract Builder toBuilder();

        public Write withConfigProperties(Map<String, String> map) {
            return toBuilder().setConfigProperties(new HashMap(map)).build();
        }

        public Write withDatabase(String str) {
            return toBuilder().setDatabase(str).build();
        }

        public Write withTable(String str) {
            return toBuilder().setTable(str).build();
        }

        public Write withPartition(Map<String, String> map) {
            return toBuilder().setPartition(map).build();
        }

        public Write withBatchSize(long j) {
            return toBuilder().setBatchSize(j).build();
        }

        public PDone expand(PCollection<HCatRecord> pCollection) {
            Preconditions.checkArgument(getConfigProperties() != null, "withConfigProperties() is required");
            Preconditions.checkArgument(getTable() != null, "withTable() is required");
            pCollection.apply(ParDo.of(new WriteFn(this)));
            return PDone.in(pCollection.getPipeline());
        }
    }

    public static Write write() {
        return new AutoValue_HCatalogIO_Write.Builder().setBatchSize(BATCH_SIZE).build();
    }

    public static Read read() {
        return new AutoValue_HCatalogIO_Read.Builder().setDatabase(DEFAULT_DATABASE).build();
    }

    private HCatalogIO() {
    }
}
