1
0
mirror of https://github.com/gryf/wicd.git synced 2025-12-21 13:28:08 +01:00

Refactor wnettools/backend code so that most external tool functionality exists in wnettools, and it just inherited by the external backends. This simplifies creating new backends that just override selected methods.

Rewrite encryption template parsing code to allow blank entries (though the GUI still doesn't), and be more permissive of slightly incorrect formatting.
Fix bug in wicd-monitor error handling code.
This commit is contained in:
Dan O'Reilly
2009-02-17 23:29:14 -05:00
parent 7e18194c29
commit a62840dd96
7 changed files with 427 additions and 483 deletions

View File

@@ -35,9 +35,9 @@ from wicd.wnettools import *
import re import re
import os import os
import os.path import os.path
import time
# Regular expressions for wpa_cli output
auth_patten = re.compile('.*wpa_state=(.*?)\n', wnettools.__re_mode)
NAME = "external" NAME = "external"
UPDATE_INTERVAL = 5 UPDATE_INTERVAL = 5
DESCRIPTION = """External app (original) backend DESCRIPTION = """External app (original) backend
@@ -49,30 +49,6 @@ it doesn't require any third party libraries and may be
more stable for some set ups. more stable for some set ups.
""" """
# Compile the regex patterns that will be used to search the output of iwlist
# scan for info these are well tested, should work on most cards
essid_pattern = re.compile('.*ESSID:"?(.*?)"?\s*\n', re.I | re.M | re.S)
ap_mac_pattern = re.compile('.*Address: (.*?)\n', re.I | re.M | re.S)
channel_pattern = re.compile('.*Channel:? ?(\d\d?)', re.I | re.M | re.S)
strength_pattern = re.compile('.*Quality:?=? ?(\d+)\s*/?\s*(\d*)', re.I | re.M | re.S)
# These next two look a lot a like, altstrength is for Signal level = xx/100,
# which is just an alternate way of displaying link quality, signaldbm is
# for displaying actual signal strength (-xx dBm).
altstrength_pattern = re.compile('.*Signal level:?=? ?(\d\d*)', re.I | re.M | re.S)
signaldbm_pattern = re.compile('.*Signal level:?=? ?(-\d\d*)', re.I | re.M | re.S)
mode_pattern = re.compile('.*Mode:(.*?)\n', re.I | re.M | re.S)
freq_pattern = re.compile('.*Frequency:(.*?)\n', re.I | re.M | re.S)
ip_pattern = re.compile(r'inet [Aa]d?dr[^.]*:([^.]*\.[^.]*\.[^.]*\.[0-9]*)', re.S)
bssid_pattern = re.compile('.*Access Point: (([0-9A-Z]{2}:){5}[0-9A-Z]{2})', re.I | re.M | re.S)
wep_pattern = re.compile('.*Encryption key:(.*?)\n', re.I | re.M | re.S)
altwpa_pattern = re.compile('(wpa_ie)', re.I | re.M | re.S)
wpa1_pattern = re.compile('(WPA Version 1)', re.I | re.M | re.S)
wpa2_pattern = re.compile('(WPA2)', re.I | re.M | re.S)
# Patterns for wpa_cli output
auth_pattern = re.compile('.*wpa_state=(.*?)\n', re.I | re.M | re.S)
RALINK_DRIVER = 'ralink legacy' RALINK_DRIVER = 'ralink legacy'
@@ -94,54 +70,6 @@ class Interface(wnettools.BaseInterface):
wnettools.BaseInterface.__init__(self, iface, verbose) wnettools.BaseInterface.__init__(self, iface, verbose)
self.Check() self.Check()
def GetIP(self, ifconfig=""):
""" Get the IP address of the interface.
Returns:
The IP address of the interface in dotted quad form.
"""
if not ifconfig:
cmd = 'ifconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = ifconfig
return misc.RunRegex(ip_pattern, output)
def IsUp(self, ifconfig=None):
""" Determines if the interface is up.
Returns:
True if the interface is up, False otherwise.
"""
if not self.iface: return False
flags_file = '/sys/class/net/%s/flags' % self.iface
try:
flags = open(flags_file, "r").read().strip()
except IOError:
print "Could not open %s, using ifconfig to determine status" % flags_file
return self._slow_is_up(ifconfig)
return bool(int(flags, 16) & 1)
def _slow_is_up(self, ifconfig=None):
""" Determine if an interface is up using ifconfig. """
if not ifconfig:
cmd = "ifconfig " + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = ifconfig
lines = output.split('\n')
if len(lines) < 5:
return False
for line in lines[1:4]:
if line.strip().startswith('UP'):
return True
return False
class WiredInterface(Interface, wnettools.BaseWiredInterface): class WiredInterface(Interface, wnettools.BaseWiredInterface):
""" Control a wired network interface. """ """ Control a wired network interface. """
@@ -156,97 +84,6 @@ class WiredInterface(Interface, wnettools.BaseWiredInterface):
wnettools.BaseWiredInterface.__init__(self, iface, verbose) wnettools.BaseWiredInterface.__init__(self, iface, verbose)
Interface.__init__(self, iface, verbose) Interface.__init__(self, iface, verbose)
def GetPluggedIn(self):
""" Get the current physical connection state.
The method will first attempt to use ethtool do determine
physical connection state. Should ethtool fail to run properly,
mii-tool will be used instead.
Returns:
True if a link is detected, False otherwise.
"""
if not self.iface:
return False
# check for link using /sys/class/net/iface/carrier
# is usually more accurate
sys_device = '/sys/class/net/%s/' % self.iface
carrier_path = sys_device + 'carrier'
if not self.IsUp():
MAX_TRIES = 3
tries = 0
self.Up()
while True:
tries += 1
time.sleep(2)
if self.IsUp() or tries > MAX_TRIES: break
if os.path.exists(carrier_path):
carrier = open(carrier_path, 'r')
try:
link = carrier.read().strip()
link = int(link)
if link == 1:
return True
elif link == 0:
return False
except (IOError, ValueError, TypeError):
print 'Error checking link using /sys/class/net/%s/carrier' % self.iface
if self.ethtool_cmd and self.link_detect in [misc.ETHTOOL, misc.AUTO]:
return self._eth_get_plugged_in()
elif self.miitool_cmd and self.link_detect in [misc.MIITOOL, misc.AUTO]:
return self._mii_get_plugged_in()
else:
print ('Error: No way of checking for a wired connection. Make ' +
'sure that either mii-tool or ethtool is installed.')
return False
def _eth_get_plugged_in(self):
""" Use ethtool to determine the physical connection state.
Returns:
True if a link is detected, False otherwise.
"""
cmd = "%s %s" % (self.ethtool_cmd, self.iface)
if not self.IsUp():
print 'Wired Interface is down, putting it up'
self.Up()
time.sleep(6)
if self.verbose: print cmd
tool_data = misc.Run(cmd, include_stderr=True)
if misc.RunRegex(re.compile('(Link detected: yes)', re.I | re.M | re.S),
tool_data):
return True
else:
return False
def _mii_get_plugged_in(self):
""" Use mii-tool to determine the physical connection state.
Returns:
True if a link is detected, False otherwise.
"""
cmd = "%s %s" % (self.miitool_cmd, self.iface)
if self.verbose: print cmd
tool_data = misc.Run(cmd, include_stderr=True)
if misc.RunRegex(re.compile('(Invalid argument)', re.I | re.M | re.S),
tool_data) is not None:
print 'Wired Interface is down, putting it up'
self.Up()
time.sleep(4)
if self.verbose: print cmd
tool_data = misc.Run(cmd, include_stderr=True)
if misc.RunRegex(re.compile('(link ok)', re.I | re.M | re.S),
tool_data) is not None:
return True
else:
return False
class WirelessInterface(Interface, wnettools.BaseWirelessInterface): class WirelessInterface(Interface, wnettools.BaseWirelessInterface):
""" Control a wireless network interface. """ """ Control a wireless network interface. """
@@ -262,265 +99,3 @@ class WirelessInterface(Interface, wnettools.BaseWirelessInterface):
wpa_driver) wpa_driver)
Interface.__init__(self, iface, verbose) Interface.__init__(self, iface, verbose)
def GetNetworks(self):
""" Get a list of available wireless networks.
Returns:
A list containing available wireless networks.
"""
cmd = 'iwlist ' + self.iface + ' scan'
if self.verbose: print cmd
results = misc.Run(cmd)
# Split the networks apart, using Cell as our split point
# this way we can look at only one network at a time.
# The spaces around ' Cell ' are to minimize the chance that someone
# has an essid named Cell...
networks = results.split( ' Cell ' )
# Get available network info from iwpriv get_site_survey
# if we're using a ralink card (needed to get encryption info)
if self.wpa_driver == RALINK_DRIVER:
ralink_info = self._GetRalinkInfo()
else:
ralink_info = None
# An array for the access points
access_points = []
for cell in networks:
# Only use sections where there is an ESSID.
if 'ESSID:' in cell:
# Add this network to the list of networks
entry = self._ParseAccessPoint(cell, ralink_info)
if entry is not None:
access_points.append(entry)
return access_points
def _ParseAccessPoint(self, cell, ralink_info):
""" Parse a single cell from the output of iwlist.
Keyword arguments:
cell -- string containing the cell information
ralink_info -- string contating network information needed
for ralink cards.
Returns:
A dictionary containing the cell networks properties.
"""
ap = {}
ap['essid'] = misc.RunRegex(essid_pattern, cell)
try:
ap['essid'] = misc.to_unicode(ap['essid'])
except (UnicodeDecodeError, UnicodeEncodeError):
print 'Unicode problem with current network essid, ignoring!!'
return None
if ap['essid'] in ['<hidden>', ""]:
ap['hidden'] = True
ap['essid'] = "<hidden>"
else:
ap['hidden'] = False
# Channel - For cards that don't have a channel number,
# convert the frequency.
ap['channel'] = misc.RunRegex(channel_pattern, cell)
if ap['channel'] == None:
freq = misc.RunRegex(freq_pattern, cell)
ap['channel'] = self._FreqToChannel(freq)
# BSSID
ap['bssid'] = misc.RunRegex(ap_mac_pattern, cell)
# Mode
ap['mode'] = misc.RunRegex(mode_pattern, cell)
# Break off here if we're using a ralink card
if self.wpa_driver == RALINK_DRIVER:
ap = self._ParseRalinkAccessPoint(ap, ralink_info, cell)
elif misc.RunRegex(wep_pattern, cell) == 'on':
# Encryption - Default to WEP
ap['encryption'] = True
ap['encryption_method'] = 'WEP'
if misc.RunRegex(wpa1_pattern, cell) == 'WPA Version 1':
ap['encryption_method'] = 'WPA'
if misc.RunRegex(altwpa_pattern, cell) == 'wpa_ie':
ap['encryption_method'] = 'WPA'
if misc.RunRegex(wpa2_pattern, cell) == 'WPA2':
ap['encryption_method'] = 'WPA2'
else:
ap['encryption'] = False
# Link Quality
# Set strength to -1 if the quality is not found
# more of the patch from
# https://bugs.launchpad.net/wicd/+bug/175104
if (strength_pattern.match(cell)):
[(strength, max_strength)] = strength_pattern.findall(cell)
if max_strength:
ap["quality"] = 100 * int(strength) // int(max_strength)
else:
ap["quality"] = int(strength)
elif misc.RunRegex(altstrength_pattern,cell):
ap['quality'] = misc.RunRegex(altstrength_pattern, cell)
else:
ap['quality'] = -1
# Signal Strength (only used if user doesn't want link
# quality displayed or it isn't found)
if misc.RunRegex(signaldbm_pattern, cell):
ap['strength'] = misc.RunRegex(signaldbm_pattern, cell)
elif self.wpa_driver != RALINK_DRIVER: # This is already set for ralink
ap['strength'] = -1
return ap
def ValidateAuthentication(self, auth_time):
""" Validate WPA authentication.
Validate that the wpa_supplicant authentication
process was successful.
NOTE: It's possible this could return False,
though in reality wpa_supplicant just isn't
finished yet.
Keyword arguments:
auth_time -- The time at which authentication began.
Returns:
True if wpa_supplicant authenticated succesfully,
False otherwise.
"""
# Right now there's no way to do this for these drivers
if self.wpa_driver == RALINK_DRIVER or not self.wpa_cli_cmd:
return True
MAX_TIME = 35
MAX_DISCONNECTED_TIME = 3
disconnected_time = 0
while (time.time() - auth_time) < MAX_TIME:
cmd = '%s -i %s status' % (self.wpa_cli_cmd, self.iface)
output = misc.Run(cmd)
result = misc.RunRegex(auth_pattern, output)
if self.verbose:
print 'WPA_CLI RESULT IS', result
if not result:
return False
if result == "COMPLETED":
return True
elif result == "DISCONNECTED":
disconnected_time += 1
if disconnected_time > MAX_DISCONNECTED_TIME:
disconnected_time = 0
# Force a rescan to get wpa_supplicant moving again.
self._ForceSupplicantScan()
MAX_TIME += 5
else:
disconnected_time = 0
time.sleep(1)
print 'wpa_supplicant authentication may have failed.'
return False
def _ForceSupplicantScan(self):
""" Force wpa_supplicant to rescan available networks.
This function forces wpa_supplicant to rescan.
This works around authentication validation sometimes failing for
wpa_supplicant because it remains in a DISCONNECTED state for
quite a while, after which a rescan is required, and then
attempting to authenticate. This whole process takes a long
time, so we manually speed it up if we see it happening.
"""
print 'wpa_supplicant rescan forced...'
cmd = 'wpa_cli -i' + self.iface + ' scan'
misc.Run(cmd)
def StopWPA(self):
""" Terminates wpa using wpa_cli"""
cmd = 'wpa_cli -i %s terminate' % self.iface
if self.verbose: print cmd
misc.Run(cmd)
def GetBSSID(self, iwconfig=None):
""" Get the MAC address for the interface. """
if not iwconfig:
cmd = 'iwconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = iwconfig
bssid = misc.RunRegex(bssid_pattern, output)
return bssid
def GetSignalStrength(self, iwconfig=None):
""" Get the signal strength of the current network.
Returns:
The signal strength.
"""
if not iwconfig:
cmd = 'iwconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = iwconfig
[(strength, max_strength)] = strength_pattern.findall(output)
if max_strength and strength:
if int(max_strength) != 0:
return 100 * int(strength) // int(max_strength)
else:
# Prevent a divide by zero error.
strength = int(strength)
if strength is None:
strength = misc.RunRegex(altstrength_pattern, output)
return strength
def GetDBMStrength(self, iwconfig=None):
""" Get the dBm signal strength of the current network.
Returns:
The dBm signal strength.
"""
if not iwconfig:
cmd = 'iwconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = iwconfig
dbm_strength = misc.RunRegex(signaldbm_pattern, output)
return dbm_strength
def GetCurrentNetwork(self, iwconfig=None):
""" Get the essid of the current network.
Returns:
The current network essid.
"""
if not iwconfig:
cmd = 'iwconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = iwconfig
network = misc.RunRegex(re.compile('.*ESSID:"(.*?)"',
re.I | re.M | re.S), output)
if network:
network = misc.to_unicode(network)
return network

View File

@@ -33,6 +33,8 @@ from wicd import misc
from wicd import wnettools from wicd import wnettools
from wicd import wpath from wicd import wpath
from wicd.wnettools import * from wicd.wnettools import *
from wicd.wnettools import strength_pattern, altstrength_pattern, wep_pattern, \
signaldbm_pattern
import iwscan import iwscan
import wpactrl import wpactrl
@@ -58,14 +60,6 @@ Dependencies:
python-wpactrl (http://projects.otaku42.de/wiki/PythonWpaCtrl) python-wpactrl (http://projects.otaku42.de/wiki/PythonWpaCtrl)
python-iwscan (http://projects.otaku42.de/browser/python-iwscan/)""" python-iwscan (http://projects.otaku42.de/browser/python-iwscan/)"""
strength_pattern = re.compile('.*Quality:?=? ?(\d+)\s*/?\s*(\d*)',
re.I | re.M | re.S)
altstrength_pattern = re.compile('.*Signal level:?=? ?(\d\d*)',
re.I | re.M | re.S)
signaldbm_pattern = re.compile('.*Signal level:?=? ?(-\d\d*)',
re.I | re.M | re.S)
wep_pattern = re.compile('.*Encryption key:(.*?)\n', re.I | re.M | re.S)
RALINK_DRIVER = 'ralink legacy' RALINK_DRIVER = 'ralink legacy'
# Got these from /usr/include/linux/wireless.h # Got these from /usr/include/linux/wireless.h

View File

@@ -57,7 +57,7 @@ ROUTE = 2
GKSUDO = 1 GKSUDO = 1
KDESU = 2 KDESU = 2
KTSUSS = 3 KTSUSS = 3
sudo_dict = { __sudo_dict = {
AUTO : "", AUTO : "",
GKSUDO : "gksudo", GKSUDO : "gksudo",
KDESU : "kdesu", KDESU : "kdesu",
@@ -212,19 +212,34 @@ def ParseEncryption(network):
""" """
enctemplate = open(wpath.encryption + network["enctype"]) enctemplate = open(wpath.encryption + network["enctype"])
template = enctemplate.readlines() template = enctemplate.readlines()
# Set these to nothing so that we can hold them outside the loop config_file = "ap_scan=1\n"
z = "ap_scan=1\n" should_replace = False
# Loop through the lines in the template, selecting ones to use for index, line in enumerate(template):
for y, x in enumerate(template): if not should_replace:
x = x.strip("\n") if line.strip().startswith('---'):
if y > 4: should_replace = True
# replace values else:
x = x.replace("$_SCAN","0") if line.strip().startswith("}"):
for t in network: # This is the last line, so we just write it.
# Don't bother if z's value is None cause it will cause errors config_file = ''.join([config_file, line])
if Noneify(network[t]) is not None: elif "$_" in line:
x = x.replace("$_" + str(t).upper(), str(network[t])) cur_val = re.findall('\$_([A-Z0-9]+)', line)
z = z + "\n" + x if cur_val:
if cur_val[0] == 'SCAN':
#TODO should this be hardcoded?
line = line.replace("$_SCAN", "0")
config_file = ''.join([config_file, line])
else:
rep_val = network.get(cur_val[0].lower())
if rep_val:
line = line.replace("$_%s" % cur_val[0], rep_val)
config_file = ''.join([config_file, line])
else:
print "Ignoring template line: '%s'" % line
else:
print "Weird parsing error occurred"
else: # Just a regular entry.
config_file = ''.join([config_file, line])
# Write the data to the files then chmod them so they can't be read # Write the data to the files then chmod them so they can't be read
# by normal users. # by normal users.
@@ -233,7 +248,7 @@ def ParseEncryption(network):
os.chown(wpath.networks + network["bssid"].replace(":", "").lower(), 0, 0) os.chown(wpath.networks + network["bssid"].replace(":", "").lower(), 0, 0)
# We could do this above, but we'd like to read protect # We could do this above, but we'd like to read protect
# them before we write, so that it can't be read. # them before we write, so that it can't be read.
f.write(z) f.write(config_file)
f.close() f.close()
def LoadEncryptionMethods(): def LoadEncryptionMethods():
@@ -409,7 +424,7 @@ def get_sudo_cmd(msg, prog_num=0):
def choose_sudo_prog(prog_num=0): def choose_sudo_prog(prog_num=0):
""" Try to intelligently decide which graphical sudo program to use. """ """ Try to intelligently decide which graphical sudo program to use. """
if prog_num: if prog_num:
return find_path(sudo_dict[prog_num]) return find_path(__sudo_dict[prog_num])
desktop_env = detect_desktop_environment() desktop_env = detect_desktop_environment()
env_path = os.environ['PATH'].split(":") env_path = os.environ['PATH'].split(":")
paths = [] paths = []

View File

@@ -45,7 +45,7 @@ daemon = dbus_dict["daemon"]
wired = dbus_dict["wired"] wired = dbus_dict["wired"]
wireless = dbus_dict["wireless"] wireless = dbus_dict["wireless"]
monitor = to_time = update_callback = None monitor = to_time = update_callback = mainloop = None
def diewithdbus(func): def diewithdbus(func):
def wrapper(self, *__args, **__kargs): def wrapper(self, *__args, **__kargs):
@@ -54,11 +54,11 @@ def diewithdbus(func):
self.__lost_dbus_count = 0 self.__lost_dbus_count = 0
return ret return ret
except dbusmanager.DBusException, e: except dbusmanager.DBusException, e:
print "Caught exception %e" % str(e) print "Caught exception %s" % str(e)
if not hasattr(self, "__lost_dbus_count"): if not hasattr(self, "__lost_dbus_count"):
self.__lost_dbus_count = 0 self.__lost_dbus_count = 0
if self.__lost_dbus_count > 3: if self.__lost_dbus_count > 3:
sys.exit(1) mainloop.quit()
self.__lost_dbus_count += 1 self.__lost_dbus_count += 1
return True return True
@@ -351,7 +351,7 @@ def main():
an amount of time determined by the active backend. an amount of time determined by the active backend.
""" """
global monitor, to_time global monitor, to_time, mainloop
monitor = ConnectionStatus() monitor = ConnectionStatus()
to_time = daemon.GetBackendUpdateInterval() to_time = daemon.GetBackendUpdateInterval()

View File

@@ -999,3 +999,4 @@ class WiredConnectThread(ConnectThread):
self.connect_result = "Success" self.connect_result = "Success"
self.is_connecting = False self.is_connecting = False

View File

@@ -434,7 +434,7 @@ class TrayIcon(object):
""" Determines which image to use for the wireless entries. """ """ Determines which image to use for the wireless entries. """
def fix_strength(val, default): def fix_strength(val, default):
""" Assigns given strength to a default value if needed. """ """ Assigns given strength to a default value if needed. """
return val is not None and int(val) or default return val and int(val) or default
def get_prop(prop): def get_prop(prop):
return wireless.GetWirelessProperty(net_id, prop) return wireless.GetWirelessProperty(net_id, prop)

View File

@@ -34,21 +34,43 @@ class WirelessInterface() -- Control a wireless network interface.
import os import os
import re import re
import random import random
import time
from string import maketrans, translate from string import maketrans, translate
import wpath import wpath
import misc import misc
from misc import find_path from misc import find_path
RALINK_DRIVER = 'ralink legacy' # Regular expressions.
__re_mode = (re.I | re.M | re.S)
essid_pattern = re.compile('.*ESSID:"?(.*?)"?\s*\n', __re_mode)
ap_mac_pattern = re.compile('.*Address: (.*?)\n', __re_mode)
channel_pattern = re.compile('.*Channel:? ?(\d\d?)', __re_mode)
strength_pattern = re.compile('.*Quality:?=? ?(\d+)\s*/?\s*(\d*)', __re_mode)
altstrength_pattern = re.compile('.*Signal level:?=? ?(\d\d*)', __re_mode)
signaldbm_pattern = re.compile('.*Signal level:?=? ?(-\d\d*)', __re_mode)
mode_pattern = re.compile('.*Mode:(.*?)\n', __re_mode)
freq_pattern = re.compile('.*Frequency:(.*?)\n', __re_mode)
wep_pattern = re.compile('.*Encryption key:(.*?)\n', __re_mode)
altwpa_pattern = re.compile('(wpa_ie)', __re_mode)
wpa1_pattern = re.compile('(WPA Version 1)', __re_mode)
wpa2_pattern = re.compile('(WPA2)', __re_mode)
#iwconfig-only regular expressions.
ip_pattern = re.compile(r'inet [Aa]d?dr[^.]*:([^.]*\.[^.]*\.[^.]*\.[0-9]*)',re.S)
bssid_pattern = re.compile('.*Access Point: (([0-9A-Z]{2}:){5}[0-9A-Z]{2})', __re_mode)
# Regular expressions for wpa_cli output
auth_pattern = re.compile('.*wpa_state=(.*?)\n', __re_mode)
RALINK_DRIVER = 'ralink legacy'
blacklist_strict = '!"#$%&\'()*+,./:;<=>?@[\\]^`{|}~ ' blacklist_strict = '!"#$%&\'()*+,./:;<=>?@[\\]^`{|}~ '
blacklist_norm = ";`$!*|><&\\" blacklist_norm = ";`$!*|><&\\"
blank_trans = maketrans("", "") blank_trans = maketrans("", "")
__all__ = ["SetDNS", "GetDefaultGateway", "GetWiredInterfaces", __all__ = ["SetDNS", "GetDefaultGateway", "GetWiredInterfaces",
"GetWirelessInterfaces", "IsValidWpaSuppDriver", "NeedsExternalCalls"] "GetWirelessInterfaces", "IsValidWpaSuppDriver"]
def _sanitize_string(string): def _sanitize_string(string):
if string: if string:
@@ -135,7 +157,6 @@ def NeedsExternalCalls():
""" Returns True if the backend needs to use an external program. """ """ Returns True if the backend needs to use an external program. """
raise NotImplementedError raise NotImplementedError
def IsValidWpaSuppDriver(driver): def IsValidWpaSuppDriver(driver):
""" Returns True if given string is a valid wpa_supplicant driver. """ """ Returns True if given string is a valid wpa_supplicant driver. """
output = misc.Run(["wpa_supplicant", "-D%s" % driver, "-iolan19", output = misc.Run(["wpa_supplicant", "-D%s" % driver, "-iolan19",
@@ -529,16 +550,46 @@ class BaseInterface(object):
The IP address of the interface in dotted quad form. The IP address of the interface in dotted quad form.
""" """
raise NotImplementedError if not ifconfig:
cmd = 'ifconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = ifconfig
return misc.RunRegex(ip_pattern, output)
def IsUp(self): def IsUp(self, ifconfig=None):
""" Determines if the interface is up. """ Determines if the interface is up.
Returns: Returns:
True if the interface is up, False otherwise. True if the interface is up, False otherwise.
""" """
raise NotImplementedError if not self.iface: return False
flags_file = '/sys/class/net/%s/flags' % self.iface
try:
flags = open(flags_file, "r").read().strip()
except IOError:
print "Could not open %s, using ifconfig to determine status" % flags_file
return self._slow_is_up(ifconfig)
return bool(int(flags, 16) & 1)
def _slow_is_up(self, ifconfig=None):
""" Determine if an interface is up using ifconfig. """
if not ifconfig:
cmd = "ifconfig " + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = ifconfig
lines = output.split('\n')
if len(lines) < 5:
return False
for line in lines[1:4]:
if line.strip().startswith('UP'):
return True
return False
class BaseWiredInterface(BaseInterface): class BaseWiredInterface(BaseInterface):
@@ -564,7 +615,85 @@ class BaseWiredInterface(BaseInterface):
True if a link is detected, False otherwise. True if a link is detected, False otherwise.
""" """
raise NotImplementedError if not self.iface:
return False
# check for link using /sys/class/net/iface/carrier
# is usually more accurate
sys_device = '/sys/class/net/%s/' % self.iface
carrier_path = sys_device + 'carrier'
if not self.IsUp():
MAX_TRIES = 3
tries = 0
self.Up()
while True:
tries += 1
time.sleep(2)
if self.IsUp() or tries > MAX_TRIES: break
if os.path.exists(carrier_path):
carrier = open(carrier_path, 'r')
try:
link = carrier.read().strip()
link = int(link)
if link == 1:
return True
elif link == 0:
return False
except (IOError, ValueError, TypeError):
print 'Error checking link using /sys/class/net/%s/carrier' % self.iface
if self.ethtool_cmd and self.link_detect in [misc.ETHTOOL, misc.AUTO]:
return self._eth_get_plugged_in()
elif self.miitool_cmd and self.link_detect in [misc.MIITOOL, misc.AUTO]:
return self._mii_get_plugged_in()
else:
print ('Error: No way of checking for a wired connection. Make ' +
'sure that either mii-tool or ethtool is installed.')
return False
def _eth_get_plugged_in(self):
""" Use ethtool to determine the physical connection state.
Returns:
True if a link is detected, False otherwise.
"""
cmd = "%s %s" % (self.ethtool_cmd, self.iface)
if not self.IsUp():
print 'Wired Interface is down, putting it up'
self.Up()
time.sleep(6)
if self.verbose: print cmd
tool_data = misc.Run(cmd, include_stderr=True)
if misc.RunRegex(re.compile('(Link detected: yes)', re.I | re.M | re.S),
tool_data):
return True
else:
return False
def _mii_get_plugged_in(self):
""" Use mii-tool to determine the physical connection state.
Returns:
True if a link is detected, False otherwise.
"""
cmd = "%s %s" % (self.miitool_cmd, self.iface)
if self.verbose: print cmd
tool_data = misc.Run(cmd, include_stderr=True)
if misc.RunRegex(re.compile('(Invalid argument)', re.I | re.M | re.S),
tool_data) is not None:
print 'Wired Interface is down, putting it up'
self.Up()
time.sleep(4)
if self.verbose: print cmd
tool_data = misc.Run(cmd, include_stderr=True)
if misc.RunRegex(re.compile('(link ok)', re.I | re.M | re.S),
tool_data) is not None:
return True
else:
return False
class BaseWirelessInterface(BaseInterface): class BaseWirelessInterface(BaseInterface):
@@ -596,10 +725,6 @@ class BaseWirelessInterface(BaseInterface):
if self.verbose: print str(cmd) if self.verbose: print str(cmd)
misc.Run(cmd) misc.Run(cmd)
def StopWPA(self):
""" Stop wireless encryption. """
raise NotImplementedError
def GetKillSwitchStatus(self): def GetKillSwitchStatus(self):
""" Determines if the wireless killswitch is enabled. """ Determines if the wireless killswitch is enabled.
@@ -823,10 +948,6 @@ class BaseWirelessInterface(BaseInterface):
if self.verbose: print cmd if self.verbose: print cmd
misc.Run(cmd) misc.Run(cmd)
def ValidateAuthentication(self, auth_time):
""" Validate WPA authentication. """
raise NotImplementedError
def _AuthenticateRalinkLegacy(self, network): def _AuthenticateRalinkLegacy(self, network):
""" Authenticate with the specified wireless network. """ Authenticate with the specified wireless network.
@@ -866,9 +987,207 @@ class BaseWirelessInterface(BaseInterface):
if self.verbose: print ' '.join(cmd) if self.verbose: print ' '.join(cmd)
misc.Run(cmd) misc.Run(cmd)
def GetNetworks(self):
""" Get a list of available wireless networks.
Returns:
A list containing available wireless networks.
"""
cmd = 'iwlist ' + self.iface + ' scan'
if self.verbose: print cmd
results = misc.Run(cmd)
# Split the networks apart, using Cell as our split point
# this way we can look at only one network at a time.
# The spaces around ' Cell ' are to minimize the chance that someone
# has an essid named Cell...
networks = results.split( ' Cell ' )
# Get available network info from iwpriv get_site_survey
# if we're using a ralink card (needed to get encryption info)
if self.wpa_driver == RALINK_DRIVER:
ralink_info = self._GetRalinkInfo()
else:
ralink_info = None
# An array for the access points
access_points = []
for cell in networks:
# Only use sections where there is an ESSID.
if 'ESSID:' in cell:
# Add this network to the list of networks
entry = self._ParseAccessPoint(cell, ralink_info)
if entry is not None:
access_points.append(entry)
return access_points
def _ParseAccessPoint(self, cell, ralink_info):
""" Parse a single cell from the output of iwlist.
Keyword arguments:
cell -- string containing the cell information
ralink_info -- string contating network information needed
for ralink cards.
Returns:
A dictionary containing the cell networks properties.
"""
ap = {}
ap['essid'] = misc.RunRegex(essid_pattern, cell)
try:
ap['essid'] = misc.to_unicode(ap['essid'])
except (UnicodeDecodeError, UnicodeEncodeError):
print 'Unicode problem with current network essid, ignoring!!'
return None
if ap['essid'] in ['<hidden>', ""]:
ap['hidden'] = True
ap['essid'] = "<hidden>"
else:
ap['hidden'] = False
# Channel - For cards that don't have a channel number,
# convert the frequency.
ap['channel'] = misc.RunRegex(channel_pattern, cell)
if ap['channel'] == None:
freq = misc.RunRegex(freq_pattern, cell)
ap['channel'] = self._FreqToChannel(freq)
# BSSID
ap['bssid'] = misc.RunRegex(ap_mac_pattern, cell)
# Mode
ap['mode'] = misc.RunRegex(mode_pattern, cell)
# Break off here if we're using a ralink card
if self.wpa_driver == RALINK_DRIVER:
ap = self._ParseRalinkAccessPoint(ap, ralink_info, cell)
elif misc.RunRegex(wep_pattern, cell) == 'on':
# Encryption - Default to WEP
ap['encryption'] = True
ap['encryption_method'] = 'WEP'
if misc.RunRegex(wpa1_pattern, cell) == 'WPA Version 1':
ap['encryption_method'] = 'WPA'
if misc.RunRegex(altwpa_pattern, cell) == 'wpa_ie':
ap['encryption_method'] = 'WPA'
if misc.RunRegex(wpa2_pattern, cell) == 'WPA2':
ap['encryption_method'] = 'WPA2'
else:
ap['encryption'] = False
# Link Quality
# Set strength to -1 if the quality is not found
# more of the patch from
# https://bugs.launchpad.net/wicd/+bug/175104
if (strength_pattern.match(cell)):
[(strength, max_strength)] = strength_pattern.findall(cell)
if max_strength:
ap["quality"] = 100 * int(strength) // int(max_strength)
else:
ap["quality"] = int(strength)
elif misc.RunRegex(altstrength_pattern,cell):
ap['quality'] = misc.RunRegex(altstrength_pattern, cell)
else:
ap['quality'] = -1
# Signal Strength (only used if user doesn't want link
# quality displayed or it isn't found)
if misc.RunRegex(signaldbm_pattern, cell):
ap['strength'] = misc.RunRegex(signaldbm_pattern, cell)
elif self.wpa_driver != RALINK_DRIVER: # This is already set for ralink
ap['strength'] = -1
return ap
def ValidateAuthentication(self, auth_time):
""" Validate WPA authentication.
Validate that the wpa_supplicant authentication
process was successful.
NOTE: It's possible this could return False,
though in reality wpa_supplicant just isn't
finished yet.
Keyword arguments:
auth_time -- The time at which authentication began.
Returns:
True if wpa_supplicant authenticated succesfully,
False otherwise.
"""
# Right now there's no way to do this for these drivers
if self.wpa_driver == RALINK_DRIVER or not self.wpa_cli_cmd:
return True
MAX_TIME = 35
MAX_DISCONNECTED_TIME = 3
disconnected_time = 0
while (time.time() - auth_time) < MAX_TIME:
cmd = '%s -i %s status' % (self.wpa_cli_cmd, self.iface)
output = misc.Run(cmd)
result = misc.RunRegex(auth_pattern, output)
if self.verbose:
print 'WPA_CLI RESULT IS', result
if not result:
return False
if result == "COMPLETED":
return True
elif result == "DISCONNECTED":
disconnected_time += 1
if disconnected_time > MAX_DISCONNECTED_TIME:
disconnected_time = 0
# Force a rescan to get wpa_supplicant moving again.
self._ForceSupplicantScan()
MAX_TIME += 5
else:
disconnected_time = 0
time.sleep(1)
print 'wpa_supplicant authentication may have failed.'
return False
def _ForceSupplicantScan(self):
""" Force wpa_supplicant to rescan available networks.
This function forces wpa_supplicant to rescan.
This works around authentication validation sometimes failing for
wpa_supplicant because it remains in a DISCONNECTED state for
quite a while, after which a rescan is required, and then
attempting to authenticate. This whole process takes a long
time, so we manually speed it up if we see it happening.
"""
print 'wpa_supplicant rescan forced...'
cmd = 'wpa_cli -i' + self.iface + ' scan'
misc.Run(cmd)
def StopWPA(self):
""" Terminates wpa using wpa_cli"""
cmd = 'wpa_cli -i %s terminate' % self.iface
if self.verbose: print cmd
misc.Run(cmd)
def GetBSSID(self, iwconfig=None): def GetBSSID(self, iwconfig=None):
""" Get the MAC address for the interface. """ """ Get the MAC address for the interface. """
raise NotImplementedError if not iwconfig:
cmd = 'iwconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = iwconfig
bssid = misc.RunRegex(bssid_pattern, output)
return bssid
def GetSignalStrength(self, iwconfig=None): def GetSignalStrength(self, iwconfig=None):
""" Get the signal strength of the current network. """ Get the signal strength of the current network.
@@ -877,7 +1196,28 @@ class BaseWirelessInterface(BaseInterface):
The signal strength. The signal strength.
""" """
raise NotImplementedError if not iwconfig:
cmd = 'iwconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = iwconfig
strength_pattern = re.compile('.*Quality:?=? ?(\d+)\s*/?\s*(\d*)',
re.I | re.M | re.S)
altstrength_pattern = re.compile('.*Signal level:?=? ?(\d\d*)', re.I | re.M | re.S)
[(strength, max_strength)] = strength_pattern.findall(output)
if max_strength and strength:
if int(max_strength) != 0:
return 100 * int(strength) // int(max_strength)
else:
# Prevent a divide by zero error.
strength = int(strength)
if strength is None:
strength = misc.RunRegex(altstrength_pattern, output)
return strength
def GetDBMStrength(self, iwconfig=None): def GetDBMStrength(self, iwconfig=None):
""" Get the dBm signal strength of the current network. """ Get the dBm signal strength of the current network.
@@ -886,7 +1226,16 @@ class BaseWirelessInterface(BaseInterface):
The dBm signal strength. The dBm signal strength.
""" """
raise NotImplementedError if not iwconfig:
cmd = 'iwconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = iwconfig
signaldbm_pattern = re.compile('.*Signal level:?=? ?(-\d\d*)',
re.I | re.M | re.S)
dbm_strength = misc.RunRegex(signaldbm_pattern, output)
return dbm_strength
def GetCurrentNetwork(self, iwconfig=None): def GetCurrentNetwork(self, iwconfig=None):
""" Get the essid of the current network. """ Get the essid of the current network.
@@ -895,4 +1244,14 @@ class BaseWirelessInterface(BaseInterface):
The current network essid. The current network essid.
""" """
raise NotImplementedError if not iwconfig:
cmd = 'iwconfig ' + self.iface
if self.verbose: print cmd
output = misc.Run(cmd)
else:
output = iwconfig
network = misc.RunRegex(re.compile('.*ESSID:"(.*?)"',
re.I | re.M | re.S), output)
if network:
network = misc.to_unicode(network)
return network