Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _test_explicit(self, m=False, c=False, r=False, **kwargs):
    """Build an ``MRStep`` from *kwargs* and check its ``has_explicit_*`` flags.

    ``m``, ``c`` and ``r`` are the expected values of
    ``has_explicit_mapper``, ``has_explicit_combiner`` and
    ``has_explicit_reducer`` respectively (all default to ``False``).
    """
    step = MRStep(**kwargs)
    # Checked in mapper -> combiner -> reducer order; the first mismatch fails.
    expectations = (
        ('has_explicit_mapper', m),
        ('has_explicit_combiner', c),
        ('has_explicit_reducer', r),
    )
    for attr_name, expected in expectations:
        self.assertEqual(getattr(step, attr_name), expected)
def test_mapper_combiner(self):
    """Check the expected protocol pairs for a step with a mapper and a combiner."""
    check = self._assert_script_protocols
    steps = [
        MRStep(mapper=self._yield_none, combiner=self._yield_none),
    ]
    # One dict per step, mapping each task name to its expected protocol pair.
    expected = [
        {
            'mapper': (PickleProtocol, JSONValueProtocol),
            'combiner': (JSONValueProtocol, JSONValueProtocol),
        },
    ]
    check(steps, expected)
def test_render_reducer_pre_filter(self):
# Compares ``description(1)`` of an MRStep built with
# ``reducer=identity_reducer`` and ``reducer_pre_filter='cat'`` against an
# expected dict: type 'streaming', with a 'reducer' entry of type 'script'
# whose 'pre_filter' is 'cat'.
# NOTE(review): this excerpt is cut off -- the closing brackets of the
# expected dict and of the assertEqual() call are not visible here, and any
# further keys the expected dict may contain cannot be confirmed.
self.assertEqual(
MRStep(
reducer=identity_reducer,
reducer_pre_filter='cat').description(1),
{
'type': 'streaming',
'reducer': {
'type': 'script',
'pre_filter': 'cat',
},
def steps(self):
    """Return this job's two steps.

    The first step uses ``self.mapper`` and ``self.reducer``; the second
    step uses only ``self.mapper2``.
    """
    map_reduce_step = MRStep(mapper=self.mapper, reducer=self.reducer)
    second_map_step = MRStep(mapper=self.mapper2)
    return [map_reduce_step, second_map_step]
def test_mapper_combiner_reducer(self):
    """Check the expected protocol pairs for a mapper + combiner + reducer step."""
    check = self._assert_script_protocols
    steps = [
        MRStep(
            mapper=self._yield_none,
            combiner=self._yield_none,
            reducer=self._yield_none,
        ),
    ]
    # One dict per step, mapping each task name to its expected protocol pair.
    expected = [
        {
            'mapper': (PickleProtocol, JSONProtocol),
            'combiner': (JSONProtocol, JSONProtocol),
            'reducer': (JSONProtocol, JSONValueProtocol),
        },
    ]
    check(steps, expected)
def steps(self):
    """Return four mapper-only steps that differ only in their ``jobconf``.

    In order: no jobconf, ``foo='bar'``, ``foo='bar'`` again, then
    ``foo='baz'``.
    """
    plain = MRStep(mapper=self.mapper)
    bar_first = MRStep(mapper=self.mapper, jobconf={'foo': 'bar'})
    bar_second = MRStep(mapper=self.mapper, jobconf={'foo': 'bar'})
    baz = MRStep(mapper=self.mapper, jobconf={'foo': 'baz'})
    return [plain, bar_first, bar_second, baz]
def _combine_or_reduce_pairs(self, pairs, mrc, step_num=0):
"""Helper for :py:meth:`combine_pairs` and :py:meth:`reduce_pairs`."""
# Generator function: nothing below runs (including the ValueError check)
# until the caller starts iterating.
# ``mrc`` is the name used to index the step; ``mrc + '_init'`` and
# ``mrc + '_final'`` select the matching setup/teardown callables
# (presumably 'combiner' or 'reducer' -- confirm against callers).
step = self._get_step(step_num, MRStep)
task = step[mrc]
task_init = step[mrc + '_init']
task_final = step[mrc + '_final']
if task is None:
raise ValueError('No %s in step %d' % (mrc, step_num))
# Emit whatever the init callable produces first; ``or ()`` tolerates an
# init callable that returns None.
if task_init:
for k, v in task_init() or ():
yield k, v
# group all values of the same key together, and pass to the reducer
#
# be careful to use generators for everything, to allow for
# very large groupings of values
# groupby() only merges *consecutive* pairs with equal keys (presumably
# ``pairs`` arrives sorted/grouped by key -- verify against callers).
# NOTE(review): the excerpt is cut off after this loop header; the loop body
# and any use of ``task_final`` are not visible here.
for key, pairs_for_key in itertools.groupby(pairs, lambda k_v: k_v[0]):
def steps(self):
    """Return the single step of this job.

    The step wires ``mapper_gmm_init`` / ``mapper_gmm`` /
    ``mapper_final_gmm`` as the mapper's init, body and final hooks, and
    ``reducer_gmm`` as the reducer.
    """
    gmm_step = MRStep(
        mapper_init=self.mapper_gmm_init,
        mapper=self.mapper_gmm,
        mapper_final=self.mapper_final_gmm,
        reducer=self.reducer_gmm,
    )
    return [gmm_step]
def steps(self):
    """Return this job's two steps.

    The first step maps with ``get_movies_rating`` and reduces with
    ``reducer_movie_rating``; the second step has only a reducer,
    ``reducer_output``.
    """
    rating_step = MRStep(
        mapper=self.get_movies_rating,
        reducer=self.reducer_movie_rating,
    )
    output_step = MRStep(reducer=self.reducer_output)
    return [rating_step, output_step]
def steps(self):
    """Return this job's two steps.

    The first step maps with ``mapper_count_friends_per_line`` and reduces
    with ``reducer_combine_friends``. The second step maps with
    ``mapper_prep_for_sort`` (using ``load_name_dictionary`` as its
    ``mapper_init``) and reduces with ``reducer_find_max_friends``.
    """
    count_step = MRStep(
        mapper=self.mapper_count_friends_per_line,
        reducer=self.reducer_combine_friends,
    )
    max_step = MRStep(
        mapper=self.mapper_prep_for_sort,
        mapper_init=self.load_name_dictionary,
        reducer=self.reducer_find_max_friends,
    )
    return [count_step, max_step]