Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def handle(self, *args, **options):
    """Fetch the JSLPS group-member export and check each member's village.

    The XML returned by the web service is saved to
    ``jslps_data_integration_files/jslps-hnn-person-new2.xml`` and parsed.
    For every ``GroupMemberDataNew2`` element the village code is looked up
    in ``JSLPS_Village``; members whose village does not exist are reported
    as a row in ``jslps_data_integration_files/jslps-hnn-person-new2.csv``
    and skipped.
    """
    # Read XML from the URL; release the response even if the read fails.
    # NOTE(review): credentials are embedded in this URL, and the query string
    # contains two '?' separators - confirm both against the service.
    url = urllib2.urlopen('http://webservicesri.swalekha.in/Service.asmx?op=GetExportGroupMemberDataHnNNew2?pUsername=admin&pPassword=JSLPSSRI')
    try:
        contents = url.read()
    finally:
        url.close()
    with open("jslps_data_integration_files/jslps-hnn-person-new2.xml", 'w') as xml_file:
        xml_file.write(contents)

    # NOTE(review): partner and user_obj are not used in the visible part of
    # this command; the lookups are kept because they raise when the rows
    # are missing.
    partner = Partner.objects.get(id = 24)
    user_obj = User.objects.get(username="jslps_bot")

    # The error report is closed when the scan finishes or fails.
    with open('jslps_data_integration_files/jslps-hnn-person-new2.csv', 'wb') as csv_file:
        wtr = csv.writer(csv_file, quoting=csv.QUOTE_ALL)
        tree = ET.parse('jslps_data_integration_files/jslps-hnn-person-new2.xml')
        root = tree.getroot()
        for c in root.findall('GroupMemberDataNew2'):
            village_code = c.find('VillageCode').text
            group_code = c.find('GroupCode').text
            member_code = c.find('Group_M_Code').text
            member_name = c.find('MemberName').text
            # An Element without children is falsy, so the result of find()
            # has to be compared with None rather than truth-tested.
            father_elem = c.find('FatherName')
            if father_elem is not None and father_elem.text:
                father_name = father_elem.text
            else:
                # Element absent or without text: keep the placeholder.
                father_name = 'X'
            gender = 'F'
            try:
                village = JSLPS_Village.objects.get(village_code=village_code)
            except JSLPS_Village.DoesNotExist as e:
                wtr.writerow(['JSLPS village not EXIST: '+ str(member_code), village_code, group_code, e])
                continue
def _configure_csv_file(self, file_handle, schema):
    """Build a csv writer on ``file_handle`` and emit ``schema`` as header.

    The writer is created with UTF-8 encoding and this object's
    ``field_delimiter``; it is returned after the header row has been
    written so the caller can continue with the data rows.
    """
    writer_options = {
        'encoding': 'utf-8',
        'delimiter': self.field_delimiter,
    }
    header_writer = csv.writer(file_handle, **writer_options)
    header_writer.writerow(schema)
    return header_writer
def write_aggregate_file(counter, filename):
    """Write the link counts in ``counter`` to a CSV file under links/.

    The file is created at ``SEFARIA_EXPORT_PATH + "/links/" + filename``.
    After the header row, one row is written per entry of
    ``counter.most_common()``: the first two items of the entry's key,
    followed by its count.
    """
    target = SEFARIA_EXPORT_PATH + "/links/%s" % filename
    header = [
        "Text 1",
        "Text 2",
        "Link Count",
    ]
    with open(target, 'wb') as out:
        rows = csv.writer(out)
        rows.writerow(header)
        for pair, total in counter.most_common():
            rows.writerow([pair[0], pair[1], total])
"slug",
"full_name",
"level",
"parent",
"days",
"absolute_url",
"average_response_time",
"fee_rate",
"success_rate"
)
# Export every page of results to jurisdiction_stats.csv, one CSV row per result.
# NOTE(review): next_, headers and fields are set up before this excerpt.
# Page counter, used only for the progress message below.
page = 1
# NOTE(review): csv_file is not closed anywhere in the visible code.
csv_file = open('jurisdiction_stats.csv', 'w')
# Mode 'w' already starts at offset 0, so this seek does not move the position.
csv_file.seek(0)
csv_writer = unicodecsv.writer(csv_file)
# Header row: the field names themselves.
csv_writer.writerow(fields)
# Keep requesting pages until the response carries no 'next' link.
while next_ is not None:
r = requests.get(next_, headers=headers)
try:
# NOTE(review): this local shadows the stdlib json module if the file imports it.
json = r.json()
next_ = json['next']
for datum in json['results']:
# One row per result, with columns in the same order as the header.
csv_writer.writerow([datum[field] for field in fields])
# Progress message; the total assumes 20 results per page - TODO confirm.
print 'Page %d of %d' % (page, json['count'] / 20 + 1)
page += 1
except Exception as e:
# NOTE(review): the error is only printed. If it was raised before next_ was
# reassigned, the loop presumably requests the same URL again - confirm intended.
print e
# Run `query` on `conn` and write the result set to `file_` as CSV, header first.
# NOTE(review): conn, query, file_ and lr come from code outside this excerpt.
with open(file_, 'w') as f:
# sqlite3.Row gives each row a keys() method, used for the header row below.
conn.row_factory = sqlite3.Row
try:
rows = conn.execute(query)
except:
# Show the failing query, then re-raise the original error unchanged.
print query
raise
# The first row is fetched separately: it proves the result is non-empty
# and supplies the column names.
first = rows.fetchone()
if not first:
raise Exception("Got no data from query: {}".format(query))
writer = csv.writer(f)
writer.writerow(first.keys())
writer.writerow(tuple(first))
# Remaining rows come from the same cursor, after the one already fetched.
for row in rows:
# lr() is defined elsewhere; presumably a per-row progress hook - TODO confirm.
lr()
writer.writerow(tuple(row))
# Hand back the path that was written.
return file_
def sql2csv(name=None, delimiter=',', quotechar='"'):
    """Export every row of table ``name`` as UTF-8 CSV and return the bytes.

    Args:
        name: Table to dump. It is concatenated into the SQL text, so it
            must come from a trusted source.
        delimiter: Field separator handed to the CSV writer.
        quotechar: Quote character handed to the CSV writer.

    Returns:
        The CSV document as a byte string: a header row with the column
        names reported by the inspector, then one row per record, every
        field quoted.
    """
    insp = reflection.Inspector.from_engine(engine)
    output = io.BytesIO()
    writer = unicodecsv.writer(output, delimiter=str(delimiter), quotechar=str(quotechar), quoting=csv.QUOTE_ALL, encoding='utf-8')
    connection = engine.connect()
    try:
        # NOTE(review): the table name is spliced into the statement; an
        # identifier cannot be bound as a parameter, so callers must not
        # pass untrusted input here.
        res = connection.execute("SELECT * FROM " + name)
        # write header
        writer.writerow([c['name'] for c in insp.get_columns(name)])
        while True:
            row = res.fetchone()
            if row is None:
                break
            writer.writerow(row)
    finally:
        # Release the connection even when the query or a write fails.
        connection.close()
    # The accumulated CSV was previously built and then discarded.
    return output.getvalue()
def writeCSV(allCanvasses):
    """Write one CSV row per precinct result of every canvass to ``outfile``.

    The header row ``headers`` is written first. Each following row holds
    the county, the precinct name with ``*`` removed, the office mapped
    through ``office_lookup``, the district, the party at the same position
    as the candidate (empty string when there is none), the candidate name
    passed through ``normaliseName`` and the result. Every row is also
    printed.
    """
    def _item_or(sequence, position, fallback):
        # sequence[position], or fallback when the position is out of range.
        try:
            return sequence[position]
        except IndexError:
            return fallback

    with open(outfile, 'wb') as handle:
        out = unicodecsv.writer(handle, encoding='utf-8')
        out.writerow(headers)
        for canvass in allCanvasses:
            for precinct, tallies in canvass.results.iteritems():
                for position, tally in enumerate(tallies):
                    office = office_lookup[canvass.office]
                    raw_candidate = canvass.candidates[position]
                    party = _item_or(canvass.parties, position, "")
                    candidate_name = normaliseName(raw_candidate)
                    precinct_name = precinct.replace("*", "")
                    record = [
                        county,
                        precinct_name,
                        office,
                        canvass.district,
                        party,
                        candidate_name,
                        tally,
                    ]
                    print(record)
                    out.writerow(record)
# Tail of the preceding branch; its condition lies outside this excerpt.
self._inserter = self._write_dict
# Dict rows without a header: take the header from the row's own keys.
elif row_is_dict and not has_header:
self.header = row.keys()
self._writer = unicodecsv.DictWriter(
f,
self.header,
delimiter=delimiter,
escapechar=self.escapechar,
encoding=self.encoding)
# The header line is emitted only when write_header is set.
if self.write_header:
self._writer.writeheader()
self._inserter = self._write_dict
# List rows with a header: plain writer, header written as an ordinary row.
elif row_is_list and has_header:
self._writer = unicodecsv.writer(
f,
delimiter=delimiter,
escapechar=self.escapechar,
encoding=self.encoding)
if self.write_header:
self._writer.writerow(self.header)
self._inserter = self._write_list
# List rows without a header: plain writer, nothing is written up front.
elif row_is_list and not has_header:
self._writer = unicodecsv.writer(
f,
delimiter=delimiter,
escapechar=self.escapechar,
encoding=self.encoding)
self._inserter = self._write_list
# Collect the text of every cell in tds into sheet_item.
# NOTE(review): tds, sheet, sheet_item, csvpath and csvfile come from code
# outside this excerpt.
for column in range(0, len(tds)):
sheet_item.append(tds[column].get_text())
# A cell with this exact label and a link points at a detail page.
if tds[column].get_text() == '详细仓位' and tds[column].a != None:
# Download the linked page (href is appended to the site root).
detailcontent = urllib2.urlopen(
'http://data.cfi.cn/' + tds[column].a['href']).read()
# Keep only the first element whose id is 'datatable'.
detailsoup = BeautifulSoup(
detailcontent, 'lxml', from_encoding="utf-8").findAll(id='datatable')[0]
# Detail tables go into a subdirectory of csvpath, created on demand.
detailpath = csvpath + '详细仓位' + '/'
if os.path.exists(detailpath) == False:
os.makedirs(detailpath)
# File name: text of cell 0 + text of cell 2 with '-' removed + '.csv'.
detailfile = str(
tds[0].get_text()) + str(tds[2].get_text()).replace('-', '') + '.csv'
# The meaning of the final True flag is set by write2csv's signature,
# which is not visible here - TODO confirm.
write2csv(detailpath, detailfile, detailsoup, True)
sheet.append(sheet_item)
# Append the collected rows to the target file; 'a+' keeps existing content.
with open(csvpath + csvfile, 'a+') as f:
f_w = unicodecsv.writer(f, encoding='utf-8')
for i in sheet:
f_w.writerow(i)
def make_csv_writer_response(filename, *args, **kwargs):
    """Create a CSV download response and a writer that writes into it.

    The response has content type ``text/csv`` and a Content-Disposition
    header that offers it as an attachment named ``filename``.

    Returns:
        A ``(writer, response)`` tuple; rows written through ``writer``
        go into ``response``.

    NOTE(review): ``*args`` and ``**kwargs`` are accepted but not used.
    """
    http_response = HttpResponse(content_type='text/csv')
    disposition = 'attachment; filename="{}"'.format(filename)
    http_response['Content-Disposition'] = disposition
    csv_writer = unicodecsv.writer(http_response)
    return csv_writer, http_response