内容


彻底转变流,第 2 部分:优化 Java 内部 I/O

替换字节数组流和管道流

Comments

本系列的第一篇文章中,您学习了解决从只能写出数据的源读取数据的问题的一些不同方法。在可能的解决方案中,我们研究了怎样使用字节数组流、管道流以及直接处理该问题的定制框架。定制方法显然是最有效率的解决方案;但是,分析其它几种方法有助于看清标准 Java 流的一些问题。具体地说,字节数组输出流并不提供可提供对它的内容进行只读访问的高效机制,管道流的性能通常很差。

为了处理这些问题,我们将在本文中实现功能同样齐全的替换类,但在实现时更强调性能。让我们先来简要地讨论一下同步问题,因为它与 I/O 流有关。

同步问题

一般来说,我推荐在不是特别需要同步的情况下避免不必要地使用同步。显然,如果多个线程需并发地访问一个类,那么这个类需确保线程安全。但是,在许多情况下并不需要并发的访问,同步成了不必要的开销。例如,对流的并发访问自然是不确定的 ― 您无法预测哪些数据被先写入,也无法预测哪个线程读了哪些数据 ― 也就是说,在多数情况下,对流的并发访问是没用的。所以,对所有的流强制同步是不提供实际好处的花费。如果某个应用程序要求线程安全,那么通过应用程序自己的同步原语可以强制线程安全。

事实上,Collection 类的 API 作出了同样的选择:在缺省的情况下,set、list 等等都不是线程安全的。如果应用程序想使用线程安全的 Collection,那么它可以使用 Collections 类来创建一个线程安全的包装器来包装非线程安全的 Collection。如果这种作法是有用的,那么应用程序可以使用完全相同的机制来包装流,以使它线程安全;例如, OutputStream out = Streams.synchronizedOutputStream (byteStream) 。请参阅附带的 源代码中的 Streams 类,这是一个实现的示例。

所以,对于我所认为的多个并发线程无法使用的类,我没用同步来为这些类提供线程安全。在您广泛采用这种方式前,我推荐您研究一下 Java 语言规范(Java Language Specification)的 Threads and Locks那一章(请参阅 参考资料),以理解潜在的缺陷;具体地说,在未使用同步的情况下无法确保读写的顺序,所以,对不同步的只读方法的并发访问可能导致意外的行为,尽管这种访问看起来是无害的。

更好的字节数组输出流

当您需要把未知容量的数据转储到内存缓冲区时, ByteArrayOutputStream 类是使用效果很好的流。当我为以后再次读取而存储一些数据时,我经常使用这个类。但是,使用 toByteArray() 方法来取得对结果数据的读访问是很低效的,因为它实际返回的是内部字节数组的副本。对于小容量的数据,使用这种方式不会有太大问题;然而,随着容量增大,这种方式的效率被不必要地降低了。这个类必须复制数据,因为它不能强制对结果字节数组进行只读访问。如果它返回它的内部缓冲区,那么在一般的情况下,接收方无法保证该缓冲区未被同一数组的另一个接收方并发地修改。

StringBuffer 类已解决了类似的问题;它提供可写的字符缓冲区,它还支持高效地返回能从内部字符数组直接读取的只读 String 。因为 StringBuffer 类控制着对它的内部数组的写访问,所以它仅在必要时才复制它的数组;也就是说,当它导出了 String 且后来调用程序修改了 StringBuffer 的时候。如果没有发生这样的修改,那么任何不必要的复制都不会被执行。通过支持能够强制适当的访问控制的字节数组的包装器,新的 I/O 框架以类似的方式解决了这个问题。

我们可以使用相同的通用机制为需要使用标准流 API 的应用程序提供高效的数据缓冲和再次读取。我们的示例给出了可替代 ByteArrayOutputStream 类的类,它能高效地导出对内部缓冲区的只读访问,方法是返回直接读取内部字节数组的只读 InputStream

我们来看一下代码。清单 1 中的构造函数分配了初始缓冲区,以存储写到这个流的数据。为了存储更多的数据,该缓冲区将按需自动地扩展。

清单 1. 不同步的字节数组输出流
package org.merlin.io;
import java.io.*;
/**
 * An unsynchronized ByteArrayOutputStream alternative that efficiently
 * provides read-only access to the internal byte array with no 
 * unnecessary copying.
 *
 * @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
 */
public class BytesOutputStream extends OutputStream {
  private static final int DEFAULT_INITIAL_BUFFER_SIZE = 8192;
  
  // internal buffer
  private byte[] buffer;
  private int index, capacity;
  // is the stream closed?
  private boolean closed;
  // is the buffer shared?
  private boolean shared;
  
  public BytesOutputStream () {
    this (DEFAULT_INITIAL_BUFFER_SIZE);
  }
  public BytesOutputStream (int initialBufferSize) {
    capacity = initialBufferSize;
    buffer = new byte[capacity];
  }

清单 2 显示的是写方法。这些方法按需扩展内部缓冲区,然后把新数据复制进来。在扩展内部缓冲区时,我们使缓冲区的大小增加了一倍再加上存储新数据所需的容量;这样,为了存储任何所需的数据,缓冲区的容量成指数地增长。为了提高效率,如果您知道您将写入的数据的预期容量,那么您应该指定相应的初始缓冲区的大小。 close() 方法只是设置了一个合适的标志。

清单 2. 写方法
  public void write (int datum) throws IOException {
    if (closed) {
      throw new IOException ("Stream closed");
    } else {
      if (index >= capacity) {
        // expand the internal buffer
        capacity = capacity * 2 + 1;
        byte[] tmp = new byte[capacity];
        System.arraycopy (buffer, 0, tmp, 0, index);
        buffer = tmp;
        // the new buffer is not shared
        shared = false;
      }
      // store the byte
      buffer[index ++] = (byte) datum;
    }
  }
  public void write (byte[] data, int offset, int length) 
    throws IOException {
    if (data == null) {
      throw new NullPointerException ();
    } else if ((offset < 0) || (offset + length > data.length) 
        || (length < 0)) {
      throw new IndexOutOfBoundsException ();
    } else if (closed) {
      throw new IOException ("Stream closed");
    } else {
      if (index + length > capacity) {
        // expand the internal buffer
        capacity = capacity * 2 + length;
        byte[] tmp = new byte[capacity];
        System.arraycopy (buffer, 0, tmp, 0, index);
        buffer = tmp;
        // the new buffer is not shared
        shared = false;
      }
      // copy in the subarray
      System.arraycopy (data, offset, buffer, index, length);
      index += length;
    }
  }
  public void close () {
    closed = true;
  }

清单 3 中的字节数组抽取方法返回内部字节数组的副本。因为我们无法防止调用程序把数据写到结果数组,所以我们无法安全地返回对内部缓冲区的直接引用。

清单 3. 转换成字节数组
  public byte[] toByteArray () {
    // return a copy of the internal buffer
    byte[] result = new byte[index];
    System.arraycopy (buffer, 0, result, 0, index);
    return result;
  }

当方法提供对存储的数据的只读访问的时候,它们可以安全地高效地直接使用内部字节数组。清单 4 显示了两个这样的方法。 writeTo() 方法把这个流的内容写到输出流;它直接从内部缓冲区进行写操作。 toInputStream() 方法返回了可被高效地读取数据的输入流。它所返回的 BytesInputStream (这是 ByteArrayInputStream 的非同步替代品。)能直接从我们的内部字节数组读取数据。在这个方法中,我们还设置了标志,以表示内部缓冲区正被输入流共享。这一点很重要,因为这样做可以防止在内部缓冲区正被共享时这个流被修改。

清单 4. 只读访问方法
  public void writeTo (OutputStream out) throws IOException {
    // write the internal buffer directly
    out.write (buffer, 0, index);
  }
  public InputStream toInputStream () {
    // return a stream reading from the shared internal buffer
    shared = true;
    return new BytesInputStream (buffer, 0, index);
  }

可能会覆盖共享数据的唯一的一个方法是显示在清单 5 中的 reset() 方法,该方法清空了这个流。所以,如果 shared 等于 true 且 reset() 被调用,那么我们创建新的内部缓冲区,而不是重新设置写索引。

清单 5. 重新设置流
  public void reset () throws IOException {
    if (closed) {
      throw new IOException ("Stream closed");
    } else {
      if (shared) {
        // create a new buffer if it is shared
        buffer = new byte[capacity];
        shared = false;
      }
      // reset index
      index = 0;
    }
  }
}

更好的字节数组输入流

ByteArrayInputStream 类来提供对内存中的二进制数据基于流的读访问是很理想的。但是,有时候,它的两个设计特点使我觉得需要一个替代它的类。第一,这个类是同步的;我已讲过,对于多数应用程序来说没有这个必要。第二,如果在执行 mark() 前调用它所实现的 reset() 方法,那么 reset() 将忽略初始读偏移。这两点都不是缺陷;但是,它们不一定总是人们所期望的。

清单 6 中的 BytesInputStream 类是不同步的较为普通的字节数组输入流类。

清单 6. 不同步的字节数组输入流
package org.merlin.io;
import java.io.*;
/**
 * An unsynchronized ByteArrayInputStream alternative.
 *
 * @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
 */
public class BytesInputStream extends InputStream {
  // buffer from which to read
  private byte[] buffer;
  private int index, limit, mark;
  // is the stream closed?
  private boolean closed;
  
  public BytesInputStream (byte[] data) {
    this (data, 0, data.length);
  }
  public BytesInputStream (byte[] data, int offset, int length) {
    if (data == null) {
      throw new NullPointerException ();
    } else if ((offset < 0) || (offset + length > data.length) 
        || (length < 0)) {
      throw new IndexOutOfBoundsException ();
    } else {
      buffer = data;
      index = offset;
      limit = offset + length;
      mark = offset;
    }
  }
  public int read () throws IOException {
    if (closed) {
      throw new IOException ("Stream closed");
    } else if (index >= limit) {
      return -1; // EOF
    } else {
      return buffer[index ++] & 0xff;
    }
  }
  public int read (byte data[], int offset, int length) 
      throws IOException {
    if (data == null) {
      throw new NullPointerException ();
    } else if ((offset < 0) || (offset + length > data.length) 
        || (length < 0)) {
      throw new IndexOutOfBoundsException ();
    } else if (closed) {
      throw new IOException ("Stream closed");
    } else if (index >= limit) {
      return -1; // EOF
    } else {
      // restrict length to available data
      if (length > limit - index)
        length = limit - index;
      // copy out the subarray
      System.arraycopy (buffer, index, data, offset, length);
      index += length;
      return length;
    }
  }
  public long skip (long amount) throws IOException {
    if (closed) {
      throw new IOException ("Stream closed");
    } else if (amount <= 0) {
      return 0;
    } else {
      // restrict amount to available data
      if (amount > limit - index)
        amount = limit - index;
      index += (int) amount;
      return amount;
    }
  }
  public int available () throws IOException {
    if (closed) {
      throw new IOException ("Stream closed");
    } else {
      return limit - index;
    }
  }
  
  public void close () {
    closed = true;
  }
  public void mark (int readLimit) {
    mark = index;
  }
  
  public void reset () throws IOException {
    if (closed) {
      throw new IOException ("Stream closed");
    } else {
      // reset index
      index = mark;
    }
  }
  
  public boolean markSupported () {
    return true;
  }
}

使用新的字节数组流

清单 7 中的代码演示了怎样使用新的字节数组流来解决第一篇文章中处理的问题(读一些压缩形式的数据):

清单 7. 使用新的字节数组流
public static InputStream newBruteForceCompress (InputStream in) 
    throws IOException {
  BytesOutputStream sink = new BytesOutputStream ();
  OutputStream out = new GZIPOutputStream (sink);
  Streams.io (in, out);
  out.close ();
  return sink.toInputStream ();
}

更好的管道流

虽然标准的管道流既安全又可靠,但在性能方面不能令人满意。几个因素导致了它的性能问题:

  • 对于不同的使用情况,大小为 1024 字节的内部缓冲区并不都适用;对于大容量的数据,该缓冲区太小了。
  • 基于数组的操作只是反复调用低效的一个字节一个字节地复制操作。该操作本身是同步的,从而导致非常严重的锁争用。
  • 如果管道变空或变满而在这种状态改变时一个线程阻塞了,那么,即使仅有一个字节被读或写,该线程也被唤醒。在许多情况下,线程将使用这一个字节并立即再次阻塞,这将导致只做了很少有用的工作。

最后一个因素是 API 提供的严格的约定的后果。对于最通用的可能的应用程序中使用的流来说,这种严格的约定是必要的。但是,对于管道流实现,提供一种更宽松的约定是可能的,这个约定牺牲严格性以换取性能的提高:

  • 仅当缓冲区的可用数据(对阻塞的读程序而言)或可用空间(对写程序而言)达到指定的某个 滞后阈值或发生异常事件(例如管道关闭)时,阻塞的读程序和写程序才被唤醒。这将提高性能,因为仅当线程能完成适度的工作量时它们才被唤醒。
  • 只有一个线程可以从管道读取数据,只有一个线程可以把数据写到管道。否则,管道无法可靠地确定读程序线程或写程序线程何时意外死亡。

这个约定可完全适合典型应用程序情形中独立的读程序线程和写程序线程;需要立即唤醒的应用程序可以使用零滞后级别。我们将在后面看到,这个约定的实现的操作速度比标准 API 流的速度快两个数量级(100 倍)。

我们可以使用几个可能的 API 中的一个来开发这些管道流:我们可以模仿标准类,显式地连接两个流;我们也可以开发一个 Pipe 类并从这个类抽取输出流和输入流。我们不使用这两种方式而是使用更简单的方式:创建一个 PipeInputStream ,然后抽取关联的输出流。

这些流的一般操作如下:

  • 我们把内部数组用作环缓冲区(请看图 1):这个数组中维护着一个读索引和一个写索引;数据被写到写索引所指的位置,数据从读索引所指的位置被读取;当两个索引到达缓冲区末尾时,它们回绕到缓冲区起始点。任一个索引不能超越另一个索引。当写索引到达读索引时,管道是满的,不能再写任何数据。当读索引到达写索引时,管道是空的,不能再读任何数据。
  • 同步被用来确保两个协作线程看到管道状态的最新值。Java 语言规范对内存访问的顺序的规定是很宽容的,因此,无法使用无锁缓冲技术。
图 1. 环缓冲区
环缓冲区
环缓冲区

在下面的代码清单中给出的是实现这些管道流的代码。清单 8 显示了这个类所用的构造函数和变量。您可以从这个 InputStream 中抽取相应的 OutputStream (请看清单 17 中的代码)。在构造函数中您可以指定内部缓冲区的大小和滞后级别;这是缓冲区容量的一部分,在相应的读程序线程或写程序线程被立即唤醒前必须被使用或可用。我们维护两个变量, readerwriter ,它们与读程序线程和写程序线程相对应。我们用它们来发现什么时候一个线程已死亡而另一个线程仍在访问流。

清单 8. 一个替代的管道流实现
package org.merlin.io;
import java.io.*;
/**
 * An efficient connected stream pair for communicating between
 * the threads of an application. This provides a less-strict contract
 * than the standard piped streams, resulting in much-improved
 * performance. Also supports non-blocking operation.
 *
 * @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
 */
public class PipeInputStream extends InputStream {
  // default values
  private static final int DEFAULT_BUFFER_SIZE = 8192;
  private static final float DEFAULT_HYSTERESIS = 0.75f;
  private static final int DEFAULT_TIMEOUT_MS = 1000;
  
  // flag indicates whether method applies to reader or writer
  private static final boolean READER = false, WRITER = true;
  // internal pipe buffer
  private byte[] buffer;
  // read/write index
  private int readx, writex;
  // pipe capacity, hysteresis level
  private int capacity, level;
  // flags
  private boolean eof, closed, sleeping, nonBlocking;
  // reader/writer thread
  private Thread reader, writer;
  // pending exception
  private IOException exception;
  // deadlock-breaking timeout
  private int timeout = DEFAULT_TIMEOUT_MS;
  
  public PipeInputStream () {
    this (DEFAULT_BUFFER_SIZE, DEFAULT_HYSTERESIS);
  }
  public PipeInputStream (int bufferSize) {
    this (bufferSize, DEFAULT_HYSTERESIS);
  }
  // e.g., hysteresis .75 means sleeping reader/writer is not
  // immediately woken until the buffer is 75% full/empty
  public PipeInputStream (int bufferSize, float hysteresis) {
    if ((hysteresis < 0.0) || (hysteresis > 1.0))
      throw new IllegalArgumentException ("Hysteresis: " + hysteresis);
    capacity = bufferSize;
    buffer = new byte[capacity];
    level = (int) (bufferSize * hysteresis);
  }

清单 9 中的配置方法允许您配置流的超时值和非阻塞模式。超时值的单位是毫秒,它表示阻塞的线程在过了这段时间后将被自动唤醒;这对于打破在一个线程死亡的情况下可能发生的死锁是必要的。在非阻塞模式中,如果线程阻塞,那么 InterruptedIOException 将被抛出。

清单 9. 管道配置
  public void setTimeout (int ms) {
    this.timeout = ms;
  }
  public void setNonBlocking (boolean nonBlocking) {
    this.nonBlocking = nonBlocking;
  }

清单 10 中的读方法都遵循相当标准的模式:如果我们还没有读线程的引用,那么我们先取得它,然后我们验证输入参数,核对流未被关闭或没有异常待处理,确定可以读取多少数据,最后把数据从内部的环缓冲区复制到读程序的缓冲区。清单 12 中的 checkedAvailable() 方法在返回前自动地等待,直到出现一些可用的数据或流被关闭。

清单 10. 读数据
  private byte[] one = new byte[1];
  
  public int read () throws IOException {
    // read 1 byte
    int amount = read (one, 0, 1);
    // return EOF / the byte
    return (amount < 0) ? -1 : one[0] & 0xff;
  }
  
  public synchronized int read (byte data[], int offset, int length) 
      throws IOException {
    // take a reference to the reader thread
    if (reader == null)
      reader = Thread.currentThread ();
    // check parameters
    if (data == null) {
      throw new NullPointerException ();
    } else if ((offset < 0) || (offset + length > data.length) 
        || (length < 0)) { // check indices
      throw new IndexOutOfBoundsException ();
    } else {
      // throw an exception if the stream is closed
      closedCheck ();
      // throw any pending exception
      exceptionCheck ();
      if (length <= 0) {
        return 0;
      } else {
        // wait for some data to become available for reading
        int available = checkedAvailable (READER);
        // return -1 on EOF
        if (available < 0)
          return -1;
        // calculate amount of contiguous data in pipe buffer
        int contiguous = capacity - (readx % capacity);
        // calculate how much we will read this time
        int amount = (length > available) ? available : length;
        if (amount > contiguous) {
          // two array copies needed if data wrap around the buffer end
          System.arraycopy (buffer, readx % capacity, data, offset, 
            contiguous);
          System.arraycopy (buffer, 0, data, offset + contiguous, 
            amount - contiguous);
        } else {
          // otherwise, one array copy needed
          System.arraycopy (buffer, readx % capacity, data, offset, 
            amount);
        }
        // update indices with amount of data read
        processed (READER, amount);
        // return amount read
        return amount;
      }
    }
  }
  public synchronized long skip (long amount) throws IOException {
    // take a reference to the reader thread
    if (reader == null)
      reader = Thread.currentThread ();
    // throw an exception if the stream is closed
    closedCheck ();
    // throw any pending exception
    exceptionCheck ();
    if (amount <= 0) {
      return 0;
    } else {
      // wait for some data to become available for skipping
      int available = checkedAvailable (READER);
      // return 0 on EOF
      if (available < 0)
        return 0;
      // calculate how much we will skip this time
      if (amount > available)
        amount = available;
      // update indices with amount of data skipped
      processed (READER, (int) amount);
      // return amount skipped
      return amount;
    }
  }

当数据从这个管道被读取或数据被写到这个管道时,清单 11 中的方法被调用。该方法更新有关的索引,如果管道达到它的滞后级别,该方法自动地唤醒阻塞的线程。

清单 11. 更新索引
  private void processed (boolean rw, int amount) {
    if (rw == READER) {
      // update read index with amount read
      readx = (readx + amount) % (capacity * 2);
    } else {
      // update write index with amount written
      writex = (writex + amount) % (capacity * 2);
    }
    // check whether a thread is sleeping and we have reached the
    // hysteresis threshold
    if (sleeping && (available (!rw) >= level)) {
      // wake sleeping thread
      notify ();
      sleeping = false;
    }
  }

在管道有可用空间或可用数据(取决于 rw 参数)前,清单 12 中的 checkedAvailable() 方法一直等待,然后把空间的大小或数据的多少返回给调用程序。在这个方法内还核对流未被关闭、管道未被破坏等。

清单 12. 检查可用性
  public synchronized int available () throws IOException {
    // throw an exception if the stream is closed
    closedCheck ();
    // throw any pending exception
    exceptionCheck ();
    // determine how much can be read
    int amount = available (READER);
    // return 0 on EOF, otherwise the amount readable
    return (amount < 0) ? 0 : amount;
  }
  
  private int checkedAvailable (boolean rw) throws IOException {
    // always called from synchronized(this) method
    try {
      int available;
      // loop while no data can be read/written
      while ((available = available (rw)) == 0) {
        if (rw == READER) { // reader
          // throw any pending exception
          exceptionCheck ();
        } else { // writer
          // throw an exception if the stream is closed
          closedCheck ();
        }
        // throw an exception if the pipe is broken
        brokenCheck (rw);
        if (!nonBlocking) { // blocking mode
          // wake any sleeping thread
          if (sleeping)
            notify ();
          // sleep for timeout ms (in case of peer thread death)
          sleeping = true;
          wait (timeout);
          // timeout means that hysteresis may not be obeyed
        } else { // non-blocking mode
          // throw an InterruptedIOException
          throw new InterruptedIOException 
            ("Pipe " + (rw ? "full" : "empty"));
        }
      }
      return available;
    } catch (InterruptedException ex) {
      // rethrow InterruptedException as InterruptedIOException
      throw new InterruptedIOException (ex.getMessage ());
    }
  }
  private int available (boolean rw) {
    // calculate amount of space used in pipe
    int used = (writex + capacity * 2 - readx) % (capacity * 2);
    if (rw == WRITER) { // writer
      // return amount of space available for writing 
      return capacity - used;
    } else { // reader
      // return amount of data in pipe or -1 at EOF
      return (eof && (used == 0)) ? -1 : used;
    }
  }

清单 13 中的方法关闭这个流;该方法还提供对读程序或写程序关闭流的支持。阻塞的线程被自动唤醒,该方法还检查各种其它情况是否正常。

清单 13. 关闭流
  public void close () throws IOException {
    // close the read end of this pipe
    close (READER);
  }
  private synchronized void close (boolean rw) throws IOException {
    if (rw == READER) { // reader
      // set closed flag
      closed = true;
    } else if (!eof) { // writer
      // set eof flag
      eof = true;
      // check if data remain unread
      if (available (READER) > 0) {
        // throw an exception if the reader has already closed the pipe
        closedCheck ();
        // throw an exception if the reader thread has died
        brokenCheck (WRITER);
      }
    }
    // wake any sleeping thread
    if (sleeping) {
      notify ();
      sleeping = false;
    }
  }

清单 14 中的方法检查这个流的状态。如果有异常待处理,那么流被关闭或管道被破坏(也就是说,读程序线程或写程序线程已死亡),异常被抛出。

清单 14. 检查流状态
  private void exceptionCheck () throws IOException {
    // throw any pending exception
    if (exception != null) {
      IOException ex = exception;
      exception = null;
      throw ex; // could wrap ex in a local exception
    }
  }
  private void closedCheck () throws IOException {
    // throw an exception if the pipe is closed
    if (closed)
      throw new IOException ("Stream closed");
  }
  private void brokenCheck (boolean rw) throws IOException {
    // get a reference to the peer thread
    Thread thread = (rw == WRITER) ? reader : writer;
    // throw an exception if  the peer thread has died
    if ((thread != null) && !thread.isAlive ())
      throw new IOException ("Broken pipe");
  }

当数据被写入这个管道时,清单 15 中的方法被调用。总的来说,它类似于读方法:我们先取得写程序线程的副本,然后检查流是否被关闭,接着进入把数据复制到管道的循环。和前面一样,该方法使用 checkedAvailable() 方法,checkedAvailable() 自动阻塞,直到管道中有可用的容量。

清单 15. 写数据
private synchronized void writeImpl (byte[] data, int offset, int length) 
      throws IOException {
    // take a reference to the writer thread
    if (writer == null)
      writer = Thread.currentThread ();
    // throw an exception if the stream is closed
    if (eof || closed) {
      throw new IOException ("Stream closed");
    } else {
      int written = 0;
      try {
        // loop to write all the data
        do {
          // wait for space to become available for writing
          int available = checkedAvailable (WRITER);
          // calculate amount of contiguous space in pipe buffer
          int contiguous = capacity - (writex % capacity);
          // calculate how much we will write this time
          int amount = (length > available) ? available : length;
          if (amount > contiguous) {
          // two array copies needed if space wraps around the buffer end
            System.arraycopy (data, offset, buffer, writex % capacity, 
              contiguous);
            System.arraycopy (data, offset + contiguous, buffer, 0, 
              amount - contiguous);
          } else {
            // otherwise, one array copy needed
            System.arraycopy (data, offset, buffer, writex % capacity, 
              amount);
          }
          // update indices with amount of data written
          processed (WRITER, amount);
          // update amount written by this method
          written += amount;
        } while (written < length);
        // data successfully written
      } catch (InterruptedIOException ex) {
        // write operation was interrupted; set the bytesTransferred
        // exception field to reflect the amount of data written
        ex.bytesTransferred = written;
        // rethrow exception
        throw ex;
      }
    }
  }

如清单 16 所示,这个管道流实现的特点之一是写程序可设置一个被传递给读程序的异常。

清单 16. 设置异常
  private synchronized void setException (IOException ex) 
      throws IOException {
    // fail if an exception is already pending
    if (exception != null)
      throw new IOException ("Exception already set: " + exception);
    // throw an exception if the pipe is broken
    brokenCheck (WRITER);
    // take a reference to the pending exception
    this.exception = ex;
    // wake any sleeping thread
    if (sleeping) {
      notify ();
      sleeping = false;
    }
  }

清单 17 给出这个管道的有关输出流的代码。 getOutputStream() 方法返回 OutputStreamImpl ,OutputStreamImpl 是使用前面给出的方法来把数据写到内部管道的输出流。OutputStreamImpl 类继承了 OutputStreamEx ,OutputStreamEx 是允许为读线程设置异常的输出流类的扩展。

清单 17. 输出流
  public OutputStreamEx getOutputStream () {
    // return an OutputStreamImpl associated with this pipe
    return new OutputStreamImpl ();
  }
  
  private class OutputStreamImpl extends OutputStreamEx {
    private byte[] one = new byte[1];
    
    public void write (int datum) throws IOException {
      // write one byte using internal array
      one[0] = (byte) datum;
      write (one, 0, 1);
    }
    public void write (byte[] data, int offset, int length) 
        throws IOException {
      // check parameters
      if (data == null) {
        throw new NullPointerException ();
      } else if ((offset < 0) || (offset + length > data.length) 
          || (length < 0)) {
        throw new IndexOutOfBoundsException ();
      } else if (length > 0) {
        // call through to writeImpl()
        PipeInputStream.this.writeImpl (data, offset, length);
      }
    }
    public void close () throws IOException {
      // close the write end of this pipe
      PipeInputStream.this.close (WRITER);
    }
    public void setException (IOException ex) throws IOException {
      // set a pending exception
      PipeInputStream.this.setException (ex);
    }
  }
  // static OutputStream extension with setException() method
  public static abstract class OutputStreamEx extends OutputStream {
    public abstract void setException (IOException ex) throws IOException;
  }
}

使用新的管道流

清单 18 演示了怎样使用新的管道流来解决上一篇文章中的问题。请注意,写程序线程中出现的任何异常均可在流中被传递。

清单 18. 使用新的管道流
public static InputStream newPipedCompress (final InputStream in) 
    throws IOException {
  PipeInputStream source = new PipeInputStream ();
  final PipeInputStream.OutputStreamEx sink = source.getOutputStream ();
  new Thread () {
    public void run () {
      try {
        GZIPOutputStream gzip = new GZIPOutputStream (sink);
        Streams.io (in, gzip);
        gzip.close ();
      } catch (IOException ex) {
        try {
          sink.setException (ex);
        } catch (IOException ignored) {
        }
      }
    }
  }.start ();
  return source;
}

性能结果

在下面的表中显示的是这些新的流和标准流的性能,测试环境是运行 Java 2 SDK,v1.4.0 的 800MHz Linux 机器。性能测试程序与我在上一篇文章中用的相同:

管道流
15KB:21ms;15MB:20675ms
新的管道流
15KB:0.68ms;15MB:158ms
字节数组流
15KB:0.31ms;15MB:745ms
新的字节数组流
15KB:0.26ms;15MB:438ms

与上一篇文章中的性能差异只反映了我的机器中不断变化的环境负载。您可以从这些结果中看到,在大容量数据方面,新的管道流的性能远好于蛮力解决方案;但是,新的管道流的速度仍然只有我们分析的工程解决方案的速度的一半左右。显然,在现代的 Java 虚拟机中使用多个线程的开销远比以前小得多。

结束语

我们分析了两组可替代标准 Java API 的流的流: BytesOutputStreamBytesInputStream 是字节数组流的非同步替代者。因为这些类的预期的用例涉及单个线程的访问,所以不采用同步是合理的选择。实际上,执行时间的缩短(最多可缩短 40%)很可能与同步的消灭没有多大关系;性能得到提高的主要原因是在提供只读访问时避免了不必要的复制。第二个示例 PipeInputStream 可替代管道流;为了减少超过 99% 的执行时间,这个流使用宽松的约定、改进的缓冲区大小和基于数组的操作。在这种情况下无法使用不同步的代码;Java 语言规范排除了可靠地执行这种代码的可能性,否则,在理论上是可以实现最少锁定的管道。

字节数组流和管道流是基于流的应用程序内部通信的主要选择。虽然新的 I/O API 提供了一些其它选择,但是许多应用程序和 API 仍然依赖标准流,而且对于这些特殊用途来说,新的 I/O API 并不一定有更高的效率。通过适当地减少同步的使用、有效地采用基于数组的操作以及最大程度地减少不必要的复制,性能结果得到了很大的提高,从而提供了完全适应标准流框架的更高效的操作。在应用程序开发的其它领域中采用相同的步骤往往能取得类似地性能提升。


相关主题

  • 您可以参阅本文在 developerWorks 全球站点上的 英文原文.
  • 请单击文章顶部或底部的 讨论参与关于本文的 讨论论坛
  • 本系列的第一部分描述了一个框架,这个框架使应用程序能高效地从仅支持把数据写到输出流的源读取数据。
  • 请下载在本文中讨论过的 源代码。在接受 GNU General Public License的条款的情况下,这些代码可被免费地授权使用。
  • 请阅读 J2SE 1.4 指南,了解新的 I/O API。
  • 在 Merlin 系统上, IBM Developer Kit, Java 2 Technology Edition, V1.3运行管道流示例的速度明显快于 Sun J2SE 1.4.0 运行该示例的速度。
  • Java 语言规范中的 Threads and Locks章提供了有关这个难懂的主题的极好的信息。
  • JSR 133正计划进一步阐明 Java 平台的这一方面。
  • Brian Goetz 的 developerWorks系列文章 Threading lightly深刻地分析了并发编程所产生的实际问题,提供更多高明的见解:
    • 第 1 部分“ Synchronization is not the enemy”(2001 年 7 月)讲述了为什么非争用同步对性能的影响并不象许多人所认为的那样大。
    • 第 2 部分“ Reducing contention”(2001 年 9 月)讲述了减小争用同步对程序性能的影响的技术。
    • 第 3 部分“ Sometimes it's best not to share”(2001 年 10 月)分析了 ThreadLocal 并提供了利用它的技巧。
  • developerWorks Java 技术专区,您能找到有关 Java 编程的每个方面的许多文章。

评论

添加或订阅评论,请先登录注册

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Java technology
ArticleID=53424
ArticleTitle=彻底转变流,第 2 部分:优化 Java 内部 I/O
publish-date=12202002