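# Imports and helpers these snippets rely on (inferred from usage below and
# assumed here; `path` is the path.py class, and gsutil/bqutil are the
# project's own helper modules, not the Google Cloud CLI).  Some fragments
# also call get_gs_file_list / gs_path_from_course_id / path_from_course_id
# unqualified, implying a `from gsutil import ...` in their original module.
import os
import sys
import glob
import json
import datetime

import pytz
from path import path

import gsutil
import bqutil

# Download the per-course person_course CSV files from Google Cloud Storage,
# skipping any local copy that is already at least as new as the GCS file.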
os.mkdir(outdir)

ofnset = []
cnt = 0
for course_id in course_id_set:
    gb = gsutil.gs_path_from_course_id(course_id, use_dataset_latest=use_dataset_latest)
    ofn = outdir / ('person_course_%s.csv.gz' % (course_id.replace('/', '__')))
    ofnset.append(ofn)

    if (nskip > 0) and ofn.exists():
        print "%s already exists, not downloading" % ofn
        sys.stdout.flush()
        continue

    if ofn.exists():
        fnset = gsutil.get_gs_file_list(gb)
        local_dt = gsutil.get_local_file_mtime_in_utc(ofn)
        fnb = 'person_course.csv.gz'

        if fnb not in fnset:
            print "%s/%s missing! skipping %s" % (gb, fnb, course_id)
            continue

        if (fnb in fnset) and (local_dt >= fnset[fnb]['date']):
            print "%s already exists with date %s (gs file date %s), not re-downloading" % (ofn, local_dt, fnset[fnb]['date'])
            sys.stdout.flush()
            continue
        else:
            print "%s already exists but has date %s (gs file date %s), so re-downloading" % (ofn, local_dt, fnset[fnb]['date'])
            sys.stdout.flush()

    cmd = 'gsutil cp %s/person_course.csv.gz %s' % (gb, ofn)
    print "Retrieving %s via %s" % (course_id, cmd)
    sys.stdout.flush()
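
# Transfer a course's local daily tracking-log files ('tracklog*.gz') to the
# DAILY folder on Google Cloud Storage; the mtime-to-UTC conversion below
# supports uploading only files that are newer than their GCS copies.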
def process_dir(course_id, gspath='gs://x-data', logs_directory="TRACKING_LOGS", verbose=True):

    cdir = path(logs_directory) / gsutil.path_from_course_id(course_id)

    print "="*77
    print "Transferring tracking logs for %s from directory %s (start %s)" % (course_id, cdir, datetime.datetime.now())
    print "="*77

    if not os.path.exists(cdir):
        print "Oops! non-existent course tracking logs directory %s" % cdir
        return

    sys.stdout.flush()

    cdir = path(cdir)
    gp = path(gspath + "/" + cdir.basename()) / 'DAILY'
    filelist = gsutil.get_gs_file_list(gp)
    # print filelist
    local_files = glob.glob(cdir / 'tracklog*.gz')
    local_files.sort()

    for fn in local_files:
        fnp = path(fn)
        fnb = fnp.basename()
        statbuf = os.stat(fn)
        mt = datetime.datetime.fromtimestamp(statbuf.st_mtime)

        # do some date checking to upload log files which have changed, and are newer than that on google cloud storage
        local = pytz.timezone("America/New_York")
        # naive = datetime.datetime.strptime("2001-2-3 10:11:12", "%Y-%m-%d %H:%M:%S")
        local_dt = local.localize(mt, is_dst=None)
        utc_dt = local_dt.astimezone(pytz.utc)
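
# Tail of a routine that dumps a course's tracking-log events, one gzipped
# JSON file per day (tracklog-YYYY-MM-DD.json.gz), into TRACKING_LOGS/<course>/,
# first listing which daily files already exist on Google Cloud Storage.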
        dbname=DBNAME,
        collection='tracking_log',
        tracking_logs_directory="TRACKING_LOGS",
        ):
    print "extracting logs for course %s" % course_id

    # list of dates to dump
    dates = daterange(d2dt(start), d2dt(end))

    if verbose:
        print "Dates to dump:", [x['dstr'] for x in dates]

    # what files already on gs?
    gspath = "%s/DAILY" % gs_path_from_course_id(course_id)
    gsfiles = get_gs_file_list(gspath)

    DIR = tracking_logs_directory
    if not os.path.exists(DIR):
        os.mkdir(DIR)
    DIR += '/' + path_from_course_id(course_id)
    if not os.path.exists(DIR):
        os.mkdir(DIR)

    filebuf = []
    for k in range(len(dates) - 1):
        d = dates[k]
        ofn = '%s/tracklog-%s.json.gz' % (DIR, d['dstr'])
        start = d['start']
        end = d['end']
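
# The daterange/d2dt helpers used above are defined elsewhere in the project.
# A minimal sketch of how they are assumed to behave, based only on how
# d['start'], d['end'], and d['dstr'] are used in this fragment (illustrative,
# not the project's actual implementation):
def d2dt(datestr):
    # parse a "YYYY-MM-DD" date string into a datetime
    return datetime.datetime.strptime(datestr, '%Y-%m-%d')

def daterange(start, end):
    # one entry per day from start to end, each carrying the day's start/end
    # datetimes and its "YYYY-MM-DD" string
    days = []
    d = start
    while d <= end:
        days.append({'start': d,
                     'end': d + datetime.timedelta(days=1),
                     'dstr': d.strftime('%Y-%m-%d')})
        d += datetime.timedelta(days=1)
    return days
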
    If wait=True then waits for loading jobs to be completed.  It's desirable to wait
    if subsequent jobs which need these tables (like person_day) are to be run
    immediately afterwards.
    '''

    print "Loading daily tracking logs for course %s into BigQuery (start: %s)" % (course_id, datetime.datetime.now())
    sys.stdout.flush()

    gsroot = gsutil.path_from_course_id(course_id)

    mypath = os.path.dirname(os.path.realpath(__file__))
    SCHEMA = json.loads(open('%s/schemas/schema_tracking_log.json' % mypath).read())['tracking_log']

    gsdir = '%s/%s/DAILY/' % (gsbucket, gsroot)
    fnset = gsutil.get_gs_file_list(gsdir)

    dataset = bqutil.course_id2dataset(gsroot, dtype="logs")

    # create this dataset if necessary
    bqutil.create_dataset_if_nonexistent(dataset)

    tables = bqutil.get_list_of_table_ids(dataset)
    tables = [x for x in tables if x.startswith('track')]

    if verbose:
        print "-"*77
        print "current tables loaded:", json.dumps(tables, indent=4)
        print "files to load: ", json.dumps(fnset.keys(), indent=4)
        print "-"*77
        sys.stdout.flush()
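
# Gather local course data files (including course_image.jpg when using the
# latest-dataset layout) and copy to Google Cloud Storage only those that are
# newer than the copies already there.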
# if using latest date directory, also look for course_image.jpg one level up
if use_dataset_latest:
    print lfp.dirname()
    ci_files = glob.glob(lfp.dirname() / 'course_image.jpg')
    if ci_files:
        local_files += list(ci_files)
        print "--> local course_image file: %s" % ci_files

gsdir = gsutil.gs_path_from_course_id(course_id, gsbucket=gsbucket, use_dataset_latest=use_dataset_latest)

local = pytz.timezone("America/New_York")

if do_gs_copy:
    try:
        fnset = get_gs_file_list(gsdir)
    except Exception as err:
        fnset = []

    def copy_if_newer(fn, fnset, options='-z csv,json'):
        statbuf = os.stat(fn)
        mt = datetime.datetime.fromtimestamp(statbuf.st_mtime)

        # do some date checking to upload files which have changed, and are newer than that on google cloud storage
        local_dt = local.localize(mt, is_dst=None)
        utc_dt = local_dt.astimezone(pytz.utc)

        fnb = os.path.basename(fn)
        if fnb in fnset and fnset[fnb]['date'] > utc_dt:
            print "...%s already copied, skipping" % fn
            sys.stdout.flush()
            return
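
    # Illustrative usage only (the original caller is not shown in this
    # fragment): each local file would be passed through copy_if_newer so that
    # only files newer than their GCS copies get re-uploaded, e.g.
    #
    #     for fn in local_files:
    #         copy_if_newer(fn, fnset)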