Hippo Event Bus - BloomReach Experience - Open Source CMS

This article covers a Hippo CMS version 10. There's an updated version available that covers our most recent release.

Hippo Event Bus

Introduction

The Hippo Event Bus is a cross-application local VM (i.e. non-clustered) event bus. It allows any web application to post and subscribe to events.

For guaranteed delivery of HippEvents across all cluster nodes use the BroadcastModule and a dedicated PersistedWorkflowEventListener.

Example Workflow Listener

Registering an event listener is quite straightforward. The Hippo Event Bus uses the whiteboard pattern for listener registration. This pattern decouples the lifecycles of the applications that host the listener and the event bus, so it does not matter which application is started first.

Given a listener:

package com.example;
 
import org.onehippo.cms7.event.HippoEvent;
import org.onehippo.cms7.event.HippoEventConstants;
import org.onehippo.cms7.services.eventbus.Subscribe;
import org.onehippo.repository.events.HippoWorkflowEvent;
 
public class MyListener {

  @Subscribe 
  public void handleEvent(HippoEvent event) {
    if (HippoEventConstants.CATEGORY_WORKFLOW.equals(event.category())) {
      HippoWorkflowEvent workflowEvent = new HippoWorkflowEvent(event);
      if ("publish".equals(workflowEvent.action())) {
        System.out.println("document " + workflowEvent.subjectId() +
                                                         " was published");
      }
    }
  }

}

Note that the method name and argument type are not pre-determined. So it is also possible to publish completely custom events.  The event bus will make sure that only events of the appropriate class are received.

It is also easy to write and use your own custom or extended HippoEvent wrapper like for example:

public final class DocumentWorkflowEvent extends HippoWorkflowEvent<DocumentWorkflowEvent> {

    private static final String WORKFLOW_EVENT_TYPE = "event-type";
    private static final String WORKFLOW_NAME = "handle";

    private static final String DEPUBLISH = "depublish";
    private static final String PUBLISH = "publish";
    private static final String COMMIT_EDITABLE_INSTANCE = "commitEditableInstance";

    public enum Type {
        UNKNOWN,
        /** when a document gets published. */
        PUBLISHED,
        /** when a document gets depublished. */
        UNPUBLISHED,
        /** when a document gets updated. */
        CHANGED
    }

    public ReviewedActionsWorkflowEvent(final HippoEvent<?> event) {
        super(event);
        final String eventMethod = methodName();
        if (PUBLISH.equals(eventMethod)) {
            type(Type.PUBLISHED);
        } else if (DEPUBLISH.equals(eventMethod)) {
            type(Type.UNPUBLISHED);
        } else if (COMMIT_EDITABLE_INSTANCE.equals(eventMethod)) {
            type(Type.CHANGED);
        } else {
            type(Type.UNKNOWN);
        }
    }

    public Type type() {
        return get(WORKFLOW_EVENT_TYPE);
    }

    private ReviewedActionsWorkflowEvent type(Type type) {
        return set(WORKFLOW_EVENT_TYPE, type);
    }
}

And then use this DocumentWorkflowEvent instead like:

public class MyListener {

  @Subscribe 
  public void handleEvent(HippoEvent event) {
    if (HippoEventConstants.CATEGORY_WORKFLOW.equals(event.category()) &&
          DocumentWorkflowEvent.WORKFLOW_NAME.equals(event.workflowName())) {
      DocumentWorkflowEvent workflowEvent = new DocumentWorkflowEvent(event);
      if (DocumentWorkflowEvent.Type.PUBLISHED == workflowEvent.type())) {
        System.out.println("document " + workflowEvent.handleUuid() + " was published");
      }
    }
  }
}

Such a listener then can be registered as follows:

package com.example;
 
import org.onehippo.cms7.services.HippoServiceRegistry;
import org.onehippo.cms7.services.eventbus.HippoEventBus;
 
public class MyComponent {

  private MyListener listener;

  public void init() {
    listener = new MyListener();
    HippoServiceRegistry.registerService(listener, HippoEventBus.class); 
  }

  public void destroy() {
    HippoServiceRegistry.unregisterService(listener, HippoEventBus.class); 
  }
} 

The listener is registered as a whiteboard service at the HippoEventBus interface. The event bus implementation will retrieve all listeners from the service registry when posting an event. Because the listener is registered in the service registry it is not necessary to check whether an event bus implementation has been registered already.

Note that this listener will only receive "local" events, i.e. events that were posted in the same JVM / container.  It will not receive events that are posted on other cluster nodes, for example.

Persisted Workflow Events

Hippo Repository by default logs Hippo events to the repository itself. It is possible to register listeners for these persisted events. Such listeners will receive all Hippo events, including actions that are executed on other cluster nodes and actions that were executed when the listener was not registered.

A listener for these events is very similar to the one above:

package com.example;

import org.onehippo.cms7.event.HippoEvent;
import org.onehippo.cms7.event.HippoEventConstants;
import org.onehippo.repository.events.PersistedHippoEventListener;

public class PublishedWorkflowEventListener implements PersistedHippoEventListener {

    @Override
    public String getEventCategory() {
        return HippoEventConstants.CATEGORY_WORKFLOW;
    }

    @Override
    public String getChannelName() {
        return "my-publication-listener";
    }

    @Override
    public boolean onlyNewEvents() {
        return true;
    }

    @Override
    public void onHippoEvent(final HippoEvent event) {
        if ("publish".equals(event.get("methodName"))) {
            System.out.println("document " + event.get("subjectId") + " was published");
        }
    }
}

The listener's channel name is used to identify the listener after a restart, and it should be unique in the container (i.e. no listener in the same cluster node instance should use the same channel name, for instance). The event category limits the events that are passed to the listener to those within the required category.

Registration of this listener should be done as follows:

import org.onehippo.repository.events.PersistedHippoEventsService;

 {
   HippoServiceRegistry.registerService(myPublishedWorkflowEventListener,
       PersistedHippoEventsService.class);
 }

Persisted Event Dispatch Configuration

Events happening while the cluster node is offline are delivered locally when the it comes online again. 

The repository stores the timestamp of the last processed event for each combination (repository-cluster-id, persisted-listener-channel-name).  In principle, all events with a greater timestamp will be dispatched to the listener. In practice, there are some bounds that ensure that the system doesn't end up spending all its time dispatching events.

It is possible to tune the broadcasting mechanism that's used for dispatching the persisted events.  The node

/hippo:configuration/hippo:modules/broadcast/hippo:moduleconfig

contains a number of parameters for configuring the broadcast module (defaults are between brackets). 

  • pollingTime (5000)  
    The interval, in milliseconds, between polls for updates.  Every poll will execute a query for recent events. 
  • queryLimit  (500) 
    The maximum number of events to retrieve and deliver at a time. 
  • maxEventAge  (24) 
    The maximum age, in hours, of events to publish.