메인 컨텐츠로 가기

developerWorks 이용 약관에 동의하시는 경우 제출을 클릭하십시오. 이용 약관 보기.

developerWorks에 처음 로그인하면 developerWorks프로파일이 생성됩니다.귀하의 프로파일에서 동의하신 내용이 공개되지만 이 사항은 언제든지 변경 가능합니다. 귀하의 성명(숨김으로 체크되어 있어도 표시됩니다)과 디스플레이 이름은 게시한 컨텐츠나 사이트 엑세스시 표시됩니다.

모든 정보가 안전하게 전송되었습니다.

  • 닫기 [x]

처음 developerWorks에 로그인할 때 프로파일이 작성되므로, 이를 위해 디스플레이 이름을 선택해야 합니다. 선택하신 디스플레이 이름은 developerWorks에 게시한 컨텐츠에 표시됩니다.

3글자 이상 31글자 이하의 길이로 사용 가능합니다. dW커뮤니티 내에서는 보안상 이메일주소를 제외한 다른 이름을 지정하셔야 합니다.

developerWorks 이용 약관에 동의하시는 경우 제출을 클릭하십시오. 이용 약관 보기.

모든 정보가 안전하게 전송되었습니다.

  • 닫기 [x]

GPars를 사용하여 일반적인 동시성 문제점 해결하기

Groovy의 동시성 라이브러리가 자주 사용되는 동시성 모델을 활용하고 Java 플랫폼에서 액세스 가능하게 하는 방법 알아보기

Alex Miller, Architect and senior engineer, Revelytix
Alex Miller works at Revelytix, building federated semantic web query products. Prior to Revelytix, Alex was technical lead at Terracotta, an engineer at BEA Systems, and chief architect at MetaMatrix. His interests include Java technology, concurrency, distributed systems, languages, and software design. Alex enjoys tweeting as @puredanger and blogging at Pure Danger Tech. In St. Louis, Alex is the founder of the Lambda Lounge group for the study of functional and dynamic languages and the Strange Loop developer conference.

요약:  멀티코어 프로세싱으로의 이동과 함께 분기 실행/결합, 액터, 에이전트 및 실행자와 같은 동시 프로그래밍 모델에 대한 관심이 증가했습니다. 이러한 모델은 서로 다른 프로그래밍 언어에서 생성되었지만 다수가 Groovy 기반 동시성 라이브러리인 GPars로 캡슐화되어 있습니다. Alex Miller의 안내에 따라 함수형 프로그래밍과 오브젝트 지향 프로그래밍에서 파생된 기술을 사용하여 동시성 문제점을 해결(모두 Groovy의 익숙한 Java™에 적합한 구문을 사용하여 수행함)하는 방법에 대해 살펴봅니다.

원문 게재일:  2010 년 9 월 07 일 번역 게재일:   2010 년 12 월 31 일
난이도:  중급 원문:  보기 PDF:  A4 and Letter (104KB | 22 pages)Get Adobe® Reader®
페이지뷰:  2544 회
의견:  


동시성의 시대에 4, 8 및 16개의 프로세서 코어를 가진 칩은 일반적인 것이 되고 있으며 가까운 미래에는 수백 또는 수천 개의 코어를 가진 칩도 보게 될 것이다. 이러한 유형의 처리 능력은 엄청난 가능성을 보유하고 있지만 소프트웨어 개발자에게는 도전 과제를 제시하기도 한다. 이렇게 멋진 새 코어를 최대한 활용해야 한다는 요구로 인해 동시성, 상태 관리와 이 두 가지를 위해 구조화된 프로그래밍 언어에 대한 관심이 증폭되었다.

동시성에 대한 Alex Miller와 Andrew Glover의 대화

Andrew Glover는 두 편의 심층 팟 캐스트에서 동시성, GPars 및 멀티코어의 미래에 대해 Alex와 인터뷰를 진행한다. Part 1Part 2를 들어보자.

Groovy, Scala 및 Clojure와 같은 JVM 언어는 이러한 요구를 충족한다. 이 세 가지 언어는 모두 매우 최적화된 JVM에서 실행되고 Java 1.5에 추가된 강력한 Java 동시성 라이브러리에도 액세스할 수 있는 장점이 있다. 이들 언어는 적극적으로 동시 프로그래밍을 사용할 수 있도록 하지만 각각 철학에 따라 다른 접근방식을 사용한다.

이 기사에서는 Groovy 기반 동시성 라이브러리인 GPars를 사용하여 백그라운드 처리, 병렬 처리, 상태 관리 및 스레드 조정과 같은 동시성 문제점을 해결하는 데 필요한 모델을 검사한다.

Groovy와 GPars를 사용해야 하는 이유

Groovy는 JVM에서 실행되는 동적으로 유형화된 언어이다. Java 언어를 기반으로 하여 Groovy는 Java 코드에서 찾은 형식적인 구문을 대부분 제거하고 다른 프로그래밍 언어의 유용한 기능을 추가한다. Groovy의 강력한 기능 중 하나는 프로그래머가 Groovy 기반 DSL을 쉽게 작성할 수 있다는 것이다. (DSL(Domain-Specific Language)은 특정 프로그래밍 문제점을 해결하도록 설계된 스크립트화된 언어이다. DSL에 대한 자세한 정보는 참고자료를 참조한다.)

코드와 도구 가져오기

참고자료 섹션을 참조하여 이 기사에 사용된 Groovy, GPars 및 기타 도구를 다운로드한다. 언제든지 이 기사의 실행 파일 코드 샘플을 다운로드할 수 있다.

GPars(Groovy Parallel Systems)는 동시성 및 조정 모델을 DSL로 캡처하는 Groovy 동시성 라이브러리이다. GPars는 다음과 같은 다른 언어의 가장 유명한 동시성 및 조정 모델 중 일부에서 아이디어를 얻는다.

  • Java 언어의 실행자 및 분기 실행/결합
  • Erlang 및 Scala의 액터
  • Clojure의 에이전트
  • Oz의 데이터 플로우 변수

Groovy와 GPars는 동시성에 대한 다양한 접근방식을 보여 주는 데 이상적인 조합이다. Groovy의 구문은 Java 언어를 기반으로 하기 때문에 Groovy에 익숙하지 않은 Java 개발자의 경우에도 이러한 설명을 쉽게 따라올 수 있다. 이 기사에 있는 예제는 Groovy 1.7 및 GPars 0.10을 기반으로 한다.


백그라운드 및 병렬 처리

성능과 관련된 일반적인 도전 과제는 I/O를 기다려야 한다는 것이다. I/O에는 디스크, 데이터베이스 또는 웹 서비스나 사용자로부터 제공되는 데이터를 읽는 것이 포함된다. I/O를 기다리느라 스레드가 멈추는 경우에는 대기하는 스레드를 원래의 실행 스레드로부터 분리하여 작업을 계속 수행할 수 있으면 좋다. 대기는 백그라운드에서 수행되기 때문에 이 기술을 백그라운드 처리라고 부른다.

예를 들어, 여러 JVM 언어에 대해 가장 최근의 트윗을 찾기 위헤 Twitter API를 호출하는 프로그램을 생각해 보자. Groovy는 Listing 1에 있는 Java 라이브러리 twitter4j를 사용하여 이러한 프로그램을 쉽게 작성할 수 있게 한다.


Listing 1. 직렬로 트윗 읽기(langTweets.groovy)
import twitter4j.Twitter
import twitter4j.Query

@Grab(group='net.homeip.yusuke', module='twitter4j', version='2.0.10')
def recentTweets(api, queryStr) {
  query = new Query(queryStr)
  query.rpp = 5		// tweets to return
  query.lang = "en"		// language
  tweets = api.search(query).tweets
  threadName = Thread.currentThread().name
  tweets.collect {
    "[${threadName}-${queryStr}] @${it.fromUser}: ${it.text}"
  }
}

def api = new Twitter()
['#erlang','#scala','#clojure'].each {
  tweets = recentTweets(api, it)
  tweets.each {
    println "${it}"
  }
}

Listing 1에서는 먼저 Groovy Grape(참고자료 참조)를 사용하여 twitter4j 라이브러리 종속성을 확보했다. 그런 다음 쿼리 문자열을 가져와서 해당 쿼리를 실행하는 데 필요한 recentTweets 메소드를 정의하여 문자열로 형식화된 트윗 목록을 리턴했다. 마지막으로 태그 목록에 있는 각각의 태그를 살펴보고 트윗을 확보하여 출력했다. 스레드가 사용되지 않았기 때문에 이 코드는 그림 1과 같이 각각의 검색을 직렬로 실행한다.


그림 1. 직렬로 트윗 읽기
직렬로 트윗을 읽기 위한 프로그램 모델

병렬 처리

Listing 1에 있는 프로그램이 대기하면서 시간을 보내야 한다면 둘 이상의 항목을 대기하는 것이 낫다. 각각의 원격 요청을 백그라운드 스레드에 추가하면 그림 2와 같이 프로그램이 병렬로 각 응답 쿼리를 기다릴 수 있다.


그림 2. 병렬로 트윗 읽기
병렬로 트윗을 읽기 위한 향상된 프로그램 모델

GPars 실행자 DSL을 사용하면 Listing 2와 같이 Listing 1에 있는 프로그램을 직렬 처리에서 병렬 처리로 쉽게 변환할 수 있다.


Listing 2. 병렬로 트윗 읽기(langTweetsParallel.groovy)
  import twitter4j.Twitter
import twitter4j.Query
import groovyx.gpars.GParsExecutorsPool

@Grab(group='net.homeip.yusuke', module='twitter4j', version='2.0.10')
@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
def recentTweets(api, queryStr) {
  query = new Query(queryStr)
  query.rpp = 5		// tweets to return
  query.lang = "en"		// language
  tweets = api.search(query).tweets
  threadName = Thread.currentThread().name
  tweets.collect {
    "[${threadName}-${queryStr}] @${it.fromUser}: ${it.text}"
  }
}

def api = new Twitter()
GParsExecutorsPool.withPool {
  def retrieveTweets = { query ->
    tweets = recentTweets(api, query)
    tweets.each {
      println "${it}"
    }
  }

  ['#erlang','#scala','#clojure'].each {
    retrieveTweets.callAsync(it)
  }
}

필자는 실행자 DSL을 사용하여 GParsExecutorsPool을 위한 import문과 Groovy Grape 종속성 시스템을 사용하여 GPars 라이브러리를 확보하는 데 필요한 Grab문을 추가했다. 그런 다음 블록 내에서 기능을 추가하기 위해 코드를 강화하는 GParsExecutorsPool.withPool 블록을 소개했다. Groovy에서는 call 메소드를 사용하여 클로저를 호출할 수 있다. GParsExecutorsPool은 call 메소드로 호출된 클로저를 기본 풀의 태스크로 실행한다. 또한 GParsExecutorsPool은 호출된 후 멈추지 않고 즉시 리턴되는 callAsync 메소드로 클로저를 강화한다. 예제에서는 트윗 검색과 인쇄 조치를 클로저에 랩핑한 후 각 쿼리에 대해 비동기로 호출했다.

GPars는 이러한 태스크를 작업자 풀에 제공하기 때문에 필자는 이제 모든 검색을 병렬로 수행할 수 있다(단, 풀이 충분히 커야 함). 또한 각 쿼리의 결과를 병렬로 처리한 후 도착 시 화면에 출력할 수 있다.

이 예제에서는 백그라운드 처리에서 성능과 응답성을 향상시킬 수 있는 두 가지 방법(I/O 대기가 병렬로 수행되고 해당 I/O에 종속되는 처리도 병렬로 수행됨)에 대해 보여 준다.


실행자

실행자가 무엇이며 GParsExecutorsPool 작업에서의 백그라운드 처리가 어떻게 작동하는지 궁금할 것이다. 실행자는 실은 Java 5에서 도입된 java.util.concurrent 라이브러리의 일부이다(참고자료 참조). java.util.concurrent.Executor 인터페이스에는 단일 메소드(execute(Runnable))만 있다. 이는 Executor 구현에서 태스크가 실제로 실행되는 방법으로부터 태스크 제출을 분리하기 위해 존재한다.

java.util.concurrent.Executors 클래스는 서로 다른 다양한 스레드 풀 유형에서 지원하는 Executor 인스턴스를 작성하는 데 필요한 다수의 유용한 메소드를 제공한다. GParsExecutorsPool은 기본적으로 디먼 스레드와 고정된 수의 스레드가 포함된 스레드 풀을 사용한다(Runtime.getRuntime().availableProcessors() + 1). 기본값이 적절하지 않은 경우에는 쉽게 스레드 풀 크기를 변경하거나 사용자 정의 ThreadFactory를 사용하거나 전체 기존 Executor 풀(withExistingPool)을 지정할 수 있다.

스레드 수를 변경하려면 Listing 3과 같이 GParsExecutorsPool 스레드 개수를 withPool 메소드에 전달하기만 하면 된다.


Listing 3. 스레드 개수가 포함된 실행자 풀 시작하기
GParsExecutorsPool.withPool(8) {
  // ...
}

또는 특별하게 이름 지정된 스레드가 포함된 사용자 정의 스레드 팩토리를 전달하려는 경우에는 Listing 4와 같이 이를 수행할 수 있다.


Listing 4. 사용자 정의 스레드 팩토리가 포함된 실행자 풀 시작하기
def threadCounter = new AtomicLong(0)
def threadFactory = {Runnable runnable ->
  Thread thread = new Thread(runnable)
  id = threadCounter.getAndIncrement()
  thread.setName("thread ${id}")
  return thread
} as ThreadFactory

def api = new Twitter()
GParsExecutorsPool.withPool(2, threadFactory) {
  // ...
}

asynccallAsync 메소드는 멈추지 않고 비동기 계산의 향후 결과를 표시하는 Future 오브젝트를 즉시 리턴한다. 수신자는 결과가 준비될 때까지 차단하도록 Future에 요청하거나 결과가 완전한지 폴링하거나 계산을 취소하거나 다른 스레드가 해당 결과를 취소했는지 확인할 수 있다. Executor 인터페이스와 마찬가지로 Future 클래스는 기본 java.util.concurrent 패키지의 일부이다.


CPU를 위한 병렬 처리

백그라운드 처리 예제에서는 프로그램이 다중 I/O 바인드 태스크를 순차적으로 처리하는 대신 병렬로 처리하기 위해 대기하도록 하면 얻을 수 있는 장점에 대해 살펴봤다. 작업자 풀을 활용하여 병렬로 다중 태스크를 실행하는 것도 CPU 바인드 태스크에는 도움이 된다.

애플리케이션이 승인할 수 있는 병렬 처리 등급과 이로 인해 사용자가 가지게 되는 프로그래밍 옵션의 범위는 다음과 같은 애플리케이션의 두 가지 중요 측면의 영향을 받는다.

  • 태스크 세분성 - 시간 또는 데이터의 관점에서 태스크의 범위를 의미함
  • 태스크 종속성 - 태스크 사이에 일반적으로 존재하는 종속성의 수를 의미함

두 측면은 모두 연속체에 존재하며 솔루션을 고안하기 전에 해당 연속체에서 문제점이 존재하는 위치를 고려하는 것이 유용하다. 예를 들어, 프로그램의 태스크 세분성은 작업의 큰 트랜잭션 크기 조작에 의해 정의되거나 큰 데이터 세트의 작은 부분(전체 이미지의 몇몇 픽셀)에 대한 다수의 간단한 계산으로 구성될 수 있다. 임의의 크기의 작업에 대한 프로세스 또는 스레드로의 컨텍스트 전환과 관련된 오버헤드로 인해 세분성 범위가 작은 시스템은 충분하지 않아 성능 저하가 발생하는 경우가 있다. 태스크 세분성을 기반으로 한 일괄처리는 시스템의 최적 효율 지점을 찾는 한 가지 방법이다.

종속성이 거의 없는 태스크는 "지나치게 병렬"인 것으로 묘사되는 경우가 자주 있으며 이는 이러한 태스크가 다수의 병렬 태스크로 매우 쉽게 분리된다는 것을 의미한다. 전형적인 예제로는 그래픽 처리, 무차별 대입 공격 검색, 프랙탈 및 파티클 시뮬레이션이 있다. 많은 수의 주문 또는 파일을 처리하거나 변환하는 비즈니스 처리 프로그램도 이 카테고리에 적합할 수 있다.

실행자 큐 경합

GPars를 사용하여 태스크를 Executor 풀에 밀어넣는 방법에 대해 이미 살펴봤다. 하지만 2005년 경에 Executor가 J2SE(Java 2 Platform, Standard Edition)에 추가된 것을 기억하는 것이 좋다. 실행자는 덜 조밀하여 태스크 간 종속성이 거의 없는 트랜잭션 태스크를 차단할 수 있는 상대적으로 적은 수(2개 - 8개)의 코어에 맞게 조정되었다. Executor는 모든 작업자 스레드에서 공유하는 단일 수신 작업 큐를 사용하여 구현된다.

이 모델의 주요 문제점은 작업자 스레드 수를 늘리면 작업 큐에서의 경합이 증가되는 것이다(그림 3에 표시되어 있음). 이러한 경합은 궁극적으로 스레드와 코어가 증가할 때 확장성 병목 현상이 된다.


그림 3. 실행자 큐 경합
실행자 큐 경합으로 인해 발생하는 확장성 병목 현상을 보여 주는 다이어그램

Executor 큐에 대한 대안은 현재 JSR 166y 유지보수 업데이트(참고자료 참조)에서 사용되고 JDK 7에서 Java 플랫폼에 공식적으로 도입될 분기 실행/결합 프레임이다. 분기 실행/결합은 세분화된 계산 태스크를 실행하는 많은 수의 동시 스레드에 맞게 조정되어 있다.


GPars의 분기 실행/결합

분기 실행/결합에는 태스크 간 종속성 정의와 새로운 태스크 생성에 대한 지원이 포함되어 있다. 이러한 특성으로 인해 분기 실행/결합이 태스크가 서브 태스크로 분기 실행된 후 다시 하위 계산을 결합하는 분할 정복 스타일 알고리즘에 이상적인 것으로 된다(참고자료 참조). 분기 실행/결합은 스레드 당 하나의 작업 큐만 가지도록 하여 큐 경합 문제를 해결한다. 각각의 경우에 사용된 큐는 실제로는 스레드가 다른 큐의 백엔드로부터 작업을 가져와서 풀에 들어오는 작업의 균형을 맞출 수 있게 하는 deque(double-ended queue("데크"라고 읽음))이다.

목록에서 최대값을 찾는 태스크를 생각해 보자. 가장 명백한 전략은 단순히 모든 숫자를 검토하면서 가장 높은 값을 지날 때 표시되는 가장 높은 값에 탭을 유지하는 것이다. 하지만 이 전략은 본질적으로 연속된 전략이며 비용이 많이 드는 모든 해당 코어를 활용하지 않는다.

대신 최대값 함수를 병렬 분할 정복 알고리즘으로 구현하는 경우에 어떤 일이 발생하는지 생각해 보자. 분할 정복은 재귀 알고리즘이며 각 단계의 구조는 Listing 5에 표시된 구조이다.


Listing 5. 분할 정복 알고리즘
IF problem is small enough to solve directly
THEN solve it directly
ELSE {
  Divide the problem in two or more sub-problems 
  Solve each sub-problem
  Combine the results
} 

IF 조건을 사용하면 각 태스크의 세분성을 다양화할 수 있다. 이 알고리즘 스타일은 THEN 분기를 사용하는 태스크에 의해 리프가 정의되는 트리를 생성한다. 트리의 내부 노드는 ELSE 분기를 사용하는 태스크이다. 각 내부 노드는 두 가지 이상의 하위 태스크를 대기(의존)해야 한다. 분기 실행/결합 모델은 종속성 트리에 대기 중인 태스크가 많은 정확하게 이러한 유형의 알고리즘을 위해 설계되었다. 분기 실행/결합의 대기 태스크는 실제로는 스레드를 차단하지 않는다.

GPars를 사용하면 Listing 6과 같이 분기 실행/결합 태스크를 실행하여 분기 실행/결합 알고리즘을 작성하고 실행할 수 있다.


Listing 6. max()의 병렬 분기 실행/결합 구현(computeMax.groovy)
import static groovyx.gpars.GParsPool.runForkJoin
import groovyx.gpars.GParsPool
import groovyx.gpars.AbstractForkJoinWorker

@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
class Config {
	static DATA_COUNT = 2**14
	static GRANULARITY_THRESHHOLD = 128
	static THREADS = 4
}

items = [] as List<Integer>
items.addAll(1..Config.DATA_COUNT)
Collections.shuffle(items)

GParsPool.withPool(Config.THREADS) {
  computedMax = runForkJoin(1, Config.DATA_COUNT, items.asImmutable()) 
    {begin, end, items ->
      int size = end - begin
      if (size <= Config.GRANULARITY_THRESHHOLD) {
        return items[begin..<end].max()
      } else { // divide and conquer
        leftEnd = begin + ((end + 1 - begin) / 2)
        forkOffChild(begin, leftEnd, items)
        forkOffChild(leftEnd + 1, end, items)
        return childrenResults.max()
      }
    }
		
	println "expectedMax = ${Config.DATA_COUNT}"
	println "computedMax = ${computedMax}"
}

분기 실행/결합에는 클래스 groovyx.gpars.GParsPool에 특별한 자체 풀이 있다는 것에 유의한다. GParsPoolGParsExecutorsPool과 많은 공통 기능을 공유하지만 분기 실행/결합에 대해 고유한 특별한 기능을 가지고 있다. 분기 실행/결합을 직접 사용하려면 AbstractForkJoinWorker를 서브클래스화하는 태스크 클래스 또는 태스크 클로저와 함께 runForkJoin() 메소드를 사용해야 한다.


병렬 콜렉션

분기 실행/결합은 병렬 태스크 구조(특히 분할 정복 알고리즘에 있는 구조)를 정의하고 실행하는 탁월한 방법을 제공한다. 하지만 이전 예제에 관련된 상당한 형식이 있음을 알아챘을 수 있다. 태스크 클로저를 정의하고 적절한 태스크 세분성을 판별하고 부차적인 문제를 분할하는 등의 작업을 수행해야 한다.

이상적으로는 데이터 구조를 정의한 후 모든 경우의 세부사항을 관리하는 낮은 레벨의 태스크를 정의할 필요 없이 병렬로 공통 조작을 이에 대해 실행하는 높은 추상화 레벨에서 작업하기를 원한다.

JSR 166y 유지보수 업데이트는 이러한 목적으로 ParallelArray라는 상위 레벨 인터페이스를 지정한다. ParallelArray는 배열 구조에 대한 공통 함수 프로그래밍 조작을 제공하며 이러한 함수는 분기 실행/결합 풀을 사용하여 병렬로 실행된다.

API의 함수적인 특성으로 인해 ParallelArray의 각 항목에서 수행될 수 있도록 함수(메소드)를 이러한 조작 중 다수에 전달해야 한다. JDK 7을 위해 여전히 개발 중인 기능 중 하나는 개발자가 코드 블록을 정의하여 전달할 수 있도록 하는 람다 지원이다. 현재는 JDK 7에 ParallelArray가 포함된 상태로 인해 람다 프로젝트의 결과가 보류되고 있다(참고자료 참조).


GPars의 ParallelArray

Groovy는 코드 블록을 클로저로 정의하여 퍼스트 클래스 오브젝트로 전달하는 기능을 완전히 지원하므로 Groovy 및 GPars를 사용하여 이 함수형 스타일에서 작업하는 것은 매우 자연스러운 것이다. ParallelArray 및 GPars는 핵심 함수형 연산자 세트를 지원한다.

  • map
  • reduce
  • filter
  • size
  • sum
  • min
  • max

또한 GPars는 GParsPool 블록 내에서 콜렉션을 확장하여 기본 요소에 빌드된 추가 병렬 메소드를 제공한다.

  • eachParallel
  • collectParallel
  • findAllParallel
  • everyParallel
  • groupByParallel

병렬 콜렉션 메소드는 투명하게 될 수 있으므로 표준 콜렉션 메소드는 기본적으로 병렬로 작동된다. 이를 통해 병렬 콜렉션을 기존(비병렬) 코드 기반에 전달하고 잠재적으로 해당 코드를 현 상태대로 사용할 수 있다. 하지만 외부 코드는 필수인 동기화를 보증하지 않을 수 있으므로 해당 병렬 메소드에서 사용하는 상태를 고려하는 것이 여전히 중요하다.

Listing 6의 예제를 다시 살펴보면 max()는 병렬 콜렉션에 이미 제공된 메소드라는 것을 알게 된다. 따라서 Listing 7과 같이 분기 실행/결합 태스크를 직접 정의하고 호출하지 않아도 된다.


Listing 7. GPars ParallelArray 함수 사용하기(computeMaxPA.groovy)
import static groovyx.gpars.GParsPool.runForkJoin
import groovyx.gpars.GParsPool

@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
class Config {
	static DATA_COUNT = 2**14
	static THREADS = 4
}

items = [] as List<Integer>
items.addAll(1..Config.DATA_COUNT)
Collections.shuffle(items)

GParsPool.withPool(Config.THREADS) {
	computedMax = items.parallel.max()
	println "expectedMax = ${Config.DATA_COUNT}"
	println "computedMax = ${computedMax}"
}

ParallelArray를 사용한 함수형 프로그래밍

이제 필자가 재고 시스템에서 주문에 대한 보고서를 작성하고 기한이 지난 주문 수와 이러한 주문의 기한 초과 평균 일 수를 계산한다고 가정해 보자. 필자는 먼저 만기 날짜와 오늘 날짜를 비교하여 주문의 기한이 지났는지 판별한 후 오늘 날짜와 만기 날짜 간 일 수 차이를 계산하는 데 필요한 메소드를 정의하여 이를 수행할 수 있다.

이 코드의 핵심은 Listing 8에 표시된 코어 ParallelArray 메소드를 사용하여 필요한 숫자를 계산하는 부분이다.


Listing 8. 병렬 함수형 연산자 최대 활용하기(orders.groovy)
GParsPool.withPool {
  def data = createOrders().parallel.filter(isLate).map(daysOverdue)
  println("# overdue = " + data.size())
  println("avg overdue by = " + (data.sum() / data.size()))
}

여기서 필자는 주문 목록을 가져와서 ParallelArray로 전환한 후 isLate가 true를 리턴한 주문만 보유하고 각 주문에 대한 맵핑 함수를 계산하여 기한이 지난 일 수로 전환했다. 그런 다음 필자는 배열에 대한 내장 집계 함수를 사용하여 기한이 지난 일 수의 크기와 합계를 얻고 평균을 계산할 수 있다. 이 코드는 함수형 프로그래밍 언어에서 볼 수 있는 코드와 매우 비슷하며 자동으로 병렬로 실행된다는 추가적인 장점도 있다.


상태 관리하기

다중 스레드에서 읽거나 쓸 데이터에 대해 작업하는 경우에는 언제나 해당 데이터를 관리하고 변경사항을 조정하는 방법에 대해 고려해야 한다. Java 언어와 다른 언어에서 공유 상태를 관리하는 데 사용되는 지배적인 패러다임은 잠금이나 기타 중요 섹션 마커에 의해 보호되는 가변 상태로 구성된다.

가변 상태와 잠금은 여러 가지 이유로 문제가 있다. 잠금은 개발자가 실행 경로와 예상 결과에 대해 추론할 수 있도록 하는 순서 종속성을 코드에 내포한다. 하지만 잠금의 여러 가지 측면은 강제되지 않기 때문에 가시성, 안전한 공개, 레이스 조건, 교착 상태 및 기타 일반적인 동시성 버그 문제가 포함된 낮은 품질의 코드를 쉽게 볼 수 있다.

더 심각한 문제는 동시성과 관련하여 정확하게 작성된 두 컴포넌트로 시작하는 경우에도 새로운 놀라운 버그를 생성하는 방식으로 이들을 결합할 수 있다는 것이다. 따라서 시스템의 진화에 맞춰 지속적으로 신뢰할 수 있는 가변 상태 및 잠금을 기반으로 빌드된 동시 시스템을 작성하는 것은 어렵다.

다음 섹션에서는 시스템의 스레드에서 상태를 관리하고 공유하기 위한 세 가지 패러다임에 대해 설명한다. 기본적으로 이러한 패러다임은 스레드 및 잠금을 기반으로 빌드될 수 있지만 복잡도를 상당히 줄이는 더 높은 수준의 추상화를 작성한다.

필자가 설명하는 세 가지 접근방식은 액터, 에이전트 및 데이터 플로우 변수이며 이들은 모두 GPars에서 지원된다.


액터

액터 패러다임은 먼저 Erlang에서 유명해진 후 Scala에서 사용하면서 최근에 악평을 더 얻게 되었다(두 언어에 대한 자세한 정보는 참고자료 참조). Erlang은 AXD301 통신 스위치와 같은 디바이스를 위해 Ericsson에서 1980년대와 1990년대에 설계되었다. 이와 같은 스위치에 대한 설계 고려사항은 극한의 신뢰성, 작동 중지 시간 제거(핫 코드 업그레이드) 및 대규모 동시성과 같이 도전적인 것들이다.

Erlang은 운영 체제 프로세스와는 다르고 경량 스레드와 비슷하지만 원시 스레드에 직접 맵핑하지는 않는 "프로세스"를 가정한다. 프로세스 실행은 기본 가상 시스템에 의해 스케줄된다. Erlang의 프로세스는 메모리 소비가 적고 시작이 빠르며 컨텍스트 전환이 신속한 것으로 가정된다.

Erlang 액터는 단순히 프로세스에서 실행되는 함수이다. Erlang은 공유 메모리를 가지고 있지 않으며 항상 불변 상태이다. 불변 데이터는 우수한 특성을 가지고 있기 때문에 동시성에 초점을 두는 다수 언어의 주요 측면이다. 불변 데이터는 변경할 수 없기 때문에 불변 데이터를 읽을 때는 잠금이 필요하지 않으며 다중 스레드가 읽을 때도 마찬가지이다. 불변 데이터를 수정하려면 데이터의 새 버전을 빌드한 후 새 버전에서 작업해야 한다. 주로 공유 가변 상태 언어(예: Java 언어)에 대한 배경지식을 가지고 있는 개발자의 경우에는 이렇게 관점을 전환하려면 조정이 필요할 수 있다.

상태는 불변 메시지를 전달하여 액터 사이에서 "공유"된다. 각각의 액터에는 메일함이 있으며 actor 함수는 메일함에 메시지를 수신하여 반복적으로 실행된다. 메시지 전송은 일반적으로 비동기로 수행되지만 동기 호출을 쉽게 빌드할 수 있으며 일부 액터 구현에서는 동기 호출을 기능으로 제공한다.

GPars의 액터

GPars는 Erlang 및 Scala의 개념을 다수 사용하여 액터 모델을 구현한다. GPars의 액터는 메일함의 메시지를 이용하는 경량 프로세스이다. 메시지가 receive() 메소드와 react() 메소드 중 어느 것에 의해 이용되는지에 따라 스레드 바인딩을 포기할 수도 있고 보유할 수도 있다.

GPars에서는 액터가 클로저를 사용하는 팩토리 메소드로부터 작성되거나 groovyx.gpars.actor.AbstractPooledActor를 서브클래스화하여 작성될 수 있다. 액터 내부에는 act() 메소드가 있어야 한다. 일반적으로 act() 메소드에는 영구적으로 반복되어 react(경량 액터의 경우) 또는 receive(스레드에 대한 바인드를 유지하는 중량 액터의 경우)를 호출하는 루프가 포함되어 있다.


Listing 9. 'Rock, Paper, Scissors'의 액터 구현(rps.groovy)
package puredanger.gparsdemo.rps;

import groovyx.gpars.actor.AbstractPooledActor

enum Move { ROCK, PAPER, SCISSORS }

@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
class Player extends AbstractPooledActor {
  String name
  def random = new Random()
  
  void act() {
    loop {
      react {
        // player replies with a random move 
        reply Move.values()[random.nextInt(Move.values().length)]
      }
    }
  }
}

class Coordinator extends AbstractPooledActor {
  Player player1
  Player player2
  int games
  
  void act() {
    loop {
      react {
	// start the game
        player1.send("play")
        player2.send("play")
        
        // decide the winner
        react {msg1 ->
          react {msg2 ->          
            announce(msg1.sender.name, msg1, msg2.sender.name, msg2) 

            // continue playing
            if(games-- > 0) 
              send("start")
            else 
              stop()
          }
        }
      }
    }
  }
  
  void announce(p1, m1, p2, m2) {
    String winner = "tie"
    if(firstWins(m1, m2) && ! firstWins(m2, m1)) {
      winner = p1
    } else if(firstWins(m2, m1) && ! firstWins(m1, m2)) {
      winner = p2
    } // else tie
    
    if(p1.compareTo(p2) < 0) {
      println toString(p1, m1) + ", " + toString(p2, m2) + ", winner = " + winner      
    } else {
      println toString(p2, m2) + ", " + toString(p1, m1) + ", winner = " + winner 
    }
  }
  
  String toString(player, move) {
    return player + " (" + move + ")"
  }
  
  boolean firstWins(Move m1, Move m2) {
    return (m1 == Move.ROCK && m2 == Move.SCISSORS) || 
        (m1 == Move.PAPER && m2 == Move.ROCK) ||
        (m1 == Move.SCISSORS && m2 == Move.PAPER)
  }
}


final def player1 = new Player(name: "Player 1") 
final def player2 = new Player(name: "Player 2")  
final def coordinator = new Coordinator(player1: player1, player2: player2, games: 10)

[player1,player2,coordinator]*.start()
coordinator << "start"
coordinator.join()
[player1,player2]*.terminate()

Listing 9에는 Coordinator 액터와 두 개의 Player 액터를 사용하여 GPars에서 구현된 "Rock, Paper, Scissors" 게임의 완전한 표시가 포함되어 있다. Coordinatorplay 메시지를 두 Player 모두에게 전송한 후 응답 수신을 기다린다. 두 응답을 수신한 후 Coordinator는 일치 결과를 출력하고 메시지를 자신에게 전송하여 새로운 게임을 시작한다. Player 액터는 이동 요청을 기다린 후 이 요청에 따라 각각 임의 이동으로 응답한다.

GPars는 모든 주요 액터 기능과 일부 추가 기능의 우수한 구현을 제공한다. 액터 접근방식은 모든 동시성 문제에 대한 최적의 해결책은 아닐 수 있지만 메시지 전달과 관련된 문제점을 모델링하는 탁월한 방법을 제공한다.


에이전트

에이전트는 Clojure(참고자료 참조)에서 변경되는 상태의 식별 가능한 부분에 대한 멀티스레드 액세스를 조정하는 데 사용된다. 에이전트는 ID(사용자가 참조하는 항목의 이름)와 이 ID가 참조하는 현재 값의 불필요한 결합을 분할한다. 대부분의 언어에서 이러한 두 측면은 단단히 연결되어 있으므로 이름을 가진다는 것은 값을 변경할 수도 있다는 것을 의미한다. 이 관계는 그림 4에 표시되어 있다.


그림 4. 상태를 관리하는 에이전트
상태에 대한 에이전트의 관계를 보여 주는 다이어그램

에이전트는 변수 보유자와 가변 상태 자체 사이의 간접 계층을 제공한다. 상태를 변경하기 위해 함수를 에이전트에 전달하면 에이전트는 이러한 함수의 스트림을 평가하여 상태를 각 함수의 출력으로 대체한다. 에이전트는 데이터에 대한 액세스를 직렬화하므로 레이스 조건이나 데이터 손상의 위험은 없다.

또한 데이터 읽기는 현재 스냅샷 확인으로 구성되며 스냅샷은 변경되지 않기 때문에 이는 동시성 오버헤드가 거의 없는 상태로 수행될 수 있다.

변경사항은 비동기로 에이전트에 전송된다. 필요할 경우 스레드는 변경사항이 적용될 때까지 에이전트에서 멈출 수 있거나 사용자가 변경사항이 적용될 때 실행될 조치를 지정할 수 있다.

GPars의 에이전트

GPars는 Clojure에 있는 에이전트 기능을 상당 부분 구현한다. Listing 10에서 필자는 좀비 침입을 모델링하고 에이전트에서 상태를 관리했다. 두 가지 스레드가 있는데 하나는 모든 시간 단위에서 좀비가 인간의 뇌를 먹고 그 수만큼 좀비로 바꾼다고 가정한다. 다른 스레드는 나머지 좀비 중 5퍼센트가 생존자들의 총에 죽는다고 가정한다. 기본 스레드는 상황을 감시하여 침입의 진행 상황을 보고한다.


Listing 10. 좀비의 공격으로부터 세상 보호하기
(zombieApocalypse.groovy)
import groovyx.gpars.agent.Agent
import groovyx.gpars.GParsExecutorsPool

public class World {
	int alive = 1000
	int undead = 10
	
	public void eatBrains() {
		alive = alive - undead
		undead = undead * 2		
		if(alive <= 0) {
			alive = 0
			println "ZOMBIE APOCALYPSE!"
		}
	}
	
	public void shotgun() {
		undead = undead * 0.95
	}	
	
	public boolean apocalypse() {
		alive <= 0
	}
	
	public void report() {
		if(alive > 0) {
			println "alive=" + alive + " undead=" + undead
		}
	}
}

@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
def final world = new Agent<World>(new World())

final Thread zombies = Thread.start {
	while(! world.val.apocalypse()) {
		world << { it.eatBrains() }
		sleep 200
	}
}

final Thread survivors = Thread.start {
	while(! world.val.apocalypse()) {
		world << { it.shotgun() } 
		sleep 200
	}
}

while(! world.instantVal.apocalypse()) {
	world.val.report()
	sleep 200
}

에이전트는 Clojure에서 중요한 기능이며 GPars에서 에이전트를 볼 수 있어서 다행이다. GPars 구현에는 수정 조치 및 감시자와 같은 일부 기능이 누락되어 있지만 이들은 사소한 누락 기능이며 향후 추가될 수 있다.


데이터 플로우 변수

데이터 플로우 변수는 Oz 프로그래밍 언어(참고자료 참조)와 가장 연관되어 있지만 구현은 Clojure, Scala 및 GPars에서 빌드되었다.

데이터 플로우 변수에 대한 가장 일반적인 비유는 스프레드시트에 있는 셀과 같다는 것이다. 데이터 플로우 변수는 발생해야 하는 계산과 이러한 계산을 수행하기 위해 값을 제공해야 하는 변수를 지정하는 데 초점을 둔다. 그러면 기본 스케줄러의 입력을 사용할 수 있기 때문에 기본 스케줄러는 진행할 수 있는 스레드의 실행을 책임진다. 데이터 플로우 시스템은 순수하게 시스템에서의 데이터 플로우에 집중하고 다중 코어의 효율적인 사용 방법은 스레드 스케줄러가 결정하도록 내버려 둔다.

데이터 플로우에는 문제점의 특정 클래스는 사용할 수 없고(레이스 조건) 특정 클래스는 사용할 수 있지만 결정적인(교착 상태) 우수한 특성이 일부 포함되어 있다. 따라서 코드가 테스트 중에 교착 상태를 생성하지 않으면 프로덕션 시 교착 상태가 발생하지 않음을 보증할 수 있다.

데이터 플로우 변수는 한 번만 바인드될 수 있으므로 사용이 제한된다. 데이터 플로우 스트림은 값의 바인드된 큐 역할을 하므로 데이터는 코드에 의해 정의된 구조를 통해 공급되어 유용한 동일한 특성을 다수 유지할 수 있다. 실제로 데이터 플로우 변수는 한 스레드에서 다른 스레드로 값을 통신하는 탁월한 방법을 제공하고 멀티스레드 유닛 테스트에서 결과를 통신하는 데 일반적으로 사용된다. 또한 GPars는 액터와 같은 스레드 풀에 대해 스케줄되고 데이터 플로우 변수를 통해 통신하는 논리적 데이터 플로우 태스크를 정의한다.

데이터 플로우 스트림

Listing 2에서는 각 백그라운드 스레드가 특정 주제에 대한 트윗 검색 결과를 수신하여 출력하는 것을 확인했다. Listing 11은 단일 DataFlowStream을 대신 작성하는 이 프로그램의 변형이다. 백그라운드 테스트는 DataFlowStream을 사용하여 결과 트윗을 다시 스트림에서 읽는 기본 스레드로 이동시킨다.


목록 11. DataFlowStream을 통해 결과 스트리밍
(langTweetsDataflow.groovy)
import twitter4j.Twitter
import twitter4j.Query
import groovyx.gpars.GParsExecutorsPool
import groovyx.gpars.dataflow.DataFlowStream

@Grab(group='net.homeip.yusuke', module='twitter4j', version='2.0.10')
@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
def recentTweets(api, queryStr, resultStream) {
  query = new Query(queryStr)
  query.rpp = 5		// tweets to return 
  query.lang = "en"		// language
  tweets = api.search(query).tweets
  threadName = Thread.currentThread().name
  tweets.each {
	resultStream << "[${threadName}-${queryStr}] @${it.fromUser}: ${it.text}"
  }
  resultStream << "DONE"
}
 
def api = new Twitter()
def resultStream = new DataFlowStream()
def tags = ['#erlang','#scala','#clojure']
GParsExecutorsPool.withPool {
  tags.each { 
    { query -> recentTweets(api, query, resultStream) }.callAsync(it)
  }
}

int doneMessages = 0
while(doneMessages < tags.size()) {
	msg = resultStream.val
	if(msg == "DONE") {
		doneMessages++
	} else {
		println "${msg}"
	}
}

각 백그라운드 스레드에서 스트림을 통해 결과 리스너에 전송 중인 "DONE" 메시지에 주목한다. 스레드가 결과 전송을 완료했음을 나타내는 이 메시지를 메시지 전달 관련 용어로 "포이즌 메시지"라고 부르는 경우가 있다. 이 메시지는 생산자와 소비자 사이의 신호 역할을 한다.

추가적인 상태 관리 방법

이 기사에서 다루지 않는 두 가지 중요한 동시성 모델은 CSP(Ccommunicating Sequential Processe)와 STM(Software Transactional Memory)이다.

CSP는 동시 프로그램의 동작을 공식화하는 Tony Hoare의 전형적인 작업을 기반으로 하며 프로세스 및 채널의 핵심 개념을 기반으로 한다. 프로세스는 이미 설명한 액터 모델과 일부 측면에서 비슷하며 채널은 데이터 플로우 스트림과 유사성이 많다. CSP 프로세스는 단일 메일함보다 훨씬 풍부한 입력 및 출력 채널 세트를 가진다는 점에서 액터와 다르다. GPars에는 CSP 아이디어의 JCSP 구현을 위한 API가 포함된다.

STM은 최근 몇 년 동안 활발한 연구 대상이었으며 Haskell, Clojure 및 Fortress와 같은 언어에는 이 개념의 약간은 다른 구현이 포함된다. STM을 사용하려면 프로그래머가 소스에서 트랜잭션 경계를 정해야 하며 그러면 시스템이 각각의 트랜잭션을 기본 상태에 적용한다. 충돌이 감지되면 트랜잭션을 재시도할 수 있지만 이는 자동으로 처리된다. GPars에는 아직 STM의 구현이 포함되어 있지 않지만 향후에 포함될 것이다.


결론

동시성은 지금 주목을 받고 있는 분야이며 다양한 언어 및 라이브러리에서 이용하는 접근방식 사이에서 많은 융합이 발생하고 있다. GPars는 오늘날 가장 유명한 동시성 모델 중 일부를 사용하여 Groovy를 통해 Java 플랫폼에서 액세스할 수 있도록 한다.

많은 수의 코어를 가진 컴퓨터는 계속 사용될 것이며 실제로 칩에서 사용할 수 있는 코어 수는 기하급수적으로 증가할 것이다. 이에 따라 동시성도 계속 탐구와 혁신 대상 영역이 될 것이다. 이미 살펴봤듯이 JVM은 가장 오래 지속되는 대안 중 다수의 기초가 된다.



다운로드 하십시오

설명이름크기다운로드 방식
Sample code for this articlej-gpars.zip6KBHTTP

다운로드 방식에 대한 정보


참고자료

교육

제품 및 기술 얻기

토론

  • My developerWorks 커뮤니티에 참여하자. 개발자가 이끌고 있는 블로그, 포럼, 그룹 및 Wiki를 살펴보면서 다른 developerWorks 사용자와 의견을 나눌 수 있다.

필자소개

Alex Miller works at Revelytix, building federated semantic web query products. Prior to Revelytix, Alex was technical lead at Terracotta, an engineer at BEA Systems, and chief architect at MetaMatrix. His interests include Java technology, concurrency, distributed systems, languages, and software design. Alex enjoys tweeting as @puredanger and blogging at Pure Danger Tech. In St. Louis, Alex is the founder of the Lambda Lounge group for the study of functional and dynamic languages and the Strange Loop developer conference.

잘못된 도움말 신고

부정사용 신고

감사합니다. 이 항목은 운영자가 관심을 표시했습니다.


잘못된 도움말 신고

부정사용 신고

제출실패 신고. 나중에 다시 실행해주세요.


디벨로퍼웍스 로그인


IBM ID가 필요하세요?
IBM ID를 잊으셨습니까?


비밀번호를 잊으셨습니까?
비밀번호 변경

developerWorks 이용 약관에 동의하시는 경우 제출을 클릭하십시오. 이용 약관.

 


developerWorks에 처음 로그인하면 developerWorks프로파일이 생성됩니다.귀하의 프로파일에서 동의하신 내용이 공개되지만 이 사항은 언제든지 변경 가능합니다. 귀하의 성명(숨김으로 체크되어 있어도 표시됩니다)과 디스플레이 이름은 게시한 컨텐츠나 사이트 엑세스시 표시됩니다.

화면상에 보여지는 닉네임을 정하세요.

처음 developerWorks에 로그인할 때 프로파일이 작성되므로, 이를 위해 디스플레이 이름을 선택해야 합니다. 선택하신 디스플레이 이름은 developerWorks에 게시한 컨텐츠에 표시됩니다.

3글자 이상 31글자 이하의 길이로 사용 가능합니다. dW커뮤니티 내에서는 보안상 이메일주소를 제외한 다른 이름을 지정하셔야 합니다.

3개의 &이나 대쉬를 포함해주시고 31글자내로 제한해주세요.


developerWorks 이용 약관에 동의하시는 경우 제출을 클릭하십시오. 이용 약관.

 


아티클 순위

의견

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=20
Zone=자바, 오픈 소스
ArticleID=605623
ArticleTitle=GPars를 사용하여 일반적인 동시성 문제점 해결하기
publish-date=09072010
author1-email=alexdmiller@yahoo.com
author1-email-cc=

태그

Help
검색 필드를 사용하여 My developerWorks 내에서 해당 태그가 사용된 모든 종류의 컨텐츠를 검색하십시오.

태그를 더 많이 보거나 적게 보기 위해 슬라이더 막대를 사용하십시오.

인기 태그는 특정 컨텐츠 존(예를 들어, 자바, 리눅스, WebSphere)의 최고 인기 태그를 보여줍니다.

내 태그는 특정 컨텐츠 존(예를 들어, 자바, 리눅스, WebSphere)의 귀하의 태그를 보여줍니다.

검색 필드를 사용하여 My developerWorks 내에서 해당 태그가 사용된 모든 종류의 컨텐츠를 검색하십시오. 인기 태그는 특정 컨텐츠 존(예를 들어, 자바, 리눅스, WebSphere)의 최고 인기 태그를 보여줍니다. 내 태그는 특정 컨텐츠 존(예를 들어, 자바, 리눅스, WebSphere)의 귀하의 태그를 보여줍니다.