package io.confluent.monitoring.clients.interceptor;

import com.google.common.collect.ImmutableMap;
import com.google.protobuf.InvalidProtocolBufferException;
import io.confluent.monitoring.common.TimeBucket;
import io.confluent.monitoring.record.Monitoring;
import io.confluent.serializers.ProtoSerde;
import io.confluent.serializers.UberSerde;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.AssertionFailedError;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@PrepareForTest({MonitoringInterceptor.class})
@RunWith(PowerMockRunner.class)
/* loaded from: input_file:io/confluent/monitoring/clients/interceptor/MonitoringInterceptorTest.class */
public class MonitoringInterceptorTest {
    /** Publish period handed to the interceptor constructor (presumably milliseconds; confirm). */
    private static final long PUBLISH_PERIOD = 500;
    /** Sample period handed to the interceptor constructor; taken from {@link TimeBucket#SIZE}. */
    private static final long SAMPLE_PERIOD = TimeBucket.SIZE;
    /** Client id placed in the default configuration and expected in published messages. */
    private static final String CLIENT_ID = "Test Client";
    /** Cluster id used to build the base monitoring message. */
    private static final String CLUSTER_ID = "Test Cluster";
    /** Group id used to build the base monitoring message. */
    private static final String GROUP_ID = "Test Group";
    /** Client type used to build the base monitoring message. */
    private static final Monitoring.ClientType CLIENT_TYPE = Monitoring.ClientType.CONSUMER;

    /** EasyMock mock of the producer the interceptor publishes through; recreated in setUp(). */
    private KafkaProducer<byte[], byte[]> monitoringProducer;
    /** Manually driven clock injected into the interceptor; recreated in setUp(). */
    private MockClock mockClock;
    /** Interceptor under test; assigned by each test method. */
    private MonitoringInterceptor interceptor;

    /** Default topic-partition that most tests record metrics against. */
    private final TopicPartition topicPartition = new TopicPartition("test", 0);
    /** Default interceptor configuration, populated in setUp(). */
    private final Map<String, Object> configs = new HashMap<>();
    /** Every record handed to the mocked producer's send(), in call order. */
    private final List<ProducerRecord<String, byte[]>> sent = new CopyOnWriteArrayList<>();
    /** Serde used to build the interceptor and to decode the captured record values. */
    private final UberSerde<Monitoring.MonitoringMessage> monitoringMessageUberSerde = new ProtoSerde(Monitoring.MonitoringMessage.getDefaultInstance());

    /**
     * Prepares the per-test fixture: creates a fresh mock clock and producer mock and seeds the
     * default configuration with a bootstrap server and the test client id.
     */
    @Before
    public void setUp() throws Exception {
        mockClock = new MockClock();
        monitoringProducer = (KafkaProducer) EasyMock.createMock(KafkaProducer.class);
        configs.put("bootstrap.servers", "bootstrap");
        configs.put("client.id", CLIENT_ID);
    }

    /** Builds an interceptor from the default configuration held in {@code configs}. */
    private MonitoringInterceptor createMonitoringInterceptor() {
        final Map<String, Object> defaultConfigs = this.configs;
        return createMonitoringInterceptor(defaultConfigs);
    }

    /**
     * Builds an interceptor wired to the mocked producer and mock clock.
     *
     * @param map configuration to read; only the {@code "client.id"} entry is consulted, and it
     *            must be present (a missing entry fails with a NullPointerException)
     * @return a new, not yet started interceptor
     */
    private MonitoringInterceptor createMonitoringInterceptor(Map<String, ?> map) {
        final String clientId = map.get("client.id").toString();
        return new MonitoringInterceptor(
                MonitoringInterceptor.getBaseMonitoringMessage(CLIENT_TYPE, CLUSTER_ID, GROUP_ID, clientId),
                monitoringProducer,
                monitoringMessageUberSerde,
                "_confluent-monitoring",
                SAMPLE_PERIOD,
                PUBLISH_PERIOD,
                10,
                clientId,
                mockClock);
    }

    /**
     * Expects {@code MonitoringInterceptor.create} to throw a {@link ConfigException} when it is
     * given an empty configuration map.
     */
    @Test(expected = ConfigException.class)
    public void testCreateWithoutClientIdFails() {
        EasyMock.replay(monitoringProducer);
        final Map<String, Object> emptyConfigs = new HashMap<String, Object>();
        interceptor = MonitoringInterceptor.create(CLIENT_TYPE, CLUSTER_ID, GROUP_ID, emptyConfigs, mockClock);
    }

    /**
     * Builds the interceptor from a configuration whose {@code client.id} differs from the default
     * one and checks that every published monitoring message carries that client id.
     */
    @Test
    public void testPreferInterceptorClientIdConfig() throws InvalidProtocolBufferException {
        final String overridingClientId = "other-client";
        expectMonitoringProducerSendMetrics(false);
        expectMonitoringProducerClose();
        EasyMock.replay(monitoringProducer);

        Map<String, Object> overrides = new HashMap<>();
        overrides.put("client.id", overridingClientId);
        overrides.put("bootstrap.servers", "bootstraps");
        interceptor = createMonitoringInterceptor(overrides);
        interceptor.start();
        interceptor.recordMessageMetric(topicPartition.topic(), topicPartition.partition(), 0L, 100, 10L, 100L);
        interceptor.shutdown();
        EasyMock.verify(monitoringProducer);

        // Without this guard the loop below would pass vacuously if nothing had been published.
        Assert.assertFalse("No monitoring message was published", sent.isEmpty());
        for (ProducerRecord<String, byte[]> published : sent) {
            Monitoring.MonitoringMessage message =
                    (Monitoring.MonitoringMessage) monitoringMessageUberSerde.deserialize(published.value());
            Assert.assertEquals("Invalid client ID in published monitoring message", overridingClientId, message.getClientId());
        }
    }

    /** Recording a metric on an interceptor that was never started must fail. */
    @Test(expected = IllegalStateException.class)
    public void testRecordBeforeStartFails() {
        EasyMock.replay(monitoringProducer);
        interceptor = createMonitoringInterceptor();
        // start() is deliberately not called before recording.
        interceptor.recordMessageMetric("test", 0, 0L, 0, 0L, 0L);
    }

    /** Starting an interceptor a second time, after it has been shut down, must fail. */
    @Test(expected = IllegalStateException.class)
    public void testStartAfterShutdownFails() {
        expectMonitoringProducerClose();
        EasyMock.replay(monitoringProducer);

        interceptor = createMonitoringInterceptor();
        interceptor.start();
        interceptor.shutdown();
        // This second start() is the call expected to throw.
        interceptor.start();
    }

    /**
     * Records three metrics while the mocked producer reports ten partitions, then checks that
     * three records were sent and that none of them has an explicit partition set.
     */
    @Test
    public void testPartitioningOfMetricData() throws InvalidProtocolBufferException {
        expectMonitoringProducerSendMetrics(false, 10);
        expectMonitoringProducerClose();
        EasyMock.replay(monitoringProducer);

        interceptor = createMonitoringInterceptor();
        interceptor.start();
        final long[] timestamps = {0L, SAMPLE_PERIOD, 3 * SAMPLE_PERIOD};
        for (long timestamp : timestamps) {
            interceptor.recordMessageMetric(topicPartition.topic(), topicPartition.partition(), timestamp, 10, 10L, 10L);
        }
        interceptor.shutdown();
        EasyMock.verify(monitoringProducer);

        Assert.assertEquals(3L, sent.size());
        for (ProducerRecord<String, byte[]> published : sent) {
            Assert.assertNull(published.partition());
        }
    }

    /**
     * Records a single metric on a started interceptor and checks, after shutdown, that the
     * published messages add up to a count of one for the default topic-partition.
     */
    @Test
    public void testRecordAfterStartPublishesMetrics() throws InvalidProtocolBufferException {
        expectMonitoringProducerSendMetrics(false);
        expectMonitoringProducerClose();
        EasyMock.replay(monitoringProducer);

        interceptor = createMonitoringInterceptor();
        interceptor.start();
        final String topic = topicPartition.topic();
        final int partition = topicPartition.partition();
        interceptor.recordMessageMetric(topic, partition, 0L, 100, 10L, 100L);
        interceptor.shutdown();

        EasyMock.verify(monitoringProducer);
        verifyPublishedMetricsSameTopicPartition(topicPartition, 1L, true);
    }

    /**
     * Starts and immediately shuts down the interceptor; since no send() expectation is set up,
     * the producer mock must only see close(), and nothing may have been captured as sent.
     */
    @Test
    public void testStartShutdownWithoutRecordDoesNotPublishToMonitoringTopic() {
        expectMonitoringProducerClose();
        EasyMock.replay(monitoringProducer);

        interceptor = createMonitoringInterceptor();
        interceptor.start();
        interceptor.shutdown();

        EasyMock.verify(monitoringProducer);
        final int publishedCount = sent.size();
        Assert.assertEquals(0L, publishedCount);
    }

    /**
     * Records one metric, waits 1.5 seconds of wall-clock time with the interceptor running, and
     * checks that at least three records were sent while the aggregated count stays at one.
     */
    @Test
    public void testPublishHeartbeatMessagesForKnownTopicPartition() throws InvalidProtocolBufferException, InterruptedException {
        expectMonitoringProducerSendMetrics(false);
        expectMonitoringProducerClose();
        EasyMock.replay(monitoringProducer);

        interceptor = createMonitoringInterceptor();
        interceptor.start();
        final String topic = topicPartition.topic();
        final int partition = topicPartition.partition();
        interceptor.recordMessageMetric(topic, partition, SAMPLE_PERIOD, 10, 10L, 10L);
        // Leave the started interceptor running for a while before shutting it down.
        Thread.sleep(1500L);
        interceptor.shutdown();

        EasyMock.verify(monitoringProducer);
        final int publishedCount = sent.size();
        Assert.assertTrue(publishedCount >= 3);
        verifyPublishedMetricsSameTopicPartition(topicPartition, 1L, true);
    }

    /** Recording a metric after the interceptor has been shut down must fail. */
    @Test(expected = IllegalStateException.class)
    public void testRecordAfterShutdownFails() {
        expectMonitoringProducerClose();
        EasyMock.replay(monitoringProducer);

        interceptor = createMonitoringInterceptor();
        interceptor.start();
        interceptor.shutdown();
        // This record call is the one expected to throw.
        interceptor.recordMessageMetric("test", 0, 0L, 100, 10L, 100L);
    }

    /**
     * Records one metric with timestamp {@code -1} for each of two topics and checks that the
     * published counts for both topic-partitions are zero, then inspects the interceptor's
     * per-topic {@code invalid} counters.
     */
    @Test
    public void testRecordToInvalidTimestampDoesNotRecordMetric() throws InvalidProtocolBufferException {
        expectMonitoringProducerSendMetrics(false);
        expectMonitoringProducerClose();
        EasyMock.replay(monitoringProducer);
        mockClock.setCurrentTimeMillis(1496896413000L);

        interceptor = createMonitoringInterceptor();
        interceptor.start();
        final long invalidTimestamp = -1L;
        interceptor.recordMessageMetric("test1", 0, invalidTimestamp, 100, 10L, 100L);
        interceptor.recordMessageMetric("test2", 0, invalidTimestamp, 100, 10L, 100L);
        interceptor.shutdown();
        EasyMock.verify(monitoringProducer);

        Map<TopicPartition, Long> expectedCounts = new HashMap<>();
        expectedCounts.put(new TopicPartition("test1", 0), 0L);
        expectedCounts.put(new TopicPartition("test2", 0), 0L);
        verifyPublishedMetrics(expectedCounts, true);

        // NOTE(review): the two topics receive identical input yet different counter values are
        // expected (0 vs 1) - confirm this matches the interceptor's bookkeeping.
        AtomicInteger invalidForTest1 = (AtomicInteger) interceptor.invalid.get("test1");
        Assert.assertEquals(0L, invalidForTest1.get());
        AtomicInteger invalidForTest2 = (AtomicInteger) interceptor.invalid.get("test2");
        Assert.assertEquals(1L, invalidForTest2.get());
    }

    /**
     * Records a metric whose last argument is negative ({@code -10L}) and checks that it is still
     * counted once in the published messages.
     */
    @Test
    public void testRecordMetricWithInvalidLatencyRecordsMetric() throws InvalidProtocolBufferException {
        expectMonitoringProducerSendMetrics(false);
        expectMonitoringProducerClose();
        EasyMock.replay(monitoringProducer);

        interceptor = createMonitoringInterceptor();
        interceptor.start();
        final String topic = topicPartition.topic();
        final int partition = topicPartition.partition();
        interceptor.recordMessageMetric(topic, partition, 0L, 100, 10L, -10L);
        interceptor.shutdown();

        EasyMock.verify(monitoringProducer);
        verifyPublishedMetricsSameTopicPartition(topicPartition, 1L, true);
    }

    /**
     * Records a metric whose fourth argument is negative ({@code -10}) and checks that it is still
     * counted once in the published messages.
     */
    @Test
    public void testRecordMetricsWithInvalidBytesRecordsMetric() throws InvalidProtocolBufferException {
        expectMonitoringProducerSendMetrics(false);
        expectMonitoringProducerClose();
        EasyMock.replay(monitoringProducer);

        interceptor = createMonitoringInterceptor();
        interceptor.start();
        final String topic = topicPartition.topic();
        final int partition = topicPartition.partition();
        interceptor.recordMessageMetric(topic, partition, 0L, -10, 10L, 10L);
        interceptor.shutdown();

        EasyMock.verify(monitoringProducer);
        verifyPublishedMetricsSameTopicPartition(topicPartition, 1L, true);
    }

    /**
     * Records three metrics whose timestamps are 0, one and three sample periods, and checks that
     * exactly three records are published with a total count of three.
     */
    @Test
    public void testRecordMultipleWindowsPublishesMultipleMonitoringMessages() throws InvalidProtocolBufferException {
        expectMonitoringProducerSendMetrics(false);
        expectMonitoringProducerClose();
        EasyMock.replay(monitoringProducer);

        interceptor = createMonitoringInterceptor();
        interceptor.start();
        final long[] timestamps = {0L, SAMPLE_PERIOD, 3 * SAMPLE_PERIOD};
        for (long timestamp : timestamps) {
            interceptor.recordMessageMetric(topicPartition.topic(), topicPartition.partition(), timestamp, 10, 10L, 10L);
        }
        interceptor.shutdown();

        EasyMock.verify(monitoringProducer);
        Assert.assertEquals(3L, sent.size());
        verifyPublishedMetricsSameTopicPartition(topicPartition, 3L, true);
    }

    /**
     * Records one metric for each of partitions 0, 1 and 2 of topic {@code "test"} and checks that
     * three records are published, one count per partition.
     */
    @Test
    public void testRecordMultiplePartitionsPublishesMultipleMonitoringMessages() throws InvalidProtocolBufferException {
        expectMonitoringProducerSendMetrics(false);
        expectMonitoringProducerClose();
        EasyMock.replay(monitoringProducer);

        final TopicPartition second = new TopicPartition("test", 1);
        final TopicPartition third = new TopicPartition("test", 2);
        interceptor = createMonitoringInterceptor();
        interceptor.start();
        interceptor.recordMessageMetric(topicPartition.topic(), topicPartition.partition(), 0L, 10, 10L, 10L);
        interceptor.recordMessageMetric("test", 1, 0L, 10, 10L, 10L);
        interceptor.recordMessageMetric("test", 2, 0L, 10, 10L, 10L);
        interceptor.shutdown();
        EasyMock.verify(monitoringProducer);

        Assert.assertEquals(3L, sent.size());
        Map<TopicPartition, Long> expectedCounts = new HashMap<>();
        expectedCounts.put(topicPartition, 1L);
        expectedCounts.put(second, 1L);
        expectedCounts.put(third, 1L);
        verifyPublishedMetrics(expectedCounts, true);
    }

    /**
     * Records one metric for partition 0 of each of three topics and checks that three records are
     * published, one count per topic.
     */
    @Test
    public void testRecordMultipleTopicsPublishesMultipleMonitoringMessages() throws InvalidProtocolBufferException {
        expectMonitoringProducerSendMetrics(false);
        expectMonitoringProducerClose();
        EasyMock.replay(monitoringProducer);

        final String[] topics = {"test1", "test2", "test3"};
        interceptor = createMonitoringInterceptor();
        interceptor.start();
        for (String topic : topics) {
            interceptor.recordMessageMetric(topic, 0, 0L, 10, 10L, 10L);
        }
        interceptor.shutdown();
        EasyMock.verify(monitoringProducer);

        Assert.assertEquals(3L, sent.size());
        Map<TopicPartition, Long> expectedCounts = new HashMap<>();
        for (String topic : topics) {
            expectedCounts.put(new TopicPartition(topic, 0), 1L);
        }
        verifyPublishedMetrics(expectedCounts, true);
    }

    /**
     * Records three metrics with timestamps 0, 1 and 2 for the same topic-partition and checks
     * that a single record carrying a count of three is published.
     */
    @Test
    public void testRecordSameTopicPartitionWindowPublishesOneMonitoringMessages() throws InvalidProtocolBufferException {
        expectMonitoringProducerSendMetrics(false);
        expectMonitoringProducerClose();
        EasyMock.replay(monitoringProducer);

        interceptor = createMonitoringInterceptor();
        interceptor.start();
        for (long timestamp = 0L; timestamp <= 2L; timestamp++) {
            interceptor.recordMessageMetric(topicPartition.topic(), topicPartition.partition(), timestamp, 10, 10L, 10L);
        }
        interceptor.shutdown();

        EasyMock.verify(monitoringProducer);
        Assert.assertEquals(1L, sent.size());
        verifyPublishedMetricsSameTopicPartition(topicPartition, 3L, true);
    }

    /**
     * Records four metrics with timestamps in different sample periods, waits one second of
     * wall-clock time and checks, before shutting down, that at least four records have been
     * published with a total count of four and without requiring shutdown flags.
     */
    @Test
    public void testPublishBeforeShutdownMultipleWindows() throws InvalidProtocolBufferException {
        expectMonitoringProducerSendMetrics(false);
        // verify() runs before shutdown(), so close() is allowed any number of times (incl. zero).
        monitoringProducer.close();
        EasyMock.expectLastCall().anyTimes();
        EasyMock.replay(monitoringProducer);

        interceptor = createMonitoringInterceptor();
        interceptor.start();
        boolean interrupted = false;
        try {
            final long[] timestamps = {0L, 100 * SAMPLE_PERIOD, 2 * SAMPLE_PERIOD, 10 * SAMPLE_PERIOD};
            for (long timestamp : timestamps) {
                interceptor.recordMessageMetric(topicPartition.topic(), topicPartition.partition(), timestamp, 10, 10L, 10L);
            }
            Thread.sleep(1000L);
            EasyMock.verify(monitoringProducer);
            Assert.assertTrue(sent.size() >= 4);
            verifyPublishedMetricsSameTopicPartition(topicPartition, 4L, false);
        } catch (InterruptedException e) {
            interrupted = true;
            AssertionFailedError failure = new AssertionFailedError("Did not finish waiting for publish");
            failure.initCause(e);
            throw failure;
        } finally {
            // Always shut down so a failed assertion does not leave the started interceptor running.
            interceptor.shutdown();
            if (interrupted) {
                // Restore the interrupt only after shutdown so it cannot disturb the cleanup.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Makes the first send() on the mocked producer throw, then records ten metrics one second
     * apart and checks that at least ten records were handed to the producer and that the
     * aggregated count is ten.
     */
    @Test
    public void testProducerSendFailureDoesNotStopPublishing() throws InvalidProtocolBufferException {
        final int recordCount = 10;
        expectMonitoringProducerSendMetrics(true);
        expectMonitoringProducerClose();
        EasyMock.replay(monitoringProducer);

        interceptor = createMonitoringInterceptor();
        interceptor.start();
        for (int i = 0; i < recordCount; i++) {
            try {
                interceptor.recordMessageMetric(topicPartition.topic(), topicPartition.partition(), i * SAMPLE_PERIOD, 10, 10L, 10L);
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and preserve the original cause.
                Thread.currentThread().interrupt();
                AssertionFailedError failure = new AssertionFailedError("Did not finish waiting for publish");
                failure.initCause(e);
                throw failure;
            }
        }
        interceptor.shutdown();

        EasyMock.verify(monitoringProducer);
        Assert.assertTrue(recordCount <= sent.size());
        verifyPublishedMetricsSameTopicPartition(topicPartition, Long.valueOf(recordCount), true);
    }

    /**
     * Records one metric, waits 1.5 seconds of wall-clock time and checks that the second record
     * handed to the producer is a HEARTBEAT message with window 0, a positive sample period and a
     * count of 0.
     */
    @Test
    public void testHeartbeatMessage() throws InvalidProtocolBufferException {
        expectMonitoringProducerSendMetrics(false);
        // verify() runs before shutdown(), so close() is allowed any number of times (incl. zero).
        monitoringProducer.close();
        EasyMock.expectLastCall().anyTimes();
        EasyMock.replay(monitoringProducer);

        interceptor = createMonitoringInterceptor();
        interceptor.start();
        boolean interrupted = false;
        try {
            interceptor.recordMessageMetric(topicPartition.topic(), topicPartition.partition(), 0L, 10, 10L, 10L);
            Thread.sleep(1500L);
            EasyMock.verify(monitoringProducer);
            Assert.assertTrue(sent.size() >= 2);

            Monitoring.MonitoringMessage heartbeat =
                    (Monitoring.MonitoringMessage) monitoringMessageUberSerde.deserialize(sent.get(1).value());
            Assert.assertEquals(Monitoring.MessageType.HEARTBEAT, heartbeat.getType());
            Assert.assertEquals(0L, heartbeat.getWindow());
            Assert.assertTrue(heartbeat.getSamplePeriod() > 0);
            Assert.assertEquals(0L, heartbeat.getCount());
        } catch (InterruptedException e) {
            interrupted = true;
            AssertionFailedError failure = new AssertionFailedError("Did not finish waiting for publish");
            failure.initCause(e);
            throw failure;
        } finally {
            // Always shut down so a failed assertion does not leave the started interceptor running.
            interceptor.shutdown();
            if (interrupted) {
                // Restore the interrupt only after shutdown so it cannot disturb the cleanup.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Drives publishing by hand (no start()) while advancing the mock clock by whole
     * {@code TimeBucket.MAX_SESSION_DURATION} steps, checking after each publish that the session
     * expiry time moves to the next multiple, and finally that three sessions with 3, 2 and 2
     * messages were published, each with a shutdown flag.
     */
    @Test
    public void testMultipleSessions() throws InvalidProtocolBufferException {
        expectMonitoringProducerSendMetrics(false);
        EasyMock.replay(monitoringProducer);
        final String topic = topicPartition.topic();
        final int partition = topicPartition.partition();

        interceptor = createMonitoringInterceptor();
        mockClock.setCurrentTimeMillis(0L);
        // Enable publishing directly instead of calling start().
        interceptor.publishing = true;
        interceptor.startNewSession();

        interceptor.recordMessageMetric(topic, partition, 0L, 10, 10L, 10L);
        interceptor.publishMetrics();
        Assert.assertEquals(TimeBucket.MAX_SESSION_DURATION, interceptor.sessionExpireTime());

        interceptor.recordMessageMetric(topic, partition, 0L, 10, 10L, 10L);
        interceptor.recordMessageMetric(topic, partition, SAMPLE_PERIOD, 10, 10L, 10L);
        mockClock.setCurrentTimeMillis(TimeBucket.MAX_SESSION_DURATION);
        interceptor.publishMetrics();
        Assert.assertEquals(TimeBucket.MAX_SESSION_DURATION * 2, interceptor.sessionExpireTime());

        interceptor.recordMessageMetric(topic, partition, 0L, 10, 10L, 10L);
        interceptor.recordMessageMetric(topic, partition, SAMPLE_PERIOD, 10, 10L, 10L);
        mockClock.setCurrentTimeMillis(TimeBucket.MAX_SESSION_DURATION * 2);
        interceptor.publishMetrics();
        Assert.assertEquals(TimeBucket.MAX_SESSION_DURATION * 3, interceptor.sessionExpireTime());

        interceptor.publishMetrics();
        mockClock.setCurrentTimeMillis(TimeBucket.MAX_SESSION_DURATION * 3);
        interceptor.publishMetrics();
        Assert.assertEquals(TimeBucket.MAX_SESSION_DURATION * 4, interceptor.sessionExpireTime());

        EasyMock.verify(monitoringProducer);
        List<Map<TopicPartition, Long>> expectedSessions = new ArrayList<>();
        expectedSessions.add(ImmutableMap.of(topicPartition, 3L));
        expectedSessions.add(ImmutableMap.of(topicPartition, 2L));
        expectedSessions.add(ImmutableMap.of(topicPartition, 2L));
        verifyMessagesInSessions(expectedSessions, true);
    }

    /**
     * Drives publishing by hand for two topic-partitions across one session rollover and checks
     * that the first session holds two messages per partition and the second holds one, with the
     * last session carrying no shutdown flag.
     */
    @Test
    public void testMultipleTopicPartitionsInSessions() throws InvalidProtocolBufferException {
        expectMonitoringProducerSendMetrics(false);
        EasyMock.replay(monitoringProducer);
        final TopicPartition first = new TopicPartition("test1", 1);
        final TopicPartition second = new TopicPartition("test2", 1);

        interceptor = createMonitoringInterceptor();
        mockClock.setCurrentTimeMillis(0L);
        // Enable publishing directly instead of calling start().
        interceptor.publishing = true;
        interceptor.startNewSession();
        Assert.assertEquals(TimeBucket.MAX_SESSION_DURATION, interceptor.sessionExpireTime());

        interceptor.recordMessageMetric("test1", 1, 0L, 10, 10L, 10L);
        interceptor.recordMessageMetric("test2", 1, 0L, 10, 10L, 10L);
        interceptor.publishMetrics();
        mockClock.setCurrentTimeMillis(TimeBucket.MAX_SESSION_DURATION);
        interceptor.publishMetrics();
        Assert.assertEquals(TimeBucket.MAX_SESSION_DURATION * 2, interceptor.sessionExpireTime());
        interceptor.publishMetrics();

        EasyMock.verify(monitoringProducer);
        List<Map<TopicPartition, Long>> expectedSessions = new ArrayList<>();
        expectedSessions.add(ImmutableMap.of(first, 2L, second, 2L));
        expectedSessions.add(ImmutableMap.of(first, 1L, second, 1L));
        verifyMessagesInSessions(expectedSessions, false);
    }

    /**
     * Drives three publishes by hand while the mock clock only advances to 3000, and checks that
     * the session expiry time is still {@code TimeBucket.MAX_SESSION_DURATION} and that all
     * messages belong to a single session without shutdown flags.
     */
    @Test
    public void testNoSessionExpire() throws InvalidProtocolBufferException {
        expectMonitoringProducerSendMetrics(false);
        EasyMock.replay(monitoringProducer);
        final String topic = topicPartition.topic();
        final int partition = topicPartition.partition();

        interceptor = createMonitoringInterceptor();
        mockClock.setCurrentTimeMillis(0L);
        // Enable publishing directly instead of calling start().
        interceptor.publishing = true;
        interceptor.startNewSession();

        interceptor.recordMessageMetric(topic, partition, 0L, 10, 10L, 10L);
        mockClock.setCurrentTimeMillis(1000L);
        interceptor.publishMetrics();

        interceptor.recordMessageMetric(topic, partition, 0L, 10, 10L, 10L);
        interceptor.recordMessageMetric("test2", 1, 0L, 10, 10L, 10L);
        mockClock.setCurrentTimeMillis(2000L);
        interceptor.publishMetrics();

        interceptor.recordMessageMetric(topic, partition, 0L, 10, 10L, 10L);
        mockClock.setCurrentTimeMillis(3000L);
        interceptor.publishMetrics();
        Assert.assertEquals(TimeBucket.MAX_SESSION_DURATION, interceptor.sessionExpireTime());

        EasyMock.verify(monitoringProducer);
        List<Map<TopicPartition, Long>> expectedSessions = new ArrayList<>();
        expectedSessions.add(ImmutableMap.of(topicPartition, 3L, new TopicPartition("test2", 1), 2L));
        verifyMessagesInSessions(expectedSessions, false);
    }

    /**
     * Groups the captured records into sessions (a new group starts whenever the session id of a
     * message differs from the previous one) and checks them against the expectation.
     *
     * <p>For every session it asserts that the session id was not seen before, that no sequence
     * number repeats within a topic-partition, that each expected topic-partition received exactly
     * the expected number of messages with sequence numbers {@code 0..n-1}, and that the shutdown
     * flag is present on the last sequence number where required.
     *
     * @param list expected number of messages per topic-partition, one map per session in order
     * @param z    whether the last session must carry shutdown flags too; all earlier sessions
     *             are always required to carry them
     */
    private void verifyMessagesInSessions(List<Map<TopicPartition, Long>> list, boolean z) {
        String currentSession = null;
        Set<String> seenSessions = new HashSet<>();
        List<Map<TopicPartition, Set<Long>>> sequencesBySession = new ArrayList<>();
        List<Map<TopicPartition, Long>> shutdownBySession = new ArrayList<>();
        Map<TopicPartition, Set<Long>> sequences = null;
        Map<TopicPartition, Long> shutdownSequences = null;

        for (ProducerRecord<String, byte[]> published : this.sent) {
            Monitoring.MonitoringMessage message =
                    (Monitoring.MonitoringMessage) this.monitoringMessageUberSerde.deserialize(published.value());
            TopicPartition partition = new TopicPartition(message.getTopic(), message.getPartition());
            Assert.assertNotNull("Session is null", message.getSession());

            // The first message always opens a session because currentSession starts out null.
            if (!message.getSession().equals(currentSession)) {
                currentSession = message.getSession();
                Assert.assertTrue("Duplicated session", seenSessions.add(currentSession));
                sequences = new HashMap<>();
                shutdownSequences = new HashMap<>();
                sequencesBySession.add(sequences);
                shutdownBySession.add(shutdownSequences);
            }

            Set<Long> partitionSequences = sequences.get(partition);
            if (partitionSequences == null) {
                partitionSequences = new HashSet<>();
                sequences.put(partition, partitionSequences);
            }
            Long sequence = Long.valueOf(message.getSequence());
            Assert.assertTrue("Sequence already received", partitionSequences.add(sequence));
            if (message.getShutdown()) {
                shutdownSequences.put(partition, sequence);
            }
        }

        // Both per-session lists grow together, so comparing one of them is sufficient.
        Assert.assertEquals("Sessions mismatch", list.size(), sequencesBySession.size());

        final int lastSession = list.size() - 1;
        for (int session = 0; session < list.size(); session++) {
            Map<TopicPartition, Long> expected = list.get(session);
            Map<TopicPartition, Set<Long>> actualSequences = sequencesBySession.get(session);
            Map<TopicPartition, Long> actualShutdown = shutdownBySession.get(session);
            boolean shutdownFlagExpected = session < lastSession || z;

            Assert.assertEquals("Topic partition mismatch", expected.size(), actualSequences.size());
            for (Map.Entry<TopicPartition, Long> entry : expected.entrySet()) {
                TopicPartition partition = entry.getKey();
                Assert.assertTrue("Topic partition missing", actualSequences.containsKey(partition));
                long expectedCount = entry.getValue().longValue();
                Set<Long> received = actualSequences.get(partition);
                Assert.assertEquals("Sequence number mismatch", expectedCount, received.size());
                for (long seq = 0; seq < received.size(); seq++) {
                    Assert.assertTrue(String.format("Missing sequence number %d", Long.valueOf(seq)), received.contains(Long.valueOf(seq)));
                }
                if (shutdownFlagExpected) {
                    Assert.assertTrue("Topic-partition is missing shutdown flag", actualShutdown.containsKey(partition));
                    Assert.assertEquals("Shutdown flag is set with unexpected sequence number.", expectedCount - 1, actualShutdown.get(partition).longValue());
                } else {
                    Assert.assertFalse("Topic-partition has unexpected shutdown flag", actualShutdown.containsKey(partition));
                }
            }
        }
    }

    /**
     * Convenience wrapper around {@code verifyPublishedMetrics} for a single topic-partition.
     *
     * @param topicPartition the only topic-partition expected in the published messages
     * @param l              expected total message count for that topic-partition
     * @param z              forwarded unchanged to {@code verifyPublishedMetrics}
     */
    private void verifyPublishedMetricsSameTopicPartition(TopicPartition topicPartition, Long l, boolean z) throws InvalidProtocolBufferException {
        final Map<TopicPartition, Long> expectedCounts = new HashMap<TopicPartition, Long>();
        expectedCounts.put(topicPartition, l);
        verifyPublishedMetrics(expectedCounts, z);
    }

    /**
     * Decodes every captured record and checks it against the expected per-partition counts.
     *
     * <p>Each message must carry the test client, group and cluster ids, a non-negative window, a
     * positive sample period and must not be of type ERROR; HEARTBEAT messages must have zero
     * count and zero aggregate bytes. Counts are summed per topic-partition and compared with
     * {@code map}; sequence numbers per topic-partition must be unique and form {@code 0..n-1}.
     *
     * @param map expected total message count per topic-partition
     * @param z   when true, each topic-partition must have exactly one message with the shutdown
     *            flag, and it must be the one with the highest sequence number
     */
    private void verifyPublishedMetrics(Map<TopicPartition, Long> map, boolean z) throws InvalidProtocolBufferException {
        Map<TopicPartition, Long> countsByPartition = new HashMap<>();
        Map<TopicPartition, Set<Long>> sequencesByPartition = new HashMap<>();
        Map<TopicPartition, Long> shutdownSequenceByPartition = new HashMap<>();

        for (ProducerRecord<String, byte[]> published : this.sent) {
            Monitoring.MonitoringMessage message =
                    (Monitoring.MonitoringMessage) this.monitoringMessageUberSerde.deserialize(published.value());
            Assert.assertNotNull(message.getClientId());
            Assert.assertEquals(CLIENT_ID, message.getClientId());
            Assert.assertNotNull(message.getGroup());
            Assert.assertEquals(GROUP_ID, message.getGroup());
            Assert.assertNotNull(message.getClusterId());
            Assert.assertEquals(CLUSTER_ID, message.getClusterId());
            Assert.assertTrue(message.getWindow() >= 0);
            Assert.assertTrue(message.getSamplePeriod() > 0);
            if (message.getType() == Monitoring.MessageType.HEARTBEAT) {
                Assert.assertEquals(0L, message.getCount());
                Assert.assertEquals(0L, message.getAggregateBytes());
            }
            Assert.assertTrue(message.getType() != Monitoring.MessageType.ERROR);

            TopicPartition partition = new TopicPartition(message.getTopic(), message.getPartition());
            Long runningCount = countsByPartition.get(partition);
            Set<Long> sequences = sequencesByPartition.get(partition);
            if (runningCount == null) {
                // First message for this partition: both bookkeeping maps must still be empty for it.
                runningCount = Long.valueOf(0L);
                Assert.assertNull(sequences);
                sequences = new HashSet<>();
                sequencesByPartition.put(partition, sequences);
            }
            countsByPartition.put(partition, Long.valueOf(runningCount.longValue() + message.getCount()));

            Long sequence = Long.valueOf(message.getSequence());
            Assert.assertTrue(String.format("Repeating sequence number %d", sequence), sequences.add(sequence));
            if (z && message.getShutdown()) {
                Assert.assertFalse(String.format("Got more than one monitoring message with shutdown flag set for topic %s, partition %d.", message.getTopic(), Integer.valueOf(message.getPartition())), shutdownSequenceByPartition.containsKey(partition));
                shutdownSequenceByPartition.put(partition, sequence);
            }
        }

        Assert.assertEquals(map, countsByPartition);
        for (Map.Entry<TopicPartition, Set<Long>> entry : sequencesByPartition.entrySet()) {
            TopicPartition partition = entry.getKey();
            Set<Long> sequences = entry.getValue();
            for (long seq = 0; seq < sequences.size(); seq++) {
                Assert.assertTrue(String.format("Missing sequence number %d", Long.valueOf(seq)), sequences.contains(Long.valueOf(seq)));
            }
            if (z) {
                Assert.assertTrue("Topic-partition is missing an monitoring message with shutdown flag", shutdownSequenceByPartition.containsKey(partition));
                Assert.assertEquals("Shutdown flag is set in monitoring message with unexpected sequence number.", sequences.size() - 1, shutdownSequenceByPartition.get(partition).intValue());
            }
        }
    }

    /**
     * Sets up the producer mock for publishing with a single reported partition.
     *
     * @param z forwarded to the two-argument overload as its first argument
     */
    private void expectMonitoringProducerSendMetrics(boolean z) {
        final int singlePartition = 1;
        expectMonitoringProducerSendMetrics(z, singlePartition);
    }

    /**
     * Sets up the producer mock's partition and send expectations.
     *
     * @param z passed to {@code expectSend}
     * @param i passed to {@code expectPartitions}
     */
    private void expectMonitoringProducerSendMetrics(boolean z, int i) {
        // Partition expectation is registered first, then the send expectation.
        this.expectPartitions(i);
        this.expectSend(z);
    }

    /**
     * Makes {@code partitionsFor} on the producer mock answer, for any topic and any number of
     * calls, with a freshly built list of mocked {@link PartitionInfo} objects.
     *
     * @param i number of partition entries in each answered list
     */
    private void expectPartitions(final int i) {
        EasyMock.expect(this.monitoringProducer.partitionsFor((String) EasyMock.anyObject(String.class)))
                .andAnswer(new IAnswer<List<PartitionInfo>>() {
                    // Must be named answer() to implement IAnswer; the decompiler had renamed it.
                    @Override
                    public List<PartitionInfo> answer() throws Throwable {
                        List<PartitionInfo> partitions = new ArrayList<>();
                        for (int index = 0; index < i; index++) {
                            partitions.add(EasyMock.createMock(PartitionInfo.class));
                        }
                        return partitions;
                    }
                })
                .anyTimes();
    }

    /**
     * Makes {@code send(record, callback)} on the producer mock, for any arguments and any number
     * of calls, capture the record in {@code sent} and answer with an already completed future.
     *
     * @param z when true, the call that makes {@code sent} reach size one throws an
     *          {@link IOException} after the record has been captured
     */
    private void expectSend(final boolean z) {
        EasyMock.expect(this.monitoringProducer.send((ProducerRecord) EasyMock.anyObject(ProducerRecord.class), (Callback) EasyMock.anyObject(Callback.class)))
                .andAnswer(new IAnswer<Future<RecordMetadata>>() {
                    // Must be named answer() to implement IAnswer; the decompiler had renamed it.
                    @Override
                    public Future<RecordMetadata> answer() throws Throwable {
                        @SuppressWarnings("unchecked") // the mock only ever receives the interceptor's records
                        ProducerRecord<String, byte[]> record =
                                (ProducerRecord<String, byte[]>) EasyMock.getCurrentArguments()[0];
                        ProduceRequestResult result = new ProduceRequestResult(new TopicPartition(record.topic(), 0));
                        FutureRecordMetadata future = new FutureRecordMetadata(result, 0L, 0L, 0L, 0, 0);
                        // Capture before the injected failure so failed sends are visible to the tests too.
                        MonitoringInterceptorTest.this.sent.add(record);
                        if (MonitoringInterceptorTest.this.sent.size() == 1 && z) {
                            throw new IOException("Injected exception");
                        }
                        result.set(0L, -1L, (RuntimeException) null);
                        result.done();
                        return future;
                    }
                })
                .anyTimes();
    }

    /** Records the expectation that {@code close()} is called exactly once on the producer mock. */
    private void expectMonitoringProducerClose() {
        // In record mode this call registers the void method that expectLastCall() refers to.
        monitoringProducer.close();
        EasyMock.expectLastCall().once();
    }
}
