package com.cloudera.cmon.firehose;

import com.cloudera.cmon.MetricEnum;
import com.cloudera.cmon.MonitoringTypes;
import com.cloudera.cmon.firehose.event.AgentMessageServiceIPC;
import com.cloudera.cmon.firehose.event.AgentMessages;
import com.cloudera.cmon.firehose.event.AgentMsg;
import com.cloudera.cmon.firehose.event.HTableRecord;
import com.cloudera.cmon.firehose.event.HostRecord;
import com.cloudera.cmon.firehose.event.HostUpdate;
import com.cloudera.cmon.firehose.event.MetricValue;
import com.cloudera.cmon.firehose.event.MetricWriteRecord;
import com.cloudera.cmon.firehose.event.RoleRecord;
import com.cloudera.cmon.firehose.event.ServiceRecord;
import com.cloudera.cmon.firehose.event.TimeSeriesEntityRecord;
import com.cloudera.cmon.firehose.event.WriteMetricsRequest;
import com.cloudera.cmon.kaiser.KaiserTestBase;
import com.cloudera.cmon.tstore.TimeSeriesDataStore;
import com.cloudera.cmon.tstore.TimeSeriesEntityBuilder;
import com.cloudera.cmon.tstore.TimeSeriesMetadataStore;
import com.cloudera.enterprise.EnterpriseServiceException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:com/cloudera/cmon/firehose/AgentMessageServiceTest.class */
public class AgentMessageServiceTest extends KaiserTestBase {
    private TimeSeriesMetadataStore.TimeSeriesEntity hostEntity;
    private TimeSeriesMetadataStore.TimeSeriesEntity htableEntity;
    private TimeSeriesMetadataStore.TimeSeriesEntity regionserverEntity;
    private TimeSeriesMetadataStore.TimeSeriesEntity hbaseEntity;
    private TimeSeriesMetadataStore.TimeSeriesEntity rackEntity;

    @Before
    public void setUp() throws Exception {
        this.htableEntity = TimeSeriesEntityBuilder.getOrCreateHTable(this.tStore, "my-hbase", "my-ns", "my-ns:t1", false);
        this.hostEntity = TimeSeriesEntityBuilder.getOrCreateHost(this.tStore, "host", "hostname", TimeSeriesEntityBuilder.NO_RACK_ID, TimeSeriesEntityBuilder.NO_CLUSTER_ID);
        this.hbaseEntity = TimeSeriesEntityBuilder.getOrCreateService(this.tStore, "my-hbase", "my hbase", "HBASE", TimeSeriesEntityBuilder.NO_CLUSTER_ID);
        this.regionserverEntity = TimeSeriesEntityBuilder.getOrCreateRole(this.tStore, "regionserver-1", "my-hbase", "REGIONSERVER", "HBASE", "host", "hostname", "roleConfigGroup", TimeSeriesEntityBuilder.NO_RACK_ID);
        this.rackEntity = TimeSeriesEntityBuilder.getOrCreateRack(this.tStore, "rack1");
    }

    @Test
    public void testAgentMessageService() throws EnterpriseServiceException, IOException {
        int randomPort = ServiceHelper.getRandomPort();
        FirehosePipeline firehosePipeline = (FirehosePipeline) Mockito.mock(FirehosePipeline.class);
        ((FirehosePipeline) Mockito.doReturn(this.tStore).when(firehosePipeline)).getTimeSeriesStore();
        AgentMessageService agentMessageService = new AgentMessageService("localhost", randomPort, firehosePipeline);
        agentMessageService.start();
        AgentMessageServiceIPC agentMessageServiceIPC = (AgentMessageServiceIPC) SpecificRequestor.getClient(AgentMessageServiceIPC.class, new HttpTransceiver(new URL(String.format("http://localhost:%d", Integer.valueOf(randomPort)))));
        for (int i = 0; i < 5; i++) {
            agentMessageServiceIPC.sendAgentMessages(makeAgentMessages(2));
        }
        ((FirehosePipeline) Mockito.verify(firehosePipeline, Mockito.times(5 * 2))).receiveEvent((FhMessage) Mockito.any(FhMessage.class));
        Assert.assertEquals(5, agentMessageService.getNumAgentMessagesRPCsReceived());
        agentMessageService.stop();
    }

    private AgentMessages makeAgentMessages(int i) {
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(i);
        for (int i2 = 0; i2 < i; i2++) {
            newArrayListWithExpectedSize.add(AgentMsg.newBuilder().setActivityUpdates(new ArrayList(0)).setAttemptMetrics(new ArrayList(0)).setRoleMetrics(new ArrayList(0)).setServiceUpdates(new ArrayList(0)).setHostname("localhost").setHostId("localhost").setTsSecs(new Instant().getMillis() / 1000).setHostUpdate((HostUpdate) null).build());
        }
        return AgentMessages.newBuilder().setAgentMsgs(newArrayListWithExpectedSize).build();
    }

    @Test
    public void testWriteMetrics() {
        Instant now = Instant.now();
        MetricValue build = MetricValue.newBuilder().setId(Integer.valueOf(MetricEnum.ACTIVE_APPLICATIONS.getUniqueMetricId())).setValue(Double.valueOf(5.0d)).build();
        WriteMetricsRequest build2 = WriteMetricsRequest.newBuilder().setWriteRecords(ImmutableList.of(MetricWriteRecord.newBuilder().setEntityRecord(HostRecord.newBuilder().setHostId("host").build()).setTimestampMs(now.getMillis()).setMetricValues(ImmutableList.of(build)).build(), MetricWriteRecord.newBuilder().setEntityRecord(RoleRecord.newBuilder().setRoleName("regionserver-1").build()).setTimestampMs(now.getMillis()).setMetricValues(ImmutableList.of(build)).build(), MetricWriteRecord.newBuilder().setEntityRecord(ServiceRecord.newBuilder().setServiceName("my-hbase").build()).setTimestampMs(now.getMillis()).setMetricValues(ImmutableList.of(build)).build(), MetricWriteRecord.newBuilder().setEntityRecord(HTableRecord.newBuilder().setTableName("my-ns:t1").setServiceName("my-hbase").build()).setTimestampMs(now.getMillis()).setMetricValues(ImmutableList.of(build)).build(), MetricWriteRecord.newBuilder().setEntityRecord(TimeSeriesEntityRecord.newBuilder().setType(MonitoringTypes.RACK_ENTITY_TYPE.toString()).setName("rack1").build()).setTimestampMs(now.getMillis()).setMetricValues(ImmutableList.of(build)).build(), MetricWriteRecord.newBuilder().setEntityRecord(TimeSeriesEntityRecord.newBuilder().setType(MonitoringTypes.CMSERVER_ENTITY_TYPE.toString()).setName("cloudera_manager_server").build()).setTimestampMs(now.getMillis()).setMetricValues(ImmutableList.of(build)).build())).build();
        FirehosePipeline firehosePipeline = (FirehosePipeline) Mockito.mock(FirehosePipeline.class);
        ((FirehosePipeline) Mockito.doReturn(this.tStore).when(firehosePipeline)).getTimeSeriesStore();
        new AgentMessageServiceHandler(firehosePipeline).writeMetrics(build2);
        ImmutableList<TimeSeriesMetadataStore.TimeSeriesEntity> of = ImmutableList.of(this.regionserverEntity, this.hbaseEntity, this.rackEntity, this.hostEntity, this.htableEntity, this.cmServerEntity);
        Map read = this.tStore.read(of, now.minus(1000L), now.plus(1000L), ImmutableSet.of(MetricEnum.ACTIVE_APPLICATIONS));
        Assert.assertEquals(6L, read.size());
        for (TimeSeriesMetadataStore.TimeSeriesEntity timeSeriesEntity : of) {
            Assert.assertEquals(1L, ((TimeSeriesDataStore.ReadResult) read.get(timeSeriesEntity)).getResults().size());
            Assert.assertEquals(5.0d, ((TimeSeriesDataStore.DataPoint) ((List) ((TimeSeriesDataStore.ReadResult) read.get(timeSeriesEntity)).getResults().get(MetricEnum.ACTIVE_APPLICATIONS)).get(0)).getValue(), 1.0E-5d);
        }
        Map read2 = this.tStore.read(ImmutableList.of(this.cmServerEntity), now.minus(1000L), now.plus(1000L), ImmutableSet.of(MetricEnum.CM_CLOCK_OFFSET_WITH_SMON));
        Assert.assertEquals(1L, read2.size());
        TimeSeriesDataStore.ReadResult readResult = (TimeSeriesDataStore.ReadResult) read2.get(this.cmServerEntity);
        Assert.assertEquals(1L, readResult.getResults().size());
        Assert.assertEquals(1L, ((List) readResult.getResults().get(MetricEnum.CM_CLOCK_OFFSET_WITH_SMON)).size());
    }

    @Test
    public void testInvalidMetricValues() {
        Instant now = Instant.now();
        WriteMetricsRequest build = WriteMetricsRequest.newBuilder().setWriteRecords(ImmutableList.of(MetricWriteRecord.newBuilder().setEntityRecord(HostRecord.newBuilder().setHostId("host").build()).setTimestampMs(now.getMillis()).setMetricValues(ImmutableList.of(MetricValue.newBuilder().setId(Integer.valueOf(MetricEnum.ACTIVE_APPLICATIONS.getUniqueMetricId())).setValue(Double.valueOf(5.0d)).build(), MetricValue.newBuilder().setId(100000).setValue(Double.valueOf(5.0d)).build(), MetricValue.newBuilder().setId(Integer.valueOf(MetricEnum.ACTIVE_APPLICATIONS_CUMULATIVE.getUniqueMetricId())).setValue("string").build(), MetricValue.newBuilder().setId(Integer.valueOf(MetricEnum.RAW_SIZE.getUniqueMetricId())).setValue(5L).build())).build())).build();
        FirehosePipeline firehosePipeline = (FirehosePipeline) Mockito.mock(FirehosePipeline.class);
        ((FirehosePipeline) Mockito.doReturn(this.tStore).when(firehosePipeline)).getTimeSeriesStore();
        new AgentMessageServiceHandler(firehosePipeline).writeMetrics(build);
        Map read = this.tStore.read(ImmutableList.of(this.hostEntity), now.minus(1000L), now.plus(1000L), ImmutableSet.of(MetricEnum.ACTIVE_APPLICATIONS, MetricEnum.ACTIVE_APPLICATIONS_CUMULATIVE, MetricEnum.RAW_SIZE));
        Assert.assertEquals(1L, read.size());
        for (TimeSeriesDataStore.ReadResult readResult : read.values()) {
            Assert.assertEquals(3L, readResult.getResults().size());
            Assert.assertEquals(1L, ((List) readResult.getResults().get(MetricEnum.ACTIVE_APPLICATIONS)).size());
            Assert.assertEquals(0L, ((List) readResult.getResults().get(MetricEnum.ACTIVE_APPLICATIONS_CUMULATIVE)).size());
            Assert.assertEquals(1L, ((List) readResult.getResults().get(MetricEnum.RAW_SIZE)).size());
        }
    }

    @Test
    public void testDontCreateNewTsids() {
        Assert.assertEquals(6L, Lists.newArrayList(this.tStore.getTimeSeriesEntityStore().getAllEntities()).size());
        Instant now = Instant.now();
        MetricValue build = MetricValue.newBuilder().setId(Integer.valueOf(MetricEnum.ACTIVE_APPLICATIONS.getUniqueMetricId())).setValue(Double.valueOf(5.0d)).build();
        WriteMetricsRequest build2 = WriteMetricsRequest.newBuilder().setWriteRecords(ImmutableList.of(MetricWriteRecord.newBuilder().setEntityRecord(HostRecord.newBuilder().setHostId("unknownHost").build()).setTimestampMs(now.getMillis()).setMetricValues(ImmutableList.of(build)).build(), MetricWriteRecord.newBuilder().setEntityRecord(RoleRecord.newBuilder().setRoleName("unknownRole").build()).setTimestampMs(now.getMillis()).setMetricValues(ImmutableList.of(build)).build(), MetricWriteRecord.newBuilder().setEntityRecord(ServiceRecord.newBuilder().setServiceName("unknownService").build()).setTimestampMs(now.getMillis()).setMetricValues(ImmutableList.of(build)).build(), MetricWriteRecord.newBuilder().setEntityRecord(HTableRecord.newBuilder().setTableName("unknownTable").setServiceName("unknownService").build()).setTimestampMs(now.getMillis()).setMetricValues(ImmutableList.of(build)).build(), MetricWriteRecord.newBuilder().setEntityRecord(TimeSeriesEntityRecord.newBuilder().setType(MonitoringTypes.RACK_ENTITY_TYPE.toString()).setName("unknownTsid").build()).setTimestampMs(now.getMillis()).setMetricValues(ImmutableList.of(build)).build())).build();
        FirehosePipeline firehosePipeline = (FirehosePipeline) Mockito.mock(FirehosePipeline.class);
        ((FirehosePipeline) Mockito.doReturn(this.tStore).when(firehosePipeline)).getTimeSeriesStore();
        new AgentMessageServiceHandler(firehosePipeline).writeMetrics(build2);
        Assert.assertEquals(6L, Lists.newArrayList(this.tStore.getTimeSeriesEntityStore().getAllEntities()).size());
    }

    @Test
    public void testWriteOutsideAcceptanceWindow() {
        Duration acceptanceTimeWindowDuration = CMONConfiguration.getSingleton().getAcceptanceTimeWindowDuration();
        Instant minus = Instant.now().minus(acceptanceTimeWindowDuration).minus(acceptanceTimeWindowDuration);
        Instant plus = Instant.now().plus(acceptanceTimeWindowDuration).plus(acceptanceTimeWindowDuration);
        MetricValue build = MetricValue.newBuilder().setId(Integer.valueOf(MetricEnum.ACTIVE_APPLICATIONS.getUniqueMetricId())).setValue(Double.valueOf(5.0d)).build();
        WriteMetricsRequest build2 = WriteMetricsRequest.newBuilder().setWriteRecords(ImmutableList.of(MetricWriteRecord.newBuilder().setEntityRecord(TimeSeriesEntityRecord.newBuilder().setType(MonitoringTypes.RACK_ENTITY_TYPE.toString()).setName("rack1").build()).setTimestampMs(minus.getMillis()).setMetricValues(ImmutableList.of(build)).build(), MetricWriteRecord.newBuilder().setEntityRecord(TimeSeriesEntityRecord.newBuilder().setType(MonitoringTypes.RACK_ENTITY_TYPE.toString()).setName("rack1").build()).setTimestampMs(plus.getMillis()).setMetricValues(ImmutableList.of(build)).build())).build();
        FirehosePipeline firehosePipeline = (FirehosePipeline) Mockito.mock(FirehosePipeline.class);
        ((FirehosePipeline) Mockito.doReturn(this.tStore).when(firehosePipeline)).getTimeSeriesStore();
        AgentMessageServiceHandler agentMessageServiceHandler = new AgentMessageServiceHandler(firehosePipeline);
        agentMessageServiceHandler.writeMetrics(build2);
        Map read = this.tStore.read(ImmutableList.of(this.rackEntity), minus.minus(1000L), plus.plus(1000L), ImmutableSet.of(MetricEnum.ACTIVE_APPLICATIONS));
        Assert.assertEquals(1L, read.size());
        for (TimeSeriesDataStore.ReadResult readResult : read.values()) {
            Assert.assertEquals(1L, readResult.getResults().size());
            Assert.assertEquals(0L, ((List) readResult.getResults().get(MetricEnum.ACTIVE_APPLICATIONS)).size());
        }
        Assert.assertEquals(2L, agentMessageServiceHandler.getNumWriteRequestsOutsideAcceptanceWindow());
    }

    public static AgentMsg createEmptyAgentMessage(Instant instant) {
        return AgentMsg.newBuilder().setActivityUpdates(new ArrayList(0)).setAttemptMetrics(new ArrayList(0)).setRoleMetrics(new ArrayList(0)).setServiceUpdates(new ArrayList(0)).setHostname("localhost").setHostId("localhost").setTsSecs(instant.getMillis() / 1000).setHostUpdate((HostUpdate) null).build();
    }

    @Test
    public void testSendAgentMessagesOutsideTimeWindow() {
        FirehosePipeline firehosePipeline = (FirehosePipeline) Mockito.mock(FirehosePipeline.class);
        ((FirehosePipeline) Mockito.doReturn(this.tStore).when(firehosePipeline)).getTimeSeriesStore();
        AgentMessageServiceHandler agentMessageServiceHandler = new AgentMessageServiceHandler(firehosePipeline);
        Duration acceptanceTimeWindowDuration = CMONConfiguration.getSingleton().getAcceptanceTimeWindowDuration();
        Instant minus = Instant.now().minus(acceptanceTimeWindowDuration).minus(acceptanceTimeWindowDuration);
        Instant plus = Instant.now().plus(acceptanceTimeWindowDuration).plus(acceptanceTimeWindowDuration);
        agentMessageServiceHandler.sendAgentMessages(AgentMessages.newBuilder().setAgentMsgs(ImmutableList.of(createEmptyAgentMessage(minus), createEmptyAgentMessage(plus))).build());
        Assert.assertEquals(2L, agentMessageServiceHandler.getNumAgentMessagesOutsideAcceptanceWindow());
    }
}
