Read an input stream twice

How can I read the same input stream twice? Is it possible to copy it somehow?

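I need to get an image from the web, save it locally, and then return the saved image. I just thought it would be faster to reuse the same stream rather than open a new stream to the downloaded content and read it again.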


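Depending on where the InputStream comes from, you might not be able to reset it. You can check whether mark() and reset() are supported with markSupported().

If they are, call mark() before you start reading and reset() afterwards to return to the marked position. If not, you need to read the InputStream from its source again.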
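A minimal sketch of the mark()/reset() approach using only java.io (readNBytes(int) needs Java 11+). The class and helper names are illustrative. The key points are that mark() has to be called *before* reading, and that its readlimit argument must cover every byte read before reset().

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

final class MarkResetDemo {

    /** Reads the first n bytes, rewinds with reset(), and reads the same n bytes again. */
    static byte[][] readPrefixTwice(InputStream in, int n) throws IOException {
        if (!in.markSupported()) {
            // This stream cannot be rewound: reopen it at the source, or buffer it first.
            throw new IOException("mark/reset not supported by " + in.getClass().getName());
        }
        in.mark(n);                        // remember the position; valid for up to n bytes
        byte[] first = in.readNBytes(n);   // first pass
        in.reset();                        // rewind to the marked position
        byte[] second = in.readNBytes(n);  // second pass sees the same bytes
        return new byte[][] { first, second };
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[] { 1, 2, 3, 4, 5 });
        byte[][] passes = readPrefixTwice(in, 3);
        // prints "[1, 2, 3] [1, 2, 3]"
        System.out.println(Arrays.toString(passes[0]) + " " + Arrays.toString(passes[1]));
    }
}
```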

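If you are using an InputStream implementation, you can check the result of InputStream#markSupported(), which tells you whether you can use the mark()/reset() methods.

If the stream can be marked when you start reading, call reset() to go back to the start.

If it can't, you will have to open the stream again.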
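A sketch of the "open it again" fallback, assuming the source can be described by something that opens a fresh stream on demand. The `StreamOpener` interface is hypothetical (not a JDK type); for a `java.net.URL` it could be `url::openStream`, which means downloading the content a second time.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

final class ReopenDemo {

    /** Hypothetical opener: each call must return a brand-new stream over the same source. */
    @FunctionalInterface
    interface StreamOpener {
        InputStream open() throws IOException;
    }

    /** Processes the source twice (here: counts its bytes), opening a separate stream per pass. */
    static long[] countBytesTwice(StreamOpener opener) throws IOException {
        long[] counts = new long[2];
        for (int pass = 0; pass < counts.length; pass++) {
            try (InputStream in = opener.open()) {   // fresh stream, closed after the pass
                counts[pass] = in.transferTo(OutputStream.nullOutputStream());  // Java 11+
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "hello".getBytes(StandardCharsets.US_ASCII);
        long[] counts = countBytesTwice(() -> new ByteArrayInputStream(data));
        System.out.println(counts[0] + " " + counts[1]);  // prints "5 5"
    }
}
```

Nothing is buffered here, but every pass pays the full cost of opening and reading the source again.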

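Another solution is to convert the InputStream into a byte array and then iterate over the array as many times as you need. You can find several solutions, with or without third-party libraries, in the post Convert InputStream to byte array in Java. Note that if the content is too large, you may run into memory problems.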
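Since Java 9 the byte-array approach needs no third-party library: `InputStream.readAllBytes()` drains the stream once, and each pass gets its own `ByteArrayInputStream` over the same array. A minimal sketch with illustrative names; the whole content is held in memory, so it suits small or bounded inputs.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

final class BytesTwiceDemo {

    /** Drains the stream once into memory and closes it. */
    static byte[] slurp(InputStream in) throws IOException {
        try (in) {
            return in.readAllBytes();   // Java 9+
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream source = new ByteArrayInputStream("abc".getBytes(StandardCharsets.UTF_8));
        byte[] bytes = slurp(source);

        // Every consumer gets an independent stream positioned at the start.
        try (InputStream first = new ByteArrayInputStream(bytes);
             InputStream second = new ByteArrayInputStream(bytes)) {
            System.out.println(new String(first.readAllBytes(), StandardCharsets.UTF_8));  // abc
            System.out.println(new String(second.readAllBytes(), StandardCharsets.UTF_8)); // abc
        }
    }
}
```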

Finally, if what you need to read is an image, use:

BufferedImage image = ImageIO.read(new URL("http://www.example.com/images/toto.jpg"));

Using ImageIO#read(java.net.URL) also lets you take advantage of the cache.
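If the image also has to be saved locally, as in the question, one option is to decode it once and write it back out with `ImageIO.write`. A sketch; the method name, target file and format name are caller-supplied assumptions. Note that this re-encodes the pixels, so the saved file is not byte-for-byte identical to the download (a JPEG, for instance, is recompressed), and `ImageIO.write` returns false when no writer for the requested format can handle the image.

```java
import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import javax.imageio.ImageIO;

final class ImageSaveDemo {

    /** Downloads and decodes the image once, saves a re-encoded copy, and returns the decoded image. */
    static BufferedImage fetchAndSave(URL url, File target, String formatName) throws IOException {
        BufferedImage image = ImageIO.read(url);   // null if no registered reader understands the data
        if (image == null) {
            throw new IOException("Not a supported image: " + url);
        }
        if (!ImageIO.write(image, formatName, target)) {   // false if no suitable writer exists
            throw new IOException("No ImageIO writer for format " + formatName);
        }
        return image;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java ImageSaveDemo <image-url> <target-file> <format, e.g. png>
        BufferedImage image = fetchAndSave(URI.create(args[0]).toURL(), new File(args[1]), args[2]);
        System.out.println(image.getWidth() + "x" + image.getHeight());
    }
}
```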

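Convert the InputStream to bytes, then pass those bytes to your save-file function, which turns them back into an InputStream. Use the same bytes in the original function for any other tasks.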
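A sketch of that idea applied to the question's image case, assuming the image fits comfortably in memory: read the download once into a byte[], write those exact bytes to disk with `Files.write`, and decode the same bytes with `ImageIO.read`. Unlike re-encoding, the saved file is identical to what was downloaded. The class and method names are illustrative.

```java
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.imageio.ImageIO;

final class SaveAndDecodeDemo {

    /** Reads the download once, saves the raw bytes, and decodes the image from the same bytes. */
    static BufferedImage saveAndDecode(InputStream download, Path target) throws IOException {
        byte[] bytes;
        try (download) {
            bytes = download.readAllBytes();   // the only read of the original stream
        }
        Files.write(target, bytes);            // exact copy of the downloaded file
        BufferedImage image = ImageIO.read(new ByteArrayInputStream(bytes));  // second "read", from memory
        if (image == null) {
            throw new IOException("Downloaded data is not a supported image");
        }
        return image;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java SaveAndDecodeDemo <image-url> <target-file>
        BufferedImage image = saveAndDecode(URI.create(args[0]).toURL().openStream(), Path.of(args[1]));
        System.out.println(image.getWidth() + "x" + image.getHeight());
    }
}
```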

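You can use org.apache.commons.io.IOUtils.copy to copy the contents of the InputStream into a byte array (via a ByteArrayOutputStream), and then read the byte array repeatedly with a ByteArrayInputStream. For example: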

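    // Copy the stream into memory once (Apache Commons IO).
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    org.apache.commons.io.IOUtils.copy(in, baos);
    byte[] bytes = baos.toByteArray();

    // either: create a fresh stream over the same bytes for every pass
    while (needToReadAgain) {
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
        yourReadMethodHere(bais);
    }

    // or: reuse one stream and rewind it before every pass
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    while (needToReadAgain) {
        bais.reset();  // with no explicit mark, ByteArrayInputStream resets to position 0
        yourReadMethodHere(bais);
    }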

If your InputStream supports marking, you can mark() it and later reset() it. If it doesn't, you can use java.io.BufferedInputStream, which does support marking, by wrapping your stream in a BufferedInputStream like this:

    InputStream bufferedInputStream = new BufferedInputStream(yourInputStream);
    bufferedInputStream.mark(readLimit);  // readLimit: at least the number of bytes read before reset()
    // read your bufferedInputStream
    bufferedInputStream.reset();
    // read it again
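A runnable sketch of the wrapping approach that peeks at the first bytes (for example, a file signature) and then reads the whole content from the start. The anonymous `InputStream` in `main` stands in for a stream without mark support (an assumption for the demo). If more than readlimit bytes are read before reset(), BufferedInputStream may invalidate the mark, and reset() then throws an IOException.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class BufferedPeekDemo {

    /** Reads the first n bytes for inspection, rewinds, and returns the complete content. */
    static byte[][] peekThenReadAll(InputStream raw, int n) throws IOException {
        InputStream in = raw.markSupported() ? raw : new BufferedInputStream(raw);
        in.mark(n);                        // valid as long as at most n bytes are read before reset()
        byte[] header = in.readNBytes(n);  // e.g. inspect a file signature (Java 11+)
        in.reset();                        // back to the first byte
        byte[] all = in.readAllBytes();    // full content, header included
        return new byte[][] { header, all };
    }

    public static void main(String[] args) throws IOException {
        byte[] data = { 10, 20, 30, 40, 50 };
        // Delegates reads but keeps InputStream's default markSupported() == false.
        InputStream notMarkable = new InputStream() {
            private final InputStream delegate = new ByteArrayInputStream(data);
            @Override public int read() throws IOException { return delegate.read(); }
        };
        System.out.println(notMarkable.markSupported());                 // false
        byte[][] result = peekThenReadAll(notMarkable, 2);
        System.out.println(result[0].length + " " + result[1].length);  // 2 5
    }
}
```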

You can wrap the input stream in a PushbackInputStream. A PushbackInputStream lets you unread ("push back") bytes you have already read, so you can do something like this:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.PushbackInputStream;
    import java.util.Arrays;

    public class StreamTest {
        public static void main(String[] args) throws IOException {
            byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

            InputStream originalStream = new ByteArrayInputStream(bytes);

            byte[] readBytes = getBytes(originalStream, 3);
            printBytes(readBytes); // prints: 1 2 3

            readBytes = getBytes(originalStream, 3);
            printBytes(readBytes); // prints: 4 5 6

            // now let's wrap it with PushbackInputStream
            originalStream = new ByteArrayInputStream(bytes);

            // 10 means that at most 10 bytes can be "written back" (unread) to the stream
            PushbackInputStream wrappedStream = new PushbackInputStream(originalStream, 10);

            readBytes = getBytes(wrappedStream, 3);
            printBytes(readBytes); // prints: 1 2 3

            wrappedStream.unread(readBytes, 0, readBytes.length);

            readBytes = getBytes(wrappedStream, 3);
            printBytes(readBytes); // prints: 1 2 3
        }

        // Reads up to howManyBytes bytes one at a time, stopping early at end of stream (read() == -1).
        private static byte[] getBytes(InputStream is, int howManyBytes) throws IOException {
            System.out.print("Reading stream: ");
            byte[] buf = new byte[howManyBytes];
            int count = 0;
            int next;
            while (count < howManyBytes && (next = is.read()) != -1) {
                buf[count++] = (byte) next;   // a 0 byte is valid data, only -1 means end of stream
            }
            return Arrays.copyOf(buf, count);   // only the bytes actually read
        }

        private static void printBytes(byte[] buffer) {
            for (byte b : buffer) {
                System.out.print(b + " ");
            }
            System.out.println();
        }
    }

Note that PushbackInputStream keeps the pushed-back bytes in an internal buffer, so it does create an in-memory buffer holding the bytes that were "written back".
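That buffer has a fixed capacity (the size passed to the constructor, 1 if none is given), so you can only push back that many bytes; unreading more throws an `IOException`. A small sketch demonstrating the limit, with illustrative sizes and names:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.PushbackInputStream;

final class PushbackLimitDemo {

    /** Returns true if `count` bytes can be read and then pushed back into a buffer of `capacity`. */
    static boolean canPushBack(int capacity, int count) throws IOException {
        byte[] data = new byte[count + 1];
        try (PushbackInputStream in = new PushbackInputStream(new ByteArrayInputStream(data), capacity)) {
            byte[] read = in.readNBytes(count);
            try {
                in.unread(read);   // fails when read.length exceeds the free pushback space
                return true;
            } catch (IOException pushbackFull) {
                return false;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(canPushBack(4, 4));  // true
        System.out.println(canPushBack(4, 5));  // false
    }
}
```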

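Building on this approach, we can go further and combine it with FilterInputStream. FilterInputStream stores the original input stream as a delegate, which makes it possible to write a new class that "unreads" the original data automatically. The class is defined as follows: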

    import java.io.ByteArrayOutputStream;
    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.PushbackInputStream;

    public class TryReadInputStream extends FilterInputStream {
        private final int maxPushbackBufferSize;

        /**
         * Wraps <code>in</code> in a {@link PushbackInputStream} and assigns it
         * to the inherited field <code>this.in</code> so as to remember it for later use.
         *
         * @param in                    the underlying input stream
         * @param maxPushbackBufferSize the maximum number of bytes a single tryRead call may read and push back
         */
        public TryReadInputStream(InputStream in, int maxPushbackBufferSize) {
            super(new PushbackInputStream(in, maxPushbackBufferSize));
            this.maxPushbackBufferSize = maxPushbackBufferSize;
        }

        /**
         * Reads up to <code>length</code> bytes from the stream into the given buffer. The bytes read
         * are still available in the stream afterwards (they are pushed back).
         *
         * @param buffer the destination buffer into which the data is read
         * @param offset the start offset in the destination <code>buffer</code>
         * @param length how many bytes to read from the stream into <code>buffer</code>; must not exceed
         *               <code>maxPushbackBufferSize</code>, or an IOException is thrown
         * @return number of bytes read
         * @throws java.io.IOException if <code>length</code> exceeds <code>maxPushbackBufferSize</code>
         *                             or an I/O error occurs
         */
        public int tryRead(byte[] buffer, int offset, int length) throws IOException {
            validateMaxLength(length);

            // NOTE: read byte by byte instead of "int bytesRead = in.read(buffer, offset, length);"
            // because read(byte[], int, int) may return fewer bytes than requested before end of stream
            int bytesRead = 0;
            int nextByte = 0;
            for (int i = 0; (i < length) && (nextByte >= 0); i++) {
                nextByte = read();
                if (nextByte >= 0) {
                    buffer[offset + bytesRead++] = (byte) nextByte;
                }
            }

            if (bytesRead > 0) {
                ((PushbackInputStream) in).unread(buffer, offset, bytesRead);
            }
            return bytesRead;
        }

        public byte[] tryRead(int maxBytesToRead) throws IOException {
            validateMaxLength(maxBytesToRead);

            // ByteArrayOutputStream grows its internal array dynamically instead of allocating
            // a possibly large buffer up front (if maxBytesToRead is large)
            ByteArrayOutputStream baos = new ByteArrayOutputStream();

            // NOTE: read byte by byte for the same reason as in tryRead(byte[], int, int)
            int nextByte = 0;
            for (int i = 0; (i < maxBytesToRead) && (nextByte >= 0); i++) {
                nextByte = read();
                if (nextByte >= 0) {
                    baos.write(nextByte);
                }
            }

            byte[] buffer = baos.toByteArray();
            if (buffer.length > 0) {
                ((PushbackInputStream) in).unread(buffer, 0, buffer.length);
            }
            return buffer;
        }

        private void validateMaxLength(int length) throws IOException {
            if (length > maxPushbackBufferSize) {
                throw new IOException("Trying to read more bytes than maxPushbackBufferSize. Max bytes: "
                        + maxPushbackBufferSize + ". Trying to read: " + length);
            }
        }
    }

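The class has two methods. One reads into an existing buffer (its signature mirrors InputStream's public int read(byte b[], int off, int len)). The other returns a new buffer, which can be more efficient when the amount of data to read is not known in advance.

Now let's see our class in action: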

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class StreamTest2 {
        public static void main(String[] args) throws IOException {
            byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

            InputStream originalStream = new ByteArrayInputStream(bytes);

            byte[] readBytes = getBytes(originalStream, 3);
            printBytes(readBytes); // prints: 1 2 3

            readBytes = getBytes(originalStream, 3);
            printBytes(readBytes); // prints: 4 5 6

            // now let's use our TryReadInputStream
            originalStream = new ByteArrayInputStream(bytes);
            TryReadInputStream wrappedStream = new TryReadInputStream(originalStream, 10);

            readBytes = wrappedStream.tryRead(3); // NOTE: no manual call to "unread"(!) because TryReadInputStream handles this internally
            printBytes(readBytes); // prints: 1 2 3

            readBytes = wrappedStream.tryRead(3);
            printBytes(readBytes); // prints: 1 2 3

            readBytes = wrappedStream.tryRead(3);
            printBytes(readBytes); // prints: 1 2 3

            // a normal read actually consumes the bytes without "writing them back"
            readBytes = getBytes(wrappedStream, 3);
            printBytes(readBytes); // prints: 1 2 3

            readBytes = getBytes(wrappedStream, 3);
            printBytes(readBytes); // prints: 4 5 6

            readBytes = wrappedStream.tryRead(3); // now we can try to read the next bytes
            printBytes(readBytes); // prints: 7 8 9

            readBytes = wrappedStream.tryRead(3);
            printBytes(readBytes); // prints: 7 8 9
        }

        // getBytes(InputStream, int) and printBytes(byte[]) are the same helpers as in StreamTest;
        // copy them into this class so that it compiles on its own.
    }

How about this:

    if (!stream.markSupported()) {
        // replace the stream object with an in-memory copy (IOUtils is from Apache Commons IO)
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        IOUtils.copy(stream, baos);
        stream.close();
        stream = new ByteArrayInputStream(baos.toByteArray());
        // ByteArrayInputStream supports mark() and reset()
    }

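If you're running a Spring Boot application and want to read the response body of a RestTemplate (which is why I wanted to read the stream twice), there is a clean(er) way to do it.

First, use Spring's StreamUtils to copy the stream into a String:

String text = StreamUtils.copyToString(response.getBody(), Charset.defaultCharset());

But that's not all. You also need a request factory that buffers the stream for you, like this: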

ClientHttpRequestFactory factory = new BufferingClientHttpRequestFactory(new SimpleClientHttpRequestFactory());
RestTemplate restTemplate = new RestTemplate(factory);

Or, if you're using a factory bean (this is Kotlin, but anyway):

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    fun createRestTemplate(): RestTemplate = RestTemplateBuilder()
        .requestFactory { BufferingClientHttpRequestFactory(SimpleClientHttpRequestFactory()) }
        .additionalInterceptors(loggingInterceptor)
        .build()

Source: https://objectpartners.com/2018/03/01/log-your-resttemplate-request-and-response-without-destroying-the-body/

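To split one InputStream into two without loading all of the data into memory, and then process the two independently:

  1. Create a pair of OutputStreams, specifically PipedOutputStreams.
  2. Connect each PipedOutputStream to a PipedInputStream; these PipedInputStreams are the InputStreams that get returned.
  3. Connect the source InputStream to the OutputStreams just created, so that everything read from the source InputStream is written to both OutputStreams. You don't need to implement this yourself: TeeInputStream (commons-io) already does it.
  4. In a separate thread, read the whole source InputStream; this implicitly transfers the data to the target InputStreams.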

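    public static final List<InputStream> splitInputStream(InputStream input) throws IOException {
        Objects.requireNonNull(input);

        PipedOutputStream pipedOut01 = new PipedOutputStream();
        PipedOutputStream pipedOut02 = new PipedOutputStream();

        List<InputStream> inputStreamList = new ArrayList<>();
        inputStreamList.add(new PipedInputStream(pipedOut01));
        inputStreamList.add(new PipedInputStream(pipedOut02));

        TeeOutputStream tout = new TeeOutputStream(pipedOut01, pipedOut02);

        // closeBranch = true: closing tin also closes tout and therefore both pipes,
        // which is what lets the readers see end-of-stream
        TeeInputStream tin = new TeeInputStream(input, tout, true);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            // Drain the source in chunks; readAllBytes() would also collect the whole content in memory
            try (tin) {
                byte[] buffer = new byte[8192];
                while (tin.read(buffer) != -1) {
                    // TeeInputStream copies each chunk into the pipes as a side effect
                }
            }
            return null;
        });
        executor.shutdown();  // the worker thread exits once the copy task has finished

        return Collections.unmodifiableList(inputStreamList);
    }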
    

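Remember to close the returned InputStreams once they have been consumed, and make sure the thread that reads the TeeInputStream terminates. Also, each PipedInputStream buffers only 1024 bytes by default, so the returned streams must be consumed concurrently (for example, each in its own thread); otherwise the copying thread blocks as soon as the pipe nobody is reading fills up.

If you need to split the stream into more than two InputStreams, replace TeeOutputStream in splitInputStream with your own implementation that wraps a List<OutputStream> and overrides the OutputStream methods: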

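    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.List;
    import java.util.Objects;

    public final class TeeListOutputStream extends OutputStream {
        private final List<? extends OutputStream> branchList;

        public TeeListOutputStream(final List<? extends OutputStream> branchList) {
            Objects.requireNonNull(branchList);
            this.branchList = branchList;
        }

        @Override
        public synchronized void write(final int b) throws IOException {
            for (OutputStream branch : branchList) {
                branch.write(b);
            }
        }

        @Override
        public synchronized void write(final byte[] b, final int off, final int len) throws IOException {
            for (OutputStream branch : branchList) {
                branch.write(b, off, len);  // forward whole chunks instead of byte by byte
            }
        }

        @Override
        public void flush() throws IOException {
            for (OutputStream branch : branchList) {
                branch.flush();
            }
        }

        @Override
        public void close() throws IOException {
            for (OutputStream branch : branchList) {
                branch.close();
            }
        }
    }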
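For comparison, a JDK-only sketch of the same pipe technique that needs neither commons-io nor a custom tee class and handles any number of branches. The class name, the 64 KiB pipe size and the thread setup are illustrative assumptions. It addresses two caveats of this design: one background thread copies the source into every pipe and closes all pipes at the end, so readers see end-of-stream; and the returned streams are consumed concurrently, because a branch that nobody reads eventually fills its pipe and blocks the copier.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class PipeSplitter {

    /** Returns n streams that each replay the whole source; a background thread feeds them. */
    static List<InputStream> split(InputStream source, int n) throws IOException {
        List<PipedOutputStream> outs = new ArrayList<>();
        List<InputStream> ins = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            PipedOutputStream out = new PipedOutputStream();
            ins.add(new PipedInputStream(out, 64 * 1024));  // pipe buffer size per branch (illustrative)
            outs.add(out);
        }
        Thread copier = new Thread(() -> {
            byte[] buffer = new byte[8192];
            try (source) {
                int len;
                while ((len = source.read(buffer)) != -1) {
                    for (PipedOutputStream out : outs) {
                        out.write(buffer, 0, len);  // blocks while this branch's pipe is full
                        out.flush();                // wakes up a reader waiting on this pipe
                    }
                }
            } catch (IOException e) {
                // Sketch only: a real implementation should report this to the readers.
            } finally {
                for (PipedOutputStream out : outs) {
                    try {
                        out.close();  // readers see end-of-stream once the pipe is drained
                    } catch (IOException ignored) {
                        // nothing more to do for this branch
                    }
                }
            }
        }, "stream-splitter");
        copier.setDaemon(true);
        copier.start();
        return ins;
    }

    public static void main(String[] args) throws Exception {
        InputStream source = new ByteArrayInputStream("hello, pipes".getBytes(StandardCharsets.UTF_8));
        List<InputStream> branches = split(source, 2);

        // Consume the branches concurrently: one in a helper thread, one in the current thread.
        byte[][] results = new byte[2][];
        Thread reader = new Thread(() -> {
            try (InputStream in = branches.get(0)) {
                results[0] = in.readAllBytes();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        reader.start();
        try (InputStream in = branches.get(1)) {
            results[1] = in.readAllBytes();
        }
        reader.join();
        System.out.println(new String(results[0], StandardCharsets.UTF_8));  // hello, pipes
        System.out.println(new String(results[1], StandardCharsets.UTF_8));  // hello, pipes
    }
}
```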

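If you use RestTemplate for HTTP calls, just add an interceptor. The input stream can then be retrieved from the response as many times as needed, provided the ClientHttpResponse implementation caches the response body, for example when the RestTemplate is created with a BufferingClientHttpRequestFactory (an interceptor alone does not buffer the body):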

    ClientHttpRequestInterceptor interceptor = new ClientHttpRequestInterceptor() {
        @Override
        public ClientHttpResponse intercept(HttpRequest request, byte[] body,
                ClientHttpRequestExecution execution) throws IOException {
            ClientHttpResponse response = execution.execute(request, body);
            // additional work before returning the response
            return response;
        }
    };

    // Add the interceptor to the RestTemplate instance
    restTemplate.getInterceptors().add(interceptor);

With a ByteArrayInputStream, mark() and reset() are always supported:

    ByteArrayInputStream ins = new ByteArrayInputStream("Hello".getBytes());
    System.out.println("ins.available() at beginning :: " + ins.available());     // 5
    ins.mark(0);  // the read limit is ignored by ByteArrayInputStream
    ins.read(new byte[3], 0, 3);  // read the input stream for some operations (here: 3 bytes)
    System.out.println("ins.available() after reading :: " + ins.available());    // 2
    ins.reset();
    System.out.println("ins.available() after resetting :: " + ins.available());  // 5
    // ins is ready for reading once again.