package org.apache.flink.cdc.runtime.partitioning;

import java.io.Serializable;
import java.time.Duration;
import java.util.Optional;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.function.HashFunction;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Stream operator that wraps every incoming {@link Event} into one or more
 * {@link PartitioningEvent}s, each tagged with a target partition index in
 * {@code [0, downstreamParallelism)}.
 *
 * <ul>
 *   <li>{@link SchemaChangeEvent}: the cached hash function of the affected table is rebuilt from
 *       the latest schema, then a copy of the event is emitted for every partition index.
 *   <li>{@link FlushEvent}: a copy of the event is emitted for every partition index.
 *   <li>{@link DataChangeEvent}: emitted once, with the index derived from the table's hash
 *       function.
 * </ul>
 *
 * <p>Events of any other type are dropped without being forwarded.
 */
@Internal
public class PrePartitionOperator extends AbstractStreamOperator<PartitioningEvent> implements OneInputStreamOperator<Event, PartitioningEvent>, Serializable {
    private static final long serialVersionUID = 1L;

    /** Idle time after which a cached hash function is evicted. */
    private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);

    private final OperatorID schemaOperatorId;
    private final int downstreamParallelism;
    private final HashFunctionProvider<DataChangeEvent> hashFunctionProvider;

    // Both fields are transient and (re)created in open().
    private transient SchemaEvolutionClient schemaEvolutionClient;
    private transient LoadingCache<TableId, HashFunction<DataChangeEvent>> cachedHashFunctions;

    /**
     * Creates the operator.
     *
     * @param operatorID id of the schema operator the {@link SchemaEvolutionClient} talks to
     * @param i downstream parallelism, i.e. the number of partition indexes events are spread over
     * @param hashFunctionProvider factory producing one hash function per table and schema
     */
    public PrePartitionOperator(OperatorID operatorID, int i, HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.schemaOperatorId = operatorID;
        this.downstreamParallelism = i;
        this.hashFunctionProvider = hashFunctionProvider;
    }

    /**
     * Initializes the transient schema evolution client and the hash function cache.
     *
     * @throws Exception if the superclass {@code open()} fails
     */
    public void open() throws Exception {
        super.open();
        this.schemaEvolutionClient = new SchemaEvolutionClient(getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway(), this.schemaOperatorId);
        this.cachedHashFunctions = createCache();
    }

    /**
     * Routes one incoming event according to its type (see the class description).
     *
     * @param streamRecord record carrying the event to route
     * @throws Exception if the hash function for a data change event cannot be loaded
     */
    public void processElement(StreamRecord<Event> streamRecord) throws Exception {
        Event event = streamRecord.getValue();
        if (event instanceof SchemaChangeEvent) {
            // The schema changed, so the cached hash function may be stale: rebuild it before
            // any later data change event of this table is hashed.
            TableId tableId = ((SchemaChangeEvent) event).tableId();
            this.cachedHashFunctions.put(tableId, recreateHashFunction(tableId));
            broadcastEvent(event);
        } else if (event instanceof FlushEvent) {
            broadcastEvent(event);
        } else if (event instanceof DataChangeEvent) {
            partitionBy((DataChangeEvent) event);
        }
        // Any other event type is intentionally not forwarded.
    }

    /**
     * Emits the data change event once, targeted at the partition selected by the table's hash
     * function.
     */
    private void partitionBy(DataChangeEvent dataChangeEvent) throws Exception {
        HashFunction<DataChangeEvent> hashFunction = this.cachedHashFunctions.get(dataChangeEvent.tableId());
        // floorMod keeps the index inside [0, downstreamParallelism) even when the hash function
        // returns a negative value; a plain '%' would yield a negative index in that case.
        int targetPartition = Math.floorMod(hashFunction.hashcode(dataChangeEvent), this.downstreamParallelism);
        this.output.collect(new StreamRecord<>(new PartitioningEvent(dataChangeEvent, targetPartition)));
    }

    /** Emits one copy of the event for every partition index. */
    private void broadcastEvent(Event event) {
        for (int partition = 0; partition < this.downstreamParallelism; partition++) {
            // Each target receives its own copy instead of sharing one event instance.
            this.output.collect(new StreamRecord<>(new PartitioningEvent(EventSerializer.INSTANCE.copy(event), partition)));
        }
    }

    /**
     * Asks the schema evolution client for the latest evolved schema of the given table.
     *
     * @throws RuntimeException if the request itself fails; the original failure is the cause
     * @throws IllegalStateException if the request succeeds but returns no schema for the table
     */
    private Schema loadLatestSchemaFromRegistry(TableId tableId) {
        final Optional<Schema> latestEvolvedSchema;
        // Only the request is guarded, so that a missing schema is reported with its own
        // message instead of being re-wrapped as a request failure.
        try {
            latestEvolvedSchema = this.schemaEvolutionClient.getLatestEvolvedSchema(tableId);
        } catch (Exception e) {
            throw new RuntimeException(String.format("Failed to request latest schema for table \"%s\"", tableId), e);
        }
        return latestEvolvedSchema.orElseThrow(
                () -> new IllegalStateException(String.format("Schema is never registered or outdated for table \"%s\"", tableId)));
    }

    /**
     * Builds a fresh hash function for the table from its latest evolved schema.
     *
     * <p>Within this class it is called on schema change events and by the cache loader.
     *
     * @param tableId table to build the hash function for
     * @return hash function obtained from the configured {@link HashFunctionProvider}
     */
    public HashFunction<DataChangeEvent> recreateHashFunction(TableId tableId) {
        return this.hashFunctionProvider.getHashFunction(tableId, loadLatestSchemaFromRegistry(tableId));
    }

    /**
     * Creates the per-table hash function cache. Missing entries are loaded through
     * {@link #recreateHashFunction(TableId)}; entries expire after {@link #CACHE_EXPIRE_DURATION}
     * without access.
     */
    private LoadingCache<TableId, HashFunction<DataChangeEvent>> createCache() {
        return CacheBuilder.newBuilder().expireAfterAccess(CACHE_EXPIRE_DURATION).build(new CacheLoader<TableId, HashFunction<DataChangeEvent>>() {
            public HashFunction<DataChangeEvent> load(TableId tableId) {
                return PrePartitionOperator.this.recreateHashFunction(tableId);
            }
        });
    }

    /** No-op: the body is empty and does not delegate to the superclass implementation. */
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
    }
}
