package io.camunda.zeebe.broker.exporter.stream;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/camunda/zeebe/broker/exporter/stream/ExporterPositionsDistributionTest.class */
public class ExporterPositionsDistributionTest {
    private ExporterPositionsDistributionService exporterPositionsDistributionService;
    private Map<String, Long> exporterPositions;
    private SimplePartitionMessageService partitionMessagingService;

    @Before
    public void setup() {
        this.exporterPositions = new HashMap();
        this.partitionMessagingService = new SimplePartitionMessageService();
        Map<String, Long> map = this.exporterPositions;
        Objects.requireNonNull(map);
        this.exporterPositionsDistributionService = new ExporterPositionsDistributionService((v1, v2) -> {
            r3.put(v1, v2);
        }, this.partitionMessagingService, "topic");
    }

    @Test
    public void shouldSubscribeForGivenTopic() {
        this.exporterPositionsDistributionService.subscribeForExporterPositions((v0) -> {
            v0.run();
        });
        Assertions.assertThat(this.partitionMessagingService.consumers).containsKey("topic");
    }

    @Test
    public void shouldConsumeExporterMessage() {
        ExporterPositionsMessage exporterPositionsMessage = new ExporterPositionsMessage();
        exporterPositionsMessage.putExporter("elastic", 123L);
        exporterPositionsMessage.putExporter("metric", 345L);
        this.exporterPositionsDistributionService.subscribeForExporterPositions((v0) -> {
            v0.run();
        });
        this.exporterPositionsDistributionService.distributeExporterPositions(exporterPositionsMessage);
        Assertions.assertThat(this.exporterPositions).containsEntry("elastic", 123L).containsEntry("metric", 345L);
    }

    @Test
    public void shouldRemoveSubscriptionOnClose() throws Exception {
        ExporterPositionsMessage exporterPositionsMessage = new ExporterPositionsMessage();
        exporterPositionsMessage.putExporter("elastic", 123L);
        exporterPositionsMessage.putExporter("metric", 345L);
        this.exporterPositionsDistributionService.subscribeForExporterPositions((v0) -> {
            v0.run();
        });
        this.exporterPositionsDistributionService.close();
        Assertions.assertThat(this.partitionMessagingService.consumers).isEmpty();
    }
}
