package io.apicurio.registry.streams;

import com.google.common.collect.ImmutableMap;
import io.apicurio.registry.storage.proto.Str;
import io.apicurio.registry.streams.diservice.AsyncBiFunctionService;
import io.apicurio.registry.streams.diservice.AsyncBiFunctionServiceGrpcLocalDispatcher;
import io.apicurio.registry.streams.diservice.DefaultGrpcChannelProvider;
import io.apicurio.registry.streams.diservice.DistributedAsyncBiFunctionService;
import io.apicurio.registry.streams.diservice.LocalService;
import io.apicurio.registry.streams.diservice.proto.AsyncBiFunctionServiceGrpc;
import io.apicurio.registry.streams.distore.DistributedReadOnlyKeyValueStore;
import io.apicurio.registry.streams.distore.ExtReadOnlyKeyValueStore;
import io.apicurio.registry.streams.distore.KeyValueSerde;
import io.apicurio.registry.streams.distore.KeyValueStoreGrpcImplLocalDispatcher;
import io.apicurio.registry.streams.distore.UnknownStatusDescriptionInterceptor;
import io.apicurio.registry.streams.distore.proto.KeyValueStoreGrpc;
import io.apicurio.registry.streams.utils.ForeachActionDispatcher;
import io.apicurio.registry.streams.utils.Lifecycle;
import io.apicurio.registry.streams.utils.LoggingStateRestoreListener;
import io.apicurio.registry.streams.utils.WaitForDataService;
import io.apicurio.registry.types.Current;
import io.apicurio.registry.utils.ConcurrentUtil;
import io.apicurio.registry.utils.kafka.AsyncProducer;
import io.apicurio.registry.utils.kafka.KafkaProperties;
import io.apicurio.registry.utils.kafka.KafkaUtil;
import io.apicurio.registry.utils.kafka.ProducerActions;
import io.apicurio.registry.utils.kafka.ProtoSerde;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.Status;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Properties;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Disposes;
import javax.enterprise.inject.Produces;
import javax.enterprise.inject.spi.InjectionPoint;
import javax.inject.Singleton;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * CDI configuration for the Kafka Streams based registry storage.
 *
 * <p>Declares the producer, observer and disposer methods that wire together the
 * {@link KafkaStreams} instance, the distributed key/value stores, the "wait for data"
 * service and the gRPC {@link Server} that exposes the local store and service dispatchers.
 */
@ApplicationScoped
public class StreamsRegistryConfiguration {
    private static final Logger log = LoggerFactory.getLogger(StreamsRegistryConfiguration.class);

    /**
     * Closes {@code obj} if it implements {@link AutoCloseable}; otherwise does nothing.
     *
     * <p>This is a best-effort helper used on shutdown paths: a failure to close is logged
     * and not propagated, so the remaining shutdown observers still get a chance to run.
     *
     * @param obj the object to close; may be {@code null} or not closeable
     */
    private static void close(Object obj) {
        if (!(obj instanceof AutoCloseable)) {
            return;
        }
        try {
            ((AutoCloseable) obj).close();
        } catch (Exception e) {
            // Deliberately not rethrown (best-effort shutdown), but no longer silent.
            log.warn("Failed to close {}", obj, e);
        }
    }

    /**
     * Produces the {@link Properties} for an injection point annotated with
     * {@link KafkaProperties}, delegating the lookup to {@link KafkaUtil#properties}.
     *
     * @param injectionPoint the injection point whose {@link KafkaProperties} annotation is read
     * @return the properties returned by {@code KafkaUtil.properties} for that annotation
     */
    @Produces
    public Properties properties(InjectionPoint injectionPoint) {
        KafkaProperties annotation = injectionPoint.getAnnotated().getAnnotation(KafkaProperties.class);
        return KafkaUtil.properties(annotation);
    }

    /**
     * Produces the streams topology configuration wrapper.
     *
     * @param properties properties injected for the {@code registry.streams.topology.} qualifier value
     * @return a {@link StreamsPropertiesImpl} wrapping {@code properties}
     */
    @ApplicationScoped
    @Produces
    public StreamsProperties streamsProperties(@KafkaProperties("registry.streams.topology.") Properties properties) {
        return new StreamsPropertiesImpl(properties);
    }

    /**
     * Produces the producer used to send {@link Str.StorageValue} messages keyed by {@link String}.
     *
     * @param properties properties injected for the {@code registry.streams.storage-producer.} qualifier value
     * @return an {@link AsyncProducer} using a String key serializer and a protobuf value serde
     */
    @ApplicationScoped
    @Produces
    public ProducerActions<String, Str.StorageValue> storageProducer(@KafkaProperties("registry.streams.storage-producer.") Properties properties) {
        return new AsyncProducer<>(
                properties,
                Serdes.String().serializer(),
                ProtoSerde.parsedWith(Str.StorageValue.parser()));
    }

    /**
     * Disposer for the storage producer: closes it when its context is destroyed.
     *
     * @param producerActions the producer being disposed
     * @throws Exception whatever {@code close()} throws
     */
    public void stopStorageProducer(@Disposes ProducerActions<String, Str.StorageValue> producerActions) throws Exception {
        producerActions.close();
    }

    /**
     * Produces the {@link KafkaStreams} instance. The topology comes from
     * {@link StreamsTopologyProvider}, and a {@link LoggingStateRestoreListener} is registered
     * as the global state restore listener. The instance is not started here; see
     * {@link #init(StartupEvent, KafkaStreams)}.
     *
     * @param streamsProperties streams configuration, also passed to the topology provider
     * @param foreachAction     action handed to the topology provider
     * @return the configured, not yet started, streams instance
     */
    @Singleton
    @Produces
    public KafkaStreams storageStreams(StreamsProperties streamsProperties, ForeachAction<? super String, ? super Str.Data> foreachAction) {
        StreamsTopologyProvider topologyProvider = new StreamsTopologyProvider(streamsProperties, foreachAction);
        KafkaStreams streams = new KafkaStreams(topologyProvider.get(), streamsProperties.getProperties());
        streams.setGlobalStateRestoreListener(new LoggingStateRestoreListener());
        return streams;
    }

    /** Starts the streams instance on application startup. */
    public void init(@Observes StartupEvent startupEvent, KafkaStreams kafkaStreams) {
        kafkaStreams.start();
    }

    /** Closes the streams instance on application shutdown. */
    public void destroy(@Observes ShutdownEvent shutdownEvent, KafkaStreams kafkaStreams) {
        kafkaStreams.close();
    }

    /**
     * Produces the local {@link HostInfo} by parsing the configured application server value,
     * which is expected to have the form {@code host:port}.
     *
     * <p>The value is split at its <em>last</em> colon, so a host part that itself contains
     * colons is kept intact.
     *
     * @param streamsProperties source of the application server setting
     * @return host and port of the local gRPC endpoint
     * @throws IllegalArgumentException if the value is missing, has no {@code host:port}
     *         separator, or the port is not an integer ({@link NumberFormatException} is a
     *         subtype and keeps the parse failure as its cause-free original type)
     */
    @Singleton
    @Produces
    public HostInfo storageLocalHost(StreamsProperties streamsProperties) {
        String applicationServer = streamsProperties.getApplicationServer();
        if (applicationServer == null) {
            throw new IllegalArgumentException("Application server is not configured; expected 'host:port'");
        }
        log.info("Application server gRPC: '{}'", applicationServer);

        int separator = applicationServer.lastIndexOf(':');
        if (separator <= 0 || separator == applicationServer.length() - 1) {
            throw new IllegalArgumentException(
                    "Invalid application server '" + applicationServer + "'; expected 'host:port'");
        }
        String host = applicationServer.substring(0, separator);
        String portText = applicationServer.substring(separator + 1);
        final int port;
        try {
            port = Integer.parseInt(portText);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Invalid port '" + portText + "' in application server '" + applicationServer + "'", e);
        }
        return new HostInfo(host, port);
    }

    /**
     * Produces the distributed read-only view of the storage store
     * (String keys, {@link Str.Data} values).
     */
    @ApplicationScoped
    @Produces
    public ExtReadOnlyKeyValueStore<String, Str.Data> storageKeyValueStore(KafkaStreams kafkaStreams, HostInfo hostInfo, StreamsProperties streamsProperties) {
        return new DistributedReadOnlyKeyValueStore<>(
                kafkaStreams,
                hostInfo,
                streamsProperties.getStorageStoreName(),
                Serdes.String(),
                ProtoSerde.parsedWith(Str.Data.parser()),
                new DefaultGrpcChannelProvider(),
                true);
    }

    /** Closes the storage store on shutdown, if it is closeable. */
    public void destroyStorageStore(@Observes ShutdownEvent shutdownEvent, ExtReadOnlyKeyValueStore<String, Str.Data> extReadOnlyKeyValueStore) {
        close(extReadOnlyKeyValueStore);
    }

    /**
     * Produces the distributed read-only view of the global-id store
     * (Long keys, {@link Str.TupleValue} values).
     */
    @ApplicationScoped
    @Produces
    public ReadOnlyKeyValueStore<Long, Str.TupleValue> globalIdKeyValueStore(KafkaStreams kafkaStreams, HostInfo hostInfo, StreamsProperties streamsProperties) {
        return new DistributedReadOnlyKeyValueStore<>(
                kafkaStreams,
                hostInfo,
                streamsProperties.getGlobalIdStoreName(),
                Serdes.Long(),
                ProtoSerde.parsedWith(Str.TupleValue.parser()),
                new DefaultGrpcChannelProvider(),
                true);
    }

    /** Closes the global-id store on shutdown, if it is closeable. */
    public void destroyGlobaIdStore(@Observes ShutdownEvent shutdownEvent, ReadOnlyKeyValueStore<Long, Str.TupleValue> readOnlyKeyValueStore) {
        close(readOnlyKeyValueStore);
    }

    /** Produces the dispatcher for per-record actions on (String, {@link Str.Data}) entries. */
    @Singleton
    @Produces
    public ForeachActionDispatcher<String, Str.Data> dataDispatcher() {
        return new ForeachActionDispatcher<>();
    }

    /** Produces the {@link WaitForDataService} backed by the storage store and the data dispatcher. */
    @Singleton
    @Produces
    public WaitForDataService waitForDataServiceImpl(ReadOnlyKeyValueStore<String, Str.Data> readOnlyKeyValueStore, ForeachActionDispatcher<String, Str.Data> foreachActionDispatcher) {
        return new WaitForDataService(readOnlyKeyValueStore, foreachActionDispatcher);
    }

    /** Wraps the {@link WaitForDataService} as a {@link LocalService} registered under {@code WaitForDataService.NAME}. */
    @Singleton
    @Produces
    public LocalService<AsyncBiFunctionService.WithSerdes<String, Long, Str.Data>> localWaitForDataService(WaitForDataService waitForDataService) {
        return new LocalService<>(WaitForDataService.NAME, waitForDataService);
    }

    /**
     * Produces the {@link Current}-qualified distributed variant of the wait-for-data service,
     * built on the storage store name and the given local service.
     */
    @ApplicationScoped
    @Current
    @Produces
    public AsyncBiFunctionService<String, Long, Str.Data> waitForDataUpdateService(StreamsProperties streamsProperties, KafkaStreams kafkaStreams, HostInfo hostInfo, LocalService<AsyncBiFunctionService.WithSerdes<String, Long, Str.Data>> localService) {
        return new DistributedAsyncBiFunctionService<>(
                kafkaStreams,
                hostInfo,
                streamsProperties.getStorageStoreName(),
                localService,
                new DefaultGrpcChannelProvider());
    }

    /** Closes the distributed wait-for-data service on shutdown, if it is closeable. */
    public void destroyWaitForDataUpdateService(@Observes ShutdownEvent shutdownEvent, @Current AsyncBiFunctionService<String, Long, Str.Data> asyncBiFunctionService) {
        close(asyncBiFunctionService);
    }

    /**
     * Produces a {@link Lifecycle} around a gRPC {@link Server} listening on the local
     * {@link HostInfo} port. Both services are registered behind the same
     * {@link UnknownStatusDescriptionInterceptor}, which is configured with an
     * exception-class-to-{@link Status} map.
     *
     * @param hostInfo                       supplies the port to bind
     * @param keyValueStoreImplBase          key/value store gRPC service
     * @param asyncBiFunctionServiceImplBase async bi-function gRPC service
     * @return a lifecycle that starts, stops and reports on the built server
     */
    @ApplicationScoped
    @Produces
    public Lifecycle storageGrpcServer(HostInfo hostInfo, KeyValueStoreGrpc.KeyValueStoreImplBase keyValueStoreImplBase, AsyncBiFunctionServiceGrpc.AsyncBiFunctionServiceImplBase asyncBiFunctionServiceImplBase) {
        // Entry order is kept exactly as before: ImmutableMap preserves insertion order and the
        // catch-all Throwable mapping comes last.
        UnknownStatusDescriptionInterceptor statusInterceptor = new UnknownStatusDescriptionInterceptor(
                ImmutableMap.of(
                        IllegalArgumentException.class, Status.INVALID_ARGUMENT,
                        IllegalStateException.class, Status.FAILED_PRECONDITION,
                        InvalidStateStoreException.class, Status.FAILED_PRECONDITION,
                        Throwable.class, Status.INTERNAL));

        final Server server = ServerBuilder.forPort(hostInfo.port())
                .addService(ServerInterceptors.intercept(keyValueStoreImplBase, statusInterceptor))
                .addService(ServerInterceptors.intercept(asyncBiFunctionServiceImplBase, statusInterceptor))
                .build();

        return new Lifecycle() {
            /** Starts the gRPC server, rethrowing an {@link IOException} as unchecked. */
            @Override
            public void start() {
                try {
                    server.start();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            /** Initiates server shutdown and then waits for it to terminate. */
            @Override
            public void stop() {
                // The lambda parameter is typed explicitly so the helper's type argument is
                // inferred as Server. NOTE(review): ConcurrentUtil.consumer presumably adapts
                // the interruptible awaitTermination() call - confirm how it handles interrupts.
                ConcurrentUtil.consumer((Server s) -> {
                    s.awaitTermination();
                }).accept(server.shutdown());
            }

            /** @return {@code true} while the server is neither shut down nor terminated */
            @Override
            public boolean isRunning() {
                return !(server.isShutdown() || server.isTerminated());
            }
        };
    }

    /** Starts the gRPC server lifecycle on application startup. */
    public void init(@Observes StartupEvent startupEvent, Lifecycle lifecycle) {
        lifecycle.start();
    }

    /** Stops the gRPC server lifecycle on application shutdown. */
    public void destroy(@Observes ShutdownEvent shutdownEvent, Lifecycle lifecycle) {
        lifecycle.stop();
    }

    /**
     * Produces the gRPC key/value store service. Its serde registry covers the storage store
     * (String / {@link Str.Data}) and the global-id store (Long / {@link Str.TupleValue}).
     */
    @Singleton
    @Produces
    public KeyValueStoreGrpc.KeyValueStoreImplBase streamsKeyValueStoreGrpcImpl(KafkaStreams kafkaStreams, StreamsProperties streamsProperties) {
        return new KeyValueStoreGrpcImplLocalDispatcher(
                kafkaStreams,
                KeyValueSerde.newRegistry()
                        .register(streamsProperties.getStorageStoreName(), Serdes.String(), ProtoSerde.parsedWith(Str.Data.parser()))
                        .register(streamsProperties.getGlobalIdStoreName(), Serdes.Long(), ProtoSerde.parsedWith(Str.TupleValue.parser())));
    }

    /** Produces the gRPC async bi-function service dispatching to the single given local service. */
    @Singleton
    @Produces
    public AsyncBiFunctionServiceGrpc.AsyncBiFunctionServiceImplBase storageAsyncBiFunctionServiceGrpcImpl(LocalService<AsyncBiFunctionService.WithSerdes<String, Long, Str.Data>> localService) {
        return new AsyncBiFunctionServiceGrpcLocalDispatcher(Collections.singleton(localService));
    }
}
