package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Type;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSetMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecordMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ColumnType;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.class */
/**
 * Converts the {@link Struct} rows returned by a change stream query into the connector's
 * {@link ChangeStreamRecord} model objects ({@link DataChangeRecord}, {@link HeartbeatRecord} and
 * {@link ChildPartitionsRecord}).
 *
 * <p>The class keeps no fields of its own; every method works only on its arguments.
 */
public class ChangeStreamRecordMapper {
    private static final String DATA_CHANGE_RECORD_COLUMN = "data_change_record";
    private static final String HEARTBEAT_RECORD_COLUMN = "heartbeat_record";
    private static final String CHILD_PARTITIONS_RECORD_COLUMN = "child_partitions_record";
    private static final String COMMIT_TIMESTAMP_COLUMN = "commit_timestamp";
    private static final String SERVER_TRANSACTION_ID_COLUMN = "server_transaction_id";
    private static final String IS_LAST_RECORD_IN_TRANSACTION_IN_PARTITION_COLUMN = "is_last_record_in_transaction_in_partition";
    private static final String RECORD_SEQUENCE_COLUMN = "record_sequence";
    private static final String TABLE_NAME_COLUMN = "table_name";
    private static final String COLUMN_TYPES_COLUMN = "column_types";
    private static final String MODS_COLUMN = "mods";
    private static final String MOD_TYPE_COLUMN = "mod_type";
    private static final String VALUE_CAPTURE_TYPE_COLUMN = "value_capture_type";
    private static final String NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN = "number_of_records_in_transaction";
    private static final String NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN = "number_of_partitions_in_transaction";
    private static final String NAME_COLUMN = "name";
    private static final String TYPE_COLUMN = "type";
    private static final String IS_PRIMARY_KEY_COLUMN = "is_primary_key";
    private static final String ORDINAL_POSITION_COLUMN = "ordinal_position";
    private static final String KEYS_COLUMN = "keys";
    private static final String OLD_VALUES_COLUMN = "old_values";
    private static final String NEW_VALUES_COLUMN = "new_values";
    private static final String TIMESTAMP_COLUMN = "timestamp";
    private static final String START_TIMESTAMP_COLUMN = "start_timestamp";
    private static final String CHILD_PARTITIONS_COLUMN = "child_partitions";
    private static final String PARENT_PARTITION_TOKENS_COLUMN = "parent_partition_tokens";
    private static final String TOKEN_COLUMN = "token";

    /**
     * Maps one result row to the list of change stream records it contains.
     *
     * <p>The first column (index 0) of {@code struct} is read as a list of structs; each element
     * is expanded into zero or more records and the results are concatenated in encounter order.
     *
     * @param partitionMetadata the partition the row was read from
     * @param struct the result row
     * @param changeStreamResultSetMetadata metadata about the result set the row belongs to
     * @return the records found in the row, possibly empty
     */
    public List<ChangeStreamRecord> toChangeStreamRecords(PartitionMetadata partitionMetadata, Struct struct, ChangeStreamResultSetMetadata changeStreamResultSetMetadata) {
        return struct.getStructList(0).stream()
                .flatMap(entry -> toChangeStreamRecord(partitionMetadata, entry, changeStreamResultSetMetadata))
                .collect(Collectors.toList());
    }

    /**
     * Expands a single element of the row into its records: data change records first, then
     * heartbeat records, then child partitions records. Entries whose marker timestamp column is
     * null are skipped.
     */
    private Stream<ChangeStreamRecord> toChangeStreamRecord(PartitionMetadata partitionMetadata, Struct struct, ChangeStreamResultSetMetadata changeStreamResultSetMetadata) {
        Stream<DataChangeRecord> dataChangeRecords = struct.getStructList(DATA_CHANGE_RECORD_COLUMN).stream()
                .filter(this::isNonNullDataChangeRecord)
                .map(entry -> toDataChangeRecord(partitionMetadata, entry, changeStreamResultSetMetadata));
        Stream<HeartbeatRecord> heartbeatRecords = struct.getStructList(HEARTBEAT_RECORD_COLUMN).stream()
                .filter(this::isNonNullHeartbeatRecord)
                .map(entry -> toHeartbeatRecord(partitionMetadata, entry, changeStreamResultSetMetadata));
        Stream<ChildPartitionsRecord> childPartitionsRecords = struct.getStructList(CHILD_PARTITIONS_RECORD_COLUMN).stream()
                .filter(this::isNonNullChildPartitionsRecord)
                .map(entry -> toChildPartitionsRecord(partitionMetadata, entry, changeStreamResultSetMetadata));
        return Stream.concat(Stream.concat(dataChangeRecords, heartbeatRecords), childPartitionsRecords);
    }

    /** Returns true when the struct's {@code commit_timestamp} column is not null. */
    private boolean isNonNullDataChangeRecord(Struct struct) {
        return !struct.isNull(COMMIT_TIMESTAMP_COLUMN);
    }

    /** Returns true when the struct's {@code timestamp} column is not null. */
    private boolean isNonNullHeartbeatRecord(Struct struct) {
        return !struct.isNull(TIMESTAMP_COLUMN);
    }

    /** Returns true when the struct's {@code start_timestamp} column is not null. */
    private boolean isNonNullChildPartitionsRecord(Struct struct) {
        return !struct.isNull(START_TIMESTAMP_COLUMN);
    }

    /**
     * Builds a {@link DataChangeRecord} from the given struct. The commit timestamp is also used
     * as the record timestamp of the attached metadata.
     */
    private DataChangeRecord toDataChangeRecord(PartitionMetadata partitionMetadata, Struct struct, ChangeStreamResultSetMetadata changeStreamResultSetMetadata) {
        // Columns are read in constructor-argument order.
        Timestamp commitTimestamp = struct.getTimestamp(COMMIT_TIMESTAMP_COLUMN);
        String partitionToken = partitionMetadata.getPartitionToken();
        String serverTransactionId = struct.getString(SERVER_TRANSACTION_ID_COLUMN);
        boolean lastRecordInTransactionInPartition = struct.getBoolean(IS_LAST_RECORD_IN_TRANSACTION_IN_PARTITION_COLUMN);
        String recordSequence = struct.getString(RECORD_SEQUENCE_COLUMN);
        String tableName = struct.getString(TABLE_NAME_COLUMN);
        List<ColumnType> columnTypes = struct.getStructList(COLUMN_TYPES_COLUMN).stream()
                .map(this::columnTypeFrom)
                .collect(Collectors.toList());
        List<Mod> mods = struct.getStructList(MODS_COLUMN).stream()
                .map(this::modFrom)
                .collect(Collectors.toList());
        ModType modType = modTypeFrom(struct.getString(MOD_TYPE_COLUMN));
        ValueCaptureType valueCaptureType = valueCaptureTypeFrom(struct.getString(VALUE_CAPTURE_TYPE_COLUMN));
        long numberOfRecordsInTransaction = struct.getLong(NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN);
        long numberOfPartitionsInTransaction = struct.getLong(NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN);
        ChangeStreamRecordMetadata metadata = changeStreamRecordMetadataFrom(partitionMetadata, commitTimestamp, changeStreamResultSetMetadata);
        return new DataChangeRecord(
                partitionToken,
                commitTimestamp,
                serverTransactionId,
                lastRecordInTransactionInPartition,
                recordSequence,
                tableName,
                columnTypes,
                mods,
                modType,
                valueCaptureType,
                numberOfRecordsInTransaction,
                numberOfPartitionsInTransaction,
                metadata);
    }

    /**
     * Builds a {@link HeartbeatRecord} from the given struct. The heartbeat timestamp is also
     * used as the record timestamp of the attached metadata.
     */
    private HeartbeatRecord toHeartbeatRecord(PartitionMetadata partitionMetadata, Struct struct, ChangeStreamResultSetMetadata changeStreamResultSetMetadata) {
        Timestamp heartbeatTimestamp = struct.getTimestamp(TIMESTAMP_COLUMN);
        ChangeStreamRecordMetadata metadata = changeStreamRecordMetadataFrom(partitionMetadata, heartbeatTimestamp, changeStreamResultSetMetadata);
        return new HeartbeatRecord(heartbeatTimestamp, metadata);
    }

    /**
     * Builds a {@link ChildPartitionsRecord} from the given struct. The start timestamp is also
     * used as the record timestamp of the attached metadata.
     */
    private ChildPartitionsRecord toChildPartitionsRecord(PartitionMetadata partitionMetadata, Struct struct, ChangeStreamResultSetMetadata changeStreamResultSetMetadata) {
        Timestamp startTimestamp = struct.getTimestamp(START_TIMESTAMP_COLUMN);
        String recordSequence = struct.getString(RECORD_SEQUENCE_COLUMN);
        List<ChildPartition> childPartitions = struct.getStructList(CHILD_PARTITIONS_COLUMN).stream()
                .map(child -> childPartitionFrom(partitionMetadata.getPartitionToken(), child))
                .collect(Collectors.toList());
        ChangeStreamRecordMetadata metadata = changeStreamRecordMetadataFrom(partitionMetadata, startTimestamp, changeStreamResultSetMetadata);
        return new ChildPartitionsRecord(startTimestamp, recordSequence, childPartitions, metadata);
    }

    /** Builds a {@link ColumnType} from a {@code column_types} entry. */
    private ColumnType columnTypeFrom(Struct struct) {
        String name = struct.getString(NAME_COLUMN);
        TypeCode typeCode = new TypeCode(getJsonString(struct, TYPE_COLUMN));
        boolean primaryKey = struct.getBoolean(IS_PRIMARY_KEY_COLUMN);
        long ordinalPosition = struct.getLong(ORDINAL_POSITION_COLUMN);
        return new ColumnType(name, typeCode, primaryKey, ordinalPosition);
    }

    /**
     * Builds a {@link Mod} from a {@code mods} entry. Null {@code old_values} / {@code new_values}
     * columns are passed through as {@code null}.
     */
    private Mod modFrom(Struct struct) {
        String keys = getJsonString(struct, KEYS_COLUMN);
        String oldValues = struct.isNull(OLD_VALUES_COLUMN) ? null : getJsonString(struct, OLD_VALUES_COLUMN);
        String newValues = struct.isNull(NEW_VALUES_COLUMN) ? null : getJsonString(struct, NEW_VALUES_COLUMN);
        return new Mod(keys, oldValues, newValues);
    }

    /**
     * Resolves a {@link ModType} by constant name, falling back to {@link ModType#UNKNOWN} when
     * the name does not match any constant.
     */
    private ModType modTypeFrom(String str) {
        try {
            return ModType.valueOf(str);
        } catch (IllegalArgumentException ignored) {
            // Deliberate: an unrecognised name is reported as UNKNOWN rather than failing.
            return ModType.UNKNOWN;
        }
    }

    /**
     * Resolves a {@link ValueCaptureType} by constant name, falling back to
     * {@link ValueCaptureType#UNKNOWN} when the name does not match any constant.
     */
    private ValueCaptureType valueCaptureTypeFrom(String str) {
        try {
            return ValueCaptureType.valueOf(str);
        } catch (IllegalArgumentException ignored) {
            // Deliberate: an unrecognised name is reported as UNKNOWN rather than failing.
            return ValueCaptureType.UNKNOWN;
        }
    }

    /**
     * Builds a {@link ChildPartition} from a {@code child_partitions} entry.
     *
     * @param str token of the partition the record was read from; it is added to the child's
     *     parent tokens when {@link InitialPartition#isInitialPartition(String)} accepts it
     * @param struct the child partition entry
     */
    private ChildPartition childPartitionFrom(String str, Struct struct) {
        HashSet<String> parentTokens = Sets.newHashSet(struct.getStringList(PARENT_PARTITION_TOKENS_COLUMN));
        if (InitialPartition.isInitialPartition(str)) {
            parentTokens.add(str);
        }
        return new ChildPartition(struct.getString(TOKEN_COLUMN), parentTokens);
    }

    /**
     * Assembles the metadata attached to every emitted record from the partition metadata, the
     * record's timestamp and the result set metadata.
     */
    private ChangeStreamRecordMetadata changeStreamRecordMetadataFrom(PartitionMetadata partitionMetadata, Timestamp timestamp, ChangeStreamResultSetMetadata changeStreamResultSetMetadata) {
        return ChangeStreamRecordMetadata.newBuilder()
                .withRecordTimestamp(timestamp)
                .withPartitionToken(partitionMetadata.getPartitionToken())
                .withPartitionStartTimestamp(partitionMetadata.getStartTimestamp())
                .withPartitionEndTimestamp(partitionMetadata.getEndTimestamp())
                .withPartitionCreatedAt(partitionMetadata.getCreatedAt())
                .withPartitionScheduledAt(partitionMetadata.getScheduledAt())
                .withPartitionRunningAt(partitionMetadata.getRunningAt())
                .withQueryStartedAt(changeStreamResultSetMetadata.getQueryStartedAt())
                .withRecordStreamStartedAt(changeStreamResultSetMetadata.getRecordStreamStartedAt())
                .withRecordStreamEndedAt(changeStreamResultSetMetadata.getRecordStreamEndedAt())
                .withRecordReadAt(changeStreamResultSetMetadata.getRecordReadAt())
                .withTotalStreamTimeMillis(changeStreamResultSetMetadata.getTotalStreamDuration().getMillis())
                .withNumberOfRecordsRead(changeStreamResultSetMetadata.getNumberOfRecordsRead())
                .build();
    }

    /**
     * Reads the named column as a string, accepting either a JSON-typed or a STRING-typed column.
     *
     * @param struct the struct to read from
     * @param str the column name
     * @return the column value as a string
     * @throws IllegalArgumentException if the column is neither JSON nor STRING
     */
    private String getJsonString(Struct struct, String str) {
        Type columnType = struct.getColumnType(str);
        if (columnType.equals(Type.json())) {
            return struct.getJson(str);
        }
        if (columnType.equals(Type.string())) {
            return struct.getString(str);
        }
        throw new IllegalArgumentException("Can not extract string from value " + str);
    }
}
