def check_start_conditions(self):
    if not SUPPORTED_OS:
        print("This OS is not yet supported")
        sys.exit(1)
    if not conf.OPTIONS["Server"]["PROFILE"]:
        print(
            "Kolibri has not enabled profiling of its requests. "
            "To enable it, edit the Kolibri options.ini file and "
            "add `PROFILE = true` in the [Server] section"
        )
    if os.path.exists(PROFILE_LOCK):
        command_pid = None
        try:
            with open(PROFILE_LOCK, "r") as f:
                command_pid = int(f.readline())
        except (IOError, TypeError, ValueError):
            # Unreadable or corrupt lock file: treat it as stale.
            remove_lock()
        if command_pid:
            if pid_exists(command_pid):
                print("Profile command is already running")
                sys.exit(1)
            else:
                # The recorded process is gone, so the lock is stale.
                remove_lock()
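
# pid_exists and remove_lock are not shown in this fragment. A minimal sketch,
# assuming PROFILE_LOCK is a plain file holding the PID and that a POSIX
# signal-0 probe is acceptable (Kolibri's real helpers may differ):
import errno
import os

def pid_exists(pid):
    # Probe the process with signal 0: ESRCH means it does not exist.
    try:
        os.kill(pid, 0)
    except OSError as e:
        return e.errno != errno.ESRCH
    return True

def remove_lock():
    # Best-effort removal of a stale lock file.
    try:
        os.remove(PROFILE_LOCK)
    except OSError:
        pass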
def validate_token(self, request):
    # Requires: requests, urllib.parse.urljoin, rest_framework.response.Response
    PORTAL_URL = conf.OPTIONS["Urls"]["DATA_PORTAL_SYNCING_BASE_URL"]
    # The token is forwarded in the query params.
    response = requests.get(
        urljoin(PORTAL_URL, "portal/api/public/v1/registerfacility/validate_token"),
        params=request.query_params,
    )
    # Pass through non-JSON responses (e.g. HTML error pages) as raw content.
    try:
        data = response.json()
    except ValueError:
        data = response.content
    return Response(data, status=response.status_code)
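
# Hypothetical client-side call against the endpoint above, assuming it is
# routed at /api/public/portal/validatetoken on a local Kolibri server (the
# route and the token value here are illustrative only):
import requests

resp = requests.get(
    "http://localhost:8080/api/public/portal/validatetoken",
    params={"token": "abcd1234"},
)
print(resp.status_code, resp.json())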
from kolibri.utils.conf import OPTIONS
INSTALLED_APPS = ["oidc_provider"]
OIDC_LOGIN_URL = "/user/#/signin/"
OIDC_USERINFO = "kolibri.plugins.oidc_provider_plugin.kolibri_userinfo"
# Some deployments intentionally skip the consent step; pick the authorize
# template accordingly:
if OPTIONS["OIDCProvider"]["REQUIRE_CONSENT"]:
    OIDC_TEMPLATES = {
        "authorize": "oidc_provider/authorize.html",
        "error": "oidc_provider/error.html",
    }
else:
    OIDC_TEMPLATES = {
        "authorize": "oidc_provider/authorize_without_consent.html",
        "error": "oidc_provider/error.html",
    }
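
# The REQUIRE_CONSENT flag above is read from Kolibri's options.ini. A minimal
# sketch of the relevant section (the option name comes from the code above;
# its exact placement and default value are assumptions):
#
#   [OIDCProvider]
#   REQUIRE_CONSENT = false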
# TODO: implement a --content-domain parameter for optionally
# specifying the domain for the curation server.
# Note: cmd should be the management command instance. Although the
# interface for adding arguments is argparse, Django overrides the
# parser object with its own, which is why we need to pass cmd. See
# http://stackoverflow.com/questions/36706220/is-it-possible-to-create-subparsers-in-a-django-management-command
network_subparser = subparsers.add_parser(
    name="network",
    cmd=self,
    help="Download the given channel through the network.",
)
network_subparser.add_argument("channel_id", type=str)

default_studio_url = conf.OPTIONS["Urls"]["CENTRAL_CONTENT_BASE_URL"]
network_subparser.add_argument(
    "--baseurl",
    type=str,
    default=default_studio_url,
    dest="baseurl",
)

disk_subparser = subparsers.add_parser(
    name="disk",
    cmd=self,
    help="Copy the content from the given folder.",
)
disk_subparser.add_argument("channel_id", type=str)
disk_subparser.add_argument("directory", type=str)
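
# Assuming these subparsers belong to Kolibri's importchannel management
# command (the command name itself is an assumption from context), usage
# would look like:
#
#   kolibri manage importchannel network <channel_id> --baseurl https://studio.learningequality.org
#   kolibri manage importchannel disk <channel_id> /path/to/content/folder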
@click.option(
    "--port",  # the snippet begins mid-decorator; the option name is inferred from services(port, ...)
    default=OPTIONS["Deployment"]["HTTP_PORT"],
    type=int,
    help="Port on which to run Kolibri services",
)
@click.option(
    "--background/--foreground",
    default=True,
    help="Run Kolibri services as a background task",
)
def services(port, background):
    """
    Start the Kolibri background services.
    """
    create_startup_lock(None)
    recreate_cache()
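
# Assuming this function is registered as a sub-command of the `kolibri` CLI
# (and that the reconstructed option above is indeed named --port), it would
# be invoked as:
#
#   kolibri services --port 8080 --foreground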
app = "kolibri"
if conf.OPTIONS["Database"]["DATABASE_ENGINE"] == "sqlite":
connection = create_engine(
"sqlite:///{path}".format(
path=os.path.join(conf.KOLIBRI_HOME, "job_storage.sqlite3")
),
connect_args={"check_same_thread": False},
poolclass=NullPool,
)
elif conf.OPTIONS["Database"]["DATABASE_ENGINE"] == "postgres":
connection = create_engine(
"postgresql://{user}:{password}@{host}:{port}/{name}".format(
name=conf.OPTIONS["Database"]["DATABASE_NAME"],
password=conf.OPTIONS["Database"]["DATABASE_PASSWORD"],
user=conf.OPTIONS["Database"]["DATABASE_USER"],
host=conf.OPTIONS["Database"]["DATABASE_HOST"],
port=conf.OPTIONS["Database"]["DATABASE_PORT"],
)
)
# Add multiprocessing safeguards as recommended by
# https://docs.sqlalchemy.org/en/13/core/pooling.html#using-connection-pools-with-multiprocessing
@event.listens_for(connection, "connect")
def connect(dbapi_connection, connection_record):
connection_record.info["pid"] = os.getpid()
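
# The pooling docs cited above pair this "connect" listener with a "checkout"
# guard that refuses to hand a forked process a connection created by its
# parent. A sketch following that recipe (not shown in the original fragment):
from sqlalchemy import exc

@event.listens_for(connection, "checkout")
def checkout(dbapi_connection, connection_record, connection_proxy):
    pid = os.getpid()
    if connection_record.info["pid"] != pid:
        # Invalidate the connection so the pool opens a fresh one in this process.
        connection_record.connection = connection_proxy.connection = None
        raise exc.DisconnectionError(
            "Connection record belongs to pid %s, attempting to check out in pid %s"
            % (connection_record.info["pid"], pid)
        )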
from django.core.cache import caches
from kolibri.utils.conf import OPTIONS

cache_options = OPTIONS["Cache"]

NOTHING = object()

class CrossProcessCache(object):
    def __init__(self, default_timeout=cache_options["CACHE_TIMEOUT"]):
        self.default_timeout = default_timeout

    def __contains__(self, key):
        if key in caches["default"]:
            return True
        if cache_options["CACHE_BACKEND"] != "redis" and key in caches["process_cache"]:
            return True
        return False

    def get(self, key, default=None, version=None):
        # The original snippet is truncated here; this body mirrors the
        # lookup order of __contains__ above.
        if key in caches["default"]:
            return caches["default"].get(key, default=default, version=version)
        if cache_options["CACHE_BACKEND"] != "redis" and key in caches["process_cache"]:
            return caches["process_cache"].get(key, default=default, version=version)
        return default
if language is not None:
    # Only activate translation if there is a language code returned.
    translation.activate(language)
    request.LANGUAGE_CODE = translation.get_language()
response = self.get_response(request)
if language is not None:
    language = translation.get_language()
    # language_from_path is set earlier in the (elided) part of this method.
    if response.status_code == 404 and not language_from_path:
        # Maybe the language code is missing in the URL? Try adding the
        # language prefix and redirecting to that URL.
        # First get any global prefix that is being used.
        script_prefix = OPTIONS["Deployment"]["URL_PATH_PREFIX"]
        # Replace the global prefix with the global prefix plus the language prefix.
        language_path = request.path_info.replace(
            script_prefix, "%s%s/" % (script_prefix, language), 1
        )
        # Get the urlconf from the request, defaulting to the global settings ROOT_URLCONF.
        urlconf = getattr(request, "urlconf", settings.ROOT_URLCONF)
        # Check if this is a valid path.
        path_valid = is_valid_path(language_path, urlconf)
        # Check if the path is only invalid because it is missing a trailing slash.
        path_needs_slash = not path_valid and (
            settings.APPEND_SLASH
            and not language_path.endswith("/")
            and is_valid_path("%s/" % language_path, urlconf)
        )
        # If the constructed path is valid, or would be valid with a trailing
        # slash, redirect to it. The original snippet is truncated here; the
        # ending below is sketched after Django's LocaleMiddleware
        # (HttpResponseRedirect from django.http).
        if path_valid or path_needs_slash:
            path = language_path if path_valid else "%s/" % language_path
            return HttpResponseRedirect(path)
return response
url = urljoin(server, "/api/v1/pingback")
instance, _ = InstanceIDModel.get_or_create_current_instance()
language = get_device_setting("language_id", "")
try:
    timezone = get_current_timezone().zone
except Exception:
    timezone = ""
data = {
    "instance_id": instance.id,
    "version": kolibri.__version__,
    "mode": conf.OPTIONS["Deployment"]["RUN_MODE"],
    "platform": instance.platform,
    "sysversion": instance.sysversion,
    "database_id": instance.database.id,
    "system_id": instance.system_id,
    "node_id": instance.node_id,
    "language": language,
    "timezone": timezone,
    # Uptime is reported in whole minutes.
    "uptime": int((local_now() - started).total_seconds() / 60),
    "timestamp": localtime(),
    "installer": installation_type(),
}
logger.debug("Pingback data: {}".format(data))
jsondata = dump_zipped_json(data)
response = requests.post(url, data=jsondata, timeout=60)
response.raise_for_status()
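
# dump_zipped_json is a Kolibri helper not shown here. A plausible sketch,
# assuming it gzip-compresses the JSON-serialized payload (the exact
# implementation may differ):
import gzip
import io
import json

def dump_zipped_json(data):
    jsondata = json.dumps(data)
    out = io.BytesIO()
    with gzip.GzipFile(fileobj=out, mode="wb") as f:
        f.write(jsondata.encode("utf-8"))
    return out.getvalue()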
__tablename__ = "scheduledjobs"
# The hex UUID given to the job upon first creation
id = Column(String, primary_key=True, autoincrement=False)
# Repeat interval in seconds.
interval = Column(Integer, default=0)
# Number of times to repeat - None means repeat forever.
repeat = Column(Integer, nullable=True)
# The app name passed to the client when the job is scheduled.
queue = Column(String, index=True)
# The original Job object, pickled here for so we can easily access it.
obj = Column(PickleType(protocol=OPTIONS["Python"]["PICKLE_PROTOCOL"]))
scheduled_time = Column(DateTime())
__table_args__ = (Index("queue__scheduled_time", "queue", "scheduled_time"),)
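
# A hypothetical query the composite (queue, scheduled_time) index above is
# designed to serve: fetching the jobs in a queue that are due to run
# (session, queue_name, and now are placeholders, not names from the source):
due_jobs = (
    session.query(ScheduledJob)
    .filter(ScheduledJob.queue == queue_name)
    .filter(ScheduledJob.scheduled_time <= now)
    .all()
)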
class Scheduler(StorageMixin):
    def __init__(self, queue=None, connection=None):
        if connection is None and not isinstance(queue, Queue):
            raise ValueError("One of either connection or queue must be specified")
        elif isinstance(queue, Queue):
            self.queue = queue
            if connection is None:
                connection = self.queue.storage.engine
        elif connection:
            try: