package org.apache.flink.cdc.runtime.operators.transform;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.class */
/**
 * Stream operator that rewrites table schemas according to user-declared transform rules.
 *
 * <p>For every table it sees, the operator remembers both the original schema and the
 * transformed schema in {@link #tableChangeInfoMap}. {@link CreateTableEvent}s are rewritten by
 * the matching {@link SchemaMetadataTransform}s and by the first matching valid
 * {@link TransformProjection}; other {@link SchemaChangeEvent}s are applied to both cached
 * schemas and forwarded unchanged; {@link DataChangeEvent}s have their before/after records
 * run through the projection processor of their table.
 *
 * <p>The cached table information is written to a union list state named
 * {@code "originalSchemaState"} on every snapshot and read back on restore.
 *
 * <p>Events that are none of the three types above are not forwarded by
 * {@link #processElement(StreamRecord)}.
 */
public class TransformSchemaOperator extends AbstractStreamOperator<Event> implements OneInputStreamOperator<Event, Event> {
    /** Raw rules collected by the {@link Builder}; turned into runtime objects in {@link #open()}. */
    private final List<Tuple5<String, String, String, String, String>> transformRules;

    /** Table selector paired with the (possibly absent) projection of each rule; built in {@link #open()}. */
    private transient List<Tuple2<Selectors, Optional<TransformProjection>>> transforms;

    /** Original and transformed schema of every table seen so far; this is what gets checkpointed. */
    private final Map<TableId, TableChangeInfo> tableChangeInfoMap;

    /** Projection processor per table, created lazily. */
    private transient Map<TableId, TransformProjectionProcessor> processorMap;

    /** Table selector paired with the schema metadata transform of each rule; filled in {@link #open()}. */
    private final List<Tuple2<Selectors, SchemaMetadataTransform>> schemaMetadataTransformers;

    /** Operator state holding the serialized {@link TableChangeInfo} entries. */
    private transient ListState<byte[]> state;

    /* loaded from: input_file:org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator$Builder.class */
    /** Collects transform rules and creates the operator from them. */
    public static class Builder {
        private final List<Tuple5<String, String, String, String, String>> transformRules = new ArrayList<>();

        /**
         * Registers one transform rule.
         *
         * @param str table inclusion pattern; handed to {@code Selectors.SelectorsBuilder#includeTables}
         * @param str2 projection expression, may be {@code null}; handed to {@code TransformProjection.of}
         * @param str3 first argument of the {@link SchemaMetadataTransform} constructor
         *     (presumably the primary keys - confirm against that class)
         * @param str4 second argument of the {@link SchemaMetadataTransform} constructor
         *     (presumably the partition keys - confirm against that class)
         * @param str5 third argument of the {@link SchemaMetadataTransform} constructor
         *     (presumably the table options - confirm against that class)
         * @return this builder
         */
        public Builder addTransform(String str, @Nullable String str2, String str3, String str4, String str5) {
            this.transformRules.add(Tuple5.of(str, str2, str3, str4, str5));
            return this;
        }

        /** Creates the operator; the operator keeps a reference to this builder's rule list. */
        public TransformSchemaOperator build() {
            return new TransformSchemaOperator(this.transformRules);
        }
    }

    /** Returns a fresh, empty {@link Builder}. */
    public static Builder newBuilder() {
        return new Builder();
    }

    private TransformSchemaOperator(List<Tuple5<String, String, String, String, String>> list) {
        this.transformRules = list;
        this.tableChangeInfoMap = new ConcurrentHashMap<>();
        this.processorMap = new ConcurrentHashMap<>();
        this.schemaMetadataTransformers = new ArrayList<>();
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    /**
     * Builds the selectors, projections and schema metadata transforms from the raw rules and
     * resets the per-table processor cache.
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.transforms = new ArrayList<>();
        // The metadata transformer list is not transient and is only ever appended to here;
        // reset it so that a repeated open() cannot register every rule twice.
        this.schemaMetadataTransformers.clear();
        for (Tuple5<String, String, String, String, String> rule : this.transformRules) {
            Selectors selectors = new Selectors.SelectorsBuilder().includeTables(rule.f0).build();
            this.transforms.add(new Tuple2<>(selectors, TransformProjection.of(rule.f1)));
            this.schemaMetadataTransformers.add(
                    new Tuple2<>(selectors, new SchemaMetadataTransform(rule.f2, rule.f3, rule.f4)));
        }
        this.processorMap = new ConcurrentHashMap<>();
    }

    /**
     * Obtains the union list state and, when restoring, reloads every stored
     * {@link TableChangeInfo} into {@link #tableChangeInfoMap}.
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.state = stateInitializationContext.getOperatorStateStore()
                .getUnionListState(new ListStateDescriptor<>("originalSchemaState", byte[].class));
        if (stateInitializationContext.isRestored()) {
            for (byte[] serialized : this.state.get()) {
                // NOTE(review): "m1944deserialize" looks like a decompiler-generated alias of the
                // serializer's deserialize method - confirm against TableChangeInfo.SERIALIZER.
                TableChangeInfo restored = TableChangeInfo.SERIALIZER.m1944deserialize(
                        TableChangeInfo.SERIALIZER.getVersion(), serialized);
                this.tableChangeInfoMap.put(restored.getTableId(), restored);
            }
        }
    }

    /**
     * Replaces the content of the operator state with the serialized form of every cached
     * {@link TableChangeInfo}.
     *
     * @throws Exception if the parent snapshot, the serialization of an entry, or the state update fails
     */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        List<byte[]> serializedInfos = new ArrayList<>(this.tableChangeInfoMap.size());
        for (TableChangeInfo tableChangeInfo : this.tableChangeInfoMap.values()) {
            serializedInfos.add(TableChangeInfo.SERIALIZER.serialize(tableChangeInfo));
        }
        this.state.update(serializedInfos);
    }

    /**
     * Releases the objects that are only needed while elements are processed.
     *
     * <p>The state handle is deliberately kept: {@link #snapshotState(StateSnapshotContext)}
     * dereferences it, and Flink can still take a final checkpoint after {@code finish()} and
     * before {@code close()}.
     */
    @Override
    public void finish() throws Exception {
        super.finish();
        this.transforms = null;
        this.processorMap = null;
    }

    /** Releases all transient references held by the operator, including the state handle. */
    @Override
    public void close() throws Exception {
        super.close();
        clearOperator();
    }

    /**
     * Dispatches an incoming event by type and emits the resulting event in a new record.
     *
     * <p>{@link CreateTableEvent} is tested first, before the more general
     * {@link SchemaChangeEvent} branch. An event matching none of the three branches produces
     * no output.
     */
    @Override
    public void processElement(StreamRecord<Event> streamRecord) throws Exception {
        Event event = streamRecord.getValue();
        if (event instanceof CreateTableEvent) {
            this.output.collect(new StreamRecord<Event>(cacheCreateTable((CreateTableEvent) event)));
        } else if (event instanceof SchemaChangeEvent) {
            this.output.collect(new StreamRecord<Event>(cacheChangeSchema((SchemaChangeEvent) event)));
        } else if (event instanceof DataChangeEvent) {
            this.output.collect(new StreamRecord<Event>(processDataChangeEvent((DataChangeEvent) event)));
        }
    }

    /**
     * Transforms a create-table event and caches the table's original and transformed schema.
     *
     * @return the transformed create-table event
     */
    private SchemaChangeEvent cacheCreateTable(CreateTableEvent createTableEvent) {
        TableId tableId = createTableEvent.tableId();
        Schema originalSchema = createTableEvent.getSchema();
        CreateTableEvent transformedEvent = transformCreateTableEvent(createTableEvent);
        this.tableChangeInfoMap.put(
                tableId, TableChangeInfo.of(tableId, originalSchema, transformedEvent.getSchema()));
        return transformedEvent;
    }

    /**
     * Applies a schema change to both cached schemas of its table and returns the event unchanged.
     *
     * @throws IllegalStateException if no schema has been cached for the table yet
     */
    private SchemaChangeEvent cacheChangeSchema(SchemaChangeEvent schemaChangeEvent) {
        TableId tableId = schemaChangeEvent.tableId();
        TableChangeInfo current = this.tableChangeInfoMap.get(tableId);
        if (current == null) {
            // Without this guard the lookup result would be dereferenced and fail with a bare
            // NullPointerException that does not say which table is affected.
            throw new IllegalStateException(
                    "No cached schema for table " + tableId + " when applying schema change " + schemaChangeEvent);
        }
        Schema updatedOriginal = SchemaUtils.applySchemaChangeEvent(current.getOriginalSchema(), schemaChangeEvent);
        Schema updatedTransformed = SchemaUtils.applySchemaChangeEvent(current.getTransformedSchema(), schemaChangeEvent);
        this.tableChangeInfoMap.put(tableId, TableChangeInfo.of(tableId, updatedOriginal, updatedTransformed));
        return schemaChangeEvent;
    }

    /**
     * Applies every matching schema metadata transform, in rule order, and then the first
     * matching valid projection (if any) to a create-table event.
     */
    private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableEvent) {
        TableId tableId = createTableEvent.tableId();
        CreateTableEvent transformed = createTableEvent;
        for (Tuple2<Selectors, SchemaMetadataTransform> metadataRule : this.schemaMetadataTransformers) {
            if (metadataRule.f0.isMatch(tableId)) {
                transformed = new CreateTableEvent(
                        tableId, transformSchemaMetaData(transformed.getSchema(), metadataRule.f1));
            }
        }
        Optional<TransformProjection> projection = findValidProjection(tableId);
        if (!projection.isPresent()) {
            return transformed;
        }
        TransformProjection transformProjection = projection.get();
        // Reuse an existing processor for the table; otherwise create one from the projection.
        return this.processorMap
                .computeIfAbsent(tableId, id -> TransformProjectionProcessor.of(transformProjection))
                .processCreateTableEvent(transformed);
    }

    /** Returns the projection of the first rule that matches the table and whose projection is valid. */
    private Optional<TransformProjection> findValidProjection(TableId tableId) {
        for (Tuple2<Selectors, Optional<TransformProjection>> transform : this.transforms) {
            if (transform.f0.isMatch(tableId) && transform.f1.isPresent() && transform.f1.get().isValid()) {
                return transform.f1;
            }
        }
        return Optional.empty();
    }

    /**
     * Copies the schema's columns and takes primary keys, partition keys and options from the
     * metadata transform where it defines them, falling back to the schema's own values where
     * the transform's value is empty.
     */
    private Schema transformSchemaMetaData(Schema schema, SchemaMetadataTransform schemaMetadataTransform) {
        Schema.Builder builder = Schema.newBuilder().setColumns(schema.getColumns());
        if (schemaMetadataTransform.getPrimaryKeys().isEmpty()) {
            builder.primaryKey(schema.primaryKeys());
        } else {
            builder.primaryKey(schemaMetadataTransform.getPrimaryKeys());
        }
        if (schemaMetadataTransform.getPartitionKeys().isEmpty()) {
            builder.partitionKey(schema.partitionKeys());
        } else {
            builder.partitionKey(schemaMetadataTransform.getPartitionKeys());
        }
        if (schemaMetadataTransform.getOptions().isEmpty()) {
            builder.options(schema.options());
        } else {
            builder.options(schemaMetadataTransform.getOptions());
        }
        return builder.build();
    }

    /**
     * Runs a data change event through the first matching valid projection; returns the event
     * untouched when no such projection exists.
     */
    private DataChangeEvent processDataChangeEvent(DataChangeEvent dataChangeEvent) throws Exception {
        Optional<TransformProjection> projection = findValidProjection(dataChangeEvent.tableId());
        if (projection.isPresent()) {
            return processProjection(projection.get(), dataChangeEvent);
        }
        return dataChangeEvent;
    }

    /**
     * Passes the before and after records of a data change event, where present, through the
     * projection processor of the event's table.
     */
    private DataChangeEvent processProjection(TransformProjection transformProjection, DataChangeEvent dataChangeEvent) throws Exception {
        TableId tableId = dataChangeEvent.tableId();
        TableChangeInfo tableChangeInfo = this.tableChangeInfoMap.get(tableId);
        TransformProjectionProcessor processor = this.processorMap.get(tableId);
        // A processor that was created without table change info (see transformCreateTableEvent)
        // is replaced by one that carries it.
        if (processor == null || !processor.hasTableChangeInfo()) {
            processor = TransformProjectionProcessor.of(tableChangeInfo, transformProjection);
            this.processorMap.put(tableId, processor);
        }
        BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
        BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
        DataChangeEvent projected = dataChangeEvent;
        if (before != null) {
            projected = DataChangeEvent.projectBefore(projected, processor.processFillDataField(before));
        }
        if (after != null) {
            projected = DataChangeEvent.projectAfter(projected, processor.processFillDataField(after));
        }
        return projected;
    }

    /** Drops every transient reference so the referenced objects can be garbage collected. */
    private void clearOperator() {
        this.transforms = null;
        this.processorMap = null;
        this.state = null;
    }
}
