package org.apache.flink.cdc.runtime.operators.transform;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

/* loaded from: input_file:org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.class */
/**
 * Stream operator that applies user-configured transform rules to {@link DataChangeEvent}s and
 * keeps a per-table schema cache that is refreshed from {@link SchemaChangeEvent}s.
 *
 * <p>Each rule is a triple of (table inclusion expression, projection expression, filter
 * expression); the projection and filter may be {@code null}. Rules are compiled in
 * {@link #open()} and evaluated for every data change event whose table matches the rule's
 * selector.
 *
 * <p>Instances are created through {@link #newBuilder()}.
 */
public class TransformDataOperator extends AbstractStreamOperator<Event> implements OneInputStreamOperator<Event, Event> {
    /** Client used to look up the latest schema of a table; created in {@code setup}. */
    private SchemaEvolutionClient schemaEvolutionClient;
    private final OperatorID schemaOperatorID;
    private final String timezone;
    /** Raw rules as (table inclusions, projection, filter); projection and filter may be null. */
    private final List<Tuple3<String, String, String>> transformRules;
    /**
     * Compiled rules: selector, optional projection, optional filter, and a flag telling whether
     * the filter references a column computed by the projection.
     */
    private transient List<Tuple4<Selectors, Optional<TransformProjection>, Optional<TransformFilter>, Boolean>> transforms;
    /** Schemas recorded from the schema change events seen by this operator. */
    private final Map<TableId, TableInfo> tableInfoMap;
    // NOTE(review): both processor caches are keyed by the rule's projection/filter only, not by
    // table. If a single rule can match several tables, the processor built for the first table
    // would be reused for the others — confirm against TransformProjectionProcessor/
    // TransformFilterProcessor.
    private transient Map<TransformProjection, TransformProjectionProcessor> transformProjectionProcessorMap;
    private transient Map<TransformFilter, TransformFilterProcessor> transformFilterProcessorMap;

    /** Fluent builder collecting the transform rules and settings of a {@link TransformDataOperator}. */
    public static class Builder {
        private final List<Tuple3<String, String, String>> transformRules = new ArrayList<>();
        private OperatorID schemaOperatorID;
        private String timezone;

        /**
         * Registers one transform rule.
         *
         * @param str table inclusion expression selecting the tables the rule applies to
         * @param str2 projection expression, or {@code null} for none
         * @param str3 filter expression, or {@code null} for none
         * @return this builder
         */
        public Builder addTransform(String str, @Nullable String str2, @Nullable String str3) {
            this.transformRules.add(Tuple3.of(str, str2, str3));
            return this;
        }

        /**
         * Sets the ID of the schema operator used when creating the schema evolution client.
         *
         * @param operatorID operator ID of the schema operator
         * @return this builder
         */
        public Builder addSchemaOperatorID(OperatorID operatorID) {
            this.schemaOperatorID = operatorID;
            return this;
        }

        /**
         * Sets the time zone handed to the transform processors. When {@code str} equals the
         * default value of {@link PipelineOptions#PIPELINE_LOCAL_TIME_ZONE}, the JVM's system
         * default zone ID is used instead.
         *
         * @param str configured time zone
         * @return this builder
         */
        public Builder addTimezone(String str) {
            boolean isDefault = PipelineOptions.PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(str);
            this.timezone = isDefault ? ZoneId.systemDefault().toString() : str;
            return this;
        }

        /**
         * Creates the operator from the values collected so far.
         *
         * @return a new operator instance
         */
        public TransformDataOperator build() {
            return new TransformDataOperator(this.transformRules, this.schemaOperatorID, this.timezone);
        }
    }

    /**
     * Creates an empty {@link Builder}.
     *
     * @return a new builder
     */
    public static Builder newBuilder() {
        return new Builder();
    }

    private TransformDataOperator(List<Tuple3<String, String, String>> list, OperatorID operatorID, String str) {
        this.transformRules = list;
        this.schemaOperatorID = operatorID;
        this.timezone = str;
        this.tableInfoMap = new ConcurrentHashMap<>();
        this.transformFilterProcessorMap = new ConcurrentHashMap<>();
        this.transformProjectionProcessorMap = new ConcurrentHashMap<>();
    }

    /** Runs the parent setup and creates the schema evolution client for this task. */
    @Override
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Event>> output) {
        super.setup(streamTask, streamConfig, output);
        this.schemaEvolutionClient = new SchemaEvolutionClient(streamTask.getEnvironment().getOperatorCoordinatorEventGateway(), this.schemaOperatorID);
    }

    /** Compiles the raw transform rules and resets both processor caches. */
    @Override
    public void open() throws Exception {
        super.open();
        this.transforms = this.transformRules.stream()
                .map(rule -> {
                    String tableInclusions = rule.f0;
                    String projectionExpression = rule.f1;
                    String filterExpression = rule.f2;
                    Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
                    return new Tuple4<>(
                            selectors,
                            TransformProjection.of(projectionExpression),
                            TransformFilter.of(filterExpression),
                            containFilteredComputedColumn(projectionExpression, filterExpression));
                })
                .collect(Collectors.toList());
        this.transformFilterProcessorMap = new ConcurrentHashMap<>();
        this.transformProjectionProcessorMap = new ConcurrentHashMap<>();
    }

    /** Runs the parent finish logic, then releases the compiled rules and processor caches. */
    @Override
    public void finish() throws Exception {
        super.finish();
        clearOperator();
    }

    /** Runs the parent close logic, then releases the compiled rules and processor caches. */
    @Override
    public void close() throws Exception {
        super.close();
        clearOperator();
    }

    /** Registers this subtask's index with the schema evolution client. */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        this.schemaEvolutionClient.registerSubtask(getRuntimeContext().getIndexOfThisSubtask());
    }

    /**
     * Handles one incoming event. Schema change events update the schema cache and are forwarded
     * as-is; data change events are transformed and forwarded only when a result remains after
     * filtering. Events of any other type are not forwarded.
     */
    @Override
    public void processElement(StreamRecord<Event> streamRecord) throws Exception {
        Event event = streamRecord.getValue();
        if (event instanceof SchemaChangeEvent) {
            this.output.collect(new StreamRecord<>(cacheSchema((SchemaChangeEvent) event)));
            return;
        }
        if (event instanceof DataChangeEvent) {
            Optional<DataChangeEvent> transformed = processDataChangeEvent((DataChangeEvent) event);
            if (transformed.isPresent()) {
                this.output.collect(new StreamRecord<>(transformed.get()));
            }
        }
    }

    /**
     * Derives the table's new schema from the event, lets the matching projection processors see
     * it, and stores it in the schema cache.
     *
     * @return the unchanged input event
     */
    private SchemaChangeEvent cacheSchema(SchemaChangeEvent schemaChangeEvent) throws Exception {
        TableId tableId = schemaChangeEvent.tableId();
        Schema newSchema;
        if (schemaChangeEvent instanceof CreateTableEvent) {
            newSchema = ((CreateTableEvent) schemaChangeEvent).getSchema();
        } else {
            // Any other change is applied on top of the currently known schema.
            Schema currentSchema = getTableInfoFromSchemaEvolutionClient(tableId).getSchema();
            newSchema = SchemaUtils.applySchemaChangeEvent(currentSchema, schemaChangeEvent);
        }
        transformSchema(tableId, newSchema);
        // NOTE(review): cached filter processors are not refreshed here — confirm they do not
        // depend on the pre-change schema.
        this.tableInfoMap.put(tableId, TableInfo.of(tableId, newSchema));
        return schemaChangeEvent;
    }

    /**
     * Returns the table info from the local cache, falling back to the schema evolution client.
     * A schema fetched through the client is not written back to the local cache.
     *
     * @throws RuntimeException if the client has no schema for the table
     */
    private TableInfo getTableInfoFromSchemaEvolutionClient(TableId tableId) throws Exception {
        TableInfo cached = this.tableInfoMap.get(tableId);
        if (cached != null) {
            return cached;
        }
        Optional<Schema> latestSchema = this.schemaEvolutionClient.getLatestSchema(tableId);
        if (!latestSchema.isPresent()) {
            throw new RuntimeException("Could not find schema message from SchemaRegistry for " + tableId);
        }
        return TableInfo.of(tableId, latestSchema.get());
    }

    /**
     * Passes the schema to the projection processor of every rule that matches the table and has
     * a valid projection, creating the processor on first use.
     */
    private void transformSchema(TableId tableId, Schema schema) throws Exception {
        for (Tuple4<Selectors, Optional<TransformProjection>, Optional<TransformFilter>, Boolean> rule : this.transforms) {
            if (!rule.f0.isMatch(tableId) || !rule.f1.isPresent()) {
                continue;
            }
            TransformProjection projection = rule.f1.get();
            if (!projection.isValid()) {
                continue;
            }
            TransformProjectionProcessor processor = this.transformProjectionProcessorMap.get(projection);
            if (processor == null) {
                processor = TransformProjectionProcessor.of(projection);
                this.transformProjectionProcessorMap.put(projection, processor);
            }
            processor.processSchemaChangeEvent(schema);
        }
    }

    /**
     * Applies every rule matching the event's table.
     *
     * @return the original event when no rule matches; otherwise the first non-empty result in
     *     rule order, or empty when every matching rule filtered the event out
     */
    private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataChangeEvent) throws Exception {
        TableId tableId = dataChangeEvent.tableId();
        List<Optional<DataChangeEvent>> results = new ArrayList<>();
        // One timestamp is shared by all processors for this event.
        long epochTime = System.currentTimeMillis();
        for (Tuple4<Selectors, Optional<TransformProjection>, Optional<TransformFilter>, Boolean> rule : this.transforms) {
            if (!rule.f0.isMatch(tableId)) {
                continue;
            }
            boolean filterUsesComputedColumn = rule.f3;
            Optional<TransformProjection> projection = rule.f1;
            Optional<TransformFilter> filter = rule.f2;
            boolean hasValidProjection = projection.isPresent() && projection.get().isValid();

            Optional<DataChangeEvent> result = Optional.of(dataChangeEvent);
            // The filter needs computed columns, so the projection has to run before it.
            if (filterUsesComputedColumn && hasValidProjection) {
                result = processProjection(resolveProjectionProcessor(tableId, projection.get()), result.get(), epochTime);
            }
            if (filter.isPresent() && filter.get().isVaild()) {
                TransformFilter transformFilter = filter.get();
                TransformFilterProcessor filterProcessor = this.transformFilterProcessorMap.get(transformFilter);
                if (filterProcessor == null) {
                    filterProcessor = TransformFilterProcessor.of(getTableInfoFromSchemaEvolutionClient(tableId), transformFilter, this.timezone);
                    this.transformFilterProcessorMap.put(transformFilter, filterProcessor);
                }
                result = processFilter(filterProcessor, result.get(), epochTime);
            }
            // Otherwise project only events that survived the filter.
            if (!filterUsesComputedColumn && result.isPresent() && hasValidProjection) {
                result = processProjection(resolveProjectionProcessor(tableId, projection.get()), result.get(), epochTime);
            }
            results.add(result);
        }
        if (results.isEmpty()) {
            return Optional.of(dataChangeEvent);
        }
        for (Optional<DataChangeEvent> result : results) {
            if (result.isPresent()) {
                return result;
            }
        }
        return Optional.empty();
    }

    /**
     * Returns the cached projection processor, replacing it with one built from the table's
     * current info when none is cached or the cached one reports no table info.
     */
    private TransformProjectionProcessor resolveProjectionProcessor(TableId tableId, TransformProjection projection) throws Exception {
        TransformProjectionProcessor processor = this.transformProjectionProcessorMap.get(projection);
        if (processor == null || !processor.hasTableInfo()) {
            processor = TransformProjectionProcessor.of(getTableInfoFromSchemaEvolutionClient(tableId), projection, this.timezone);
            this.transformProjectionProcessorMap.put(projection, processor);
        }
        return processor;
    }

    /**
     * Evaluates the filter on the event's after-image when present, otherwise on its
     * before-image.
     *
     * @return the event when the filter accepts it; empty when it is rejected or carries neither
     *     image
     */
    private Optional<DataChangeEvent> processFilter(TransformFilterProcessor transformFilterProcessor, DataChangeEvent dataChangeEvent, long j) throws Exception {
        BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
        BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
        BinaryRecordData candidate = after != null ? after : before;
        if (candidate == null) {
            return Optional.empty();
        }
        return transformFilterProcessor.process(candidate, j) ? Optional.of(dataChangeEvent) : Optional.empty();
    }

    /**
     * Projects the before- and after-images of the event, skipping whichever is absent.
     *
     * @return the projected event, always present
     */
    private Optional<DataChangeEvent> processProjection(TransformProjectionProcessor transformProjectionProcessor, DataChangeEvent dataChangeEvent, long j) throws Exception {
        // Both images are read from the incoming event before any projection is applied.
        BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
        BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
        DataChangeEvent projected = dataChangeEvent;
        if (before != null) {
            projected = DataChangeEvent.projectBefore(projected, transformProjectionProcessor.processData(before, j));
        }
        if (after != null) {
            projected = DataChangeEvent.projectAfter(projected, transformProjectionProcessor.processData(after, j));
        }
        return Optional.of(projected);
    }

    /**
     * Tells whether the filter expression references a column computed by the projection
     * expression.
     *
     * @param str projection expression, may be null or blank
     * @param str2 filter expression, may be null or blank
     * @return {@code true} when a computed column name also appears in the filter's column names
     */
    private boolean containFilteredComputedColumn(String str, String str2) {
        if (StringUtils.isNullOrWhitespaceOnly(str) || StringUtils.isNullOrWhitespaceOnly(str2)) {
            return false;
        }
        List<String> computedColumnNames = TransformParser.parseComputedColumnNames(str);
        List<String> filterColumnNames = TransformParser.parseFilterColumnNameList(str2);
        for (String computedColumnName : computedColumnNames) {
            if (filterColumnNames.contains(computedColumnName)) {
                return true;
            }
        }
        return false;
    }

    /** Drops the compiled rules and processor caches and cleans up the expression compiler. */
    private void clearOperator() {
        this.transforms = null;
        this.transformProjectionProcessorMap = null;
        this.transformFilterProcessorMap = null;
        TransformExpressionCompiler.cleanUp();
    }
}
