@@ -389,6 +389,33 @@ def compileme(self, inputid):
389389 }
390390
391391
392+ class MyriaStreamingSink (algebra .Stream , MyriaOperator ):
393+
394+ """A Myria StreamingSink"""
395+
396+ def __init__ (self , input ):
397+ algebra .UnaryOperator .__init__ (self , input )
398+
399+ def num_tuples (self ):
400+ return self .input .num_tuples ()
401+
402+ def partitioning (self ):
403+ # TODO: have a way to say it is on a specific worker
404+ return RepresentationProperties ()
405+
406+ def shortStr (self ):
407+ return "%s" % self .opname ()
408+
409+ def compileme (self , inputid ):
410+ return {
411+ "opType" : "StreamingSink" ,
412+ "argChild" : inputid ,
413+ }
414+
415+ def __repr__ (self ):
416+ return "{op}({inp!r})" .format (op = self .opname (), inp = self .input )
417+
418+
392419class MyriaAppendTemp (algebra .AppendTemp , MyriaOperator ):
393420
394421 def compileme (self , inputid ):
@@ -623,6 +650,14 @@ def compileme(self, inputid):
623650 raise NotImplementedError ('shouldn' 't ever get here, should be turned into CP-CC pair' ) # noqa
624651
625652
653+ class MyriaStream (algebra .Stream , MyriaOperator ):
654+
655+ """Represents a streaming sink operator"""
656+
657+ def compileme (self , inputid ):
658+ raise NotImplementedError ('shouldn' 't ever get here, should be turned into CP-CC-StreamingSink triple' ) # noqa
659+
660+
626661class MyriaDupElim (algebra .Distinct , MyriaOperator ):
627662
628663 """Represents duplicate elimination"""
@@ -1837,6 +1872,18 @@ def fire(self, expr):
18371872 return expr
18381873
18391874
1875+ class ExpandStreamSink (rules .Rule ):
1876+
1877+ def fire (self , expr ):
1878+ if not isinstance (expr , MyriaStream ):
1879+ return expr
1880+
1881+ producer = MyriaCollectProducer (expr .input , None )
1882+ consumer = MyriaCollectConsumer (producer )
1883+ sink = MyriaStreamingSink (consumer )
1884+ return sink
1885+
1886+
18401887# 6. shuffle logics, hyper_cube_shuffle_logic is only used in HCAlgebra
18411888left_deep_tree_shuffle_logic = [
18421889 ShuffleBeforeSetop (),
@@ -1873,6 +1920,7 @@ def fire(self, expr):
18731920 rules .OneToOne (algebra .Difference , MyriaDifference ),
18741921 rules .OneToOne (algebra .OrderBy , MyriaInMemoryOrderBy ),
18751922 rules .OneToOne (algebra .Sink , MyriaSink ),
1923+ rules .OneToOne (algebra .Stream , MyriaStream ),
18761924 rules .OneToOne (algebra .IDBController , MyriaIDBController ),
18771925]
18781926
@@ -1902,7 +1950,7 @@ class MyriaAlgebra(Algebra):
19021950 MyriaScanTemp ,
19031951 MyriaFileScan ,
19041952 MyriaEmptyRelation ,
1905- MyriaSingleton
1953+ MyriaSingleton ,
19061954 )
19071955
19081956
@@ -2187,6 +2235,7 @@ def opt_rules(self, **kwargs):
21872235 [AddAppendTemp ()],
21882236 break_communication ,
21892237 idb_until_convergence (kwargs .get ('async_ft' )),
2238+ [ExpandStreamSink ()],
21902239 ]
21912240
21922241 if kwargs .get ('add_splits' , True ):
@@ -2241,7 +2290,7 @@ def opt_rules(self, **kwargs):
22412290 rules .push_select ,
22422291 distributed_group_by (MyriaGroupBy ),
22432292 [rules .DeDupBroadcastInputs ()],
2244- hyper_cube_shuffle_logic
2293+ hyper_cube_shuffle_logic ,
22452294 ]
22462295
22472296 if kwargs .get ('push_sql' , False ):
@@ -2250,7 +2299,8 @@ def opt_rules(self, **kwargs):
22502299 compile_grps_sequence = [
22512300 myriafy ,
22522301 [AddAppendTemp ()],
2253- break_communication
2302+ break_communication ,
2303+ [ExpandStreamSink ()],
22542304 ]
22552305
22562306 if kwargs .get ('add_splits' , True ):
@@ -2428,7 +2478,7 @@ def compile_to_json(raw_query, logical_plan, physical_plan,
24282478 string and passed along unchanged."""
24292479
24302480 # Store/StoreTemp is a reasonable physical plan... for now.
2431- root_ops = (algebra .Store , algebra .StoreTemp , algebra .Sink )
2481+ root_ops = (algebra .Store , algebra .StoreTemp , algebra .Sink , algebra . Stream )
24322482 if isinstance (physical_plan , root_ops ):
24332483 physical_plan = algebra .Parallel ([physical_plan ])
24342484
0 commit comments