Resolve common concurrency problems with GPars

Learn how Groovy's concurrency library leverages popular concurrency models and makes them accessible on the Java platform

The shift toward multicore processing has fueled an interest in concurrent programming models like fork/join, actors, agents, and executors. While these models originated within different programming languages, many of them are encapsulated in GPars, a Groovy-based concurrency library. With Alex Miller as your guide, learn how to resolve concurrency problems using techniques drawn from both the functional and object-oriented programming worlds — and do it all using Groovy's familiar, Java™-friendly syntax.

Alex Miller, Architect and senior engineer, Revelytix

Alex Miller works at Revelytix, building federated semantic web query products. Prior to Revelytix, Alex was technical lead at Terracotta, an engineer at BEA Systems, and chief architect at MetaMatrix. His interests include Java technology, concurrency, distributed systems, languages, and software design. Alex enjoys tweeting as @puredanger and blogging at Pure Danger Tech. In St. Louis, Alex is the founder of the Lambda Lounge group for the study of functional and dynamic languages and the Strange Loop developer conference.



07 September 2010



In the concurrency era, chips with 4, 8, and 16 processor cores are becoming common, and in the near future we will see chips with hundreds or even thousands of cores. That kind of processing power holds tremendous possibility, but for software developers, it also presents a challenge. The need to maximally leverage those shiny new cores has fueled a surge of interest in concurrency, state management, and programming languages structured for both.

Alex Miller and Andrew Glover talk concurrency

Andrew Glover interviews Alex on concurrency, GPars, and the multicore future in two in-depth podcasts. Listen to Part 1 and Part 2.

JVM languages like Groovy, Scala, and Clojure meet these demands. All three are newer languages that benefit from running on the highly optimized JVM, while also having access to the robust Java concurrency libraries added in Java 1.5. These languages actively enable concurrent programming, although each one takes a different approach based on its philosophy.

In this article, we'll use GPars, a Groovy-based concurrency library, to examine models for solving concurrency problems such as background processing, parallel processing, state management, and thread coordination.

Why Groovy? Why GPars?

Groovy is a dynamically-typed language that runs on the JVM. Based on the Java language, Groovy removes much of the ceremonial syntax found in Java code and adds useful features from other programming languages. One of Groovy's strongest features is that it lets programmers easily create Groovy-based DSLs. (A DSL, or domain-specific language, is a scripted language designed to solve a specific programming problem. See Resources to learn more about DSLs.)

Get code and tools

See the Resources section to download Groovy, GPars, and other tools used in this article. You can download the article's executable code samples anytime.

GPars, or Groovy Parallel Systems, is a Groovy concurrency library that captures concurrency and coordination models as DSLs. GPars draws ideas from some of the most popular concurrency and coordination models of other languages, including:

  • Executors and fork/join from the Java language
  • Actors from Erlang and Scala
  • Agents from Clojure
  • Dataflow variables from Oz

Groovy and GPars make an ideal pair for demonstrating various approaches to concurrency. Even for Java developers unfamiliar with Groovy, the discussion should be easy to follow, because Groovy's syntax is based on the Java language. Examples in this article are based on Groovy 1.7 and GPars 0.10.


Background and parallel processing

A common performance challenge is the need to wait for I/O. The I/O might involve reading data from a disk, a database, a web service, or even a user. When a thread blocks waiting for I/O, it is useful to move the wait off the original thread of execution, which is then free to continue working. Because the waiting happens in the background, we call this technique background processing.

For example, imagine a program that calls the Twitter API to find the most recent tweets for several JVM languages, then prints them out. Groovy makes it easy to write such a program using the Java library twitter4j, shown in Listing 1:

Listing 1. Reading tweets serially (langTweets.groovy)
import twitter4j.Twitter
import twitter4j.Query

@Grab(group='net.homeip.yusuke', module='twitter4j', version='2.0.10')
def recentTweets(api, queryStr) {
  query = new Query(queryStr)
  query.rpp = 5		// tweets to return
  query.lang = "en"		// language
  tweets = api.search(query).tweets
  threadName = Thread.currentThread().name
  tweets.collect {
    "[${threadName}-${queryStr}] @${it.fromUser}: ${it.text}"
  }
}

def api = new Twitter()
['#erlang','#scala','#clojure'].each {
  tweets = recentTweets(api, it)
  tweets.each {
    println "${it}"
  }
}

In Listing 1, I've first used Groovy Grape (see Resources) to grab the twitter4j library dependency. I then defined a recentTweets method to take a query string and execute that query, returning a list of tweets formatted as strings. Finally, I walked through each tag in my list of tags, obtained the tweets, and printed them out. Because no threads were used, this code executes each search serially, as shown in Figure 1:

Figure 1. Reading tweets serially
A program model for reading tweets serially.

Parallel processing

If the program in Listing 1 is going to sit around waiting, then it might as well be waiting for more than one thing. If I put each remote request into a background thread, the program could wait for each response query in parallel, as shown in Figure 2:

Figure 2. Reading tweets in parallel
An improved program model for reading tweets in parallel.

The GPars Executors DSL makes it easy to transform the program in Listing 1 from serial processing to parallel processing, as shown in Listing 2:

Listing 2. Reading tweets in parallel (langTweetsParallel.groovy)
import twitter4j.Twitter
import twitter4j.Query
import groovyx.gpars.GParsExecutorsPool

@Grab(group='net.homeip.yusuke', module='twitter4j', version='2.0.10')
@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
def recentTweets(api, queryStr) {
  query = new Query(queryStr)
  query.rpp = 5		// tweets to return
  query.lang = "en"		// language
  tweets = api.search(query).tweets
  threadName = Thread.currentThread().name
  tweets.collect {
    "[${threadName}-${queryStr}] @${it.fromUser}: ${it.text}"
  }
}

def api = new Twitter()
GParsExecutorsPool.withPool {
  def retrieveTweets = { query ->
    tweets = recentTweets(api, query)
    tweets.each {
      println "${it}"
    }
  }

  ['#erlang','#scala','#clojure'].each {
    retrieveTweets.callAsync(it)
  }
}

Using the Executors DSL, I added an import statement for GParsExecutorsPool and a second @Grab statement to fetch the GPars library through the Groovy Grape dependency system. I then introduced a GParsExecutorsPool.withPool block, which enhances closures used within the block with additional capabilities. In Groovy, closures can be invoked with the call method, and GParsExecutorsPool will execute a closure invoked that way as a task in the underlying pool. In addition, GParsExecutorsPool enhances closures with a callAsync method, which submits the task and returns immediately without blocking. In the example, I wrapped my tweet search and printing actions in a closure, then invoked it asynchronously for each query.

Because GPars gives these tasks to a pool of workers, I can now perform all of my searches in parallel (provided the pool is large enough). I can also process the results from each query in parallel, printing them to the screen as they arrive.

This example illustrates two ways that background processing can improve performance and responsiveness: I/O waits are done in parallel and processing dependent on that I/O can also occur in parallel.


Executors

You might be wondering what an executor is and how the background processing in GParsExecutorsPool works. Executors are actually part of the java.util.concurrent library introduced in Java 5 (see Resources). The java.util.concurrent.Executor interface has only a single method: execute(Runnable). It exists to decouple task submission from how a task is actually executed within an Executor implementation.

The java.util.concurrent.Executors class provides many helpful methods to create Executor instances backed by a variety of different thread-pool types. GParsExecutorsPool defaults to using a thread pool with daemon threads and a fixed number of threads (Runtime.getRuntime().availableProcessors() + 1). In cases where the default isn't appropriate, it's easy to change the thread-pool size, use a custom ThreadFactory, or specify the whole existing Executor pool (withExistingPool).
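The default pool described above can be sketched in plain Java using java.util.concurrent directly. This is a simplified illustration of the idea, not GPars' actual implementation; the class and method names are mine:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolSketch {
    // Mirrors the GParsExecutorsPool default: a fixed-size pool of
    // availableProcessors() + 1 daemon threads.
    public static ExecutorService defaultPool() {
        int size = Runtime.getRuntime().availableProcessors() + 1;
        return Executors.newFixedThreadPool(size, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);          // daemon threads, as GPars uses
            return t;
        });
    }

    public static int runTasks(int n) throws Exception {
        ExecutorService pool = defaultPool();
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            // Executor's single concern: execute a submitted task
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return done.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("completed tasks: " + runTasks(8));
    }
}
```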

To change the number of threads, I could just pass the GParsExecutorsPool thread count to the withPool method, as shown in Listing 3:

Listing 3. Starting an Executor pool with thread count
GParsExecutorsPool.withPool(8) {
  // ...
}

Or, if I wanted to pass a custom thread factory that had specially named threads, I could do so as in Listing 4:

Listing 4. Starting an Executor pool with a custom thread factory
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicLong

def threadCounter = new AtomicLong(0)
def threadFactory = {Runnable runnable ->
  Thread thread = new Thread(runnable)
  id = threadCounter.getAndIncrement()
  thread.setName("thread ${id}")
  return thread
} as ThreadFactory

def api = new Twitter()
GParsExecutorsPool.withPool(2, threadFactory) {
  // ...
}

The async and callAsync methods do not block; they immediately return a Future object, which represents the pending result of an asynchronous computation. The recipient can ask the Future to block until the result is ready, poll to see whether it's complete, cancel the computation, or check whether another thread has canceled it. Like the Executor interface, Future is part of the underlying java.util.concurrent package.
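In plain Java, the same Future mechanics look like this. This is a minimal sketch using java.util.concurrent directly; the asyncSquare method is my own invention for illustration:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureSketch {
    // Submitting a Callable yields a Future; the caller can poll it,
    // block for the result, or cancel the computation.
    public static int asyncSquare(int x) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> x * x);
            // Do other work here; isDone() polls without blocking.
            while (!future.isDone()) {
                Thread.sleep(1);
            }
            return future.get();   // blocks until the result is ready
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("result: " + asyncSquare(7));
    }
}
```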


Parallelism for the CPU

In the background processing example, you saw the benefit of having a program wait for multiple I/O-bound tasks in parallel, rather than processing them sequentially. Utilizing a pool of workers to execute multiple tasks in parallel is also beneficial for CPU-bound tasks.

Two important aspects of your application affect the degree of parallelism it can accept and, thus, the range of programming options you have:

  • Task granularity, meaning the scope of a task in terms of time or data
  • Task dependencies, meaning the number of dependencies that typically exist between tasks

Both aspects exist on a continuum, and it's useful to consider where your problem lives on that continuum before devising a solution. For instance, a program's task granularity could be defined by large, transaction-sized pieces of work, or it could consist of many short computations on small parts of a larger dataset (a few pixels of a whole image). Because of the overhead involved in handing each piece of work to another thread or process, systems with very fine-grained tasks are often inefficient. Batching work based on task granularity is one way to find the optimum efficiency point for your system.

Tasks with few dependencies are often described as "embarrassingly parallel," meaning that they're almost too easy to separate into a number of parallel tasks. Classic examples include graphics processing, brute-force searching, fractals, and particle simulations. Any business-processing program that processes or transforms a large number of orders or files could also fit into this category.

Executor queue contention

We've already explored the mechanics of pushing tasks into an Executor pool with GPars. It's good to keep in mind, however, that Executors were added to the Java 2 Platform, Standard Edition (J2SE) 5.0, released in 2004. As such, they're tuned for a relatively small number of cores (two to eight) running coarser-grained, possibly blocking transactional tasks that have few inter-task dependencies. Executors are implemented with a single incoming work queue shared by all worker threads.

A key problem with this model is that adding worker threads increases contention on the shared work queue, as shown in Figure 3. This contention ultimately becomes a scalability bottleneck as threads and cores increase.

Figure 3. Executor queue contention
A diagram showing the scalability bottleneck that results from executor queue contention.

An alternative to Executor queues is the fork/join framework, which currently lives under the JSR 166y maintenance update (see Resources) and will be formally introduced to the Java platform in JDK 7. Fork/join is tuned for a large number of concurrent threads running fine-grained computational tasks.


Fork/join in GPars

Fork/join has support for defining dependencies between tasks and generating new tasks; these properties make it ideal for divide-and-conquer style algorithms where a task forks into sub-tasks, then joins the child computations back together (see Resources). Fork/join addresses the issue of queue contention by having one work queue per thread. The queue used in each case is actually a deque (a double-ended queue, pronounced "deck"), which allows threads to steal work from the back end of another queue, balancing work entering the pool.

Consider the task of finding the maximum value in a list. The most obvious strategy is to simply walk through all of the numbers, keeping tabs on the highest value seen as you walk through it. This is an inherently serial strategy, however, and does not take advantage of all those expensive cores.

Instead, consider what happens if I implement the maximum-value function as a parallel divide-and-conquer algorithm. Divide-and-conquer is a recursive algorithm; each step has the structure shown in Listing 5:

Listing 5. Divide-and-conquer algorithm
IF problem is small enough to solve directly
THEN solve it directly
ELSE {
  Divide the problem in two or more sub-problems 
  Solve each sub-problem
  Combine the results
}

The IF condition allows me to vary the granularity of each task. This algorithm style produces a tree whose leaves are tasks that take the THEN branch; internal nodes in the tree are tasks that take the ELSE branch. Each internal node must wait for (depends on) its two or more child tasks. The fork/join model is designed for exactly these kinds of algorithms, where you have a large number of tasks waiting in a dependency tree. Waiting tasks in fork/join do not actually block threads.

GPars allows us to create and execute fork/join algorithms by running a fork/join task, as demonstrated in Listing 6:

Listing 6. Parallel fork/join implementation of max() (computeMax.groovy)
import static groovyx.gpars.GParsPool.runForkJoin
import groovyx.gpars.GParsPool
import groovyx.gpars.AbstractForkJoinWorker

@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
class Config {
  static DATA_COUNT = 2**14
  static GRANULARITY_THRESHHOLD = 128
  static THREADS = 4
}

items = [] as List<Integer>
items.addAll(1..Config.DATA_COUNT)
Collections.shuffle(items)

GParsPool.withPool(Config.THREADS) {
  computedMax = runForkJoin(0, Config.DATA_COUNT, items.asImmutable()) { begin, end, items ->
    int size = end - begin
    if (size <= Config.GRANULARITY_THRESHHOLD) {
      return items[begin..<end].max()
    } else { // divide and conquer over [begin, mid) and [mid, end)
      int mid = begin + size.intdiv(2)
      forkOffChild(begin, mid, items)
      forkOffChild(mid, end, items)
      return childrenResults.max()
    }
  }

  println "expectedMax = ${Config.DATA_COUNT}"
  println "computedMax = ${computedMax}"
}

Note that fork/join has its own special pool in the class groovyx.gpars.GParsPool. GParsPool shares many common features with GParsExecutorsPool but adds features specific to fork/join. To use fork/join directly, you must either call the runForkJoin() method with a task closure or define a task class that subclasses AbstractForkJoinWorker.
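For comparison, the same divide-and-conquer maximum can be sketched directly against the fork/join API (java.util.concurrent.ForkJoinPool, as it appears in JDK 7). The MaxTask class below is my own illustration, not GPars code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class MaxTask extends RecursiveTask<Integer> {
    static final int THRESHOLD = 128;
    final List<Integer> items;
    final int begin, end;   // half-open range [begin, end)

    MaxTask(List<Integer> items, int begin, int end) {
        this.items = items; this.begin = begin; this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - begin <= THRESHOLD) {           // small enough: solve directly
            int max = Integer.MIN_VALUE;
            for (int i = begin; i < end; i++) max = Math.max(max, items.get(i));
            return max;
        }
        int mid = begin + (end - begin) / 2;      // otherwise divide in two
        MaxTask left = new MaxTask(items, begin, mid);
        MaxTask right = new MaxTask(items, mid, end);
        left.fork();                              // run left half asynchronously
        return Math.max(right.compute(), left.join());  // combine child results
    }

    public static int parallelMax(List<Integer> items) {
        return new ForkJoinPool(4).invoke(new MaxTask(items, 0, items.size()));
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= 1 << 14; i++) items.add(i);
        Collections.shuffle(items);
        System.out.println("max = " + parallelMax(items));
    }
}
```

fork() pushes the left subtask onto the current worker's deque, where an idle worker can steal it from the other end; the caller computes the right half itself before joining.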


Parallel collections

Fork/join provides an excellent way to define and execute parallel task structures, especially those in divide-and-conquer algorithms. You might have noticed, however, that there was considerable ceremony involved in the previous example. I had to define the task closure, determine appropriate task granularity, split the subproblem, combine the results, and so on.

Ideally, we want to work at a higher level of abstraction, where we define a data structure, then execute common operations over it in parallel, without needing to define low-level tasks that manage the details in every case.

The JSR 166y maintenance update specifies a high-level interface for this purpose called ParallelArray. ParallelArray provides common functional programming operations over an array structure and those functions are executed in parallel using a fork/join pool.

Due to the functional nature of the API, it is necessary to pass a function (method) to many of these operations so that it can be performed on each item in the ParallelArray. One feature still in development for JDK 7 is lambda support, which would allow developers to define code blocks and pass them around. At this time, the status of ParallelArray's inclusion in JDK 7 is pending the outcome of the lambda project (see Resources).


ParallelArray in GPars

Groovy has full support for defining blocks of code as closures and passing them around as first-class objects, so it is very natural to work in this functional style using Groovy and GPars. ParallelArray and GPars support a core set of functional operators:

  • map
  • reduce
  • filter
  • size
  • sum
  • min
  • max

Additionally, GPars extends collections inside a GParsPool block, giving us additional parallel methods built on the primitives:

  • eachParallel
  • collectParallel
  • findAllParallel
  • everyParallel
  • groupByParallel

Parallel collection methods can be made transparent, so that the standard collection methods will operate in parallel by default. This allows you to pass a parallel collection to existing (non-parallel) code bases and potentially use that code as is. It is still important to consider the state used by those parallel methods, however, because the external code may not make the necessary synchronization guarantees.

Revisiting the example from Listing 6, notice that max() is a method already provided on parallel collections, so there is no need to define and invoke a fork/join task directly, as in Listing 7:

Listing 7. Using GPars ParallelArray functions (computeMaxPA.groovy)
import groovyx.gpars.GParsPool

@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
class Config {
  static DATA_COUNT = 2**14
  static THREADS = 4
}

items = [] as List<Integer>
items.addAll(1..Config.DATA_COUNT)
Collections.shuffle(items)

GParsPool.withPool(Config.THREADS) {
  computedMax = items.parallel.max()
  println "expectedMax = ${Config.DATA_COUNT}"
  println "computedMax = ${computedMax}"
}

Functional programming with ParallelArray

Now let's say I'm writing a report for the orders in an inventory system, computing the number of orders that are overdue and the average number of days overdue for those orders. I can do this by defining methods that first determine whether an order is late (by comparing the due date with today's date) and then calculate the number of days between today and the due date.

The heart of this code is the part that computes the necessary numbers using core ParallelArray methods, shown in Listing 8:

Listing 8. Making full use of parallel functional operators (orders.groovy)
GParsPool.withPool {
  def data = createOrders().parallel.filter(isLate).map(daysOverdue)
  println("# overdue = " + data.size())
  println("avg overdue by = " + (data.sum() / data.size()))
}

Here I've taken a list of orders, turned it into a ParallelArray, retained only those orders where isLate returned true, and computed a mapping function for each order to turn it into the number of days overdue. I can then use built-in aggregate functions on the array to get the size and sum of those days overdue and compute an average. This code is very similar to what you might see in a functional programming language, with the additional benefit that it's automatically executed in parallel.
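The same filter/map/aggregate shape can be sketched without GPars using the java.util.stream API that eventually emerged from the lambda project (JDK 8). The Order class and the isLate/daysOverdue logic below are stand-ins of my own invention, since the article's helpers aren't shown:

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;

public class OverdueReport {
    // Hypothetical stand-in for the article's Order type.
    static class Order {
        final LocalDate dueDate;
        Order(LocalDate dueDate) { this.dueDate = dueDate; }
    }

    // filter -> map -> aggregate, executed in parallel over the collection.
    public static double averageDaysOverdue(List<Order> orders, LocalDate today) {
        return orders.parallelStream()
                .filter(o -> o.dueDate.isBefore(today))                    // isLate
                .mapToLong(o -> ChronoUnit.DAYS.between(o.dueDate, today)) // daysOverdue
                .average()                                                 // sum / size
                .orElse(0.0);
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2010, 9, 7);
        List<Order> orders = Arrays.asList(
                new Order(today.minusDays(4)),   // 4 days late
                new Order(today.minusDays(2)),   // 2 days late
                new Order(today.plusDays(3)));   // not late
        System.out.println("avg overdue by = " + averageDaysOverdue(orders, today));
    }
}
```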


Managing state

Anytime you work with data that will be read or written by multiple threads, you must consider how to manage that data and coordinate changes. The ruling paradigm for managing shared state in the Java language and others consists of mutable state protected by locks or other critical section markers.

Mutable state and locks are problematic for many reasons. Locks imply ordering dependencies in the code that allow the developer to reason about the execution path and expected results. Because many aspects of locking are not enforced, however, it is common to see poor-quality code containing problems of visibility, safe publication, race conditions, deadlocks, and other common concurrency bugs.

A more serious issue is that even if you start with two components that were correctly written with respect to concurrency, it is possible (even likely) to combine them in ways that produce new and surprising bugs. So it is difficult to write concurrent systems built on mutable state and locks that continue to be reliable as a system grows.

In the following sections, I'll demonstrate three paradigms for managing and sharing state across threads in a system. Fundamentally, these paradigms can be (and are) built on top of a substrate of threads and locks, but they create higher level abstractions that push much of that complexity down.

The three approaches I'll demonstrate are actors, agents, and dataflow variables, all of which are supported by GPars.


Actors

The actor paradigm was first popularized by Erlang and has recently gained further prominence through its use in Scala (see Resources to learn more about both languages). Erlang was designed in the 1980s and 1990s at Ericsson for devices like the AXD301 telecommunications switch. The design requirements for switches like these are challenging: extreme reliability, no downtime (hot code upgrades), and massive concurrency.

Erlang assumes a world of "processes," which are like lightweight threads (not like operating system processes) but do not map directly to native threads. Process execution is scheduled by the underlying virtual machine. Processes in Erlang are assumed to be small in memory, fast to start, and fast to context-switch.

Erlang actors are simply functions that execute on a process. Erlang has no shared memory, and all state is immutable. Immutable data is a key aspect of many languages that focus on concurrency because of its useful properties: immutable data cannot be changed, so reading it requires no locks, even when multiple threads are reading. Modifying immutable data consists of building a new version of the data and working from the new version. For developers with a background primarily in shared-mutable-state languages (like the Java language), this shift in perspective can take some adjustment.

State is "shared" between actors by passing immutable messages. Each actor has a mailbox and the actor function is executed repeatedly by receiving messages in its mailbox. Message sending is usually asynchronous, although it is easy to build synchronous calls as well and some actor implementations provide those as a feature.
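The mailbox model itself is easy to sketch with plain Java threads and a blocking queue. This illustrates the idea only (one dedicated thread per actor, so it lacks the lightweight scheduling that real actor libraries provide); all names are my own:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

public class MailboxActor {
    // A minimal actor: a thread repeatedly takes messages from its
    // mailbox and applies a handler to each one.
    private final BlockingQueue<String> mailbox = new LinkedBlockingQueue<>();
    private final Thread worker;

    public MailboxActor(Function<String, String> handler,
                        BlockingQueue<String> replyTo) {
        worker = new Thread(() -> {
            try {
                while (true) {
                    String msg = mailbox.take();      // receive from the mailbox
                    if (msg.equals("STOP")) return;   // shutdown message
                    replyTo.put(handler.apply(msg));  // reply with a new message
                }
            } catch (InterruptedException ignored) { }
        });
        worker.start();
    }

    public void send(String msg) { mailbox.add(msg); }  // asynchronous send

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> replies = new LinkedBlockingQueue<>();
        MailboxActor echo = new MailboxActor(m -> "echo: " + m, replies);
        echo.send("hello");
        System.out.println(replies.take());
        echo.send("STOP");
    }
}
```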

Actors in GPars

GPars implements the actor model using many of the concepts from Erlang and Scala. Actors in GPars are lightweight processes that consume messages from a mailbox. It is possible to either give up or retain a thread binding depending on whether messages are consumed by the receive() or react() methods.

In GPars, actors can be created from a factory method that takes a closure or by subclassing groovyx.gpars.actor.AbstractPooledActor. Within the actor, there should be an act() method. Typically, the act() method contains a loop that repeats forever, calling either react (for a lightweight actor) or receive (for a heavyweight actor that remains bound to its thread).

Listing 9. Actor implementation of 'Rock, Paper, Scissors' (rps.groovy)
package puredanger.gparsdemo.rps;

import groovyx.gpars.actor.AbstractPooledActor

enum Move { ROCK, PAPER, SCISSORS }

@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
class Player extends AbstractPooledActor {
  String name
  def random = new Random()
  
  void act() {
    loop {
      react {
        // player replies with a random move 
        reply Move.values()[random.nextInt(Move.values().length)]
      }
    }
  }
}

class Coordinator extends AbstractPooledActor {
  Player player1
  Player player2
  int games
  
  void act() {
    loop {
      react {
        // start the game
        player1.send("play")
        player2.send("play")
        
        // decide the winner
        react {msg1 ->
          react {msg2 ->          
            announce(msg1.sender.name, msg1, msg2.sender.name, msg2) 

            // continue playing
            if(games-- > 0) 
              send("start")
            else 
              stop()
          }
        }
      }
    }
  }
  
  void announce(p1, m1, p2, m2) {
    String winner = "tie"
    if(firstWins(m1, m2) && ! firstWins(m2, m1)) {
      winner = p1
    } else if(firstWins(m2, m1) && ! firstWins(m1, m2)) {
      winner = p2
    } // else tie
    
    if(p1.compareTo(p2) < 0) {
      println toString(p1, m1) + ", " + toString(p2, m2) + ", winner = " + winner      
    } else {
      println toString(p2, m2) + ", " + toString(p1, m1) + ", winner = " + winner 
    }
  }
  
  String toString(player, move) {
    return player + " (" + move + ")"
  }
  
  boolean firstWins(Move m1, Move m2) {
    return (m1 == Move.ROCK && m2 == Move.SCISSORS) || 
        (m1 == Move.PAPER && m2 == Move.ROCK) ||
        (m1 == Move.SCISSORS && m2 == Move.PAPER)
  }
}


final def player1 = new Player(name: "Player 1") 
final def player2 = new Player(name: "Player 2")  
final def coordinator = new Coordinator(player1: player1, player2: player2, games: 10)

[player1,player2,coordinator]*.start()
coordinator << "start"
coordinator.join()
[player1,player2]*.terminate()

Listing 9 contains a complete representation of the "Rock, Paper, Scissors" game implemented with GPars using a Coordinator actor and two Player actors. The Coordinator sends a play message to both Players and then waits to receive a response. After it has received two responses, the Coordinator prints the outcome of the match and sends a message to itself to start a new game. The Player actors wait to be asked for a move, whereupon each one responds with an arbitrary move.

GPars provides nice implementations of all the key actor features plus some additional ones. The actor approach may not be the best solution for all concurrency problems, but it does provide an excellent way to model problems that involve message passing.


Agents

Agents are used in Clojure (see Resources) to coordinate multithreaded access to an identifiable piece of changing state. Agents remove the unnecessary coupling between identity (a name for something you refer to) and the current value referred to by that identity. In most languages, these two aspects are inextricably linked, such that holding the name means you can also change the value. This relationship is shown in Figure 4:

Figure 4. Agent managing state
A diagram showing the relationship of agent to state.

Agents provide a layer of indirection between the holder of a variable and the mutable state itself. To change state, you pass a function to the agent and it evaluates a stream of these functions, replacing the state with the output of each function. Because the agent serializes access to the data, there is no risk of a race condition or data corruption.

Additionally, reading the data consists of looking at the current snapshot, which can be done with little concurrency overhead because a snapshot does not change.

Changes are sent to agents asynchronously. If necessary, a thread can block on the agent until its changes have been applied, or you can specify an action to be executed when the changes are applied.
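A minimal agent can be sketched in Java with a single-threaded executor that serializes update functions. This illustrates the concept only; it is not how GPars or Clojure implement agents, and all names are mine:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;

public class SimpleAgent<T> {
    // State is only touched by a single-threaded executor, so update
    // functions are applied serially with no locks in caller code.
    private final ExecutorService serializer = Executors.newSingleThreadExecutor();
    private volatile T state;

    public SimpleAgent(T initial) { state = initial; }

    // Asynchronously send an update function; the state is replaced
    // with the function's result.
    public void send(UnaryOperator<T> update) {
        serializer.execute(() -> state = update.apply(state));
    }

    public T getVal() { return state; }   // read the current snapshot

    // Block until every update sent so far has been applied.
    public void await() throws Exception {
        serializer.submit(() -> { }).get();
    }

    public void shutdown() { serializer.shutdown(); }

    public static void main(String[] args) throws Exception {
        SimpleAgent<Integer> counter = new SimpleAgent<>(0);
        for (int i = 0; i < 100; i++) counter.send(n -> n + 1);
        counter.await();
        System.out.println("counter = " + counter.getVal());
        counter.shutdown();
    }
}
```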

Agents in GPars

GPars implements much of the agent functionality found in Clojure. In Listing 10, I've modeled a zombie invasion and managed the state of the world within an agent. There are two threads: one assumes that in every time unit, a zombie eats the brains of a human, converting that number to zombies. The other thread assumes that 5 percent of the remaining zombies are killed by survivors with shotguns. The main thread watches the world and reports on the invasion's progress.

Listing 10. Protecting the world from the zombie apocalypse
(zombieApocalypse.groovy)
import groovyx.gpars.agent.Agent
import groovyx.gpars.GParsExecutorsPool

public class World {
	int alive = 1000
	int undead = 10
	
	public void eatBrains() {
		alive = alive - undead
		undead = undead * 2		
		if(alive <= 0) {
			alive = 0
			println "ZOMBIE APOCALYPSE!"
		}
	}
	
	public void shotgun() {
		undead = undead * 0.95
	}	
	
	public boolean apocalypse() {
		alive <= 0
	}
	
	public void report() {
		if(alive > 0) {
			println "alive=" + alive + " undead=" + undead
		}
	}
}

@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
def final world = new Agent<World>(new World())

final Thread zombies = Thread.start {
	while(! world.val.apocalypse()) {
		world << { it.eatBrains() }
		sleep 200
	}
}

final Thread survivors = Thread.start {
	while(! world.val.apocalypse()) {
		world << { it.shotgun() } 
		sleep 200
	}
}

while(! world.instantVal.apocalypse()) {
	world.val.report()
	sleep 200
}

Agents are an important feature in Clojure and it's nice to see them show up in GPars. The GPars implementation is missing some features (like modification actions and watchers), but these are minor omissions and will likely be added in the future.


Dataflow variables

Dataflow variables are most prominently associated with the Oz programming language (see Resources), but implementations have been built in Clojure, Scala, and GPars.

The most common analogy for dataflow variables is that they are like cells in a spreadsheet — they focus on specifying the computation that must occur and the variables that must provide values to perform that computation. The underlying scheduler is then responsible for executing threads that can make progress because their inputs are available. Dataflow systems focus purely on how the data flows through the system, leaving it up to the thread scheduler to determine how to efficiently make use of multiple cores.

Dataflow has some nice properties: certain classes of problems become impossible (race conditions cannot occur), and others become deterministic (a deadlock, if one is possible, will occur on every run). Thus you can guarantee that if your code does not deadlock during testing, it will not deadlock in production.

Dataflow variables may only be bound once, so their use is limited. Dataflow streams act as bounded queues of values, so data can be poured through the structure defined by the code, maintaining many of the same beneficial properties. In practice, dataflow variables provide an excellent way to communicate values from one thread to another, and they are commonly used to communicate results in multithreaded unit tests. GPars also defines logical dataflow tasks that are scheduled over a thread pool (like actors) and communicate via dataflow variables.
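The single-assignment behavior is easy to sketch in Java with a latch. This illustrates the concept only and is not the GPars DataFlowVariable implementation:

```java
import java.util.concurrent.CountDownLatch;

public class DataflowVariable<T> {
    // A single-assignment variable: bind() may succeed once; getVal()
    // blocks until some thread has bound a value.
    private final CountDownLatch bound = new CountDownLatch(1);
    private T value;

    public synchronized void bind(T v) {
        if (bound.getCount() == 0)
            throw new IllegalStateException("already bound");
        value = v;
        bound.countDown();   // releases every waiting reader
    }

    public T getVal() throws InterruptedException {
        bound.await();       // park until the value is bound
        return value;
    }

    public static void main(String[] args) throws Exception {
        DataflowVariable<Integer> x = new DataflowVariable<>();
        new Thread(() -> x.bind(42)).start();    // producer binds once
        System.out.println("x = " + x.getVal()); // consumer blocks until bound
    }
}
```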

Dataflow streams

Back in Listing 2, you saw each background thread receive and print the results of retrieving tweets for a particular topic. Listing 11 is a variant of that program that instead creates a single DataFlowStream: the background tasks write result tweets into the stream, and the main thread reads them out and prints them.

Listing 11. Streaming results through a DataFlowStream
(langTweetsDataflow.groovy)
import twitter4j.Twitter
import twitter4j.Query
import groovyx.gpars.GParsExecutorsPool
import groovyx.gpars.dataflow.DataFlowStream

@Grab(group='net.homeip.yusuke', module='twitter4j', version='2.0.10')
@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
def recentTweets(api, queryStr, resultStream) {
  def query = new Query(queryStr)
  query.rpp = 5		// tweets per page to return
  query.lang = "en"		// restrict results to English
  def tweets = api.search(query).tweets
  def threadName = Thread.currentThread().name
  tweets.each {
	resultStream << "[${threadName}-${queryStr}] @${it.fromUser}: ${it.text}"
  }
  resultStream << "DONE"	// signal that this producer is finished
}
 
def api = new Twitter()
def resultStream = new DataFlowStream()
def tags = ['#erlang','#scala','#clojure']
GParsExecutorsPool.withPool {
  tags.each { 
    { query -> recentTweets(api, query, resultStream) }.callAsync(it)
  }
}

int doneMessages = 0
while(doneMessages < tags.size()) {
	def msg = resultStream.val
	if(msg == "DONE") {
		doneMessages++
	} else {
		println "${msg}"
	}
}

Notice the "DONE" message sent from each background thread through the stream to the results listener. This indicator that a producer has finished sending results is sometimes called a "poison message" in the message-passing literature; it acts as a completion signal between producer and consumer. The main loop counts one "DONE" per tag before exiting.

More ways to manage state

Two important concurrency models that are not covered in this article are communicating sequential processes (CSP) and software transactional memory (STM).

CSP is based on Tony Hoare's classic work formalizing the behavior of concurrent programs, and is built on the core concepts of processes and channels. Processes are similar in some ways to the actors discussed earlier, and channels share many similarities with dataflow streams. CSP processes differ from actors in that they communicate over an arbitrary set of named input and output channels rather than through a single mailbox. GPars includes an API over the JCSP implementation of the CSP ideas.

STM has been an active area of research for the last few years, and languages like Haskell, Clojure, and Fortress include (somewhat different) implementations of the concept. STM requires programmers to demarcate transactional boundaries in source code; the system then applies each transaction atomically to the underlying state, automatically retrying transactions when conflicts are detected. GPars does not yet include an STM implementation, but it likely will in the future.


In conclusion

Concurrency is a hot area right now and a great deal of hybridization is occurring between approaches taken by different languages and libraries. GPars takes some of the most popular concurrency models today and makes them accessible on the Java platform, via Groovy.

Computers with large numbers of cores are here to stay, and the number of cores available on a chip will only continue to grow. Concurrency will, accordingly, remain an area of exploration and innovation, and as we've seen, the JVM will be home to many of the most enduring alternatives.


Download

Sample code for this article: j-gpars.zip (6KB)

