package org.apache.druid.delta.input;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.primitives.Ints;
import io.delta.kernel.Scan;
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.client.DefaultTableClient;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.data.ScanStateRow;
import io.delta.kernel.internal.util.Utils;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.delta.filter.DeltaFilter;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.utils.Streams;
import org.apache.hadoop.conf.Configuration;

/* loaded from: input_file:org/apache/druid/delta/input/DeltaInputSource.class */
/**
 * Splittable input source that reads the Delta table located at {@code tablePath} through the
 * Delta Kernel API.
 *
 * <p>When {@code deltaSplit} is set, {@link #reader} reads only the scan files listed in that
 * split. Otherwise it scans the latest snapshot of the table, restricted by {@code filter} when
 * one is configured.
 */
public class DeltaInputSource implements SplittableInputSource<DeltaSplit> {
    public static final String TYPE_KEY = "delta";

    @JsonProperty
    private final String tablePath;

    @JsonProperty
    @Nullable
    private final DeltaSplit deltaSplit;

    @JsonProperty
    @Nullable
    private final DeltaFilter filter;

    /**
     * Creates an input source for the given table.
     *
     * @param tablePath  location of the Delta table; must not be null
     * @param deltaSplit split to read, or null to read the whole latest snapshot
     * @param filter     filter applied when scanning the snapshot, or null for no filter
     * @throws RuntimeException created by {@code InvalidInput.exception} when {@code tablePath}
     *                          is null
     */
    @JsonCreator
    public DeltaInputSource(
            @JsonProperty("tablePath") String tablePath,
            @JsonProperty("deltaSplit") @Nullable DeltaSplit deltaSplit,
            @JsonProperty("filter") @Nullable DeltaFilter filter) {
        if (tablePath == null) {
            throw InvalidInput.exception("tablePath cannot be null.");
        }
        this.tablePath = tablePath;
        this.deltaSplit = deltaSplit;
        this.filter = filter;
    }

    /**
     * @return always {@code false}
     */
    public boolean needsFormat() {
        return false;
    }

    /**
     * Builds a reader over the table data.
     *
     * <p>With a split assigned, the scan state and scan files are deserialized from the split.
     * Without one, the latest snapshot is scanned using a read schema pruned to the columns
     * accepted by {@code inputRowSchema.getColumnsFilter()}.
     *
     * @param inputRowSchema schema describing the rows to produce
     * @param inputFormat    not used by this implementation
     * @param file           not used by this implementation
     * @return a reader over one data iterator per scan file
     * @throws RuntimeException if the table cannot be found (via {@code InvalidInput.exception})
     *                          or an {@link IOException} occurs while setting up the read
     */
    public InputSourceReader reader(InputRowSchema inputRowSchema, @Nullable InputFormat inputFormat, File file) {
        final TableClient tableClient = createTableClient();
        try {
            final List<CloseableIterator<FilteredColumnarBatch>> dataIterators = new ArrayList<>();
            if (this.deltaSplit != null) {
                final Row scanState = deserialize(tableClient, this.deltaSplit.getStateRow());
                final StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
                for (String serializedScanFile : this.deltaSplit.getFiles()) {
                    final Row scanFile = deserialize(tableClient, serializedScanFile);
                    dataIterators.add(
                            getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema, Optional.empty()));
                }
            } else {
                final Snapshot snapshot = Table.forPath(tableClient, this.tablePath).getLatestSnapshot(tableClient);
                final StructType fullSchema = snapshot.getSchema(tableClient);
                final StructType prunedSchema = pruneSchema(fullSchema, inputRowSchema.getColumnsFilter());
                final Scan scan = buildScan(tableClient, snapshot, fullSchema, prunedSchema);
                // The scan-file iterators are only needed to enumerate the files, so they are
                // closed as soon as the per-file data iterators have been created.
                try (CloseableIterator<FilteredColumnarBatch> scanFiles = scan.getScanFiles(tableClient)) {
                    final Row scanState = scan.getScanState(tableClient);
                    final StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
                    while (scanFiles.hasNext()) {
                        try (CloseableIterator<Row> scanFileRows = scanFiles.next().getRows()) {
                            while (scanFileRows.hasNext()) {
                                dataIterators.add(
                                        getTransformedDataIterator(
                                                tableClient,
                                                scanState,
                                                scanFileRows.next(),
                                                physicalReadSchema,
                                                scan.getRemainingFilter()));
                            }
                        }
                    }
                }
            }
            return new DeltaInputSourceReader(dataIterators.iterator(), inputRowSchema);
        } catch (TableNotFoundException e) {
            throw InvalidInput.exception(e, "tablePath[%s] not found.", this.tablePath);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates the splits for this source.
     *
     * <p>If this source already carries a split, that split is returned as the only element.
     * Otherwise the latest snapshot is scanned with its full schema and one split is produced
     * lazily per batch of scan files; each split holds the serialized scan state plus the
     * serialized scan-file rows of its batch.
     *
     * <p>Closing the returned stream closes the underlying scan-file iterator.
     *
     * @param inputFormat   not used by this implementation
     * @param splitHintSpec not used by this implementation
     * @return a sequential stream of splits
     * @throws RuntimeException created by {@code InvalidInput.exception} if the table cannot be
     *                          found
     */
    public Stream<InputSplit<DeltaSplit>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) {
        if (this.deltaSplit != null) {
            return Stream.of(new InputSplit<DeltaSplit>(this.deltaSplit));
        }
        final TableClient tableClient = createTableClient();
        try {
            final Snapshot snapshot = Table.forPath(tableClient, this.tablePath).getLatestSnapshot(tableClient);
            final StructType fullSchema = snapshot.getSchema(tableClient);
            final Scan scan = buildScan(tableClient, snapshot, fullSchema, fullSchema);
            final CloseableIterator<FilteredColumnarBatch> scanFiles = scan.getScanFiles(tableClient);
            final String serializedScanState = RowSerde.serializeRowToJson(scan.getScanState(tableClient));
            final Stream<DeltaSplit> splits = Streams.sequentialStreamFrom(
                    Iterators.transform(scanFiles, scanFileBatch -> toSplit(serializedScanState, scanFileBatch)));
            return splits
                    .map(split -> new InputSplit<DeltaSplit>(split))
                    .onClose(() -> closeUnchecked(scanFiles));
        } catch (TableNotFoundException e) {
            throw InvalidInput.exception(e, "tablePath[%s] not found.", this.tablePath);
        }
    }

    /**
     * Counts the splits by fully enumerating {@link #createSplits}.
     *
     * @return the number of splits
     * @throws IllegalArgumentException if the count does not fit in an {@code int}
     */
    public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) {
        // Closing the stream releases the scan-file iterator backing it.
        try (Stream<InputSplit<DeltaSplit>> splits = createSplits(inputFormat, splitHintSpec)) {
            return Ints.checkedCast(splits.count());
        }
    }

    /**
     * @return a copy of this source, with the same table path and filter, bound to the given split
     */
    public InputSource withSplit(InputSplit<DeltaSplit> inputSplit) {
        return new DeltaInputSource(this.tablePath, inputSplit.get(), this.filter);
    }

    /** Deserializes a JSON-serialized row using {@code RowSerde}. */
    private Row deserialize(TableClient tableClient, String json) {
        return RowSerde.deserializeRowFromJson(tableClient, json);
    }

    /**
     * Builds a scan of {@code snapshot} that reads {@code readSchema}, applying this source's
     * filter (resolved against {@code fullSchema}) when one is configured.
     */
    private Scan buildScan(TableClient tableClient, Snapshot snapshot, StructType fullSchema, StructType readSchema) {
        ScanBuilder scanBuilder = snapshot.getScanBuilder(tableClient);
        if (this.filter != null) {
            // Keep the builder returned by withFilter rather than relying on in-place mutation.
            scanBuilder = scanBuilder.withFilter(tableClient, this.filter.getFilterPredicate(fullSchema));
        }
        return scanBuilder.withReadSchema(tableClient, readSchema).build();
    }

    /**
     * Serializes every scan-file row of {@code scanFileBatch} and bundles them with the
     * serialized scan state into a split.
     */
    private static DeltaSplit toSplit(String serializedScanState, FilteredColumnarBatch scanFileBatch) {
        final List<String> serializedScanFiles = new ArrayList<>();
        try (CloseableIterator<Row> scanFileRows = scanFileBatch.getRows()) {
            while (scanFileRows.hasNext()) {
                serializedScanFiles.add(RowSerde.serializeRowToJson(scanFileRows.next()));
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return new DeltaSplit(serializedScanState, serializedScanFiles);
    }

    /** Closes {@code iterator}, rethrowing any {@link IOException} as a {@link RuntimeException}. */
    private static void closeUnchecked(CloseableIterator<?> iterator) {
        try {
            iterator.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Restricts {@code structType} to the fields whose names are accepted by
     * {@code columnsFilter}, preserving field order.
     *
     * @return {@code structType} itself when every field is accepted, otherwise a new schema
     *         containing only the accepted fields
     */
    private StructType pruneSchema(StructType structType, ColumnsFilter columnsFilter) {
        final List<String> allFieldNames = structType.fieldNames();
        final List<String> acceptedFieldNames = allFieldNames.stream()
                .filter(columnsFilter::apply)
                .collect(Collectors.toList());
        if (acceptedFieldNames.equals(allFieldNames)) {
            return structType;
        }
        return new StructType(
                acceptedFieldNames.stream()
                        .map(structType::get)
                        .collect(Collectors.toList()));
    }

    /**
     * Creates a table client while the thread's context class loader is temporarily set to the
     * class loader that loaded this class; the previous context class loader is always restored.
     */
    private TableClient createTableClient() {
        // NOTE(review): presumably needed so the Hadoop Configuration resolves classes through
        // this class's loader rather than the caller's; confirm.
        final Thread currentThread = Thread.currentThread();
        final ClassLoader previousClassLoader = currentThread.getContextClassLoader();
        try {
            currentThread.setContextClassLoader(getClass().getClassLoader());
            return DefaultTableClient.create(new Configuration());
        } finally {
            currentThread.setContextClassLoader(previousClassLoader);
        }
    }

    /**
     * Opens the Parquet file referenced by {@code scanFile} with the given physical read schema
     * and optional predicate, and passes the result through {@code Scan.transformPhysicalData}.
     *
     * @throws IOException if the file cannot be opened for reading
     */
    private CloseableIterator<FilteredColumnarBatch> getTransformedDataIterator(
            TableClient tableClient,
            Row scanState,
            Row scanFile,
            StructType physicalReadSchema,
            Optional<Predicate> remainingFilter) throws IOException {
        return Scan.transformPhysicalData(
                tableClient,
                scanState,
                scanFile,
                tableClient.getParquetHandler().readParquetFiles(
                        Utils.singletonCloseableIterator(InternalScanFileUtils.getAddFileStatus(scanFile)),
                        physicalReadSchema,
                        remainingFilter));
    }

    @VisibleForTesting
    String getTablePath() {
        return this.tablePath;
    }

    @VisibleForTesting
    DeltaFilter getFilter() {
        return this.filter;
    }
}
