package co.cask.cdap.metadata;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.lineage.field.Operation;
import co.cask.cdap.api.messaging.Message;
import co.cask.cdap.api.messaging.MessagingContext;
import co.cask.cdap.api.metadata.Metadata;
import co.cask.cdap.api.metadata.MetadataEntity;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.api.workflow.WorkflowToken;
import co.cask.cdap.common.InvalidMetadataException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.common.transaction.MultiThreadTransactionAware;
import co.cask.cdap.common.utils.ImmutablePair;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.MultiThreadDatasetCache;
import co.cask.cdap.data2.metadata.lineage.LineageDataset;
import co.cask.cdap.data2.metadata.lineage.field.FieldLineageDataset;
import co.cask.cdap.data2.metadata.lineage.field.FieldLineageInfo;
import co.cask.cdap.data2.metadata.writer.DataAccessLineage;
import co.cask.cdap.data2.metadata.writer.MetadataMessage;
import co.cask.cdap.data2.metadata.writer.MetadataOperation;
import co.cask.cdap.data2.registry.DatasetUsage;
import co.cask.cdap.data2.registry.UsageDataset;
import co.cask.cdap.data2.transaction.TransactionSystemClientAdapter;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.internal.app.runtime.workflow.BasicWorkflowToken;
import co.cask.cdap.internal.app.store.AppMetadataStore;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.context.MultiThreadMessagingContext;
import co.cask.cdap.messaging.subscriber.AbstractMessagingSubscriberService;
import co.cask.cdap.metadata.profile.ProfileMetadataMessageProcessor;
import co.cask.cdap.proto.WorkflowNodeStateDetail;
import co.cask.cdap.proto.codec.EntityIdTypeAdapter;
import co.cask.cdap.proto.codec.OperationTypeAdapter;
import co.cask.cdap.proto.element.EntityType;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.EntityId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.inject.Inject;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.tephra.TransactionSystemClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/metadata/MetadataSubscriberService.class */
public class MetadataSubscriberService extends AbstractMessagingSubscriberService<MetadataMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(MetadataSubscriberService.class);
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(EntityId.class, new EntityIdTypeAdapter()).registerTypeAdapter(Operation.class, new OperationTypeAdapter()).create();
    private final CConfiguration cConf;
    private final DatasetFramework datasetFramework;
    private final MetadataAdmin metadataAdmin;
    private final Transactional transactional;
    private final MultiThreadMessagingContext messagingContext;
    private DatasetId lineageDatasetId;
    private DatasetId fieldLineageDatasetId;
    private DatasetId usageDatasetId;

    /* renamed from: co.cask.cdap.metadata.MetadataSubscriberService$1, reason: invalid class name */
    /* loaded from: input_file:co/cask/cdap/metadata/MetadataSubscriberService$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataMessage$Type;
        static final /* synthetic */ int[] $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataOperation$Type = new int[MetadataOperation.Type.values().length];

        static {
            try {
                $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataOperation$Type[MetadataOperation.Type.PUT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataOperation$Type[MetadataOperation.Type.DELETE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataOperation$Type[MetadataOperation.Type.DELETE_ALL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataOperation$Type[MetadataOperation.Type.DELETE_ALL_PROPERTIES.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataOperation$Type[MetadataOperation.Type.DELETE_ALL_TAGS.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataMessage$Type = new int[MetadataMessage.Type.values().length];
            try {
                $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataMessage$Type[MetadataMessage.Type.WORKFLOW_TOKEN.ordinal()] = 1;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataMessage$Type[MetadataMessage.Type.WORKFLOW_STATE.ordinal()] = 2;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataMessage$Type[MetadataMessage.Type.LINEAGE.ordinal()] = 3;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataMessage$Type[MetadataMessage.Type.FIELD_LINEAGE.ordinal()] = 4;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataMessage$Type[MetadataMessage.Type.USAGE.ordinal()] = 5;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataMessage$Type[MetadataMessage.Type.METADATA_OPERATION.ordinal()] = 6;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataMessage$Type[MetadataMessage.Type.DATASET_OPERATION.ordinal()] = 7;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataMessage$Type[MetadataMessage.Type.PROFILE_ASSIGNMENT.ordinal()] = 8;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataMessage$Type[MetadataMessage.Type.PROFILE_UNASSIGNMENT.ordinal()] = 9;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataMessage$Type[MetadataMessage.Type.ENTITY_CREATION.ordinal()] = 10;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataMessage$Type[MetadataMessage.Type.ENTITY_DELETION.ordinal()] = 11;
            } catch (NoSuchFieldError e16) {
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/metadata/MetadataSubscriberService$DataAccessLineageProcessor.class */
    private final class DataAccessLineageProcessor implements MetadataMessageProcessor {
        private final LineageDataset lineageDataset;

        DataAccessLineageProcessor(DatasetContext datasetContext) {
            this.lineageDataset = LineageDataset.getLineageDataset(datasetContext, MetadataSubscriberService.this.datasetFramework, MetadataSubscriberService.this.lineageDatasetId);
        }

        @Override // co.cask.cdap.metadata.MetadataMessageProcessor
        public void processMessage(MetadataMessage metadataMessage) {
            if (!(metadataMessage.getEntityId() instanceof ProgramRunId)) {
                MetadataSubscriberService.LOG.warn("Missing program run id from the lineage access information. Ignoring the message {}", metadataMessage);
                return;
            }
            DataAccessLineage dataAccessLineage = (DataAccessLineage) metadataMessage.getPayload(MetadataSubscriberService.GSON, DataAccessLineage.class);
            ProgramRunId entityId = metadataMessage.getEntityId();
            if (dataAccessLineage.getDatasetId() != null) {
                this.lineageDataset.addAccess(entityId, dataAccessLineage.getDatasetId(), dataAccessLineage.getAccessType(), dataAccessLineage.getAccessTime(), dataAccessLineage.getComponentId());
            } else if (dataAccessLineage.getStreamId() != null) {
                this.lineageDataset.addAccess(entityId, dataAccessLineage.getStreamId(), dataAccessLineage.getAccessType(), dataAccessLineage.getAccessTime(), dataAccessLineage.getComponentId());
            } else {
                MetadataSubscriberService.LOG.warn("Missing dataset id from the lineage access information. Ignoring the message {}", metadataMessage);
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/metadata/MetadataSubscriberService$FieldLineageProcessor.class */
    private final class FieldLineageProcessor implements MetadataMessageProcessor {
        private final FieldLineageDataset fieldLineageDataset;

        FieldLineageProcessor(DatasetContext datasetContext) {
            this.fieldLineageDataset = FieldLineageDataset.getFieldLineageDataset(datasetContext, MetadataSubscriberService.this.datasetFramework, MetadataSubscriberService.this.fieldLineageDatasetId);
        }

        @Override // co.cask.cdap.metadata.MetadataMessageProcessor
        public void processMessage(MetadataMessage metadataMessage) {
            if (!(metadataMessage.getEntityId() instanceof ProgramRunId)) {
                MetadataSubscriberService.LOG.warn("Missing program run id from the field lineage information. Ignoring the message {}", metadataMessage);
                return;
            }
            try {
                this.fieldLineageDataset.addFieldLineageInfo(metadataMessage.getEntityId(), (FieldLineageInfo) metadataMessage.getPayload(MetadataSubscriberService.GSON, FieldLineageInfo.class));
            } catch (Throwable th) {
                MetadataSubscriberService.LOG.warn("Error while deserializing the field lineage information message received from TMS. Ignoring : {}", metadataMessage, th);
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/metadata/MetadataSubscriberService$MetadataOperationProcessor.class */
    private class MetadataOperationProcessor implements MetadataMessageProcessor {
        private MetadataOperationProcessor() {
        }

        @Override // co.cask.cdap.metadata.MetadataMessageProcessor
        public void processMessage(MetadataMessage metadataMessage) {
            MetadataOperation metadataOperation = (MetadataOperation) metadataMessage.getPayload(MetadataSubscriberService.GSON, MetadataOperation.class);
            Metadata metadata = metadataOperation.getMetadata();
            MetadataEntity entity = metadataOperation.getEntity();
            MetadataSubscriberService.LOG.trace("Received {} for entity {}: {}", new Object[]{metadataOperation, entity, metadata});
            switch (AnonymousClass1.$SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataOperation$Type[metadataOperation.getType().ordinal()]) {
                case 1:
                    if (metadata != null) {
                        try {
                            if (metadata.getProperties() != null && !metadata.getProperties().isEmpty()) {
                                MetadataSubscriberService.this.metadataAdmin.addProperties(entity, metadata.getProperties());
                            }
                        } catch (InvalidMetadataException e) {
                            MetadataSubscriberService.LOG.warn("Ignoring invalid metadata operation {} from TMS: {}", new Object[]{metadataOperation, MetadataSubscriberService.GSON.toJson(metadataMessage.getRawPayload()), e});
                            return;
                        }
                    }
                    if (metadata != null && metadata.getTags() != null && !metadata.getTags().isEmpty()) {
                        MetadataSubscriberService.this.metadataAdmin.addTags(entity, metadata.getTags());
                    }
                    return;
                case 2:
                    if (metadata != null && metadata.getProperties() != null && !metadata.getProperties().isEmpty()) {
                        MetadataSubscriberService.this.metadataAdmin.removeProperties(entity, metadata.getProperties().keySet());
                    }
                    if (metadata == null || metadata.getTags() == null || metadata.getTags().isEmpty()) {
                        return;
                    }
                    MetadataSubscriberService.this.metadataAdmin.removeTags(entity, metadata.getTags());
                    return;
                case 3:
                    MetadataSubscriberService.this.metadataAdmin.removeMetadata(entity);
                    return;
                case 4:
                    MetadataSubscriberService.this.metadataAdmin.removeProperties(entity);
                    return;
                case 5:
                    MetadataSubscriberService.this.metadataAdmin.removeTags(entity);
                    return;
                default:
                    MetadataSubscriberService.LOG.warn("Ignoring MetadataOperation of unknown type {} for entity {}", metadataOperation.getType(), entity);
                    return;
            }
        }

        /* synthetic */ MetadataOperationProcessor(MetadataSubscriberService metadataSubscriberService, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* loaded from: input_file:co/cask/cdap/metadata/MetadataSubscriberService$UsageProcessor.class */
    private final class UsageProcessor implements MetadataMessageProcessor {
        private final UsageDataset usageDataset;

        UsageProcessor(DatasetContext datasetContext) {
            this.usageDataset = UsageDataset.getUsageDataset(datasetContext, MetadataSubscriberService.this.datasetFramework, MetadataSubscriberService.this.usageDatasetId);
        }

        @Override // co.cask.cdap.metadata.MetadataMessageProcessor
        public void processMessage(MetadataMessage metadataMessage) {
            if (!(metadataMessage.getEntityId() instanceof ProgramId)) {
                MetadataSubscriberService.LOG.warn("Missing program id from the usage information. Ignoring the message {}", metadataMessage);
                return;
            }
            DatasetUsage datasetUsage = (DatasetUsage) metadataMessage.getPayload(MetadataSubscriberService.GSON, DatasetUsage.class);
            ProgramId entityId = metadataMessage.getEntityId();
            if (datasetUsage.getDatasetId() != null) {
                this.usageDataset.register(entityId, datasetUsage.getDatasetId());
            } else if (datasetUsage.getStreamId() != null) {
                this.usageDataset.register(entityId, datasetUsage.getStreamId());
            } else {
                MetadataSubscriberService.LOG.warn("Missing dataset id from the usage information. Ignoring the message {}", metadataMessage);
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/metadata/MetadataSubscriberService$WorkflowProcessor.class */
    private final class WorkflowProcessor implements MetadataMessageProcessor {
        private final AppMetadataStore appMetadataStore;

        WorkflowProcessor(DatasetContext datasetContext) {
            this.appMetadataStore = AppMetadataStore.create(MetadataSubscriberService.this.cConf, datasetContext, MetadataSubscriberService.this.datasetFramework);
        }

        @Override // co.cask.cdap.metadata.MetadataMessageProcessor
        public void processMessage(MetadataMessage metadataMessage) {
            if (!(metadataMessage.getEntityId() instanceof ProgramRunId)) {
                MetadataSubscriberService.LOG.warn("Missing program run id from the workflow state information. Ignoring the message {}", metadataMessage);
                return;
            }
            ProgramRunId entityId = metadataMessage.getEntityId();
            switch (AnonymousClass1.$SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataMessage$Type[metadataMessage.getType().ordinal()]) {
                case 1:
                    this.appMetadataStore.setWorkflowToken(entityId, (WorkflowToken) metadataMessage.getPayload(MetadataSubscriberService.GSON, BasicWorkflowToken.class));
                    return;
                case 2:
                    this.appMetadataStore.addWorkflowNodeState(entityId, (WorkflowNodeStateDetail) metadataMessage.getPayload(MetadataSubscriberService.GSON, WorkflowNodeStateDetail.class));
                    return;
                default:
                    MetadataSubscriberService.LOG.warn("Unknown message type for workflow state information. Ignoring the message {}", metadataMessage);
                    return;
            }
        }
    }

    @Inject
    MetadataSubscriberService(CConfiguration cConfiguration, MessagingService messagingService, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient, MetricsCollectionService metricsCollectionService, MetadataAdmin metadataAdmin) {
        super(NamespaceId.SYSTEM.topic(cConfiguration.get("metadata.messaging.topic")), true, cConfiguration.getInt("metadata.messaging.fetch.size"), cConfiguration.getInt("data.tx.timeout"), cConfiguration.getInt("data.tx.max.timeout"), cConfiguration.getLong("metadata.messaging.poll.delay.millis"), RetryStrategies.fromConfiguration(cConfiguration, "system.metadata."), metricsCollectionService.getContext(ImmutableMap.of("cmp", "master.services", "ins", "0", "ns", NamespaceId.SYSTEM.getNamespace(), "tpc", cConfiguration.get("metadata.messaging.topic"), "co", "metadata.writer")));
        this.lineageDatasetId = LineageDataset.LINEAGE_DATASET_ID;
        this.fieldLineageDatasetId = FieldLineageDataset.FIELD_LINEAGE_DATASET_ID;
        this.usageDatasetId = UsageDataset.USAGE_INSTANCE_ID;
        this.cConf = cConfiguration;
        this.messagingContext = new MultiThreadMessagingContext(messagingService);
        this.datasetFramework = datasetFramework;
        this.metadataAdmin = metadataAdmin;
        this.transactional = Transactions.createTransactionalWithRetry(Transactions.createTransactional(new MultiThreadDatasetCache(new SystemDatasetInstantiator(datasetFramework), new TransactionSystemClientAdapter(transactionSystemClient), NamespaceId.SYSTEM, Collections.emptyMap(), (MetricsContext) null, (Map) null, new MultiThreadTransactionAware[]{this.messagingContext})), org.apache.tephra.RetryStrategies.retryOnConflict(20, 100L));
    }

    @VisibleForTesting
    MetadataSubscriberService setLineageDatasetId(DatasetId datasetId) {
        this.lineageDatasetId = datasetId;
        return this;
    }

    @VisibleForTesting
    MetadataSubscriberService setFieldLineageDatasetId(DatasetId datasetId) {
        this.fieldLineageDatasetId = datasetId;
        return this;
    }

    @VisibleForTesting
    MetadataSubscriberService setUsageDatasetId(DatasetId datasetId) {
        this.usageDatasetId = datasetId;
        return this;
    }

    protected MessagingContext getMessagingContext() {
        return this.messagingContext;
    }

    protected Transactional getTransactional() {
        return this.transactional;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: decodeMessage, reason: merged with bridge method [inline-methods] */
    public MetadataMessage m323decodeMessage(Message message) {
        return (MetadataMessage) GSON.fromJson(message.getPayloadAsString(), MetadataMessage.class);
    }

    @Nullable
    protected String loadMessageId(DatasetContext datasetContext) {
        return AppMetadataStore.create(this.cConf, datasetContext, this.datasetFramework).retrieveSubscriberState(getTopicId().getTopic(), "metadata.writer");
    }

    protected void storeMessageId(DatasetContext datasetContext, String str) {
        AppMetadataStore.create(this.cConf, datasetContext, this.datasetFramework).persistSubscriberState(getTopicId().getTopic(), "metadata.writer", str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean shouldRunInSeparateTx(MetadataMessage metadataMessage) {
        EntityType entityType = metadataMessage.getEntityId().getEntityType();
        return entityType.equals(EntityType.INSTANCE) || entityType.equals(EntityType.NAMESPACE);
    }

    protected void processMessages(DatasetContext datasetContext, Iterator<ImmutablePair<String, MetadataMessage>> it) {
        HashMap hashMap = new HashMap();
        while (it.hasNext()) {
            MetadataMessage metadataMessage = (MetadataMessage) it.next().getSecond();
            MetadataMessageProcessor metadataMessageProcessor = (MetadataMessageProcessor) hashMap.computeIfAbsent(metadataMessage.getType(), type -> {
                switch (AnonymousClass1.$SwitchMap$co$cask$cdap$data2$metadata$writer$MetadataMessage$Type[type.ordinal()]) {
                    case 1:
                    case 2:
                        return new WorkflowProcessor(datasetContext);
                    case 3:
                        return new DataAccessLineageProcessor(datasetContext);
                    case 4:
                        return new FieldLineageProcessor(datasetContext);
                    case 5:
                        return new UsageProcessor(datasetContext);
                    case 6:
                        return new MetadataOperationProcessor(this, null);
                    case 7:
                        return new DatasetOperationMessageProcessor(this.datasetFramework);
                    case 8:
                    case 9:
                    case 10:
                    case 11:
                        return new ProfileMetadataMessageProcessor(this.cConf, datasetContext, this.datasetFramework);
                    default:
                        return null;
                }
            });
            if (metadataMessageProcessor == null) {
                LOG.warn("Unsupported metadata message type {}. Message ignored.", metadataMessage.getType());
            } else {
                metadataMessageProcessor.processMessage(metadataMessage);
            }
        }
    }
}
