How to convert an OutputStream to an InputStream?

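I am at a stage of development where I have two modules: one produces its output as an OutputStream, and the second one accepts only an InputStream. Do you know how to convert an OutputStream to an InputStream (not the other way around, I really do mean this direction) so that I can connect these two parts?

Thanks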

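An OutputStream is one you write data to. If some module exposes an OutputStream, the expectation is that something is reading at the other end.

Something that exposes an InputStream, on the other hand, is indicating that you will need to listen to this stream, and that there will be data you can read.

So it is possible to connect an InputStream to an OutputStream: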

InputStream----read---> intermediateBytes[n] ----write----> OutputStream

As someone mentioned, this is what the copy() method from IOUtils lets you do. It does not make sense to go the other way. Hopefully this makes some sense.
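For anyone who wants to see the read-then-write loop from the diagram spelled out, here is a minimal sketch. The class and method names (StreamCopy, copy) are my own and not part of any library; on Java 9 and later, InputStream.transferTo does the same job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

final class StreamCopy {
    // Hypothetical helper: pumps every byte from in to out and returns the byte count.
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] intermediateBytes = new byte[8192];
        long total = 0;
        int n;
        // read() returns -1 once the input is exhausted
        while ((n = in.read(intermediateBytes)) != -1) {
            out.write(intermediateBytes, 0, n);
            total += n;
        }
        out.flush();
        return total;
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("hello".getBytes(StandardCharsets.UTF_8));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long copied = copy(in, out);
        System.out.println(copied + " bytes copied");
    }
}
```

The buffer size of 8192 is an arbitrary choice; any positive size works.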

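UPDATE:

Of course, the more I think about this, the more I can see how this actually would be a requirement. I know some of the comments mentioned Piped input/output streams, but there is another possibility.

If the output stream that is exposed is a ByteArrayOutputStream, then you can always get the full contents by calling the toByteArray() method. Then you can create an input stream wrapper by using the ByteArrayInputStream subclass. These two are pseudo-streams; they both basically just wrap an array of bytes. Using the streams this way is therefore technically possible, but to me it is still very strange...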

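You will need an intermediate class that buffers between the two. Each time InputStream.read(byte[]...) is called, the buffering class fills the passed-in byte array with the next chunk passed in from OutputStream.write(byte[]...). Since the chunk sizes may not be the same, the adapter class needs to store a certain amount of data until it has enough to fill the read buffer, and/or be able to store any buffer overflow.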
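A rough sketch of such an intermediate class, under my own assumptions: the name ChunkBridge and its two fields are made up for illustration, the queue is unbounded (so a fast writer can use a lot of memory), and each read returns at most one stored chunk rather than trying to fill the whole read buffer.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class ChunkBridge {
    private static final byte[] EOF = new byte[0]; // sentinel, compared by identity
    private final BlockingQueue<byte[]> chunks = new LinkedBlockingQueue<>();

    // Writer side: every write() stores a private copy of the chunk.
    final OutputStream sink = new OutputStream() {
        @Override public void write(int b) throws IOException {
            write(new byte[] {(byte) b}, 0, 1);
        }
        @Override public void write(byte[] b, int off, int len) throws IOException {
            if (len == 0) return;
            byte[] chunk = new byte[len];
            System.arraycopy(b, off, chunk, 0, len);
            chunks.add(chunk);
        }
        @Override public void close() {
            chunks.add(EOF); // tells the reader that no more data will arrive
        }
    };

    // Reader side: hands out the stored chunks, keeping any leftover for the next read.
    final InputStream source = new InputStream() {
        private byte[] current = new byte[0];
        private int pos = 0;
        private boolean finished = false;

        @Override public int read() throws IOException {
            byte[] one = new byte[1];
            return read(one, 0, 1) == -1 ? -1 : one[0] & 0xFF;
        }
        @Override public int read(byte[] b, int off, int len) throws IOException {
            if (len == 0) return 0;
            while (!finished && pos == current.length) {
                try {
                    current = chunks.take(); // blocks until the writer supplies a chunk
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException("interrupted while waiting for data");
                }
                pos = 0;
                if (current == EOF) finished = true;
            }
            if (finished) return -1;
            int n = Math.min(len, current.length - pos);
            System.arraycopy(current, pos, b, off, n);
            pos += n;
            return n;
        }
    };
}
```

Pass sink to the module that writes and source to the module that reads. If the writer finishes and closes sink before the reader starts, a single thread is enough; otherwise the two sides need their own threads.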

This article has a nice breakdown of a few different approaches to this problem:

http://blog.ostermiller.org/convert-java-outputstream-inputstream

The easystream open source library has direct support for converting an OutputStream into an InputStream:

// create conversion
final OutputStreamToInputStream<Void> out = new OutputStreamToInputStream<Void>() {
    @Override
    protected Void doRead(final InputStream in) throws Exception {
        LibraryClass2.processDataFromInputStream(in);
        return null;
    }
};
try {
    LibraryClass1.writeDataToTheOutputStream(out);
} finally {
    // don't miss the close (or a thread would not terminate correctly).
    out.close();
}

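They also list other options: http://io-tools.sourceforge.net/easystream/outputstream_to_inputstream/implementations.html

  • Write the data into a memory buffer (ByteArrayOutputStream), get the byte array and read it again with a ByteArrayInputStream. This is the best approach if you are sure your data fits into memory.
  • Copy your data to a temporary file and read it back.
  • Use pipes: this is the best approach both for memory usage and speed (you can take full advantage of multi-core processors), and also the standard solution offered by Sun.
  • Use InputStreamFromOutputStream and OutputStreamToInputStream from the easystream library.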
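The temporary-file option is the only one in this list that nobody has shown code for, so here is a minimal sketch of it. TempFileBridge and its Producer interface are names I made up to stand in for "the module that writes to an OutputStream"; the DELETE_ON_CLOSE clean-up is best-effort and depends on the file system provider.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TempFileBridge {
    // Hypothetical stand-in for the module that only knows how to write.
    interface Producer {
        void writeTo(OutputStream out) throws IOException;
    }

    static InputStream toInputStream(Producer producer) throws IOException {
        Path tmp = Files.createTempFile("stream-bridge", ".tmp");
        // Step 1: let the producer write everything to disk.
        try (OutputStream out = Files.newOutputStream(tmp)) {
            producer.writeTo(out);
        } catch (IOException | RuntimeException e) {
            Files.deleteIfExists(tmp); // do not leave a half-written file behind
            throw e;
        }
        // Step 2: read it back; the file is removed when the returned stream is closed.
        return Files.newInputStream(tmp, StandardOpenOption.DELETE_ON_CLOSE);
    }
}
```

This needs only one thread and almost no heap, at the cost of disk I/O and of waiting until the writer has completely finished.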

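If you want to make an InputStream from an OutputStream, there is one basic problem. A method writing to an OutputStream blocks until it is done, so the result is only available once the writing method has finished. This has two consequences:

  1. If you use only one thread, you need to wait until everything is written (so you need to store the stream's data in memory or on disk).
  2. If you want to access the data before it is finished, you need a second thread.

Variant 1 can be implemented using byte arrays or files. Variant 2 can be implemented using pipes (either directly or with an extra abstraction, e.g. RingBuffer or the google lib from the other comment).

Indeed, with standard java there is no other way to solve the problem. Each solution is an implementation of one of these.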

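There is one concept called "continuation" (see Wikipedia for details). In this case it basically means:

  • there is a special output stream that expects a certain amount of data
  • once that amount is reached, the stream hands control to its counterpart, which is a special input stream
  • the input stream makes that amount of data available until it has been read; after that, it passes control back to the output stream

While some languages have this concept built in, for java you need some "magic". For example, "commons-javaflow" from apache implements it for java. The disadvantage is that this requires some special bytecode modifications at build time. So it would make sense to put all of this in an extra library with custom build scripts.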

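Old post, but it might help others. Use this approach:

ByteArrayOutputStream out = new ByteArrayOutputStream();
...
out.write();
...
// toByteArray() keeps the raw bytes; toString().getBytes() would push them through a charset and can corrupt binary data
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray()));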

As input and output streams are just start and end points, the solution is to temporarily store the data in a byte array. So you must create an intermediate ByteArrayOutputStream, from which you create a byte[] that is used as input for a new ByteArrayInputStream.

public void doTwoThingsWithStream(InputStream inStream, OutputStream outStream){
    // create temporary byte array output stream
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    doFirstThing(inStream, baos);
    // create input stream from baos
    InputStream isFromFirstData = new ByteArrayInputStream(baos.toByteArray());
    doSecondThing(isFromFirstData, outStream);
}

Hope it helps.

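There seem to be many links and other such stuff, but no actual code using pipes. The advantage of using java.io.PipedInputStream and java.io.PipedOutputStream is that there is no additional consumption of memory beyond the pipe's own small, fixed-size buffer. ByteArrayOutputStream.toByteArray() returns a copy of the original buffer, so whatever you have in memory, you now have two copies of it. Then writing to an InputStream means you now have three copies of the data.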
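If you want to convince yourself that toByteArray() really copies, a small check like the following should do. The class and method names are just for this illustration.

```java
import java.io.ByteArrayOutputStream;

final class ToByteArrayCopyDemo {
    // Returns true when toByteArray() hands back an independent copy of the buffer.
    static boolean returnsIndependentCopy() {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        baos.write(42);
        byte[] first = baos.toByteArray();
        first[0] = 0; // mutating the returned array must not affect the stream
        byte[] second = baos.toByteArray();
        return first != second && second[0] == 42;
    }

    public static void main(String[] args) {
        System.out.println("independent copy: " + returnsIndependentCopy());
    }
}
```

Each call allocates a fresh array of the current size, which is where the extra copy in memory comes from.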

The code using lambdas (hat-tip to @John Manko from the comments):

PipedInputStream in = new PipedInputStream();
final PipedOutputStream out = new PipedOutputStream(in);
// in a background thread, write the given output stream to the
// PipedOutputStream for consumption
new Thread(() -> {
    try { originalOutputStream.writeTo(out); }
    catch (IOException e) { /* logging and exception handling should go here */ }
}).start();

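One thing that @John Manko noted is that in certain cases, when you don't have control over the creation of the OutputStream object, you may end up in a situation where the creator cleans up the OutputStream object prematurely. If you are getting a ClosedPipeException, then you should try inverting the constructors:

PipedOutputStream out = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(out);
new Thread(() -> { try { originalOutputStream.writeTo(out); }
                   catch (IOException e) { /* handle or log */ } }).start();

Note that you can invert the constructors for the examples below too.

Thanks also to @AlexK for correcting me on starting a Thread rather than just kicking off a Runnable.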


The code using try-with-resources:

// take the copy of the stream and re-write it to an InputStream
PipedInputStream in = new PipedInputStream();
new Thread(new Runnable() {
    public void run () {
        // try-with-resources here
        // putting the try block outside the Thread will cause the
        // PipedOutputStream resource to close before the Runnable finishes
        try (final PipedOutputStream out = new PipedOutputStream(in)) {
            // write the original OutputStream to the PipedOutputStream
            // note that in order for the below method to work, you need
            // to ensure that the data has finished writing to the
            // ByteArrayOutputStream
            originalByteArrayOutputStream.writeTo(out);
        }
        catch (IOException e) {
            // logging and exception handling should go here
        }
    }
}).start();

The original code I wrote:

// take the copy of the stream and re-write it to an InputStream
PipedInputStream in = new PipedInputStream();
final PipedOutputStream out = new PipedOutputStream(in);
new Thread(new Runnable() {
    public void run () {
        try {
            // write the original OutputStream to the PipedOutputStream
            // note that in order for the below method to work, you need
            // to ensure that the data has finished writing to the
            // ByteArrayOutputStream
            originalByteArrayOutputStream.writeTo(out);
        }
        catch (IOException e) {
            // logging and exception handling should go here
        }
        finally {
            // close the PipedOutputStream here because we're done writing data
            // once this thread has completed its run
            try {
                // close() itself throws a checked IOException, so it needs its own handler
                out.close();
            }
            catch (IOException e) {
                // nothing more can be done if closing the pipe fails
            }
        }
    }
}).start();

This code assumes that originalByteArrayOutputStream is a ByteArrayOutputStream, as it is usually the only usable output stream unless you're writing to a file. I hope this helps! The great thing about this is that since it runs in a separate thread, it also works in parallel, so whatever is consuming your input stream will be streaming out of your old output stream too. That is beneficial because the buffer can remain smaller and you'll have less latency and memory usage.

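If you don't have a ByteArrayOutputStream, then instead of using writeTo() you will have to use one of the write() methods in the java.io.OutputStream class, or one of the other methods available in a subclass.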
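For that more general case, where some code simply wants an OutputStream to write to, the pipe approach can be wrapped in a small helper. This is only a sketch: PipeBridge and its Producer interface are hypothetical names of mine, and the error handling is deliberately minimal.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

final class PipeBridge {
    // Hypothetical stand-in for any code that writes to an OutputStream.
    interface Producer {
        void writeTo(OutputStream out) throws IOException;
    }

    static InputStream toInputStream(Producer producer) throws IOException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);
        Thread writer = new Thread(() -> {
            // try-with-resources closes the pipe when the producer is done,
            // which is what lets the reader see end-of-stream
            try (OutputStream o = out) {
                producer.writeTo(o);
            } catch (IOException e) {
                // a real application should log this or hand it to the reader
            }
        }, "pipe-writer");
        writer.setDaemon(true);
        writer.start();
        return in;
    }
}
```

The caller reads from the returned stream on its own thread while the producer runs in the background, for example `InputStream in = PipeBridge.toInputStream(out -> out.write(data));` where data is whatever byte[] you want to send.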

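I encountered the same problem when converting a ByteArrayOutputStream to a ByteArrayInputStream, and solved it by using a class derived from ByteArrayOutputStream that is able to return a ByteArrayInputStream initialized with the internal buffer of the ByteArrayOutputStream. This way no additional memory is used and the "conversion" is very fast: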

package info.whitebyte.utils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;


/**
* This class extends the ByteArrayOutputStream by
* providing a method that returns a new ByteArrayInputStream
* which uses the internal byte array buffer. This buffer
* is not copied, so no additional memory is used. After
* creating the ByteArrayInputStream the instance of the
* ByteArrayInOutStream can not be used anymore.
* <p>
* The ByteArrayInputStream can be retrieved using <code>getInputStream()</code>.
* @author Nick Russler
*/
public class ByteArrayInOutStream extends ByteArrayOutputStream {
/**
* Creates a new ByteArrayInOutStream. The buffer capacity is
* initially 32 bytes, though its size increases if necessary.
*/
public ByteArrayInOutStream() {
super();
}


/**
* Creates a new ByteArrayInOutStream, with a buffer capacity of
* the specified size, in bytes.
*
* @param   size   the initial size.
* @exception  IllegalArgumentException if size is negative.
*/
public ByteArrayInOutStream(int size) {
super(size);
}


/**
* Creates a new ByteArrayInputStream that uses the internal byte array buffer
* of this ByteArrayInOutStream instance as its buffer array. The initial value
* of pos is set to zero and the initial value of count is the number of bytes
* that can be read from the byte array. The buffer array is not copied. This
* instance of ByteArrayInOutStream can not be used anymore after calling this
* method.
* @return the ByteArrayInputStream instance
*/
public ByteArrayInputStream getInputStream() {
// create new ByteArrayInputStream that respects the current count
ByteArrayInputStream in = new ByteArrayInputStream(this.buf, 0, this.count);


// set the buffer of the ByteArrayOutputStream
// to null so it can't be altered anymore
this.buf = null;


return in;
}
}

I put the stuff on github: https://github.com/nickrussler/ByteArrayInOutStream

ByteArrayOutputStream buffer = (ByteArrayOutputStream) aOutputStream;
byte[] bytes = buffer.toByteArray();
InputStream inputStream = new ByteArrayInputStream(bytes);

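Though you cannot convert an OutputStream to an InputStream, java provides a way using PipedOutputStream and PipedInputStream: data written to a PipedOutputStream becomes available through an associated PipedInputStream. Some time back I faced a similar situation when dealing with third-party libraries that required an InputStream instance to be passed to them instead of an OutputStream instance. The way I fixed this issue was to use PipedInputStream and PipedOutputStream. By the way, they are tricky to use and you must use multithreading to achieve what you want. I recently published an implementation on github which you can use. Here is the link. You can go through the wiki to understand how to use it.

From my point of view, java.io.PipedInputStream/java.io.PipedOutputStream is the best option to consider. In some situations you may want to use ByteArrayInputStream/ByteArrayOutputStream. The problem is that you need to duplicate the buffer to convert a ByteArrayOutputStream to a ByteArrayInputStream. Also, ByteArrayOutputStream/ByteArrayInputStream are limited to 2GB. Here is an OutputStream/InputStream implementation I wrote to bypass the ByteArrayOutputStream/ByteArrayInputStream limitations (Scala code, but easily understandable for java developers):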

import java.io.{IOException, InputStream, OutputStream}


import scala.annotation.tailrec


/** Acts as a replacement for ByteArrayOutputStream
*
*/
class HugeMemoryOutputStream(capacity: Long) extends OutputStream {
private val PAGE_SIZE: Int = 1024000
private val ALLOC_STEP: Int = 1024


/** Pages array
*
*/
private var streamBuffers: Array[Array[Byte]] = Array.empty[Array[Byte]]


/** Allocated pages count
*
*/
private var pageCount: Int = 0


/** Allocated bytes count
*
*/
private var allocatedBytes: Long = 0


/** Current position in stream
*
*/
private var position: Long = 0


/** Stream length
*
*/
private var length: Long = 0


allocSpaceIfNeeded(capacity)


/** Gets page count based on given length
*
* @param length   Buffer length
* @return         Page count to hold the specified amount of data
*/
private def getPageCount(length: Long) = {
var pageCount = (length / PAGE_SIZE).toInt + 1


if ((length % PAGE_SIZE) == 0) {
pageCount -= 1
}


pageCount
}


/** Extends pages array
*
*/
private def extendPages(): Unit = {
if (streamBuffers.isEmpty) {
streamBuffers = new Array[Array[Byte]](ALLOC_STEP)
}
else {
val newStreamBuffers = new Array[Array[Byte]](streamBuffers.length + ALLOC_STEP)
Array.copy(streamBuffers, 0, newStreamBuffers, 0, streamBuffers.length)
streamBuffers = newStreamBuffers
}


pageCount = streamBuffers.length
}


/** Ensures buffers are big enough to hold specified amount of data
*
* @param value  Amount of data
*/
private def allocSpaceIfNeeded(value: Long): Unit = {
@tailrec
def allocSpaceIfNeededIter(value: Long): Unit = {
val currentPageCount = getPageCount(allocatedBytes)
val neededPageCount = getPageCount(value)


if (currentPageCount < neededPageCount) {
if (currentPageCount == pageCount) extendPages()


streamBuffers(currentPageCount) = new Array[Byte](PAGE_SIZE)
allocatedBytes = (currentPageCount + 1).toLong * PAGE_SIZE


allocSpaceIfNeededIter(value)
}
}


if (value < 0) throw new Error("AllocSpaceIfNeeded < 0")
if (value > 0) {
allocSpaceIfNeededIter(value)


length = Math.max(value, length)
if (position > length) position = length
}
}


/**
* Writes the specified byte to this output stream. The general
* contract for <code>write</code> is that one byte is written
* to the output stream. The byte to be written is the eight
* low-order bits of the argument <code>b</code>. The 24
* high-order bits of <code>b</code> are ignored.
* <p>
* Subclasses of <code>OutputStream</code> must provide an
* implementation for this method.
*
* @param      b the <code>byte</code>.
*/
@throws[IOException]
override def write(b: Int): Unit = {
val buffer: Array[Byte] = new Array[Byte](1)


buffer(0) = b.toByte


write(buffer)
}


/**
* Writes <code>len</code> bytes from the specified byte array
* starting at offset <code>off</code> to this output stream.
* The general contract for <code>write(b, off, len)</code> is that
* some of the bytes in the array <code>b</code> are written to the
* output stream in order; element <code>b[off]</code> is the first
* byte written and <code>b[off+len-1]</code> is the last byte written
* by this operation.
* <p>
* The <code>write</code> method of <code>OutputStream</code> calls
* the write method of one argument on each of the bytes to be
* written out. Subclasses are encouraged to override this method and
* provide a more efficient implementation.
* <p>
* If <code>b</code> is <code>null</code>, a
* <code>NullPointerException</code> is thrown.
* <p>
* If <code>off</code> is negative, or <code>len</code> is negative, or
* <code>off+len</code> is greater than the length of the array
* <code>b</code>, then an <tt>IndexOutOfBoundsException</tt> is thrown.
*
* @param      b   the data.
* @param      off the start offset in the data.
* @param      len the number of bytes to write.
*/
@throws[IOException]
override def write(b: Array[Byte], off: Int, len: Int): Unit = {
@tailrec
def writeIter(b: Array[Byte], off: Int, len: Int): Unit = {
val currentPage: Int = (position / PAGE_SIZE).toInt
val currentOffset: Int = (position % PAGE_SIZE).toInt


if (len != 0) {
val currentLength: Int = Math.min(PAGE_SIZE - currentOffset, len)
Array.copy(b, off, streamBuffers(currentPage), currentOffset, currentLength)


position += currentLength


writeIter(b, off + currentLength, len - currentLength)
}
}


allocSpaceIfNeeded(position + len)
writeIter(b, off, len)
}


/** Gets an InputStream that points to HugeMemoryOutputStream buffer
*
* @return InputStream
*/
def asInputStream(): InputStream = {
new HugeMemoryInputStream(streamBuffers, length)
}


private class HugeMemoryInputStream(streamBuffers: Array[Array[Byte]], val length: Long) extends InputStream {
/** Current position in stream
*
*/
private var position: Long = 0


/**
* Reads the next byte of data from the input stream. The value byte is
* returned as an <code>int</code> in the range <code>0</code> to
* <code>255</code>. If no byte is available because the end of the stream
* has been reached, the value <code>-1</code> is returned. This method
* blocks until input data is available, the end of the stream is detected,
* or an exception is thrown.
*
* <p> A subclass must provide an implementation of this method.
*
* @return the next byte of data, or <code>-1</code> if the end of the
*         stream is reached.
*/
@throws[IOException]
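def read: Int = {
// Read a single byte straight from the page that holds the current position
if (position >= length) -1 // end of stream, as the InputStream contract requires
else {
val value: Int = streamBuffers((position / PAGE_SIZE).toInt)((position % PAGE_SIZE).toInt) & 0xFF
position += 1
value
}
}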


/**
* Reads up to <code>len</code> bytes of data from the input stream into
* an array of bytes.  An attempt is made to read as many as
* <code>len</code> bytes, but a smaller number may be read.
* The number of bytes actually read is returned as an integer.
*
* <p> This method blocks until input data is available, end of file is
* detected, or an exception is thrown.
*
* <p> If <code>len</code> is zero, then no bytes are read and
* <code>0</code> is returned; otherwise, there is an attempt to read at
* least one byte. If no byte is available because the stream is at end of
* file, the value <code>-1</code> is returned; otherwise, at least one
* byte is read and stored into <code>b</code>.
*
* <p> The first byte read is stored into element <code>b[off]</code>, the
* next one into <code>b[off+1]</code>, and so on. The number of bytes read
* is, at most, equal to <code>len</code>. Let <i>k</i> be the number of
* bytes actually read; these bytes will be stored in elements
* <code>b[off]</code> through <code>b[off+</code><i>k</i><code>-1]</code>,
* leaving elements <code>b[off+</code><i>k</i><code>]</code> through
* <code>b[off+len-1]</code> unaffected.
*
* <p> In every case, elements <code>b[0]</code> through
* <code>b[off]</code> and elements <code>b[off+len]</code> through
* <code>b[b.length-1]</code> are unaffected.
*
* <p> The <code>read(b,</code> <code>off,</code> <code>len)</code> method
* for class <code>InputStream</code> simply calls the method
* <code>read()</code> repeatedly. If the first such call results in an
* <code>IOException</code>, that exception is returned from the call to
* the <code>read(b,</code> <code>off,</code> <code>len)</code> method.  If
* any subsequent call to <code>read()</code> results in a
* <code>IOException</code>, the exception is caught and treated as if it
* were end of file; the bytes read up to that point are stored into
* <code>b</code> and the number of bytes read before the exception
* occurred is returned. The default implementation of this method blocks
* until the requested amount of input data <code>len</code> has been read,
* end of file is detected, or an exception is thrown. Subclasses are encouraged
* to provide a more efficient implementation of this method.
*
* @param      b   the buffer into which the data is read.
* @param      off the start offset in array <code>b</code>
*                 at which the data is written.
* @param      len the maximum number of bytes to read.
* @return the total number of bytes read into the buffer, or
*         <code>-1</code> if there is no more data because the end of
*         the stream has been reached.
* @see java.io.InputStream#read()
*/
@throws[IOException]
override def read(b: Array[Byte], off: Int, len: Int): Int = {
@tailrec
def readIter(acc: Int, b: Array[Byte], off: Int, len: Int): Int = {
val currentPage: Int = (position / PAGE_SIZE).toInt
val currentOffset: Int = (position % PAGE_SIZE).toInt


val count: Int = Math.min(len, length - position).toInt


if (count == 0 || position >= length) acc
else {
val currentLength = Math.min(PAGE_SIZE - currentOffset, count)
Array.copy(streamBuffers(currentPage), currentOffset, b, off, currentLength)


position += currentLength


readIter(acc + currentLength, b, off + currentLength, len - currentLength)
}
}


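// Per the InputStream contract: 0 for an empty request, -1 once the end of the stream is reached
if (len == 0) 0
else {
val bytesRead = readIter(0, b, off, len)
if (bytesRead == 0) -1 else bytesRead
}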
}


/**
* Skips over and discards <code>n</code> bytes of data from this input
* stream. The <code>skip</code> method may, for a variety of reasons, end
* up skipping over some smaller number of bytes, possibly <code>0</code>.
* This may result from any of a number of conditions; reaching end of file
* before <code>n</code> bytes have been skipped is only one possibility.
* The actual number of bytes skipped is returned. If <code>n</code> is
* negative, the <code>skip</code> method for class <code>InputStream</code> always
* returns 0, and no bytes are skipped. Subclasses may handle the negative
* value differently.
*
* The <code>skip</code> method of this class creates a
* byte array and then repeatedly reads into it until <code>n</code> bytes
* have been read or the end of the stream has been reached. Subclasses are
* encouraged to provide a more efficient implementation of this method.
* For instance, the implementation may depend on the ability to seek.
*
* @param      n the number of bytes to be skipped.
* @return the actual number of bytes skipped.
*/
@throws[IOException]
override def skip(n: Long): Long = {
if (n <= 0) 0
else {
// Return the number of bytes actually skipped, not the number remaining
val skipped = Math.min(n, length - position)
position += skipped
skipped
}
}
}
}

Easy to use, no buffer duplication, no 2GB memory limit:

val out: HugeMemoryOutputStream = new HugeMemoryOutputStream(initialCapacity /*may be 0*/)


out.write(...)
...


val in1: InputStream = out.asInputStream()


in1.read(...)
...


val in2: InputStream = out.asInputStream()


in2.read(...)
...

The io-extras library might be useful. For example, if you want to gzip an InputStream using GZIPOutputStream and you want it to happen synchronously (using the default buffer size of 8192):

InputStream is = ...
InputStream gz = IOUtil.pipe(is, o -> new GZIPOutputStream(o));

Note that the library has 100% unit test coverage (for what that's worth, of course!) and is on Maven Central. The Maven dependency is:

<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>io-extras</artifactId>
<version>0.1</version>
</dependency>

Be sure to check for a later version.
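If you would rather not add a dependency, something similar can be put together from the standard library alone. To be clear, this is not the io-extras API and it is not synchronous: the sketch below uses a background thread and a pipe, and GzipPipe is a name I made up.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.zip.GZIPOutputStream;

final class GzipPipe {
    // Hypothetical helper: returns a stream of the gzip-compressed bytes of plain.
    static InputStream gzip(InputStream plain) throws IOException {
        PipedInputStream compressed = new PipedInputStream(8192);
        PipedOutputStream pipeOut = new PipedOutputStream(compressed);
        Thread worker = new Thread(() -> {
            // Resources are closed in reverse order: the source first, then the
            // gzip stream (which writes the trailer), then the pipe itself.
            try (PipedOutputStream po = pipeOut;
                 GZIPOutputStream gz = new GZIPOutputStream(po);
                 InputStream src = plain) {
                byte[] buf = new byte[8192];
                int n;
                while ((n = src.read(buf)) != -1) {
                    gz.write(buf, 0, n);
                }
            } catch (IOException e) {
                // sketch only: a real application should surface this to the reader
            }
        }, "gzip-pipe");
        worker.setDaemon(true);
        worker.start();
        return compressed;
    }
}
```

Usage mirrors the snippet above: `InputStream gz = GzipPipe.gzip(is);`. The reader must consume the returned stream on a different thread from the worker, which is the case here because the worker is started internally.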