How to use the traitlets.Any function in traitlets

To help you get started, we’ve selected a few traitlets examples based on the popular ways the library is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github jupyter / notebook / notebook / services / kernels / kernelmanager.py View on Github external
"""
    )

    kernel_info_timeout = Float(60, config=True,
        help="""Timeout for giving up on a kernel (in seconds).

        On starting and restarting kernels, we check whether the
        kernel is running and responsive by sending kernel_info_requests.
        This sets the timeout in seconds for how long the kernel can take
        before being presumed dead.
        This affects the MappingKernelManager (which handles kernel restarts)
        and the ZMQChannelsHandler (which handles the startup).
        """
    )

    _kernel_buffers = Any()
    @default('_kernel_buffers')
    def _default_kernel_buffers(self):
        """Build the initial value of the ``_kernel_buffers`` trait.

        Returns a ``defaultdict`` whose missing keys are filled with a fresh
        record holding an empty ``buffer`` list, an empty ``session_key``
        string and an empty ``channels`` dict.
        """
        def _empty_record():
            # A new dict (with new inner containers) is created for every missing key.
            return {'buffer': [], 'session_key': '', 'channels': {}}

        return defaultdict(_empty_record)

    last_kernel_activity = Instance(datetime,
        help="The last activity on any kernel, including shutting down a kernel")

    def __init__(self, **kwargs):
        """Initialize the manager and record the initial activity time.

        All keyword arguments are forwarded unchanged to the parent
        initializer.
        """
        super(MappingKernelManager, self).__init__(**kwargs)
        # Set only after the parent initializer has run; the value comes from utcnow().
        self.last_kernel_activity = utcnow()

    allowed_message_types = List(trait=Unicode(), config=True,
        help="""White list of allowed kernel message types.
        When the list is empty, all message types are allowed.
        """
    )
github ipython / ipykernel / ipykernel / ipkernel.py View on Github external
except ImportError:
    _use_experimental_60_completion = False

_EXPERIMENTAL_KEY_NAME = '_jupyter_types_experimental'


class IPythonKernel(KernelBase):
    shell = Instance('IPython.core.interactiveshell.InteractiveShellABC',
                     allow_none=True)
    shell_class = Type(ZMQInteractiveShell)

    use_experimental_completions = Bool(True,
        help="Set this flag to False to deactivate the use of experimental IPython completion APIs.",
    ).tag(config=True)

    user_module = Any()
    def _user_module_changed(self, name, old, new):
        if self.shell is not None:
            self.shell.user_module = new

    user_ns = Instance(dict, args=None, allow_none=True)
    def _user_ns_changed(self, name, old, new):
        if self.shell is not None:
            self.shell.user_ns = new
            self.shell.init_user_ns()

    # A reference to the Python builtin 'raw_input' function.
    # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
    _sys_raw_input = Any()
    _sys_eval_input = Any()

    def __init__(self, **kwargs):
github takluyver / jupyter_kernel_mgmt / jupyter_client / client.py View on Github external
"""

    # The PyZMQ Context to use for communication with the kernel.
    context = Instance(zmq.Context)
    def _context_default(self):
        """Supply the default ``context`` value: ``zmq.Context.instance()``."""
        default_context = zmq.Context.instance()
        return default_context

    # The classes to use for the various channels
    shell_channel_class = Type(ChannelABC)
    iopub_channel_class = Type(ChannelABC)
    stdin_channel_class = Type(ChannelABC)
    hb_channel_class = Type(HBChannelABC)

    # Protected traits
    _shell_channel = Any()
    _iopub_channel = Any()
    _stdin_channel = Any()
    _hb_channel = Any()

    # flag for whether execute requests should be allowed to call raw_input:
    allow_stdin = True

    #--------------------------------------------------------------------------
    # Channel proxy methods
    #--------------------------------------------------------------------------

    def get_shell_msg(self, *args, **kwargs):
        """Return the result of ``get_msg`` on the shell channel.

        Positional and keyword arguments are passed through unchanged.
        """
        channel = self.shell_channel
        return channel.get_msg(*args, **kwargs)

    def get_iopub_msg(self, *args, **kwargs):
        """Get a message from the iopub channel"""
github altair-viz / altair / altair / schema / _interface / data.py View on Github external
class Data(BaseObject):
    """Wrapper for Vega-Lite Data definition.
    
    Attributes
    ----------
    format: DataFormat
        An object that specifies the format for the data file or values.
    url: Unicode
        A URL from which to load the data set.
    values: List(Any)
        Pass array of objects instead of a url to a file.
    """
    format = T.Instance(DataFormat, allow_none=True, default_value=None, help="""An object that specifies the format for the data file or values.""")
    url = T.Unicode(allow_none=True, default_value=None, help="""A URL from which to load the data set.""")
    values = T.List(T.Any(), allow_none=True, default_value=None, help="""Pass array of objects instead of a url to a file.""")

    def __init__(self, format=None, url=None, values=None, **kwargs):
        """Create the wrapper, forwarding only the arguments that were given.

        ``format``, ``url`` and ``values`` are added to ``kwargs`` only when
        they are not ``None``; the combined keyword arguments are then passed
        to the parent initializer.
        """
        named_arguments = (('format', format), ('url', url), ('values', values))
        for trait_name, trait_value in named_arguments:
            if trait_value is not None:
                kwargs[trait_name] = trait_value
        super(Data, self).__init__(**kwargs)
github jupyterhub / jupyterhub / jupyterhub / proxy.py View on Github external
In addition to these, the following method(s) may need to be implemented:

    - :meth:`.start` start the proxy, if it should be launched by the Hub
      instead of externally managed.
      If the proxy is externally managed, it should set :attr:`should_start` to False.
    - :meth:`.stop` stop the proxy. Only used if :meth:`.start` is also used.

    And the following method(s) are optional, but can be provided:

    - :meth:`.get_route` gets a single route.
      There is a default implementation that extracts data from :meth:`.get_all_routes`,
      but implementations may choose to provide a more efficient implementation
      of fetching a single route.
    """

    db_factory = Any()

    @property
    def db(self):
        """Call ``self.db_factory`` and return whatever it produces.

        The factory is invoked on every access; nothing is cached here.
        """
        make_db = self.db_factory
        return make_db()

    app = Any()
    hub = Any()
    public_url = Unicode()
    ssl_key = Unicode()
    ssl_cert = Unicode()
    host_routing = Bool()

    should_start = Bool(
        True,
        config=True,
        help="""Should the Hub start the proxy
github jupyter-widgets / ipywidgets / ipywidgets / widgets / widget_selection.py View on Github external
Parameters
    ----------
    {multiple_selection_params}

    rows: int
        The number of rows to display in the widget.
    """
    _view_name = Unicode('SelectMultipleView').tag(sync=True)
    _model_name = Unicode('SelectMultipleModel').tag(sync=True)
    rows = Int(5, help="The number of rows to display.").tag(sync=True)


class _SelectionNonempty(_Selection):
    """Selection that is guaranteed to have a value selected."""
    # don't allow None to be an option.
    value = Any(help="Selected value")
    label = Unicode(help="Selected label")
    index = Int(help="Selected index").tag(sync=True)

    def __init__(self, *args, **kwargs):
        """Reject construction without options, then defer to the parent.

        Raises ``TraitError`` when the ``options`` keyword is missing or has
        length zero.
        """
        initial_options = kwargs.get('options', ())
        if len(initial_options) == 0:
            raise TraitError('options must be nonempty')
        super(_SelectionNonempty, self).__init__(*args, **kwargs)

    @validate('options')
    def _validate_options(self, proposal):
        """Validate a proposed ``options`` value and refresh ``_options_full``.

        A non-mapping iterable is converted to a tuple and stored back on the
        proposal. ``_options_full`` is rebuilt with ``_make_options``; an empty
        result raises ``TraitError``. Returns the (possibly converted) value.
        """
        candidate = proposal.value
        if isinstance(candidate, Iterable) and not isinstance(candidate, Mapping):
            candidate = tuple(candidate)
            proposal.value = candidate
        full_options = _make_options(candidate)
        self._options_full = full_options
        if len(self._options_full) == 0:
            raise TraitError("Option list must be nonempty")
        return candidate
github jupyterhub / oauthenticator / oauthenticator / mediawiki.py View on Github external
mw_index_url = Unicode(
        os.environ.get('MW_INDEX_URL', 'https://meta.wikimedia.org/w/index.php'),
        config=True,
        help='Full path to index.php of the MW instance to use to log in'
    )

    executor_threads = Integer(12,
        help="""Number of executor threads.

        MediaWiki OAuth requests happen in this thread,
        so it is mostly waiting for network replies.
        """,
        config=True,
    )
    executor = Any()

    def normalize_username(self, username):
        """Return ``username`` exactly as given.

        This override exists so that usernames are not lowercased.
        """
        return username

    def _executor_default(self):
        return ThreadPoolExecutor(self.executor_threads)

    async def authenticate(self, handler, data=None):
        consumer_token = ConsumerToken(
            self.client_id,
            self.client_secret,
        )
github ipython / ipyparallel / ipyparallel / apps / launcher.py View on Github external
class BaseLauncher(LoggingConfigurable):
    """An abstraction for starting, stopping and signaling a process."""

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the profile_dir, but may not be. Any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the work_dir option.
    work_dir = Unicode(u'.')
    loop = Instance('tornado.ioloop.IOLoop', allow_none=True)

    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        """Supply the default ``loop`` value: ``ioloop.IOLoop.current()``."""
        current_loop = ioloop.IOLoop.current()
        return current_loop

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Initialize the launcher in the ``'before'`` state.

        ``work_dir``, ``config`` and any extra keyword arguments are forwarded
        to the parent initializer. Afterwards ``state`` is set to ``'before'``
        and ``stop_callbacks`` to an empty list.
        """
        super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
        self.state = 'before'  # can be before, running, after
        # Starts empty; how callbacks are registered and invoked is not shown here.
        self.stop_callbacks = []

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        This is what is passed to :func:`spawnProcess` and the first element
        will be the process name.
        """
github ipython / ipykernel / ipykernel / kernelbase.py View on Github external
# attribute to override with a GUI
    eventloop = Any(None)

    @observe('eventloop')
    def _update_eventloop(self, change):
        """Schedule ``enter_eventloop`` on the IOLoop when ``eventloop`` is set.

        Nothing is scheduled when the new value is ``None``.
        """
        # The current loop is looked up first, before the new value is inspected.
        current_loop = ioloop.IOLoop.current()
        if change.new is None:
            return
        current_loop.add_callback(self.enter_eventloop)

    session = Instance(Session, allow_none=True)
    profile_dir = Instance('IPython.core.profiledir.ProfileDir', allow_none=True)
    shell_streams = List()
    control_stream = Instance(ZMQStream, allow_none=True)
    iopub_socket = Any()
    iopub_thread = Any()
    stdin_socket = Any()
    log = Instance(logging.Logger, allow_none=True)

    # identities:
    int_id = Integer(-1)
    ident = Unicode()

    @default('ident')
    def _default_ident(self):
        """Supply the default ``ident``: a new ``uuid4`` passed through ``unicode_type``."""
        fresh_uuid = uuid.uuid4()
        return unicode_type(fresh_uuid)

    # This should be overridden by wrapper kernels that implement any real
    # language.
    language_info = {}
github altair-viz / altair / altair / vegalite / v1 / schema / _interface / jstraitlets.py View on Github external
def _get_additional_traits(cls):
    """Resolve ``cls._additional_traits`` to a trait instance or ``None``.

    The first element of ``cls._additional_traits`` is used when the value
    supports indexing; if indexing raises ``TypeError`` the value itself is
    used. A ``T.TraitType`` instance is returned as is, any other truthy value
    yields a new ``T.Any()``, and a falsy value yields ``None``.
    """
    declared = cls._additional_traits
    try:
        candidate = declared[0]
    except TypeError:
        # Not subscriptable, so treat the attribute itself as the setting.
        candidate = declared

    if isinstance(candidate, T.TraitType):
        return candidate
    return T.Any() if candidate else None