package com.cloudera.enterprise.alertpublisher.component;

import com.cloudera.cmf.event.Event;
import com.cloudera.cmf.event.EventAttribute;
import com.cloudera.cmf.event.EventUtil;
import com.cloudera.cmf.event.SimpleEvent;
import com.cloudera.cmf.event.SystemTag;
import com.cloudera.cmf.event.query.EventStoreQueryAPI;
import com.cloudera.cmf.event.query.QueryBuilder;
import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedResource;

@ManagedResource(description = "Cloudera - Event Store Consumer")
/* loaded from: input_file:com/cloudera/enterprise/alertpublisher/component/EventStoreConsumer.class */
public class EventStoreConsumer extends ScheduledPollConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(EventStoreConsumer.class);
    private final EventStoreQueryAPI eventStore;
    public static final String EVENT_SOURCE_HEADER = "event.source";
    private long rangeStartTimestamp;
    private final QueryBuilder queryBuilder;
    private final EventStoreEndpoint endpoint;
    private final AtomicLong eventsCollected;
    private final AtomicLong alertsCollected;
    private final AtomicLong eventExceptions;
    public static final long JITTER_MINUTES = 10;

    public EventStoreConsumer(EventStoreEndpoint eventStoreEndpoint, Processor processor) {
        super(eventStoreEndpoint, processor);
        this.eventsCollected = new AtomicLong();
        this.alertsCollected = new AtomicLong();
        this.eventExceptions = new AtomicLong();
        this.eventStore = eventStoreEndpoint.getEventStoreQueryAPI();
        this.rangeStartTimestamp = Instant.now().minus(Duration.standardMinutes(10L)).getMillis();
        this.queryBuilder = new QueryBuilder();
        if (eventStoreEndpoint.getAlertsOnly()) {
            this.queryBuilder.addTermClause(EventAttribute.ALERT, Boolean.toString(true), QueryBuilder.Operator.AND);
        }
        this.endpoint = eventStoreEndpoint;
    }

    private Event createTestEvent(boolean z) {
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put(SystemTag.UUID.getTagName(), Arrays.asList(UUID.randomUUID().toString()));
        newHashMap.put(EventAttribute.ALERT.toString(), Arrays.asList(Boolean.toString(z)));
        newHashMap.put("testSingleAttr", Arrays.asList("Single Attribute"));
        newHashMap.put("testMultiAttrs", Arrays.asList("Attribute 1", "Attribute 2"));
        return new SimpleEvent("Test " + (z ? "Alert" : "Event"), new Date(), newHashMap);
    }

    @ManagedAttribute(description = "Send a test event")
    public boolean getTestEventSuccess() {
        return sendEvent(createTestEvent(false));
    }

    @ManagedAttribute(description = "Send a test alert")
    public boolean getTestAlertSuccess() {
        return sendEvent(createTestEvent(true));
    }

    private boolean sendEvent(Event event) {
        Exchange createExchange = getEndpoint().createExchange();
        Message in = createExchange.getIn();
        in.setHeader(EVENT_SOURCE_HEADER, this.endpoint.getURI().toString());
        in.setBody(event);
        this.eventsCollected.addAndGet(1L);
        if (EventUtil.isAlert(event)) {
            this.alertsCollected.addAndGet(1L);
        }
        try {
            getProcessor().process(createExchange);
            return true;
        } catch (Exception e) {
            getExceptionHandler().handleException(e);
            this.eventExceptions.addAndGet(1L);
            return false;
        }
    }

    @ManagedAttribute(description = "Events consumed from Event Store")
    public long getEventsCollected() {
        return this.eventsCollected.get();
    }

    @ManagedAttribute(description = "Alerts consumed from Event Store")
    public long getAlertsCollected() {
        return this.alertsCollected.get();
    }

    @ManagedAttribute(description = "Number of exceptions forwarding events")
    public long getEventExceptions() {
        return this.eventExceptions.get();
    }

    @ManagedAttribute(description = "Current start timestamp for range query")
    public Date getPersistTimestamp() {
        return new Date(this.rangeStartTimestamp);
    }

    static long extractPersistTimestampMillis(Event event) {
        List list;
        if (event == null || (list = (List) event.getAttributes().get(SystemTag.PERSIST_TIMESTAMP.getTagName())) == null || list.size() != 1) {
            return 0L;
        }
        return Long.parseLong((String) list.get(0));
    }

    protected int poll() throws Exception {
        this.queryBuilder.setPersistInterval(this.rangeStartTimestamp, Instant.now().plus(Duration.standardMinutes(10L)).getMillis());
        LOG.debug("QueryBuilder's internal lucene query: " + this.queryBuilder.build().getLuceneQuery().toString());
        LOG.debug("QueryBuilder's persist interval: " + this.rangeStartTimestamp + " to 9223372036854775807");
        List<Event> events = this.eventStore.doQuery(this.queryBuilder.build()).getEvents();
        if (events == null) {
            return 0;
        }
        for (Event event : events) {
            long extractPersistTimestampMillis = extractPersistTimestampMillis(event);
            if (extractPersistTimestampMillis > this.rangeStartTimestamp) {
                this.rangeStartTimestamp = extractPersistTimestampMillis;
            }
            sendEvent(event);
        }
        return events.size();
    }
}
