Flutter : Bad state: Stream has already been listened to


class MyPage extends StatelessWidget {
@override
Widget build(BuildContext context) {
return DefaultTabController(
length: 2,
child: new Scaffold(
appBar: TabBar(
tabs: [
Tab(child: Text("MY INFORMATION",style: TextStyle(color: Colors.black54),)),
Tab(child: Text("WEB CALENDER",style: TextStyle(color: Colors.black54),)),
],
),
body:PersonalInformationBlocProvider(
movieBloc: PersonalInformationBloc(),
child: TabBarView(
children: [
MyInformation(),
new SmallCalendarExample(),
],
),
),
),
);
}
}
    

class MyInformation extends StatelessWidget{
// TODO: implement build
var deviceSize;
    

//Column1
Widget profileColumn(PersonalInformation snapshot) => Container(
height: deviceSize.height * 0.24,
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: <Widget>[
Row(
mainAxisAlignment: MainAxisAlignment.spaceEvenly,
children: <Widget>[
Container(
decoration: BoxDecoration(
borderRadius:
new BorderRadius.all(new Radius.circular(50.0)),
border: new Border.all(
color: Colors.black,
width: 4.0,
),
),
child: CircleAvatar(
backgroundImage: NetworkImage(
"http://www.binaythapa.com.np/img/me.jpg"),
foregroundColor: Colors.white,
backgroundColor: Colors.white,
radius: 40.0,
),
),
ProfileTile(
title: snapshot.firstName,
subtitle: "Developer",
),
SizedBox(
height: 10.0,
),
],
)
],
),
);
Widget bodyData(PersonalInformation snapshot) {
return SingleChildScrollView(
child: Column(
children: <Widget>[
profileColumn(snapshot)
],
),
);
}
    

    

@override
Widget build(BuildContext context) {
final personalInformationBloc = PersonalInformationBlocProvider.of(context);
    

deviceSize = MediaQuery.of(context).size;
return StreamBuilder(
stream: personalInformationBloc.results,
builder: (context,snapshot){
if (!snapshot.hasData)
return Center(
child: CircularProgressIndicator(),
);
return bodyData(snapshot.data);
}
);
}
}
   



I am using Bloc Pattern for retrieving data from Rest API (just called the whole object from JSON and parsed user name only). The Page consists of two tabs MyInformation and SmallCalendar. When the app runs the data are fetched correctly and everything is good. When I go to tab two and return to tab one then the whole screens in tab one goes to red showing error: Bad state: Stream has already been listened to.

94838 次浏览

最常见的 Stream形式一次只能听一次。如果尝试添加多个侦听器,它将引发

错误状态: 流已被监听

要防止此错误,请公开一个 广播Stream。您可以使用 myStream.asBroadcastStream将您的流转换为广播

这需要在公开 Stream的类中完成。不作为 StreamBuilder的参数。由于 asBroadcastStream在内部监听原始流以生成广播流,这意味着您不能在同一个流上调用此方法两次。

The problem was due to not disposing the controllers in bloc.

  void dispose() {
monthChangedController.close();
dayPressedController.close();
resultController.close();
}

您应该使用以下内容。

StreamController<...> _controller = StreamController<...>.broadcast();

我曾经遇到过同样的问题,当我使用 Obable.comineLatest2 for StreamBuilder into Drawer 的结果时:

颤动: 不良状态: 流已被监听。

对于我来说,最好的解决方案是将这种结合的结果添加到新的行为主题中,并听取新的行为主题。

别忘了听老朋友的话! ! !

class VisitsBloc extends Object {
Map<Visit, Location> visitAndLocation;


VisitsBloc() {
visitAndLocations.listen((data) {
visitAndLocation = data;
});
}


final _newOne = new BehaviorSubject<Map<Visit, Location>>();


Stream<Map<Visit, Location>> get visitAndLocations => Observable.combineLatest2(_visits.stream, _locations.stream, (List<vis.Visit> visits, Map<int, Location> locations) {
Map<vis.Visit, Location> result = {};


visits.forEach((visit) {
if (locations.containsKey(visit.skuLocationId)) {
result[visit] = locations[visit.skuLocationId];
}
});


if (result.isNotEmpty) {
_newOne.add(result);
}
});
}

我没有使用 .broadcast,因为它减慢了我的用户界面。

在我的例子中,我得到这个错误是因为同一行代码 myStream.listen()在同一个流的同一个小部件中被调用了两次。显然这是不允许的!

更新: 如果你打算多次订阅同一个流,你应该使用一个行为主题来代替:


// 1- Create a behavior subject
final _myController = BehaviorSubject<String>();


// 2- To emit/broadcast new events, we will use Sink of the behavior subject.
Sink<String> get mySteamInputSink => _myController.sink;


// 3- To listen/subscribe to those emitted events, we will use Stream (observable) of the behavior subject.
Stream<String> get myStream => _myController.stream;


// 4- Firstly, Listen/subscribe to stream events.
myStream.listen((latestEvent) {
// use latestEvent data here.
});


// 5- Emit new events by adding them to the BehaviorSubject's Sink.
myStreamInputSink.add('new event');

That's it!

然而,还有最后一个重要的步骤。

我们必须在一个小部件被销毁之前取消所有流监听器的订阅。

为什么? (你可能会问)

因为如果一个小部件订阅了一个流,当这个小部件被销毁时,被销毁的小部件流订阅将保留在应用程序内存中,导致内存泄漏和不可预测的行为:

_flush() {
_myController.close();
_myController = StreamController<String>();
}

############################### ###############################

老答案:

对我来说,解决这个问题的方法是创建一个我的流控制器作为广播流控制器:

var myStreamController = StreamController<bool>.broadcast();

还有

使用数据流作为广播数据流:

myStreamController.stream.asBroadcastStream().listen(onData);

总结一下:

主要的区别是 broadcast() 创造 a Stream可监听多个源,但是至少需要监听一个源才能开始发送项目。

A Stream should be inert until a subscriber starts listening on it (using the [onListen] callback to start producing events).

asBroadcastStream将一个现有的 Stream转换成一个可以多重监听的 Stream,但是它不需要监听就可以开始发射,因为它在底层调用 onListen()

你可以使用 broadcast,它允许多次监听流,但它也阻止了我们倾听过去的事件:

当没有侦听器时,广播流不缓冲事件。

更好的选项 是使用 rxdart包类中的 BehaviorSubject作为 StreamController:

一个特殊的 StreamController,它捕获已添加到控制器中的最新项,并将其作为第一个项发送给任何新的侦听器。

用法简单如下:

StreamController<...> _controller = BehaviorSubject();

对于那些在做 Future.asStream()的时候遇到这个问题的人来说,你需要 Future.asStream().shareReplay(maxSize: 1)使它成为一个广播/热流。

对我来说,将流定义为一个全局变量是有效的

Stream infostream (在一个有状态小部件的... State 中,我在小部件之外定义了它,它工作了

(不确定是否是最好的解决方案,但尝试一下)

StreamSplitter.split() from the async can be used for this use case

import 'package:async/async.dart';
...


main() {
var process = Process.start(...);
var stdout = StreamSplitter<List<int>>(process.stdout);
readStdoutFoo(stdout.split());
readStdoutBar(stdout.split());
}


readStdoutFoo(Stream<List<int>> stdout) {
stdout.transform(utf8.decoder)...
}


readStdoutBar(Stream<List<int>> stdout) {
stdout.transform(utf8.decoder)...
}

在我的情况下,我是使用软件包连接,而在扑动网络。 Commenting all Connectivity calls solved the issue.

I'm now just using Connectivity while only on Android/iOS.

因此,也许检查你的软件包即时您正在使用的一些软件包有一些问题,在网络上的情况下,你正在为网络开发。

Hopefully I could help someone with this Information.

这是提供程序的问题,我通过更改提供程序初始化解决了这个问题

例如

locator.registerSingleton<LoginProvider>(LoginProvider());

 locator.registerFactory(() => TaskProvider());

定位器在哪

GetIt locator = GetIt.instance;

This could help any other person. In my case i was using two StreamBuilder one in each tab. So when i swipe to above the tab and back. The other stream was already listened so i get the error.

我所做的就是从选项卡中移除 StreamBuilder 并把它放在顶部。每次有变化时,我都设置 State。我返回一个空的 Text (”)以避免显示任何内容。我希望这种方法

其他情况下。如果您以某种方式在无状态类中使用流,请注意。这是您得到上述错误的原因之一。 将无状态类转换为有状态类,并在 stream Controller 上调用 init 和 pose 方法:

 @override
void initState() {
super.initState();
YourStreamController.init();
}


@override
void dispose() {
YourStreamController.dispose();
super.dispose();
}

我认为不是所有的答案考虑到的情况下,你 不想或者根本不能使用广播流

More often than not, you have to rely on receiving past events because the listener might be created later than the stream it listens to and it's important to receive such information.

在 Flutter 中经常发生的情况是,监听流(“监听器”)的小部件被破坏并重新构建。如果尝试将侦听器附加到与前面相同的流,则会得到此错误。

为了克服这个问题,您必须手动管理您的流。我创建了 这个要点,演示了如何实现这一点。您还可以在 这个飞镖靶上运行这段代码,以查看它的行为并使用它。我已经使用了简单的 String id 来引用特定的 StreamController实例,但可能还有更好的解决方案(也许是 符号)。

大意如下:

/* NOTE: This approach demonstrates how to recreate streams when
your listeners are being recreated.
It is useful when you cannot or do not want to use broadcast
streams. Downside to broadcast streams is that it is not
guaranteed that your listener will receive values emitted
by the stream before it was registered.
*/


import 'dart:async';
import 'dart:math';


// [StreamService] manages state of your streams. Each listener
// must have id which is used in [_streamControllers] map to
// look up relevant stream controller.
class StreamService {
final Map<String, StreamController<int>?> _streamControllers = {};


Stream<int> getNamedStream(String id) {
final controller = _getController(id);
return controller.stream;
}


// Will get existing stream controller by [id] or create a new
// one if it does not exist
StreamController<int> _getController(String id) {
final controller = _streamControllers[id] ?? _createController();


_streamControllers[id] = controller;


return controller;
}


void push(String id) {
final controller = _getController(id);


final rand = Random();
final value = rand.nextInt(1000);


controller.add(value);
}


// This method can be called by listener so
// memory leaks are avoided. This is a cleanup
// method that will make sure the stream controller
// is removed safely
void disposeController(String id) {
final controller = _streamControllers[id];


if (controller == null) {
throw Exception('Controller $id is not registered.');
}


controller.close();
_streamControllers.remove(id);
print('Removed controller $id');
}


// This method should be called when you want to remove
// all controllers. It should be called before the instance
// of this class is garbage collected / removed from memory.
void dispose() {
_streamControllers.forEach((id, controller) {
controller?.close();
print('Removed controller $id during dispose phase');
});
_streamControllers.clear();
}


StreamController<int> _createController() {
return StreamController<int>();
}
}


class ManagedListener {
ManagedListener({
required this.id,
required StreamService streamService,
}) {
_streamService = streamService;
}


final String id;
late StreamService _streamService;
StreamSubscription<int>? _subscription;


void register() {
_subscription = _streamService.getNamedStream(id).listen(_handleStreamChange);
}


void dispose() {
_subscription?.cancel();
_streamService.disposeController(id);
}


void _handleStreamChange(int n) {
print('[$id]: streamed $n');
}
}


void main(List<String> arguments) async {
final streamService = StreamService();


final listener1Id = 'id_1';
final listener2Id = 'id_2';


final listener1 = ManagedListener(id: listener1Id, streamService: streamService);
listener1.register();
  

streamService.push(listener1Id);
streamService.push(listener1Id);
streamService.push(listener1Id);


await Future.delayed(const Duration(seconds: 1));


final listener2 = ManagedListener(id: listener2Id, streamService: streamService);
listener2.register();


streamService.push(listener2Id);
streamService.push(listener2Id);


await Future.delayed(const Duration(seconds: 1));
  

listener1.dispose();
listener2.dispose();


streamService.dispose();
}

确保处理好控制器!

@override
void dispose() {
scrollController.dispose();
super.dispose();
}

当导航离开然后返回到监听流的视图时,我得到了这个错误,因为我将同一视图的一个新实例推入了 Navigator 堆栈,这实际上最终创建了一个新的监听器,尽管它在代码中的位置是相同的。

具体而详细地说,我有一个 ListItemsView小部件,它使用 StreamBuilder来显示流中的所有项目。用户点击“添加项”按钮,这将推动导航器堆栈中的 AddItemView,并在提交表单后,用户被带回到 ListItemsView,其中“糟糕的状态: 流已经被监听。”错误时有发生。

对我来说,解决办法是用 Navigator.pop(context)代替 Navigator.pushNamed(context, ListItemsView.routeName)。这有效地防止了新 ListItemsView 的实例化(作为同一流的第二个订阅者) ,并且只是将用户带回到上一个 ListItemsView 实例。

在您的流控制器上调用

例如:

StreamController<T> sampleController =
StreamController<T>.broadcast();

我之所以经历这种情况是因为,我使用了一个流构建器来创建 tabview 的选项卡列表,并且每当我切换选项卡并返回到前一个选项卡时,我都会得到这个错误。“用构建器小部件包装流构建器”给我带来了奇迹。

我经历过这种情况,总是关闭流量控制器为我工作。