1
0
mirror of https://github.com/gryf/wicd.git synced 2026-03-13 04:55:46 +01:00

Style changes for python files

This commit is contained in:
2020-08-01 11:25:13 +02:00
parent c401f2963b
commit 40a7a8ac5d
32 changed files with 2775 additions and 2614 deletions

View File

@@ -1 +1 @@
""" WICD core module. """
"""WICD core module."""

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
""" autoconnect -- Triggers an automatic connection attempt. """
"""autoconnect -- Triggers an automatic connection attempt."""
#
# Copyright (C) 2007 - 2009 Adam Blackburn
@@ -18,19 +18,18 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from wicd import dbusmanager
import sys
import time
import dbus
import time
import sys
if getattr(dbus, 'version', (0, 0, 0)) < (0, 80, 0):
import dbus.glib
else:
from dbus.mainloop.glib import DBusGMainLoop
DBusGMainLoop(set_as_default=True)
from wicd import dbusmanager
try:
dbusmanager.connect_to_dbus()
daemon = dbusmanager.get_interface('daemon')
@@ -40,20 +39,24 @@ except Exception as e:
print('Could not connect to daemon.', file=sys.stderr)
sys.exit(1)
def handler(*args):
""" No-op handler. """
"""No-op handler."""
pass
def error_handler(*args):
""" Error handler. """
"""Error handler."""
print('Async error autoconnecting.', file=sys.stderr)
sys.exit(3)
if __name__ == '__main__':
try:
time.sleep(2)
daemon.SetSuspend(False)
if not daemon.CheckIfConnecting():
daemon.AutoConnect(True, reply_handler=handler,
daemon.AutoConnect(True, reply_handler=handler,
error_handler=error_handler)
except Exception as e:
print("Exception caught: %s" % str(e), file=sys.stderr)

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
""" Backend manager for wicd.
"""Backend manager for wicd.
Manages and loads the pluggable backends for wicd.
@@ -29,57 +29,58 @@ import os
import wicd.wpath as wpath
def fail(backend_name, reason):
""" Helper to warn the user about failure in loading backend. """
"""Helper to warn the user about failure in loading backend."""
print(("Failed to load backend %s: %s" % (backend_name, reason)))
return True
class BackendManager(object):
""" Manages, validates, and loads wicd backends. """
"""Manages, validates, and loads wicd backends."""
def __init__(self):
""" Initialize the backend manager. """
"""Initialize the backend manager."""
self.backend_dir = wpath.backends
self.__loaded_backend = None
def _valid_backend_file(self, be_file):
""" Make sure the backend file is valid. """
return (os.path.exists(be_file) and
"""Make sure the backend file is valid."""
return (os.path.exists(be_file) and
os.path.basename(be_file).startswith("be-") and
be_file.endswith(".py"))
def get_current_backend(self):
""" Returns the name of the loaded backend. """
"""Returns the name of the loaded backend."""
if self.__loaded_backend:
return self.__loaded_backend.NAME
else:
return None
def get_available_backends(self):
""" Returns a list of all valid backends in the backend directory. """
"""Returns a list of all valid backends in the backend directory."""
be_list = []
for f in os.listdir(self.backend_dir):
if self._valid_backend_file(os.path.join(self.backend_dir, f)):
be_list.append(f[3:-3])
return be_list or [""]
def get_update_interval(self):
""" Returns how often in seconds the wicd monitor should update. """
"""Returns how often in seconds the wicd monitor should update."""
if self.__loaded_backend:
return self.__loaded_backend.UPDATE_INTERVAL
else:
return None
def get_backend_description(self, backend_name):
""" Loads a backend and returns its description. """
"""Loads a backend and returns its description."""
backend = self._load_backend(backend_name)
if backend and backend.DESCRIPTION:
return backend.DESCRIPTION
else:
return "No backend data available"
def _load_backend(self, backend_name):
""" Imports a backend and returns the loaded module. """
"""Imports a backend and returns the loaded module."""
print(('trying to load backend %s' % backend_name))
backend_path = os.path.join(self.backend_dir,
'be-' + backend_name + '.py')
@@ -90,9 +91,9 @@ class BackendManager(object):
else:
fail(backend_name, 'Invalid backend file.')
return None
def _validate_backend(self, backend, backend_name):
""" Ensures that a backend module is valid. """
"""Ensures that a backend module is valid."""
failed = False
if not backend.NAME:
failed = fail(backend_name, 'Missing NAME attribute.')
@@ -105,20 +106,20 @@ class BackendManager(object):
if not backend.WirelessInterface:
failed = fail(backend_name, "Missing WirelessInterface class.")
return failed
def load_backend(self, backend_name):
""" Load and return a backend module.
"""Load and return a backend module.
Given a backend name be-foo, attempt to load a python module
in the backends directory called be-foo.py. The module must
include a certain set of classes and variables to be considered
valid.
"""
backend = self._load_backend(backend_name)
if not backend:
return None
failed = self._validate_backend(backend, backend_name)
if failed:
return None

View File

@@ -1 +1 @@
""" Backends module. """
"""Backends module."""

View File

@@ -29,9 +29,15 @@ class WirelessInterface() -- Control a wireless network interface.
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from wicd.wnettools import GetDefaultGateway, GetWiredInterfaces, \
GetWirelessInterfaces, IsValidWpaSuppDriver, BaseWirelessInterface, \
BaseWiredInterface, BaseInterface, GetWpaSupplicantDrivers
from wicd.wnettools import BaseInterface
from wicd.wnettools import BaseWiredInterface
from wicd.wnettools import BaseWirelessInterface
from wicd.wnettools import GetDefaultGateway
from wicd.wnettools import GetWiredInterfaces
from wicd.wnettools import GetWirelessInterfaces
from wicd.wnettools import GetWpaSupplicantDrivers
from wicd.wnettools import IsValidWpaSuppDriver
NAME = "external"
UPDATE_INTERVAL = 5
@@ -46,14 +52,14 @@ more stable for some set ups.
def NeedsExternalCalls(*args, **kargs):
""" Return True, since this backend uses iwconfig/ifconfig. """
"""Return True, since this backend uses iwconfig/ifconfig."""
return True
class Interface(BaseInterface):
""" Control a network interface. """
"""Control a network interface."""
def __init__(self, iface, verbose=False):
""" Initialize the object.
"""Initialize the object.
Keyword arguments:
iface -- the name of the interface
@@ -62,12 +68,12 @@ class Interface(BaseInterface):
"""
BaseInterface.__init__(self, iface, verbose)
self.Check()
class WiredInterface(Interface, BaseWiredInterface):
""" Control a wired network interface. """
"""Control a wired network interface."""
def __init__(self, iface, verbose=False):
""" Initialise the wired network interface class.
"""Initialise the wired network interface class.
Keyword arguments:
iface -- name of the interface
@@ -79,9 +85,9 @@ class WiredInterface(Interface, BaseWiredInterface):
class WirelessInterface(Interface, BaseWirelessInterface):
""" Control a wireless network interface. """
"""Control a wireless network interface."""
def __init__(self, iface, verbose=False, wpa_driver='wext'):
""" Initialise the wireless network interface class.
"""Initialise the wireless network interface class.
Keyword arguments:
iface -- name of the interface

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" ioctl Network interface control tools for wicd.
"""ioctl Network interface control tools for wicd.
This module implements functions to control and obtain information from
network interfaces. It utilizes ioctl calls and python modules to
@@ -30,18 +30,33 @@ class WirelessInterface() -- Control a wireless network interface.
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import array
import fcntl
import os
import socket
import struct
import time
from wicd import misc
from wicd import wpath
from wicd.wnettools import GetDefaultGateway, GetWiredInterfaces, \
GetWirelessInterfaces, IsValidWpaSuppDriver, BaseWirelessInterface, \
BaseWiredInterface, BaseInterface, GetWpaSupplicantDrivers, wep_pattern, \
signaldbm_pattern, neediface
from wicd.wnettools import BaseInterface
from wicd.wnettools import BaseWiredInterface
from wicd.wnettools import BaseWirelessInterface
from wicd.wnettools import GetDefaultGateway
from wicd.wnettools import GetWiredInterfaces
from wicd.wnettools import GetWirelessInterfaces
from wicd.wnettools import GetWpaSupplicantDrivers
from wicd.wnettools import IsValidWpaSuppDriver
from wicd.wnettools import neediface
from wicd.wnettools import signaldbm_pattern
from wicd.wnettools import wep_pattern
try:
import iwscan
IWSCAN_AVAIL = True
except ImportError:
print("WARNING: python-iwscan not found, falling back to using iwlist scan.")
print("WARNING: python-iwscan not found, falling back to using iwlist "
"scan.")
IWSCAN_AVAIL = False
try:
import wpactrl
@@ -50,14 +65,6 @@ except ImportError:
print("WARNING: python-wpactrl not found, falling back to using wpa_cli.")
WPACTRL_AVAIL = False
import re
import os
import time
import socket
import fcntl
import struct
import array
NAME = "ioctl"
UPDATE_INTERVAL = 4
@@ -90,7 +97,7 @@ SIOCGIFFLAGS = 0x8913
def get_iw_ioctl_result(iface, call):
""" Makes the given ioctl call and returns the results.
"""Makes the given ioctl call and returns the results.
Keyword arguments:
call -- The ioctl call to make
@@ -112,15 +119,16 @@ def get_iw_ioctl_result(iface, call):
return None
return buff.tostring()
def NeedsExternalCalls(*args, **kargs):
""" Return False, since this backend doesn't use any external apps. """
"""Return False, since this backend doesn't use any external apps."""
return False
class Interface(BaseInterface):
""" Control a network interface. """
"""Control a network interface."""
def __init__(self, iface, verbose=False):
""" Initialise the object.
"""Initialise the object.
Keyword arguments:
iface -- the name of the interface
@@ -132,13 +140,13 @@ class Interface(BaseInterface):
self.Check()
def CheckWirelessTools(self):
""" Check for the existence needed wireless tools """
"""Check for the existence needed wireless tools"""
if not WPACTRL_AVAIL:
BaseInterface.CheckWirelessTools(self)
@neediface("")
def GetIP(self, ifconfig=""):
""" Get the IP address of the interface.
"""Get the IP address of the interface.
Returns:
The IP address of the interface in dotted quad form.
@@ -156,7 +164,7 @@ class Interface(BaseInterface):
@neediface(False)
def IsUp(self, ifconfig=None):
""" Determines if the interface is up.
"""Determines if the interface is up.
Returns:
True if the interface is up, False otherwise.
@@ -175,9 +183,9 @@ class Interface(BaseInterface):
class WiredInterface(Interface, BaseWiredInterface):
""" Control a wired network interface. """
"""Control a wired network interface."""
def __init__(self, iface, verbose=False):
""" Initialise the wired network interface class.
"""Initialise the wired network interface class.
Keyword arguments:
iface -- name of the interface
@@ -189,7 +197,7 @@ class WiredInterface(Interface, BaseWiredInterface):
@neediface(False)
def GetPluggedIn(self):
""" Get the current physical connection state.
"""Get the current physical connection state.
The method will first attempt to use ethtool do determine
physical connection state. Should ethtool fail to run properly,
@@ -201,7 +209,8 @@ class WiredInterface(Interface, BaseWiredInterface):
"""
if self.ethtool_cmd and self.link_detect in [misc.ETHTOOL, misc.AUTO]:
return self._eth_get_plugged_in()
elif self.miitool_cmd and self.link_detect in [misc.MIITOOL, misc.AUTO]:
elif self.miitool_cmd and self.link_detect in [misc.MIITOOL,
misc.AUTO]:
return self._mii_get_plugged_in()
else:
print(('Error: No way of checking for a wired connection. Make' +
@@ -209,7 +218,7 @@ class WiredInterface(Interface, BaseWiredInterface):
return False
def _eth_get_plugged_in(self):
""" Use ethtool to determine the physical connection state.
"""Use ethtool to determine the physical connection state.
Returns:
True if a link is detected, False otherwise.
@@ -231,7 +240,7 @@ class WiredInterface(Interface, BaseWiredInterface):
return bool(buff.tolist()[1])
def _mii_get_plugged_in(self):
""" Use mii-tool to determine the physical connection state.
"""Use mii-tool to determine the physical connection state.
Returns:
True if a link is detected, False otherwise.
@@ -253,25 +262,24 @@ class WiredInterface(Interface, BaseWiredInterface):
class WirelessInterface(Interface, BaseWirelessInterface):
""" Control a wireless network interface. """
"""Control a wireless network interface."""
def __init__(self, iface, verbose=False, wpa_driver='wext'):
""" Initialise the wireless network interface class.
"""Initialise the wireless network interface class.
Keyword arguments:
iface -- name of the interface
verbose -- print all commands
"""
BaseWirelessInterface.__init__(self, iface, verbose,
wpa_driver)
BaseWirelessInterface.__init__(self, iface, verbose, wpa_driver)
Interface.__init__(self, iface, verbose)
self.scan_iface = None
self.CheckWirelessTools()
@neediface([])
def GetNetworks(self, essid=None):
""" Get a list of available wireless networks.
"""Get a list of available wireless networks.
NOTE: the essid parameter is not used here,
it was added for the iwlist scan for hidden networks.
@@ -282,7 +290,7 @@ class WirelessInterface(Interface, BaseWirelessInterface):
if not IWSCAN_AVAIL:
# Use the slow version if python-iwscan isn't available.
return BaseWirelessInterface.GetNetworks(self)
if not self.scan_iface:
try:
self.scan_iface = iwscan.WirelessInterface(self.iface)
@@ -298,7 +306,7 @@ class WirelessInterface(Interface, BaseWirelessInterface):
return [_f for _f in [self._parse_ap(cell) for cell in results] if _f]
def _parse_ap(self, cell):
""" Parse a single cell from the python-iwscan list. """
"""Parse a single cell from the python-iwscan list."""
ap = {}
try:
ap['essid'] = misc.to_unicode(cell['essid'])
@@ -306,7 +314,7 @@ class WirelessInterface(Interface, BaseWirelessInterface):
print('Unicode problem with the current network essid, ignoring!!')
return None
if ap['essid'] in [ "", '<hidden>']:
if ap['essid'] in ["", '<hidden>']:
ap['essid'] = '<hidden>'
ap['hidden'] = True
else:
@@ -344,13 +352,14 @@ class WirelessInterface(Interface, BaseWirelessInterface):
# quality displayed or it isn't found)
if misc.RunRegex(signaldbm_pattern, cell["stats"]):
ap['strength'] = misc.RunRegex(signaldbm_pattern, cell["stats"])
elif self.wpa_driver != RALINK_DRIVER: # This is already set for ralink
ap['strength'] = -1
# This is already set for ralink
elif self.wpa_driver != RALINK_DRIVER:
ap['strength'] = -1
return ap
def _connect_to_wpa_ctrl_iface(self):
""" Connect to the wpa ctrl interface. """
"""Connect to the wpa ctrl interface."""
ctrl_iface = '/var/run/wpa_supplicant'
socket_loc = os.path.join(ctrl_iface, self.iface)
if os.path.exists(socket_loc):
@@ -360,12 +369,12 @@ class WirelessInterface(Interface, BaseWirelessInterface):
print(("Couldn't open ctrl_interface: %s" % e))
return None
else:
print(("Couldn't find a wpa_supplicant ctrl_interface for iface %s" \
% self.iface))
print(f"Couldn't find a wpa_supplicant ctrl_interface for iface "
f"{self.iface}")
return None
def ValidateAuthentication(self, auth_time):
""" Validate WPA authentication.
"""Validate WPA authentication.
Validate that the wpa_supplicant authentication
process was successful.
@@ -384,8 +393,9 @@ class WirelessInterface(Interface, BaseWirelessInterface):
"""
if not WPACTRL_AVAIL:
# If we don't have python-wpactrl, use the slow version.
return BaseWirelessInterface.ValidateAuthentication(self, auth_time)
return BaseWirelessInterface.ValidateAuthentication(self,
auth_time)
# Right now there's no way to do this for ralink drivers
if self.wpa_driver == RALINK_DRIVER:
return True
@@ -401,16 +411,16 @@ class WirelessInterface(Interface, BaseWirelessInterface):
while (time.time() - auth_time) < MAX_TIME:
try:
status = wpa.request("STATUS").split("\n")
except:
except Exception:
print("wpa_supplicant status query failed.")
return False
if self.verbose:
print(('wpa_supplicant ctrl_interface status query is %s' \
% str(status)))
print(f'wpa_supplicant ctrl_interface status query is '
f'{status}')
try:
[result] = [l for l in status if l.startswith("wpa_state=")]
[result] = [s for s in status if s.startswith("wpa_state=")]
except ValueError:
return False
@@ -431,7 +441,7 @@ class WirelessInterface(Interface, BaseWirelessInterface):
@neediface(False)
def StopWPA(self):
""" Terminates wpa_supplicant using its ctrl interface. """
"""Terminates wpa_supplicant using its ctrl interface."""
if not WPACTRL_AVAIL:
return BaseWirelessInterface.StopWPA(self)
wpa = self._connect_to_wpa_ctrl_iface()
@@ -440,7 +450,7 @@ class WirelessInterface(Interface, BaseWirelessInterface):
wpa.request("TERMINATE")
def _AuthenticateRalinkLegacy(self, network):
""" Authenticate with the specified wireless network.
"""Authenticate with the specified wireless network.
This function handles Ralink legacy cards that cannot use
wpa_supplicant.
@@ -449,58 +459,60 @@ class WirelessInterface(Interface, BaseWirelessInterface):
network -- dictionary containing network info
"""
if network.get('key') != None:
lines = self._GetRalinkInfo()
for x in lines:
info = x.split()
if len(info) < 5:
break
if info[2] == network.get('essid'):
if info[5] == 'WEP' or (info[5] == 'OPEN' and \
info[4] == 'WEP'):
if network.get('key') is None:
return
lines = self._GetRalinkInfo()
for x in lines:
info = x.split()
if len(info) < 5:
break
if info[2] == network.get('essid'):
if info[5] == 'WEP' or (info[5] == 'OPEN' and
info[4] == 'WEP'):
print('Setting up WEP')
cmd = ''.join(['iwconfig ', self.iface, ' key ',
network.get('key')])
if self.verbose:
print(cmd)
misc.Run(cmd)
else:
if info[5] == 'SHARED' and info[4] == 'WEP':
print('Setting up WEP')
cmd = ''.join(['iwconfig ', self.iface, ' key ',
network.get('key')])
auth_mode = 'SHARED'
key_name = 'Key1'
elif info[5] == 'WPA-PSK':
print('Setting up WPA-PSK')
auth_mode = 'WPAPSK'
key_name = 'WPAPSK'
elif info[5] == 'WPA2-PSK':
print('Setting up WPA2-PSK')
auth_mode = 'WPA2PSK'
key_name = 'WPAPSK'
else:
print("Unknown AuthMode, can\'t complete connection "
"process!")
return
cmd_list = []
cmd_list.append('NetworkType=' + info[6])
cmd_list.append('AuthMode=' + auth_mode)
cmd_list.append('EncrypType=' + info[4])
cmd_list.append('SSID=' + info[2])
cmd_list.append(key_name + '=' + network.get('key'))
if info[5] == 'SHARED' and info[4] == 'WEP':
cmd_list.append('DefaultKeyID=1')
cmd_list.append('SSID=' + info[2])
for cmd in cmd_list:
cmd = 'iwpriv ' + self.iface + ' '
if self.verbose:
print(cmd)
misc.Run(cmd)
else:
if info[5] == 'SHARED' and info[4] == 'WEP':
print('Setting up WEP')
auth_mode = 'SHARED'
key_name = 'Key1'
elif info[5] == 'WPA-PSK':
print('Setting up WPA-PSK')
auth_mode = 'WPAPSK'
key_name = 'WPAPSK'
elif info[5] == 'WPA2-PSK':
print('Setting up WPA2-PSK')
auth_mode = 'WPA2PSK'
key_name = 'WPAPSK'
else:
print(('Unknown AuthMode, can\'t complete ' + \
'connection process!'))
return
cmd_list = []
cmd_list.append('NetworkType=' + info[6])
cmd_list.append('AuthMode=' + auth_mode)
cmd_list.append('EncrypType=' + info[4])
cmd_list.append('SSID=' + info[2])
cmd_list.append(key_name + '=' + network.get('key'))
if info[5] == 'SHARED' and info[4] == 'WEP':
cmd_list.append('DefaultKeyID=1')
cmd_list.append('SSID=' + info[2])
for cmd in cmd_list:
cmd = 'iwpriv ' + self.iface + ' '
if self.verbose:
print(cmd)
misc.Run(cmd)
@neediface("")
def GetBSSID(self, iwconfig=None):
""" Get the MAC address for the interface. """
"""Get the MAC address for the interface."""
data = (self.iface + '\0' * 32)[:32]
try:
result = fcntl.ioctl(self.sock.fileno(), SIOCGIWAP, data)[16:]
@@ -513,7 +525,7 @@ class WirelessInterface(Interface, BaseWirelessInterface):
@neediface("")
def GetCurrentBitrate(self, iwconfig=None):
""" Get the current bitrate for the interface. """
"""Get the current bitrate for the interface."""
data = (self.iface + '\0' * 32)[:32]
fmt = "ihbb"
size = struct.calcsize(fmt)
@@ -526,19 +538,19 @@ class WirelessInterface(Interface, BaseWirelessInterface):
f, e, x, x = struct.unpack(fmt, result[:size])
return "%s %s" % ((f / 1000000), 'Mb/s')
#def GetOperationalMode(self, iwconfig=None):
# """ Get the operational mode for the interface. """
# TODO: implement me
# return ''
# def GetOperationalMode(self, iwconfig=None):
# """ Get the operational mode for the interface."""
# TODO: implement me
# return ''
#def GetAvailableAuthMethods(self, iwlistauth=None):
# """ Get the authentication methods for the interface. """
# TODO: Implement me
# return ''
# def GetAvailableAuthMethods(self, iwlistauth=None):
# """ Get the authentication methods for the interface."""
# TODO: Implement me
# return ''
@neediface(-1)
def GetSignalStrength(self, iwconfig=None):
""" Get the signal strength of the current network.
"""Get the signal strength of the current network.
Returns:
The signal strength.
@@ -555,7 +567,7 @@ class WirelessInterface(Interface, BaseWirelessInterface):
return None
def _get_max_strength(self):
""" Gets the maximum possible strength from the wireless driver. """
"""Gets the maximum possible strength from the wireless driver."""
buff = array.array('c', '\0' * 700)
addr, length = buff.buffer_info()
arg = struct.pack('Pi', addr, length)
@@ -576,7 +588,7 @@ class WirelessInterface(Interface, BaseWirelessInterface):
@neediface(-100)
def GetDBMStrength(self, iwconfig=None):
""" Get the dBm signal strength of the current network.
"""Get the dBm signal strength of the current network.
Returns:
The dBm signal strength.
@@ -590,7 +602,7 @@ class WirelessInterface(Interface, BaseWirelessInterface):
@neediface("")
def GetCurrentNetwork(self, iwconfig=None):
""" Get the essid of the current network.
"""Get the essid of the current network.
Returns:
The current network essid.

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
""" configmanager -- Wicd configuration file manager
"""configmanager -- Wicd configuration file manager
Wrapper around ConfigParser for wicd, though it should be
reusable for other purposes as well.
@@ -25,17 +25,18 @@ reusable for other purposes as well.
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import sys, os
from configparser import RawConfigParser, ParsingError
import codecs
from wicd.misc import Noneify, to_unicode
import configparser
import os
import sys
from dbus import Int32
from wicd.misc import Noneify, to_unicode
def sanitize_config_file(path):
""" Remove invalid lines from config file. """
"""Remove invalid lines from config file."""
conf = open(path)
newconf = ''
for line in conf:
@@ -46,10 +47,11 @@ def sanitize_config_file(path):
conf.write(newconf)
conf.close()
class ConfigManager(RawConfigParser):
""" A class that can be used to manage a given configuration file. """
class ConfigManager(configparser.RawConfigParser):
"""A class that can be used to manage a given configuration file."""
def __init__(self, path, debug=False, mark_whitespace="`'`"):
RawConfigParser.__init__(self)
configparser.RawConfigParser.__init__(self)
self.config_file = path
self.debug = debug
self.mrk_ws = mark_whitespace
@@ -57,55 +59,55 @@ class ConfigManager(RawConfigParser):
sanitize_config_file(path)
try:
self.read(path)
except ParsingError:
except configparser.ParsingError:
self.write()
try:
self.read(path)
except ParsingError as p:
except configparser.ParsingError as p:
print(("Could not start wicd: %s" % p.message))
sys.exit(1)
def __repr__(self):
return self.config_file
def __str__(self):
return self.config_file
def get_config(self):
""" Returns the path to the loaded config file. """
"""Returns the path to the loaded config file."""
return self.config_file
def set_option(self, section, option, value, write=False):
""" Wrapper around ConfigParser.set
"""Wrapper around ConfigParser.set
Adds the option to write the config file change right away.
Also forces all the values being written to type str, and
adds the section the option should be written to if it
doesn't exist already.
"""
if not self.has_section(section):
self.add_section(section)
if isinstance(value, str):
value = to_unicode(value)
if value.startswith(' ') or value.endswith(' '):
value = "%(ws)s%(value)s%(ws)s" % {"value" : value,
"ws" : self.mrk_ws}
RawConfigParser.set(self, section, str(option), value)
value = "%(ws)s%(value)s%(ws)s" % {"value": value,
"ws": self.mrk_ws}
configparser.RawConfigParser.set(self, section, str(option), value)
if write:
self.write()
def set(self, *args, **kargs):
""" Calls the set_option method. """
"""Calls the set_option method."""
self.set_option(*args, **kargs)
def get_option(self, section, option, default="__None__"):
""" Wrapper around ConfigParser.get.
"""Wrapper around ConfigParser.get.
Automatically adds any missing sections, adds the ability
to write a default value, and if one is provided prints if
the default or a previously saved value is returned.
"""
if not self.has_section(section):
if default != "__None__":
@@ -114,31 +116,29 @@ class ConfigManager(RawConfigParser):
return None
if self.has_option(section, option):
ret = RawConfigParser.get(self, section, option)
if (isinstance(ret, str) and ret.startswith(self.mrk_ws)
and ret.endswith(self.mrk_ws)):
ret = configparser.RawConfigParser.get(self, section, option)
if (isinstance(ret, str) and ret.startswith(self.mrk_ws)
and ret.endswith(self.mrk_ws)):
ret = ret[3:-3]
ret = to_unicode(ret)
if default:
if self.debug:
# mask out sensitive information
if option in ['apsk', 'password', 'identity', \
'private_key', 'private_key_passwd', \
if option in ['apsk', 'password', 'identity',
'private_key', 'private_key_passwd',
'key', 'passphrase']:
print((''.join(['found ', option, \
' in configuration *****'])))
print(f'found {option} in configuration *****')
else:
print((''.join(['found ', option, ' in configuration ',
str(ret)])))
else: # Use the default, unless no default was provided
print(f'found {option} in configuration {ret}')
else: # Use the default, unless no default was provided
if default != "__None__":
print(('did not find %s in configuration, setting default %s' \
% (option, str(default))))
print(f'did not find {option} in configuration, setting '
f'default {default}')
self.set(section, option, str(default), write=True)
ret = default
else:
ret = None
# Try to intelligently handle the type of the return value.
try:
if not ret.startswith('0') or len(ret) == 1:
@@ -152,52 +152,53 @@ class ConfigManager(RawConfigParser):
except OverflowError:
ret = str(ret)
return to_unicode(ret)
def get(self, *args, **kargs):
""" Calls the get_option method """
"""Calls the get_option method"""
return self.get_option(*args, **kargs)
def _write_one(self):
""" Writes the loaded config file to disk. """
"""Writes the loaded config file to disk."""
for section in self.sections():
if not section:
self.remove_section(section)
configfile = open(self.config_file, 'w')
RawConfigParser.write(self, configfile)
configparser.RawConfigParser.write(self, configfile)
configfile.close()
def remove_section(self, section):
""" Wrapper around the ConfigParser.remove_section() method.
"""Wrapper around the ConfigParser.remove_section() method.
This method only calls the ConfigParser.remove_section() method
if the section actually exists.
"""
if self.has_section(section):
RawConfigParser.remove_section(self, section)
configparser.RawConfigParser.remove_section(self, section)
def reload(self):
""" Re-reads the config file, in case it was edited out-of-band. """
"""Re-reads the config file, in case it was edited out-of-band."""
self.read(self.config_file)
def read(self, path):
""" Reads the config file specified by 'path' then reads all the
"""Reads the config file specified by 'path' then reads all the
files in the directory obtained by adding '.d' to 'path'. The files
in the '.d' directory are read in normal sorted order and section
entries in these files override entries in the main file.
"""
if os.path.exists(path):
RawConfigParser.readfp(self, codecs.open(path, 'r', 'utf-8'))
configparser.RawConfigParser.readfp(self, codecs.open(path, 'r',
'utf-8'))
path_d = path + ".d"
files = []
if os.path.exists(path_d):
files = [ os.path.join(path_d, f) for f in os.listdir(path_d) ]
files = [os.path.join(path_d, f) for f in os.listdir(path_d)]
files.sort()
for fname in files:
p = RawConfigParser()
p = configparser.RawConfigParser()
p.readfp(codecs.open(fname, 'r', 'utf-8'))
for section_name in p.sections():
# New files override old, so remove first to avoid
@@ -209,9 +210,8 @@ class ConfigManager(RawConfigParser):
# Store the filename this section was read from.
self.set(section_name, '_filename_', fname)
def _copy_section(self, name):
""" Copy whole section from config file. """
"""Copy whole section from config file."""
p = ConfigManager("", self.debug, self.mrk_ws)
p.add_section(name)
for (iname, value) in self.items(name):
@@ -222,7 +222,7 @@ class ConfigManager(RawConfigParser):
return p
def write(self, fp=None):
""" Writes the loaded config file to disk. """
"""Writes the loaded config file to disk."""
in_this_file = []
for sname in sorted(self.sections()):
fname = self.get_option(sname, '_filename_')
@@ -243,4 +243,3 @@ class ConfigManager(RawConfigParser):
p.set(sname, iname, value)
p.remove_option(sname, '_filename_')
p._write_one()

View File

@@ -1,6 +1,4 @@
#!/usr/bin/env python3
""" The wicd DBus Manager.
"""The wicd DBus Manager.
A module for managing wicd's dbus interfaces.
@@ -29,74 +27,81 @@ if getattr(dbus, "version", (0, 0, 0)) < (0, 80, 0):
else:
from dbus.mainloop.glib import DBusGMainLoop
DBusGMainLoop(set_as_default=True)
DBUS_MANAGER = None
def get_dbus_ifaces():
""" Return available DBus interfaces. """
"""Return available DBus interfaces."""
return DBUS_MANAGER.get_dbus_ifaces()
def get_interface(iface):
""" Return specified interface. """
"""Return specified interface."""
return DBUS_MANAGER.get_interface(iface)
def get_bus():
""" Return the loaded System Bus. """
"""Return the loaded System Bus."""
return DBUS_MANAGER.get_bus()
def set_mainloop(loop):
""" Set DBus main loop. """
"""Set DBus main loop."""
return DBUS_MANAGER.set_mainloop(loop)
def connect_to_dbus():
""" Connect to DBus. """
"""Connect to DBus."""
return DBUS_MANAGER.connect_to_dbus()
def threads_init():
""" Init GLib threads. """
"""Init GLib threads."""
dbus.mainloop.glib.threads_init()
class DBusManager(object):
""" Manages the DBus objects used by wicd. """
"""Manages the DBus objects used by wicd."""
def __init__(self):
self._bus = dbus.SystemBus()
self._dbus_ifaces = {}
self._dbus_ifaces = {}
def get_dbus_ifaces(self):
""" Returns a dict of dbus interfaces. """
"""Returns a dict of dbus interfaces."""
if not self._dbus_ifaces:
connect_to_dbus()
return self._dbus_ifaces
def get_interface(self, iface):
""" Returns a DBus Interface. """
"""Returns a DBus Interface."""
if not self._dbus_ifaces:
connect_to_dbus()
return self._dbus_ifaces[iface]
def get_bus(self):
""" Returns the loaded SystemBus. """
"""Returns the loaded SystemBus."""
return self._bus
def set_mainloop(self, loop):
""" Set DBus main loop. """
"""Set DBus main loop."""
dbus.set_default_main_loop(loop)
def connect_to_dbus(self):
""" Connects to wicd's dbus interfaces and loads them into a dict. """
"""Connects to wicd's dbus interfaces and loads them into a dict."""
proxy_obj = self._bus.get_object("org.wicd.daemon", '/org/wicd/daemon')
daemon = dbus.Interface(proxy_obj, 'org.wicd.daemon')
proxy_obj = self._bus.get_object("org.wicd.daemon",
'/org/wicd/daemon/wireless')
wireless = dbus.Interface(proxy_obj, 'org.wicd.daemon.wireless')
proxy_obj = self._bus.get_object("org.wicd.daemon",
'/org/wicd/daemon/wired')
wired = dbus.Interface(proxy_obj, 'org.wicd.daemon.wired')
self._dbus_ifaces = {"daemon" : daemon, "wireless" : wireless,
"wired" : wired}
self._dbus_ifaces = {"daemon": daemon, "wireless": wireless,
"wired": wired}
DBUS_MANAGER = DBusManager()

View File

@@ -1,5 +1,12 @@
#!/usr/bin/env python3
"""
Managing logfile rotation. A ManagedLog object is a file-like object that
rotates itself when a maximum size is reached.
TODO(gryf): how about using standard logging module and let the log rotating
to system tools like logrotate?
"""
#
# Copyright (C) 1999-2006 Keith Dart <keith@kdart.com>
# Copyright (C) 2008-2009 Dan O'Reilly <oreilldf@gmail.com>
@@ -14,28 +21,23 @@
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
"""
Managing logfile rotation. A ManagedLog object is a file-like object that
rotates itself when a maximum size is reached.
"""
import sys
import os
import time
import io
import os
import sys
import time
class SizeError(IOError):
""" Custom error class. """
"""Custom error class."""
pass
class LogFile(io.FileIO):
"""LogFile(name, [mode="w"], [maxsize=360000])
Opens a new file object. After writing <maxsize> bytes a SizeError
will be raised.
"""
def __init__(self, name, mode="a", maxsize=360000, *args, **kwargs):
super(LogFile, self).__init__(name, mode, maxsize, *args, **kwargs)
@@ -48,29 +50,32 @@ class LogFile(io.FileIO):
def write(self, data):
self.written += len(data)
# TODO(gryf): revisit need for encode/decode madness
data = data.encode('utf-8')
if len(data) <= 0:
return
if self.eol:
super(LogFile, self).write(self.get_time().encode("utf-8") + b' :: ')
super(LogFile, self).write(self.get_time().encode("utf-8") +
b' :: ')
self.eol = False
if data[-1] == '\n':
self.eol = True
data = data[:-1]
super(LogFile, self).write(data.replace(
b'\n', b'\n' + self.get_time().encode("utf-8") + b' :: '))
super(LogFile, self).write(data.replace(b'\n', b'\n' + self.get_time()
.encode("utf-8") + b' :: '))
if self.eol:
super(LogFile, self).write('\n')
self.flush()
if self.written > self.maxsize:
raise SizeError
def get_time(self):
""" Return a string with the current time nicely formatted.
"""Return a string with the current time nicely formatted.
The format of the returned string is yyyy/mm/dd HH:MM:SS
@@ -82,21 +87,21 @@ class LogFile(io.FileIO):
str(x[4]).rjust(2, '0'), ':', str(x[5]).rjust(2, '0')])
def rotate(self):
""" Rotate logfile. """
"""Rotate logfile."""
return rotate(self)
def note(self, text):
"""Writes a specially formated note text to the file.
The note starts with the string '\\n#*=' so you can easily filter them.
"""
self.write("\n#*===== %s =====\n" % (text,))
class ManagedLog(object):
"""ManagedLog(name, [maxsize=360000], [maxsave=9])
A ManagedLog instance is a persistent log object. Write data with the
write() method. The log size and rotation is handled automatically.
@@ -108,29 +113,29 @@ class ManagedLog(object):
self.maxsave = maxsave
def __repr__(self):
return "%s(%r, %r, %r)" % (self.__class__.__name__, self._lf.name,
return "%s(%r, %r, %r)" % (self.__class__.__name__, self._lf.name,
self._lf.maxsize, self.maxsave)
def write(self, data):
""" Write logfile. """
"""Write logfile."""
try:
self._lf.write(data)
except SizeError:
self._lf = rotate(self._lf, self.maxsave)
def note(self, data):
""" Write a note to the logfile. """
"""Write a note to the logfile."""
try:
self._lf.note(data)
except SizeError:
self._lf = rotate(self._lf, self.maxsave)
def written(self):
""" Return whether the logfile was written. """
"""Return whether the logfile was written."""
return self._lf.written
def rotate(self):
""" Rotate logfile. """
"""Rotate logfile."""
self._lf = rotate(self._lf, self.maxsave)
# auto-delegate remaining methods (but you should not read or seek an open
@@ -141,9 +146,9 @@ class ManagedLog(object):
# useful for logged stdout for daemon processes
class ManagedStdio(ManagedLog):
""" Manage stdout/stderr. """
"""Manage stdout/stderr."""
def write(self, data):
""" Write logfile to disk. """
"""Write logfile to disk."""
try:
self._lf.write(data)
except SizeError:
@@ -157,7 +162,7 @@ class ManagedStdio(ManagedLog):
def rotate(fileobj, maxsave=9):
""" Rotate fileobj. """
"""Rotate fileobj."""
name = fileobj.name
mode = fileobj.mode
maxsize = fileobj.maxsize
@@ -168,7 +173,7 @@ def rotate(fileobj, maxsave=9):
# assumes basename logfile is closed.
def shiftlogs(basename, maxsave):
""" Shift logfiles. """
"""Shift logfiles."""
topname = "%s.%d" % (basename, maxsave)
if os.path.isfile(topname):
os.unlink(topname)
@@ -187,11 +192,12 @@ def shiftlogs(basename, maxsave):
def open(name, maxsize=360000, maxsave=9):
""" Open logfile. """
"""Open logfile."""
return ManagedLog(name, maxsize, maxsave)
def writelog(logobj, data):
""" Write logfile. """
"""Write logfile."""
try:
logobj.write(data)
except SizeError:

View File

@@ -1,4 +1,4 @@
""" misc - miscellaneous functions for wicd
"""misc - miscellaneous functions for wicd
This module contains a large variety of utility functions used
throughout wicd.
@@ -22,23 +22,24 @@ throughout wicd.
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import locale
import sys
import re
import string
from gi.repository import GLib as gobject
from threading import Thread
from subprocess import Popen, STDOUT, PIPE, call
from subprocess import getoutput
from itertools import repeat, chain
import locale
import os
import re
from pipes import quote
import socket
import string
from subprocess import Popen, STDOUT, PIPE, call
from subprocess import getoutput
import sys
from threading import Thread
from gi.repository import GLib as gobject
from wicd.translations import _
# wicd imports
from . import wpath
from wicd import wpath
# Connection state constants
NOT_CONNECTED = 0
@@ -46,13 +47,11 @@ CONNECTING = 1
WIRELESS = 2
WIRED = 3
SUSPENDED = 4
_const_status_dict = {
NOT_CONNECTED: _('Not connected'),
CONNECTING: _('Connection in progress'),
WIRELESS: _('Connected to a wireless network'),
WIRED: _('Connected to a wired network'),
SUSPENDED: _('Connection suspended'),
}
_const_status_dict = {NOT_CONNECTED: _('Not connected'),
CONNECTING: _('Connection in progress'),
WIRELESS: _('Connected to a wireless network'),
WIRED: _('Connected to a wired network'),
SUSPENDED: _('Connection suspended')}
# Automatic app selection constant
AUTO = 0
@@ -75,17 +74,15 @@ ROUTE = 2
GKSUDO = 1
KDESU = 2
KTSUSS = 3
_sudo_dict = {
AUTO : "",
GKSUDO : "gksudo",
KDESU : "kdesu",
KTSUSS: "ktsuss",
}
_sudo_dict = {AUTO: "",
GKSUDO: "gksudo",
KDESU: "kdesu",
KTSUSS: "ktsuss"}
_status_dict = {
'aborted': _('Connection Cancelled'),
'association_failed': _('Connection failed: Could not contact the ' + \
'wireless access point.'),
'association_failed': _('Connection failed: Could not contact the '
'wireless access point.'),
'bad_pass': _('Connection Failed: Bad password'),
'configuring_interface': _('Configuring wireless interface...'),
'dhcp_failed': _('Connection Failed: Unable to Get IP Address'),
@@ -107,14 +104,15 @@ _status_dict = {
'verifying_association': _('Verifying access point association...'),
}
class WicdError(Exception):
""" Custom Exception type. """
"""Custom Exception type."""
pass
def Run(cmd, include_stderr=False, return_pipe=False,
return_obj=False, return_retcode=True):
""" Run a command.
"""Run a command.
Runs the given command, returning either the output
of the program, or a pipe to read output from.
@@ -144,34 +142,35 @@ def Run(cmd, include_stderr=False, return_pipe=False,
std_in = PIPE
else:
std_in = None
# We need to make sure that the results of the command we run
# are in English, so we set up a temporary environment.
tmpenv = os.environ.copy()
tmpenv["LC_ALL"] = "C"
tmpenv["LANG"] = "C"
try:
f = Popen(cmd, shell=False, stdout=PIPE, stdin=std_in, stderr=err,
close_fds=fds, cwd='/', env=tmpenv)
except OSError as e:
print(("Running command %s failed: %s" % (str(cmd), str(e))))
return ""
if return_obj:
return f
if return_pipe:
return f.stdout
else:
return f.communicate()[0].decode()
def LaunchAndWait(cmd):
""" Launches the given program with the given arguments, then blocks.
"""Launches the given program with the given arguments, then blocks.
cmd : A list contained the program name and its arguments.
returns: The exit code of the process.
"""
if not isinstance(cmd, list):
cmd = to_unicode(str(cmd))
@@ -179,8 +178,9 @@ def LaunchAndWait(cmd):
p = Popen(cmd, shell=False, stdout=PIPE, stderr=STDOUT, stdin=None)
return p.wait()
def IsValidIP(ip):
""" Make sure an entered IP is valid. """
"""Make sure an entered IP is valid."""
if not ip:
return False
@@ -189,6 +189,7 @@ def IsValidIP(ip):
return False
return True
def IsValidIPv4(ip):
''' Make sure an entered IP is a valid IPv4. '''
try:
@@ -197,6 +198,7 @@ def IsValidIPv4(ip):
return False
return True
def IsValidIPv6(ip):
''' Make sure an entered IP is a valid IPv6. '''
try:
@@ -205,8 +207,9 @@ def IsValidIPv6(ip):
return False
return True
def PromptToStartDaemon():
""" Prompt the user to start the daemon """
"""Prompt the user to start the daemon"""
daemonloc = wpath.sbin + 'wicd'
sudo_prog = choose_sudo_prog()
if not sudo_prog:
@@ -215,26 +218,29 @@ def PromptToStartDaemon():
msg = '--message'
else:
msg = '--caption'
sudo_args = [sudo_prog, msg,
sudo_args = [sudo_prog, msg,
_("Wicd needs to access your computer's network cards."),
daemonloc]
os.spawnvpe(os.P_WAIT, sudo_prog, sudo_args, os.environ)
return True
def RunRegex(regex, s):
""" runs a regex search on a string """
"""runs a regex search on a string"""
m = regex.search(s)
if m:
return m.groups()[0]
else:
return None
def WriteLine(my_file, text):
""" write a line to a file """
"""write a line to a file"""
my_file.write(text + "\n")
def ExecuteScripts(scripts_dir, verbose=False, extra_parameters=()):
""" Execute every executable file in a given directory. """
"""Execute every executable file in a given directory."""
if not os.path.exists(scripts_dir):
return
for obj in sorted(os.listdir(scripts_dir)):
@@ -245,9 +251,10 @@ def ExecuteScripts(scripts_dir, verbose=False, extra_parameters=()):
ExecuteScript(os.path.abspath(obj), verbose=verbose,
extra_parameters=extra_parameters)
def ExecuteScript(script, verbose=False, extra_parameters=()):
""" Execute a command and send its output to the bit bucket. """
extra_parameters = [ quote(s) for s in extra_parameters ]
"""Execute a command and send its output to the bit bucket."""
extra_parameters = [quote(s) for s in extra_parameters]
params = ' '.join(extra_parameters)
# escape script name
script = quote(script)
@@ -257,26 +264,29 @@ def ExecuteScript(script, verbose=False, extra_parameters=()):
if verbose:
print(("%s returned %s" % (script, ret)))
def ReadFile(filename):
""" read in a file and return it's contents as a string """
"""read in a file and return it's contents as a string"""
if not os.path.exists(filename):
return None
my_file = open(filename,'r')
my_file = open(filename, 'r')
data = my_file.read().strip()
my_file.close()
return str(data)
def to_bool(var):
""" Convert a string to type bool, but make "False"/"0" become False. """
"""Convert a string to type bool, but make "False"/"0" become False."""
if var in ("False", "0"):
var = False
else:
var = bool(var)
return var
def Noneify(variable, convert_to_bool=True):
""" Convert string types to either None or booleans"""
#set string Nones to real Nones
"""Convert string types to either None or booleans"""
# set string Nones to real Nones
if variable in ("None", "", None):
return None
if convert_to_bool:
@@ -287,8 +297,9 @@ def Noneify(variable, convert_to_bool=True):
return True
return variable
def ParseEncryption(network):
""" Parse through an encryption template file
"""Parse through an encryption template file
Parses an encryption template, reading in a network's info
and creating a config file for it
@@ -309,8 +320,8 @@ def ParseEncryption(network):
if line.strip().startswith("}"):
# This is the last line, so we just write it.
config_file = ''.join([config_file, line])
elif "$_" in line:
for cur_val in re.findall('\$_([A-Z0-9_]+)', line):
elif "$_" in line:
for cur_val in re.findall(r'\$_([A-Z0-9_]+)', line):
if cur_val:
rep_val = network.get(cur_val.lower())
if not rep_val:
@@ -329,7 +340,7 @@ def ParseEncryption(network):
else: # Just a regular entry.
config_file = ''.join([config_file, line])
# Write the data to the files then chmod them so they can't be read
# Write the data to the files then chmod them so they can't be read
# by normal users.
if network.get('bssid'):
file_name = network['bssid'].replace(":", "").lower()
@@ -344,8 +355,9 @@ def ParseEncryption(network):
f.write(config_file)
f.close()
def LoadEncryptionMethods(wired = False):
""" Load encryption methods from configuration files
def LoadEncryptionMethods(wired=False):
"""Load encryption methods from configuration files
Loads all the encryption methods from the template files
in /encryption/templates into a data structure. To be
@@ -357,11 +369,11 @@ def LoadEncryptionMethods(wired = False):
else:
active_fname = "active"
try:
enctypes = open(wpath.encryption + active_fname,"r").readlines()
enctypes = open(wpath.encryption + active_fname, "r").readlines()
except IOError as e:
print("Fatal Error: template index file is missing.")
raise IOError(e)
# Parse each encryption method
encryptionTypes = []
for enctype in enctypes:
@@ -370,6 +382,7 @@ def LoadEncryptionMethods(wired = False):
encryptionTypes.append(parsed_template)
return encryptionTypes
def __parse_field_ent(fields, field_type='require'):
fields = fields.split(" ")
ret = []
@@ -383,8 +396,9 @@ def __parse_field_ent(fields, field_type='require'):
ret.append([val, disp_val[1:]])
return ret
def _parse_enc_template(enctype):
""" Parse an encryption template. """
"""Parse an encryption template."""
def parse_ent(line, key):
return line.replace(key, "").replace("=", "").strip()
@@ -405,10 +419,12 @@ def _parse_enc_template(enctype):
if line.startswith("name") and not cur_type["name"]:
cur_type["name"] = parse_ent(line, "name")
elif line.startswith("require"):
cur_type["required"] = __parse_field_ent(parse_ent(line, "require"))
cur_type["required"] = __parse_field_ent(parse_ent(line,
"require"))
if not cur_type["required"]:
# An error occured parsing the require line.
print(("Invalid 'required' line found in template %s" % enctype))
print("Invalid 'required' line found in template %s" %
enctype)
continue
elif line.startswith("optional"):
cur_type["optional"] = __parse_field_ent(parse_ent(line,
@@ -416,32 +432,34 @@ def _parse_enc_template(enctype):
field_type="optional")
if not cur_type["optional"]:
# An error occured parsing the optional line.
print(("Invalid 'optional' line found in template %s" % enctype))
print("Invalid 'optional' line found in template %s" %
enctype)
continue
elif line.startswith("protected"):
cur_type["protected"] = __parse_field_ent(
parse_ent(line, "protected"),
field_type="protected"
)
cur_type["protected"] = __parse_field_ent(parse_ent(line,
"protected"),
field_type="protected")
if not cur_type["protected"]:
# An error occured parsing the protected line.
print(("Invalid 'protected' line found in template %s" % enctype))
print("Invalid 'protected' line found in template %s" %
enctype)
continue
elif line.startswith("----"):
# We're done.
break
f.close()
if not cur_type["required"]:
print(("Failed to find a 'require' line in template %s" % enctype))
print("Failed to find a 'require' line in template %s" % enctype)
return None
if not cur_type["name"]:
print(("Failed to find a 'name' line in template %s" % enctype))
print("Failed to find a 'name' line in template %s" % enctype)
return None
else:
return cur_type
def noneToString(text):
""" Convert None, "None", or "" to string type "None"
"""Convert None, "None", or "" to string type "None"
Used for putting text in a text box. If the value to put in is 'None',
the box will be blank.
@@ -452,8 +470,9 @@ def noneToString(text):
else:
return to_unicode(text)
def sanitize_config(s):
""" Sanitize property names to be used in config-files. """
"""Sanitize property names to be used in config-files."""
allowed = string.ascii_letters + '_' + string.digits
table = string.maketrans(allowed, ' ' * len(allowed))
@@ -461,20 +480,22 @@ def sanitize_config(s):
# make it simple.
return s.encode('ascii', 'replace').translate(None, table)
def sanitize_escaped(s):
""" Sanitize double-escaped unicode strings. """
"""Sanitize double-escaped unicode strings."""
lastpos = -1
while True:
lastpos = s.find('\\x', lastpos + 1)
#print lastpos
# print lastpos
if lastpos == -1:
break
c = s[lastpos+2:lastpos+4] # i.e. get the next two characters
s = s.replace('\\x'+c, chr(int(c, 16)))
return s
def to_unicode(x):
""" Attempts to convert a string to utf-8. """
"""Attempts to convert a string to utf-8."""
# If this is a unicode string, encode it and return
if not isinstance(x, bytes):
return x
@@ -495,11 +516,12 @@ def to_unicode(x):
ret = x.decode('latin-1').encode('utf-8')
except UnicodeError:
ret = x.decode('utf-8', 'replace').encode('utf-8')
return ret
def RenameProcess(new_name):
""" Renames the process calling the function to the given name. """
"""Renames the process calling the function to the given name."""
if 'linux' not in sys.platform:
print('Unsupported platform')
return False
@@ -509,16 +531,17 @@ def RenameProcess(new_name):
libc = ctypes.CDLL(find_library('c'))
libc.prctl(15, new_name, 0, 0, 0)
return True
except:
except Exception:
print("rename failed")
return False
def detect_desktop_environment():
""" Try to determine which desktop environment is in use.
"""Try to determine which desktop environment is in use.
Choose between kde, gnome, or xfce based on environment
variables and a call to xprop.
"""
desktop_environment = 'generic'
if os.environ.get('KDE_FULL_SESSION') == 'true':
@@ -534,8 +557,9 @@ def detect_desktop_environment():
pass
return desktop_environment
def get_sudo_cmd(msg, prog_num=0):
""" Returns a graphical sudo command for generic use. """
"""Returns a graphical sudo command for generic use."""
sudo_prog = choose_sudo_prog(prog_num)
if not sudo_prog:
return None
@@ -545,34 +569,36 @@ def get_sudo_cmd(msg, prog_num=0):
msg_flag = "--caption"
return [sudo_prog, msg_flag, msg]
def choose_sudo_prog(prog_num=0):
""" Try to intelligently decide which graphical sudo program to use. """
"""Try to intelligently decide which graphical sudo program to use."""
if prog_num:
return find_path(_sudo_dict[prog_num])
desktop_env = detect_desktop_environment()
env_path = os.environ['PATH'].split(":")
paths = []
if desktop_env == "kde":
progs = ["kdesu", "kdesudo", "ktsuss"]
else:
progs = ["gksudo", "gksu", "ktsuss"]
for prog in progs:
paths.extend([os.path.join(p, prog) for p in env_path])
for path in paths:
if os.path.exists(path):
return path
return ""
def find_path(cmd):
""" Try to find a full path for a given file name.
"""Try to find a full path for a given file name.
Search the all the paths in the environment variable PATH for
the given file name, or return None if a full path for
the file can not be found.
"""
paths = os.getenv("PATH").split(':')
if not paths:
@@ -583,28 +609,32 @@ def find_path(cmd):
return os.path.join(path, cmd)
return None
def noneToBlankString(text):
""" Converts NoneType or "None" to a blank string. """
"""Converts NoneType or "None" to a blank string."""
if text in (None, "None"):
return ""
else:
return str(text)
def stringToNone(text):
""" Performs opposite function of noneToString. """
"""Performs opposite function of noneToString."""
if text in ("", None, "None"):
return None
else:
return str(text)
def checkboxTextboxToggle(checkbox, textboxes):
""" Manage {de,}activation of textboxes depending on checkboxes. """
"""Manage {de,}activation of textboxes depending on checkboxes."""
# FIXME: should be moved to UI-specific files?
for textbox in textboxes:
textbox.set_sensitive(checkbox.get_active())
def threaded(f):
""" A decorator that will make any function run in a new thread. """
"""A decorator that will make any function run in a new thread."""
def wrapper(*args, **kwargs):
t = Thread(target=f, args=args, kwargs=kwargs)
@@ -618,8 +648,9 @@ def threaded(f):
return wrapper
def timeout_add(time, func, milli=False):
""" Convience function for running a function on a timer. """
"""Convience function for running a function on a timer."""
if hasattr(gobject, "timeout_add_seconds") and not milli:
return gobject.timeout_add_seconds(time, func)
else:
@@ -627,16 +658,19 @@ def timeout_add(time, func, milli=False):
time = time * 1000
return gobject.timeout_add(time, func)
def izip_longest(*args, **kwds):
""" Implement the itertools.izip_longest method.
"""Implement the itertools.izip_longest method.
We implement the method here because its new in Python 2.6.
"""
# izip_longest('ABCD', 'xy', fillvalue='-') --> Ax By C- D-
fillvalue = kwds.get('fillvalue')
def sentinel(counter = ([fillvalue]*(len(args)-1)).pop):
yield counter() # yields the fillvalue, or raises IndexError
def sentinel(counter=([fillvalue]*(len(args)-1)).pop):
yield counter() # yields the fillvalue, or raises IndexError
fillers = repeat(fillvalue)
iters = [chain(it, sentinel(), fillers) for it in args]
try:
@@ -645,8 +679,9 @@ def izip_longest(*args, **kwds):
except IndexError:
pass
def grouper(n, iterable, fillvalue=None):
""" Iterate over several elements at once
"""Iterate over several elements at once
"grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
""" monitor -- connection monitoring process
"""monitor -- connection monitoring process
This process is spawned as a child of the daemon, and is responsible
for monitoring connection status and initiating autoreconnection
@@ -24,9 +24,9 @@ when appropriate.
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from gi.repository import GLib as gobject
import time
from gi.repository import GLib as gobject
from dbus import DBusException
from wicd import wpath
@@ -46,6 +46,7 @@ wireless = dbus_dict["wireless"]
mainloop = None
def diewithdbus(func):
"""
Decorator catching DBus exceptions, making wicd quit.
@@ -67,12 +68,13 @@ def diewithdbus(func):
wrapper.__name__ = func.__name__
wrapper.__dict__ = func.__dict__
wrapper.__doc__ = func.__doc__
return wrapper
return wrapper
class ConnectionStatus(object):
""" Class for monitoring the computer's connection status. """
"""Class for monitoring the computer's connection status."""
def __init__(self):
""" Initialize variables needed for the connection status methods. """
"""Initialize variables needed for the connection status methods."""
self.last_strength = -2
self.last_state = misc.NOT_CONNECTED
self.last_reconnect_time = time.time()
@@ -90,22 +92,22 @@ class ConnectionStatus(object):
self.__lost_dbus_count = 0
self._to_time = daemon.GetBackendUpdateInterval()
self.update_callback = None
self.add_poll_callback()
bus = dbusmanager.get_bus()
bus.add_signal_receiver(self._force_update_connection_status,
bus.add_signal_receiver(self._force_update_connection_status,
"UpdateState", "org.wicd.daemon")
bus.add_signal_receiver(self._update_timeout_interval,
"SignalBackendChanged", "org.wicd.daemon")
def _update_timeout_interval(self, interval):
""" Update the callback interval when signaled by the daemon. """
"""Update the callback interval when signaled by the daemon."""
self._to_time = interval
gobject.source_remove(self.update_callback)
self.add_poll_callback()
def _force_update_connection_status(self):
""" Run a connection status update on demand.
"""Run a connection status update on demand.
Removes the scheduled update_connection_status()
call, explicitly calls the function, and reschedules
@@ -115,18 +117,18 @@ class ConnectionStatus(object):
gobject.source_remove(self.update_callback)
self.update_connection_status()
self.add_poll_callback()
def add_poll_callback(self):
""" Registers a polling call at a predetermined interval.
"""Registers a polling call at a predetermined interval.
The polling interval is determined by the backend in use.
"""
self.update_callback = misc.timeout_add(self._to_time,
self.update_connection_status)
def check_for_wired_connection(self, wired_ip):
""" Checks for a wired connection.
"""Checks for a wired connection.
Checks for two states:
1) A wired connection is not in use, but a cable is plugged
@@ -150,20 +152,20 @@ class ConnectionStatus(object):
return True
# Wired connection isn't active
elif wired_ip and self.still_wired:
# If we still have an IP, but no cable is plugged in
# If we still have an IP, but no cable is plugged in
# we should disconnect to clear it.
wired.DisconnectWired()
self.still_wired = False
return False
def check_for_wireless_connection(self, wireless_ip):
""" Checks for an active wireless connection.
"""Checks for an active wireless connection.
Checks for an active wireless connection. Also notes
if the signal strength is 0, and if it remains there
for too long, triggers a wireless disconnect.
Returns True if wireless connection is active, and
Returns True if wireless connection is active, and
False otherwise.
"""
@@ -197,17 +199,17 @@ class ConnectionStatus(object):
self.connection_lost_counter = 0
if (wifi_signal != self.last_strength or
self.network != self.last_network):
self.network != self.last_network):
self.last_strength = wifi_signal
self.last_network = self.network
self.signal_changed = True
daemon.SetCurrentInterface(daemon.GetWirelessInterface())
daemon.SetCurrentInterface(daemon.GetWirelessInterface())
return True
@diewithdbus
def update_connection_status(self):
""" Updates the tray icon and current connection status.
"""Updates the tray icon and current connection status.
Determines the current connection state and sends a dbus signal
announcing when the status changes. Also starts the automatic
@@ -253,8 +255,8 @@ class ConnectionStatus(object):
if not daemon.GetGUIOpen():
print('Killing wireless connection to switch to wired...')
wireless.DisconnectWireless()
daemon.AutoConnect(False, reply_handler=lambda *a:None,
error_handler=lambda *a:None)
daemon.AutoConnect(False, reply_handler=lambda *a: None,
error_handler=lambda *a: None)
return self.update_state(misc.NOT_CONNECTED)
return self.update_state(misc.WIRELESS, wifi_ip=wifi_ip)
@@ -267,7 +269,7 @@ class ConnectionStatus(object):
return self.update_state(state)
def update_state(self, state, wired_ip=None, wifi_ip=None):
""" Set the current connection state. """
"""Set the current connection state."""
# Set our connection state/info.
iwconfig = self.iwconfig
if state == misc.NOT_CONNECTED:
@@ -278,15 +280,18 @@ class ConnectionStatus(object):
if wired.CheckIfWiredConnecting():
info = ["wired"]
else:
info = ["wireless",
misc.noneToBlankString(wireless.GetCurrentNetwork(iwconfig))]
info = ["wireless", misc.
noneToBlankString(wireless.
GetCurrentNetwork(iwconfig))]
elif state == misc.WIRELESS:
self.reconnect_tries = 0
info = [str(wifi_ip),
misc.noneToBlankString(wireless.GetCurrentNetwork(iwconfig)),
str(self._get_printable_sig_strength()),
str(wireless.GetCurrentNetworkID(iwconfig)),
misc.noneToBlankString(wireless.GetCurrentBitrate(iwconfig))]
misc.noneToBlankString(wireless.
GetCurrentNetwork(iwconfig)),
str(self._get_printable_sig_strength()),
str(wireless.GetCurrentNetworkID(iwconfig)),
misc.noneToBlankString(wireless.
GetCurrentBitrate(iwconfig))]
elif state == misc.WIRED:
self.reconnect_tries = 0
info = [str(wired_ip)]
@@ -297,12 +302,12 @@ class ConnectionStatus(object):
daemon.SetConnectionStatus(state, info)
# Send a D-Bus signal announcing status has changed if necessary.
if (state != self.last_state or (state == misc.WIRELESS and
if (state != self.last_state or (state == misc.WIRELESS and
self.signal_changed)):
daemon.EmitStatusChanged(state, info)
if (state != self.last_state) and (state == misc.NOT_CONNECTED) and \
(not daemon.GetForcedDisconnect()):
if (state != self.last_state and state == misc.NOT_CONNECTED and
not daemon.GetForcedDisconnect()):
daemon.Disconnect()
# Disconnect() sets forced disconnect = True
# so we'll revert that
@@ -311,7 +316,7 @@ class ConnectionStatus(object):
return True
def _get_printable_sig_strength(self, always_positive=False):
""" Get the correct signal strength format. """
"""Get the correct signal strength format."""
try:
if daemon.GetSignalDisplayType() == 0:
signal = wireless.GetCurrentSignalStrength(self.iwconfig)
@@ -327,12 +332,12 @@ class ConnectionStatus(object):
else:
wifi_signal = int(signal)
except TypeError:
wifi_signal = 0
wifi_signal = 0
return wifi_signal
def auto_reconnect(self, from_wireless=None):
""" Automatically reconnects to a network if needed.
"""Automatically reconnects to a network if needed.
If automatic reconnection is turned on, this method will
attempt to first reconnect to the last used wireless network, and
@@ -344,7 +349,7 @@ class ConnectionStatus(object):
# Some checks to keep reconnect retries from going crazy.
if (self.reconnect_tries > 3 and
(time.time() - self.last_reconnect_time) < 200):
(time.time() - self.last_reconnect_time) < 200):
print("Throttling autoreconnect")
return
@@ -364,24 +369,26 @@ class ConnectionStatus(object):
# before we reconnect
print('Disconnecting from network')
wireless.DisconnectWireless()
print(('Trying to reconnect to last used wireless ' + \
'network'))
print('Trying to reconnect to last used wireless network')
wireless.ConnectWireless(cur_net_id)
else:
daemon.AutoConnect(True, reply_handler=reply_handle,
error_handler=err_handle)
self.reconnecting = False
def reply_handle():
""" Just a dummy function needed for asynchronous dbus calls. """
"""Just a dummy function needed for asynchronous dbus calls."""
pass
def err_handle(error):
""" Just a dummy function needed for asynchronous dbus calls. """
"""Just a dummy function needed for asynchronous dbus calls."""
pass
def main():
""" Starts the connection monitor.
"""Starts the connection monitor.
Starts a ConnectionStatus instance, sets the status to update
an amount of time determined by the active backend.

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
""" Suspends the wicd daemon.
"""Suspends the wicd daemon.
Sets a flag in the daemon that will stop it from monitoring network status.
Used for when a laptop enters hibernation/suspension.
@@ -23,10 +23,10 @@ Used for when a laptop enters hibernation/suspension.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import sys
import dbus
import dbus.service
import sys
try:
bus = dbus.SystemBus()

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -* coding: utf-8 -*-
""" translations -- module for handling the translation strings for wicd. """
"""translations -- module for handling the translation strings for wicd."""
#
# Copyright (C) 2007 - 2009 Adam Blackburn
# Copyright (C) 2007 - 2009 Dan O'Reilly
@@ -21,12 +21,12 @@
#
import locale
import os
from . import wpath
from wicd import wpath
import gettext
def get_gettext():
""" Set up gettext for translations. """
"""Set up gettext for translations."""
# Borrowed from an excellent post on how to do this at
# http://www.learningpython.com/2006/12/03/translating-your-pythonpygtk-application/
local_path = wpath.translations
@@ -41,8 +41,8 @@ def get_gettext():
try:
# This avoids a bug: locale.getdefaultlocale() prefers
# LC_CTYPE over LANG/LANGUAGE
lc, encoding = locale.getdefaultlocale(envvars=('LC_MESSAGES',
'LC_ALL', 'LANG',
lc, encoding = locale.getdefaultlocale(envvars=('LC_MESSAGES',
'LC_ALL', 'LANG',
'LANGUAGE'))
except ValueError as e:
print((str(e)))
@@ -50,11 +50,11 @@ def get_gettext():
if (lc):
langs += [lc]
langs += ["en_US"]
lang = gettext.translation('wicd', local_path, languages=langs,
lang = gettext.translation('wicd', local_path, languages=langs,
fallback=True)
return lang.gettext
_ = get_gettext()
_ = get_gettext() # noqa
# language[] should contain only strings in encryption templates, which
@@ -66,13 +66,15 @@ language = {}
# FIXME: these were present in wicd 1.7.0, can't find where they are.
# Leaving here for future reference, they should be removed whenever
# possible.
#language['cannot_start_daemon'] = _('Unable to connect to wicd daemon ' + \
# 'DBus interface. This typically means there was a problem starting ' + \
# 'the daemon. Check the wicd log for more information.')
#language['backend_alert'] = _('Changes to your backend won't occur until ' + \
# 'the daemon is restarted.')
#language['about_help'] = _('Stop a network connection in progress')
#language['connect'] = _('Connect')
# language['cannot_start_daemon'] = _('Unable to connect to wicd daemon '
# 'DBus interface. This typically means '
# 'there was a problem starting the '
# 'daemon. Check the wicd log for more '
# 'information.')
# language['backend_alert'] = _('Changes to your backend won't occur until '
# 'the daemon is restarted.')
# language['about_help'] = _('Stop a network connection in progress')
# language['connect'] = _('Connect')
# from templates, dict populated with:
# grep -R "*" encryption/templates/ | tr " " "\n" | grep "^*" | \

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff