Using Tracking processors to replay events in Axon Framework 3

Replaying events is a crucial part in any event sourcing / cqrs application, to rebuild projections, generate new ones or seed external systems with data.

I’m a big fan of the Axon Framework. Even with its quirks and occasional (strange) bugs, it’s my go-to toolbox for my event sourcing & cqrs consulting and development work.

With the recent 3.0 release, Axon changed the way events can be replayed by introducing the Subscribing and Tracking event processors. The Subscribing processor follows the event stream in real-time, whereas the Tracking processor keeps track of events it has processed (using a token). This means that the Tracking processor can be stopped and resumed, and it will pick up processing where it left off.

In a recent project we frequently used both types of processors. To simplify switching from subscribing to tracking mode for (existing) projections, we added two classes: a TrackedProjection annotation and an accompanying Configuration that scans for beans that have the annotation applied.

package org.demo.projections;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface TrackedProjection {
}

The configuration takes care of any (Axon) ProcessingGroup annotations (that change the registered name of the processor), and then invokes registerTrackingProcessor on Axon’s EventHandlingConfiguration.

package org.demo.configuration;

import org.axonframework.config.EventHandlingConfiguration;
import org.axonframework.config.ProcessingGroup;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.demo.projections.TrackedProjection;

import javax.annotation.PostConstruct;
import java.util.Optional;

@Configuration
public class ProjectionsConfiguration {
	@Autowired
	private EventHandlingConfiguration eventHandlingConfiguration;

	@PostConstruct
	public void startTrackingProjections() throws ClassNotFoundException {
		ClassPathScanningCandidateComponentProvider scanner = new ClassPathScanningCandidateComponentProvider(false);
		scanner.addIncludeFilter(new AnnotationTypeFilter(TrackedProjection.class));
		for (BeanDefinition bd : scanner.findCandidateComponents("org.demo")) {
			Class<?> aClass = Class.forName(bd.getBeanClassName());
			ProcessingGroup processingGroup = aClass.getAnnotation(ProcessingGroup.class);
			String name = Optional.ofNullable(processingGroup).map(ProcessingGroup::value).orElse(aClass.getPackage().getName());
			registerTrackingProcessor(name);
		}
	}

	private void registerTrackingProcessor(String name) {
		eventHandlingConfiguration.registerTrackingProcessor(name);
	}
}


These snippets assume Spring Boot (1.5.x) running on Java 8.

This code can be further improved by adding progress tracking for the running processors. These metrics can be exposed, through a REST endpoint, to indicate the time remaining for processors to “catch up” with the live event stream.

Let me know if this code is useful to you!

Michiel Rook

Michiel Rook is an experienced, passionate & pragmatic freelance coach, developer & speaker from the Netherlands. He loves helping teams and companies to develop better software and significantly improve their delivery process. When he’s not thinking about continuous deployment, DevOps or event sourcing he enjoys music, cars, sports and movies.

6 thoughts on “Using Tracking processors to replay events in Axon Framework 3

  • September 26, 2017 at 2:43 pm
    Permalink

    Hi Michiel,

    First, great article you’ve wrote! Congrats!

    I need some help about using this structure in my project, how can I inject or expose this processors into Rest for example, could you help me with some example code please?

    Thanks

  • September 28, 2017 at 3:17 pm
    Permalink

    Hi,

    This is an annotation for existing projections, those can be injected into your controllers like you normally do (with @Autowired or @Inject or whatever DI system you use).

  • October 11, 2017 at 4:29 pm
    Permalink

    Hi Michael,

    I found your article and was excited, since the title was promissing exactly what I was looking for. After testing out your code, I’m wondering what I got wrong.

    If I undersood your apporach, you are trying to use the switching of tracking processors using the annotation-based processing instead of a usage of imperative invocation during setup or the general EventHandlingConfiguration.usingTrackingProcessors(). This perfectly makes sense, but I think there are some problems with it.

    First of all, you code only works during initialization, since AxonFramework will not re-evaluate the registration after the AxonConfiguration has been processed. I just created what you proposed (REST Controller) and the register tracking processor just has no effect at runtime.

    The second issue is regarding the replaying. Actually, AxonFramework will not replay any events on registration, besides the FIRST registration. So once you register the new tracking processor, the AxonFramework identifies that the processors token is not present and wind-up all the events. After doing so, it will update the token entry of the processor. If you processor is for example filling a cache and should be re-constructed from the event replay on each start, it just won’t happen.

    In generally I’m looking exactly for the way to replay for events for a given processor. In Axon 2 docs (http://www.axonframework.org/docs/2.0/event-processing.html) the possibility of Cluster replay is mentioned. Unfortunatelly, it is gone in Axon3 both from docs and code…

    Do you have any ideas how to initiate a replay of events for a processor?
    (I assume there is a possibility to delete the token from the token_entry table, but this smells like an evil hack).

    Kind regards,

    Simon

  • Pingback: Implementing event replay with AxonFramework - TechJava

  • Pingback: Rebuilding projections in Axon Framework 3 - Michiel Rook's blog

Leave a Reply

Your email address will not be published. Required fields are marked *