Spring Scheduled Task running in a clustered environment

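I am writing an application that has a cron job that executes every 60 seconds. The application is configured to scale onto multiple instances when required. I only want the task to execute on one instance every 60 seconds (on any node). I can't find a solution to this, and I am surprised it has not been asked many times before. I am using Spring 4.1.6.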

<task:scheduled-tasks>
    <task:scheduled ref="beanName" method="execute" cron="0/60 * * * * *"/>
</task:scheduled-tasks>

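Batch and scheduled jobs typically run on their own standalone servers, away from customer-facing applications, so including a job in an application that is expected to run on a cluster is not a common requirement. Additionally, jobs in clustered environments usually do not need to worry about other instances of the same job running in parallel, which is another reason why isolating job instances is not a big requirement.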

A simple solution is to configure your jobs inside a Spring profile. For example, if your current configuration is:

<beans>
    <bean id="someBean" .../>

    <task:scheduled-tasks>
        <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
    </task:scheduled-tasks>
</beans>

change it to:

<beans>
    <beans profile="scheduled">
        <bean id="someBean" .../>

        <task:scheduled-tasks>
            <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
        </task:scheduled-tasks>
    </beans>
</beans>

Then launch your application on only one machine with the scheduled profile activated (-Dspring.profiles.active=scheduled).

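If the primary server becomes unavailable for some reason, just launch another server with the profile enabled and things will continue to work just fine.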
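The profile trick boils down to "every instance ships the same code, only one instance switches the schedule on". As a plain-Java illustration of that idea (not how Spring evaluates profiles internally), the hypothetical ProfileGate class below only starts a 60-second ScheduledExecutorService when the spring.profiles.active system property contains scheduled:

```java
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Plain-Java illustration of the profile idea (hypothetical class, not Spring code):
// every instance runs this, but only the one started with
// -Dspring.profiles.active=scheduled actually schedules the job.
class ProfileGate {

    // True if the comma-separated list of active profiles contains the wanted one
    static boolean isActive(String wanted, String activeProfiles) {
        if (activeProfiles == null) {
            return false;
        }
        return Arrays.stream(activeProfiles.split(","))
                .map(String::trim)
                .anyMatch(wanted::equals);
    }

    // Starts a 60-second fixed-rate schedule only when the "scheduled" profile is active;
    // returns null (nothing scheduled) on every other instance.
    static ScheduledExecutorService startIfScheduled(Runnable job) {
        if (!isActive("scheduled", System.getProperty("spring.profiles.active"))) {
            return null;
        }
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(job, 0, 60, TimeUnit.SECONDS);
        return executor;
    }
}
```

On the machine started with -Dspring.profiles.active=scheduled, startIfScheduled(...) returns a running executor; on every other instance it returns null and nothing is scheduled.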


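Things change if you also want automatic failover for the jobs. Then you need to keep the job running on all servers and check synchronization through a common resource, such as a database table, a clustered cache, a JMX variable, etc.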
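One common way to do that synchronization is to let every node fire the trigger and have the shared resource decide who actually runs it, for example by claiming the current time slot. The sketch below assumes a shared table with a unique key on (job name, minute slot); a ConcurrentHashMap stands in for that table, and SlotClaimStore and tryClaim are hypothetical names:

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch: every node runs the trigger, a shared store decides who executes.
// The map stands in for a shared table with a unique key on (job name, slot); in a
// real cluster this would be an INSERT that fails with a duplicate-key error on
// every node except the first one.
class SlotClaimStore {

    private final Map<String, String> claims = new ConcurrentHashMap<>();

    // Returns true only for the first node that claims this job for this minute
    boolean tryClaim(String jobName, Instant fireTime, String nodeId) {
        Instant slot = fireTime.truncatedTo(ChronoUnit.MINUTES);
        String key = jobName + "@" + slot;
        return claims.putIfAbsent(key, nodeId) == null;
    }
}
```

Each node's scheduled method would call tryClaim("myJob", Instant.now(), nodeId) and only proceed on true. This relies on the node clocks being reasonably synchronized (e.g. via NTP): a node whose clock is a few seconds off near a minute boundary can land in a different slot.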

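Here is another simple and robust way to execute a job safely in a cluster: using the database, a node executes the task only if it is the "leader" of the cluster.

Also, when a node fails or shuts down, another node becomes the leader.

All you have to do is set up a "leader election" mechanism and check each time whether you are the leader: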

@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    if (checkIfLeader()) {
        final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
        for (EmailTask emailTask : list) {
            dispatchService.sendEmail(emailTask);
        }
    }
}

Follow these steps:

1. Define the object and table that hold one entry per node in the cluster:

import java.util.Date;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

@Entity(name = "SYS_NODE")
public class SystemNode {

    /** The id. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    /** The registration time of the node, as epoch milliseconds stored in a string. */
    @Column(name = "TIMESTAMP")
    private String timestamp;

    /** The IP address of the node. */
    @Column(name = "IP")
    private String ip;

    /** The last time the node pinged the database. */
    @Column(name = "LAST_PING")
    private Date lastPing;

    /** The creation date. */
    @Column(name = "CREATED_AT")
    private Date createdAt = new Date();

    /** Whether this node is currently the leader. */
    @Column(name = "IS_LEADER")
    private Boolean isLeader = Boolean.FALSE;

    public Long getId() {
        return id;
    }

    public void setId(final Long id) {
        this.id = id;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(final String timestamp) {
        this.timestamp = timestamp;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(final String ip) {
        this.ip = ip;
    }

    public Date getLastPing() {
        return lastPing;
    }

    public void setLastPing(final Date lastPing) {
        this.lastPing = lastPing;
    }

    public Date getCreatedAt() {
        return createdAt;
    }

    public void setCreatedAt(final Date createdAt) {
        this.createdAt = createdAt;
    }

    public Boolean getIsLeader() {
        return isLeader;
    }

    public void setIsLeader(final Boolean isLeader) {
        this.isLeader = isLeader;
    }

    @Override
    public String toString() {
        return "SystemNode{" +
                "id=" + id +
                ", timestamp='" + timestamp + '\'' +
                ", ip='" + ip + '\'' +
                ", lastPing=" + lastPing +
                ", createdAt=" + createdAt +
                ", isLeader=" + isLeader +
                '}';
    }
}

2. Create the service that a) inserts the node into the database and b) checks for the leader:

@Service
@Transactional
public class SystemNodeServiceImpl implements SystemNodeService, ApplicationListener<ApplicationEvent> {

    /** The logger. */
    private static final Logger LOGGER = Logger.getLogger(SystemNodeServiceImpl.class);

    /** The constant NO_ALIVE_NODES. */
    private static final String NO_ALIVE_NODES = "No alive nodes found in list {0}";

    /** The IP address of this node, resolved at startup. */
    private String ip;

    /** The system service. */
    private SystemService systemService;

    /** The system node repository. */
    private SystemNodeRepository systemNodeRepository;

    @Autowired
    public void setSystemService(final SystemService systemService) {
        this.systemService = systemService;
    }

    @Autowired
    public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) {
        this.systemNodeRepository = systemNodeRepository;
    }

    @Override
    public void pingNode() {
        final SystemNode node = systemNodeRepository.findByIp(ip);
        if (node == null) {
            createNode();
        } else {
            updateNode(node);
        }
    }

    @Override
    public void checkLeaderShip() {
        final List<SystemNode> allList = systemNodeRepository.findAll();
        final List<SystemNode> aliveList = filterAliveNodes(allList);

        SystemNode leader = findLeader(allList);
        if (leader == null || !aliveList.contains(leader)) {
            // No leader yet, or the leader stopped pinging: promote the oldest alive node
            leader = findMinNode(aliveList);
        }
        setLeaderFlag(allList, Boolean.FALSE);
        leader.setIsLeader(Boolean.TRUE);
        // save(Iterable) was renamed to saveAll(Iterable) in Spring Data 2.x
        systemNodeRepository.save(allList);
    }

    /**
     * Returns the current leader, or null if no node is flagged as leader.
     *
     * @param list the list
     * @return the leader
     */
    private SystemNode findLeader(final List<SystemNode> list) {
        for (SystemNode systemNode : list) {
            // Null-safe check: the column may be NULL for rows created elsewhere
            if (Boolean.TRUE.equals(systemNode.getIsLeader())) {
                return systemNode;
            }
        }
        return null;
    }

    @Override
    public boolean isLeader() {
        final SystemNode node = systemNodeRepository.findByIp(ip);
        return node != null && Boolean.TRUE.equals(node.getIsLeader());
    }

    @Override
    public void onApplicationEvent(final ApplicationEvent applicationEvent) {
        if (applicationEvent instanceof ContextRefreshedEvent) {
            try {
                ip = InetAddress.getLocalHost().getHostAddress();
            } catch (UnknownHostException e) {
                throw new IllegalStateException("Cannot resolve the local IP address", e);
            }
            // Register (or refresh) this node as soon as the context is ready
            pingNode();
        }
    }

    /** Registers this node; the very first node in the table starts as leader. */
    private void createNode() {
        final SystemNode node = new SystemNode();
        node.setIp(ip);
        node.setTimestamp(String.valueOf(System.currentTimeMillis()));
        node.setCreatedAt(new Date());
        node.setLastPing(new Date());
        node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll()));
        systemNodeRepository.save(node);
    }

    /** Refreshes the last-ping date of an existing node. */
    private void updateNode(final SystemNode node) {
        node.setLastPing(new Date());
        systemNodeRepository.save(node);
    }

    /**
     * Returns the nodes that pinged within the configured timeout.
     *
     * @param list the list
     * @return the alive nodes
     */
    private List<SystemNode> filterAliveNodes(final List<SystemNode> list) {
        int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class);
        final List<SystemNode> finalList = new LinkedList<>();
        for (SystemNode systemNode : list) {
            if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) {
                finalList.add(systemNode);
            }
        }
        if (CollectionUtils.isEmpty(finalList)) {
            LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list));
            throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list));
        }
        return finalList;
    }

    /**
     * Finds the oldest node, i.e. the one with the smallest registration timestamp.
     *
     * @param list the list
     * @return the min node
     */
    private SystemNode findMinNode(final List<SystemNode> list) {
        SystemNode min = list.get(0);
        for (SystemNode systemNode : list) {
            // compareTo returns any negative number when smaller, so test "< 0" (not "< -1").
            // Comparing the strings works because current epoch-millisecond values all have 13 digits.
            if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < 0) {
                min = systemNode;
            }
        }
        return min;
    }

    /**
     * Sets the leader flag on every node in the list.
     *
     * @param list  the list
     * @param value the value
     */
    private void setLeaderFlag(final List<SystemNode> list, final Boolean value) {
        for (SystemNode systemNode : list) {
            systemNode.setIsLeader(value);
        }
    }
}
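The core of checkLeaderShip() can be tried out without a database. The sketch below is a simplified model, not the answer's code: NodeInfo is a hypothetical stand-in for SystemNode, a node counts as alive while lastPing + aliveTimeout is still in the future, the current leader keeps its flag while it is alive, and otherwise the alive node that registered first is promoted. Unlike filterAliveNodes(...), it returns an empty Optional instead of throwing when no node is alive.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Simplified, DB-free model of the leader resolution step.
// NodeInfo is a hypothetical stand-in for the SystemNode entity.
class LeaderResolver {

    static final class NodeInfo {
        final String ip;
        final Instant createdAt;
        final Instant lastPing;
        boolean leader;

        NodeInfo(String ip, Instant createdAt, Instant lastPing, boolean leader) {
            this.ip = ip;
            this.createdAt = createdAt;
            this.lastPing = lastPing;
            this.leader = leader;
        }
    }

    // Flags exactly one alive node as leader and returns it (empty if no node is alive)
    static Optional<NodeInfo> resolve(List<NodeInfo> nodes, Instant now, Duration aliveTimeout) {
        List<NodeInfo> alive = nodes.stream()
                .filter(n -> n.lastPing.plus(aliveTimeout).isAfter(now))
                .collect(Collectors.toList());

        // Keep the current leader if it is still alive
        Optional<NodeInfo> leader = alive.stream().filter(n -> n.leader).findFirst();
        if (!leader.isPresent()) {
            // Otherwise the oldest alive node wins; Instants compare chronologically
            leader = alive.stream().min(Comparator.comparing((NodeInfo n) -> n.createdAt));
        }
        for (NodeInfo node : nodes) {
            node.leader = false;
        }
        leader.ifPresent(n -> n.leader = true);
        return leader;
    }
}
```

Whatever alive timeout you configure has to be comfortably longer than the ping interval (5 minutes in step 3), otherwise healthy nodes would look dead between two pings.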

3. Ping the database to signal that the node is still alive, and periodically resolve the leader:

// The alive timeout used by filterAliveNodes() must be longer than this 5-minute ping interval
@Override
@Scheduled(cron = "0 0/5 * * * ?")
public void executeSystemNodePing() {
    systemNodeService.pingNode();
}

@Override
@Scheduled(cron = "0 0/10 * * * ?")
public void executeLeaderResolution() {
    systemNodeService.checkLeaderShip();
}

4. You're ready! Just check whether you are the leader before executing the task:

@Override
@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    // Only the node currently flagged as leader processes the failed e-mails
    if (systemNodeService.isLeader()) {
        final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
        for (EmailTask emailTask : list) {
            dispatchService.sendEmail(emailTask);
        }
    }
}

I think you have to use Quartz clustering with the JDBC-JobStore for this purpose.
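For reference, clustering is switched on through Quartz properties. The sketch below only builds a java.util.Properties object with the standard JDBC-JobStore clustering keys; the scheduler name, thread count and the data source name myDS are placeholder values, and the QRTZ_ tables from the Quartz distribution are assumed to exist. With plain Quartz these properties would be passed to new StdSchedulerFactory(props), and myDS would additionally need its org.quartz.dataSource.myDS.* connection settings.

```java
import java.util.Properties;

// Minimal sketch of the Quartz settings that enable JDBC-JobStore clustering.
// All nodes use the same properties and the same database; Quartz then makes sure
// each trigger fires on only one node. "myDS" and the thread count are placeholders.
class QuartzClusterConfig {

    static Properties clusteredProperties() {
        Properties p = new Properties();
        p.setProperty("org.quartz.scheduler.instanceName", "ClusteredScheduler"); // same on every node
        p.setProperty("org.quartz.scheduler.instanceId", "AUTO");                 // unique id generated per node
        p.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        p.setProperty("org.quartz.threadPool.threadCount", "5");
        p.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        p.setProperty("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
        p.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ_");
        p.setProperty("org.quartz.jobStore.dataSource", "myDS");
        p.setProperty("org.quartz.jobStore.isClustered", "true");
        p.setProperty("org.quartz.jobStore.clusterCheckinInterval", "20000");     // milliseconds
        return p;
    }
}
```

If you use Spring's SchedulerFactoryBean with setDataSource(...) instead, Spring configures its own LocalDataSourceJobStore for that DataSource, so the jobStore.class and jobStore.dataSource entries would normally be left out there.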

There is the ShedLock project, which serves exactly this purpose. You just annotate the tasks that should be locked when executed:

@Scheduled( ... )
@SchedulerLock(name = "scheduledTaskName")
public void scheduledTask() {
    // do something
}

and configure Spring and a LockProvider:

@Configuration
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "10m")
class MySpringConfiguration {
    ...
    @Bean
    public LockProvider lockProvider(DataSource dataSource) {
        return new JdbcTemplateLockProvider(dataSource);
    }
    ...
}
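To make the lockAtMostFor/lockAtLeastFor semantics concrete, here is a simplified in-memory model of the lease idea ShedLock is built on: a lock row stores a lockUntil time, and a node may only take the lock once that time has passed. This is an illustration, not ShedLock's code; LeaseLock and its methods are hypothetical. With JdbcTemplateLockProvider the real row lives in a table you create in that DataSource (by default named shedlock; the ShedLock README has the DDL).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Simplified in-memory model of a lease lock (hypothetical, not ShedLock's implementation).
// lockAtMostFor bounds how long a crashed node can block the others;
// lockAtLeastFor keeps the lock a little longer after a very short run, so a node
// whose clock is slightly behind does not start the same execution again.
class LeaseLock {

    private static final class Row {
        Instant lockedAt;
        Instant lockUntil;
    }

    private final Map<String, Row> table = new HashMap<>();

    synchronized boolean tryLock(String name, Instant now, Duration lockAtMostFor) {
        Row row = table.computeIfAbsent(name, k -> new Row());
        if (row.lockUntil != null && row.lockUntil.isAfter(now)) {
            return false; // another node holds an unexpired lease
        }
        row.lockedAt = now;
        row.lockUntil = now.plus(lockAtMostFor);
        return true;
    }

    synchronized void unlock(String name, Instant now, Duration lockAtLeastFor) {
        Row row = table.get(name);
        if (row == null || row.lockedAt == null) {
            return;
        }
        // Keep the lease until lockedAt + lockAtLeastFor if the task finished earlier
        Instant minimum = row.lockedAt.plus(lockAtLeastFor);
        row.lockUntil = now.isAfter(minimum) ? now : minimum;
    }
}
```

lockAtMostFor should be longer than the task's normal run time: if the lease expires while the task is still running, another node can start the same task.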

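You can use an embeddable scheduler like db-scheduler to accomplish this. It has persistent executions and uses a simple optimistic-locking mechanism to guarantee execution by a single node.

Example code showing how the use case can be achieved: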

RecurringTask<Void> recurring1 = Tasks.recurring("my-task-name", FixedDelay.of(Duration.ofSeconds(60)))
        .execute((taskInstance, executionContext) -> {
            System.out.println("Executing " + taskInstance.getTaskAndInstance());
        });

final Scheduler scheduler = Scheduler
        .create(dataSource)
        .startTasks(recurring1)
        .build();

scheduler.start();
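The optimistic-locking part can be illustrated without a database. In SQL it is roughly an UPDATE ... SET picked = true, version = version + 1 WHERE ... AND version = ?, which only succeeds for the node that still sees the current version. The sketch below models that with an AtomicReference compare-and-set, where the identity of the State snapshot plays the role of the version check; OptimisticTaskRow is hypothetical and the SQL in the comment is an approximation, not db-scheduler's exact statement:

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified in-memory model of optimistic locking (hypothetical, not db-scheduler's code).
// Roughly equivalent SQL:
//   UPDATE tasks SET picked = true, picked_by = ?, version = version + 1
//   WHERE task_name = ? AND version = ?
// Only the node whose update still matches the version it read gets to run the execution.
class OptimisticTaskRow {

    static final class State {
        final long version;
        final String pickedBy; // null while the execution is free

        State(long version, String pickedBy) {
            this.version = version;
            this.pickedBy = pickedBy;
        }
    }

    private final AtomicReference<State> row = new AtomicReference<>(new State(1, null));

    State read() {
        return row.get();
    }

    // Succeeds only if nobody changed the row since 'seen' was read
    boolean tryPick(State seen, String nodeId) {
        if (seen.pickedBy != null) {
            return false;
        }
        return row.compareAndSet(seen, new State(seen.version + 1, nodeId));
    }
}
```

Both nodes may read the row at the same time, but only one compare-and-set (one matching UPDATE) can win; the loser sees that nothing was updated and simply skips the execution.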

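I'm using a database table to do the locking. Only one task at a time can insert into the table; the others get a DuplicateKeyException. The insert and delete logic is handled by an aspect around the @Scheduled annotation. I'm using Spring Boot 2.0.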

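import java.util.Date;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
@Aspect
public class SchedulerLock {

    private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerLock.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Around("execution(@org.springframework.scheduling.annotation.Scheduled * *(..))")
    public Object lockTask(ProceedingJoinPoint joinPoint) throws Throwable {
        String jobSignature = joinPoint.getSignature().toString();
        try {
            // The primary key on "signature" lets only one node insert the row
            jdbcTemplate.update("INSERT INTO scheduler_lock (signature, date) VALUES (?, ?)", jobSignature, new Date());
        } catch (DuplicateKeyException e) {
            LOGGER.warn("Job is currently locked: " + jobSignature);
            return null;
        }
        try {
            return joinPoint.proceed();
        } finally {
            // Release the lock even if the job throws. If the node dies mid-run,
            // the row stays and has to be removed manually.
            jdbcTemplate.update("DELETE FROM scheduler_lock WHERE signature = ?", jobSignature);
        }
    }
}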


@Component
public class EveryTenSecondJob {

    @Scheduled(cron = "0/10 * * * * *")
    public void taskExecution() {
        System.out.println("Hello World");
    }
}


CREATE TABLE scheduler_lock (
    signature VARCHAR(255) NOT NULL,
    date DATETIME DEFAULT NULL,
    PRIMARY KEY (signature)
);

Dlock is designed to run a task only once by using database indexes and constraints. You can simply do something like this:

@Scheduled(cron = "30 30 3 * * *")
@TryLock(name = "executeMyTask", owner = SERVER_NAME, lockFor = THREE_MINUTES)
public void execute() {
    // task body
}

Check out the article about using it.

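The Spring context is not clustered, so managing tasks in a distributed application is a bit difficult. You need a system that supports JGroups to synchronize state and let one of your tasks take priority to execute the action. Alternatively, you can use an EJB context to manage a clustered HA singleton service, as in a JBoss HA environment: https://developers.redhat.com/quickstarts/eap/cluster-ha-singleton/?referrer=jbd. Or you can use a clustered cache and a shared lock resource between your services, so that the first service to take the lock performs the action, or implement your own JGroups communication between your services so that the action is executed on a single node.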

You can use ZooKeeper here to elect a master instance, and only the master instance will run the scheduled job.

One implementation uses an aspect and Apache Curator:

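import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {

    private static final int PORT = 2181;

    @Bean
    public CuratorFramework curatorFramework() {
        // Connect to a local ZooKeeper, retrying with exponential backoff (1 s base, 3 retries)
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:" + PORT, new ExponentialBackoffRetry(1000, 3));
        client.start();
        return client;
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}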

Aspect class

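import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class LeaderAspect implements LeaderLatchListener {

    private static final Logger log = LoggerFactory.getLogger(LeaderAspect.class);
    private static final String ELECTION_ROOT = "/election";

    // Written by Curator's callback thread, read by the scheduler thread
    private volatile boolean isLeader = false;

    @Autowired
    public LeaderAspect(CuratorFramework client) throws Exception {
        // Join the election under /election; Curator calls isLeader()/notLeader() on changes
        LeaderLatch ll = new LeaderLatch(client, ELECTION_ROOT);
        ll.addListener(this);
        ll.start();
    }

    @Override
    public void isLeader() {
        isLeader = true;
        log.info("Leadership granted.");
    }

    @Override
    public void notLeader() {
        isLeader = false;
        log.info("Leadership revoked.");
    }

    @Around("@annotation(com.example.apache.curator.annotation.LeaderOnly)")
    public void onlyExecuteForLeader(ProceedingJoinPoint joinPoint) {
        if (!isLeader) {
            log.debug("I'm not leader, skip leader-only tasks.");
            return;
        }

        try {
            log.debug("I'm leader, execute leader-only tasks.");
            joinPoint.proceed();
        } catch (Throwable ex) {
            // Log the stack trace as well, not only the message
            log.error("Leader-only task failed", ex);
        }
    }
}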

LeaderOnly annotation

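import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LeaderOnly {
}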

Scheduled task

@Component
public class HelloWorld {

    private static final Logger log = LoggerFactory.getLogger(HelloWorld.class);

    @LeaderOnly
    @Scheduled(fixedRate = 1000L)
    public void sayHello() {
        log.info("Hello, world!");
    }
}
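LeaderLatch is based on ZooKeeper's leader-election recipe: every participant creates an ephemeral sequential node under the latch path (/election here), and the participant owning the lowest sequence number is the leader. The in-memory model below (hypothetical LatchModel class, not Curator's code) shows why leadership moves to the next node when the current leader goes away:

```java
import java.util.TreeMap;

// Simplified in-memory model of the leader-latch recipe (hypothetical, not Curator's code):
// each participant gets an increasing sequence number, like an EPHEMERAL_SEQUENTIAL
// znode under the election path, and the lowest sequence number is the leader.
class LatchModel {

    private final TreeMap<Long, String> znodes = new TreeMap<>();
    private long nextSequence = 0;

    // Like creating /election/latch-<seq> in EPHEMERAL_SEQUENTIAL mode
    synchronized long join(String participantId) {
        long seq = nextSequence++;
        znodes.put(seq, participantId);
        return seq;
    }

    // Session loss or close(): the ephemeral node disappears
    synchronized void leave(long seq) {
        znodes.remove(seq);
    }

    // The participant with the lowest remaining sequence number leads
    synchronized String leader() {
        return znodes.isEmpty() ? null : znodes.firstEntry().getValue();
    }
}
```

In the real setup the leave step happens automatically when a node's ZooKeeper session ends (for example when the instance crashes), and the next participant in line is then notified through isLeader().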

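I used a different approach that doesn't need a database to manage the lock between nodes.

The component is called FencedLock and is provided by Hazelcast.

We use it to prevent another node from performing certain operations (not necessarily linked to scheduling), but it can also be used to share a lock between nodes for a scheduled task.

For this, we just set up two helper functions that can create different lock names: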

// Inject the HazelcastInstance bean once; calling Hazelcast.newHazelcastInstance()
// inside the scheduled method would start a new cluster member on every run.
@Autowired
private HazelcastInstance hazelcastInstance;

@Scheduled(cron = "${cron.expression}")
public void executeMyScheduler() {
    // FencedLock implements java.util.concurrent.locks.Lock
    Lock lock = hazelcastInstance.getCPSubsystem().getLock("mySchedulerName");

    lock.lock();
    try {
        // do your scheduled tasks here
    } finally {
        // don't forget to release the lock whatever happens: end of task or any exception
        lock.unlock();
    }
}

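With lock(), the other nodes block until the lock is released and then run the task themselves, so the runs are serialized rather than executed only once. Alternatively, you can use tryLock so that a node that cannot get the lock does not run the task. Be aware that tryLock(50, TimeUnit.MINUTES) does not release the lock automatically after 50 minutes: it waits up to 50 minutes to acquire the lock and returns false if it could not. With an hourly cron job, a node that waits would still run the task after the first node finishes, so plain tryLock(), which returns immediately, is the safer choice when the task must run only once per trigger: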

@Scheduled(cron = "${cron.expression}")
public void executeMyScheduler() {
    // hazelcastInstance: the single injected instance, not a new one per run
    Lock lock = hazelcastInstance.getCPSubsystem().getLock("mySchedulerName");

    // tryLock() returns immediately; false means another node currently holds the lock
    if (lock.tryLock()) {
        try {
            // do your scheduled tasks here
        } finally {
            // don't forget to release the lock whatever happens: end of task or any exception
            lock.unlock();
        }
    } else {
        // another node is already running this task: skip this run
    }
}
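The difference between blocking and non-blocking acquisition can be checked with any java.util.concurrent.locks.Lock, since FencedLock implements that interface. The hypothetical runIfFree helper below uses tryLock() without a timeout, runs the task only if the lock was free, and always unlocks in finally:

```java
import java.util.concurrent.locks.Lock;

// Minimal sketch of the "skip if locked" pattern (hypothetical helper).
// Works with any Lock; with Hazelcast it would be the FencedLock returned by
// hazelcastInstance.getCPSubsystem().getLock("mySchedulerName").
class SkipIfLocked {

    // Returns true if the task ran on this node, false if the run was skipped
    static boolean runIfFree(Lock lock, Runnable task) {
        if (!lock.tryLock()) {
            return false; // someone else holds the lock: do not wait, do not run
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

When another holder already owns the lock, runIfFree returns false immediately and the task is not run; with lock() the caller would instead wait and run the task afterwards.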

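Note that this Hazelcast component works very well in cloud-based environments (e.g. Kubernetes clusters) and doesn't require paying for an extra database.

Here is what you need to configure: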

// We need to specify the name otherwise it can conflict with internal Hazelcast beans
@Bean("hazelcastInstance")
public HazelcastInstance hazelcastInstance() {
    Config config = new Config();
    config.setClusterName(hazelcastProperties.getGroup().getName());
    NetworkConfig networkConfig = config.getNetworkConfig();

    networkConfig.setPortAutoIncrement(false);
    networkConfig.getJoin().getKubernetesConfig().setEnabled(hazelcastProperties.isNetworkEnabled())
            .setProperty("service-dns", hazelcastProperties.getServiceDNS())
            .setProperty("service-port", hazelcastProperties.getServicePort().toString());
    config.setProperty("hazelcast.metrics.enabled", "false");

    networkConfig.getJoin().getMulticastConfig().setEnabled(false);

    return Hazelcast.newHazelcastInstance(config);
}

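HazelcastProperties is a @ConfigurationProperties object mapped to these properties.

For local testing, you can disable the network configuration with properties in your local profile: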

hazelcast:
  network-enabled: false
  service-port: 5701
  group:
    name: your-hazelcast-group-name