package org.apache.flink.cdc.runtime.operators.transform;

import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

/* loaded from: input_file:org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.class */
public class PostTransformOperator extends AbstractStreamOperator<Event> implements OneInputStreamOperator<Event, Event>, Serializable {
    /** Serialization version identifier of this operator. */
    private static final long serialVersionUID = 1;
    // Time zone string handed to every projection/filter processor created by this operator.
    private final String timezone;
    // Raw transform rules supplied through the Builder; turned into PostTransformer objects in open().
    private final List<TransformRule> transformRules;
    // Runtime form of transformRules; transient, built in open() and set to null by clearOperator().
    private transient List<PostTransformer> transforms;
    // Per-table change info, stored by cacheSchema() from the transformed and the incoming schema.
    private final Map<TableId, PostTransformChangeInfo> postTransformChangeInfoMap;
    // UDF declarations as pairs; presumably (function name, class path) -- confirm against
    // the UserDefinedFunctionDescriptor constructor, which receives f0 and f1 in that order.
    private final List<Tuple2<String, String>> udfFunctions;
    // Descriptors derived from udfFunctions in setup().
    private List<UserDefinedFunctionDescriptor> udfDescriptors;
    // UDF objects keyed by descriptor name; transient, instantiated reflectively in open().
    private transient Map<String, Object> udfFunctionInstances;
    // Cache of projection processors keyed by (table, projection); entries of a table are
    // removed in processElement() whenever a schema change event for that table arrives.
    private transient Map<Tuple2<TableId, TransformProjection>, TransformProjectionProcessor> transformProjectionProcessorMap;
    // Cache of filter processors keyed by (table, filter); invalidated the same way as above.
    private transient Map<Tuple2<TableId, TransformFilter>, TransformFilterProcessor> transformFilterProcessorMap;
    // Per table: true when no transform matches the table or any matching projection has an
    // asterisk (as reported by TransformParser.hasAsterisk); written on CreateTableEvent.
    private final Map<TableId, Boolean> hasAsteriskMap;
    // Per table: names of the created schema's columns that also appear among the generated
    // projection columns, in schema order; written on CreateTableEvent.
    private final Map<TableId, List<String>> projectedColumnsMap;

    /**
     * Fluent builder collecting the transform rules, time zone and UDF declarations
     * that are passed to the {@link PostTransformOperator} constructor by {@link #build()}.
     */
    public static class Builder {
        private final List<TransformRule> transformRules = new ArrayList<>();
        private final List<Tuple2<String, String>> udfFunctions = new ArrayList<>();
        private String timezone;

        /**
         * Registers a transform rule built from all six rule attributes.
         *
         * <p>The arguments are forwarded, in order, to the {@code TransformRule} constructor.
         * NOTE(review): the first three are presumably table inclusions, projection and filter
         * (matching the getters used when the operator opens) -- confirm against TransformRule.
         *
         * @return this builder
         */
        public Builder addTransform(String str, @Nullable String str2, @Nullable String str3, String str4, String str5, String str6) {
            TransformRule rule = new TransformRule(str, str2, str3, str4, str5, str6);
            this.transformRules.add(rule);
            return this;
        }

        /**
         * Registers a transform rule from the first three attributes only; the remaining
         * three attributes are passed as empty strings.
         *
         * @return this builder
         */
        public Builder addTransform(String str, @Nullable String str2, @Nullable String str3) {
            TransformRule rule = new TransformRule(str, str2, str3, "", "", "");
            this.transformRules.add(rule);
            return this;
        }

        /**
         * Sets the time zone. When {@code str} equals the default value of
         * {@code PipelineOptions.PIPELINE_LOCAL_TIME_ZONE}, the JVM's system default zone id
         * is stored instead; any other value (including {@code null}) is stored as given.
         *
         * @return this builder
         */
        public Builder addTimezone(String str) {
            boolean isDefaultOption = PipelineOptions.PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(str);
            this.timezone = isDefaultOption ? ZoneId.systemDefault().toString() : str;
            return this;
        }

        /**
         * Appends all given UDF declarations to the ones collected so far.
         *
         * @return this builder
         */
        public Builder addUdfFunctions(List<Tuple2<String, String>> list) {
            this.udfFunctions.addAll(list);
            return this;
        }

        /** Creates the operator from the rules, time zone and UDF declarations collected so far. */
        public PostTransformOperator build() {
            return new PostTransformOperator(this.transformRules, this.timezone, this.udfFunctions);
        }
    }

    /**
     * Entry point for configuring a new operator.
     *
     * @return a fresh, empty {@link Builder}
     */
    public static Builder newBuilder() {
        final Builder builder = new Builder();
        return builder;
    }

    /**
     * Creates the operator; only reachable through {@link Builder#build()}.
     *
     * @param list transform rules to apply
     * @param str time zone string passed on to the processors
     * @param list2 UDF declarations, converted to descriptors in {@code setup}
     */
    private PostTransformOperator(List<TransformRule> list, String str, List<Tuple2<String, String>> list2) {
        // Configuration handed over by the builder.
        this.transformRules = list;
        this.timezone = str;
        this.udfFunctions = list2;

        // Per-table bookkeeping filled while schema change events are processed.
        this.postTransformChangeInfoMap = new ConcurrentHashMap<>();
        this.hasAsteriskMap = new HashMap<>();
        this.projectedColumnsMap = new HashMap<>();

        // Transient caches; open() replaces them with fresh instances.
        this.transformFilterProcessorMap = new ConcurrentHashMap<>();
        this.transformProjectionProcessorMap = new ConcurrentHashMap<>();
        this.udfFunctionInstances = new ConcurrentHashMap<>();
    }

    /**
     * Delegates to the superclass setup and then converts every configured UDF declaration
     * into a {@code UserDefinedFunctionDescriptor}, preserving declaration order.
     */
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Event>> output) {
        super.setup(streamTask, streamConfig, output);
        List<UserDefinedFunctionDescriptor> descriptors = new ArrayList<>();
        for (Tuple2<String, String> declaration : this.udfFunctions) {
            descriptors.add(new UserDefinedFunctionDescriptor(declaration.f0, declaration.f1));
        }
        this.udfDescriptors = descriptors;
    }

    /**
     * Prepares the operator for processing: builds one {@code PostTransformer} per transform
     * rule, resets the processor caches, instantiates every declared UDF through its no-arg
     * constructor and finally runs {@code initializeUdf()}.
     *
     * @throws RuntimeException if a UDF class cannot be found or instantiated; the original
     *     reflection failure is attached as the cause
     */
    public void open() throws Exception {
        super.open();
        this.transforms = this.transformRules.stream()
                .map(rule -> new PostTransformer(
                        new Selectors.SelectorsBuilder().includeTables(rule.getTableInclusions()).build(),
                        TransformProjection.of(rule.getProjection()).orElse(null),
                        TransformFilter.of(rule.getFilter(), this.udfDescriptors).orElse(null)))
                .collect(Collectors.toList());
        this.transformProjectionProcessorMap = new ConcurrentHashMap<>();
        this.transformFilterProcessorMap = new ConcurrentHashMap<>();
        this.udfFunctionInstances = new ConcurrentHashMap<>();
        for (UserDefinedFunctionDescriptor descriptor : this.udfDescriptors) {
            try {
                Object udfInstance = Class.forName(descriptor.getClasspath()).newInstance();
                this.udfFunctionInstances.put(descriptor.getName(), udfInstance);
            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                // Keep the cause: without it the actual reason (missing class, no accessible
                // no-arg constructor, ...) is lost from the stack trace.
                throw new RuntimeException("Failed to instantiate UDF function " + descriptor, e);
            }
        }
        initializeUdf();
    }

    /**
     * Runs the superclass finish logic and then drops the transforms and processor caches
     * via {@code clearOperator()}.
     */
    public void finish() throws Exception {
        super.finish();
        this.clearOperator();
    }

    /**
     * Releases the operator: runs the superclass close logic, drops the transforms and
     * processor caches, closes the UDFs and forgets their instances.
     *
     * <p>The UDF instance map is transient and only (re)created in {@code open()}, and the
     * descriptors are only created in {@code setup()}. Either may still be {@code null} when
     * this method runs after a failed or skipped start-up, so both are checked before use.
     */
    public void close() throws Exception {
        super.close();
        clearOperator();
        if (this.udfFunctionInstances == null) {
            // open() never got as far as creating UDF instances; nothing to destroy.
            return;
        }
        try {
            if (this.udfDescriptors != null) {
                destroyUdf();
            }
        } finally {
            // Forget the instances even if destroying one of them failed.
            this.udfFunctionInstances.clear();
        }
    }

    /**
     * Handles one incoming event.
     *
     * <p>For a schema change event, the cached projection and filter processors of the affected
     * table are discarded, the schema is re-cached, and the resulting event (if any) is emitted.
     * For a data change event, the transformed event (if any) is emitted. Events of any other
     * kind are not forwarded.
     */
    public void processElement(StreamRecord<Event> streamRecord) throws Exception {
        Event incoming = streamRecord.getValue();
        if (incoming instanceof SchemaChangeEvent) {
            SchemaChangeEvent schemaChange = (SchemaChangeEvent) incoming;
            TableId changedTable = schemaChange.tableId();
            // Processors were compiled against the old schema, so they must be rebuilt.
            this.transformProjectionProcessorMap.keySet().removeIf(key -> Objects.equals(key.f0, changedTable));
            this.transformFilterProcessorMap.keySet().removeIf(key -> Objects.equals(key.f0, changedTable));
            Optional<SchemaChangeEvent> forwarded = cacheSchema(schemaChange);
            if (forwarded.isPresent()) {
                this.output.collect(new StreamRecord<Event>(forwarded.get()));
            }
        } else if (incoming instanceof DataChangeEvent) {
            processDataChangeEvent((DataChangeEvent) incoming)
                    .ifPresent(transformed -> this.output.collect(new StreamRecord<Event>(transformed)));
        }
    }

    /**
     * Updates the cached schema information of a table for the given schema change event and
     * returns the event to forward downstream.
     *
     * <p>For a {@code CreateTableEvent} this records (a) whether the table is treated as having
     * an asterisk projection -- true when no transform matches the table or any matching
     * projection contains an asterisk -- and (b) which columns of the created schema appear
     * among the generated projection columns. The forwarded event is a new
     * {@code CreateTableEvent} carrying the transformed schema.
     *
     * <p>For any other schema change, the event is applied to the previously cached
     * pre-transform schema, and the event to forward is computed by
     * {@code SchemaUtils.transformSchemaChangeEvent} using either the column names of the
     * schema before the change (asterisk case) or the recorded projected columns.
     *
     * @param schemaChangeEvent the incoming schema change
     * @return the event to emit, or empty when the transformed change yields nothing to emit
     * @throws RuntimeException if a non-create event arrives for a table without cached schema
     */
    private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent schemaChangeEvent) throws Exception {
        TableId tableId = schemaChangeEvent.tableId();

        if (schemaChangeEvent instanceof CreateTableEvent) {
            Schema createdSchema = ((CreateTableEvent) schemaChangeEvent).getSchema();
            List<PostTransformer> matchingTransforms = this.transforms.stream()
                    .filter(transform -> transform.getSelectors().isMatch(tableId))
                    .collect(Collectors.toList());

            Set<String> projectedColumnNames = matchingTransforms.stream()
                    .flatMap(transform -> TransformParser.generateProjectionColumns(
                                    transform.getProjection().map(TransformProjection::getProjection).orElse(null),
                                    createdSchema.getColumns(),
                                    this.udfDescriptors)
                            .stream())
                    .map(column -> column.getColumnName())
                    .collect(Collectors.toSet());

            boolean hasAsterisk = matchingTransforms.isEmpty()
                    || matchingTransforms.stream().anyMatch(transform -> TransformParser.hasAsterisk(
                            transform.getProjection().map(TransformProjection::getProjection).orElse(null)));
            this.hasAsteriskMap.put(tableId, hasAsterisk);

            // Keep schema order while restricting to the projected columns.
            this.projectedColumnsMap.put(tableId, createdSchema.getColumnNames().stream()
                    .filter(projectedColumnNames::contains)
                    .collect(Collectors.toList()));

            Schema transformedSchema = transformSchema(tableId, createdSchema);
            this.postTransformChangeInfoMap.put(tableId, PostTransformChangeInfo.of(tableId, transformedSchema, createdSchema));
            return Optional.<SchemaChangeEvent>of(new CreateTableEvent(tableId, transformedSchema));
        }

        // Capture the column names before the change is applied to the cached schema.
        Schema previousSchema = getPostTransformChangeInfo(tableId).getPreTransformedSchema();
        List<String> previousColumnNames = previousSchema.getColumnNames();
        Schema evolvedSchema = SchemaUtils.applySchemaChangeEvent(previousSchema, schemaChangeEvent);
        Schema transformedSchema = transformSchema(tableId, evolvedSchema);
        this.postTransformChangeInfoMap.put(tableId, PostTransformChangeInfo.of(tableId, transformedSchema, evolvedSchema));

        if (this.hasAsteriskMap.getOrDefault(tableId, true)) {
            return SchemaUtils.transformSchemaChangeEvent(true, previousColumnNames, schemaChangeEvent);
        }
        return SchemaUtils.transformSchemaChangeEvent(false, this.projectedColumnsMap.get(tableId), schemaChangeEvent);
    }

    /**
     * Looks up the cached change info of a table.
     *
     * @param tableId table to look up
     * @return the cached info, never {@code null}
     * @throws RuntimeException if nothing has been cached for the table
     */
    private PostTransformChangeInfo getPostTransformChangeInfo(TableId tableId) {
        PostTransformChangeInfo cached = this.postTransformChangeInfoMap.get(tableId);
        if (cached != null) {
            return cached;
        }
        throw new RuntimeException("Schema for " + tableId + " not found. This shouldn't happen.");
    }

    /**
     * Computes the schema of a table after projection.
     *
     * <p>Every transform that matches the table and carries a valid projection contributes one
     * projected schema (its processor is created and cached on first use). The result is the
     * wider schema inferred from all contributions, or the input schema when there are none.
     *
     * @param tableId table the schema belongs to
     * @param schema schema before projection
     * @return the projected schema, or {@code schema} itself when no projection applies
     */
    private Schema transformSchema(TableId tableId, Schema schema) throws Exception {
        List<Schema> projectedSchemas = new ArrayList<>();
        for (PostTransformer transform : this.transforms) {
            if (!transform.getSelectors().isMatch(tableId)) {
                continue;
            }
            Optional<TransformProjection> maybeProjection = transform.getProjection();
            if (!maybeProjection.isPresent()) {
                continue;
            }
            TransformProjection projection = maybeProjection.get();
            if (!projection.isValid()) {
                continue;
            }
            Tuple2<TableId, TransformProjection> cacheKey = Tuple2.of(tableId, projection);
            TransformProjectionProcessor processor = this.transformProjectionProcessorMap.get(cacheKey);
            if (processor == null) {
                processor = TransformProjectionProcessor.of(projection, this.timezone, this.udfDescriptors, getUdfFunctionInstances());
                this.transformProjectionProcessorMap.put(cacheKey, processor);
            }
            projectedSchemas.add(processor.processSchemaChangeEvent(schema));
        }
        if (projectedSchemas.isEmpty()) {
            return schema;
        }
        return SchemaUtils.inferWiderSchema(projectedSchemas);
    }

    /**
     * Returns the UDF instances in descriptor order.
     *
     * @return one entry per descriptor; an entry is {@code null} when no instance is
     *     registered under the descriptor's name
     */
    private List<Object> getUdfFunctionInstances() {
        List<Object> instances = new ArrayList<>();
        for (UserDefinedFunctionDescriptor descriptor : this.udfDescriptors) {
            instances.add(this.udfFunctionInstances.get(descriptor.getName()));
        }
        return instances;
    }

    /**
     * Applies the matching transforms to a data change event.
     *
     * <p>Each transform matching the event's table first filters the original event (when it
     * has a valid filter) and then projects the surviving event (when it has a valid
     * projection). All matching transforms are evaluated; the first non-empty outcome, in
     * transform order, is post-projected and returned. When no transform matches, the original
     * event is post-projected. When every matching transform filtered the event out, the
     * result is empty.
     *
     * @param dataChangeEvent the incoming event
     * @return the transformed event, or empty if it was filtered out
     * @throws RuntimeException if no schema is cached for the event's table
     */
    private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataChangeEvent) throws Exception {
        TableId tableId = dataChangeEvent.tableId();
        PostTransformChangeInfo changeInfo = getPostTransformChangeInfo(tableId);
        List<Optional<DataChangeEvent>> outcomes = new ArrayList<>();
        // One timestamp shared by all filter/projection evaluations of this event.
        long epochTime = System.currentTimeMillis();

        for (PostTransformer transform : this.transforms) {
            if (!transform.getSelectors().isMatch(tableId)) {
                continue;
            }
            Optional<DataChangeEvent> current = Optional.of(dataChangeEvent);
            Optional<TransformProjection> maybeProjection = transform.getProjection();
            Optional<TransformFilter> maybeFilter = transform.getFilter();

            if (maybeFilter.isPresent() && maybeFilter.get().isVaild()) {
                TransformFilter filter = maybeFilter.get();
                Tuple2<TableId, TransformFilter> filterKey = Tuple2.of(tableId, filter);
                TransformFilterProcessor filterProcessor = this.transformFilterProcessorMap.get(filterKey);
                if (filterProcessor == null) {
                    filterProcessor = TransformFilterProcessor.of(changeInfo, filter, this.timezone, this.udfDescriptors, getUdfFunctionInstances());
                    this.transformFilterProcessorMap.put(filterKey, filterProcessor);
                }
                current = processFilter(filterProcessor, dataChangeEvent, epochTime);
            }

            if (current.isPresent() && maybeProjection.isPresent() && maybeProjection.get().isValid()) {
                TransformProjection projection = maybeProjection.get();
                Tuple2<TableId, TransformProjection> projectionKey = Tuple2.of(tableId, projection);
                TransformProjectionProcessor projectionProcessor = this.transformProjectionProcessorMap.get(projectionKey);
                // A processor cached without table info cannot project rows; replace it.
                if (projectionProcessor == null || !projectionProcessor.hasTableInfo()) {
                    projectionProcessor = TransformProjectionProcessor.of(changeInfo, projection, this.timezone, this.udfDescriptors, getUdfFunctionInstances());
                    this.transformProjectionProcessorMap.put(projectionKey, projectionProcessor);
                }
                current = processProjection(projectionProcessor, current.get(), epochTime);
            }
            outcomes.add(current);
        }

        if (outcomes.isEmpty()) {
            return processPostProjection(changeInfo, dataChangeEvent);
        }
        for (Optional<DataChangeEvent> outcome : outcomes) {
            if (outcome.isPresent()) {
                return processPostProjection(changeInfo, outcome.get());
            }
        }
        return Optional.empty();
    }

    /**
     * Evaluates a filter against a data change event.
     *
     * <p>The "after" row is evaluated when present (row kind prefixed with {@code '+'});
     * otherwise the "before" row is evaluated when present (prefixed with {@code '-'}).
     * An event with neither row is rejected.
     *
     * @param transformFilterProcessor processor evaluating the filter
     * @param dataChangeEvent event to test
     * @param j timestamp in milliseconds passed through to the processor
     * @return the unchanged event if it passes the filter, otherwise empty
     */
    private Optional<DataChangeEvent> processFilter(TransformFilterProcessor transformFilterProcessor, DataChangeEvent dataChangeEvent, long j) throws Exception {
        BinaryRecordData beforeRow = (BinaryRecordData) dataChangeEvent.before();
        BinaryRecordData afterRow = (BinaryRecordData) dataChangeEvent.after();
        final boolean accepted;
        if (afterRow != null) {
            accepted = transformFilterProcessor.process(afterRow, j, opTypeToRowKind(dataChangeEvent.op(), '+'));
        } else if (beforeRow != null) {
            accepted = transformFilterProcessor.process(beforeRow, j, opTypeToRowKind(dataChangeEvent.op(), '-'));
        } else {
            accepted = false;
        }
        return accepted ? Optional.of(dataChangeEvent) : Optional.empty();
    }

    /**
     * Applies a projection to the rows of a data change event.
     *
     * <p>The "before" row (row kind prefixed with {@code '-'}) and the "after" row (prefixed
     * with {@code '+'}) are each replaced by their projected form when present.
     *
     * @param transformProjectionProcessor processor computing the projected rows
     * @param dataChangeEvent event whose rows are projected
     * @param j timestamp in milliseconds passed through to the processor
     * @return the event with projected rows; always present
     */
    private Optional<DataChangeEvent> processProjection(TransformProjectionProcessor transformProjectionProcessor, DataChangeEvent dataChangeEvent, long j) {
        BinaryRecordData beforeRow = (BinaryRecordData) dataChangeEvent.before();
        BinaryRecordData afterRow = (BinaryRecordData) dataChangeEvent.after();
        DataChangeEvent projected = dataChangeEvent;
        if (beforeRow != null) {
            String rowKind = opTypeToRowKind(projected.op(), '-');
            projected = DataChangeEvent.projectBefore(projected, transformProjectionProcessor.processData(beforeRow, j, rowKind));
        }
        if (afterRow != null) {
            String rowKind = opTypeToRowKind(projected.op(), '+');
            projected = DataChangeEvent.projectAfter(projected, transformProjectionProcessor.processData(afterRow, j, rowKind));
        }
        return Optional.of(projected);
    }

    /**
     * Rewrites the rows of a data change event through {@code projectRecord}, i.e. via the
     * post-transform field getters and record generator of the given change info.
     *
     * @param postTransformChangeInfo cached info of the event's table
     * @param dataChangeEvent event whose "before"/"after" rows are rewritten when present
     * @return the event with rewritten rows; always present
     */
    private Optional<DataChangeEvent> processPostProjection(PostTransformChangeInfo postTransformChangeInfo, DataChangeEvent dataChangeEvent) throws Exception {
        BinaryRecordData beforeRow = (BinaryRecordData) dataChangeEvent.before();
        BinaryRecordData afterRow = (BinaryRecordData) dataChangeEvent.after();
        DataChangeEvent rewritten = dataChangeEvent;
        if (beforeRow != null) {
            BinaryRecordData newBefore = projectRecord(postTransformChangeInfo, beforeRow);
            rewritten = DataChangeEvent.projectBefore(rewritten, newBefore);
        }
        if (afterRow != null) {
            BinaryRecordData newAfter = projectRecord(postTransformChangeInfo, afterRow);
            rewritten = DataChangeEvent.projectAfter(rewritten, newAfter);
        }
        return Optional.of(rewritten);
    }

    /**
     * Builds a new record by reading every post-transform field from the given record
     * (missing values are read as {@code null}) and feeding the values, in getter order,
     * to the change info's record generator.
     *
     * @param postTransformChangeInfo supplies the field getters and the record generator
     * @param binaryRecordData record to read from
     * @return the generated record
     */
    private BinaryRecordData projectRecord(PostTransformChangeInfo postTransformChangeInfo, BinaryRecordData binaryRecordData) {
        List<Object> fieldValues = new ArrayList<>();
        for (RecordData.FieldGetter getter : postTransformChangeInfo.getPostTransformedFieldGetters()) {
            Object value = getter.getFieldOrNull(binaryRecordData);
            fieldValues.add(value);
        }
        Object[] row = fieldValues.toArray();
        return postTransformChangeInfo.getRecordDataGenerator().generate(row);
    }

    /**
     * Drops the references to the transforms and both processor caches, then asks
     * {@code TransformExpressionCompiler} to clean up.
     */
    private void clearOperator() {
        this.transformFilterProcessorMap = null;
        this.transformProjectionProcessorMap = null;
        this.transforms = null;
        TransformExpressionCompiler.cleanUp();
    }

    /**
     * Invokes the public no-arg {@code open} method, via reflection, on the instance of every
     * descriptor that reports itself as a CDC pipeline UDF.
     *
     * @throws RuntimeException if {@code open} is missing, inaccessible or throws
     */
    private void initializeUdf() {
        for (UserDefinedFunctionDescriptor descriptor : this.udfDescriptors) {
            try {
                if (!descriptor.isCdcPipelineUdf()) {
                    continue;
                }
                Object udfInstance = this.udfFunctionInstances.get(descriptor.getName());
                udfInstance.getClass().getMethod("open").invoke(udfInstance);
            } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
                throw new RuntimeException("Failed to initialize UDF " + descriptor, e);
            }
        }
    }

    /**
     * Invokes the public no-arg {@code close} method, via reflection, on the instance of every
     * descriptor that reports itself as a CDC pipeline UDF.
     *
     * <p>This runs during operator shutdown, which may follow a start-up that failed part-way:
     * the descriptor list or the instance map may not exist yet, and some descriptors may have
     * no instance. Those cases are skipped instead of failing with a NullPointerException, so
     * the instances that do exist still get closed.
     *
     * @throws RuntimeException if {@code close} is missing, inaccessible or throws
     */
    private void destroyUdf() {
        if (this.udfDescriptors == null || this.udfFunctionInstances == null) {
            return;
        }
        for (UserDefinedFunctionDescriptor descriptor : this.udfDescriptors) {
            try {
                if (!descriptor.isCdcPipelineUdf()) {
                    continue;
                }
                Object udfInstance = this.udfFunctionInstances.get(descriptor.getName());
                if (udfInstance == null) {
                    // Never instantiated (e.g. open() failed earlier); nothing to close.
                    continue;
                }
                udfInstance.getClass().getMethod("close").invoke(udfInstance);
            } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
                throw new RuntimeException("Failed to destroy UDF " + descriptor, e);
            }
        }
    }

    /**
     * Builds a two-character row kind string: the given prefix character followed by the
     * first letter of the operation type's name.
     *
     * @param operationType operation whose name supplies the second character
     * @param c prefix character; callers in this class pass {@code '+'} or {@code '-'}
     * @return the two-character row kind
     */
    private String opTypeToRowKind(OperationType operationType, char c) {
        char operationInitial = operationType.name().charAt(0);
        return String.valueOf(new char[] {c, operationInitial});
    }
}
