1
0
mirror of https://github.com/gryf/wicd.git synced 2025-12-23 14:42:29 +01:00

Added support for monitoring connection status without the need for iwconfig, ifconfig, and ethtool/miitool.

Added a "Disconnect" button to each network entry, which will be visible instead of the "Connect" button for the active network.
Fixed a bug where cancelling a connection while validating authentication would leave the GUI in the connecting state forever.
This commit is contained in:
imdano
2008-03-31 14:21:43 +00:00
parent 45b7f78bd9
commit ed2d53839e
7 changed files with 418 additions and 144 deletions

View File

@@ -33,8 +33,13 @@ class WirelessInterface() -- Control a wireless network interface.
import misc
import re
import os
import wpath
import time
import socket
import fcntl
import struct
import array
# Compile the regex patterns that will be used to search the output of iwlist
# scan for info these are well tested, should work on most cards
@@ -60,6 +65,15 @@ wpa2_pattern = re.compile('(WPA2)', re.I | re.M | re.S)
auth_pattern = re.compile('.*wpa_state=(.*?)\n', re.I | re.M | re.S)
RALINK_DRIVER = 'ralink legacy'
SIOCGIWESSID = 0x8B1B
SIOCGIFADDR = 0x8915
SIOCGIWSTATS = 0x8B0F
SIOCGIFHWADDR = 0x8927
SIOCGMIIPHY = 0x8947
SIOCGETHTOOL = 0x8946
SIOCGIFFLAGS = 0x8913
SIOCGIWRANGE = 0x8B0B
SIOCGIWAP = 0x8B15
def SetDNS(dns1=None, dns2=None, dns3=None):
""" Set the DNS of the system to the specified DNS servers.
@@ -123,7 +137,10 @@ def _fast_get_wifi_interfaces():
device = re.compile('[a-z]{3,4}[0-9]')
ifnames = []
f = open('/proc/net/wireless', 'r')
try:
f = open('/proc/net/wireless', 'r')
except IOError:
return None
data = f.readlines()
f.close()
for line in data:
@@ -136,6 +153,20 @@ def _fast_get_wifi_interfaces():
return ifnames[0]
else:
return None
def get_iw_ioctl_result(iface, call):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
buff = array.array('c', '\0' * 32)
addr, length = buff.buffer_info()
arg = struct.pack('Pi', addr, length)
data = (iface + '\0' * 16)[:16] + arg
try:
result = fcntl.ioctl(s.fileno(), call, data)
except IOError:
return None
except OSError:
return None
return buff.tostring()
class Interface(object):
""" Control a network interface. """
@@ -157,6 +188,7 @@ class Interface(object):
self.IP_FOUND = False
self.flush_tool = None
self.link_detect = None
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.Check()
def SetDebugMode(self, value):
@@ -172,7 +204,18 @@ class Interface(object):
"""
self.iface = str(iface)
def _find_client_path(self, client):
paths = ['/sbin/', '/usr/sbin/', '/bin/', '/usr/bin/',
'/usr/local/sbin/', '/usr/local/bin/']
for path in paths:
if os.access("%s%s" % (path, client), os.F_OK):
return "%s%s" % (path, client)
if self.verbose:
"WARNING: No path found for %s" % (client)
return None
def _client_found(self, client):
# TODO: Don't use which anymore. Just search path manually.
output = misc.Run("which " + client)
if output and not ("no " + client) in output:
return True
@@ -188,46 +231,57 @@ class Interface(object):
"""
if self.DHCP_CLIENT:
DHCP_CLIENT = self.DHCP_CLIENT
dhcp_client = self.DHCP_CLIENT
else:
DHCP_CLIENT = None
dhcp_client = None
dhcp_path = None
dhcpclients = ["dhclient", "dhcpcd", "pump"]
for client in dhcpclients:
if self._client_found(client):
DHCP_CLIENT = client
dhcp_path = self._find_client_path(client)
if dhcp_path:
dhcp_client = client
break
if not DHCP_CLIENT:
print "WARNING: NO DHCP CLIENT DETECTED! Make sure there is one \
set in your path."
if not dhcp_client:
print "WARNING: No supported DHCP Client could be found!"
return
elif DHCP_CLIENT in [misc.DHCLIENT, "dhclient"]:
DHCP_CLIENT = misc.DHCLIENT
DHCP_CMD = "dhclient"
DHCP_RELEASE = "dhclient -r"
elif DHCP_CLIENT in [misc.PUMP, "pump"]:
DHCP_CLIENT = misc.PUMP
DHCP_CMD = "pump -i"
DHCP_RELEASE = "pump -r -i"
elif DHCP_CLIENT in [misc.DHCPCD, "dhcpcd"]:
DHCP_CLIENT = misc.DHCPCD
DHCP_CMD = "dhcpcd"
DHCP_RELEASE = "dhcpcd -r"
elif dhcp_client in [misc.DHCLIENT, "dhclient"]:
dhcp_client = misc.DHCLIENT
dhcp_cmd = dhcp_path
dhcp_release = dhcp_cmd + " -r"
elif dhcp_client in [misc.PUMP, "pump"]:
dhcp_client = misc.PUMP
dhcp_cmd = dhcp_path + " -i"
dhcp_release = dhcp_cmd + " -r -i"
elif dhcp_client in [misc.DHCPCD, "dhcpcd"]:
dhcp_client = misc.DHCPCD
dhcp_cmd = dhcp_path
dhcp_release = dhcp_cmd + " -r"
else:
dhcp_client = None
dhcp_cmd = None
dhcp_release = None
self.DHCP_CMD = DHCP_CMD
self.DHCP_RELEASE = DHCP_RELEASE
self.DHCP_CLIENT = DHCP_CLIENT
self.DHCP_CMD = dhcp_cmd
self.DHCP_RELEASE = dhcp_release
self.DHCP_CLIENT = dhcp_client
def CheckWiredTools(self):
""" Check for the existence of ethtool and mii-tool. """
if self._client_found("mii-tool"):
miitool_path = self._find_client_path("mii-tool")
if miitool_path:
self.miitool_cmd = miitool_path
self.MIITOOL_FOUND = True
else:
self.miitool_cmd = None
self.MIITOOL_FOUND = False
if self._client_found("ethtool"):
ethtool_path = self._find_client_path("ethtool")
if ethtool_path:
self.ethtool_cmd = ethtool_path
self.ETHTOOL_FOUND = True
else:
self.ethtool_cmd = None
self.ETHTOOL_FOUND = False
def Check(self):
@@ -236,9 +290,12 @@ class Interface(object):
self.CheckDHCP()
self.CheckWiredTools()
if self._client_found("ip"):
ip_path = self._find_client_path("ip")
if ip_path:
self.ip_cmd = ip_path
self.IP_FOUND = True
else:
self.ip_cmd = None
self.IP_FOUND = False
def Up(self):
@@ -427,25 +484,48 @@ class Interface(object):
if self.verbose: print cmd
misc.Run(cmd)
def GetIP(self):
def GetIP(self, fast=False):
""" Get the IP address of the interface.
Returns:
The IP address of the interface in dotted quad form.
"""
if fast:
return self._fast_get_ip()
cmd = 'ifconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
return misc.RunRegex(ip_pattern, output)
def IsUp(self):
def _fast_get_ip(self):
""" Gets the IP Address of the interface using ioctl.
Using ioctl calls to get the IP Address info is MUCH faster
than calling ifconfig and paring it's output. It's less
portable though, so there may be problems with it on some
systems.
"""
ifstruct = struct.pack('256s', self.iface)
try:
raw_ip = fcntl.ioctl(self.sock.fileno(), SIOCGIFADDR, ifstruct)
except IOError:
return None
except OSError:
return None
return socket.inet_ntoa(raw_ip[20:24])
def IsUp(self, fast=True):
""" Determines if the interface is up.
Returns:
True if the interface is up, False otherwise.
"""
if fast:
return self._fast_is_up()
cmd = "ifconfig " + self.iface
output = misc.Run(cmd)
lines = output.split('\n')
@@ -457,6 +537,19 @@ class Interface(object):
return True
return False
def _fast_is_up(self):
data = (self.iface + '\0' * 16)[:18]
try:
result = fcntl.ioctl(self.sock.fileno(), SIOCGIFFLAGS, data)
except IOError, e:
if self.verbose:
print "SIOCGIFFLAGS failed: " + str(e)
return False
flags, = struct.unpack('H', result[16:18])
return bool(flags & 1)
class WiredInterface(Interface):
@@ -471,7 +564,7 @@ class WiredInterface(Interface):
"""
Interface.__init__(self, iface, verbose)
def GetPluggedIn(self):
def GetPluggedIn(self, fast=False):
""" Get the current physical connection state.
The method will first attempt to use ethtool do determine
@@ -485,21 +578,23 @@ class WiredInterface(Interface):
if not self.iface:
return False
if self.ETHTOOL_FOUND and self.link_detect != misc.MIITOOL:
return self._eth_get_plugged_in()
return self._eth_get_plugged_in(fast)
elif self.MIITOOL_FOUND:
return self._mii_get_plugged_in()
return self._mii_get_plugged_in(fast)
else:
print 'Error: No way of checking for a wired connection. Make \
sure that either mii-tool or ethtool is installed.'
return False
def _eth_get_plugged_in(self):
def _eth_get_plugged_in(self, fast):
""" Use ethtool to determine the physical connection state.
Returns:
True if a link is detected, False otherwise.
"""
if fast:
self._fast_eth_get_plugged_in()
link_tool = 'ethtool'
if not self.IsUp():
print 'Wired Interface is down, putting it up'
@@ -511,14 +606,33 @@ class WiredInterface(Interface):
return True
else:
return False
def _fast_eth_get_plugged_in(self):
if not self.IsUp():
self.Up()
time.sleep(1)
buff = array.array('i', [0x0000000a, 0x00000000])
addr, length = buff.buffer_info()
arg = struct.pack('Pi', addr, length)
data = (self.iface + '\0' * 16)[:16] + arg
try:
fcntl.ioctl(self.sock.fileno(), SIOCGETHTOOL, data)
except IOError, e:
if self.verbose:
print 'SIOCGETHTOOL failed: ' + str(e)
return False
return bool(buff.tolist()[1])
def _mii_get_plugged_in(self):
def _mii_get_plugged_in(self, fast):
""" Use mii-tool to determine the physical connection state.
Returns:
True if a link is detected, False otherwise.
"""
if fast:
return self._fast_mii_get_plugged_in()
link_tool = 'mii-tool'
tool_data = misc.Run(link_tool + ' ' + self.iface, True)
if misc.RunRegex(re.compile('(Invalid argument)', re.I | re.M | re.S),
@@ -534,6 +648,21 @@ class WiredInterface(Interface):
else:
return False
def _fast_mii_get_plugged_in(self):
""" Get link status usingthe SIOCGMIIPHY ioctl. """
if not self.IsUp():
self.Up()
time.sleep(1)
buff = struct.pack('16shhhh', (self.iface + '\0' * 16)[:16], 0, 1,
0x0004, 0)
try:
result = fcntl.ioctl(self.sock.fileno(), SIOCGMIIPHY, buff)
except IOError, e:
if self.verbose:
print 'SIOCGMIIPHY failed: ' + str(e)
return False
reg = struct.unpack('16shhhh', result)[-1]
return bool(reg & 0x0004)
class WirelessInterface(Interface):
""" Control a wireless network interface. """
@@ -974,27 +1103,44 @@ class WirelessInterface(Interface):
cmd = 'iwpriv ' + self.iface + ' '
if self.verbose: print cmd
misc.Run(cmd)
def GetBSSID(self, fast=True):
""" Get the MAC address for the interface. """
if fast:
return self._fast_get_bssid()
else:
return ""
def _fast_get_bssid(self):
""" Gets the MAC address for the connected AP using ioctl calls. """
data = (self.iface + '\0' * 32)[:32]
try:
result = fcntl.ioctl(self.sock.fileno(), SIOCGIWAP, data)[16:]
except IOError, e:
if self.verbose:
print "SIOCGIWAP failed: " + str(e)
return ""
raw_addr = struct.unpack("xxBBBBBB", result[:8])
return "%02X:%02X:%02X:%02X:%02X:%02X" % raw_addr
def GetSignalStrength(self, iwconfig=None):
def GetSignalStrength(self, iwconfig=None, fast=False):
""" Get the signal strength of the current network.
Returns:
The signal strength.
"""
if fast:
return self._get_signal_strength_fast()
if not iwconfig:
cmd = 'iwconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = iwconfig
# implemented the patch provided in
# https://bugs.launchpad.net/wicd/+bug/175104
# it was for the stable version, so I've improvised here
# should work though
#strength = misc.RunRegex(strength_pattern,output)
[(strength, max_strength)] = strength_pattern.findall(output)
if max_strength and strength:
@@ -1004,14 +1150,45 @@ class WirelessInterface(Interface):
strength = misc.RunRegex(altstrength_pattern, output)
return strength
def GetDBMStrength(self, iwconfig=None):
def _get_signal_strength_fast(self):
""" Get the link quality using ioctl calls. """
buff = get_iw_ioctl_result(self.iface, SIOCGIWSTATS)
strength = ord(buff[2])
max_strength = self._get_max_strength_fast()
if strength and max_strength:
return 100 * int(strength) // int(max_strength)
return strength
def _get_max_strength_fast(self):
""" Gets the maximum possible strength from the wireless driver. """
buff = array.array('c', '\0' * 700)
addr, length = buff.buffer_info()
arg = struct.pack('Pi', addr, length)
iwfreq = (self.iface + '\0' * 16)[:16] + arg
try:
result = fcntl.ioctl(self.sock.fileno(), SIOCGIWRANGE, iwfreq)
except IOError, e:
if self.verbose:
print "SIOCGIWRANGE failed: " + str(e)
return None
fmt = "iiihb6ii4B4Bi32i2i2i2i2i3h8h2b2bhi8i2b3h2i2ihB17x" + 32*"ihbb"
size = struct.calcsize(fmt)
data = buff.tostring()
data = data[0:size]
values = struct.unpack(fmt, data)
return values[12]
def GetDBMStrength(self, iwconfig=None, fast=False):
""" Get the dBm signal strength of the current network.
Returns:
The dBm signal strength.
"""
if fast:
return self._get_dbm_strength_fast()
if iwconfig:
cmd = 'iwconfig ' + self.iface
if self.verbose: print cmd
@@ -1020,15 +1197,25 @@ class WirelessInterface(Interface):
output = iwconfig
dbm_strength = misc.RunRegex(signaldbm_pattern, output)
return dbm_strength
def _get_dbm_strength_fast(self):
buff = misc.get_irwange_ioctl_result(self.iface, SIOCGIWSTATS)
if not buff:
return None
return str((ord(buff[3]) - 256))
def GetCurrentNetwork(self, iwconfig=None):
def GetCurrentNetwork(self, iwconfig=None, fast=False):
""" Get the essid of the current network.
Returns:
The current network essid.
"""
if fast:
return self._get_essid_fast()
if not iwconfig:
cmd = 'iwconfig ' + self.iface
if self.verbose: print cmd
@@ -1040,3 +1227,12 @@ class WirelessInterface(Interface):
if network:
network = misc.to_unicode(network)
return network
def _get_essid_fast(self):
""" Get the current essid using ioctl. """
buff = get_iw_ioctl_result(self.iface, SIOCGIWESSID)
if not buff:
return None
return buff.strip('\x00')