package org.apache.beam.sdk.io.iceberg;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.transforms.Transforms;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/iceberg/RecordWriterManager.class */
/**
 * Routes Beam {@link Row}s to per-partition {@link RecordWriter}s for one or more Iceberg
 * destinations and collects the resulting data files.
 *
 * <p>Each distinct windowed destination gets its own {@link DestinationState}, which keeps a cache
 * of open writers keyed by partition. The number of simultaneously open writers across all
 * destinations is bounded by {@code maxNumWriters}; a writer whose byte count exceeds
 * {@code maxFileSize} is closed and replaced by a fresh one.
 *
 * <p>Call {@link #close()} to flush every writer, then {@link #getSerializableDataFiles()} to
 * retrieve the files that were produced. Per-instance state is mutated without synchronization.
 */
public class RecordWriterManager implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(RecordWriterManager.class);

    /** Format of the human-readable value found in partition paths of hour-transformed fields. */
    private static final DateTimeFormatter HOUR_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");

    /** 1970-01-01T00:00 (UTC), the origin that time-based partition values are counted from. */
    private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);

    /** Tables shared by all managers in this JVM; an entry expires 10 minutes after its last access. */
    @VisibleForTesting
    static final Cache<TableIdentifier, Table> TABLE_CACHE = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();

    private final Catalog catalog;
    private final String filePrefix;
    private final long maxFileSize;
    private final int maxNumWriters;

    /** Number of writers currently open across every destination handled by this manager. */
    @VisibleForTesting
    int openWriters = 0;

    @VisibleForTesting
    final Map<WindowedValue<IcebergDestination>, DestinationState> destinations = Maps.newHashMap();

    /** Data files gathered from all destinations; filled by {@link #close()}. */
    private final Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> totalSerializableDataFiles = Maps.newHashMap();
    private boolean isClosed = false;

    /**
     * Writer bookkeeping for a single destination table: the open writers per partition, the data
     * files of writers that have been closed, and any errors raised while closing them.
     */
    class DestinationState {
        private final IcebergDestination icebergDestination;
        private final PartitionSpec spec;
        private final Schema schema;

        /** Reused scratch key; it is re-partitioned for every record and copied before being stored. */
        private final PartitionKey routingPartitionKey;
        private final Table table;

        /** Open writers by partition. Evicting an entry closes the writer and records its data file. */
        final Cache<PartitionKey, RecordWriter> writers;
        private final InternalRecordWrapper wrapper;

        /** Random token embedded in file names created by this state. */
        private final String stateToken = UUID.randomUUID().toString();
        private final List<SerializableDataFile> dataFiles = Lists.newArrayList();

        /** How many writers have been created so far for each partition. */
        @VisibleForTesting
        final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();
        private final Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();

        /** Failures hit by the removal listener; inspected by {@link RecordWriterManager#close()}. */
        private final List<Exception> exceptions = Lists.newArrayList();

        /**
         * @param icebergDestination destination whose file format and identifier are used for writers
         * @param table table supplying the schema and partition spec
         */
        DestinationState(IcebergDestination icebergDestination, Table table) {
            this.icebergDestination = icebergDestination;
            this.schema = table.schema();
            this.spec = table.spec();
            this.routingPartitionKey = new PartitionKey(this.spec, this.schema);
            this.wrapper = new InternalRecordWrapper(this.schema.asStruct());
            this.table = table;
            for (PartitionField partitionField : this.spec.fields()) {
                this.partitionFieldMap.put(partitionField.name(), partitionField);
            }
            // Writers idle for a minute are evicted; the listener below runs on every removal
            // (expiry, invalidate, invalidateAll) and is the only place writers get closed.
            this.writers = CacheBuilder.newBuilder().expireAfterAccess(1L, TimeUnit.MINUTES).removalListener(removal -> {
                PartitionKey partitionKey = Preconditions.checkStateNotNull((PartitionKey) removal.getKey());
                RecordWriter recordWriter = Preconditions.checkStateNotNull((RecordWriter) removal.getValue());
                try {
                    recordWriter.close();
                    RecordWriterManager.this.openWriters--;
                    String partitionDataPath = RecordWriterManager.getPartitionDataPath(partitionKey.toPath(), this.partitionFieldMap);
                    this.dataFiles.add(SerializableDataFile.from(recordWriter.getDataFile(), partitionDataPath));
                } catch (IOException e) {
                    RuntimeException failure = new RuntimeException(String.format("Encountered an error when closing data writer for table '%s', path: %s", icebergDestination.getTableIdentifier(), recordWriter.path()), e);
                    // Remember the failure so close() can report it even if the cache does not
                    // propagate exceptions thrown from its removal listener.
                    this.exceptions.add(failure);
                    throw failure;
                }
            }).build();
        }

        /**
         * Writes {@code record} to the writer of the partition it belongs to.
         *
         * @return {@code false} without writing when the partition has no open writer and the
         *     manager already has {@code maxNumWriters} writers open; {@code true} otherwise
         */
        boolean write(Record record) {
            this.routingPartitionKey.partition(this.wrapper.wrap(record));
            RecordWriter existingWriter = this.writers.getIfPresent(this.routingPartitionKey);
            boolean atWriterLimit = RecordWriterManager.this.openWriters >= RecordWriterManager.this.maxNumWriters;
            if (existingWriter == null && atWriterLimit) {
                return false;
            }
            fetchWriterForPartition(this.routingPartitionKey, existingWriter).write(record);
            return true;
        }

        /**
         * Returns {@code recordWriter} if it is usable, otherwise a newly created writer.
         *
         * <p>A replacement is created when there is no writer yet or when the current one has
         * written more than {@code maxFileSize} bytes. Invalidating the cache entry first makes
         * the removal listener close the old writer and record its data file.
         */
        private RecordWriter fetchWriterForPartition(PartitionKey partitionKey, RecordWriter recordWriter) {
            if (recordWriter == null || recordWriter.bytesWritten() > RecordWriterManager.this.maxFileSize) {
                // The routing key is mutated for every record, so store an independent copy.
                PartitionKey ownedKey = partitionKey.copy();
                this.writers.invalidate(ownedKey);
                recordWriter = createWriter(ownedKey);
                this.writers.put(ownedKey, recordWriter);
            }
            return recordWriter;
        }

        /**
         * Opens a writer for {@code partitionKey} and increments the manager's open-writer count.
         * The file name is {@code <filePrefix>_<stateToken>_<n>}, where {@code n} is the running
         * count of writers created for this partition.
         *
         * @throws RuntimeException wrapping the {@link IOException} if the writer cannot be created
         */
        private RecordWriter createWriter(PartitionKey partitionKey) {
            try {
                int writerNumber = this.writerCounts.merge(partitionKey, 1, Integer::sum);
                String fileName = RecordWriterManager.this.filePrefix + "_" + this.stateToken + "_" + writerNumber;
                RecordWriter recordWriter = new RecordWriter(this.table, this.icebergDestination.getFileFormat(), fileName, partitionKey);
                RecordWriterManager.this.openWriters++;
                return recordWriter;
            } catch (IOException e) {
                throw new RuntimeException(String.format("Encountered an error when creating a RecordWriter for table '%s', partition %s.", this.icebergDestination.getTableIdentifier(), partitionKey), e);
            }
        }
    }

    /**
     * Rewrites a human-readable partition path ({@code name=value/name=value/...}) so that
     * time-based values are expressed as counts since the epoch.
     *
     * <ul>
     *   <li>{@code month} fields: {@code yyyy-MM} becomes the number of months since 1970-01.
     *   <li>{@code hour} fields: {@code yyyy-MM-dd-HH} becomes the number of hours since
     *       1970-01-01T00.
     *   <li>Every other field is copied through unchanged.
     * </ul>
     *
     * <p>NOTE(review): {@code year} values are passed through as-is; confirm whether the consumer
     * of this path expects years since 1970 for that transform as well.
     *
     * @param str partition path to convert; returned as-is when it or {@code map} is empty
     * @param map partition fields keyed by partition field name
     * @return the converted path, with segments in their original order
     * @throws IllegalArgumentException if a segment names a field that is missing from {@code map}
     */
    @VisibleForTesting
    static String getPartitionDataPath(String str, Map<String, PartitionField> map) {
        if (str.isEmpty() || map.isEmpty()) {
            return str;
        }
        String monthTransform = Transforms.month().toString();
        String hourTransform = Transforms.hour().toString();
        List<String> resolved = new ArrayList<>();
        for (String segment : Splitter.on('/').splitToList(str)) {
            List<String> nameAndValue = Splitter.on('=').splitToList(segment);
            String name = nameAndValue.get(0);
            String value = nameAndValue.get(1);
            String transformName = Preconditions.checkArgumentNotNull(map.get(name)).transform().toString();
            if (monthTransform.equals(transformName)) {
                // Months elapsed since 1970-01, not the calendar month (1..12): the latter would
                // map every year onto the same twelve partitions.
                LocalDateTime monthStart = YearMonth.parse(value).atDay(1).atStartOfDay();
                value = String.valueOf(ChronoUnit.MONTHS.between(EPOCH, monthStart));
            } else if (hourTransform.equals(transformName)) {
                value = String.valueOf(ChronoUnit.HOURS.between(EPOCH, LocalDateTime.parse(value, HOUR_FORMATTER)));
            }
            resolved.add(name + "=" + value);
        }
        return String.join("/", resolved);
    }

    /**
     * @param catalog catalog used to load or create destination tables
     * @param filePrefix prefix of every data file name produced by this manager
     * @param maxFileSize byte count after which a writer is closed and replaced
     * @param maxNumWriters maximum number of writers that may be open at once
     */
    public RecordWriterManager(Catalog catalog, String filePrefix, long maxFileSize, int maxNumWriters) {
        this.catalog = catalog;
        this.filePrefix = filePrefix;
        this.maxFileSize = maxFileSize;
        this.maxNumWriters = maxNumWriters;
    }

    /**
     * Returns the table for {@code tableIdentifier}, creating it from {@code schema} if the
     * catalog does not have it yet.
     *
     * <p>A table found in {@link #TABLE_CACHE} is refreshed before being returned. Otherwise the
     * table is loaded (or created) while holding the cache's monitor and then cached.
     */
    private Table getOrCreateTable(TableIdentifier tableIdentifier, org.apache.beam.sdk.schemas.Schema schema) {
        Table table = TABLE_CACHE.getIfPresent(tableIdentifier);
        if (table != null) {
            table.refresh();
            return table;
        }
        synchronized (TABLE_CACHE) {
            try {
                table = this.catalog.loadTable(tableIdentifier);
            } catch (NoSuchTableException e) {
                try {
                    Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema);
                    table = this.catalog.createTable(tableIdentifier, icebergSchema);
                    LOG.info("Created Iceberg table '{}' with schema: {}", tableIdentifier, icebergSchema);
                } catch (AlreadyExistsException alreadyExists) {
                    // The table appeared between the failed load and the create; load it instead.
                    table = this.catalog.loadTable(tableIdentifier);
                }
            }
            TABLE_CACHE.put(tableIdentifier, table);
        }
        return table;
    }

    /**
     * Converts {@code row} to an Iceberg record and writes it to the given destination, setting up
     * the destination's state (and table) on first use.
     *
     * @return {@code false} if the record was not written because a new writer was required but
     *     the open-writer limit has been reached; {@code true} if it was written
     */
    public boolean write(WindowedValue<IcebergDestination> windowedValue, Row row) {
        DestinationState state = this.destinations.computeIfAbsent(windowedValue, key -> {
            IcebergDestination destination = key.getValue();
            Table table = getOrCreateTable(destination.getTableIdentifier(), row.getSchema());
            return new DestinationState(destination, table);
        });
        return state.write(IcebergUtils.beamRowToIcebergRecord(state.schema, row));
    }

    /**
     * Closes every open writer and gathers the data files of each destination.
     *
     * @throws IllegalStateException if any writer of a destination failed to close; the individual
     *     failures are attached as suppressed exceptions
     * @throws IllegalArgumentException if writers are still counted as open after closing
     */
    @Override // java.lang.AutoCloseable
    public void close() throws IOException {
        // Close the writers of every destination up front, so that a failure reported for one
        // destination below cannot leave the writers of the remaining destinations open.
        for (DestinationState state : this.destinations.values()) {
            // Emptying the cache fires the removal listener, which closes each writer and
            // records its data file.
            state.writers.invalidateAll();
        }
        for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState> entry : this.destinations.entrySet()) {
            DestinationState state = entry.getValue();
            if (!state.exceptions.isEmpty()) {
                IllegalStateException failure = new IllegalStateException(String.format("Encountered %s failed writer(s).", state.exceptions.size()));
                for (Exception suppressed : state.exceptions) {
                    failure.addSuppressed(suppressed);
                }
                throw failure;
            }
            if (!state.dataFiles.isEmpty()) {
                this.totalSerializableDataFiles.put(entry.getKey(), new ArrayList<>(state.dataFiles));
                state.dataFiles.clear();
            }
        }
        this.destinations.clear();
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(this.openWriters == 0, "Expected all data writers to be closed, but found %s data writer(s) still open", this.openWriters);
        this.isClosed = true;
    }

    /**
     * Returns the data files written for each destination.
     *
     * @throws IllegalStateException if this manager has not been closed yet
     */
    public Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> getSerializableDataFiles() {
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(this.isClosed, "Please close this %s before retrieving its data files.", getClass().getSimpleName());
        return this.totalSerializableDataFiles;
    }
}
