使用 Drools 和 JPA 实现持续的实时数据分析

根据 Drools 5 工作内存中的事实情况对 JPA POJO 进行编程

您可以将 Drools 与 JPA 和基于 Spring 的应用程序代码集成,而且无需采取侵入性的命令样式编程即可实现这一点。了解如何经济高效地使用 POLO 将业务需求融入实时系统监控和持续数据分析流程。作者 Xinyu Liu 分享了自己在 Java™ 持久性和业务集成技术方面的经验,包括使用 Drools 5 构建具有内存有效和健壮的应用程序方面的高级技巧。

Xinyu Liu, 产品开发副总裁, eHealthObjects

作为 Sun Microsystems 的一名认证企业架构师,Xinyu Liu 在尖端服务器端技术方面拥有广泛的应用程序设计和开发经验。他获得了乔治华盛顿大学的研究生学位,目前担任 eHealthObjects 的产品开发副总裁,这是一家医疗技术公司,提供创新的医疗产品、解决方案、服务和交流平台,可以与其他系统、服务和应用程序无缝集成。Dr. Liu 曾为 Java.net、JavaWorld.com 和 IBM developerWorks 撰写过各种主题的文章,例如 JSF、Spring Security、Hibernate Search、Spring Web Flow 和 Servlet 3.0 规范。他还在 Packt Publishing 工作过,负责审核 Spring Web Flow 2 Web DevelopmentGrails 1.1 Web Application DevelopmentApplication Development for IBM WebSphere Process Server 7 and Enterprise Service Bus 7 等书。



2012 年 8 月 20 日

企业开发人员按照管理复杂工作流、业务规则和业务智能来分派任务,这样可以快速实现企业平台的价值,该平台集成了工作流引擎、企业服务总线 (ESB) 和规则引擎。迄今为止,这个出色的平台已经被 IBM WebSphere® Process Server/WebSphere Enterprise Service Bus(参见 参考资料)和 Oracle SOA Suite 之类的商用产品填满了。来自 Boss Community 的 Drools 5 是一种开源替代方案,它通过一组统一的 API 和一个共享的、有状态的知识会话来无缝集成 jBPM 工作流引擎和规则引擎。

面向初学者

请注意,本文假设您熟悉 Spring 平台、Java Persistence API 和 Drools 的基本原理。请参阅 参考资料,了解有关这些主题的更多介绍性文章。

Drools 5 的 Business Logic 集成平台主要包括 Drools Expert 和 Drools Fusion,这两项共同组成了平台的规则引擎和用于复杂事件处理/时态推理的基础架构。本文的样例应用程序是根据这些核心特性构建的。请参阅 参考资料,了解有关 Drools 5 中其他可用程序包的更多信息。

Drools 5 中的 POJO

传统的 Java 对象 (POJO) 是在 Spring 框架中首次以引人注目的方式实现的。POJO 以及依赖注入 (DI) 和面向方面的编程 (AOP) 共同标志着向简单性的回归,这种简单性有效地促进 Spring 成为开发 Web 应用程序的一种行业标准。POJO 的采用已经从 Spring 流向 EJB 3.0 和 JPA,然后再流向 XML-to-Java 绑定技术(比如 JAXB 和 XStream)。最近,POJO 已通过 Hibernate Search 集成到了全文搜索引擎 Lucene 中(参阅 参考资料)。

如今,由于这些增加的改进,应用程序的 POJO 数据模型可以在多个层上进行传播,并直接通过 Web 页面或 SOAP/REST Web 服务端点进行公开。作为一种编程模型,POJO 既经济高效又属于非侵入性的,这为开发人员在简化企业架构时节约了不少时间。

现在,Drools 5 通过允许直接将 POJO 作为事实 (fact) 直接插入知识会话(knowledge session)中,或是插入一个称为 “工作内存” 的规则引擎中,将 POJO 编程简单性应用于下一个级别。本文介绍了一种既经济高效又属于非侵入性的方法,这种方法将 JPA 实体作为 Drools 工作内存中的事实来进行操作。持续的实时数据分析从来不会这么简单。

Drools 编程挑战

许多医疗服务提供商使用案例管理系统作为跟踪医疗记录(比如护理、处方和评估)的一种经济高效的方法。我们的示例程序(基于这样一种系统)具有以下流程和需求:

  • 案例在系统内的所有临床医生之间循环。
  • 临床医生每周至少负责一个评估任务,或将通知发送给临床医生的监管员。
  • 系统给临床医生自动安排评估任务。
  • 如果案例未评估超过 30 天,则向案例组中的所有临床医生发送提醒。
  • 如果没有响应,系统会采取系统的业务规则定义的措施,比如通知出现问题的临床医生组并提出另一项计划。

为该用例选择一个业务流程管理 (BPM) 工作流和规则引擎是有一定道理的:系统使用数据剖析/分析规则(已在上述列表中用斜体字标出),将每个案例用作在 jBPM 中长期运行的一个流程/工作流,而且我们可以使用一个 Drools Planner 来满足自动安排的需求。出于本文的目的,我们将只关注程序的业务规则。我们还要介绍的是系统需求,在满足规则条件时立即实时生成提醒和通知。因此这是一个持续的实时数据分析用例。

清单 1 显示了在我们的系统中声明的三个实体类:MemberCaseClinicianCaseSupervision

清单 1. 实体类
@Entity
@EntityListeners({DefaultWorkingMemoryPartitionEntityListener.class})
public class MemberCase implements Serializable 
{
  private Long id; // pk
  private Date startDtm;
  private Date endDtm;
  private Member member; // not null (memberId)
  private List<CaseSupervision> caseSupervisions = new ArrayList<CaseSupervision>();
  //...
}
 
@Entity
@EntityListeners({DefaultWorkingMemoryPartitionEntityListener.class})
public class Clinician implements Serializable 
{ 
  private Long id; // pk
  private Boolean active;
  private List<CaseSupervision> caseSupervisions = new ArrayList<CaseSupervision>();
	//...
}

@Entity
@EntityListeners({SupervisionStreamWorkingMemoryPartitionEntityListener.class})
public class CaseSupervision implements Serializable 
{ 
  private Long id; // pk
  private Date entryDtm;
  private MemberCase memberCase;
  private Clinician clinician;
  //...
}

MemberCase 中的每个实例代表一个病历。Clinician 代表机构中的临床医生。临床医生每次进行案例评估时会产生一个 CaseSupervision 记录。同时,这三个实体是将要定义的业务规则中的事实类型。还要注意的是,上述 CaseSupervision 被声明为 Drools 中的一个事件类型

从应用程序的角度来看,我们可以从系统的任何地方、在不同的屏幕上、在不同的工作流中修改这三种类型的实体。我们甚至可以使用 Spring Batch 这样的工具来批量更新实体。然而,出于本例的考虑,让我们假设将只通过 JPA 持久上下文来更新实体。

注意,样例应用程序是一个 Spring-Drools 集成,它使用 Maven 来完成构建。本文稍后将考虑一些配置细节,但是您可以随时 下载源 zip。现在,让我们考虑一些使用 Drools 5 的概念特性。

事实和 FactHandle

规则引擎的一般概念是:事实 (fact) 是规则所依赖的数据对象。在 Drools 中,事实是从应用程序获得且断言为引擎的工作内存的任意 Java bean。或者说,就像在 JBoss Drools 参考手册 中撰写的那样:

规则引擎根本没有 “克隆” 事实,它是一天结束时的所有引用/指针 (pointer)。事实是您的应用程序数据。没有 getter 和 setter 的 Strings 和其他类不是有效的 Fact,不能和 Field Constraints 一起使用,Field Constraints 依靠 getter 和 setter 的 JavaBean 标准与对象进行交互。

除非您在规则之上已经指定了关键字 no-looplock-on-active,否则在工作内存中发生事实更改时,不能在任何时候重新评估 Drools 规则引擎中的规则。您还可以使用 @PropertyReactive@watch 注释来指定事实属性,Drools 应该监视这些属性来应对更改。Drools 会忽略对事实的其他所有属性的更新。

出于实际维护的目的,有三种方法来安全更新 Drools 工作内存中的事实

  1. 在 Drools 语法中,右边 (RHS) 是规则的操作/结果部分,您可以在 modify 块内对其进行更新。将一个事实更改为将要激活的规则的结果时,会使用这个方法。
  2. 从外部通过 Java 类中的 FactHandle;用于由应用程序 Java 代码执行的事实更改。
  3. Fact 类实现 PropertyChangeSupport,就像 JavaBeans 规范定义的那样;使用此方法将 Drools 注册为 Fact 对象的 PropertyChangeListener

作为安静的观察者,我们的规则不会更新 Drools 工作内存中的任何 JPA 实体事实;相反,它们会将逻辑事实生成为推理结果。(参见下列的 清单 6。)但是,更新规则中的 JPA 实体时需要特别注意,因为更新的实体可能处于分离状态,或者没有事务或只读事务与当前线程有关联。因此,对实体所做的更改将不会保存到数据库中。

尽管事实对象是因为引用而被传递,Drools(与 JPA/Hibernate 不同)不能跟踪超出规则之外的事实更改。您可以通过使用 FactHandle 来通知 Drools 有关在应用程序 Java 代码中所做的事实更改,从而避免产生不一致的规则推理结果。Drools 接着会对规则进行适当的重新评估。FactHandle 是表示您在工作内存中断言的事实对象的标记。这就是您希望修改或取消一个事实时与工作内存的正常交互方式。在样例应用程序(清单 2清单 3)中,我们使用 FactHandle 来操作工作内存中的实体事实。

您可以通过实现 PropertyChangeSupport(它捕获了对 bean 的属性所做的每一项更改)来解决 Drools 无法跟踪事实更改的问题。但是,请记住,这也是随后由于执行频繁的规则重新评估而需要解决的性能问题。

使用 JPA 实体作为事实

您可以通过 POJO 事实将 JPA 实体作为域数据对象插入到 Drools 的工作内存中。这样做可以让您避免对 Value Object/DTO 层以及 JPA 实体和 DTO 之间的相应转换层进行数据建模。

将实体用作事实会简化应用程序代码,您必须额外注意 “实体-生命周期” 阶段。实体事实应当保存为受管(持久)状态或分离状态。永远不要将临时的实体插入到 Drools 工作内存中,因为它们还未保存到数据库中。同样,应当从工作内存中收回已删除的实体。否则应用程序数据库和规则引擎的工作内存会不同步。

因此,这会带来一些严重的问题:我们如何才能有效通知规则引擎有关通过 FactHandle 在应用程序代码中所做的实体更改?

命令式(Imperative)编程与 AOP 的比较

如果想通过命令式编程的方式来应对这个挑战,我们需要结束在紧邻相应 JPA API 方法的知识会话上调用 insert()update()retract() 方法。这种方法应该是 Drools API 的一种入侵性用法,而且应将 spaghetti 代码留在应用程序中。更糟糕的是,JPA 中已更新的(脏)实体在读取/写入事务结束时会自动与数据库同步运行,未对持久上下文进行任何显式调用。我们如何才能拦截这些更改并通知给 Drools?另一个选择是,在单独的进程中轮询实体更改,就像典型的商业智能 (BI) 工具所做的那样,这会使核心业务功能保持干净,但实现此操作很困难,成本较高,结果也不会是即时的。

JPA EntityListener 是一种 AOP 拦截器,非常适合我们的用例。在 清单 2 中,我们将定义两个 EntityListener,它们会拦截对应用程序中三种类型的实体所做的所有更改。这种方法使得 JPA 中实体 的生命周期与其在 Drools 中的生命周期不断地保持同步。

在 “实体-生命周期” 回调方法中,我们为给定的实体实例查找一个 FactHandle,然后根据 JPA 生命周期阶段通过返回的 FactHandle 来更新或收回事实。如果 FactHandle 缺失,则会将实体作为新事实 插入到工作内存中,以实现事实更新或持久保存。由于工作内存中不存在实体,因此在调用 JPA 删除时也没有必要将其从工作内存中删除。清单 2 中所示的两个 JPA EntityListener 用于工作内存中的两个不同的入口点,或者分区。第一个入口点在 MemberCaseClinician 之间共享,第二个入口点用于 CaseSupervision 事件类型。

清单 2. EntityListeners
@Configurable
public class DefaultWorkingMemoryPartitionEntityListener 
{
  @Value("#{ksession}") //unable to make @Configurable with compile time weaving work here
  private StatefulKnowledgeSession ksession;   
   
  @PostPersist
  @PostUpdate
  public void updateFact(Object entity)
  {       
    FactHandle factHandle = getKsession().getFactHandle(entity);
    if(factHandle == null)
      getKsession().insert(entity);
    else
      getKsession().update(factHandle, entity);
  }        
		   
  @PostRemove
  public void retractFact(Object entity)
  {
    FactHandle factHandle = getKsession().getFactHandle(entity);
    if(factHandle != null)
      getKsession().retract(factHandle);
  }
 
  public StatefulKnowledgeSession getKsession() 
  {
    if(ksession != null)
    {
      return ksession;
    }
    else
    {
      // a workaround for @Configurable
      setKsession(ApplicationContextProvider.getApplicationContext()
        .getBean("ksession", StatefulKnowledgeSession.class));
      return ksession;
    }
  }
  //...
}
 
@Configurable
public class SupervisionStreamWorkingMemoryPartitionEntityListener
{ 
  @Value("#{ksession}")  
  private StatefulKnowledgeSession ksession;   
	
  @PostPersist 
  // CaseSupervision is an immutable event, 
  // thus we don’t provide @PostUpdate and @PostRemove implementations.
  public void insertFact(Object entity)
  {   
    WorkingMemoryEntryPoint entryPoint = getKsession()
      .getWorkingMemoryEntryPoint("SupervisionStream");
    entryPoint.insert(entity);
  }        
  //...
}

就像 AOP 一样,清单 2 中的 EntityListener 方法使系统的核心业务逻辑保持干净。注意,这种方法需要一个或多个 Drools 全局知识会话来注入到两个 EntityListener 中。在本文的后面部分中,我们将一个知识会话声明为一个单态(singleton)的 Spring bean。

提示:共享的全局知识会话

从本质上来说,共享的全局知识会话使得 EntityListener 方法适合于系统范围、BI 数据分析和分析需求。它同样不适合特定于用户流程以及在线购物系统之类的规则执行,在这些系统中,通常会动态生成知识会话来处理用户指定的数据,然后将其解决。

初始化工作内存

在启动应用程序后,三种实体类型的所有现有记录都将从数据库预加载到用于规则执行的工作内存中,如 清单 3 所示。从那时起,会向工作内存通知通过两个 EntityListener 对实体所做的任何更改。

清单 3. 初始化工作内存并运行 Drools 查询
@Service("droolsService")
@Lazy(false)
@Transactional
public class DroolsServiceImpl 
{
  @Value("#{droolsServiceUtil}")
  private DroolsServiceUtil droolsServiceUtil;
    
  @PostConstruct
  public void launchRules()
  {
    droolsServiceUtil.initializeKnowledgeSession();
    droolsServiceUtil.fireRulesUtilHalt();    
  }
   
  public Collection<TransientReminder> findCaseReminders()
  {
    return droolsServiceUtil.droolsQuery("CaseReminderQuery", 
      "caseReminder", TransientReminder.class, null);
  }
   
  public Collection<TransientReminder> findClinicianReminders()
  {
    return droolsServiceUtil.droolsQuery("ClinicianReminderQuery", 
      "clinicianReminder", TransientReminder.class, null);
  }
}  
 
@Service
public class DroolsServiceUtil
{
  @Value("#{ksession}")
  private StatefulKnowledgeSession ksession;
            
  @Async
  public void fireRulesUtilHalt()
  {
    try{
      getKsession().fireUntilHalt(); 
    }catch(ConsequenceException e) 
    {
      throw e;
    }
  }
   
  public void initializeKnowledgeSession()
  {  
    getKsession().setGlobal("droolsServiceUtil", this);
    syncFactsWithDatabase();
  }

  @Transactional //a transaction-scoped persistence context
  public void syncFactsWithDatabase()
  {
    synchronized(ksession)
    {       
      // Reset all the facts in the working memory
      Collection<FactHandle> factHandles = getKsession().getFactHandles(
        new ObjectFilter(){public boolean accept(Object object)
        {
          if(object instanceof MemberCase)
            return true;
          return false;
        }
      });
      for(FactHandle factHandle : factHandles)
      {
        getKsession().retract(factHandle);
      }

      factHandles = getKsession().getFactHandles(
        new ObjectFilter(){public boolean accept(Object object)
        {
          if(object instanceof Clinician)
            return true;
          return false;
        }
      });
      for(FactHandle factHandle : factHandles)
      {
        getKsession().retract(factHandle);
      }           

      WorkingMemoryEntryPoint entryPoint = getKsession()
        .getWorkingMemoryEntryPoint("SupervisionStream");
      factHandles = entryPoint.getFactHandles();
      for(FactHandle factHandle : factHandles)
      {
        entryPoint.retract(factHandle);
      }               

      List<Command> commands = new ArrayList<Command>();
      commands.add(CommandFactory.newInsertElements(getMemberCaseService().findAll()));
      getKsession().execute(CommandFactory.newBatchExecution(commands));

      commands = new ArrayList<Command>();
      commands.add(CommandFactory.newInsertElements(getClinicianService().findAll()));
      getKsession().execute(CommandFactory.newBatchExecution(commands));    
	 
      for(CaseSupervision caseSupervision : getCaseSupervisionService().findAll())
      {
        entryPoint.insert(caseSupervision);
      }  
           
    }
  }
 
  public <T> Collection<T> droolsQuery(String query, String variable, 
    Class<T> c, Object... args)
  {
    synchronized(ksession)
    {       
      Collection<T> results = new ArrayList<T>();
      QueryResults qResults = getKsession().getQueryResults(query, args);  
      for(QueryResultsRow qrr : qResults)
      {
        T result = (T) qrr.get("$"+variable);
        results.add(result);
      }       
      return results;
    }
  }
}

有关 fireAllRules() 的注意事项

请注意,在 清单 3 中,我们拥有在各个 EntityListener 的回调方法中调用 fireAllRules() 的选项。在一个急切加载 (eager-loaded) 的 Spring bean 的 “@PostConstruct” 方法中,我只调用了一次 fireUntilHalt() 方法就简化了这一步骤。fireUtilHalt 方法应该在单独的线程中调用一次(查看 Spring 的 @Async 注释),随后不断触发规则激活,直至调用停止。如果没有需要触发的激活,fireUtilHalt 会等待将激活添加到一个激活议程组或规则流组中。

我可以选择在应用程序的 Spring XML 配置文件(如下所示)中触发规则,甚至是启动流程。然而,我在尝试配置 fireUntilHalt() 方法时检测到了一个可能的线程处理问题。在对懒惰式加载 (lazy-loading) 实体关系进行规则评估期间,结果是一个 “数据库连接已关闭的错误”(参见高级主题)。

Spring-Drools 集成

现在,让我们花一些时间来看看 Spring-Drools 集成的一些配置细节。清单 4 是应用程序的 Maven pom.xml 的一个代码段,包括用于 Drools 内核、Drools 编译器和 Drools Spring 集成包的依赖关系:

清单 4. 部分 Maven pom.xml
<dependency>
  <groupId>org.drools</groupId>
  <artifactId>drools-core</artifactId>
  <version>5.4.0.Final</version>
  <type>jar</type>
</dependency>               
<dependency>
  <groupId>org.drools</groupId>
  <artifactId>drools-compiler</artifactId>
  <version>5.4.0.Final</version>
  <type>jar</type>
</dependency>
<dependency> 
  <groupId>org.drools</groupId> 
  <artifactId>drools-spring</artifactId> 
  <version>5.4.0.Final</version> 
  <type>jar</type> 
  <exclusions>
    <!-- The dependency pom includes spring and hibernate dependencies by mistake. -->	
  </exclusions>
</dependency>

身份与等同性的比较

清单 5 中,我将一个全局有状态知识会话配置为一个单态的 Spring bean。(一个无状态知识会话不会充当一个持续时间很长的会话,因为它在迭代调用期间没有保持其状态。)清单 5 中需要注意的一个重要设置是 <drools:assert-behavior mode="EQUALITY" />

在 JPA/Hibernate 中,托管实体将与身份(identity) 进行比较,而分离的实体将与等同性(equality) 进行比较。插入到有状态实体会话中的实体快速从 JPA 角度分离。因为与单态的有状态知识会话的生命期相比,一个事务范围的持续上下文,甚至是一个 “扩展的” 或 “流范围的” 持续上下文(参见 参考资料)是临时的。每次通过不同的持续上下文对象取得的同一个实体是不同的 Java 对象。默认情况下,Drools 使用的是身份比较。因此,当通过 ksession.getFactHandle(entity) 在工作内存中的现有实体事实上查看 FactHandle 时,Drools 很可能不会找到匹配项。为了与分离的实体相匹配,我们必须在配置文件中选择 EQUALITY

清单 5. 部分 Spring applicationContext.xml
<drools:kbase id="kbase">
  <drools:resources>
    <drools:resource  type="DRL" source="classpath:drools/rules.drl" />
  </drools:resources>
  <drools:configuration>
    <drools:mbeans enabled="true" />
    <drools:event-processing-mode mode="STREAM" />
    <drools:assert-behavior mode="EQUALITY" />
  </drools:configuration>
</drools:kbase>
<drools:ksession id="ksession" type="stateful" name="ksession" kbase="kbase" />

看看应用程序源代码,了解更完整的配置细节。

Drools 规则

清单 6 定义了两个复杂的事件处理 (CEP) 规则。除了类似 JPA 的两个事实类型之外,MemberCaseClinicianCaseSupervision 实体类被声明为一个事件。临床医生执行的每个案例评估任务都会生成一个 CaseSupervision 记录。创建记录之后,不可能一直不断地对其进行更改。

清单 6 中的 Case Supervision 规则的条件可用来测试在过去的 30 天内案例上是否已经存在案例监督。如果没有,规则的结果/措施部分会生成一个 TransientReminder 事实(在 清单 7 中定义),并从逻辑上将事实插入工作内存。Clinician Supervision 规则指示,临床医生在过去的七天内应当已经完成至少一个案例监督;如果没有完成,规则的结果/措施部分会生成一个类似的 TransientReminder 事实,并从逻辑上插入到工作内存中。

清单 6. 案例监督规则
package ibm.developerworks.article.drools;

import ibm.developerworks.article.drools.service.*
import ibm.developerworks.article.drools.domain.*
 
global DroolsServiceUtil droolsServiceUtil;

declare Today
  @role(event)
  @expires(24h)
end

declare CaseSupervision
  @role(event)
  @timestamp(entryDtm)
end

rule "Set Today"
  timer (cron: 0 0 0 * * ?)
  salience 99999  // optional
  no-loop
  when
  then
    insert(new Today()); 
end

rule "Case Supervision"
  dialect "mvel"
  when
    $today : Today()
    $memberCase : MemberCase(endDtm == null, startDtm before[30d] $today)
    not CaseSupervision(memberCase == $ memberCase) 
      over window:time(30d) from entry-point SupervisionStream
    then
      insertLogical(new TransientReminder($memberCase, (Clinician)null, 
        "CaseReminder", "No supervision on the case in last 30 days."));
end
 
query "CaseReminderQuery"
  $caseReminder : TransientReminder(reminderTypeCd == "CaseReminder")
end
 
rule "Clinician Supervision"
  dialect "mvel"
  when
    $clinician : Clinician()
    not CaseSupervision(clinician == $clinician) 
      over window:time(7d) from entry-point SupervisionStream
  then
    insertLogical(new TransientReminder((MemberCase)null, $clinician, 
      "ClinicianReminder", "Clinician completed no evaluation in last 7 days."));
end
 
query "ClinicianReminderQuery"
  $clinicianReminder : TransientReminder(reminderTypeCd == "ClinicianReminder")
end

请注意,清单 7 中所示的 TransientReminder 事实不是一个 JPA 实体,而是一个常规的 POJO。

清单 7. TransientReminder
public class TransientReminder implements Comparable, Serializable
{			
  private MemberCase memberCase;
  private Clinician clinician;
  private String reminderTypeCd;
  private String description;

  public String toString() 
  {
    return ReflectionToStringBuilder.toString(this);
  }

  public boolean equals(Object pObject) 
  {
    return EqualsBuilder.reflectionEquals(this, pObject);
  }

  public int compareTo(Object pObject) 
  {
    return CompareToBuilder.reflectionCompare(this, pObject);
  }

  public int hashCode() 
  {
    return HashCodeBuilder.reflectionHashCode(this);
  } 	
}

事实与事件的比较

事件是使用 @timestamp@duration@expires 之类的时态元数据进行装饰的事实。事实和事件之间最重要的区别是,事件在 Drools 上下文中是不可变的。如果一个事件受更改的制约,那么更改(描述为 “事件数据补充”)不应当影响规则执行的结果。这就是我们在 CaseSupervisionEntityListener 中只监视 @PostPersist 实体生命周期阶段的原因(参见 清单 2)。

Drools 对 Sliding Windows 协议的支持使得事件对时态推理特别有用。滑动窗口 是为感兴趣的事件制定作用域的一种方式,就好像它们属于一个不断移动的窗口一样。两种最常见的滑动窗口实现是基于时间的窗口和基于长度的窗口。

清单 6 中所示的样例规则中,over window:time(30d) 建议,过去 30 天中创建的 CaseSupervision 事件由规则引擎进行评估。一旦过了 30 天,不可变的事件将永远不会再次进入到窗口中,而且 Drools 将自动从工作内存中收回这些事件,并对相应的规则进行重新评估。由于事件是不可变的,所以 Drools 会自动管理事件生命周期。因而事件比事实更具有内存效率。(但是,请注意,您必须在 Drools-Spring 配置中将事件处理模式设置为 STREAM;否则滑动窗口之类的时态操作符会停止工作。)

使用已声明的类型

清单 6 中需要注意的其他一些事项包括:MemberCase 事实(不属于事件类型)也针对事件约束进行了评估,就像我们只评估之间 30 多天内创建的案例一样。某个案例可能今天已经存在 29 天了,但明天就是 30 天了,这就意味着必须在每一天的一开始就对 Case Supervision 规则进行重新评估。不幸的是,Drools 并不提供 “今天” 变化的量。因此,作为一个变通方案,我添加了一个名为 Today 的事件类型;这是一个 Drools 声明类型,或是一个用规则语言(而不是用 Java 代码)声明的数据构造。

这种特殊的事件类型根本不声明任何显式属性,除了一个隐式的 @timestamp 元数据,后者是在 Today 事件断言到工作内存中时进行自动填充。另一个元数据 @expires(24h) 指定,某个 Today 事件会在断言后的 24 小时内到期。

要想在每天的一开始重设 Today,还需要在 Set Today 规则之上添加了一个 timer。先激活这个规则,然后就会在每天的一开始触发该规则来插入一个新鲜的 Today 事件,该事件将取代刚刚期满的事件。随后,新鲜的 Today 事件会触发 Case Supervision 规则的重估。还要注意的是,如果规则的条件中没有出现事实更改,那么计时器本身无法触发规则的重估。计时器也不能重新评估函数或内联的 eval,因为 Drools 将这些构造函数的返回结果作为时间常量,并缓存它们的值。

何时使用事实与事件的比较

了解事实和事件之间的区别有助于我们轻松决定何时使用每种类型:

  • 在某个时间点或某段持续时间内,当数据表示系统状态的不可变快照时,应该将事件 用于场景,该事件是时间敏感的,并且会很快到期,或可以预计数据量会快速且持续增长。
  • 在数据对业务域至关重要的地方,以及数据将体验正在进行的更改并且这些更改要求持续重估规则的时候,应该将事实 用于场景。

Drools 查询

下一个步骤是提取规则执行结果,这通过查询工作内存中的事实来完成。(一种替代方法是通过调用规则语法右手边的 global 上的方法,让规则引擎将结果传递给应用程序。)在这个例子中,事实断言和规则触发都立即发生,没有任何延迟,这就确保了我们在 清单 6 中的查询会返回实时的报告。因为 TransientReminder 事实是通过逻辑方式断言的,所以在它们的条件再无法得到满足时,规则引擎会自动从工作内存中收回它们。

可以说提醒 是在早上由一个规则引擎在特定的案例上生成的。随后,我们在 Java 代码中执行查询 “CaseReminderQuery”,如 清单 3 所示,因此会返回一个提醒,并向系统中的所有临床医生显示该提醒。如果在下午某个临床医生完成了案例上的一个评估,并生成了一个新的案例监督记录,该事件会打破用于提醒 事实的条件。Drools 随后会自动收回它。我们要确定该提醒事实消失了,方法是在完成案例评估之后立即运行相同的查询。逻辑断言可以使推理结果保持最新,而且规则引擎运行在内存效率模式之下,这与事件的行为非常相像。

逻辑事实计数器

请注意,每一个以逻辑方式断言的事实都附带一个计数器,该计数器在每次断言一个同等 事实时都增加。如果不再保持反复断言同等 事实的众多规则中的某个规则,那么用于逻辑事实的计数器就会减少。当计数器达到 0 时,就会自动收回该事实。

现场查询 更是锦上添花。让一个现场查询处于打开状态,这会创建一个查询结果视图,并发布给定视图内容的更改事件。这意味着现场查询恰好需要运行一次,由此产生的结果视图会使用由规则引擎发布的正在进行的更改来实现自动更新。

到目前为止,您可能已经发现,只需一点点有关 Drools、JPA 和 Spring 的背景知识,就可以轻松实现一个持续的实时数据分析应用程序。我们将通过一些将改进我们的案例管理解决方案的高级编程步骤来结束本文。

高级 Drools 编程

管理关系

FactHandle 的一个有趣的约束条件是它只与当前事实相关联,与事实的嵌套关系没有关联。Drools 会通过其在 getKsession().update(factHandle, memberCase) 中的 FactHandle,了解对 MemberCaseid(尽管这永远不可能发生,因为主键是不可变的)、startDtmendDtm 所做的更改。然而,在调用同一个方法时,不会通知 Drools 有关对 membercaseSupervisions 属性所做的更改。

同样,系统不会通知 JPA 中的 EntityListener 通知有关一对多和多对多关系的更改。这是因为外键位于相关 表或链接 表中。

为了根据更新的事实与这些关系建立连接,我们可以构建递归逻辑,获取每个嵌套关系的 FactHandle。一个更好的解决方案是将 EntityListener 放置在与规则条件有关中的所有实体上(包括链接表)。我们使用 MemberCaseSupervision 来完成此操作,其中更改由每个实体自己的 EntityListenerFactHandle 来处理的(参见 清单 2清单 3)。

规则评估期间的实体懒惰式加载

除非我们已经指定一个知识库分区(也就是说,可以执行并行处理),否则不会在调用 ksession.insert()ksession.update()ksession.retract() 的同一个线程中对规则进行评估。清单 2清单 3 中的事实断言都发生在事务上下文中,在该上下文中,事务范围的 JPA 持久上下文(Hibernate 会话)是可用的。这就允许规则引擎跨懒惰式加载实体关系进行评估。如果启用了一个知识库分区,则必须将实体关系配置为急切加载,以避免产生 JPA LazyInitializationException

启用事务

默认情况下,Drools 不支持事务,因为它在工作内存中不保存任何历史快照。这对我们的 EntityListener 来说是一个问题,因为生命周期回调方法是在数据库刷新之后、但在事务提交之前调用的。如果事务被回滚又会怎么样呢?如果那样的话,JPA 持久上下文中的实体将变为分离的实体,且与数据库表中的行不一致,而且工作内存中的行也是如此。规则引擎推理结果将不再是可信的。

启用事务通过确保工作内存中的数据库和应用程序数据库始终同步,且规则推理结果始终准确,使我们的案例管理系统具有防弹功能。在 Drools 中,适当应用 JPA 和 JTA 实现以及类路径中的一个 “drools-jpa-persistence” 包,可以配置一个 JPAKnowledgeService(参阅 参考资料)来创建我们的有状态知识会话。具有流程实例、变量和事实对象的整个有状态知识会话可以映射为表 “SessionInfoThe”(将 ksessionId 作为主键)中行的一个二进制列。

当我们通过注释或 XML 在应用程序中指定事务边界时,应用程序启动的事务会传播到规则引擎。无论什么时候发生事务回滚,有状态知识会话都会恢复到数据库中保存的以前的状态。这维护了应用程序数据库和 Drools 数据库之间的一致性和集成。当同时从多个 JTA 事务中进行访问时,内存中的单个有状态知识会话应当像 REPEATABLE READ 一样运转;否则,单个 SessionInfo 实体实例可能具有一些从不同事务所做的混合状态更改,这会打破事务划分。请注意,自编写 REPEATABLE READ 起,就不能确定它是否能够通过 drools-jpa-persistence 包的事务管理器来实现。

集群

如果应用程序要在集群环境下运行,前面描述的方法很快就会失败。每个嵌入式规则引擎的实例都会接收同一个节点上发生的实体事件,这会导致不同节点上的工作内存不同步。我们可以使用一个通用的远程 Drools 服务器(参阅 参考资料)来解决这个问题。不同节点上的实体实例会通过 REST/SOAP Web 服务通信向集中式 Drools 服务器发布其所有的事件。随后应用程序可以从 Drools 服务器订阅推理结果。请注意,Drools 服务器中 SOAP 的 Apache CXF 实现目前不支持 ws-transaction。考虑到针对该真实用例概述的义务性事务需求,我希望很快就能提供这方面的支持。

结束语

在本文中,您有机会汇总一些您已经了解的有关在 Spring 和 JPA 中进行 POJO 编程的知识,同时本文还汇总了一些在 Drools 5 中可用的新特性。我已经演示了如何巧妙使用 EntityListener(一个全局 Drools 会话)和 fireUtilHalt() 方法,用它们来开发一个基于 POJO 的持续实时数据分析应用程序。您已经了解了核心的 Drools 概念,比如致力于 “事实与事件的比较” 之类的主题,还了解了如何编写逻辑断言,以及更高级的主题和使用,比如,事务管理和将 Drools 实现扩展到一个集群环境中。请参阅 应用程序源代码,了解有关 Drools 5 的更多信息。


下载

描述名字大小
本文的样例代码j-drools5-src.zip5KB

参考资料

学习

获得产品和技术

  • 下载 Drools 5:本文中使用了 Drools Expert 和 Drools Fusion,它们可以实现规则引擎和 CEP 框架。

讨论

  • 加入 developerWorks 中文社区。查看开发人员推动的博客、论坛、讨论组和维基,并与其他 developerWorks 用户交流。

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Java technology, Open source
ArticleID=831053
ArticleTitle=使用 Drools 和 JPA 实现持续的实时数据分析
publish-date=08202012