Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_range(self):
    """A ``colander.Range`` validator converts to ``minimum``/``maximum`` keys."""
    node = colander.SchemaNode(
        colander.Integer(),
        validator=colander.Range(111, 555),
    )
    ret = convert(node)
    self.assertDictEqual(ret, {
        'type': 'integer',
        'minimum': 111,
        'maximum': 555,
    })
# set_newparticle_values()
self.array_types = dict()
super(InitBaseClass, self).__init__(*args,**kwargs)
def initialize(self, num_new_particles, spill, data_arrays, substance):
    """
    All classes that derive from the base class must implement an
    ``initialize`` method.

    This default implementation does nothing and returns ``None``.
    """
    pass
class WindageSchema(TupleSchema):
    """Tuple schema of ``(min_windage, max_windage)``.

    Both entries are floats validated to lie within ``Range(0, 1.0)``.
    """
    min_windage = SchemaNode(Float(), validator=Range(0, 1.0), default=0.01)
    max_windage = SchemaNode(Float(), validator=Range(0, 1.0), default=0.04)
class InitWindagesSchema(base_schema.ObjTypeSchema):
    """
    windages initializer values
    """
    windage_range = WindageSchema(save=True, update=True)
    windage_persist = SchemaNode(Int(), default=900, save=True, update=True)
class InitWindages(InitBaseClass):
@colander.deferred
def deferred_date_validator(node, kw):
    """Return a date ``colander.Range`` validator built from the bind keywords.

    The lower bound is ``datetime.date.min``. The upper bound is
    ``kw['max_date']`` when it is present and not ``None``; otherwise it is
    today's date.
    """
    max_date = kw.get('max_date')
    if max_date is None:
        max_date = datetime.date.today()
    return colander.Range(min=datetime.date.min, max=max_date)
# Sheet metadata derived from ``sheet_meta`` with the interface and schema class replaced.
practicalrelevance_meta = sheet_meta._replace(isheet=IPracticalRelevance, schema_class=PracticalRelevanceSchema)
class IFinancialPlanning(ISheet):
    """Marker interface for the financial planning."""
class FinancialPlanningSchema(colander.MappingSchema):
    """Mapping schema for the financial planning fields.

    All three fields are required; ``requested_funding`` must additionally
    lie within ``Range(min=1, max=50000)``.
    """
    budget = CurrencyAmount(missing=colander.required)
    requested_funding = CurrencyAmount(
        missing=colander.required,
        validator=colander.Range(min=1, max=50000),
    )
    major_expenses = Text(missing=colander.required)
class IExtraFunding(ISheet):
    """Marker interface for the 'other sources of income' field."""
class ExtraFundingSchema(colander.MappingSchema):
    """Mapping schema for the extra funding fields.

    ``other_sources`` falls back to an empty string when missing;
    ``secured`` defaults to ``False``.
    """
    other_sources = Text(missing='')
    secured = Boolean(default=False)
extra_funding_meta = sheet_meta._replace(
isheet=IExtraFunding,
schema_class=ExtraFundingSchema,
permission_view='view_mercator2_extra_funding',
def _addMetabolizeSchema(self, schema):
    """Add the metabolize nodes to ``schema`` (modified in place).

    Two child nodes are added:

    * ``metabolism_types``, a sequence of strings, each one of
      ``phase1`` or ``phase2``
    * ``n_reaction_steps``, an integer validated with ``Range(0, 10)``
    """
    metabolism_type = colander.SchemaNode(
        colander.String(),
        validator=colander.OneOf(['phase1', 'phase2']),
    )
    schema.add(colander.SchemaNode(
        colander.Sequence(),
        metabolism_type,
        name='metabolism_types',
    ))
    schema.add(colander.SchemaNode(
        colander.Integer(),
        validator=colander.Range(0, 10),
        name='n_reaction_steps',
    ))
"""
num_to_sample = colander.SchemaNode(
colander.Int(),
missing=1,
validator=colander.Range(min=1),
)
mc_iterations = colander.SchemaNode(
colander.Int(),
validator=colander.Range(min=1),
missing=DEFAULT_EXPECTED_IMPROVEMENT_MC_ITERATIONS,
)
max_num_threads = colander.SchemaNode(
colander.Int(),
validator=colander.Range(min=1, max=MAX_ALLOWED_NUM_THREADS),
missing=DEFAULT_MAX_NUM_THREADS,
)
gp_historical_info = GpHistoricalInfo()
domain_info = BoundedDomainInfo()
covariance_info = CovarianceInfo(
missing=CovarianceInfo().deserialize({}),
)
optimizer_info = OptimizerInfo(
missing=OptimizerInfo().deserialize({}),
)
points_being_sampled = ListOfPointsInDomain(
missing=[],
)
class GpNextPointsConstantLiarRequest(GpNextPointsRequest):
* ms_data_format
* ms_data_file
* max_ms_level
* abs_peak_cutoff
* rel_peak_cutoff
If ``has_metabolites`` is True then :meth:`~magmaweb.job.JobQuery.annotate` will be called.
"""
schema = colander.SchemaNode(colander.Mapping())
schema.add(colander.SchemaNode(colander.String(),
validator=colander.OneOf(['mzxml']),
name='ms_data_format'
))
schema.add(colander.SchemaNode(self.File(), name='ms_data_file'))
schema.add(colander.SchemaNode(colander.Integer(), validator=colander.Range(min=0), name='max_ms_level'))
schema.add(colander.SchemaNode(colander.Float(), validator=colander.Range(min=0), name='abs_peak_cutoff'))
schema.add(colander.SchemaNode(colander.Float(), validator=colander.Range(0,1), name='rel_peak_cutoff'))
if has_metabolites:
self._addAnnotateSchema(schema)
params = schema.deserialize(params)
msfile = file(os.path.join(self.dir, 'ms_data.dat'), 'w')
msf = params['ms_data_file'].file
msf.seek(0)
while 1:
data = msf.read(2 << 16)
if not data:
break
msfile.write(data)
msf.close()
msfile.close()
"""
compress:
component: gzip
config:
compress_level: 6
"""
import colander
import paste.gzipper
from wiseguy import StrictSchema
from wiseguy import WSGIComponent
class GZipSchema(StrictSchema):
    """Configuration schema for the gzip component.

    ``compress_level`` is an integer validated with ``Range(1, 9)`` and
    falls back to 6 when not supplied.
    """
    compress_level = colander.SchemaNode(
        colander.Int(),
        validator=colander.Range(1, 9),
        missing=6,
        default=6,
    )
# WSGI component pairing the GZipSchema configuration with paste's gzip middleware factory.
GZipComponent = WSGIComponent(schema=GZipSchema(), factory=paste.gzipper.middleware)
"domain_bounds": [
{"min": 0.0, "max": 1.0},
],
},
}
"""
std_deviation_coef = colander.SchemaNode(
colander.Float(),
missing=0.0,
)
kriging_noise_variance = colander.SchemaNode(
colander.Float(),
missing=0.0,
validator=colander.Range(min=0.0),
)
class GpNextPointsKriging(GpNextPointsPrettyView):

    """Views for gp_next_points_kriging endpoints."""

    _route_name = GP_NEXT_POINTS_KRIGING_ROUTE_NAME
    _pretty_route_name = GP_NEXT_POINTS_KRIGING_PRETTY_ROUTE_NAME

    request_schema = GpNextPointsKrigingRequest()

    # Work on a copy so the kriging-specific keys added below do not end up
    # in the parent view's default request.
    _pretty_default_request = GpNextPointsPrettyView._pretty_default_request.copy()
    _pretty_default_request['std_deviation_coef'] = 0.0
    _pretty_default_request['kriging_noise_variance'] = 0.0
def metabolize_one(self, params, has_ms_data=False):
"""Configure job query to metabolize one structure.
``params`` is a MultiDict from which the following keys are used:
* metid, metabolite identifier to metabolize
* n_reaction_steps
* metabolism_types, comma separated string with metabolism types
If ``has_ms_data`` is True then :meth:`~magmaweb.job.JobQuery.annotate` will be called.
"""
schema = colander.SchemaNode(colander.Mapping())
schema.add(colander.SchemaNode(colander.Integer(), validator=colander.Range(min=0), name='metid'))
self._addMetabolizeSchema(schema)
if has_ms_data:
self._addAnnotateSchema(schema)
if hasattr(params, 'mixed'):
# unflatten multidict
params = params.mixed()
if 'metabolism_types' in params and isinstance(params['metabolism_types'], basestring):
params['metabolism_types'] = [params['metabolism_types']]
params = schema.deserialize(params)
script = "echo '{metid}' | {{magma}} metabolize -j - -s '{n_reaction_steps}' -m '{metabolism_types}' {{db}}"
self.script += script.format(
metid=self.escape(params['metid']),
n_reaction_steps=self.escape(params['n_reaction_steps']),
metabolism_types=self.escape(','.join(params['metabolism_types']))
)