@@ -255,7 +255,7 @@ def __init__(self, upstream=None, upstreams=None, stream_name=None,
255255 self .upstreams = []
256256
257257 # Lazily loaded exception handler to avoid recursion
258- self ._on_exception = None
258+ self ._on_exception = None
259259
260260 self ._set_asynchronous (asynchronous )
261261 self ._set_loop (loop )
@@ -678,30 +678,36 @@ def _release_refs(self, metadata, n=1):
678678 m ['ref' ].release (n )
679679
680680 def on_exception (self ):
681- """Returns the exception handler associated with this stream
681+ """ Returns the exception handler associated with this stream. The exception handler is either lazily loaded
682+ at this point or (if alredy loaded) just returned.
682683 """
683684 self ._on_exception = self ._on_exception or _on_exception ()
684685 return self ._on_exception
685686
686687
687688class InvalidDataError (Exception ):
688- pass
689+ """Generic error that is raised when data passed into a node causes an exception
690+ """
691+
689692
690693class _on_exception (Stream ):
694+ """ Internal exception-handler for Stream-nodes.
695+ """
691696
692697 def __init__ (self , * args , ** kwargs ):
693698 self .silent = False
694699 Stream .__init__ (self , * args , ** kwargs )
695700
696701 def update (self , x , who = None , metadata = None ):
697702 cause , exc = x
698-
703+
699704 if self .silent or len (self .downstreams ) > 0 :
700705 return self ._emit (x , metadata = metadata )
701706 else :
702707 logger .exception (exc )
703708 raise InvalidDataError (cause ) from exc
704709
710+
705711@Stream .register_api ()
706712class map (Stream ):
707713 """ Apply a function to every element in the stream
@@ -737,7 +743,7 @@ def __init__(self, upstream, func, *args, **kwargs):
737743
738744 def update (self , x , who = None , metadata = None ):
739745 result = self .func (x , * self .args , ** self .kwargs )
740- self ._emit (result , metadata = metadata )
746+ return self ._emit (result , metadata = metadata )
741747
742748
743749@Stream .register_api ()
0 commit comments