Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# must support older kernels. Ask for the PTY number.
pty_num_s = fcntl.ioctl(master_fd, LINUX_TIOCGPTN,
struct.pack('i', 0))
pty_num, = struct.unpack('i', pty_num_s)
pty_name = '/dev/pts/%d' % (pty_num,)
# Now open it with O_NOCTTY to ensure it doesn't change our controlling
# TTY. Otherwise when we close the FD we get killed by the kernel, and
# the child we spawn that should really attach to it will get EPERM
# during _acquire_controlling_tty().
slave_fd = os.open(pty_name, os.O_RDWR|os.O_NOCTTY)
return master_fd, slave_fd
except OSError:
if master_fd is not None:
os.close(master_fd)
e = sys.exc_info()[1]
raise mitogen.core.StreamError(OPENPTY_MSG, e)
def parse_sudo_flags(args):
parser = make_sudo_parser()
opts, args = parser.parse_args(args)
if len(args):
raise mitogen.core.StreamError('unsupported sudo arguments:'+str(args))
return opts
obj = self._unpickled
if obj is Message._unpickled:
fp = BytesIO(self.data)
unpickler = _Unpickler(fp, **self.UNPICKLER_KWARGS)
unpickler.find_global = self._find_global
try:
# Must occur off the broker thread.
try:
obj = unpickler.load()
except:
LOG.error('raw pickle was: %r', self.data)
raise
self._unpickled = obj
except (TypeError, ValueError):
e = sys.exc_info()[1]
raise StreamError('invalid message: %s', e)
if throw:
if isinstance(obj, CallError):
raise obj
return obj
def proxy_connect(self, via_context, method_name, name=None, **kwargs):
resp = via_context.call(_proxy_connect,
name=name,
method_name=method_name,
kwargs=mitogen.core.Kwargs(kwargs),
)
if resp['msg'] is not None:
raise mitogen.core.StreamError(resp['msg'])
name = u'%s.%s' % (via_context.name, resp['name'])
context = self.context_class(self, resp['id'], name=name)
context.via = via_context
self._write_lock.acquire()
try:
self._context_by_id[context.context_id] = context
finally:
self._write_lock.release()
return context
return
match = PASSWORD_PROMPT_RE.search(buf.decode('utf-8').lower())
if match is not None:
LOG.debug('%s: matched password prompt %r',
self.name, match.group(0))
if self.password is None:
raise PasswordError(self.password_required_msg)
if password_sent:
raise PasswordError(self.password_incorrect_msg)
self.diag_stream.transmit_side.write(
(mitogen.core.to_text(self.password) + '\n').encode('utf-8')
)
password_sent = True
raise mitogen.core.StreamError('bootstrap failed')
return
if any(s in buf.lower() for s in self.incorrect_prompts):
if password_sent:
raise PasswordError(self.password_incorrect_msg)
elif self.password_prompt in buf.lower():
if self.password is None:
raise PasswordError(self.password_required_msg)
if password_sent:
raise PasswordError(self.password_incorrect_msg)
LOG.debug('sending password')
self.transmit_side.write(
mitogen.core.to_text(self.password + '\n').encode('utf-8')
)
password_sent = True
raise mitogen.core.StreamError('bootstrap failed')
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
import logging
import mitogen.core
import mitogen.parent
from mitogen.core import b
LOG = logging.getLogger(__name__)
class PasswordError(mitogen.core.StreamError):
pass
class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False
username = 'root'
password = None
doas_path = 'doas'
password_prompt = b('Password:')
incorrect_prompts = (
b('doas: authentication failed'),
)
def construct(self, username=None, password=None, doas_path=None,
raise PasswordError(self.password_incorrect_msg)
elif PASSWORD_PROMPT in buf and self.password is None:
# Permission denied (password,pubkey)
raise PasswordError(self.password_required_msg)
else:
raise PasswordError(self.auth_incorrect_msg)
elif partial and PASSWORD_PROMPT in buf.lower():
if self.password is None:
raise PasswordError(self.password_required_msg)
LOG.debug('%s: sending password', self.name)
self.diag_stream.transmit_side.write(
(self.password + '\n').encode()
)
password_sent = True
raise mitogen.core.StreamError('bootstrap failed')