Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_sanitize(filename, expected):
api = NonCallableMock()
with open(os.path.join('tests', filename), 'r') as f:
status = Status.parse(api, json.load(f))
text = get_sanitized_text(status)
assert '&' not in text
assert 'http' not in text
assert text == expected
def on_data(self, raw_data):
"""Called when raw data is received from connection.
Override this method if you wish to manually handle
the stream data. Return False to stop stream and close connection.
"""
data = json.loads(raw_data)
if 'in_reply_to_status_id' in data:
status = Status.parse(self.api, data)
if self.on_status(status) is False:
return False
elif 'delete' in data:
delete = data['delete']['status']
if self.on_delete(delete['id'], delete['user_id']) is False:
return False
elif 'event' in data:
status = Status.parse(self.api, data)
if self.on_event(status) is False:
return False
elif 'direct_message' in data:
status = Status.parse(self.api, data)
if self.on_direct_message(status) is False:
return False
elif 'limit' in data:
if self.on_limit(data['limit']['track']) is False:
def on_data(self, data):
"""Called when raw data is received from connection.
Override this method if you wish to manually handle
the stream data. Return False to stop stream and close connection.
"""
if 'in_reply_to_status_id' in data:
status = Status.parse(self.api, json.loads(data))
if self.on_status(status) is False:
return False
elif 'delete' in data:
delete = json.loads(data)['delete']['status']
if self.on_delete(delete['id'], delete['user_id']) is False:
return False
elif 'limit' in data:
if self.on_limit(json.loads(data)['limit']['track']) is False:
return False
Override this method if you wish to manually handle
the stream data. Return False to stop stream and close connection.
"""
data = json.loads(raw_data)
if 'in_reply_to_status_id' in data:
status = Status.parse(self.api, data)
if self.on_status(status) is False:
return False
elif 'delete' in data:
delete = data['delete']['status']
if self.on_delete(delete['id'], delete['user_id']) is False:
return False
elif 'event' in data:
status = Status.parse(self.api, data)
if self.on_event(status) is False:
return False
elif 'direct_message' in data:
status = Status.parse(self.api, data)
if self.on_direct_message(status) is False:
return False
elif 'limit' in data:
if self.on_limit(data['limit']['track']) is False:
return False
elif 'disconnect' in data:
if self.on_disconnect(data['disconnect']) is False:
return False
else:
logging.error("Unknown message type: " + str(raw_data))
data = json.loads(raw_data)
if 'in_reply_to_status_id' in data:
status = Status.parse(self.api, data)
if self.on_status(status) is False:
return False
elif 'delete' in data:
delete = data['delete']['status']
if self.on_delete(delete['id'], delete['user_id']) is False:
return False
elif 'event' in data:
status = Status.parse(self.api, data)
if self.on_event(status) is False:
return False
elif 'direct_message' in data:
status = Status.parse(self.api, data)
if self.on_direct_message(status) is False:
return False
elif 'limit' in data:
if self.on_limit(data['limit']['track']) is False:
return False
elif 'disconnect' in data:
if self.on_disconnect(data['disconnect']) is False:
return False
else:
logging.error("Unknown message type: " + str(raw_data))
def on_data(self, data):
"""Called when raw data is received from connection.
Override this method if you wish to manually handle
the stream data. Return False to stop stream and close connection.
"""
if 'in_reply_to_status_id' in data:
status = Status.parse(self.api, json.loads(data))
if self.on_status(status) is False:
return False
elif 'delete' in data:
delete = json.loads(data)['delete']['status']
if self.on_delete(delete['id'], delete['user_id']) is False:
return False
elif 'limit' in data:
if self.on_limit(json.loads(data)['limit']['track']) is False:
return False
Override this method if you wish to manually handle
the stream data. Return False to stop stream and close connection.
"""
data = json.loads(raw_data)
if 'in_reply_to_status_id' in data:
status = Status.parse(self.api, data)
if self.on_status(status) is False:
return False
elif 'delete' in data:
delete = data['delete']['status']
if self.on_delete(delete['id'], delete['user_id']) is False:
return False
elif 'event' in data:
status = Status.parse(self.api, data)
if self.on_event(status) is False:
return False
elif 'direct_message' in data:
status = Status.parse(self.api, data)
if self.on_direct_message(status) is False:
return False
elif 'friends' in data:
if self.on_friends(data['friends']) is False:
return False
elif 'limit' in data:
if self.on_limit(data['limit']['track']) is False:
return False
elif 'disconnect' in data:
if self.on_disconnect(data['disconnect']) is False:
return False
elif 'warning' in data:
def parse(cls, api, json):
result = cls(api)
for k, v in json.items():
if k == 'value' and json['kind'] in ['Tweet', 'LookedupStatus']:
setattr(result, k, Status.parse(api, v))
elif k == 'results':
setattr(result, k, Relation.parse_list(api, v))
else:
setattr(result, k, v)
return result