如何在 Java 中实现数据库侦听器

我有一个需求,如果一条记录被插入到一个 db 表中,那么就需要自动执行一个 java 进程。实现 db 侦听器最简单的方法是什么?

102595 次浏览

一般的解决方案可能包括在感兴趣的表上创建一个触发器,通知任何侦听器有关 INSERT事件。一些数据库已经为这种进程间通知规范化了手段。例如:

甲骨文:

后记:

  • NOTIFY语句是这种通知的一种简单方法

其他:

  • 其他数据库中可能也有类似的通知机制,但我不知道。
  • 您总是可以通过在事件表中插入事件来实现自己的事件通知队列表,该事件表由 Java 进程使用/轮询。尽管如此,获得正确的性能可能是相当棘手的。

类似的答案是: 如何使用 java 建立数据库侦听器?

您可以使用一个支持事务的消息队列来做到这一点,并在提交事务或(连接关闭)不支持通知的数据库时触发一条消息。在大多数情况下,您必须手动通知并跟踪要通知的内容。

Spring 为 AMQPJMS提供了一些自动事务支持。您可以使用的一个更简单的替代方案是 番石榴的异步事件总线,但它只适用于一个 JVM。 对于下面的所有选项,我建议您使用消息队列通知平台的其他部分。

选项-非轮询非数据库特定

ORM 选项

有些库,比如 Hibernate JPA 有实体侦听器,使这个过程变得更容易,但那是因为它们假设它们管理所有的 CRUDING。

对于常规 JDBC,你将不得不做自己的簿记。也就是在连接被提交或关闭之后,然后向 MQ 发送已更新的消息。

JDBC 解析

一个复杂的簿记选项是包装/装饰您的 java.sql.DataSource和/或 java.sql.Connection在一个自定义的,这样的 commit()(和关闭) ,然后发送一个消息。我相信一些联邦缓存系统可以做到这一点。您可以捕获执行的 SQL 并进行解析,以查看它是 INSERT 还是 UPDATE,但是如果没有非常复杂的解析和元数据,您将无法获得行级监听。遗憾的是,我不得不承认这是一个优势的 ORM提供的,因为它知道你的更新。

刀的选择

如果您的 没有使用 ORM,最好的选择是在事务关闭后在 DAO 中手动发送一条消息,说明一行已经更新。在发送消息之前,请确保事务已关闭。

选项-轮询非特定于数据库的

有点遵循@GlenBest 的建议。

有些事情我不会这么做。我将外部化计时器或使它只有一个服务器运行计时器(即调度程序)。我只会使用 ScheduledExecutorService(更好地包装在番石榴的 ListenerScheduledExecutorService) ,而不是石英(IMHO 使用石英轮询超级杀伤力)。

在所有要监视的表中,应该添加一个“通知”列。

然后你可以这样做:

// BEGIN Transaction
List<String> ids = execute("SELECT id FROM table where notified = 'f'");
//If db not transactional either insert ids in a tmp table or use IN clause
execute("update table set notified = 't' where notified = 'f'")
// COMMIT Transaction
for (String id : ids) { mq.sendMessage(table, id); }

选项-db 专用

使用 Postgres NOTIFY,你仍然需要在一定程度上进行投票,所以你要做上面的大部分工作,然后把消息发送到总线上。

我不确定这个解决方案能在多大程度上满足您的需求,但可以作为一种选择。如果您正在使用 oracle,那么您可以编写一个 java 程序并将其编译为一个 oracle 函数。您可以从 post insert 触发器调用 Java 程序。

Oracle 数据库中的 Java 程序

假设:

  • 拥有标准的可移植代码比 Java 程序的即时实时执行更为重要。您希望允许可移植性,以备将来使用其他技术(例如,避免专有 DB 事件、外部触发器)。在记录添加到表后,Java 进程可以稍微运行(例如10秒后)。也就是说,调度 + 轮询或实时触发/消息/事件都是可以接受的。

  • 如果同时向表中添加多行,则需要运行一个进程,而不是多个进程。DB 触发器将为每一行启动一个 java 进程——这是不合适的。

  • 服务质素非常重要。即使存在致命的硬件或软件错误,您也希望 Java 程序再次运行并处理不完整的数据。

  • 您希望对您的环境应用强大的安全标准(例如,避免让 java 或 DB 直接执行 OS 命令)

  • 你想最小化代码

    1. 不依赖专有数据库功能的核心 Java 标准代码:

      • 使用 ScheduledExecutorService 或 Quartz 调度程序(或 unix cron 作业或 Windows 任务调度程序)每分钟运行一个 Java 程序(或者可以每10秒运行一次)。这既充当调度程序,又充当看门狗,确保程序全天候运行。Quartz 也可以部署在应用服务器中。
      • 让你的 Java 程序运行1分钟(或10秒) ,循环,通过 JDBC 查询数据库,休眠几秒钟,然后最终退出。
    2. 如果你在应用服务器上有应用程序: 创建一个使用 Timer 服务的 Session Bean,然后再次通过 JDBC 会话 Bean 定时器服务查询表

    3. 有一个写入/追加文件的 DB 触发器。当文件更改 Java7文件监视器时,使用 java7文件监视器触发逻辑

还有另一种选择: 使用带有 DB 适配器的开源 ESB 触发逻辑(例如 Fuse、 Mule 或 OpenAdapter) ,但这提供了超出所述要求的强大功能,而且安装和学习都很费时和复杂。

EJB 计时器使用@Scheme 的示例:

public class ABCRequest {
// normal java bean with data from DB
}


@Singleton
public class ABCProcessor {
@Resource DataSource myDataSource;
@EJB ABCProcessor abcProcessor;
// runs every 3 minutes
@Schedule(minute="*/3", hour="*")
public void processNewDBData() {
// run a JDBC prepared statement to see if any new data in table, put data into RequestData
try
{
Connection con = dataSource.getConnection();
PreparedStatement ps = con.prepareStatement("SELECT * FROM ABC_FEED;");
...
ResultSet rs = ps.executeQuery();
ABCRequest abcRequest
while (rs.hasNext()) {
// population abcRequest
}
abcProcessor.processABCRequest(abcRequst);
} ...
}
}


@Stateless
public class class ABCProcessor {
public void processABCRequest(ABCRequest abcRequest) {
// processing job logic
}
}

另请参见: 看看这个答案用于将 CDI 事件对象从 EJB 发送到 Web 容器。

我有一个解决甲骨文的办法。你不需要创建你自己的,因为现在 Oracle 收购了 Java,并为它发布了一个监听器。据我所知,这并不在内部使用轮询,而是将通知推送到 Java 端(可能基于某个触发器) :

public interface oracle.jdbc.dcn.DatabaseChangeListener
extends java.util.EventListener {
void onDatabaseChangeNotification(oracle.jdbc.dcn.DatabaseChangeEvent arg0);
}

你可以这样实现它(这只是一个例子) :

public class DBListener implements DatabaseChangeListener {
private DbChangeNotification toNotify;


public BNSDBListener(DbChangeNotification toNotify) {
this.toNotify = toNotify;
}


@Override
public void onDatabaseChangeNotification(oracle.jdbc.dcn.DatabaseChangeEvent e) {
synchronized( toNotify ) {
try {
toNotify.notifyDBChangeEvent(e); //do sth
} catch (Exception ex) {
Util.logMessage(CLASSNAME, "onDatabaseChangeNotification",
"Errors on the notifying object.", true);
Util.printStackTrace(ex);
Util.systemExit();
}
}
}
}

编辑:
您可以使用以下类来注册: oracle.jdbc.OracleConnectionWrapper

public class oracle.jdbc.OracleConnectionWrapper implements oracle.jdbc.OracleConnection {...}

假设你在某处创建了一个方法:

public void registerPushNotification(String sql) {
oracle.jdbc.driver.OracleConnection oracleConnection = ...;//connect to db


dbProperties.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
dbProperties.setProperty(OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION, "true");


//this is what does the actual registering on the db end
oracle.jdbc.dcn.DatabaseChangeRegistration dbChangeRegistration= oracleConnection.registerDatabaseChangeNotification(dbProperties);


//now you can add the listener created before my EDIT
listener = new DBListener(this);
dbChangeRegistration.addListener(listener);


//now you need to add whatever tables you want to monitor
Statement stmt = oracleConnection.createStatement();
//associate the statement with the registration:
((OracleStatement) stmt).setDatabaseChangeRegistration(dbChangeRegistration); //look up the documentation to this method [http://docs.oracle.com/cd/E11882_01/appdev.112/e13995/oracle/jdbc/OracleStatement.html#setDatabaseChangeRegistration_oracle_jdbc_dcn_DatabaseChangeRegistration_]


ResultSet rs = stmt.executeQuery(sql); //you have to execute the query to link it to the statement for it to be monitored
while (rs.next()) { ...do sth with the results if interested... }


//see what tables are being monitored
String[] tableNames = dbChangeRegistration.getTables();
for (int i = 0; i < tableNames.length; i++) {
System.out.println(tableNames[i]    + " has been registered.");
}
rs.close();
stmt.close();
}

此示例不包括 try-catch 子句或任何异常处理。