1
0
mirror of https://github.com/gryf/wicd.git synced 2026-03-14 05:35:49 +01:00

experimental:

- Merge in changes from pluggablebackends.
This commit is contained in:
imdano
2008-09-06 16:54:53 +00:00
parent 9639cc8a14
commit 59d282ee6e
17 changed files with 2217 additions and 1329 deletions

View File

@@ -31,59 +31,23 @@ class WirelessInterface() -- Control a wireless network interface.
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import re
import os
import time
import socket
import fcntl
import struct
import array
import re
import wicd.wpath as wpath
import wicd.misc as misc
# Compile the regex patterns that will be used to search the output of iwlist
# scan for info these are well tested, should work on most cards
essid_pattern = re.compile('.*ESSID:"(.*?)"\n', re.I | re.M | re.S)
ap_mac_pattern = re.compile('.*Address: (.*?)\n', re.I | re.M | re.S)
channel_pattern = re.compile('.*Channel:? ?(\d\d?)', re.I | re.M | re.S)
strength_pattern = re.compile('.*Quality:?=? ?(\d+)\s*/?\s*(\d*)', re.I | re.M | re.S)
# These next two look a lot a like, altstrength is for Signal level = xx/100,
# which is just an alternate way of displaying link quality, signaldbm is
# for displaying actual signal strength (-xx dBm).
altstrength_pattern = re.compile('.*Signal level:?=? ?(\d\d*)', re.I | re.M | re.S)
signaldbm_pattern = re.compile('.*Signal level:?=? ?(-\d\d*)', re.I | re.M | re.S)
mode_pattern = re.compile('.*Mode:(.*?)\n', re.I | re.M | re.S)
freq_pattern = re.compile('.*Frequency:(.*?)\n', re.I | re.M | re.S)
ip_pattern = re.compile(r'inet [Aa]d?dr[^.]*:([^.]*\.[^.]*\.[^.]*\.[0-9]*)', re.S)
bssid_pattern = re.compile('.*Access Point: (([0-9A-Z]{2}:){5}[0-9A-Z]{2})', re.I | re.M | re.S)
wep_pattern = re.compile('.*Encryption key:(.*?)\n', re.I | re.M | re.S)
altwpa_pattern = re.compile('(wpa_ie)', re.I | re.M | re.S)
wpa1_pattern = re.compile('(WPA Version 1)', re.I | re.M | re.S)
wpa2_pattern = re.compile('(WPA2)', re.I | re.M | re.S)
# Patterns for wpa_cli output
auth_pattern = re.compile('.*wpa_state=(.*?)\n', re.I | re.M | re.S)
import wpath
import misc
RALINK_DRIVER = 'ralink legacy'
SIOCGIWESSID = 0x8B1B
SIOCGIFADDR = 0x8915
SIOCGIWSTATS = 0x8B0F
SIOCGIFHWADDR = 0x8927
SIOCGMIIPHY = 0x8947
SIOCGETHTOOL = 0x8946
SIOCGIFFLAGS = 0x8913
SIOCGIWRANGE = 0x8B0B
SIOCGIWAP = 0x8B15
def _sanitize_string(string):
blacklist = [';', '`', '$', '!', '*', '|', '>', '<']
new_string = []
if not string:
return string
for c in string:
if c in blacklist:
new_string.append("\\" + c)
@@ -102,7 +66,7 @@ def SetDNS(dns1=None, dns2=None, dns3=None):
dns3 -- IP address of DNS server 1
"""
resolv = open("/etc/resolv.conf","w")
resolv = open("/etc/resolv.conf", "w")
for dns in [dns1, dns2, dns3]:
if dns:
if misc.IsValidIP(dns):
@@ -147,42 +111,24 @@ def GetWirelessInterfaces():
dev_dir = '/sys/class/net/'
ifnames = []
ifnames = [iface for iface in os.listdir(dev_dir) if os.path.isdir(dev_dir + iface) \
ifnames = [iface for iface in os.listdir(dev_dir) if os.path.isdir(dev_dir + iface)
and 'wireless' in os.listdir(dev_dir + iface)]
return bool(ifnames) and ifnames[0] or None
def GetWiredInterfaces():
basedir = '/sys/class/net/'
return [iface for iface in os.listdir(basedir) if not 'wireless' \
in os.listdir(basedir + iface) and \
return [iface for iface in os.listdir(basedir) if not 'wireless'
in os.listdir(basedir + iface) and
open(basedir + iface + "/type").readlines()[0].strip() == "1"]
def NeedsExternalCalls():
print ("NeedsExternalCalls: returning default of True. You should " +
"implement this yourself.")
return True
def get_iw_ioctl_result(iface, call):
""" Makes the given ioctl call and returns the results.
Keyword arguments:
call -- The ioctl call to make
Returns:
The results of the ioctl call.
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
buff = array.array('c', '\0' * 32)
addr, length = buff.buffer_info()
arg = struct.pack('Pi', addr, length)
data = (iface + '\0' * 16)[:16] + arg
try:
result = fcntl.ioctl(s.fileno(), call, data)
except IOError:
return None
except OSError:
return None
return buff.tostring()
class Interface(object):
class BaseInterface(object):
""" Control a network interface. """
def __init__(self, iface, verbose=False):
""" Initialise the object.
@@ -201,10 +147,8 @@ class Interface(object):
self.ETHTOOL_FOUND = False
self.IP_FOUND = False
self.flush_tool = None
self.link_detect = None
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.Check()
self.link_detect = None
def SetDebugMode(self, value):
""" If True, verbose output is enabled. """
self.verbose = value
@@ -548,7 +492,7 @@ class Interface(object):
gw -- gateway of the default route in dotted quad form
"""
if not self.iface: return
if not self.iface: return
if not misc.IsValidIP(gw):
print 'WARNING: Invalid gateway found. Aborting!'
return False
@@ -556,77 +500,28 @@ class Interface(object):
if self.verbose: print cmd
misc.Run(cmd)
def GetIP(self, fast=False):
def GetIP(self, ifconfig=""):
""" Get the IP address of the interface.
Returns:
The IP address of the interface in dotted quad form.
"""
if not self.iface: return False
if fast:
return self._fast_get_ip()
cmd = 'ifconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
return misc.RunRegex(ip_pattern, output)
print 'Implement this in a derived class!'
pass
def _fast_get_ip(self):
""" Gets the IP Address of the interface using ioctl.
Using ioctl calls to get the IP Address info is MUCH faster
than calling ifconfig and paring it's output. It's less
portable though, so there may be problems with it on some
systems.
"""
ifstruct = struct.pack('256s', self.iface)
try:
raw_ip = fcntl.ioctl(self.sock.fileno(), SIOCGIFADDR, ifstruct)
except IOError:
return None
except OSError:
return None
return socket.inet_ntoa(raw_ip[20:24])
def IsUp(self, fast=True):
def IsUp(self):
""" Determines if the interface is up.
Returns:
True if the interface is up, False otherwise.
"""
if not self.iface: return False
if fast:
return self._fast_is_up()
cmd = "ifconfig " + self.iface
output = misc.Run(cmd)
lines = output.split('\n')
if len(lines) < 5:
return False
for line in lines[1:4]:
if line.strip().startswith('UP'):
return True
return False
def _fast_is_up(self):
""" Determines if the interfae is up using an ioctl call. """
data = (self.iface + '\0' * 16)[:18]
try:
result = fcntl.ioctl(self.sock.fileno(), SIOCGIFFLAGS, data)
except IOError, e:
if self.verbose:
print "SIOCGIFFLAGS failed: " + str(e)
return False
flags, = struct.unpack('H', result[16:18])
return bool(flags & 1)
print 'Implement this in a derived class!'
pass
class WiredInterface(Interface):
class BaseWiredInterface(BaseInterface):
""" Control a wired network interface. """
def __init__(self, iface, verbose=False):
""" Initialise the wired network interface class.
@@ -636,9 +531,9 @@ class WiredInterface(Interface):
verbose -- print all commands
"""
Interface.__init__(self, iface, verbose)
BaseInterface.__init__(self, iface, verbose)
def GetPluggedIn(self, fast=False):
def GetPluggedIn(self):
""" Get the current physical connection state.
The method will first attempt to use ethtool do determine
@@ -649,104 +544,11 @@ class WiredInterface(Interface):
True if a link is detected, False otherwise.
"""
if not self.iface: return False
if self.ETHTOOL_FOUND and self.link_detect != misc.MIITOOL:
return self._eth_get_plugged_in(fast)
elif self.MIITOOL_FOUND:
return self._mii_get_plugged_in(fast)
else:
print 'Error: No way of checking for a wired connection. Make \
sure that either mii-tool or ethtool is installed.'
return False
def _eth_get_plugged_in(self, fast):
""" Use ethtool to determine the physical connection state.
Returns:
True if a link is detected, False otherwise.
"""
if fast:
self._fast_eth_get_plugged_in()
link_tool = 'ethtool'
if not self.IsUp():
print 'Wired Interface is down, putting it up'
self.Up()
time.sleep(6)
tool_data = misc.Run(link_tool + ' ' + self.iface, True)
if misc.RunRegex(re.compile('(Link detected: yes)', re.I | re.M |
re.S), tool_data) is not None:
return True
else:
return False
def _fast_eth_get_plugged_in(self):
""" Determines link connection status using an ioctl call.
Uses the SIOCGETHTOOL ioctl call to determine link status.
Returns:
True if a wire is plugged in, False otherwise.
"""
if not self.IsUp():
self.Up()
time.sleep(5)
buff = array.array('i', [0x0000000a, 0x00000000])
addr, length = buff.buffer_info()
arg = struct.pack('Pi', addr, length)
data = (self.iface + '\0' * 16)[:16] + arg
try:
fcntl.ioctl(self.sock.fileno(), SIOCGETHTOOL, data)
except IOError, e:
if self.verbose:
print 'SIOCGETHTOOL failed: ' + str(e)
return False
return bool(buff.tolist()[1])
def _mii_get_plugged_in(self, fast):
""" Use mii-tool to determine the physical connection state.
Returns:
True if a link is detected, False otherwise.
"""
if fast:
return self._fast_mii_get_plugged_in()
link_tool = 'mii-tool'
tool_data = misc.Run(link_tool + ' ' + self.iface, True)
if misc.RunRegex(re.compile('(Invalid argument)', re.I | re.M | re.S),
tool_data) is not None:
print 'Wired Interface is down, putting it up'
self.Up()
time.sleep(4)
tool_data = misc.Run(link_tool + ' ' + self.iface, True)
if misc.RunRegex(re.compile('(link ok)', re.I | re.M | re.S),
tool_data) is not None:
return True
else:
return False
def _fast_mii_get_plugged_in(self):
""" Get link status using the SIOCGMIIPHY ioctl call. """
if not self.IsUp():
self.Up()
time.sleep(2.5)
buff = struct.pack('16shhhh', (self.iface + '\0' * 16)[:16], 0, 1,
0x0004, 0)
try:
result = fcntl.ioctl(self.sock.fileno(), SIOCGMIIPHY, buff)
except IOError, e:
if self.verbose:
print 'SIOCGMIIPHY failed: ' + str(e)
return False
reg = struct.unpack('16shhhh', result)[-1]
return bool(reg & 0x0004)
print 'Implement this in a derived class!'
pass
class WirelessInterface(Interface):
class BaseWirelessInterface(BaseInterface):
""" Control a wireless network interface. """
def __init__(self, iface, verbose=False, wpa_driver='wext'):
""" Initialise the wireless network interface class.
@@ -756,7 +558,7 @@ class WirelessInterface(Interface):
verbose -- print all commands
"""
Interface.__init__(self, iface, verbose)
BaseInterface.__init__(self, iface, verbose)
self.wpa_driver = wpa_driver
self.scan_iface = None
@@ -790,7 +592,7 @@ class WirelessInterface(Interface):
"""
if not self.iface: return False
output = misc.Run("iwconfig " + self.iface)
output = self.GetIwconfig()
killswitch_pattern = re.compile('.*radio off', re.I | re.M | re.S)
if killswitch_pattern.search(output):
@@ -803,43 +605,9 @@ class WirelessInterface(Interface):
def GetIwconfig(self):
""" Returns the output of iwconfig for this interface. """
if not self.iface: return ""
return misc.Run("iwconfig " + self.iface)
def GetNetworks(self):
""" Get a list of available wireless networks.
Returns:
A list containing available wireless networks.
"""
cmd = 'iwlist ' + self.iface + ' scan'
cmd = "iwconfig " + self.iface
if self.verbose: print cmd
results = misc.Run(cmd)
# Split the networks apart, using Cell as our split point
# this way we can look at only one network at a time.
# The spaces around ' Cell ' are to minimize the chance that someone
# has an essid named Cell...
networks = results.split( ' Cell ' )
# Get available network info from iwpriv get_site_survey
# if we're using a ralink card (needed to get encryption info)
if self.wpa_driver == RALINK_DRIVER:
ralink_info = self._GetRalinkInfo()
else:
ralink_info = None
# An array for the access points
access_points = []
for cell in networks:
# Only use sections where there is an ESSID.
if 'ESSID:' in cell:
# Add this network to the list of networks
entry = self._ParseAccessPoint(cell, ralink_info)
if entry is not None:
access_points.append(entry)
return access_points
return misc.Run(cmd)
def _FreqToChannel(self, freq):
""" Translate the specified frequency to a channel.
@@ -880,90 +648,6 @@ class WirelessInterface(Interface):
lines = lines[2:]
return lines
def _ParseAccessPoint(self, cell, ralink_info):
""" Parse a single cell from the output of iwlist.
Keyword arguments:
cell -- string containing the cell information
ralink_info -- string contating network information needed
for ralink cards.
Returns:
A dictionary containing the cell networks properties.
"""
ap = {}
# ESSID - Switch '<hidden>' to 'Hidden' to remove
# brackets that can mix up formatting.
ap['essid'] = misc.RunRegex(essid_pattern, cell)
try:
ap['essid'] = misc.to_unicode(ap['essid'])
except (UnicodeDecodeError, UnicodeEncodeError):
print 'Unicode problem with current network essid, ignoring!!'
return None
if ap['essid'] in ['<hidden>', ""]:
ap['essid'] = 'Hidden'
ap['hidden'] = True
else:
ap['hidden'] = False
# Channel - For cards that don't have a channel number,
# convert the frequency.
ap['channel'] = misc.RunRegex(channel_pattern, cell)
if ap['channel'] == None:
freq = misc.RunRegex(freq_pattern, cell)
ap['channel'] = self._FreqToChannel(freq)
# BSSID
ap['bssid'] = misc.RunRegex(ap_mac_pattern, cell)
# Mode
ap['mode'] = misc.RunRegex(mode_pattern, cell)
# Break off here if we're using a ralink card
if self.wpa_driver == RALINK_DRIVER:
ap = self._ParseRalinkAccessPoint(ap, ralink_info, cell)
elif misc.RunRegex(wep_pattern, cell) == 'on':
# Encryption - Default to WEP
ap['encryption'] = True
ap['encryption_method'] = 'WEP'
if misc.RunRegex(wpa1_pattern, cell) == 'WPA Version 1':
ap['encryption_method'] = 'WPA'
if misc.RunRegex(altwpa_pattern, cell) == 'wpa_ie':
ap['encryption_method'] = 'WPA'
if misc.RunRegex(wpa2_pattern, cell) == 'WPA2':
ap['encryption_method'] = 'WPA2'
else:
ap['encryption'] = False
# Link Quality
# Set strength to -1 if the quality is not found
# more of the patch from
# https://bugs.launchpad.net/wicd/+bug/175104
if (strength_pattern.match(cell)):
[(strength, max_strength)] = strength_pattern.findall(cell)
if max_strength:
ap["quality"] = 100 * int(strength) // int(max_strength)
else:
ap["quality"] = int(strength)
elif misc.RunRegex(altstrength_pattern,cell):
ap['quality'] = misc.RunRegex(altstrength_pattern, cell)
else:
ap['quality'] = -1
# Signal Strength (only used if user doesn't want link
# quality displayed or it isn't found)
if misc.RunRegex(signaldbm_pattern, cell):
ap['strength'] = misc.RunRegex(signaldbm_pattern, cell)
elif self.wpa_driver != RALINK_DRIVER: # This is already set for ralink
ap['strength'] = -1
return ap
def _ParseRalinkAccessPoint(self, ap, ralink_info, cell):
""" Parse encryption and signal strength info for ralink cards
@@ -1101,52 +785,8 @@ class WirelessInterface(Interface):
False otherwise.
"""
# Right now there's no way to do this for ralink drivers
if self.wpa_driver == RALINK_DRIVER or not self.WPA_CLI_FOUND:
return True
MAX_TIME = 15
MAX_DISCONNECTED_TIME = 3
while (time.time() - auth_time) < MAX_TIME:
cmd = 'wpa_cli -i ' + self.iface + ' status'
output = misc.Run(cmd)
result = misc.RunRegex(auth_pattern, output)
if self.verbose:
print 'WPA_CLI RESULT IS', result
if not result:
print "WARNING: Received an unexpected result from wpa_cli!" + \
"\nMake sure you're using the right wpa_supplicant " + \
"driver (you probably want wext).\nIf the problem " + \
"persists, please file a bug report."
return False
if result == "COMPLETED":
return True
elif result == "DISCONNECTED" and \
(time.time() - auth_time) > MAX_DISCONNECTED_TIME:
# Force a rescan to get wpa_supplicant moving again.
self._ForceSupplicantScan()
MAX_TIME += 5
time.sleep(1)
print 'wpa_supplicant authentication may have failed.'
return False
def _ForceSupplicantScan(self):
""" Force wpa_supplicant to rescan available networks.
This function forces wpa_supplicant to rescan.
This works around authentication validation sometimes failing for
wpa_supplicant because it remains in a DISCONNECTED state for
quite a while, after which a rescan is required, and then
attempting to authenticate. This whole process takes a long
time, so we manually speed it up if we see it happening.
"""
print 'wpa_supplicant rescan forced...'
cmd = 'wpa_cli -i' + self.iface + ' scan'
misc.Run(cmd)
print 'Implement this in a derived class!'
pass
def _AuthenticateRalinkLegacy(self, network):
""" Authenticate with the specified wireless network.
@@ -1205,154 +845,38 @@ class WirelessInterface(Interface):
if self.verbose: print cmd
misc.Run(cmd)
def GetBSSID(self, iwconfig="", fast=True):
def GetBSSID(self, iwconfig=None):
""" Get the MAC address for the interface. """
if not self.iface: return ""
if fast:
return self._fast_get_bssid()
else:
if not iwconfig:
cmd = 'iwconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = iwconfig
bssid = misc.RunRegex(bssid_pattern, output)
return bssid
def _fast_get_bssid(self):
""" Gets the MAC address for the connected AP using ioctl calls. """
data = (self.iface + '\0' * 32)[:32]
try:
result = fcntl.ioctl(self.sock.fileno(), SIOCGIWAP, data)[16:]
except IOError, e:
if self.verbose:
print "SIOCGIWAP failed: " + str(e)
return ""
raw_addr = struct.unpack("xxBBBBBB", result[:8])
return "%02X:%02X:%02X:%02X:%02X:%02X" % raw_addr
print 'Implement this in a derived class!'
pass
def GetSignalStrength(self, iwconfig=None, fast=False):
def GetSignalStrength(self, iwconfig=None):
""" Get the signal strength of the current network.
Returns:
The signal strength.
"""
if not self.iface: return -1
if fast:
return self._get_signal_strength_fast()
if not iwconfig:
cmd = 'iwconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = iwconfig
[(strength, max_strength)] = strength_pattern.findall(output)
if max_strength and strength:
return 100 * int(strength) // int(max_strength)
if strength is None:
strength = misc.RunRegex(altstrength_pattern, output)
return strength
print 'Implement this in a derived class!'
pass
def _get_signal_strength_fast(self):
""" Get the link quality using ioctl calls. """
buff = get_iw_ioctl_result(self.iface, SIOCGIWSTATS)
strength = ord(buff[2])
max_strength = self._get_max_strength_fast()
if strength and max_strength:
return 100 * int(strength) // int(max_strength)
return strength
def _get_max_strength_fast(self):
""" Gets the maximum possible strength from the wireless driver. """
buff = array.array('c', '\0' * 700)
addr, length = buff.buffer_info()
arg = struct.pack('Pi', addr, length)
iwfreq = (self.iface + '\0' * 16)[:16] + arg
try:
result = fcntl.ioctl(self.sock.fileno(), SIOCGIWRANGE, iwfreq)
except IOError, e:
if self.verbose:
print "SIOCGIWRANGE failed: " + str(e)
return None
# This defines the iwfreq struct, used to get signal strength.
fmt = "iiihb6ii4B4Bi32i2i2i2i2i3h8h2b2bhi8i2b3h2i2ihB17x" + 32 * "ihbb"
size = struct.calcsize(fmt)
data = buff.tostring()
data = data[0:size]
values = struct.unpack(fmt, data)
return values[12]
def GetDBMStrength(self, iwconfig=None, fast=False):
def GetDBMStrength(self, iwconfig=None):
""" Get the dBm signal strength of the current network.
Returns:
The dBm signal strength.
"""
if not self.iface: return -100
if fast:
return self._get_dbm_strength_fast()
if iwconfig:
cmd = 'iwconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = iwconfig
dbm_strength = misc.RunRegex(signaldbm_pattern, output)
return dbm_strength
def _get_dbm_strength_fast(self):
""" Uses the SIOCGIWSTATS ioctl call to get dbm signal strength.
Returns:
The dBm signal strength or None if it can't be found.
"""
buff = misc.get_irwange_ioctl_result(self.iface, SIOCGIWSTATS)
if not buff:
return None
print 'Implement this in a derived class!'
pass
return str((ord(buff[3]) - 256))
def GetCurrentNetwork(self, iwconfig=None, fast=False):
def GetCurrentNetwork(self, iwconfig=None):
""" Get the essid of the current network.
Returns:
The current network essid.
"""
if not self.iface: return ""
if fast:
return self._get_essid_fast()
if not iwconfig:
cmd = 'iwconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = iwconfig
network = misc.RunRegex(re.compile('.*ESSID:"(.*?)"',
re.I | re.M | re.S), output)
if network:
network = misc.to_unicode(network)
return network
def _get_essid_fast(self):
""" Get the current essid using ioctl. """
buff = get_iw_ioctl_result(self.iface, SIOCGIWESSID)
if not buff:
return None
return buff.strip('\x00')
print 'Implement this in a derived class!'
pass