# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for setting JobConf Environment Variables on a per-step basis
"""
from mrjob.compat import jobconf_from_env
from mrjob.job import MRJob
from mrjob.step import MRStep
JOBCONF_LIST = [
'mapred.map.tasks',
'mapreduce.job.local.dir',
'user.defined',
]
class MRTestPerStepJobConf(MRJob):
def mapper_init(self):
self.increment_counter('count', 'mapper_init', 1)
for jobconf in JOBCONF_LIST:
yield ((self.options.step_num, jobconf),
jobconf_from_env(jobconf, None))
def mapper(self, key, value):
yield key, value
    def steps(self):
        return [
            MRStep(mapper_init=self.mapper_init),
            MRStep(mapper_init=self.mapper_init,
                   mapper=self.mapper,
                   # the jobconf dict was truncated in this fragment;
                   # 'user.defined' is the only entry that survives
                   jobconf={'user.defined': 'nothing'}),
        ]
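

# Presumably the original module ends with mrjob's standard entry point so
# the job can be launched as a script (a hedged completion, not shown in
# the fragment):
if __name__ == '__main__':
    MRTestPerStepJobConf.run()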
    # 'patch' here is unittest.mock.patch (or the mock backport), imported
    # in the original test module
    def test_cant_override_libjars_on_command_line(self):
with patch.object(MRJob, 'libjars', return_value=['honey.jar']):
job = MRJob(['--libjars', 'cookie.jar'])
            # command-line jars are appended after the class's libjars
            # rather than replacing them, and the relative path is passed
            # through unresolved
self.assertEqual(job._runner_kwargs()['libjars'],
['honey.jar', 'cookie.jar'])
def test_is_task(self):
self.assertEqual(MRJob([]).is_task(), False)
self.assertEqual(MRJob(['--mapper']).is_task(), True)
self.assertEqual(MRJob(['--reducer']).is_task(), True)
self.assertEqual(MRJob(['--combiner']).is_task(), True)
self.assertEqual(MRJob(['--spark']).is_task(), True)
from mrjob.protocol import (JSONProtocol, PickleProtocol, RawValueProtocol,
                            ReprProtocol, StandardJSONProtocol)


# MRBoringJob is defined earlier in the original test module; these
# subclasses exercise the three protocol override hooks.
class MRBoringJob2(MRBoringJob):
INPUT_PROTOCOL = StandardJSONProtocol
INTERNAL_PROTOCOL = PickleProtocol
OUTPUT_PROTOCOL = ReprProtocol
class MRBoringJob3(MRBoringJob):
def internal_protocol(self):
return ReprProtocol()
class MRBoringJob4(MRBoringJob):
INTERNAL_PROTOCOL = ReprProtocol
class MRTrivialJob(MRJob):
OUTPUT_PROTOCOL = RawValueProtocol
def mapper(self, key, value):
yield key, value
def assertMethodsEqual(self, fs, gs):
# we're going to use this to match bound against unbound methods
self.assertEqual([_im_func(f) for f in fs],
[_im_func(g) for g in gs])
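    # _im_func is a small compat helper defined elsewhere in the original
    # test module; a plausible sketch (an assumption, not the original):
    #
    #     def _im_func(f):
    #         # bound methods expose their function as __func__;
    #         # plain functions pass through unchanged
    #         return getattr(f, '__func__', f)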
def test_default_protocols(self):
mr_job = MRBoringJob([])
self.assertMethodsEqual(
mr_job.pick_protocols(0, 'mapper'),
(RawValueProtocol.read, JSONProtocol.write))
def test_init_does_not_require_tzset(self):
MRJob()
from mrjob.job import MRJob
class SpendByCustomer(MRJob):
def mapper(self, _, line):
(customer, item, orderAmount) = line.split(',')
yield customer, float(orderAmount)
def reducer(self, customer, orders):
yield customer, sum(orders)
if __name__ == '__main__':
SpendByCustomer.run()
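
# A minimal local-run sketch (not part of the original fragment), assuming
# mrjob >= 0.6; the filename 'orders.csv' and the "customer,item,amount"
# line format are illustrative assumptions:
#
#     job = SpendByCustomer(args=['orders.csv'])
#     with job.make_runner() as runner:
#         runner.run()
#         for customer, total in job.parse_output(runner.cat_output()):
#             print(customer, total)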
# Enable boto logging for EMR debugging
import logging
logging.basicConfig(filename='boto.log', level=logging.INFO)

# Settings for running on the cluster (CDH parcel paths)
import os
os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH'
os.environ['HADOOP_MAPRED_HOME'] = '/opt/cloudera/parcels/CDH-4.6.0-1.cdh4.6.0.p0.26/lib/hadoop-0.20-mapreduce'
import json

from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol
# Sampling window in epoch milliseconds:
# begin = 2011-01-01 00:00:01 UTC, end = 2013-07-11 23:59:59 UTC
begin = 1293840001000
end = 1373673599000
class MRSampleRevisions(MRJob):
INTERNAL_PROTOCOL = RawValueProtocol
OUTPUT_PROTOCOL = RawValueProtocol
    def mapper(self, pid, line):
        obj = json.loads(line)
        # int(), not long(): long() does not exist on Python 3
        timestamp = int(obj['timestamp'])
        if begin <= timestamp < end:
            yield (None, line)

    def reducer(self, pid, lines):
        # identity reducer: funnel all sampled lines through one stream
        for line in lines:
            yield (None, line)
if __name__ == '__main__':
MRSampleRevisions.run()
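
# Illustrative input (an assumption, not from the original source): each
# line is a JSON object with a millisecond 'timestamp' field, e.g.
#   {"timestamp": "1300000000000", "page_id": 42, ...}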
    option_parser.add_option('-A', '--cat-all', dest='cat_all',
                             action='store_true', default=False,
                             help='Cat all log files to JOB_FLOW_ID/')
option_parser.add_option('-s', '--step-num', dest='step_num',
action='store', type='int', default=None,
help=('Limit results to a single step. To be used'
' with --list and --cat.'))
option_parser.add_option('--counters', dest='get_counters',
action='store_true', default=False,
help='Show counters from the job flow')
add_emr_connect_opts(option_parser)
scrape_options_into_new_groups(MRJob().all_option_groups(), {
option_parser: ('ec2_key_pair_file', 's3_sync_wait_time', 'ssh_bin')
})
alphabetize_options(option_parser)
return option_parser
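
# Hedged usage sketch: the lines above look like the tail of a
# make_option_parser()-style helper from mrjob's old optparse-based EMR
# tools (the function name is an assumption). A caller would do roughly:
#
#     option_parser = make_option_parser()
#     options, args = option_parser.parse_args()
#     if options.get_counters:
#         ...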
        # per-domain totals
        yield (cst.surt_domain.value, self.surt_domain, crawl), counts
        hostDomainCount = HostDomainCount()
        for url, count in self.url.items():
            hostDomainCount.add(url, count)
            if exact_count:
                # per-URL counts are only emitted when exact counting is on
                yield (cst.url.value, self.surt_domain, url), (crawl, count)
        if exact_count:
            for digest, counts in self.digest.items():
                yield (cst.digest.value, digest), (crawl, counts)
        for mime, counts in self.mime.items():
            yield (cst.mimetype.value, mime, crawl), counts
        for key, val in hostDomainCount.output(crawl):
            yield key, val
class CCStatsCountJob(MRJob):
INPUT_PROTOCOL = RawValueProtocol
OUTPUT_PROTOCOL = JSONProtocol
HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat'
JOBCONF = {
'mapreduce.task.timeout': '9600000',
'mapreduce.map.speculative': 'false',
'mapreduce.reduce.speculative': 'false',
'mapreduce.job.jvm.numtasks': '-1',
}
    # raw strings avoid Python 3's invalid-escape-sequence warnings
    s3pattern = re.compile(r'^s3://([^/]+)/(.+)')
    gzpattern = re.compile(r'\.gz$')
    crawlpattern = re.compile(r'(CC-MAIN-2\d{3}-\d{2})')
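
    # Quick illustration (not from the original source) of what the
    # patterns match:
    #   s3pattern.match('s3://commoncrawl/path/warc.paths.gz').groups()
    #       -> ('commoncrawl', 'path/warc.paths.gz')
    #   crawlpattern.search('crawl-data/CC-MAIN-2017-13/').group(1)
    #       -> 'CC-MAIN-2017-13'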
import json
import logging
import os
import re

from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol

logging.basicConfig(filename='boto.log', level=logging.DEBUG)

# Point the job at the cluster's Hadoop installation (CDH parcel paths)
os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH'
os.environ['HADOOP_MAPRED_HOME'] = '/opt/cloudera/parcels/CDH-4.6.0-1.cdh4.6.0.p0.26/lib/hadoop-0.20-mapreduce'
class MRTitle2Id(MRJob):
    OUTPUT_PROTOCOL = RawValueProtocol

    def mapper(self, pid, line):
        obj = json.loads(line)
        pid = int(obj['page_id'])
        title = obj['page_title']
        # yield (pid, line)
        yield pid, title

    def combiner(self, pid, titles):
        # emit each distinct title for this page id only once
        seen = set()
        for t in titles:
            if t not in seen:
                seen.add(t)
                yield pid, t
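
    # The fragment ends here. A plausible completion (an assumption, not
    # the original author's code): a reducer that repeats the combiner's
    # deduplication across all mappers, plus mrjob's usual entry point.
    def reducer(self, pid, titles):
        seen = set()
        for t in titles:
            if t not in seen:
                seen.add(t)
                yield pid, t


if __name__ == '__main__':
    MRTitle2Id.run()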