package co.cask.cdap.internal.app.runtime.flow;

import co.cask.cdap.api.annotation.Batch;
import co.cask.cdap.api.annotation.HashPartition;
import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.RoundRobin;
import co.cask.cdap.api.flow.FlowSpecification;
import co.cask.cdap.api.flow.FlowletDefinition;
import co.cask.cdap.api.metrics.MetricDeleteQuery;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.queue.QueueSpecification;
import co.cask.cdap.app.queue.QueueSpecificationGenerator;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.queue.ConsumerGroupConfig;
import co.cask.cdap.data2.queue.DequeueStrategy;
import co.cask.cdap.data2.transaction.ForwardingTransactionAware;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.data2.transaction.queue.QueueAdmin;
import co.cask.cdap.data2.transaction.queue.QueueConfigurer;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.internal.app.queue.SimpleQueueSpecificationGenerator;
import co.cask.cdap.internal.io.ReflectionSchemaGenerator;
import co.cask.cdap.internal.io.SchemaGenerator;
import co.cask.cdap.internal.lang.MethodVisitor;
import co.cask.cdap.internal.lang.Reflections;
import co.cask.cdap.internal.lang.Visitor;
import co.cask.cdap.internal.specification.FlowletMethod;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.id.ApplicationId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.StreamId;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.common.hash.Hashing;
import com.google.common.io.Closeables;
import com.google.common.reflect.TypeToken;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.tephra.TransactionAware;
import org.apache.tephra.TransactionExecutor;
import org.apache.tephra.TransactionExecutorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/flow/FlowUtils.class */
public final class FlowUtils {
    public static final String FLOWLET_SCOPE = "flowlet";
    private static final Logger LOG = LoggerFactory.getLogger(FlowUtils.class);

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/flow/FlowUtils$ConsumerGroupConfigurer.class */
    private static final class ConsumerGroupConfigurer extends ForwardingTransactionAware implements Closeable {
        private final QueueConfigurer queueConfigurer;
        private final List<ConsumerGroupConfig> groupConfigs;

        private ConsumerGroupConfigurer(QueueConfigurer queueConfigurer, Iterable<? extends ConsumerGroupConfig> iterable) {
            this.queueConfigurer = queueConfigurer;
            this.groupConfigs = ImmutableList.copyOf(iterable);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void configure() throws Exception {
            this.queueConfigurer.configureGroups(this.groupConfigs);
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.queueConfigurer.close();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: delegate, reason: merged with bridge method [inline-methods] */
        public TransactionAware m190delegate() {
            return this.queueConfigurer;
        }
    }

    @Deprecated
    public static long generateConsumerGroupId(Id.Program program, String str) {
        return generateConsumerGroupId(program.toEntityId(), str);
    }

    public static long generateConsumerGroupId(ProgramId programId, String str) {
        String namespace = programId.getNamespace();
        return Hashing.md5().newHasher().putString(NamespaceId.DEFAULT.getEntityName().equals(namespace) ? "developer" : namespace).putString(programId.getApplication()).putString(programId.getProgram()).putString(str).hash().asLong();
    }

    public static ConsumerGroupConfig createConsumerGroupConfig(long j, int i, Method method) {
        HashPartition annotation = method.getAnnotation(HashPartition.class);
        RoundRobin annotation2 = method.getAnnotation(RoundRobin.class);
        DequeueStrategy dequeueStrategy = DequeueStrategy.FIFO;
        String str = null;
        Preconditions.checkArgument(annotation == null || annotation2 == null, "Only one strategy allowed for process() method: %s", new Object[]{method.getName()});
        if (annotation != null) {
            dequeueStrategy = DequeueStrategy.HASH;
            str = annotation.value();
            Preconditions.checkArgument(!str.isEmpty(), "Partition key cannot be empty: %s", new Object[]{method.getName()});
        } else if (annotation2 != null) {
            dequeueStrategy = DequeueStrategy.ROUND_ROBIN;
        }
        return new ConsumerGroupConfig(j, i, dequeueStrategy, str);
    }

    public static Multimap<String, QueueName> configureQueue(Program program, FlowSpecification flowSpecification, final StreamAdmin streamAdmin, QueueAdmin queueAdmin, TransactionExecutorFactory transactionExecutorFactory) {
        Table<QueueSpecificationGenerator.Node, String, Set<QueueSpecification>> create = new SimpleQueueSpecificationGenerator(new ApplicationId(program.getNamespaceId(), program.getApplicationId())).create(flowSpecification);
        HashMultimap create2 = HashMultimap.create();
        ImmutableSetMultimap.Builder builder = ImmutableSetMultimap.builder();
        Iterator it = flowSpecification.getFlowlets().entrySet().iterator();
        while (it.hasNext()) {
            String str = (String) ((Map.Entry) it.next()).getKey();
            Iterator it2 = Iterables.concat(create.column(str).values()).iterator();
            while (it2.hasNext()) {
                builder.put(str, ((QueueSpecification) it2.next()).getQueueName());
            }
        }
        Iterator it3 = Iterables.concat(create.values()).iterator();
        while (it3.hasNext()) {
            QueueName queueName = ((QueueSpecification) it3.next()).getQueueName();
            create2.putAll(queueName, getAllConsumerGroups(program, flowSpecification, queueName, create));
        }
        try {
            final ArrayList newArrayList = Lists.newArrayList();
            final HashMap newHashMap = Maps.newHashMap();
            for (Map.Entry entry : create2.asMap().entrySet()) {
                LOG.info("Queue config for {} : {}", entry.getKey(), entry.getValue());
                if (((QueueName) entry.getKey()).isStream()) {
                    HashMap newHashMap2 = Maps.newHashMap();
                    for (ConsumerGroupConfig consumerGroupConfig : (Collection) entry.getValue()) {
                        newHashMap2.put(Long.valueOf(consumerGroupConfig.getGroupId()), Integer.valueOf(consumerGroupConfig.getGroupSize()));
                    }
                    newHashMap.put(((QueueName) entry.getKey()).toStreamId(), newHashMap2);
                } else {
                    newArrayList.add(new ConsumerGroupConfigurer(queueAdmin.getQueueConfigurer((QueueName) entry.getKey()), (Iterable) entry.getValue()));
                }
            }
            try {
                Transactions.createTransactionExecutor(transactionExecutorFactory, newArrayList).execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowUtils.1
                    public void apply() throws Exception {
                        Iterator it4 = newArrayList.iterator();
                        while (it4.hasNext()) {
                            ((ConsumerGroupConfigurer) it4.next()).configure();
                        }
                        for (Map.Entry entry2 : newHashMap.entrySet()) {
                            streamAdmin.configureGroups((StreamId) entry2.getKey(), (Map) entry2.getValue());
                        }
                    }
                });
                Iterator it4 = newArrayList.iterator();
                while (it4.hasNext()) {
                    Closeables.closeQuietly((ConsumerGroupConfigurer) it4.next());
                }
                return builder.build();
            } catch (Throwable th) {
                Iterator it5 = newArrayList.iterator();
                while (it5.hasNext()) {
                    Closeables.closeQuietly((ConsumerGroupConfigurer) it5.next());
                }
                throw th;
            }
        } catch (Exception e) {
            LOG.error("Failed to configure queues", e);
            throw Throwables.propagate(e);
        }
    }

    public static void reconfigure(Iterable<QueueName> iterable, final long j, final int i, StreamAdmin streamAdmin, QueueAdmin queueAdmin, TransactionExecutorFactory transactionExecutorFactory) throws Exception {
        final ArrayList newArrayList = Lists.newArrayList();
        for (QueueName queueName : iterable) {
            if (queueName.isStream()) {
                streamAdmin.configureInstances(queueName.toStreamId(), j, i);
            } else {
                newArrayList.add(queueAdmin.getQueueConfigurer(queueName));
            }
        }
        try {
            Transactions.createTransactionExecutor(transactionExecutorFactory, newArrayList).execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowUtils.2
                public void apply() throws Exception {
                    Iterator it = newArrayList.iterator();
                    while (it.hasNext()) {
                        ((QueueConfigurer) it.next()).configureInstances(j, i);
                    }
                }
            });
            Iterator it = newArrayList.iterator();
            while (it.hasNext()) {
                Closeables.closeQuietly((QueueConfigurer) it.next());
            }
        } catch (Throwable th) {
            Iterator it2 = newArrayList.iterator();
            while (it2.hasNext()) {
                Closeables.closeQuietly((QueueConfigurer) it2.next());
            }
            throw th;
        }
    }

    private static Set<ConsumerGroupConfig> getAllConsumerGroups(Program program, FlowSpecification flowSpecification, QueueName queueName, Table<QueueSpecificationGenerator.Node, String, Set<QueueSpecification>> table) {
        HashSet newHashSet = Sets.newHashSet();
        ReflectionSchemaGenerator reflectionSchemaGenerator = new ReflectionSchemaGenerator();
        for (Map.Entry entry : flowSpecification.getFlowlets().entrySet()) {
            String str = (String) entry.getKey();
            for (QueueSpecification queueSpecification : Iterables.concat(table.column(str).values())) {
                if (queueSpecification.getQueueName().equals(queueName)) {
                    try {
                        FlowletDefinition flowletDefinition = (FlowletDefinition) entry.getValue();
                        addConsumerGroup(queueSpecification, program.getClassLoader().loadClass(flowletDefinition.getFlowletSpec().getClassName()), generateConsumerGroupId(program.getId(), str), flowletDefinition.getInstances(), reflectionSchemaGenerator, newHashSet);
                    } catch (ClassNotFoundException e) {
                        throw Throwables.propagate(e);
                    }
                }
            }
        }
        return newHashSet;
    }

    private static void addConsumerGroup(final QueueSpecification queueSpecification, Type type, final long j, final int i, final SchemaGenerator schemaGenerator, final Collection<ConsumerGroupConfig> collection) {
        final HashSet newHashSet = Sets.newHashSet();
        Reflections.visit((Object) null, type, new MethodVisitor() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowUtils.3
            public void visit(Object obj, Type type2, Type type3, Method method) throws Exception {
                ProcessInput annotation;
                if (newHashSet.add(FlowletMethod.create(method, type2)) && (annotation = method.getAnnotation(ProcessInput.class)) != null) {
                    HashSet newHashSet2 = Sets.newHashSet(annotation.value());
                    if (newHashSet2.isEmpty()) {
                        newHashSet2.add("");
                    }
                    TypeToken of = TypeToken.of(type2);
                    TypeToken resolveType = of.resolveType(method.getGenericParameterTypes()[0]);
                    if (method.isAnnotationPresent(Batch.class) && Iterator.class.equals(resolveType.getRawType())) {
                        Preconditions.checkArgument(resolveType.getType() instanceof ParameterizedType, "Only ParameterizedType is supported for batch Iterator.");
                        resolveType = of.resolveType(((ParameterizedType) resolveType.getType()).getActualTypeArguments()[0]);
                    }
                    if (queueSpecification.getInputSchema().equals(schemaGenerator.generate(resolveType.getType()))) {
                        if (newHashSet2.contains(queueSpecification.getQueueName().getSimpleName()) || newHashSet2.contains("")) {
                            collection.add(FlowUtils.createConsumerGroupConfig(j, i, method));
                        }
                    }
                }
            }
        }, new Visitor[0]);
    }

    public static void deleteFlowPendingMetrics(MetricStore metricStore, @Nullable String str, @Nullable String str2, @Nullable String str3) throws Exception {
        Preconditions.checkArgument(str != null || str2 == null, "Namespace may only be null if AppId is null");
        Preconditions.checkArgument(str2 != null || str3 == null, "AppId may only be null if FlowId is null");
        Set singleton = Collections.singleton("system.queue.pending");
        HashMap newHashMap = Maps.newHashMap();
        if (str != null) {
            newHashMap.put("ns", str);
            if (str2 != null) {
                newHashMap.put("app", str2);
                if (str3 != null) {
                    newHashMap.put("fl", str3);
                }
            }
        }
        LOG.info("Deleting 'system.queue.pending' metric for context {}", newHashMap);
        metricStore.delete(new MetricDeleteQuery(0L, (System.currentTimeMillis() / 1000) + 1, singleton, newHashMap));
    }

    private FlowUtils() {
    }
}
