Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _find_process_name(port_number):
"""
Get the name of the process using the given port number.
"""
for connection in psutil.net_connections():
if connection.laddr[1] == port_number:
return psutil.Process(connection.pid).name()
return None
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
watchdog.register_spawn(self.debuggee_process.pid, self.session.debuggee_id)
self.session.captured_output.capture(self.debuggee_process)
pid = self.debuggee_process.pid
if self.method == "attach_pid":
self._attach_args["processId"] = pid
else:
log.info(
"Waiting for {0} to open listener socket...", self.session.debuggee_id
)
for i in range(0, 100):
connections = psutil.net_connections()
if any(p == pid for (_, _, _, _, _, _, p) in connections):
break
time.sleep(0.1)
else:
log.warning("Couldn't detect open listener socket; proceeding anyway.")
self._attach_request = self.session.send_request("attach", self._attach_args)
self.session.wait_for_next_event("initialized")
account any externally used ports such that no occupied
port is accidentally used. This is generally recommended.
Arguments:
start (int, optional): Port from which to start
looking, defaults to 6001
"""
print("Finding available port..")
occupied_ports = list(self.app.clients)
try:
import psutil
occupied_ports += list(
c.laddr[-1] for c in psutil.net_connections())
except ImportError:
pass
available = start
while available in occupied_ports:
available += 1
print("Distributing new port %i" % available)
return available
def _cx_state_psutil(self, tags=None):
"""
Collect metrics about connections state using psutil
"""
metrics = defaultdict(int)
tags = [] if tags is None else tags
for conn in psutil.net_connections():
protocol = self._parse_protocol_psutil(conn)
status = self.tcp_states['psutil'].get(conn.status)
metric = self.cx_state_gauge.get((protocol, status))
if metric is None:
self.log.warning('Metric not found for: %s,%s', protocol, status)
else:
metrics[metric] += 1
for metric, value in iteritems(metrics):
self.gauge(metric, value, tags=tags)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = 'ipetrash'
# pip install psutil
import psutil
net_connections = psutil.net_connections()
print('Net connections ({}):'.format(len(net_connections)))
# Sort by pid
net_connections.sort(key=lambda x: x.pid)
# for connect in net_connections:
# print(connect)
# #
# # OR:
# #
if net_connections:
headers = net_connections[0]._fields
headers = [header.upper() for header in headers]
rows = [tuple(info) for info in net_connections]
def network_connections(metrics):
protocols = ['tcp']
for protocol in protocols:
for c in ps.net_connections(kind=protocol):
try:
if c.status == "ESTABLISHED" or c.status == "BOUND":
metrics.add_network_connection(c.raddr.ip, c.raddr.port,
Collector.__get_interface_name(c.laddr.ip),
c.laddr.port)
except Exception as ex:
print('Failed to parse network info for protocol: ' + protocol)
print(ex)
def checkMsfrpcdRunning():
for socket in psutil.net_connections():
if socket.laddr[1] == 55553: return socket.pid
# addr = namedtuple('addr', ['ip', 'port'])
# from psutil._common import addr
statistics_file = "statistics.txt"
port = 993
max_records = 1000 # max records size
wanted_res_list = list()
remote_addr_set = set()
try:
while 1:
if len(wanted_res_list) >= max_records:
break
now = time.time()
sconn_list = psutil.net_connections(kind='tcp')
# my_sconn_list = [sconn for sconn in sconn_list if
# isinstance(sconn.raddr,
# addr) and sconn.raddr.port == port and sconn.status == 'ESTABLISHED']
# see: https://github.com/giampaolo/psutil/issues/1513
my_sconn_list = [sconn for sconn in sconn_list if sconn.status == 'ESTABLISHED' and sconn.raddr.port == port]
for my_sconn in my_sconn_list:
ip = my_sconn.raddr.ip
if ip not in remote_addr_set:
remote_addr_set.add(ip)
wanted_res_list.append((now, my_sconn))
print wanted_res_list
with open(statistics_file, 'w') as fp:
def already_listening(self, port): # pylint: disable=no-self-use
"""Check if a process is already listening on the port.
If so, also tell the user via a display notification.
.. warning::
On some operating systems, this function can only usefully be
run as root.
:param int port: The TCP port in question.
:returns: True or False."""
try:
net_connections = psutil.net_connections()
except psutil.AccessDenied as error:
logger.info("Access denied when trying to list network "
"connections: %s. Are you root?", error)
# this function is just a pre-check that often causes false
# positives and problems in testing (c.f. #680 on Mac, #255
# generally); we will fail later in bind() anyway
return False
listeners = [conn.pid for conn in net_connections
if conn.status == 'LISTEN' and
conn.type == socket.SOCK_STREAM and
conn.laddr[1] == port]
try:
if listeners and listeners[0] is not None:
# conn.pid may be None if the current process doesn't have