Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def worksheet_from_json(json_string):
#use jsonlib for read ops because of better performance
#keep simplejson for write ops as it's more robust
worksheet_dict = jsonlib.read(json_string)
worksheet = Worksheet()
for (key, value) in worksheet_dict.iteritems():
if key == "_console_text":
worksheet._console_text = value
elif key == "_usercode_error":
worksheet._usercode_error = value
else:
col_str, row_str = key.split(",")
cell = Cell()
cell._formula = value["formula"]
cell._python_formula = value.get("python_formula")
cell.dependencies = map(tuple, value.get("dependencies", []))
cell.error = value.get("error")
cell._value = value.get("value", undefined)
cell.formatted_value = value["formatted_value"]
worksheet[int(col_str), int(row_str)] = cell
def sendOutIfFull():
if MeasureIt.queue.qsize() < 1 or not UDP_IP:
return
with MeasureIt.lock:
if MeasureIt.to_be_sent is None:
MeasureIt.to_be_sent = "[ "
try:
while True:
item = MeasureIt.queue.get_nowait()
item_json = jsonlib.write(item)
if len(MeasureIt.to_be_sent) + len(item_json) + 2 > MAX_FRAME and len(MeasureIt.to_be_sent) > 2:
# print "Sending..."
MeasureIt.to_be_sent = "%s%s" %(MeasureIt.to_be_sent[:-1], ",\"%s\",%.2f ]" % (API_KEY, time.time()))
sock = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
sock.sendto( zlib.compress(MeasureIt.to_be_sent), (UDP_IP, UDP_PORT) )
# print "sent", MeasureIt.to_be_sent
#print len(zlib.compress(MeasureIt.to_be_sent))
MeasureIt.to_be_sent = "[ %s," % (item_json,)
else:
MeasureIt.to_be_sent = "%s%s," %(MeasureIt.to_be_sent, item_json)
# print MeasureIt.to_be_sent
MeasureIt.queue.task_done()
except:
import sys
print sys.exc_info()
pass
def api_json_to_worksheet(sheet_json):
sheet_values = jsonlib.loads(sheet_json)
worksheet = Worksheet()
worksheet.name = sheet_values.get('name', 'Untitled')
for key, value in sheet_values.iteritems():
if key == "usercode_error":
worksheet._usercode_error = value
elif isinstance(value, dict):
rows = value
col = int(key)
for row, value in rows.iteritems():
row = int(row)
worksheet[col, row].value = value
return worksheet
def _parse_response(self, response, method, format=None):
"""Parses the response according to the given (optional) format, which should be either 'JSON' or 'XML'."""
if not format:
format = RESPONSE_FORMAT
if format == 'JSON':
result = simplejson.loads(response)
self._check_error(result)
elif format == 'XML':
dom = minidom.parseString(response)
result = self._parse_response_item(dom)
dom.unlink()
if 'error_response' in result:
self._check_error(result['error_response'])
result = result[method[9:].replace('.', '_') + '_response']
else:
raise RuntimeError('Invalid format specified.')
return result
# -*- coding: UTF-8 -*-
"""
Picka is a data generation and randomization module which aims to increase
coverage by increasing the amount of tests you _dont_ have to write
by hand.
By: Anthony Long
"""
import os
import random
import string
import jsonlib as json
__docformat__ = "restructuredtext en"
albanian_names = json.loads(open(os.path.join(os.path.dirname(__file__), 'albanian.json')).read())['albanian']
def age(min=1, max=99):
return random.randint(min, max + 1)
def apartment_number():
return ' '.join([random.choice(["apartament", "pas", "dhomë"]), str(random.randint(1, 100))])
def business_title():
pass
def calling_code():
pass
def _prepare_response(self, code, data):
msg = self._get_msg(code)
if data:
try:
data = simplejson.loads(data)
except ValueError:
pass
return {'status': code, 'body': data or {}, 'msg': msg}
def _parse_response(self, response, method, format=None):
"""Parses the response according to the given (optional) format, which should be either 'JSON' or 'XML'."""
if not format:
format = RESPONSE_FORMAT
if format == 'JSON':
result = simplejson.loads(response)
print 'result: ', result
self._check_error(result)
elif format == 'XML':
dom = minidom.parseString(response)
result = self._parse_response_item(dom)
dom.unlink()
if 'error_response' in result:
self._check_error(result['error_response'])
result = result[method[9:].replace('.', '_') + '_response']
else:
raise RuntimeError('Invalid format specified.')
return result
def _parse_response(self, response, method, format=None):
"""Parses the response according to the given (optional) format, which should be either 'JSON' or 'XML'."""
if not format:
format = RESPONSE_FORMAT
if format == 'JSON':
result = simplejson.loads(response)
self._check_error(result)
elif format == 'XML':
dom = minidom.parseString(response)
result = self._parse_response_item(dom)
dom.unlink()
if 'error_response' in result:
self._check_error(result['error_response'])
result = result[method[9:].replace('.', '_') + '_response']
else:
raise RuntimeError('Invalid format specified.')
return result
def load(self):
import jsonlib
inp = openCompressed(self.fileName,"rb")
#nodes = json.loads(buf,object_hook=lambda o: TaxaNode(**o) if "idpar" in o else o)
#return dict( ((node.id,node) for node in nodes) )
nodes = {}
for line in inp:
node = jsonlib.read(line.strip(),use_float=True)
nodes[node["id"]] = TaxaNode(**node)
inp.close()
return nodes
def fastloadfile(filename): return jsonlib.read(myopen(filename).read(), use_float=True)
def fastload(file): return jsonlib.read(file.read(), use_float=True)