Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_connection_parameters(user, info=''):
timestamp = str(int(time.time()))
user_pk = str(user.pk) if _is_authenticated(user) else ""
token = generate_token(
settings.CENTRIFUGE_SECRET,
user_pk,
timestamp,
info=info
)
return {
'sockjs_endpoint': settings.CENTRIFUGE_ADDRESS + '/connection',
'ws_endpoint': settings.CENTRIFUGE_ADDRESS + '/connection/websocket',
'user': user_pk,
'timestamp': timestamp,
'token': token,
'info': info
}
def get_auth_data():
user = USER_ID
now = str(int(time.time()))
token = generate_token(options.project_secret, options.project_key, user, now, info=INFO)
auth_data = {
'token': token,
'user': user,
'project': options.project_key,
'timestamp': now,
'info': INFO
}
return auth_data
def post(self):
#raise tornado.web.HTTPError(403)
logging.info("client wants to refresh its connection parameters")
user = USER_ID
now = str(int(time.time()))
token = generate_token(options.project_secret, options.project_key, user, now, info=INFO)
to_return = {
'token': token,
'user': user,
'project': options.project_key,
'timestamp': now,
'info': INFO
}
self.set_header('Content-Type', 'application/json; charset="utf-8"')
self.write(json.dumps(to_return))
def mq_generate_token(user, timestamp, info=""):
token = generate_token(SECRET_KEY, user, timestamp, info)
return token
@register.simple_tag
def mq_generate_token(user, timestamp, info=""):
token = generate_token(SECRET_KEY, user, timestamp, info)
if DEBUG is True:
print "Generating token:"
print "Key: "+SECRET_KEY
print "User: "+user
print "Timestamp: "+timestamp
print "Generated token for user "+user+" at "+timestamp+": "+token
return token
def post(self, request, *args, **kwargs):
"""
Returns a token identifying the user in Centrifugo.
"""
current_timestamp = "%.0f" % time.time()
user_id_str = u"{0}".format(request.user.id)
token = generate_token(settings.CENTRIFUGE_SECRET, user_id_str, "{0}".format(current_timestamp), info="")
# we get all the channels to which the user can subscribe
participant = Participant.objects.get(id=request.user.id)
# we use the threads as channels ids
channels = []
for thread in Thread.managers.get_threads_where_participant_is_active(participant_id=participant.id):
channels.append(
build_channel(settings.CENTRIFUGO_MESSAGE_NAMESPACE, thread.id, thread.participants.all())
)
# we also have a channel to alert us about new threads
threads_channel = build_channel(settings.CENTRIFUGO_THREAD_NAMESPACE, request.user.id, [request.user.id]) # he is the only one to have access to the channel
channels.append(threads_channel)
# we return the information