使用 GPars 解决常见并发问题

了解 Groovy 的并发库如何利用流行的并发模型并使其在 Java 平台上可供访问

向多核处理技术的转变推动了人们对 fork/join、actors、agents 和 executors 等并发性模型的关注。虽然这些模型源自于不同的编程语言,但它们中有许多被封装在了 GPars 中,即一个基于 Groovy 的并发库中。Alex Miller 将指导您了解如何使用从函数和面向对象的编程中汲取的技术来解决并发性问题 — 一切皆使用 Groovy 的熟悉的、Java™友好的语法。

Alex Miller, 架构师兼高级工程师, Revelytix

Alex Miller 在 Revelytix 工作,致力于构建联邦语义 web 查询产品。在加入 Revelytix 之前,Alex 是 Terracotta 的技术领导,BEA Systems 的工程师,以及 MetaMatrix 的首席架构师。他的兴趣包括 Java 技术、并发性、分布式系统、语言和软件设计。Alex 喜欢以 @puredanger 的身份发布消息和在 Pure Danger Tech 上撰写博客。在密苏里州圣路易斯市,Alex 是 Lambda Lounge小组的创建者,该小组致力于研究基础的和动态的语言,他还创办了 Strange Loop 开发者大会。



2010 年 10 月 15 日

在并发性时代,带有 4、6 和 16 个处理器核心的芯片变得很普遍,而且在不久的将来,我们会看到带有上百甚至上千个核心的芯片。这种处理能力蕴含着巨大的可能性,但对于软件开发人员来说,它也带来了挑战。最大限度地利用这些闪耀新核的需求推动了对并发性、状态管理和为两者构建的编程语言的关注热潮。

Groovy、Scala 和 Clojure 等 JVM 语言满足了这些需求。这三种都是较新的语言,运行于高度优化的 JVM 之上,可以使用 Java 1.5 中新增的强大的 Java 并发库。尽管每种语言基于其原理采用不同的方法,不过它们都积极支持并发编程。

在本文中,我们将使用 GPars,一种基于 Groovy 的并发库,来检查模型以便解决并发性问题,比如后台处理、并行处理、状态管理和线程协调。

为何选择 Groovy ?为何选择 GPars ?

Groovy 是运行于 JVM 之上的一种动态语言。基于 Java 语言,Groovy 移除了 Java 代码中的大量正式语法,并添加了来自其他编程语言的有用特性。Groovy 的强大特性之一是它允许编程人员轻松创建基于 Groovy 的 DSL。(一个 DSL 或域特定语言是一种旨在解决特定编程问题的脚本语言。参阅 参考资料了解有关 DSL 的更多信息。)

获取代码和工具

参阅 参考资料部分下载 Groovy、GPars 和本文中用到的其他工具。您可以随时下载本文的 可执行代码样例

GPars 或 Groovy Parallel Systems 是一种 Groovy 并发库,捕捉并发性和协调模型作为 DSL。GPars 的构思源自其他语言的一些最受欢迎的并发性和协调模型,包括:

  • 来自 Java 语言的 executors 和 fork/join
  • 来自 Erlang 和 Scala 的 actors
  • 来自 Clojure 的 agents
  • 来自 Oz 的数据流变量

Groovy 和 GPars 的结合成为展示各种并发性方法的理想之选。甚至不熟悉 Groovy 的 Java 开发人员也能轻松关注相关讨论,因为 Groovy 的语法以 Java 语言为基础。本文中的示例基于 Groovy 1.7 和 GPars 0.10。


后台和并行处理

一个常见的性能难题是需要等待 I/O。I/O 可能涉及到从一个磁盘、一个 web 服务或甚至是一名用户读取数据。当一个线程在等待 I/O 的过程中被阻止时,将等待中的线程与原始执行线程分离开来将会很有用,这将使它能继续工作。由于这种等待是在后台发生的,所以我们称这种技术为 后台处理

例如,假设我们需要这样一个程序,即调用 Twitter API 来找到针对若干 JVM 语言的最新 tweets 并将它们打印出来。Groovy 能够使用 Java 库 twitter4j 很容易就编写出这样的程序,如清单 1 所示:

清单 1. 串行读取 tweets (langTweets.groovy)
import twitter4j.Twitter 
 import twitter4j.Query 

 @Grab(group='net.homeip.yusuke', module='twitter4j', version='2.0.10') 
 def recentTweets(api, queryStr) { 
  query = new Query(queryStr) 
  query.rpp = 5 		 // tweets to return 
  query.lang = "en"		 // language 
  tweets = api.search(query).tweets 
  threadName = Thread.currentThread().name 
  tweets.collect { 
    "[${threadName}-${queryStr}] @${it.fromUser}: ${it.text}"
  } 
 } 

 def api = new Twitter() 
 ['#erlang','#scala','#clojure'].each { 
  tweets = recentTweets(api, it) 
  tweets.each { 
    println "${it}"
  } 
 }

清单 1中,我首先使用了 Groovy Grape(参阅 参考资料)来捕获 twitter4j 库依赖性。然后定义了一个 recentTweets方法来接受查询字符串并执行该查询,返回一列格式化为字符串的 tweets。最后,我遍历了标记列表中的每个标记,获取了 tweets 并将它们打印出来。由于未使用线程,该代码串行执行每个搜索,如图 1 所示:

图 1. 串行读取 tweets
一个用于串行读取 tweets 的程序模型。

并行处理

如果 清单 1中的程序静候等待,它也可能不止等待一个处理。如果我将每个远程请求放入一个后台线程中,程序会并行等待每个响应查询,如图 2 所示:

图 2. 并行读取 tweets
用于并行读取 tweets 的一个改进的程序模型。

GPars Executors DSL 使得我们更容易将 清单 1中的程序从串行处理转换为并行处理,如清单 2 所示:

清单 2. 并行读取 tweets (langTweetsParallel.groovy)
 import twitter4j.Twitter 
 import twitter4j.Query 
 import groovyx.gpars.GParsExecutorsPool 

 @Grab(group='net.homeip.yusuke', module='twitter4j', version='2.0.10') 
 @Grab(group='org.codehaus.gpars', module='gpars', version='0.10') 
 def recentTweets(api, queryStr) { 
  query = new Query(queryStr) 
  query.rpp = 5 		 // tweets to return 
  query.lang = "en"		 // language 
  tweets = api.search(query).tweets 
  threadName = Thread.currentThread().name 
  tweets.collect { 
    "[${threadName}-${queryStr}] @${it.fromUser}: ${it.text}"
  } 
 } 

 def api = new Twitter() 
 GParsExecutorsPool.withPool { 
  def retrieveTweets = { query -> 
    tweets = recentTweets(api, query) 
    tweets.each { 
      println "${it}"
    } 
  } 

  ['#erlang','#scala','#clojure'].each { 
    retrieveTweets.callAsync(it) 
  } 
 }

我使用 Executors DSL 为 GParsExecutorsPool添加了一段 import语句,并使用 Groovy Grape 依赖性系统添加了一段 Grab语句来捕获 GPars 库。然后我引入了一个 GParsExecutorsPool.withPool块,这将增强块中的代码以添加额外功能。在 Groovy 中,可以使用 call方法来调用闭包。GParsExecutorsPool将在底层内存池中作为任务执行通过 call 方法调用的闭包。此外,GParsExecutorsPool通过一个 callAsync方法增强闭包,被调用后该方法在无阻塞的情况下立即返回。在本例中,我包装了我的 tweet 搜索并将操作包装到一个闭包中,然后为每个查询异步调用它。

由于 GPars 将这些任务分配给了一批作业员,我现在可以并行执行我的所有搜索了(只要内存池足够大)。我也可以并行处理每个查询所得结果,在其到达时打印到屏幕。

该例展示了后台处理能够提高性能和响应能力的两种方式:I/O 等待并行完成,依赖于该 I/O 的处理也并行发生。


Executors

您可能想知道什么是 executor,GParsExecutorsPool中的后台处理如何工作。Executor实际上是 Java 5 中引入的 java.util.concurrent库的一部分。(参阅 参考资料)。java.util.concurrent.Executor接口仅有一个方法:execute(Runnable)。它存在的目的在于将任务的提交与一个 Executor实现中任务的实际执行方式分离开来。

java.util.concurrent.Executors类提供许多有用的方法来创建各种不同的线程池类型支持的 Executor实例。GParsExecutorsPool默认使用一个含守护进程线程和固定数目线程的一个线程池(Runtime.getRuntime().availableProcessors() + 1)。当默认设置不合适时,要更改线程池大小、使用一个自定义 ThreadFactory或指定现有的整个 Executor池(withExistingPool)很简单。

要更改线程数,我可以将 GParsExecutorsPool线程计数传递给 withPool方法,如清单 3 所示:

清单 3. 使用线程计数启动一个 Executor 池
GParsExecutorsPool.withPool(8) { 
  // ... 
 }

或者如果我想传递一个具有专门命名线程的自定义线程工厂,我可以像清单 4 这样做。

清单 4. 使用自定义线程工厂启动一个 Executor 池
def threadCounter = new AtomicLong(0) 
 def threadFactory = {Runnable runnable -> 
  Thread thread = new Thread(runnable) 
  id = threadCounter.getAndIncrement() 
  thread.setName("thread ${id}") 
  return thread 
 } as ThreadFactory 

 def api = new Twitter() 
 GParsExecutorsPool.withPool(2, threadFactory) { 
  // ... 
 }

asynccallAsync方法不阻止线程并立即返回一个 Future对象来表示异步计算的未来结果。接收者可以要求 Future阻止线程,直至一个结果就绪,检测查看它是否完成并取消计算指令,或检查是否有其他进程取消了它。与 Executor接口一样,Future类是底层 java.util.concurrent包的一部分。


CPU 的并行性

在后台处理示例中,您了解了让一个程序并行等待多个 I/O 密集型任务(而非串行处理它们)的好处。使用一批作业员来并行执行多个任务对 CPU 密集型任务也大有裨益。

您的应用程序的两个重要方面影响着它可以接受的并行度,因此您具有的编程选项包括:

  • 任务粒度,即任务在时间或数据上的范围
  • 任务依赖性,即通常存在于任务之间的依赖关系的数量

两个方面存在于一个状态集上,且在制定一个解决方案之前考虑该状态集上的问题所在是很有用的。例如,一个程序的任务粒度可由大型事务级工作定义,它也可以由大型数据集一小部分(整幅图像的几个像素)中的多个简短的计算指令组成。由于任意工作量的一个线程或进程的上下文切换涉及到的开销较大,小粒度级的系统往往效率低下且遭遇低性能问题。基于任务粒度的批处理是为您的系统找到最佳性能点的一种方法。

依赖性较少的任务通常被描述为 “高度平行”,即太容易将它们分成多个并行任务了。典型的例子包括图形处理、强力搜索、分形和粒子模拟。任何处理或转换大批命令或文件的业务处理程序也可归入这一类。

Executor 队列争用

我们已经探究了如何使用 GPars 将任务推入 Executor池的机理。不过,最好要记住,Executor是在 2005 年某个时候添加到 Java 2 Platform、Standard Edition (J2SE) 的。它们是为相对少数的核心(2 到 8 个)而调优的,这些核心运行粗粒度的、可能会阻止具有较少任务相关性的事务型任务。Executor是在单个传入的工作队列由多个工作线程共享的情况下实现的。

对于该模型的一个关键问题是增加工作线程数会加剧对工作队列的争用(如图 3 所示)。这种争用最终会随线程和核心的增多而成为一个可伸缩性瓶颈。

图 3. Executor 队列争用
一个图表显示由 executor 队列争用引起的可伸缩性瓶颈。

Executor队列的一个替代项是 fork/join 框架,该框架目前存有 JSR 166y 维护更新(参阅 参考资料),且会在 JDK 7 中正式引入 Java 平台。Fork/join 是为运行细粒度计算任务的大量并发任务而调优的。


GPars 中的 Fork/join

Fork/join 支持定义任务间的依赖关系和生成新任务;这些属性使其成为分而治之风格算法的理想之选,该算法将一个任务分为若干子任务,然后重新将子计算指令联合起来(参阅 参考资料)。通过让一个线程拥有一个工作队列,Fork/join 解决了队列争用的问题。所有情况下使用的队列其实是一种 双队列(deque)(两端都能输入数据的数据行列,发音为 “deck”),它允许线程从另一个队列的后端窃取工作,从而平衡进入线程池的工作。

以查找列表中最大值的任务为例。最明显的战略是简单地遍历所有数字,在遍历过程中密切注意最大值。但是,这本质上是一种串行战略,且不利用那些昂贵的核心。

而如果我将最大值函数作为一个分而治之的并行算法来实现,想想会发生什么。分而治之算法是一种递归算法;每一步都具有如清单 5 所示的结构:

清单 5. 分而治之算法
IF problem is small enough to solve directly 
 THEN solve it directly 
 ELSE { 
  Divide the problem in two or more sub-problems 
  Solve each sub-problem 
  Combine the results 
 }

IF条件允许我改变每个任务的粒度。该风格的算法将生成一棵树,其中树叶由采用 THEN分支的任务定义。树中的内部节点是采用 ELSE分支的任务。每个内部节点必须等待(依赖于)其两个(或多个)子任务。fork/join 模型是专为这种算法设计的,即一棵依赖树中有大量任务在等待中。fork/join 中处于等待中的任务实际上不阻止线程。

GPars 允许我们通过执行 fork/join 任务来创建和执行 fork/join 算法,如清单 6 所示:

清单 6. max() (computeMax.groovy) 的并行 fork/join 实现
import static groovyx.gpars.GParsPool.runForkJoin 
 import groovyx.gpars.GParsPool 
 import groovyx.gpars.AbstractForkJoinWorker 

 @Grab(group='org.codehaus.gpars', module='gpars', version='0.10') 
 class Config { 
	 static DATA_COUNT = 2**14 
	 static GRANULARITY_THRESHHOLD = 128 
	 static THREADS = 4 
 } 

 items = [] as List<Integer> 
 items.addAll(1..Config.DATA_COUNT) 
 Collections.shuffle(items) 

 GParsPool.withPool(Config.THREADS) { 
  computedMax = runForkJoin(1, Config.DATA_COUNT, items.asImmutable()) 
    {begin, end, items -> 
      int size = end - begin 
      if (size <= Config.GRANULARITY_THRESHHOLD) { 
        return items[begin..<end].max() 
      } else { // divide and conquer 
        leftEnd = begin + ((end + 1 - begin) / 2) 
        forkOffChild(begin, leftEnd, items) 
        forkOffChild(leftEnd + 1, end, items) 
        return childrenResults.max() 
      } 
    } 
		
	 println "expectedMax = ${Config.DATA_COUNT}"
	 println "computedMax = ${computedMax}"
 }

注意,fork/join 在 groovyx.gpars.GParsPool类中拥有其特定的线程池。GParsPoolGParsExecutorsPool共享多个常见功能,但拥有 fork/join 所特有的功能。要直接使用 fork/join,您必须使用带有一个任务闭包的 runForkJoin()方法或一个将 AbstractForkJoinWorker分为子类的任务类。


并行集合

Fork/join 提供一种不错的方式来定义和执行并行任务结构,特别是分而治之算法中的那些结构。不过,您可能已经注意到,上面的示例中涉及到相当多的繁文缛节。我必须定义任务闭包、确定合适的任务粒度、分离子问题、综合结果,等等。

理论上,我希望在更高抽象级别工作,即定义一个数据结构,然后并行对其执行常见操作,无需在每种情况下定义管理细节的低级别任务。

JSR 166y 维护更新指定为此指定一个高级接口,称为 ParallelArrayParallelArray对一个数组结构提供常见的函数程序设计操作,这些函数通过一个 fork/join 池并行执行。

由于 API 的函数特性,传递一个函数(方法)给这些操作很有必要,这样就可以在 ParallelArray中的每个项目上执行它。JDK 7 仍在开发中的一项功能是对 λ 的支持,该功能允许开发人员定义和传递代码块。此时,JDK 7 中所含 ParallelArray的状态有待 λ 项目结果定夺(参阅 参考资料)。


GPars 中的 ParallelArray

Groovy 完全支持将代码块定义为闭包并将它们作为一类对象四处传递,因此使用 Groovy 和 GPars 在这种函数式样下工作很自然。ParallelArray和 GPars 支持一组核心的函数运算符:

  • map
  • reduce
  • filter
  • size
  • sum
  • min
  • max

另外,GPars 在一个 GParsPool块内扩展集合,为我们提供构建于原语之上的其他并行方法:

  • eachParallel
  • collectParallel
  • findAllParallel
  • everyParallel
  • groupByParallel

可以将并行集合方法透明化,以便标准集合方法默认情况下并行运作。这将允许您传递一个并行集合给现有(非并行)代码库并按原样使用该代码。不过,考虑这些并行方法使用的状态仍然很重要,因为外部代码可能不保证必要的同步运作。

再看一下 清单 6中的示例,注意到,max()是一个已经在并行集合中提供的方法,因此没有必要直接定义和调用 fork/join 任务,如清单 7 所示:

清单 7. 使用 GPars ParallelArray 函数(computeMaxPA.groovy)
import static groovyx.gpars.GParsPool.runForkJoin 
 import groovyx.gpars.GParsPool 

 @Grab(group='org.codehaus.gpars', module='gpars', version='0.10') 
 class Config { 
	 static DATA_COUNT = 2**14 
	 static THREADS = 4 
 } 

 items = [] as List<Integer> 
 items.addAll(1..Config.DATA_COUNT) 
 Collections.shuffle(items) 

 GParsPool.withPool(Config.THREADS) { 
	 computedMax = items.parallel.max() 
	 println "expectedMax = ${Config.DATA_COUNT}"
	 println "computedMax = ${computedMax}"
 }

使用 ParallelArray 的函数程序设计

现在假设我在一个库存系统中编写订单报告,计算过期的订单数以及这些订单的平均过期天数。我可以这么做:首先定义方法来确定订单是否过期(通过比较当前日期与到期日),然后计算今天与到期日之间的天数差额。

该代码的核心是使用核心的 ParallelArray方法计算所需数目的那一部分,如清单 8 所示:

清单 8. 充分利用并行函数运算符(orders.groovy)
GParsPool.withPool { 
  def data = createOrders().parallel.filter(isLate).map(daysOverdue) 
  println("# overdue = " + data.size()) 
  println("avg overdue by = " + (data.sum() / data.size())) 
 }

这里我选取了一个订单列表,将其转化为一个 ParallelArray,仅保留那些 isLate返回值为真的订单,并计算了每个订单的映射函数,以将其转化为过期天数。然后我可以使用内置的聚合函数来获取过期天数的大小和总和并计算平均值。该代码可能与使用函数程序设计语言编写的那些很类似,不过它的额外优势在于该代码是自动并行执行的。


管理状态

任何时候要处理将由多个线程读写的数据时,您必须考虑如何管理该数据并协调更改。用 Java 语言和其他语言管理共享状态的主导范例包括由锁或其他关键节标记保护的可变状态。

出于多种原因,可变状态和锁容易出问题。锁意味着要整理代码中的依赖性,使开发人员对于执行路径和预期结果有理可循。但是,由于锁的多个方面未加以实施,因此看到低质量代码是很常见的,这些代码包含可见性、安全发布、竞争条件、死锁和其他常见并发弊病方面的问题。

一个更严重的问题是,即使您一开始使用两个在并发性方面得到正确编写的组件,也有可能在结合它们时生成新的意外错误。因此编写构建于可变状态和锁之上、且该状态和锁随系统增长仍然可靠的并发系统很难。

在接下来几节,我将展示跨系统中的线程管理和共享状态的三个范例。从根本上讲,这些范例可以(且是)构建于线程和锁的基底之上,但是它们创建降低复杂度的更高的抽象级别。

我将要展示的三种方法是 actors、agents 和数据流变量,它们均受 GPars 支持。


Actors

actor 范例最初是在 Erlang 中普及的,而最近因在 Scala 中的使用而声名远扬(参阅 参考资料了解有关两种语言的更多信息)。Erlang 是于 20 世纪 80 年代和 90 年代在 Ericsson 专为 AXD301 电信交换机而设计的。设计这种交换机很有挑战性,需要考虑几个因素:高可靠性、无停机时间(热代码升级)、大规模并发性。

Erlang 以 “进程” 为前提,如同轻量级线程(而不像操作系统进程),但是不直接映射到本地线程。进程的执行由底层虚拟机调度。Erlang 中的进程内存较小,可快速启动和进行上下文切换。

Erlang actors 仅仅是在进程上执行的函数。Erlang 无共享内存,且其中的所有状态都是不变的。不可变数据是许多侧重于并发性的语言的关键方面,因为它有如此好的属性。不可变数据是不能更改的,因此读取不可变数据无需用到锁,即使有多个线程读取时也是如此。对不可变数据的更改包括构建数据新版本并从新版本开始工作。对于主要在共享可变状态语言(比如 Java 语言)方面有背景知识的开发人员来说,这一角度转变需要一些时间来适应。

状态是通过传递不可变消息在 actors 之间 “共享” 的。每个 actor 都有一个邮箱,而且通过在其邮箱中接收消息可反复执行 actor函数。消息的发送通常是异步的,尽管构建同步调用也很简单,而且有些 actor 实现提供这样的特性。

GPars 中的 Actors

GPars 使用来自 Erlang 和 Scala 的许多概念实现 actor 模型。GPars 中的 Actors 是从邮箱中使用消息的轻量级进程。根据消息是由 receive()react()方法使用,可以放弃或保留线程绑定。

在 GPars 中,可以利用接受闭包的 factory 方法或通过为 groovyx.gpars.actor.AbstractPooledActor划分子类来创建 actors。在 actor 中,应当由一个 act()方法。通常 act()方法包含一个无止境重复的循环,然后调用 react(面向轻量级 actor)或 receive(面向仍然与其线程绑定的重量级 actor)。

清单 9. 'Rock, Paper, Scissors' (rps.groovy) 的 actor 实现
package puredanger.gparsdemo.rps; 

 import groovyx.gpars.actor.AbstractPooledActor 

 enum Move { ROCK, PAPER, SCISSORS } 

 @Grab(group='org.codehaus.gpars', module='gpars', version='0.10') 
 class Player extends AbstractPooledActor { 
  String name 
  def random = new Random() 
  
  void act() { 
    loop { 
      react { 
        // player replies with a random move 
        reply Move.values()[random.nextInt(Move.values().length)] 
      } 
    } 
  } 
 } 

 class Coordinator extends AbstractPooledActor { 
  Player player1 
  Player player2 
  int games 
  
  void act() { 
    loop { 
      react { 
	 // start the game 
        player1.send("play") 
        player2.send("play") 
        
        // decide the winner 
        react {msg1 -> 
          react {msg2 ->          
            announce(msg1.sender.name, msg1, msg2.sender.name, msg2) 

            // continue playing 
            if(games-- > 0) 
              send("start") 
            else 
              stop() 
          } 
        } 
      } 
    } 
  } 
  
  void announce(p1, m1, p2, m2) { 
    String winner = "tie"
    if(firstWins(m1, m2) && ! firstWins(m2, m1)) { 
      winner = p1 
    } else if(firstWins(m2, m1) && ! firstWins(m1, m2)) { 
      winner = p2 
    } // else tie 
    
    if(p1.compareTo(p2) < 0) { 
      println toString(p1, m1) + ", " + toString(p2, m2) + ", winner = " + winner      
    } else { 
      println toString(p2, m2) + ", " + toString(p1, m1) + ", winner = " + winner 
    } 
  } 
  
  String toString(player, move) { 
    return player + " (" + move + ")"
  } 
  
  boolean firstWins(Move m1, Move m2) { 
    return (m1 == Move.ROCK && m2 == Move.SCISSORS) || 
        (m1 == Move.PAPER && m2 == Move.ROCK) || 
        (m1 == Move.SCISSORS && m2 == Move.PAPER) 
  } 
 } 


 final def player1 = new Player(name: "Player 1") 
 final def player2 = new Player(name: "Player 2")  
 final def coordinator = new Coordinator(player1: player1, player2: player2, games: 10) 

 [player1,player2,coordinator]*.start() 
 coordinator << "start"
 coordinator.join() 
 [player1,player2]*.terminate()

清单 9 含有对 “Rock, Paper, Scissors” 游戏的完整表示,该游戏是通过 GPars 使用一个 Coordinatoractor 和两个 Playeractors 实现的。Coordinator发送一条 play消息给两个 Players,然后等待接收响应。在它接收到两条响应之后,Coordinator打印出比赛结果并自我发送一条消息,以开始新游戏。Playeractors 等待动向请求,因此每一个都用一个任意举动进行响应。

GPars 很好地实现了所有关键 actor 功能以及一些额外功能。actor 方法也许不是针对所有并发性问题的最佳解决方案,但是它确实提供了很好的方式来建模涉及消息传递的问题。


Agents

Agents 在 Clojure(参阅 参考资料)中用于协调对可识别变更状态的多线程访问。Agents 分离不必要的身份(所指内容的名称)耦合和该身份引用的当前值。在大部分语言中,这两个方面有着千丝万缕的联系,因此拥有名称也就意味着您可以变更值。该关系如图 4 所示:

图 4. 管理状态的 Agent
一个图表显示 agent 与状态的关系。

Agents 在变量持有者与可变状态本身之间提供一层间接关系。要更改状态,您可以向 agent 传递一个函数,然后 agent 评估功能流,用每个函数的输出替换状态。由于 agent 序列化了数据访问,因此不会有竞争条件或数据损坏风险。

此外,数据的读取包括查看当前快照,这仅需很少的并发性开销,因为快照不会改变。

变更被异步发送到 agents。如有必要,一个线程可以阻止 agent,直至其更改被应用,或者您可以在应用变更时指定执行一个操作。

GPars 中的 agents

GPars 实现 Clojure 中的很多 agent 功能。在清单 10 中,我建模了一个僵尸进程入侵场景,并管理了 agent 内的世界状态。有两个线程:其中一个线程假设每隔一定时间,一个僵尸进程就会吃人脑,将该数字转化为僵尸。另一个进程假设其余僵尸进程的 5% 被幸存者使用弹枪杀死。主线程监守世界并报告入侵进度。

清单 10. 保护世界免受僵尸大灾难
(zombieApocalypse.groovy) 
 import groovyx.gpars.agent.Agent 
 import groovyx.gpars.GParsExecutorsPool 

 public class World { 
	 int alive = 1000 
	 int undead = 10 
	
	 public void eatBrains() { 
		 alive = alive - undead 
		 undead = undead * 2 		
		 if(alive <= 0) { 
			 alive = 0 
			 println "ZOMBIE APOCALYPSE!"
		 } 
	 } 
	
	 public void shotgun() { 
		 undead = undead * 0.95 
	 } 	
	
	 public boolean apocalypse() { 
		 alive <= 0 
	 } 
	
	 public void report() { 
		 if(alive > 0) { 
			 println "alive=" + alive + " undead=" + undead 
		 } 
	 } 
 } 

 @Grab(group='org.codehaus.gpars', module='gpars', version='0.10') 
 def final world = new Agent<World>(new World()) 

 final Thread zombies = Thread.start { 
	 while(! world.val.apocalypse()) { 
		 world << { it.eatBrains() } 
		 sleep 200 
	 } 
 } 

 final Thread survivors = Thread.start { 
	 while(! world.val.apocalypse()) { 
		 world << { it.shotgun() } 
		 sleep 200 
	 } 
 } 

 while(! world.instantVal.apocalypse()) { 
	 world.val.report() 
	 sleep 200 
 }

Agents 是 Clojure 中的一个重要特性,很高兴可以看到它们出现在 GPars 中。GPars 实现在失去一些功能(比如修改操作和观察程序),但这些只是微小遗漏,将来可能会予以添加。


数据流变量

数据流变量与 Oz 编程语言显著关联(参阅 参考资料),但是实现已在 Clojure、Scala 和 GPars 中得到构建。

对数据流变量的最常用类比是,它们如同电子表格中的单元格 —它们关注于指定必须发生的计算和必须提供值来执行该计算的变量。底层调度程序然后负责执行能够取得进展的线程,因为其输入可用。数据流系统仅关注数据如何在系统中流动,交由线程来决定如何有效利用多个核心。

数据流有一些不错的属性,其中问题的某些类是不可能的(竞争条件),某些类是可能而决定性的(死锁);因此您可以确保,如果您的代码在测试期间不生成死锁,它在生产过程中就不会经历死锁现象。

数据流变量可能仅被绑定一次,因而其使用是有限的。数据流充当值的绑定队列,因此可以通过代码定义的结构灌注数据,保留相同的有益属性。在实践中,数据流变量提供一种不错的方式来将值从一个线程传输到另一个线程,且它们通常用于传输多线程单元测试中的结果。GPars 还定义通过线程池(比如 actors)调度的逻辑数据流任务并通过数据流变量进行传输。

Dataflow streams

清单 2中,您看到每个后台线程接收和打印检索特定主题 tweets 的结果。清单 11 是该程序的一个变体,只是此次创建一个 DataFlowStream。后台任务将使用 DataFlowStream来将结果 tweets 流式传输回主线程,该主线程从数据流中读取它们。

清单 11. 通过 DataFlowStream 流式传输结果
(langTweetsDataflow.groovy) 
 import twitter4j.Twitter 
 import twitter4j.Query 
 import groovyx.gpars.GParsExecutorsPool 
 import groovyx.gpars.dataflow.DataFlowStream 

 @Grab(group='net.homeip.yusuke', module='twitter4j', version='2.0.10') 
 @Grab(group='org.codehaus.gpars', module='gpars', version='0.10') 
 def recentTweets(api, queryStr, resultStream) { 
  query = new Query(queryStr) 
  query.rpp = 5 		 // tweets to return 
  query.lang = "en"		 // language 
  tweets = api.search(query).tweets 
  threadName = Thread.currentThread().name 
  tweets.each { 
	 resultStream << "[${threadName}-${queryStr}] @${it.fromUser}: ${it.text}"
  } 
  resultStream << "DONE"
 } 
 
 def api = new Twitter() 
 def resultStream = new DataFlowStream() 
 def tags = ['#erlang','#scala','#clojure'] 
 GParsExecutorsPool.withPool { 
  tags.each { 
    { query -> recentTweets(api, query, resultStream) }.callAsync(it) 
  } 
 } 

 int doneMessages = 0 
 while(doneMessages < tags.size()) { 
	 msg = resultStream.val 
	 if(msg == "DONE") { 
		 doneMessages++ 
	 } else { 
		 println "${msg}"
	 } 
 }

注意,“DONE” 消息被通过数据流从每个后台线程发送到结果侦听程序。表示线程发送结果任务已完成的指标有时在消息传递领域称为 “毒消息”。它充当 producer 与 consumer 之间的信号。

管理状态的其他方法

本文中未涉及的两个重要并发模型是通信顺序进程(CSP)和软件事务内存(STM)。

CSP 基于 Tony Hoare 的经典之作中对并发程序行为的正式规范,且基于进程和通道的核心概念。进程在某些方面与前面讨论的 actor 模型类似,而 通道与数据流有很多相似之处。CSP 进程与 actors 的不同之处在于,它比单个邮箱有更丰富的输入和输出通道集。GPars 包含一个 API,完成 CSP 思想的 JCSP 实现。

STM 过去几年中一直是一个活跃研究领域,Haskell、Clojure 和 Fortress 等语言包括(有点不同)该概念的实现。STM 需要程序员在源代码中划分事务界限,然后系统向底层状态应用每个事务。如果检测到冲突,可以重试事务,不过它是自动处理的。然而,GPars 不包括 STM 的实现,但是将来有可能会包括。


结束语

并发性目前是一个热门领域,各种语言和库采用的方法之间有诸多混杂。GPars 采用目前一些最流行的并发模型,通过 Groovy 使其在 Java 平台上可访问。

带有大量核心的计算机占据很大地位,而且事实上在芯片上可用的核心的数量呈指数级上升。相应地,并发性会继续成为一个探索和革新领域;我们得知,JVM 将成为最持久方案之源。


下载

描述名字大小
本文样例代码j-gpars.zip6KB

参考资料

学习

获得产品和技术

讨论

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Java technology, Open source
ArticleID=551149
ArticleTitle=使用 GPars 解决常见并发问题
publish-date=10152010