有没有可能从带有超时的 InputStream 读取数据?

具体来说,问题在于编写这样一个方法:

int maybeRead(InputStream in, long timeout)

如果数据在‘ timeout’毫秒内可用,返回值与 in.read ()相同,否则返回 -2。在方法返回之前,任何衍生线程都必须退出。

为了避免争论,这里的主题 java.io。由 Sun (任何 Java 版本)记录的 InputStream。请注意,这并不像看起来那么简单。以下是 Sun 的文档直接支持的一些事实。

  1. Read ()方法可以是不可中断的。

  2. 将 InputStream 包装在 Reader 或 InterruptibleChannel 中没有任何帮助,因为这些类只能调用 InputStream 的方法。如果可以使用这些类,就有可能编写一个解决方案,直接在 InputStream 上执行相同的逻辑。

  3. In.able ()返回0总是可以接受的。

  4. Close ()方法可能会阻塞或什么也不做。

  5. 没有一般的方法可以杀死另一个线程。

160607 次浏览

我没有使用 JavaNIO 包中的类,但是它是 看起来,它们在这里可能会有一些帮助。特别是 Java.nio.channel通道

正如 jt 所说,NIO 是最好的(也是正确的)解决方案。不过,如果你真的被 InputStream 卡住了,你也可以这么做

  1. 生成一个线程,它的独占工作是从 InputStream 读取并将结果放入一个缓冲区,这个缓冲区可以从原始线程读取而不会阻塞。如果您只有流的一个实例,那么这应该可以很好地工作。否则,可以使用 Thread 类中不推荐的方法杀死线程,但这可能会导致资源泄漏。

  2. 依赖 isApply 来指示可以在不阻塞的情况下读取的数据。但是,在某些情况下(比如使用 Sockets) ,可能需要一个潜在的阻塞读操作才能使 isiliable 报告0以外的内容。

下面是一种从 System.in 获取 NIO FileChannel 并使用超时检查数据可用性的方法,这是问题中描述的问题的特殊情况。在控制台运行它,不要输入任何输入,等待结果。它在 Java6下成功地在 Windows 和 Linux 上进行了测试。

import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;


public class Main {


static final ByteBuffer buf = ByteBuffer.allocate(4096);


public static void main(String[] args) {


long timeout = 1000 * 5;


try {
InputStream in = extract(System.in);
if (! (in instanceof FileInputStream))
throw new RuntimeException(
"Could not extract a FileInputStream from STDIN.");


try {
int ret = maybeAvailable((FileInputStream)in, timeout);
System.out.println(
Integer.toString(ret) + " bytes were read.");


} finally {
in.close();
}


} catch (Exception e) {
throw new RuntimeException(e);
}


}


/* unravels all layers of FilterInputStream wrappers to get to the
* core InputStream
*/
public static InputStream extract(InputStream in)
throws NoSuchFieldException, IllegalAccessException {


Field f = FilterInputStream.class.getDeclaredField("in");
f.setAccessible(true);


while( in instanceof FilterInputStream )
in = (InputStream)f.get((FilterInputStream)in);


return in;
}


/* Returns the number of bytes which could be read from the stream,
* timing out after the specified number of milliseconds.
* Returns 0 on timeout (because no bytes could be read)
* and -1 for end of stream.
*/
public static int maybeAvailable(final FileInputStream in, long timeout)
throws IOException, InterruptedException {


final int[] dataReady = {0};
final IOException[] maybeException = {null};
final Thread reader = new Thread() {
public void run() {
try {
dataReady[0] = in.getChannel().read(buf);
} catch (ClosedByInterruptException e) {
System.err.println("Reader interrupted.");
} catch (IOException e) {
maybeException[0] = e;
}
}
};


Thread interruptor = new Thread() {
public void run() {
reader.interrupt();
}
};


reader.start();
for(;;) {


reader.join(timeout);
if (!reader.isAlive())
break;


interruptor.start();
interruptor.join(1000);
reader.join(1000);
if (!reader.isAlive())
break;


System.err.println("We're hung");
System.exit(1);
}


if ( maybeException[0] != null )
throw maybeException[0];


return dataReady[0];
}
}

有趣的是,当在 NetBeans 6.5内而不是在控制台上运行程序时,超时根本不起作用,实际上需要调用 System.exit ()来杀死僵尸线程。发生的情况是中断线程阻塞(!)在调用 reader. 中断()。另一个测试程序(这里没有显示)额外尝试关闭通道,但这也不起作用。

如果 InputStream 由 Socket 支持,则可以使用 SetSoTimeout设置 Socket 超时(以毫秒为单位)。如果 read ()调用没有在指定的超时内解除阻塞,它将引发 SocketTimeoutException。

只需确保在调用 read ()之前在 Socket 上调用 setSoTimeout。

我会质疑问题陈述,而不是盲目地接受它。您只需要从控制台或通过网络超时。如果后者你有 Socket.setSoTimeout()HttpURLConnection.setReadTimeout(),这两者都做正确的需要,只要你设置正确的时候,你建造/获取它们。将它留给应用程序中的一个任意点,当您拥有的只是糟糕的设计导致非常笨拙的实现时。

假设您的流没有套接字支持(因此您不能使用 Socket.setSoTimeout()) ,我认为解决此类问题的标准方法是使用 Future。

假设我有以下执行器和流:

    ExecutorService executor = Executors.newFixedThreadPool(2);
final PipedOutputStream outputStream = new PipedOutputStream();
final PipedInputStream inputStream = new PipedInputStream(outputStream);

我有一个写入器,它写入一些数据,然后在写入最后一段数据并关闭流之前等待5秒钟:

    Runnable writeTask = new Runnable() {
@Override
public void run() {
try {
outputStream.write(1);
outputStream.write(2);
Thread.sleep(5000);
outputStream.write(3);
outputStream.close();
} catch (Exception e) {
e.printStackTrace();
}
}
};
executor.submit(writeTask);

通常的解读方式如下。读取将无限期地阻塞数据,因此这将在5秒内完成:

    long start = currentTimeMillis();
int readByte = 1;
// Read data without timeout
while (readByte >= 0) {
readByte = inputStream.read();
if (readByte >= 0)
System.out.println("Read: " + readByte);
}
System.out.println("Complete in " + (currentTimeMillis() - start) + "ms");

产出:

Read: 1
Read: 2
Read: 3
Complete in 5001ms

如果有一个更基本的问题,比如作者没有回应,读者就会永远阻塞。 如果在将来包装该读取,则可以按照以下方式控制超时:

    int readByte = 1;
// Read data with timeout
Callable<Integer> readTask = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return inputStream.read();
}
};
while (readByte >= 0) {
Future<Integer> future = executor.submit(readTask);
readByte = future.get(1000, TimeUnit.MILLISECONDS);
if (readByte >= 0)
System.out.println("Read: " + readByte);
}

产出:

Read: 1
Read: 2
Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
at java.util.concurrent.FutureTask.get(FutureTask.java:91)
at test.InputStreamWithTimeoutTest.main(InputStreamWithTimeoutTest.java:74)

我可以捕获 TimeoutException 并执行任何我想要的清理操作。

使用 inputStream.ability ()

System.in.ability ()总是可以返回0。

我发现了相反的情况——它总是返回可用字节数的最佳值。InputStream.available()的 Javadoc:

Returns an estimate of the number of bytes that can be read (or skipped over)
from this input stream without blocking by the next invocation of a method for
this input stream.

由于时间/过时,估计是不可避免的。这个数字可能是一次性的低估,因为新的数据不断到来。然而,它总是“赶上”下一个呼叫-它应该说明所有到达的数据,除了刚好到达的时刻,新的呼叫。如果上述条件失败,则永久返回0。

第一个警告: InputStream 的具体子类负责可用()

InputStream是一个抽象类。它没有数据源。没有可用的数据是毫无意义的。因此,javadoc for available()也指出:

The available method for class InputStream always returns 0.


This method should be overridden by subclasses.

实际上,具体的输入流类确实覆盖了可用() ,提供了有意义的值,而不是常量0。

第二个警告: 确保在 Windows 中键入输入时使用回车符。

如果使用 System.in,您的程序只有在命令 shell 交出输入时才能接收输入。如果您正在使用文件重定向/管道(例如 somfile > java myJavaApp 或 someccommand | java myJavaApp) ,那么输入数据通常会立即传递。但是,如果手动键入输入,则可能会延迟数据切换。例如,对于 windows cmd.exe shell,数据在 cmd.exe shell 中进行缓冲。数据仅在回车后传递给正在执行的 java 程序(control-m 或 <enter>)。这是执行环境的一个限制。当然,只要 shell 缓冲数据,InputStream.ability ()就会返回0——这是正确的行为; 此时没有可用的数据。一旦 shell 中的数据可用,该方法就返回一个 > 0的值。注意: Cygwin 也使用 cmd.exe。

最简单的解决方案(没有阻塞,所以不需要超时)

用这个:

    byte[] inputData = new byte[1024];
int result = is.read(inputData, 0, is.available());
// result will indicate number of bytes read; -1 for EOF with no data read.

或等价地,

    BufferedReader br = new BufferedReader(new InputStreamReader(System.in, Charset.forName("ISO-8859-1")),1024);
// ...
// inside some iteration / processing logic:
if (br.ready()) {
int readCount = br.read(inputData, bufferOffset, inputData.length-bufferOffset);
}

更丰富的解决方案(在超时期间最大限度地填充缓冲区)

声明如下:

public static int readInputStreamWithTimeout(InputStream is, byte[] b, int timeoutMillis)
throws IOException  {
int bufferOffset = 0;
long maxTimeMillis = System.currentTimeMillis() + timeoutMillis;
while (System.currentTimeMillis() < maxTimeMillis && bufferOffset < b.length) {
int readLength = java.lang.Math.min(is.available(),b.length-bufferOffset);
// can alternatively use bufferedReader, guarded by isReady():
int readResult = is.read(b, bufferOffset, readLength);
if (readResult == -1) break;
bufferOffset += readResult;
}
return bufferOffset;
}

那就用这个:

    byte[] inputData = new byte[1024];
int readCount = readInputStreamWithTimeout(System.in, inputData, 6000);  // 6 second timeout
// readCount will indicate number of bytes read; -1 for EOF with no data read.

受到 这个答案的启发,我想出了一个更加面向对象的解决方案。

这只有在您打算读取字符时才有效

您可以覆盖 BufferedReader 并实现以下内容:

public class SafeBufferedReader extends BufferedReader{


private long millisTimeout;


( . . . )


@Override
public int read(char[] cbuf, int off, int len) throws IOException {
try {
waitReady();
} catch(IllegalThreadStateException e) {
return 0;
}
return super.read(cbuf, off, len);
}


protected void waitReady() throws IllegalThreadStateException, IOException {
if(ready()) return;
long timeout = System.currentTimeMillis() + millisTimeout;
while(System.currentTimeMillis() < timeout) {
if(ready()) return;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break; // Should restore flag
}
}
if(ready()) return; // Just in case.
throw new IllegalThreadStateException("Read timed out");
}
}

这里有一个几乎完整的例子。

我在一些方法上返回0,您应该将其更改为 -2以满足您的需要,但是我认为0更适合 BufferedReader 合同。没有错误发生,它只是读取0个字符。ReadLine 方法是一个可怕的性能杀手。如果您确实想要使用 readLin,那么应该创建一个全新的 BufferedReadere.现在,它不是线程安全的。如果在 readLines 等待一行时有人调用了一个操作,它将产生意外的结果

我不喜欢回到 -2。我会抛出一个异常,因为有些人可能只是检查 int < 0来考虑 EOS。无论如何,这些方法声明“ can’t block”,你应该检查这个语句是否真实,只是不要覆盖它们。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.nio.CharBuffer;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;


/**
*
* readLine
*
* @author Dario
*
*/
public class SafeBufferedReader extends BufferedReader{


private long millisTimeout;


private long millisInterval = 100;


private int lookAheadLine;


public SafeBufferedReader(Reader in, int sz, long millisTimeout) {
super(in, sz);
this.millisTimeout = millisTimeout;
}


public SafeBufferedReader(Reader in, long millisTimeout) {
super(in);
this.millisTimeout = millisTimeout;
}






/**
* This is probably going to kill readLine performance. You should study BufferedReader and completly override the method.
*
* It should mark the position, then perform its normal operation in a nonblocking way, and if it reaches the timeout then reset position and throw IllegalThreadStateException
*
*/
@Override
public String readLine() throws IOException {
try {
waitReadyLine();
} catch(IllegalThreadStateException e) {
//return null; //Null usually means EOS here, so we can't.
throw e;
}
return super.readLine();
}


@Override
public int read() throws IOException {
try {
waitReady();
} catch(IllegalThreadStateException e) {
return -2; // I'd throw a runtime here, as some people may just be checking if int < 0 to consider EOS
}
return super.read();
}


@Override
public int read(char[] cbuf) throws IOException {
try {
waitReady();
} catch(IllegalThreadStateException e) {
return -2;  // I'd throw a runtime here, as some people may just be checking if int < 0 to consider EOS
}
return super.read(cbuf);
}


@Override
public int read(char[] cbuf, int off, int len) throws IOException {
try {
waitReady();
} catch(IllegalThreadStateException e) {
return 0;
}
return super.read(cbuf, off, len);
}


@Override
public int read(CharBuffer target) throws IOException {
try {
waitReady();
} catch(IllegalThreadStateException e) {
return 0;
}
return super.read(target);
}


@Override
public void mark(int readAheadLimit) throws IOException {
super.mark(readAheadLimit);
}


@Override
public Stream<String> lines() {
return super.lines();
}


@Override
public void reset() throws IOException {
super.reset();
}


@Override
public long skip(long n) throws IOException {
return super.skip(n);
}


public long getMillisTimeout() {
return millisTimeout;
}


public void setMillisTimeout(long millisTimeout) {
this.millisTimeout = millisTimeout;
}


public void setTimeout(long timeout, TimeUnit unit) {
this.millisTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
}


public long getMillisInterval() {
return millisInterval;
}


public void setMillisInterval(long millisInterval) {
this.millisInterval = millisInterval;
}


public void setInterval(long time, TimeUnit unit) {
this.millisInterval = TimeUnit.MILLISECONDS.convert(time, unit);
}


/**
* This is actually forcing us to read the buffer twice in order to determine a line is actually ready.
*
* @throws IllegalThreadStateException
* @throws IOException
*/
protected void waitReadyLine() throws IllegalThreadStateException, IOException {
long timeout = System.currentTimeMillis() + millisTimeout;
waitReady();


super.mark(lookAheadLine);
try {
while(System.currentTimeMillis() < timeout) {
while(ready()) {
int charInt = super.read();
if(charInt==-1) return; // EOS reached
char character = (char) charInt;
if(character == '\n' || character == '\r' ) return;
}
try {
Thread.sleep(millisInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore flag
break;
}
}
} finally {
super.reset();
}
throw new IllegalThreadStateException("readLine timed out");


}


protected void waitReady() throws IllegalThreadStateException, IOException {
if(ready()) return;
long timeout = System.currentTimeMillis() + millisTimeout;
while(System.currentTimeMillis() < timeout) {
if(ready()) return;
try {
Thread.sleep(millisInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore flag
break;
}
}
if(ready()) return; // Just in case.
throw new IllegalThreadStateException("read timed out");
}


}