package io.camunda.zeebe.broker.exporter.stream;

import io.camunda.zeebe.broker.exporter.stream.ExporterStateDistributeMessage;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/camunda/zeebe/broker/exporter/stream/ExporterStateDistributionTest.class */
public class ExporterStateDistributionTest {
    private ExporterStateDistributionService exporterStateDistributionService;
    private Map<String, ExporterStateDistributeMessage.ExporterStateEntry> exporterState;
    private SimplePartitionMessageService partitionMessagingService;

    @Before
    public void setup() {
        this.exporterState = new HashMap();
        this.partitionMessagingService = new SimplePartitionMessageService();
        Map<String, ExporterStateDistributeMessage.ExporterStateEntry> map = this.exporterState;
        Objects.requireNonNull(map);
        this.exporterStateDistributionService = new ExporterStateDistributionService((v1, v2) -> {
            r3.put(v1, v2);
        }, this.partitionMessagingService, "topic");
    }

    @Test
    public void shouldSubscribeForGivenTopic() {
        this.exporterStateDistributionService.subscribeForExporterState((v0) -> {
            v0.run();
        });
        Assertions.assertThat(this.partitionMessagingService.consumers).containsKey("topic");
    }

    @Test
    public void shouldConsumeExporterMessage() {
        DirectBuffer wrapString = BufferUtil.wrapString("e1");
        DirectBuffer wrapString2 = BufferUtil.wrapString("e2");
        ExporterStateDistributeMessage exporterStateDistributeMessage = new ExporterStateDistributeMessage();
        exporterStateDistributeMessage.putExporter("elastic", 123L, wrapString);
        exporterStateDistributeMessage.putExporter("metric", 345L, wrapString2);
        this.exporterStateDistributionService.subscribeForExporterState((v0) -> {
            v0.run();
        });
        this.exporterStateDistributionService.distributeExporterState(exporterStateDistributeMessage);
        Assertions.assertThat(this.exporterState).containsEntry("elastic", new ExporterStateDistributeMessage.ExporterStateEntry(123L, wrapString)).containsEntry("metric", new ExporterStateDistributeMessage.ExporterStateEntry(345L, wrapString2));
    }

    @Test
    public void shouldRemoveSubscriptionOnClose() throws Exception {
        ExporterStateDistributeMessage exporterStateDistributeMessage = new ExporterStateDistributeMessage();
        exporterStateDistributeMessage.putExporter("elastic", 123L, new UnsafeBuffer());
        exporterStateDistributeMessage.putExporter("metric", 345L, new UnsafeBuffer());
        this.exporterStateDistributionService.subscribeForExporterState((v0) -> {
            v0.run();
        });
        this.exporterStateDistributionService.close();
        Assertions.assertThat(this.partitionMessagingService.consumers).isEmpty();
    }
}
