package org.apache.flink.cdc.runtime.operators.schema.coordinator;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
import org.apache.flink.cdc.runtime.operators.transform.PreTransformChangeInfo;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.class */
public class SchemaRegistryRequestHandler implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryRequestHandler.class);
    private final MetadataApplier metadataApplier;
    private final SchemaManager schemaManager;
    private final SchemaDerivation schemaDerivation;
    private volatile Throwable currentChangeException;
    private final SchemaChangeBehavior schemaChangeBehavior;
    private final OperatorCoordinator.Context context;
    private final Set<Integer> activeSinkWriters = ConcurrentHashMap.newKeySet();
    private final Set<Integer> flushedSinkWriters = ConcurrentHashMap.newKeySet();
    private final ExecutorService schemaChangeThreadPool = Executors.newSingleThreadExecutor();
    private volatile List<SchemaChangeEvent> currentDerivedSchemaChangeEvents = new ArrayList();
    private volatile List<SchemaChangeEvent> currentFinishedSchemaChanges = new ArrayList();
    private volatile List<SchemaChangeEvent> currentIgnoredSchemaChanges = new ArrayList();
    private volatile RequestStatus schemaChangeStatus = RequestStatus.IDLE;
    private final List<Integer> pendingSubTaskIds = new ArrayList();
    private final Object schemaChangeRequestLock = new Object();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$cdc$common$event$SchemaChangeEventType = new int[SchemaChangeEventType.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$cdc$common$event$SchemaChangeEventType[SchemaChangeEventType.ADD_COLUMN.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$cdc$common$event$SchemaChangeEventType[SchemaChangeEventType.DROP_COLUMN.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$cdc$common$event$SchemaChangeEventType[SchemaChangeEventType.RENAME_COLUMN.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler$RequestStatus.class */
    public enum RequestStatus {
        IDLE,
        WAITING_FOR_FLUSH,
        APPLYING,
        FINISHED
    }

    public SchemaRegistryRequestHandler(MetadataApplier metadataApplier, SchemaManager schemaManager, SchemaDerivation schemaDerivation, SchemaChangeBehavior schemaChangeBehavior, OperatorCoordinator.Context context) {
        this.metadataApplier = metadataApplier;
        this.schemaManager = schemaManager;
        this.schemaDerivation = schemaDerivation;
        this.schemaChangeBehavior = schemaChangeBehavior;
        this.context = context;
    }

    public void handleSchemaChangeRequest(SchemaChangeRequest schemaChangeRequest, CompletableFuture<CoordinationResponse> completableFuture) {
        int subTaskId = schemaChangeRequest.getSubTaskId();
        synchronized (this.schemaChangeRequestLock) {
            if (this.schemaChangeStatus == RequestStatus.IDLE) {
                if (this.pendingSubTaskIds.isEmpty()) {
                    LOG.info("Received schema change event request {} from table {} from subTask {}. Pending list is empty, handling this.", new Object[]{schemaChangeRequest.getSchemaChangeEvent(), schemaChangeRequest.getTableId().toString(), Integer.valueOf(subTaskId)});
                } else {
                    if (this.pendingSubTaskIds.get(0).intValue() != subTaskId) {
                        LOG.info("Received schema change event request {} from table {} from subTask {}. It is not the first of the pending list ({}).", new Object[]{schemaChangeRequest.getSchemaChangeEvent(), schemaChangeRequest.getTableId().toString(), Integer.valueOf(subTaskId), this.pendingSubTaskIds});
                        if (!this.pendingSubTaskIds.contains(Integer.valueOf(subTaskId))) {
                            this.pendingSubTaskIds.add(Integer.valueOf(subTaskId));
                        }
                        completableFuture.complete(CoordinationResponseUtils.wrap(SchemaChangeResponse.busy()));
                        return;
                    }
                    LOG.info("Received schema change event request {} from table {} from subTask {}. It is on the first of the pending list, handling this.", new Object[]{schemaChangeRequest.getSchemaChangeEvent(), schemaChangeRequest.getTableId().toString(), Integer.valueOf(subTaskId)});
                    this.pendingSubTaskIds.remove(0);
                }
                SchemaChangeEvent schemaChangeEvent = schemaChangeRequest.getSchemaChangeEvent();
                if (this.schemaManager.isOriginalSchemaChangeEventRedundant(schemaChangeEvent)) {
                    LOG.info("Event {} has been addressed before, ignoring it.", schemaChangeEvent);
                    clearCurrentSchemaChangeRequest();
                    LOG.info("SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.", schemaChangeRequest);
                    completableFuture.complete(CoordinationResponseUtils.wrap(SchemaChangeResponse.duplicate()));
                    return;
                }
                this.schemaManager.applyOriginalSchemaChange(schemaChangeEvent);
                List<SchemaChangeEvent> calculateDerivedSchemaChangeEvents = calculateDerivedSchemaChangeEvents(schemaChangeRequest.getSchemaChangeEvent());
                if (calculateDerivedSchemaChangeEvents.isEmpty()) {
                    LOG.info("Event {} is omitted from sending to downstream, ignoring it.", schemaChangeEvent);
                    clearCurrentSchemaChangeRequest();
                    LOG.info("SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.", schemaChangeRequest);
                    completableFuture.complete(CoordinationResponseUtils.wrap(SchemaChangeResponse.ignored()));
                    return;
                }
                LOG.info("SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.");
                this.schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH;
                this.currentDerivedSchemaChangeEvents = new ArrayList(calculateDerivedSchemaChangeEvents);
                completableFuture.complete(CoordinationResponseUtils.wrap(SchemaChangeResponse.accepted(calculateDerivedSchemaChangeEvents)));
            } else {
                LOG.info("Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).", new Object[]{schemaChangeRequest, Integer.valueOf(subTaskId), this.pendingSubTaskIds});
                if (!this.pendingSubTaskIds.contains(Integer.valueOf(subTaskId))) {
                    this.pendingSubTaskIds.add(Integer.valueOf(subTaskId));
                }
                completableFuture.complete(CoordinationResponseUtils.wrap(SchemaChangeResponse.busy()));
            }
        }
    }

    private void applySchemaChange(TableId tableId, List<SchemaChangeEvent> list) {
        Iterator<SchemaChangeEvent> it = list.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            SchemaChangeEvent next = it.next();
            if (next.getType() != SchemaChangeEventType.CREATE_TABLE && this.schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
                this.currentIgnoredSchemaChanges.add(next);
            } else if (this.metadataApplier.acceptsSchemaEvolutionType(next.getType())) {
                try {
                    this.metadataApplier.applySchemaChange(next);
                    LOG.info("Applied schema change {} to table {}.", next, tableId);
                    this.schemaManager.applyEvolvedSchemaChange(next);
                    this.currentFinishedSchemaChanges.add(next);
                } catch (Throwable th) {
                    LOG.error("Failed to apply schema change {} to table {}. Caused by: {}", new Object[]{next, tableId, th});
                    if (!shouldIgnoreException(th)) {
                        this.currentChangeException = th;
                        break;
                    }
                    LOG.warn("Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}", next, th);
                }
            } else {
                LOG.info("Ignored schema change {} to table {}.", next, tableId);
                this.currentIgnoredSchemaChanges.add(next);
            }
        }
        Preconditions.checkState(this.schemaChangeStatus == RequestStatus.APPLYING, "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not " + this.schemaChangeStatus);
        this.schemaChangeStatus = RequestStatus.FINISHED;
        LOG.info("SchemaChangeStatus switched from APPLYING to FINISHED for request {}.", this.currentDerivedSchemaChangeEvents);
    }

    public void registerSinkWriter(int i) {
        LOG.info("Register sink subtask {}.", Integer.valueOf(i));
        this.activeSinkWriters.add(Integer.valueOf(i));
    }

    public void flushSuccess(TableId tableId, int i, int i2) {
        this.flushedSinkWriters.add(Integer.valueOf(i));
        if (this.activeSinkWriters.size() < i2) {
            LOG.info("Not all active sink writers have been registered. Current {}, expected {}.", Integer.valueOf(this.activeSinkWriters.size()), Integer.valueOf(i2));
        } else if (this.flushedSinkWriters.equals(this.activeSinkWriters)) {
            Preconditions.checkState(this.schemaChangeStatus == RequestStatus.WAITING_FOR_FLUSH, "Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not " + this.schemaChangeStatus);
            this.schemaChangeStatus = RequestStatus.APPLYING;
            LOG.info("All sink subtask have flushed for table {}. Start to apply schema change.", tableId.toString());
            this.schemaChangeThreadPool.submit(() -> {
                applySchemaChange(tableId, this.currentDerivedSchemaChangeEvents);
            });
        }
    }

    public void getSchemaChangeResult(CompletableFuture<CoordinationResponse> completableFuture) {
        Preconditions.checkState(this.schemaChangeStatus != RequestStatus.IDLE, "Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
        if (this.schemaChangeStatus != RequestStatus.FINISHED) {
            completableFuture.complete(CoordinationResponseUtils.wrap(new SchemaChangeProcessingResponse()));
            return;
        }
        this.schemaChangeStatus = RequestStatus.IDLE;
        LOG.info("SchemaChangeStatus switched from FINISHED to IDLE for request {}", this.currentDerivedSchemaChangeEvents);
        completableFuture.complete(CoordinationResponseUtils.wrap(new SchemaChangeResultResponse(clearCurrentSchemaChangeRequest())));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.schemaChangeThreadPool != null) {
            this.schemaChangeThreadPool.shutdown();
        }
    }

    private List<SchemaChangeEvent> calculateDerivedSchemaChangeEvents(SchemaChangeEvent schemaChangeEvent) {
        return SchemaChangeBehavior.LENIENT.equals(this.schemaChangeBehavior) ? (List) this.schemaDerivation.applySchemaChange(schemaChangeEvent).stream().flatMap(schemaChangeEvent2 -> {
            return lenientizeSchemaChangeEvent(schemaChangeEvent2).stream();
        }).collect(Collectors.toList()) : this.schemaDerivation.applySchemaChange(schemaChangeEvent);
    }

    private List<SchemaChangeEvent> lenientizeSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent) {
        if (schemaChangeEvent instanceof CreateTableEvent) {
            return Collections.singletonList(schemaChangeEvent);
        }
        TableId tableId = schemaChangeEvent.tableId();
        Schema orElseThrow = this.schemaManager.getLatestEvolvedSchema(tableId).orElseThrow(() -> {
            return new IllegalStateException("Evolved schema does not exist, not ready for schema change event " + schemaChangeEvent);
        });
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$cdc$common$event$SchemaChangeEventType[schemaChangeEvent.getType().ordinal()]) {
            case PreTransformChangeInfo.Serializer.VERSION_BEFORE_STATE_COMPATIBILITY /* 1 */:
                return Collections.singletonList(new AddColumnEvent(tableId, (List) ((AddColumnEvent) schemaChangeEvent).getAddedColumns().stream().map(columnWithPosition -> {
                    return new AddColumnEvent.ColumnWithPosition(Column.physicalColumn(columnWithPosition.getAddColumn().getName(), columnWithPosition.getAddColumn().getType().nullable(), columnWithPosition.getAddColumn().getComment(), columnWithPosition.getAddColumn().getDefaultValueExpression()));
                }).collect(Collectors.toList())));
            case 2:
                Stream stream = ((DropColumnEvent) schemaChangeEvent).getDroppedColumnNames().stream();
                orElseThrow.getClass();
                Map map = (Map) stream.map(orElseThrow::getColumn).flatMap(optional -> {
                    return (Stream) optional.map((v0) -> {
                        return Stream.of(v0);
                    }).orElse(Stream.empty());
                }).filter(column -> {
                    return !column.getType().isNullable();
                }).collect(Collectors.toMap((v0) -> {
                    return v0.getName();
                }, column2 -> {
                    return column2.getType().nullable();
                }));
                return map.isEmpty() ? Collections.emptyList() : Collections.singletonList(new AlterColumnTypeEvent(tableId, map));
            case 3:
                ArrayList arrayList = new ArrayList();
                HashMap hashMap = new HashMap();
                ((RenameColumnEvent) schemaChangeEvent).getNameMapping().forEach((str, str2) -> {
                    Column column3 = (Column) orElseThrow.getColumn(str).orElseThrow(() -> {
                        return new IllegalArgumentException("Non-existed column " + str + " in evolved schema.");
                    });
                    if (!column3.getType().isNullable()) {
                        hashMap.put(str, column3.getType().nullable());
                    }
                    arrayList.add(new AddColumnEvent.ColumnWithPosition(Column.physicalColumn(str2, column3.getType().nullable(), column3.getComment(), column3.getDefaultValueExpression())));
                });
                ArrayList arrayList2 = new ArrayList();
                arrayList2.add(new AddColumnEvent(tableId, arrayList));
                if (!hashMap.isEmpty()) {
                    arrayList2.add(new AlterColumnTypeEvent(tableId, hashMap));
                }
                return arrayList2;
            default:
                return Collections.singletonList(schemaChangeEvent);
        }
    }

    private boolean shouldIgnoreException(Throwable th) {
        return (th instanceof UnsupportedSchemaChangeEventException) && this.schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE;
    }

    private List<SchemaChangeEvent> clearCurrentSchemaChangeRequest() {
        if (this.currentChangeException != null) {
            this.context.failJob(new RuntimeException("Failed to apply schema change.", this.currentChangeException));
        }
        ArrayList arrayList = new ArrayList(this.currentFinishedSchemaChanges);
        this.flushedSinkWriters.clear();
        this.currentDerivedSchemaChangeEvents.clear();
        this.currentFinishedSchemaChanges.clear();
        this.currentIgnoredSchemaChanges.clear();
        this.currentChangeException = null;
        return arrayList;
    }
}
