
Thread Pool - Blocking

Thread Archived This thread has been archived - replies are not allowed.



     

 
 

Replies: 5 - Pages: 1 - Last Post: Jan 31, 2006 5:14 PM

Posts: 3
Registered: Jan 30, 2006 06:29:18 PM
Thread Pool - Blocking
Posted: Jan 30, 2006 06:56:08 PM
I have a large text file in which each line requires intensive processing. The design is to have a class that reads the file and delegates the processing of each line to a thread via a thread pool. The file reader should block before reading the next line whenever there is no free thread in the pool to do the processing.
[code]// read each line {
//     create a Runnable to process the line
//     pass the Runnable to the thread pool
//     block if there is no free thread in the pool
// }[/code]
Executors.newFixedThreadPool() does not seem to be a good solution, because the thread pool it creates operates on a shared unbounded queue. This could cause an OutOfMemoryError, because we keep adding Runnables to the unbounded queue, which is almost like loading the whole file into memory.
[code]ExecutorService tpes = Executors.newFixedThreadPool(3);
int c = 0;
while ((line = in.readLine()) != null) {
    c++;
    System.out.println("Manager: line #" + c);
    tpes.execute(new WorkerThread(line)); // WorkerThread is a Runnable
}[/code]
Using ThreadPoolExecutor directly does not solve the problem either: its execute() method does not block the caller when the pool and the queue are full; by default it just throws a RejectedExecutionException. [b]Do I need to write a custom RejectedExecutionHandler that blocks the reader and resubmits the same Runnable? And how would the caller know when a thread becomes available in the pool?[/b]
[code]// 2 core threads, at most 3 threads, bounded queue of 5 pending tasks
ThreadPoolExecutor tpExe = new ThreadPoolExecutor(
        2, 3, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5));
int c = 0;
while ((line = in.readLine()) != null) {
    c++;
    System.out.println("Manager: line #" + c);
    try {
        tpExe.execute(new WorkerThread(line));
    } catch (RejectedExecutionException e) {
        // pool and queue are both full: the line is dropped, not retried
        System.out.println("Manager: RejectedExecutionException line #" + c);
    }
}[/code]

Posts: 466
Registered: Dec 12, 2002 04:37:06 PM
Re: Thread Pool - Blocking
Posted: Jan 30, 2006 07:07:32 PM
One easy way to do this is to use a bounded work queue with the caller-runs saturation policy (ThreadPoolExecutor.CallerRunsPolicy, set with setRejectedExecutionHandler). When the queue is full, this forces the submitting thread to run the Runnable itself, which has the effect of slowing it down and giving the pool threads time to catch up.

To calculate a sensible queue size, let's imagine that processing each line takes 1s and there are N threads. You then want the queue to hold at least 2N tasks, so that while the submitting thread is stalled for long enough to process a line, the other pool threads are not deprived of work.
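
The caller-runs setup described above might look roughly like the following. This is a minimal sketch, not the poster's code: the class name CallerRunsDemo, the processLine helper, and the use of an in-memory list in place of the file are assumptions made for illustration. The queue capacity follows the 2N rule of thumb.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CallerRunsDemo {
    // Hypothetical stand-in for the expensive per-line work.
    static void processLine(String line, AtomicInteger done) {
        done.incrementAndGet();
    }

    // Submits every line to a fixed-size pool with a bounded queue and
    // returns the number of lines that were processed.
    static int processAll(Iterable<String> lines, int nThreads) {
        AtomicInteger done = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(2 * nThreads),   // bounded: 2N pending tasks
                new ThreadPoolExecutor.CallerRunsPolicy());       // saturated: submitter runs the task
        try {
            for (String line : lines) {
                // Never throws RejectedExecutionException while the pool is running:
                // if the queue is full, the task runs right here in the calling thread.
                pool.execute(() -> processLine(line, done));
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return done.get();
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            lines.add("line " + i);
        }
        System.out.println("processed " + processAll(lines, 3));
    }
}
```

With this policy the reader never blocks in the strict sense; it is throttled because it occasionally has to process a line itself.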

An alternative approach would be to use a Semaphore with N permits: require the submitter to acquire a permit before reading a line, and have the pool threads release a permit every time they finish processing a line.
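
The Semaphore approach could be sketched as follows. The class name SemaphoreThrottleDemo, the processLine helper, and the StringReader standing in for the large file are assumptions for illustration only. The reader acquires a permit before each readLine(), and each task releases its permit in a finally block so that a failing task cannot leak one.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreThrottleDemo {
    // Hypothetical stand-in for the expensive per-line work.
    static void processLine(String line, AtomicInteger done) {
        done.incrementAndGet();
    }

    // Reads lines from 'in', never holding more than nThreads lines in flight,
    // and returns the number of lines processed.
    static int processAll(BufferedReader in, int nThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        Semaphore permits = new Semaphore(nThreads);
        AtomicInteger done = new AtomicInteger();
        try {
            while (true) {
                permits.acquire();              // blocks while N lines are still being processed
                String line = in.readLine();
                if (line == null) {
                    permits.release();          // end of input: give the unused permit back
                    break;
                }
                pool.execute(() -> {
                    try {
                        processLine(line, done);
                    } finally {
                        permits.release();      // lets the reader fetch the next line
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return done.get();
    }

    public static void main(String[] args) {
        BufferedReader in = new BufferedReader(new StringReader("a\nb\nc\nd\ne\nf\n"));
        System.out.println("processed " + processAll(in, 3));
    }
}
```

Because the permit count caps the number of submitted-but-unfinished tasks, the unbounded queue behind newFixedThreadPool() stays short here. Using somewhat more than N permits would be one way to keep the workers from idling while the reader fetches the next line.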

Posts: 3
Registered: Jan 30, 2006 06:29:18 PM
Re: Thread Pool - Blocking
Posted: Jan 31, 2006 02:01:57 PM
Using a Semaphore seems like a good idea. But the permits in a Semaphore cannot be increased; they can only be reduced (i.e. [i]reducePermits[/i]).

Posts: 466
Registered: Dec 12, 2002 04:37:06 PM
Re: Thread Pool - Blocking
Posted: Jan 31, 2006 02:20:37 PM
Permits are effectively created by calling release(). A counting semaphore starts with the permit count specified in the constructor. Each successful acquire() operation decrements the counter; each release() increments it. An acquire() cannot proceed if the counter is at zero, in which case it blocks until a permit becomes available. So this should do what you want.
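
The counter behaviour can be observed with availablePermits(). The sketch below uses tryAcquire(), which returns false instead of blocking, to show the "counter at zero" case without hanging the thread; the class name SemaphoreCounterDemo and the trace method are made up for this illustration.

```java
import java.util.concurrent.Semaphore;

class SemaphoreCounterDemo {
    // Returns {initial count, count after two acquires,
    //          1 if a third acquire succeeded else 0, count after one release}.
    static int[] trace() {
        Semaphore s = new Semaphore(2);
        int initial = s.availablePermits();

        s.acquireUninterruptibly();             // counter: 2 -> 1
        s.acquireUninterruptibly();             // counter: 1 -> 0
        int afterAcquires = s.availablePermits();

        // A plain acquire() would block here; tryAcquire() just reports failure.
        boolean gotThird = s.tryAcquire();

        s.release();                            // counter: 0 -> 1
        int afterRelease = s.availablePermits();

        return new int[] { initial, afterAcquires, gotThird ? 1 : 0, afterRelease };
    }

    public static void main(String[] args) {
        for (int value : trace()) {
            System.out.println(value);
        }
    }
}
```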

Posts: 3
Registered: Jan 30, 2006 06:29:18 PM
Re: Thread Pool - Blocking
Posted: Jan 31, 2006 03:08:44 PM
Pardon my fuzzy description.

What I really meant was that it is not possible to increase, at runtime, the initial permit count (i.e. the maximum number of permits) that was passed into the constructor.
[code]Semaphore s = new Semaphore(10); // this max. number of permits (10) cannot be resized at runtime[/code]

Posts: 466
Registered: Dec 12, 2002 04:37:06 PM
Re: Thread Pool - Blocking
Posted: Jan 31, 2006 05:14:24 PM
Also not true. Releasing creates a permit; you can say

Semaphore s = new Semaphore(10);
s.release();

and now it goes up to 11.

Semaphores are not like locks; a semaphore doesn't keep track of who has the permits or even how many there were.
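
A quick way to confirm this is to release without a matching acquire and then read availablePermits(). The class and method names below are invented for the illustration; release(int) is the bulk variant of release().

```java
import java.util.concurrent.Semaphore;

class SemaphoreGrowDemo {
    // Releases 'extra' permits on a fresh semaphore that nobody has acquired from
    // and returns the resulting permit count.
    static int permitsAfterRelease(int initial, int extra) {
        Semaphore s = new Semaphore(initial);
        s.release(extra);                 // no prior acquire is required
        return s.availablePermits();      // initial + extra
    }

    public static void main(String[] args) {
        System.out.println(permitsAfterRelease(10, 1));
    }
}
```

So the constructor argument is only a starting value, and a pool that grows at runtime can be matched by releasing additional permits.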