将async/wait与for每个循环一起使用

forEach循环中使用async/await有任何问题吗?我正在尝试遍历文件数组并在每个文件的内容上遍历await

import fs from 'fs-promise'
async function printFiles () {const files = await getFilePaths() // Assume this works fine
files.forEach(async (file) => {const contents = await fs.readFile(file, 'utf8')console.log(contents)})}
printFiles()

这段代码确实有效,但是会有什么问题吗?有人告诉我,你不应该在这样的高阶函数中使用async/await,所以我只想问一下是否有任何问题。

1693685 次浏览

当然代码确实有效,但我很确定它不会做你期望它做的事情。它只是触发多个异步调用,但printFiles函数确实在那之后立即返回。

按顺序读

如果您想按顺序读取文件,确实是你不能使用#0。只需使用现代的for … of循环,其中await将按预期工作:

async function printFiles () {const files = await getFilePaths();
for (const file of files) {const contents = await fs.readFile(file, 'utf8');console.log(contents);}}

并行阅读

如果你想并行读取文件,确实是你不能使用#0async的每个回调函数调用都会返回一个Promise,但是你正在丢弃它们而不是等待它们。只需使用map代替,你可以等待Promise.all将获得的Promise数组:

async function printFiles () {const files = await getFilePaths();
await Promise.all(files.map(async (file) => {const contents = await fs.readFile(file, 'utf8')console.log(contents)}));}

npm上的p迭代模块实现了Array迭代方法,因此它们可以以非常直接的方式与async/wait一起使用。

以您的案例为例:

const { forEach } = require('p-iteration');const fs = require('fs-promise');
(async function printFiles () {const files = await getFilePaths();
await forEach(files, async (file) => {const contents = await fs.readFile(file, 'utf8');console.log(contents);});})();

上面的两种解决方案都有效,但是,Antonio用更少的代码完成了这项工作,以下是它如何帮助我从数据库中解析数据,从几个不同的子引用中解析数据,然后将它们全部推送到一个数组中,并在完成所有工作后以Promise的形式解析它:

Promise.all(PacksList.map((pack)=>{return fireBaseRef.child(pack.folderPath).once('value',(snap)=>{snap.forEach( childSnap => {const file = childSnap.val()file.id = childSnap.key;allItems.push( file )})})})).then(()=>store.dispatch( actions.allMockupItems(allItems)))

在一个文件中弹出几个方法是非常容易的,这些方法将以序列化的顺序处理异步数据,并为您的代码提供更传统的味道。例如:

module.exports = function () {var self = this;
this.each = async (items, fn) => {if (items && items.length) {await Promise.all(items.map(async (item) => {await fn(item);}));}};
this.reduce = async (items, fn, initialValue) => {await self.each(items, async (item) => {initialValue = await fn(initialValue, item);});return initialValue;};};

现在,假设保存在'./myAsync.js',您可以在相邻文件中执行类似于以下操作:

.../* your server setup here */...var MyAsync = require('./myAsync');var Cat = require('./models/Cat');var Doje = require('./models/Doje');var example = async () => {var myAsync = new MyAsync();var doje = await Doje.findOne({ name: 'Doje', noises: [] }).save();var cleanParams = [];
// FOR EACH EXAMPLEawait myAsync.each(['bork', 'concern', 'heck'],async (elem) => {if (elem !== 'heck') {await doje.update({ $push: { 'noises': elem }});}});
var cat = await Cat.findOne({ name: 'Nyan' });
// REDUCE EXAMPLEvar friendsOfNyanCat = await myAsync.reduce(cat.friends,async (catArray, friendId) => {var friend = await Friend.findById(friendId);if (friend.name !== 'Long cat') {catArray.push(friend.name);}}, []);// Assuming Long Cat was a friend of Nyan Cat...assert(friendsOfNyanCat.length === (cat.friends.length - 1));}

一个重要的警告是:await + for .. of方法和forEach + async方法实际上有不同的效果。

在真正的for循环中包含await将确保所有异步调用都一个接一个地执行。forEach + async方式将同时触发所有Promise,这更快,但有时会不知所措(如果您进行一些数据库查询或访问一些具有音量限制的Web服务并且不想一次触发100,000个调用)。

如果您不使用async/await并希望确保文件被读取一个接一个,您也可以使用reduce + promise(不太优雅)。

files.reduce((lastPromise, file) =>lastPromise.then(() =>fs.readFile(file, 'utf8')), Promise.resolve())

或者你可以创建一个forEachAsync来帮助,但基本上使用相同的for循环底层。

Array.prototype.forEachAsync = async function(cb){for(let x of this){await cb(x);}}

我会使用经过良好测试(每周数百万次下载)的pifyasync模块。如果你不熟悉异步模块,我强烈建议你查看其文档。我见过多个开发人员浪费时间重新创建其方法,或者更糟的是,当高阶异步方法可以简化代码时,使维护异步代码变得困难。

const async = require('async')const fs = require('fs-promise')const pify = require('pify')
async function getFilePaths() {return Promise.resolve(['./package.json','./package-lock.json',]);}
async function printFiles () {const files = await getFilePaths()
await pify(async.eachSeries)(files, async (file) => {  // <-- run in series// await pify(async.each)(files, async (file) => {  // <-- run in parallelconst contents = await fs.readFile(file, 'utf8')console.log(contents)})console.log('HAMBONE')}
printFiles().then(() => {console.log('HAMBUNNY')})// ORDER OF LOGS:// package.json contents// package-lock.json contents// HAMBONE// HAMBUNNY```

除了@Bergi的回答之外,我还想提供第三种选择。它与@Bergi的第二个例子非常相似,但不是单独等待每个readFile,而是创建一个承诺数组,每个承诺都在最后等待。

import fs from 'fs-promise';async function printFiles () {const files = await getFilePaths();
const promises = files.map((file) => fs.readFile(file, 'utf8'))
const contents = await Promise.all(promises)
contents.forEach(console.log);}

请注意,传递给.map()的函数不需要是async,因为fs.readFile无论如何都会返回一个Promise对象。因此promises是一个Promise对象数组,可以发送给Promise.all()

在@Bergi的回答中,控制台可能会按照读取的顺序记录文件内容。例如,如果一个非常小的文件在一个非常大的文件之前完成读取,它将首先被记录,即使小文件在files数组中的大文件中排名第一。然而,在我上面的方法中,你可以保证控制台会以与提供的数组相同的顺序记录文件。

使用任务、未来化和可遍历列表,您可以简单地执行

async function printFiles() {const files = await getFiles();
List(files).traverse( Task.of, f => readFile( f, 'utf-8')).fork( console.error, console.log)}

你是这样安排的

import fs from 'fs';import { futurize } from 'futurize';import Task from 'data.task';import { List } from 'immutable-ext';
const future = futurizeP(Task)const readFile = future(fs.readFile)

构建所需代码的另一种方法是

const printFiles = files =>List(files).traverse( Task.of, fn => readFile( fn, 'utf-8')).fork( console.error, console.log)

或者更注重功能

// 90% of encodings are utf-8, making that use case super easy is prudent
// handy-library.jsexport const readFile = f =>future(fs.readFile)( f, 'utf-8' )
export const arrayToTaskList = list => taskFn =>List(files).traverse( Task.of, taskFn )
export const readFiles = files =>arrayToTaskList( files, readFile )
export const printFiles = files =>readFiles(files).fork( console.error, console.log)

然后从父函数

async function main() {/* awesome code with side-effects before */printFiles( await getFiles() );/* awesome code with side-effects after */}

如果你真的想要更多的编码灵活性,你可以这样做(为了好玩,我使用了提议的管道前进操作员

import { curry, flip } from 'ramda'
export const readFile = fs.readFile|> future,|> curry,|> flip
export const readFileUtf8 = readFile('utf-8')

PS-我没有在控制台上尝试这个代码,可能有一些拼写错误……“直接自由式,从穹顶顶部开始!”正如90年代的孩子们所说。

以下是一些forEachAsync原型。请注意,您需要await它们:

Array.prototype.forEachAsync = async function (fn) {for (let t of this) { await fn(t) }}
Array.prototype.forEachAsyncParallel = async function (fn) {await Promise.all(this.map(fn));}

说明虽然您可以将其包含在您自己的代码中,但您不应该将其包含在您分发给其他人的库中(以避免污染他们的全局)。

而不是Promise.all结合Array.prototype.map(这并不能保证Promise被解析的顺序),我使用Array.prototype.reduce,从解析的Promise开始:

async function printFiles () {const files = await getFilePaths();
await files.reduce(async (promise, file) => {// This line will wait for the last async function to finish.// The first iteration uses an already resolved Promise// so, it will immediately continue.await promise;const contents = await fs.readFile(file, 'utf8');console.log(contents);}, Promise.resolve());}

使用ES2018,您可以大大简化上述所有答案:

async function printFiles () {const files = await getFilePaths()
for await (const contents of files.map(file => fs.readFile(file, 'utf8'))) {console.log(contents)}}

见规格:提案-同步-迭代

简化:

  for await (const results of array) {await longRunningTask()}console.log('I will wait')

2018-09-10:这个答案最近得到了很多关注,有关异步迭代的更多信息,请参阅Axel Rauschmayer的博客文章

与Antonio Val的#0类似,另一个npm模块是#1

const AsyncAF = require('async-af');const fs = require('fs-promise');
function printFiles() {// since AsyncAF accepts promises or non-promises, there's no need to await hereconst files = getFilePaths();
AsyncAF(files).forEach(async file => {const contents = await fs.readFile(file, 'utf8');console.log(contents);});}
printFiles();

或者,#0有一个静态方法(log/logAF)来记录Promise的结果:

const AsyncAF = require('async-af');const fs = require('fs-promise');
function printFiles() {const files = getFilePaths();
AsyncAF(files).forEach(file => {AsyncAF.log(fs.readFile(file, 'utf8'));});}
printFiles();

但是,该库的主要优点是您可以链接异步方法来执行以下操作:

const aaf = require('async-af');const fs = require('fs-promise');
const printFiles = () => aaf(getFilePaths()).map(file => fs.readFile(file, 'utf8')).forEach(file => aaf.log(file));
printFiles();

async-af

目前Array.for每个原型属性不支持异步操作,但我们可以创建自己的多边形填充来满足我们的需求。

// Example of asyncForEach Array poly-fill for NodeJs// file: asyncForEach.js// Define asynForEach functionasync function asyncForEach(iteratorFunction){let indexer = 0for(let data of this){await iteratorFunction(data, indexer)indexer++}}// Append it as an Array prototype propertyArray.prototype.asyncForEach = asyncForEachmodule.exports = {Array}

就是这样!您现在在这些to操作之后定义的任何数组上都有一个async for每个方法可用。

让我们测试一下…

// Nodejs style// file: someOtherFile.js
const readline = require('readline')Array = require('./asyncForEach').Arrayconst log = console.log
// Create a stream interfacefunction createReader(options={prompt: '>'}){return readline.createInterface({input: process.stdin,output: process.stdout,prompt: options.prompt !== undefined ? options.prompt : '>'})}// Create a cli stream readerasync function getUserIn(question, options={prompt:'>'}){log(question)let reader = createReader(options)return new Promise((res)=>{reader.on('line', (answer)=>{process.stdout.cursorTo(0, 0)process.stdout.clearScreenDown()reader.close()res(answer)})})}
let questions = [`What's your name`,`What's your favorite programming language`,`What's your favorite async function`]let responses = {}
async function getResponses(){// Notice we have to prepend await before calling the async Array function// in order for it to function as expectedawait questions.asyncForEach(async function(question, index){let answer = await getUserIn(question)responses[question] = answer})}
async function main(){await getResponses()log(responses)}main()// Should prompt user for an answer to each question and then// log each question and answer as an object to the terminal

我们可以对其他一些数组函数(如map…

async function asyncMap(iteratorFunction){let newMap = []let indexer = 0for(let data of this){newMap[indexer] = await iteratorFunction(data, indexer, this)indexer++}return newMap}
Array.prototype.asyncMap = asyncMap

…等等:)

有些事情要注意:

  • 你的iteratorFunction必须是一个异步函数或Promise
  • Array.prototype.<yourAsyncFunc> = <yourAsyncFunc>之前创建的任何数组都不具有此功能

fs基于Promise时,Bergi的解决方案工作得很好。您可以使用bluebirdfs-extrafs-promise

然而,解决节点的本机fs库如下:

const result = await Promise.all(filePaths.map( async filePath => {const fileContents = await getAssetFromCache(filePath, async function() {
// 1. Wrap with Promise// 2. Return the result of the Promisereturn await new Promise((res, rej) => {fs.readFile(filePath, 'utf8', function(err, data) {if (data) {res(data);}});});});
return fileContents;}));

注:require('fs')强制接受函数作为第3个参数,否则抛出错误:

TypeError [ERR_INVALID_CALLBACK]: Callback must be a function

要查看如何出错,请在方法末尾打印console.log。

一般情况下可能出错的事情:

  • 任意命令。
  • printFiles可以在打印文件之前完成运行。
  • 业绩不佳。

这些并不总是错误的,但经常出现在标准用例中。

一般来说,使用for每个将导致除最后一个之外的所有结果。它将调用每个函数而不等待函数,这意味着它告诉所有函数开始然后完成,而不等待函数完成。

import fs from 'fs-promise'
async function printFiles () {const files = (await getFilePaths()).map(file => fs.readFile(file, 'utf8'))
for(const file of files)console.log(await file)}
printFiles()

这是原生JS中的一个示例,它将保持顺序,防止函数过早返回,并在理论上保持最佳性能。

这将:

  • 启动所有文件读取并行发生。
  • 通过使用map将文件名映射到要等待的Promise来保留顺序。
  • 按照数组定义的顺序等待每个Promise。

使用此解决方案,第一个文件将在可用时立即显示,而无需等待其他文件首先可用。

它还将同时加载所有文件,而不必等待第一个文件完成后才能开始读取第二个文件。

这个和原始版本的唯一缺点是,如果一次启动多个读取,那么由于一次可能发生更多错误,因此处理错误更加困难。

使用一次读取文件的版本,然后将在失败时停止,而不会浪费时间尝试读取更多文件。即使使用精心设计的取消系统,也很难避免它在第一个文件上失败,但已经读取了大部分其他文件。

性能并不总是可预测的。虽然许多系统通过并行文件读取会更快,但有些系统会更喜欢顺序读取。有些是动态的,可能会在负载下移动,提供延迟的优化在严重竞争下并不总是产生良好的吞吐量。

该示例中也没有错误处理。如果某些内容要求它们要么全部成功显示,要么根本不显示,它不会这样做。

建议在每个阶段使用console.log和假文件读取解决方案(改为随机延迟)进行深入实验。尽管许多解决方案在简单情况下似乎也这样做,但所有解决方案都有细微的差异,需要一些额外的审查才能挤出。

使用此模拟来帮助区分解决方案:

(async () => {const start = +new Date();const mock = () => {return {fs: {readFile: file => new Promise((resolve, reject) => {// Instead of this just make three files and try each timing arrangement.// IE, all same, [100, 200, 300], [300, 200, 100], [100, 300, 200], etc.const time = Math.round(100 + Math.random() * 4900);console.log(`Read of ${file} started at ${new Date() - start} and will take ${time}ms.`)setTimeout(() => {// Bonus material here if random reject instead.console.log(`Read of ${file} finished, resolving promise at ${new Date() - start}.`);resolve(file);}, time);})},console: {log: file => console.log(`Console Log of ${file} finished at ${new Date() - start}.`)},getFilePaths: () => ['A', 'B', 'C', 'D', 'E']};};
const printFiles = (({fs, console, getFilePaths}) => {return async function() {const files = (await getFilePaths()).map(file => fs.readFile(file, 'utf8'));
for(const file of files)console.log(await file);};})(mock());
console.log(`Running at ${new Date() - start}`);await printFiles();console.log(`Finished running at ${new Date() - start}`);})();

今天我遇到了这个问题的多种解决方案。在for每个循环中运行异步等待函数。通过围绕我们构建包装器,我们可以实现这一点。

关于它在内部是如何工作的更详细的解释,对于原生for每个以及为什么它不能进行异步函数调用,以及关于各种方法的其他详细信息在这里的链接中提供

它可以通过多种方式完成,它们如下:

方法1:使用包装器。

await (()=>{return new Promise((resolve,reject)=>{items.forEach(async (item,index)=>{try{await someAPICall();} catch(e) {console.log(e)}count++;if(index === items.length-1){resolve('Done')}});});})();

方法2:使用相同的泛型函数Array.prototype

Array.prototype.forEachAsync.js

if(!Array.prototype.forEachAsync) {Array.prototype.forEachAsync = function (fn){return new Promise((resolve,reject)=>{this.forEach(async(item,index,array)=>{await fn(item,index,array);if(index === array.length-1){resolve('done');}})});};}

用法:

require('./Array.prototype.forEachAsync');
let count = 0;
let hello = async (items) => {
// Method 1 - Using the Array.prototype.forEach
await items.forEachAsync(async () => {try{await someAPICall();} catch(e) {console.log(e)}count++;});
console.log("count = " + count);}
someAPICall = () => {return new Promise((resolve, reject) => {setTimeout(() => {resolve("done") // or reject('error')}, 100);})}
hello(['', '', '', '']); // hello([]) empty array is also be handled by default

方法三:

使用Promise.all

  await Promise.all(items.map(async (item) => {await someAPICall();count++;}));
console.log("count = " + count);

方法4:传统for循环或现代for循环

// Method 4 - using for loop directly
// 1. Using the modern for(.. in..) loopfor(item in items){
await someAPICall();count++;}
//2. Using the traditional for loop
for(let i=0;i<items.length;i++){
await someAPICall();count++;}

console.log("count = " + count);

只是增加了原来的答案

  • 原答案中的并行阅读语法有时令人困惑,难以阅读,也许我们可以用不同的方法来写它
async function printFiles() {const files = await getFilePaths();const fileReadPromises = [];
const readAndLogFile = async filePath => {const contents = await fs.readFile(file, "utf8");console.log(contents);return contents;};
files.forEach(file => {fileReadPromises.push(readAndLogFile(file));});
await Promise.all(fileReadPromises);}
  • 对于顺序操作,不仅仅是为…的,普通的for循环也可以
async function printFiles() {const files = await getFilePaths();
for (let i = 0; i < files.length; i++) {const file = files[i];const contents = await fs.readFile(file, "utf8");console.log(contents);}}

就像@Bergi的回应,但有一个区别。

如果一个被拒绝,Promise.all会拒绝所有的承诺。

所以,使用递归。

const readFilesQueue = async (files, index = 0) {const contents = await fs.readFile(files[index], 'utf8')console.log(contents)
return files.length <= index? readFilesQueue(files, ++index): files
}
const printFiles async = () => {const files = await getFilePaths();const printContents = await readFilesQueue(files)
return printContents}
printFiles()

PS

readFilesQueueprintFiles之外,导致console.log引入的副作用*,最好是模拟、测试和/或监视,所以,有一个返回内容的函数(旁注)并不酷。

因此,代码可以简单地设计为:三个分离的函数是“纯”**并且没有引入副作用,处理整个列表并且可以轻松修改以处理失败情况。

const files = await getFilesPath()
const printFile = async (file) => {const content = await fs.readFile(file, 'utf8')console.log(content)}
const readFiles = async = (files, index = 0) => {await printFile(files[index])
return files.lengh <= index? readFiles(files, ++index): files}
readFiles(files)

未来编辑/当前状态

Node支持顶级等待(这还没有插件,不会有,可以通过和谐标志启用),它很酷,但不能解决一个问题(从战略上讲,我只在LTS版本上工作)。如何获取文件?

使用组合。给定代码,让我感觉这是在模块内部,所以应该有一个函数来完成它。如果没有,你应该使用IIFE将角色代码包装成异步函数,创建简单的模块,这一切都适合你,或者你可以使用正确的方法,有组合。

// more complex version with IIFE to a single module(async (files) => readFiles(await files())(getFilesPath)

请注意,变量的名称由于语义学而改变。您传递一个仿函数(可以被另一个函数调用的函数)并接收内存上的一个指针,该指针包含应用程序的初始逻辑块。

但是,如果不是一个模块,你需要导出逻辑?

将函数包装在异步函数中。

export const readFilesQueue = async () => {// ... to code goes here}

或者更改变量的名称,无论…


副作用*表示应用程序的任何细菌效应,可以改变应用程序中的状态/行为或插入错误,例如IO。

**由“纯”,它在撇号中,因为函数不是纯的,代码可以收敛到纯版本,当没有控制台输出时,只有数据操作。

除此之外,为了纯粹,您需要使用处理副作用的monad,这很容易出错,并将该错误与应用程序分开处理。

此解决方案还对内存进行了优化,因此您可以在10,000个数据项和请求上运行它。这里的一些其他解决方案会在大型数据集上使服务器崩溃。

在TypeScript中:

export async function asyncForEach<T>(array: Array<T>, callback: (item: T, index: number) => Promise<void>) {for (let index = 0; index < array.length; index++) {await callback(array[index], index);}}

如何使用?

await asyncForEach(receipts, async (eachItem) => {await ...})

您可以使用Array.prototype.forEach,但async/wait不太兼容。这是因为从异步回调返回的Promise期望得到解析,但Array.prototype.forEach不会从其回调的执行中解析任何Promise。因此,您可以使用for每个,但您必须自己处理Promise解析。

以下是使用Array.prototype.forEach读取和打印每个文件的方法

async function printFilesInSeries () {const files = await getFilePaths()
let promiseChain = Promise.resolve()files.forEach((file) => {promiseChain = promiseChain.then(() => {fs.readFile(file, 'utf8').then((contents) => {console.log(contents)})})})await promiseChain}

这是一种并行打印文件内容的方法(仍在使用Array.prototype.forEach

async function printFilesInParallel () {const files = await getFilePaths()
const promises = []files.forEach((file) => {promises.push(fs.readFile(file, 'utf8').then((contents) => {console.log(contents)}))})await Promise.all(promises)}

如果你想同时遍历所有元素:

async function asyncForEach(arr, fn) {await Promise.all(arr.map(fn));}

如果你想非并发地遍历所有元素(例如,当你的映射函数有副作用或一次在所有数组元素上运行mapper会占用太多资源):

选项A:承诺

function asyncForEachStrict(arr, fn) {return new Promise((resolve) => {arr.reduce((promise, cur, idx) => promise.then(() => fn(cur, idx, arr)),Promise.resolve(),).then(() => resolve());});}

选项B:异步/等待

async function asyncForEachStrict(arr, fn) {for (let idx = 0; idx < arr.length; idx += 1) {const cur = arr[idx];
await fn(cur, idx, arr);}}

正如其他答案所提到的,您可能希望它按顺序而不是并行执行。例如。运行第一个文件,等待它完成,然后完成后运行第二个文件。这不是会发生的事情。

我认为重要的是要解决为什么这不会发生。

想想forEach是如何工作的。我找不到源,但我假设它是这样工作的:

const forEach = (arr, cb) => {for (let i = 0; i < arr.length; i++) {cb(arr[i]);}};

现在想想当你做这样的事情时会发生什么:

forEach(files, async logFile(file) {const contents = await fs.readFile(file, 'utf8');console.log(contents);});

forEachfor循环中,我们调用cb(arr[i]),它最终是logFile(file)logFile函数内部有一个await,所以也许for循环会在继续i++之前等待这个await

不,不会的。令人困惑的是,这不是await的工作方式。来自的文档

等待拆分执行流,允许异步函数的调用者恢复执行。在等待推迟异步函数的延续后,随后执行后续语句。如果此等待是其函数执行的最后一个表达式,则继续执行,方法是向函数的调用者返回一个挂起的Promise,以完成等待的函数并恢复该调用者的执行。

因此,如果您有以下情况,则不会在"b"之前记录数字:

const delay = (ms) => {return new Promise((resolve) => {setTimeout(resolve, ms);});};
const logNumbers = async () => {console.log(1);await delay(2000);console.log(2);await delay(2000);console.log(3);};
const main = () => {console.log("a");logNumbers();console.log("b");};
main();

回到forEachforEach就像mainlogFile就像logNumbersmain不会因为logNumbers做了一些awaiting而停止,forEach不会因为logFile做了一些awaiting而停止。

替换不起作用的forEach()等待循环的简单解决方案是用map替换forEach并在开头添加Promise.all(

例如:

await y.forEach(async (x) => {

await Promise.all(y.map(async (x) => {

最后需要一个额外的)

这是一个在for每个循环中使用async的好例子。

编写你自己的asyncFor每个

async function asyncForEach(array, callback) {for (let index = 0; index < array.length; index++) {await callback(array[index], index, array)}}

你可以像这样使用它

await asyncForEach(array, async function(item,index,array){//await here})

从循环调用异步方法是不好的。这是因为每次循环迭代都会延迟到整个异步操作完成。这不是很高性能。它还避免了async/await的并行化优势。

更好的解决方案是一次创建所有Promise,然后使用Promise.all()访问结果。否则,每个连续的操作都不会开始,直到前一个操作完成。

因此,代码可以重构如下;

const printFiles = async () => {const files = await getFilePaths();const results = [];files.forEach((file) => {results.push(fs.readFile(file, 'utf8'));});const contents = await Promise.all(results);console.log(contents);}

图片价值1000字-仅用于顺序方法


背景:我昨晚也遇到了类似的情况。我使用了async函数作为foreach参数。结果是不可预测的。当我对我的代码进行3次测试时,它运行了2次没有问题,失败了1次。(有点奇怪)

最后我得到了我的头周围,并做了一些便签板测试。

场景1-它如何在Foreach中使用async获得非顺序性

在此处输入图片描述

const getPromise = (time) => {return new Promise((resolve, reject) => {setTimeout(() => {resolve(`Promise resolved for ${time}s`)}, time)})}
const main = async () => {const myPromiseArray = [getPromise(1000), getPromise(500), getPromise(3000)]console.log('Before For Each Loop')
myPromiseArray.forEach(async (element, index) => {let result = await element;console.log(result);})
console.log('After For Each Loop')}
main();

场景2-使用for - of循环作为上面建议的@Bergi

在此处输入图片描述

const getPromise = (time) => {return new Promise((resolve, reject) => {setTimeout(() => {resolve(`Promise resolved for ${time}s`)}, time)})}
const main = async () => {const myPromiseArray = [getPromise(1000), getPromise(500), getPromise(3000)]console.log('Before For Each Loop')
// AVOID USING THIS// myPromiseArray.forEach(async (element, index) => {//   let result = await element;//   console.log(result);// })
// This works wellfor (const element of myPromiseArray) {let result = await element;console.log(result)}
console.log('After For Each Loop')}
main();

如果你是像我这样的小老派,你可以简单地使用经典的for循环,这也有效:)

const getPromise = (time) => {return new Promise((resolve, reject) => {setTimeout(() => {resolve(`Promise resolved for ${time}s`)}, time)})}
const main = async () => {const myPromiseArray = [getPromise(1000), getPromise(500), getPromise(3000)]console.log('Before For Each Loop')
// AVOID USING THIS// myPromiseArray.forEach(async (element, index) => {//   let result = await element;//   console.log(result);// })
// This works well too - the classic for loop :)for (let i = 0; i < myPromiseArray.length; i++) {const result = await myPromiseArray[i];console.log(result);}
console.log('After For Each Loop')}
main();

希望对大家有所帮助,美好的一天,干杯!

如果你不能使用async/wait(IE11、旧打包程序等)然后您可以尝试这个递归函数。我使用fetch作为我的异步调用,但您可以使用任何返回Promise的函数。

var urlsToGet = ['https://google.com', 'https://yahoo.com'];
fetchOneAtATime(urlsToGet);
function fetchOneAtATime(urls) {if (urls.length === 0) {return;}fetch(urls[0]).finally(() => fetchOneAtATime(urls.slice(1)));}

您可以使用async包中的async.for每个循环:

async.forEach(dataToLoop(array), async(data, cb) => {variable = await MongoQuery;}, function(err) {console.log(err);})}).catch((err)=>{console.log(err);})

前面Bergi已经给出了如何正确处理的思路,我就不复述了。浓缩芝士小测试

当涉及到asyncawait时,我想解决使用forEachfor循环之间的区别

forEach如何工作

让我们来看看forEach是如何工作的。根据ECMAScript规范,MDN提供了一个实施,它可以用作一个多边形填充。我复制它并粘贴在这里并删除注释。

Array.prototype.forEach = function (callback, thisArg) {if (this == null) { throw new TypeError('Array.prototype.forEach called on null or undefined'); }var T, k;var O = Object(this);var len = O.length >>> 0;if (typeof callback !== "function") { throw new TypeError(callback + ' is not a function'); }if (arguments.length > 1) { T = thisArg; }k = 0;while (k < len) {var kValue;if (k in O) {kValue = O[k];callback.call(T, kValue, k, O); // pay attention to this line}k++;}};

让我们回到您的代码,让我们将回调提取为函数。

async function callback(file){const contents = await fs.readFile(file, 'utf8')console.log(contents)}

所以,基本上callback返回一个Promise,因为它是用async声明的。在forEach内部,callback只是以正常方式调用,如果回调本身返回一个Promise,javascript引擎不会等待它被解析或拒绝。相反,它将promise放入作业队列中,并继续执行循环。

callback里面的await fs.readFile(file, 'utf8')怎么样?

基本上,当你的asynccallback有机会被执行时,js引擎会暂停到fs.readFile(file, 'utf8')被解析或拒绝,并在实现后恢复执行async函数。所以contents变量存储的是fs.readFile的实际结果,而不是promise。所以,console.log(contents)记录的是文件内容,而不是Promise

为什么for ... of有效?

当我们编写一个通用的for of循环时,我们获得了比forEach更多的控制权。让我们重构printFiles

async function printFiles () {const files = await getFilePaths() // Assume this works fine
for (const file of files) {const contents = await fs.readFile(file, 'utf8')console.log(contents)// or await callback(file)}}

当评估for循环时,我们在async函数中有await Promise,执行将暂停,直到await Promise解决。所以,你可以认为文件是按照确定的顺序一个接一个地读取的。

顺序执行

有时候,我们真的需要异步函数按顺序执行。例如,我有一些新记录存储在一个数组中要保存到数据库中,我希望它们按顺序保存,这意味着数组中的第一条记录应该先保存,然后再保存,直到最后一条被保存。

下面是一个例子:

const records = [1, 2, 3, 4];
async function saveRecord(record) {return new Promise((resolved, rejected) => {setTimeout(()=> {resolved(`record ${record} saved`)}, Math.random() * 500)});}
async function forEachSaveRecords(records) {records.forEach(async (record) => {const res = await saveRecord(record);console.log(res);})}
async function forofSaveRecords(records) {for (const record of records) {const res = await saveRecord(record);console.log(res);}}(async () => {console.log("=== for of save records ===")await forofSaveRecords(records)  
console.log("=== forEach save records ===")await forEachSaveRecords(records)})()

我使用setTimeout来模拟将记录保存到数据库的过程-它是异步的并且花费随机时间。使用forEach,记录以未确定的顺序保存,但使用for..of,它们按顺序保存。

这并没有像OP请求的那样使用async/wait,如果你在NodeJS的后端,<强>只有可以工作。尽管它对某些人来说仍然可能有帮助,因为OP给出的示例是读取文件内容,通常你在后端读取文件。

完全异步和非阻塞:

const fs = require("fs")const async = require("async")
const obj = {dev: "/dev.json", test: "/test.json", prod: "/prod.json"}const configs = {}
async.forEachOf(obj, (value, key, callback) => {fs.readFile(__dirname + value, "utf8", (err, data) => {if (err) return callback(err)try {configs[key] = JSON.parse(data);} catch (e) {return callback(e)}callback()});}, err => {if (err) console.error(err.message)// configs is now a map of JSON datadoSomethingWith(configs)})

OP的原始问题

是否有任何问题与使用async/wait在一个for循环?…

在一定程度上被@Bergi的选择答案所覆盖,它展示了如何串行和并行处理。但是,并行性还存在其他问题-

  1. Order--@陈志立指出-

例如,如果一个非常小的文件在一个非常大的文件之前完成读取,它将首先被记录,即使小文件在文件数组中的大文件之后。

  1. 可能一次打开太多文件——Bergi在另一个回答下的评论

同时打开数千个文件以同时读取它们也不好。人们总是必须评估顺序、并行或混合方法是否更好。

因此,让我们解决这些问题,展示简洁明了的实际代码,并且没有使用第三方库。易于剪切、粘贴和修改的东西。

并行读取(一次全部),串行打印(每个文件尽可能早)。

最简单的改进是像@Bergi的回答一样执行完全并行,但做一个小的更改,使每个文件都是在保持秩序的同时尽快打印

async function printFiles2() {const readProms = (await getFilePaths()).map((file) =>fs.readFile(file, "utf8"));await Promise.all([await Promise.all(readProms),                      // branch 1(async () => {                                     // branch 2for (const p of readProms) console.log(await p);})(),]);}

上面,两个独立的分支同时运行。

  • 分支1:并行阅读,一次性阅读,
  • 分支2:以串行方式读取以强制排序,但等待时间不会超过必要

那很容易。

并行读取具有并发限制,串行打印(每个文件尽可能早)。

并发限制意味着同一时间读取的文件不超过N个。
就像一个商店一次只允许这么多顾客(至少在COVID期间)。

首先引入一个辅助函数——

function bootablePromise(kickMe: () => Promise<any>) {let resolve: (value: unknown) => void = () => {};const promise = new Promise((res) => { resolve = res; });const boot = () => { resolve(kickMe()); };return { promise, boot };}

函数bootablePromise(kickMe:() => Promise<any>)需要一个函数kickMe作为启动任务的参数(在我们的示例readFile中),但不会立即启动。

bootablePromise返回几个属性

  • promise类型Promise
  • boot类型函数()=>void

promise人生有两个阶段

  1. 作为开始一项任务的承诺
  2. 作为一个承诺,完成它已经开始的任务。

当调用boot()时,promise从第一状态转换到第二状态。

bootablePromise用于printFiles--

async function printFiles4() {const files = await getFilePaths();const boots: (() => void)[] = [];const set: Set<Promise<{ pidx: number }>> = new Set<Promise<any>>();const bootableProms = files.map((file,pidx) => {const { promise, boot } = bootablePromise(() => fs.readFile(file, "utf8"));boots.push(boot);set.add(promise.then(() => ({ pidx })));return promise;});const concurLimit = 2;await Promise.all([(async () => {                                       // branch 1let idx = 0;boots.slice(0, concurLimit).forEach((b) => { b(); idx++; });while (idx<boots.length) {const { pidx } = await Promise.race([...set]);set.delete([...set][pidx]);boots[idx++]();}})(),(async () => {                                       // branch 2for (const p of bootableProms) console.log(await p);})(),]);}

和以前一样有两个分支

  • 分支1:用于运行和处理并发。
  • 分支2:用于打印

现在的区别是允许并发运行的Promise不超过concurLimit

重要的变量是

  • boots:要调用以强制其对应的Promise转换的函数数组。它仅在分支1中使用。
  • set:随机访问容器中有Promises,因此一旦实现就可以轻松删除它们。此容器仅在分支1中使用。
  • bootableProms:这些Promise与最初的set相同,但它是一个数组而不是集合,并且数组永远不会改变。它仅在分支2中使用。

使用模拟fs.readFile运行,所需时间如下(文件名与毫秒时间)。

const timeTable = {"1": 600,"2": 500,"3": 400,"4": 300,"5": 200,"6": 100,};

可以看到这样的测试运行时间,表明并发正在工作——

[1]0--0.601[2]0--0.502[3]0.503--0.904[4]0.608--0.908[5]0.905--1.105[6]0.905--1.005

可在打字游乐场沙盒中作为可执行文件提供

files.forEach(async (file) => {const contents = await fs.readFile(file, 'utf8')})

问题是,迭代函数返回的Promise被forEach()忽略。每次异步代码执行完成后,forEach都不会等待移动到下一次迭代。所有fs.readFile函数将在事件循环的同一轮中被调用,这意味着它们是并行启动的,而不是顺序启动的,并且在调用for每个()后立即执行,而不等待所有fs.readFile操作完成。由于for每个都不会等待每个Promise解析,因此循环实际上会在Promise解析之前完成迭代。你期望在forEach完成后,所有的异步代码都已经执行,但事实并非如此。你最终可能会尝试访问尚不可用的值。

您可以使用此示例代码测试行为

const array = [1, 2, 3];
const simulateAsync = async (num) => {return new Promise((resolve, _) => {setTimeout(() => {const square = num * num;resolve(square);}, [100]);});};
const testForEach = (numbersArray) => {const store = [];// this code here treated as sync codearray.forEach(async (num) => {const squaredNum = await simulateAsync(num);// this will console corrent squaredNum valueconsole.log(squaredNum);store.push(squaredNum);});// you expect that store array is populated but is not// this will return []console.log(store);};testForEach();

解决方案是使用for-of循环。

for (const file of files){const contents = await fs.readFile(file, 'utf8')}

在2022年,我仍然建议使用外部库来处理所有这些异步流。

你的例子是:

import fs from 'fs-promise'import alot from 'alot'
async function printFiles () {const files = await getFilePaths() // Assume this works fine
await alot(files).forEachAsync(async file => {let content = await fs.readFile(file, 'utf8');console.log(content);}).toArrayAsync({ threads: 4 });}}printFiles()

对于简单的例子,asyncfor..of肯定会完成这项工作,但是一旦任务变得更加复杂,您就必须为此使用一些实用程序。

Alot还有许多其他方法可以链接,例如mapAsyncfilterAsyncgroupAsync等。

举个例子:

  • 使用产品元加载JSON文件
  • 提取ProductID
  • 从服务器加载产品
  • 过滤价格>100$的
  • 按价格升序排列
  • 选前50名

import fs from 'fs-promise'import alot from 'alot'import axios from 'axios'import { File } from 'atma-io'
let paths = await getFilePaths();let products = await alot(paths).mapAsync(async path => await File.readAsync<IProductMeta>(path)).mapAsync(async meta => await axios.get(`${server}/api/product/${meta.productId}`)).mapAsync(resp => resp.data).filterAsync(product => product.price > 100).sortBy(product => product.price, 'asc').takeAsync(50).toArrayAsync({ threads: 5, errors: 'include' });