Here at , we aggregate and stitch together a lot of data from a lot of different sources on a regular basis – helping companies stay out of head of risks to their growth. Some of this aggregation takes a long time and we needed a way to distribute the processing of all this data across our fleet. Peoplelogic When we first started looking into how to solve this problem, we had a hard time finding the right combination of libraries, examples, and versions to combine Spring Boot, Quartz, and Hazelcast into a distributed task engine. Each one works great on their own, but what if we wanted to combine them? This article shows you exactly that (along with combining Spring Beans and Hazelcast as a bonus!) and assumes that you’re moderately familiar with the structure and setup of a Spring Boot project (or at least the Spring Framework!). We will use IntelliJ IDEA for the project, but any integrated development environment (IDE) that supports Gradle projects should work just fine. Let’s get started! Setup Your Project – Initializing Spring Boot and Gradle Start with the Spring Initializr, either from inside IntelliJ IDEA or , or by cloning our demo starter project at . You’ll wind up with a project structure that looks a bit like the following: via the web https://github.com/peoplelogic/article-starter With the project foundation in place, let’s get the project ready for . Open up the file from the project you just cloned. Look for the section and add the following: Hazelcast build.gradle dependencies implementation 'com.hazelcast:hazelcast-all:3.12.9' We will also need the ability to run Quartz jobs, so be sure to add the following to the build file: compile 'org.springframework.boot:spring-boot-starter-quartz' Finally, we want to make sure that the Quartz jobs don’t run at the same time across multiple nodes within the Hazelcast cluster, so we’ll need a library that will keep those in sync. Download from our GitHub repository and drop it into <project>/libs and then add the following to your build file: https://github.com/peoplelogic/quartz-scheduler-hazelcast-jobstore/releases/download/quartz-hazelcast-jobstore-2.0.0/quartz-hazelcast-jobstore-2.0.0.jar compile files( ) 'libs/quartz-hazelcast-jobstore-2.0.0.jar' Your dependencies section should now look like this: dependencies { implementation testImplementation( ) { exclude group: , : } compile implementation compile compile files( ) } 'org.springframework.boot:spring-boot-starter' 'org.springframework.boot:spring-boot-starter-test' 'org.junit.vintage' module 'junit-vintage-engine' 'org.springframework.boot:spring-boot-starter-quartz' 'com.hazelcast:hazelcast-all:3.12.1' 'org.apache.commons:commons-lang3:3.11' 'libs/quartz-hazelcast-jobstore-2.0.0.jar' With your dependencies in place, we’re ready to start writing some code. Configure Hazelcast Now that Spring Boot is in place, configuring Hazelcast is easy. We simply need to create a (from spring-context) and specify the Spring beans that we’ll expose when running locally or in production. First, create a new package called under and create a new class called Paste the following into that file: Configuration class config ai.peoplelogic.demo HazelcastConfiguration.java. package ai.peoplelogic.demo.demo.config; com.hazelcast.config.Config; com.hazelcast.config.JoinConfig; com.hazelcast.spring.context.SpringManagedContext; org.springframework.context.annotation.Bean; org.springframework.context.annotation.Configuration; org.springframework.context.annotation.Profile; java.util.Collections.singletonList; @Configuration public { @Bean public SpringManagedContext managedContext() { SpringManagedContext(); } @Bean @Profile( ) public Config localConfig() { Config config = Config(); config.setInstanceName( ); config.setManagedContext(managedContext()); config.getNetworkConfig().setPort( ); config.getGroupConfig().setName( ); JoinConfig joinConfig = config.getNetworkConfig().getJoin(); joinConfig.getMulticastConfig().setEnabled( ); joinConfig.getTcpIpConfig().setEnabled( ).setMembers(singletonList( )); config; } @Bean @Profile( ) public Config awsConfig() { Config config = Config(); config.setInstanceName( ); config.setManagedContext(managedContext()); config.getGroupConfig().setName( ); JoinConfig joinConfig = config.getNetworkConfig().getJoin(); joinConfig.getMulticastConfig().setEnabled( ); joinConfig.getAwsConfig().setEnabled( ); config; } } import import import import import import import static class HazelcastConfiguration return new "local" new "scaleable-task-demo" 3000 "scaleable-task-demo" false true "127.0.0.1" return "prod" new "scaleable-task-demo" "scaleable-task-demo" false true /** * Shared instance IAM roles are preferred for locating other nodes in the cluster */ //joinConfig.getAwsConfig().setEnabled(true).setProperty("access-key", hazelcastAccessKey).setProperty("secret-key", hazelcastAccessSecret); return With the code above, we’re defining that this class provides configuration information for the autoconfiguration system in Spring Boot. We’re also defining a Spring Bean for a that tells Hazelcast that it’s tasks should be aware of Spring Beans for dependency injection. Hazelcast ManagedContext Finally, you’re setting up two beans – one for running with a local profile and one for running in production (just change the profile name for your production instance). In this case, we’re running on AWS, so we’re using the for production. Hazelcast Config AWS style configuration Note: We also set a name for the group in Hazelcast. This prevents other Hazelcast jobs in the local network from joining, even if they’re on the same ports. There are other configuration options available, but those are outside the scope of this demo. Before we run, add the following line to your file: src/main/resources/application.properties spring.profiles.active=local With this configuration in place, let’s start up our application. You should see Hazelcast attempt to run and locate other members of the cluster name you specified: : : INFO --- [ main] a.peoplelogic.demo.demo.DemoApplication : The following profiles are active: local : : INFO --- [ main] com.hazelcast.instance.AddressPicker : [LOCAL] [scaleable-task-demo] [ ] Interfaces is disabled, trying to pick one address TCP-IP config addresses: [ ] : : INFO --- [ main] com.hazelcast.instance.AddressPicker : [LOCAL] [scaleable-task-demo] [ ] Picked [ ]: , using socket ServerSocket[addr= 2020 -09 -21 15 52 14.202 87304 2020 -09 -21 15 52 14.802 87304 3.12 .9 from 127.0 .0 .1 2020 -09 -21 15 52 14.822 87304 3.12 .9 127.0 .0 .1 3000 /0:0:0:0:0:0:0:0,localport=3000], bind any local is true 2020-09-21 15:52:14.830 INFO 87304 --- [ main] com.hazelcast.system : [127.0.0.1]:3000 [scaleable-task-demo] [3.12.9] Hazelcast 3.12.9 (20200819 - 3638e8b) starting at [127.0.0.1]:3000 It looks like things are ready to go! You can start up a second instance and you should see similar to the following in your logs: Members { : , : } [ Member [ ]: - f0-bc4d ece de-c1143aa45578 Member [ ]: - c40f3fb-cbeb c-bc1f fb33a3c4 ] size 2 ver 2 127.0 .0 .1 3000 236936 -4 -90 127.0 .0 .1 3002 7 -448 -3592 this Keeping the Trains Running on Time –Scheduling with Quartz Before we can get to coding our highly scalable task in Hazelcast, we need to finish setting up . Quartz is an enterprise Java Job Scheduler with good support by Spring and more features than the built-in Spring scheduler. In addition to simple time delay triggers and a crontab like trigger, it also allows for custom job stores that work in a clustered environment (Hazelcast, JDBC, etc). Quartz To set things up, we’ll need another configuration class called under the package QuartzConfiguration.java ai.peoplelogic.demo.config with the following code: package ai.peoplelogic.demo.demo.config; ai.peoplelogic.demo.demo.jobs.DemoJob; com.hazelcast.core.HazelcastInstance; com.idvp.data.infrastructure.scheduling.quarz.store.hazelcast.HazelcastJobStoreDelegate; org.apache.commons.lang3.ArrayUtils; org.quartz.CronTrigger; org.quartz.JobDetail; org.quartz.Trigger; org.quartz.impl.StdSchedulerFactory; org.springframework.beans.factory.annotation.Autowired; org.springframework.beans.factory.annotation.Qualifier; org.springframework.beans.factory.annotation.Value; org.springframework.context.ApplicationContext; org.springframework.context.annotation.Bean; org.springframework.context.annotation.Configuration; org.springframework.scheduling.quartz.*; java.util.Calendar; java.util.Properties; @Configuration public { @Value( ) fetchCron; @Autowired HazelcastInstance hazelcastInstance; private ApplicationContext applicationContext; public QuartzConfiguration(ApplicationContext applicationContext) { .applicationContext = applicationContext; } @Bean public SpringBeanJobFactory springBeanJobFactory() { AutowiringSpringBeanJobFactory jobFactory = AutowiringSpringBeanJobFactory(); jobFactory.setApplicationContext(applicationContext); jobFactory; } @Bean public SchedulerFactoryBean scheduler(Trigger... triggers) { HazelcastJobStoreDelegate.setInstance(hazelcastInstance); SchedulerFactoryBean schedulerFactory = SchedulerFactoryBean(); Properties properties = Properties(); properties.setProperty( , ); properties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, HazelcastJobStoreDelegate.class.getName()); schedulerFactory.setOverwriteExistingJobs( ); schedulerFactory.setAutoStartup( ); schedulerFactory.setQuartzProperties(properties); schedulerFactory.setJobFactory(springBeanJobFactory()); schedulerFactory.setWaitForJobsToCompleteOnShutdown( ); (ArrayUtils.isNotEmpty(triggers)) { schedulerFactory.setTriggers(triggers); } schedulerFactory; } CronTriggerFactoryBean createCronTrigger(JobDetail jobDetail, cronExpression, triggerName) { Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.SECOND, ); calendar.set(Calendar.MILLISECOND, ); CronTriggerFactoryBean factoryBean = CronTriggerFactoryBean(); factoryBean.setJobDetail(jobDetail); factoryBean.setCronExpression(cronExpression); factoryBean.setStartTime(calendar.getTime()); factoryBean.setStartDelay( L); factoryBean.setName(triggerName); factoryBean.setMisfireInstruction(CronTrigger.MISFIRE_INSTRUCTION_DO_NOTHING); factoryBean; } JobDetailFactoryBean createJobDetail(Class jobClass, jobName) { JobDetailFactoryBean factoryBean = JobDetailFactoryBean(); factoryBean.setName(jobName); factoryBean.setJobClass(jobClass); factoryBean.setDurability( ); factoryBean; } @Bean(name = ) public JobDetailFactoryBean jobMemberClassStats() { createJobDetail(DemoQuartzJob.class, ); } @Bean(name = ) public CronTriggerFactoryBean triggerMemberClassStats(@Qualifier( ) JobDetail jobDetail) { createCronTrigger(jobDetail, fetchCron, ); } } import import import import import import import import import import import import import import import import import class QuartzConfiguration "${demo.job.cron}" String this new return new new "org.quartz.scheduler.instanceId" "AUTO" true true true if return static String String // To fix an issue with time-based cron jobs 0 0 new 0 return static String new true return "demoJob" return "Demo Quartz Job" "demoTrigger" "demoJob" return "Demo Quartz Trigger" The code above programmatically creates a factory for creating new schedulers through the triggers that are defined as beans and sets the clustered Job store and also tells it that we’re going to start the scheduler immediately. It also defines a new Cron trigger using a property from or any other configuration provided location and defines the Job classes we’re going to execute (provided as Spring Beans). application.properties We’re also going to need to create a Job class. Create a new package and put a class in it called with the following code to start: ai.peoplelogic.demo.jobs DemoQuartzJob.java package ai.peoplelogic.demo.demo.jobs; org.quartz.Job; org.quartz.JobExecutionContext; org.slf4j.Logger; org.slf4j.LoggerFactory; org.springframework.stereotype.Component; java.io.Serializable; @Component public { private final Logger log = LoggerFactory.getLogger( .getClass()); public execute(JobExecutionContext context) { log.info( ); } } import import import import import import , class DemoQuartzJob implements Job Serializable this void "Logging job execution." Note: Your Jobs will need to implement the Quartz Job interface (which provides the execute method above) and Serializable (so that Hazelcast can serialize your Job across the cluster in its store). Finally, since we want to be able to expose our Jobs as Spring beans and also use Spring beans in our jobs we’re going to need one more helper class in called Put the following code in that class to finish the Quartz setup: ai.peoplelogic.demo.config AutowiringSpringBeanJobFactory.java. package ai.peoplelogic.demo.demo.config; org.quartz.spi.TriggerFiredBundle; org.springframework.beans.factory.config.AutowireCapableBeanFactory; org.springframework.context.ApplicationContext; org.springframework.context.ApplicationContextAware; org.springframework.scheduling.quartz.SpringBeanJobFactory; public final { private transient AutowireCapableBeanFactory beanFactory; @Override public setApplicationContext(final ApplicationContext context) { beanFactory = context.getAutowireCapableBeanFactory(); } @Override protected createJobInstance(final TriggerFiredBundle bundle) throws Exception { final job = .createJobInstance(bundle); beanFactory.autowireBean(job); job; } } import import import import import class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware void Object Object super return To finish the setup of Quartz, we just need to add one more property to our : src/main/resources/application.properties demo.job.cron= / * * * ? 0 0 1 This line tells our Quartz cron job to run at the top of every minute. You could do 0/3 if you wanted to run every 3 minutes or 0/10 if you wanted to run every 10. When you start your application again, you should see the following in the console: : : INFO --- [ main] org.quartz.core.SchedulerSignalerImpl : Initialized Scheduler Signaller type: 2020 -09 -22 10 59 51.752 92337 of . . . 2020-09-22 10:59:51.752 92337 --- [ ] . . . : .2.3.2 . 2020-09-22 10:59:51.779 92337 --- [ ] . . . . . . . . : . 2020-09-22 10:59:51.780 92337 --- [ ] . . . : - : ( .3.2) ' ' ' - - . ' : ' . . . ' - . . . : 0 ' . . . ' - 10 . - ' . . . . . . . . ' - . . 2020-09-22 10:59:51.780 92337 --- [ ] . . . : ' ' . 2020-09-22 10:59:51.780 92337 --- [ ] . . . : : 2.3.2 2020-09-22 10:59:51.780 92337 --- [ ] . . . : : . . . . . @72 2020-09-22 10:59:51.784 92337 --- [ ] . . . . . : [127.0.0.1]:3000 [ - - ] [3.12.9] ... 2020-09-22 10:59:51.957 92337 --- [ ] . . . . : 2020-09-22 10:59:51.990 92337 --- [ ] . . . : $ - - . . class org quartz core SchedulerSignalerImpl INFO main org quartz core QuartzScheduler Quartz Scheduler v created INFO main c i d i s q s h HazelcastJobStore HazelcastJobStore initialized INFO main org quartz core QuartzScheduler Scheduler meta data Quartz Scheduler v2 scheduler with instanceId Matthews MacBook Pro local1600786791746 Scheduler class org quartz core QuartzScheduler running locally NOT STARTED Currently in standby mode Number of jobs executed Using thread pool org quartz simpl SimpleThreadPool with threads Using job store com idvp data infrastructure scheduling quarz store hazelcast HazelcastJobStoreDelegate which supports persistence and is clustered INFO main org quartz impl StdSchedulerFactory Quartz scheduler scheduler initialized from an externally provided properties instance INFO main org quartz impl StdSchedulerFactory Quartz scheduler version INFO main org quartz core QuartzScheduler JobFactory set to ai peoplelogic demo demo config AutowiringSpringBeanJobFactory b40f87 INFO main c h i p impl PartitionStateManager scaleable task demo Initializing cluster partition table arrangement INFO main o s s quartz SchedulerFactoryBean Starting Quartz Scheduler now INFO main org quartz core QuartzScheduler Scheduler scheduler_ _Matthews MacBook Pro local1600786791746 started Notice how it’s showing the custom Job store and tells us that we support persistence and that it’s clustered. As it runs, you should see output from the Job we created above: : : INFO --- [eduler_Worker ] a.p.demo.demo.jobs.DemoQuartzJob : Logging job execution. 2020 -09 -22 11 02 01.697 92434 -1 If you start up a second instance of the demo application, you’ll see the output in both consoles, but at different timestamps – the clustered job store is doing its job! Setup Your Task Now that we’ve got Hazelcast, Quartz, and Spring boot configured and running, we need to create a task that we want to split the execution of across the Hazelcast cluster. To get started, let’s open up the file we created in the previous section. We’re going to modify the job we created earlier to ensure that we don’t execute the same job twice (ie. the job ran for too long) and also to add the code to grab an from Hazelcast and tell it to execute our new Task. Modify so that it looks like the following: DemoQuartzJob.java IExecutorService DemoQuartzJob.java package ai.peoplelogic.demo.demo.jobs; com.hazelcast.core.ExecutionCallback; com.hazelcast.core.HazelcastInstance; com.hazelcast.core.IExecutorService; org.quartz.Job; org.quartz.JobExecutionContext; org.quartz.Scheduler; org.quartz.SchedulerException; org.slf4j.Logger; org.slf4j.LoggerFactory; org.springframework.beans.factory.annotation.Autowired; org.springframework.stereotype.Component; java.io.Serializable; java.util.List; java.util.Map; @Component public { private final Logger log = LoggerFactory.getLogger( .getClass()); @Autowired HazelcastInstance hazelcastInstance; public execute(JobExecutionContext context) { log.info( ); { Scheduler scheduler = context.getScheduler(); List<JobExecutionContext> jobs = scheduler.getCurrentlyExecutingJobs(); (JobExecutionContext job : jobs) { (job.getTrigger().equals(context.getTrigger()) && job.getJobDetail() != context.getJobDetail()) { log.warn( ); ; } } executeJobOnCluster(); } (SchedulerException e) { log.error( , e); } } private executeJobOnCluster() { IExecutorService executorService = hazelcastInstance.getExecutorService( ); (int i = ; i < ; i++) { DemoTask task = DemoTask(); executorService.submit(task, ExecutionCallback< < , >>() { public onResponse( < , > incomingResponse) { (incomingResponse.size() > && incomingResponse.containsKey( )) { System.out.println( + incomingResponse.get( )); } } public onFailure(Throwable t) { log.error( , t); } }); } } } import import import import import import import import import import import import import import , class DemoQuartzJob implements Job Serializable this void "Job execution started." try for if "Ignored fetch job because another is running on this cluster." return catch "Error executing scheduler: '(" void "scaleable-task-demo" // Execute 100 tasks for 0 100 new new Map String Object void Map String Object if 0 "name" "Received response on task: " "name" void "Error getting distributed task result" You can see the execute function now checks that we’re not executing the same job twice and we’ve added a new method, which uses Hazelcast’s callback executer pattern to send a Task (DemoTask in this case) to the cluster members and then listen for a response or failure. In the successful response (onResponse), we simply output the key of the Map that was the response. To simulate sending a variety of tasks, we’re sending these tasks 100 times to the executor. executeJobOnCluster name Note: The Task can return any object you want as long as it and everything that it references is Serializable. Be very careful about what you send back and forth over the Hazelcast cluster as it can have very sneaky serialization errors. Note: Tasks will return in random order and asynchronously – when you handle the response, don’t expect them to be in any given order. With the Quartz Job setup to send your Task to Hazelcast, we need to write one final class – the Callable Task – . Create a new class in called and add the following code: DemoTask.java ai.peoplelogic.demo.jobs DemoTask.java package ai.peoplelogic.demo.demo.jobs; com.hazelcast.core.HazelcastInstance; com.hazelcast.core.HazelcastInstanceAware; com.hazelcast.spring.context.SpringAware; org.slf4j.Logger; org.slf4j.LoggerFactory; org.springframework.beans.BeansException; org.springframework.context.ApplicationContext; org.springframework.context.ApplicationContextAware; java.io.Serializable; java.util.HashMap; java.util.Map; java.util.Random; java.util.concurrent.Callable; java.util.concurrent.TimeUnit; @SpringAware public { transient HazelcastInstance hazelcastInstance; transient ApplicationContext context; private final Logger log = LoggerFactory.getLogger( .getClass()); public DemoTask() { } public setApplicationContext(final ApplicationContext applicationContext) throws BeansException { context = applicationContext; } @Override public setHazelcastInstance(HazelcastInstance hazelcastInstance) { .hazelcastInstance = hazelcastInstance; } @Override public < , > call() throws Exception { TimeUnit.SECONDS.sleep( ); log.info( ); < , > result = HashMap<>(); result.put( , + hazelcastInstance.getCluster().getLocalMember().getUuid()); result.put( , ); result; } } import import import import import import import import import import import import import import , < < , >>, , class DemoTask implements HazelcastInstanceAware Callable Map String Object Serializable ApplicationContextAware this void void this Map String Object // Here you would do some really fancy logic. We're just going to sleep for 10 seconds and store some values 10 "Executing task" Map String Object new "name" "Demo Task " "success" true return In this very simple example, we’ve implemented a Callable interface that Hazelcast expects for its Executor and told Hazelcast that it needs to wire in the Hazelcast instance that we’re using and also that this Task is aware of the Spring ApplicationContext. With that configuration in place, then you can use @Autowire to inject Spring Beans into this task to do more complicated logic. Note: Be sure to mark the autowired beans as like we have with the HazelcastITnstance and the ApplicationContext. This prevents them from being reused between executions. transient Note: Be careful if you call database transactions from your task - particularly lazy loaded Hibernate or JPA objects – you may wind up with exceptions! Now, with our code wired together, let’s fire it up and see if we execute the jobs across the cluster. You should see the same output across any nodes in the cluster during each Job run: Node 2: : : INFO --- [cached.thread ] ai.peoplelogic.demo.demo.jobs.DemoTask : Executing task : : INFO --- [cached.thread ] ai.peoplelogic.demo.demo.jobs.DemoTask : Executing task 2020 -09 -22 13 25 10.057 96239 -7 2020 -09 -22 13 25 10.058 96239 -8 Node 3: : : INFO --- [cached.thread ] ai.peoplelogic.demo.demo.jobs.DemoTask : Executing task : : INFO --- [cached.thread ] ai.peoplelogic.demo.demo.jobs.DemoTask : Executing task 2020 -09 -22 13 25 10.058 96243 -4 2020 -09 -22 13 25 10.058 96243 -5 Along with output indicating that a response was returned on the node that executed the job: Received response on task: Demo Task ff66bd34-f5b9 f3 cc8 c49842e0fe2 Received response on task: Demo Task da90cff9 b6 d9a fc87548ddaa -41 -8 -4 -0475 -43 -9 -2 Scale It! That’s it! You’re ready to scale the application across an entire fleet of servers. Just start up additional instances of your application and Hazelcast will find the new members and immediately start distributing the tasks to those new members. In this article, you’ve seen how to setup a new Spring Boot application, add Hazelcast and Quartz to that application, and then implement the Hazelcast distributed executor pattern to execute logic from your Spring Beans across a cluster. From the example, you can now start to add more complicated logic, inject your custom Spring Beans into your tasks, or execute more compute intensive tasks. We’ve uploaded all the sample code to a GitHub repository – feel free to clone it, use it, and contribute back changes – here . https://github.com/peoplelogic/scalable-task-execution Happy coding! Previously published at https://peoplelogic.ai/blog/scalable-task-execution-with-hazelcast-and-spring-boot