def date_parse(field):
    # `data` comes from the enclosing scope; dates are 'M/D/Y' strings
    (m, d, y) = map(int, data[field].split('/'))
    return datetime.datetime(y, m, d)

launch = date_parse('Course Launch')
wrap = date_parse('Course Wrap')

ndays = (wrap - launch).days
nweeks = ndays / 7.0

print "Course length = %6.2f weeks (%d days)" % (nweeks, ndays)

if pin_date:
    datedir = pin_date

# locate the course SQL data directory (a pinned date overrides "latest")
course_dir = find_course_sql_dir(course_id, basedir, datedir, use_dataset_latest and not pin_date)

cfn = gsutil.path_from_course_id(course_id)
xbfn = course_dir / ("xbundle_%s.xml" % cfn)

if not xbfn.exists():
    print "[analyze_content] cannot find xbundle file %s for %s!" % (xbfn, course_id)

    if use_dataset_latest:
        # try looking in earlier directories for xbundle file
        import glob
        spath = course_dir / ("../*/xbundle_%s.xml" % cfn)
        files = list(glob.glob(spath))
        if files:
            xbfn = path(files[-1])    # take the last match; glob order is filesystem-dependent

    if not xbfn.exists():
        print " --> also cannot find any %s ; aborting!" % spath
    else:
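
# Hedged sketch of the fallback search above, assuming the path.py library
# (`from path import path`) that these snippets appear to use for the `/` join
# operator and .exists(); the directory names below are hypothetical.
import glob
from path import path

example_course_dir = path("DATA/SQL/MITx__6_002x__2013_Spring/latest")     # hypothetical layout
example_cfn = "MITx__6_002x__2013_Spring"
example_spath = example_course_dir / ("../*/xbundle_%s.xml" % example_cfn)  # sibling date directories
matches = list(glob.glob(example_spath))
if matches:
    print "falling back to xbundle file %s" % path(matches[-1])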

def load_all_daily_logs_for_course(course_id, gsbucket="gs://x-data", verbose=True, wait=False,
                                   check_dates=True):
    '''
    Load daily tracking logs for a course from Google cloud storage into BigQuery.
    If wait=True then wait for the loading jobs to complete.  Waiting is desirable
    if subsequent jobs which need these tables (like person_day) are to be run
    immediately afterwards.
    '''
    print "Loading daily tracking logs for course %s into BigQuery (start: %s)" % (course_id, datetime.datetime.now())
    sys.stdout.flush()

    gsroot = gsutil.path_from_course_id(course_id)

    mypath = os.path.dirname(os.path.realpath(__file__))
    SCHEMA = json.loads(open('%s/schemas/schema_tracking_log.json' % mypath).read())['tracking_log']

    gsdir = '%s/%s/DAILY/' % (gsbucket, gsroot)
    fnset = gsutil.get_gs_file_list(gsdir)

    dataset = bqutil.course_id2dataset(gsroot, dtype="logs")

    # create this dataset if necessary
    bqutil.create_dataset_if_nonexistent(dataset)

    # list the tables already in the dataset, keeping just the tracking-log ones
    tables = bqutil.get_list_of_table_ids(dataset)
    tables = [x for x in tables if x.startswith('track')]
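
# Hedged usage sketch (the course id is hypothetical): load one course's daily logs and
# block until the BigQuery load jobs finish, so that downstream tables such as person_day
# can be built immediately afterwards.
def example_load_logs():
    load_all_daily_logs_for_course("MITx/6.002x/2013_Spring", wait=True)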

def course_key_version(course_id, logs_dir="TRACKING_LOGS", verbose=False):
    cdir = path(logs_dir) / gsutil.path_from_course_id(course_id)
    log_files = glob.glob(cdir / "*.json.gz")

    # find a file that's not too small (falls through to the last file if none is)
    for fn in log_files:
        if path(fn).stat().st_size > 65536:
            break

    if verbose:
        print "Using %s for course_key_version determination" % fn

    # scan the log lines for new-style opaque course keys ("block-v1:")
    count = 0
    n = 0
    with gzip.GzipFile(fn) as fp:
        for k in fp:
            n += 1
            if 'block-v1:' in k:
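
# A hedged, standalone sketch of the heuristic the loop above appears to implement
# (the helper name and return values are assumptions): count lines mentioning the
# opaque-key marker and classify the course-key style accordingly.
import gzip

def guess_course_key_version(log_filename):
    count = 0
    with gzip.GzipFile(log_filename) as fp:
        for line in fp:
            if 'block-v1:' in line:
                count += 1
    # any "block-v1:" module ids indicate the newer opaque-key ("v1") format
    return 'v1' if count else 'standard'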
print "extracting logs for course %s" % course_id
# list of dates to dump
dates = daterange(d2dt(start), d2dt(end))
if verbose:
print "Dates to dump:", [x['dstr'] for x in dates]
# what files already on gs?
gspath = "%s/DAILY" % gs_path_from_course_id(course_id)
gsfiles = get_gs_file_list(gspath)
DIR = tracking_logs_directory
if not os.path.exists(DIR):
os.mkdir(DIR)
DIR += '/' + path_from_course_id(course_id)
if not os.path.exists(DIR):
os.mkdir(DIR)
filebuf = []
for k in range(len(dates)-1):
d = dates[k]
ofn = '%s/tracklog-%s.json.gz' % (DIR, d['dstr'])
start = d['start']
end = d['end']
ofnb = os.path.basename(ofn)
if ofnb in gsfiles:
print "Already have %s, skipping" % ofnb
sys.stdout.flush()
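
# A minimal stand-in for daterange()/d2dt(), consistent with how the loop above uses the
# result (a list of dicts with 'start', 'end', and a 'YYYY-MM-DD' 'dstr'); the real helpers
# in this codebase may differ, so treat this as an assumption.
import datetime

def example_d2dt(datestr):
    return datetime.datetime.strptime(datestr, '%Y-%m-%d')

def example_daterange(start, end):
    dates = []
    k = start
    while k <= end:
        nxt = k + datetime.timedelta(days=1)
        dates.append({'start': k, 'end': nxt, 'dstr': k.strftime('%Y-%m-%d')})
        k = nxt
    return dates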

def process_dir(course_id, gspath='gs://x-data', logs_directory="TRACKING_LOGS", verbose=True):

    cdir = path(logs_directory) / gsutil.path_from_course_id(course_id)

    print "="*77
    print "Transferring tracking logs for %s from directory %s (start %s)" % (course_id, cdir, datetime.datetime.now())
    print "="*77

    if not os.path.exists(cdir):
        print "Oops! non-existent course tracking logs directory %s" % cdir
        return

    sys.stdout.flush()

    cdir = path(cdir)
    gp = path(gspath + "/" + cdir.basename()) / 'DAILY'
    filelist = gsutil.get_gs_file_list(gp)
    # print filelist

    local_files = glob.glob(cdir / 'tracklog*.gz')
    local_files.sort()
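
# Hedged sketch, an assumption rather than the original continuation: compare the two
# lists gathered above and pick out the local daily files that still need to be copied
# up to the DAILY/ folder.  It mirrors the "if ofnb in gsfiles" membership check used
# earlier, so the gs file list is assumed to be keyed (or listed) by basename.
def example_files_to_upload(local_files, gsfiles):
    return [f for f in local_files if os.path.basename(f) not in gsfiles]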