Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_transforms(self, configs):
    """
    Create transforms based on configs and set up the buffers that hold
    their output.

    Each transform is constructed with this object's feed and
    transform_address, so that all transformed events will be delivered
    to this object.

    configs is an iterable of property mappings; each must provide a
    'class' entry naming the transform type. Only 'MovingAverage' is
    handled here; entries with any other class name are skipped.

    Sets three attributes:
      - self.transforms: transform name (mavg.config.name) -> transform
      - self.data_buffer: a qmsg.MergedParallelBuffer built from every
        transform name plus "feed"
      - self.buffers: transform name -> empty list
    """
    self.transforms = {}
    for props in configs:
        class_name = props['class']
        if class_name == 'MovingAverage':
            mavg = ta.MovingAverage(self.feed, props, self.transform_address)
            self.transforms[mavg.config.name] = mavg

    # list(dict) yields a fresh, appendable list of keys on both Python 2
    # and Python 3, so no extra copy of keys() is needed.
    keys = list(self.transforms)
    keys.append("feed")  # for the raw feed
    self.data_buffer = qmsg.MergedParallelBuffer(keys)

    # Only the names are needed here, so iterate the dict directly rather
    # than unpacking (name, transform) pairs.
    self.buffers = {}
    for name in self.transforms:
        self.buffers[name] = []