# see `importcontent` management command for explanation of how we're using subparsers
subparsers = parser.add_subparsers(dest='command', help="The following subcommands are available.")
network_subparser = subparsers.add_parser(
    name='network',
    cmd=self,
    help="Download the given channel through the network."
)
network_subparser.add_argument(
    'channel_id',
    type=str,
    help="Download the database for the given channel_id."
)
default_studio_url = conf.OPTIONS['Urls']['CENTRAL_CONTENT_BASE_URL']
network_subparser.add_argument(
    "--baseurl",
    type=str,
    default=default_studio_url,
    help="The host we will download the content from. Defaults to {}".format(default_studio_url),
)
local_subparser = subparsers.add_parser(
    name='disk',
    cmd=self,
    help='Copy the content from the given folder.'
)
local_subparser.add_argument(
    'channel_id',
    type=str,
    help="Import this channel id from the given directory."
)
# ---- separate excerpt: start-up checks for the profiling MetricsMiddleware ----
def check_start_conditions(self):
    """
    Run the checks needed to enable the middleware, if possible.
    """
    if MetricsMiddleware.disabled and conf.OPTIONS["Server"]["PROFILE"]:
        if os.path.exists(PROFILE_LOCK):
            try:
                with open(PROFILE_LOCK, "r") as f:
                    # line 1 of the lock file: PID of the profiling command
                    MetricsMiddleware.command_pid = int(f.readline())
                    # line 2: timestamp used to name the per-run CSV file
                    file_timestamp = f.readline()
                    if SUPPORTED_OS:
                        MetricsMiddleware.disabled = False
                        self.requests_profiling_file = os.path.join(
                            conf.KOLIBRI_HOME,
                            "performance",
                            "{}_requests_performance.csv".format(file_timestamp),
                        )
                        with open(
                            self.requests_profiling_file, mode="a"
                        ) as profile_file:
                            # the excerpt breaks off mid-call here; closed
                            # minimally with the file handle so it parses
                            profile_writer = csv.writer(profile_file)
            except (IOError, ValueError):
                # the original error handling is cut from this excerpt; an
                # unreadable or corrupt lock file leaves profiling disabled
                pass
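# The reads above imply a two-line lock-file layout: the profiling command's
# PID on line 1 and a timestamp on line 2 (reused in the CSV filename). A
# minimal sketch of a writer for that layout follows; the real writer lives
# elsewhere in Kolibri, and `write_profile_lock` is a hypothetical name.
import os
import time


def write_profile_lock(path):
    # Sketch only: not part of the excerpt above.
    with open(path, "w") as f:
        f.write("{}\n".format(os.getpid()))  # line 1: command PID
        f.write("{}".format(int(time.time())))  # line 2: timestamp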
# ---- a reformatted variant of the subparser block above; it adds --no_upgrade ----
subparsers = parser.add_subparsers(
    dest="command", help="The following subcommands are available."
)
network_subparser = subparsers.add_parser(
    name="network",
    cmd=self,
    help="Download the given channel through the network.",
)
network_subparser.add_argument(
    "channel_id",
    type=str,
    help="Download the database for the given channel_id.",
)
default_studio_url = conf.OPTIONS["Urls"]["CENTRAL_CONTENT_BASE_URL"]
network_subparser.add_argument(
    "--baseurl",
    type=str,
    default=default_studio_url,
    help="The host we will download the content from. Defaults to {}".format(
        default_studio_url
    ),
)
network_subparser.add_argument(
    "--no_upgrade",
    action="store_true",
    help="Only download database to an upgrade file path.",
)
local_subparser = subparsers.add_parser(
    name="disk", cmd=self, help="Copy the content from the given folder."
)
# ---- separate excerpt: the `sync` management command for facility data ----
import logging

from ..utils import bytes_for_humans
from ..utils import create_superuser_and_provision_device
from ..utils import get_baseurl
from ..utils import get_client_and_server_certs
from ..utils import get_dataset_id
from kolibri.core.auth.constants.morango_sync import PROFILE_FACILITY_DATA
from kolibri.core.auth.constants.morango_sync import ScopeDefinitions
from kolibri.core.auth.constants.morango_sync import State
from kolibri.core.auth.management.utils import get_facility
from kolibri.core.auth.management.utils import run_once
from kolibri.core.auth.models import dataset_cache
from kolibri.core.tasks.management.commands.base import AsyncCommand
from kolibri.core.tasks.utils import db_task_write_lock
from kolibri.utils import conf

DATA_PORTAL_SYNCING_BASE_URL = conf.OPTIONS["Urls"]["DATA_PORTAL_SYNCING_BASE_URL"]

logger = logging.getLogger(__name__)


class Command(AsyncCommand):
    help = "Allow the syncing of facility data with Kolibri Data Portal or another Kolibri device."

    def add_arguments(self, parser):
        parser.add_argument(
            "--facility", action="store", type=str, help="ID of facility to sync"
        )
        parser.add_argument(
            "--baseurl", type=str, default=DATA_PORTAL_SYNCING_BASE_URL, dest="baseurl"
        )
        parser.add_argument("--noninteractive", action="store_true")
# ---- separate excerpt: resolving the content directory ----
def get_content_dir_path(datafolder=None):
    return (
        os.path.join(datafolder, "content")
        if datafolder
        else conf.OPTIONS["Paths"]["CONTENT_DIR"]
    )
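# A hedged usage sketch of the helper above; the USB-style path is a placeholder.
print(get_content_dir_path("/mnt/usb/KOLIBRI_DATA"))  # -> /mnt/usb/KOLIBRI_DATA/content
print(get_content_dir_path())  # -> the configured CONTENT_DIR default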
# ---- separate excerpt: choosing a download path for the channel database ----
def get_filepath(self, options):
    if options["output_file"] is None:
        temp_dir = os.path.join(conf.KOLIBRI_HOME, "temp")
        if not os.path.isdir(temp_dir):
            os.mkdir(temp_dir)
        # mkstemp() returns (fd, path); indexing [1] keeps only the path,
        # which leaves the OS-level file descriptor open
        filepath = mkstemp(suffix=".download", dir=temp_dir)[1]
    else:
        filepath = os.path.join(os.getcwd(), options["output_file"])
    return filepath
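# Since mkstemp() hands back an open descriptor, a variant that closes it
# avoids the leak noted above. A minimal sketch using only the standard
# library; `make_download_path` is a hypothetical name, not Kolibri API.
import os
from tempfile import mkstemp


def make_download_path(temp_dir):
    fd, filepath = mkstemp(suffix=".download", dir=temp_dir)
    os.close(fd)  # release the descriptor; callers reopen the path by name
    return filepath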
# ---- separate excerpt: building the SQLAlchemy engine for task storage ----
import logging
import os

from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy.pool import NullPool

from kolibri.core.sqlite.utils import repair_sqlite_db
from kolibri.core.tasks.queue import Queue
from kolibri.core.tasks.scheduler import Scheduler
from kolibri.core.tasks.worker import Worker
from kolibri.utils import conf

logger = logging.getLogger(__name__)

if conf.OPTIONS["Database"]["DATABASE_ENGINE"] == "sqlite":

    def __create_engine():
        # NullPool plus check_same_thread=False lets connections be handed
        # across threads without pooling stale SQLite handles
        return create_engine(
            "sqlite:///{path}".format(
                path=os.path.join(conf.KOLIBRI_HOME, "job_storage.sqlite3")
            ),
            connect_args={"check_same_thread": False},
            poolclass=NullPool,
        )
elif conf.OPTIONS["Database"]["DATABASE_ENGINE"] == "postgres":

    def __create_engine():
        # The excerpt is cut off mid-call here; the keyword arguments below
        # are filled in from the named format fields, assuming the standard
        # DATABASE_* option keys used elsewhere in Kolibri's conf.OPTIONS
        return create_engine(
            "postgresql://{user}:{password}@{host}{port}/{name}".format(
                user=conf.OPTIONS["Database"]["DATABASE_USER"],
                password=conf.OPTIONS["Database"]["DATABASE_PASSWORD"],
                host=conf.OPTIONS["Database"]["DATABASE_HOST"],
                port=":{}".format(conf.OPTIONS["Database"]["DATABASE_PORT"])
                if conf.OPTIONS["Database"]["DATABASE_PORT"]
                else "",
                name=conf.OPTIONS["Database"]["DATABASE_NAME"],
            )
        )