mirror of
https://github.com/gryf/pygtktalog.git
synced 2025-12-17 03:20:17 +01:00
Added details/discs/files models, added scan module and its test
Changes im logging module, but also on others
This commit is contained in:
@@ -183,6 +183,7 @@ class DiscsController(Controller):
|
||||
Change of a current dir signalized by other controllers/models
|
||||
"""
|
||||
LOG.debug(self.property_currentdir_value_change.__doc__.strip())
|
||||
self._set_cursor_to_obj_position(new)
|
||||
|
||||
# private methods
|
||||
def _popup_menu(self, selection, event, button):
|
||||
@@ -203,3 +204,11 @@ class DiscsController(Controller):
|
||||
|
||||
self.view.menu['discs_popup'].popup(None, None, None,
|
||||
button, event.time)
|
||||
|
||||
def _set_cursor_to_obj_position(self, obj):
|
||||
"""
|
||||
Set cursor/focus to specified object postion in Discs treeview.
|
||||
"""
|
||||
path = self.model.discs.find_path(obj)
|
||||
self.view['discs'].expand_to_path(path)
|
||||
self.view['discs'].set_cursor(path)
|
||||
|
||||
@@ -176,7 +176,7 @@ class FilesController(Controller):
|
||||
if row and gtk_column:
|
||||
fileob = self.files_model.get_value(row)
|
||||
if fileob.parent.parent.id != 1:
|
||||
self.files_model.refresh(fileob.parent.parent)
|
||||
#self.files_model.refresh(fileob.parent.parent)
|
||||
# TODO: synchronize with disks
|
||||
self.model.discs.currentdir = fileob.parent.parent
|
||||
|
||||
|
||||
@@ -25,14 +25,21 @@ Session = sessionmaker()
|
||||
LOG = get_logger("dbcommon")
|
||||
|
||||
|
||||
def connect(filename):
|
||||
def connect(filename=None):
|
||||
"""
|
||||
create engine and bind to Meta object.
|
||||
Arguments:
|
||||
@filename - string with absolute or relative path to sqlite database
|
||||
file.
|
||||
file. If None, db in-memory will be created
|
||||
"""
|
||||
|
||||
if not filename:
|
||||
filename = ':memory:'
|
||||
|
||||
LOG.info("db filename: %s" % filename)
|
||||
engine = create_engine("sqlite:///%s" % filename)
|
||||
|
||||
connect_string = "sqlite:///%s" % filename
|
||||
engine = create_engine(connect_string)
|
||||
Meta.bind = engine
|
||||
Meta.create_all(engine)
|
||||
Meta.create_all(checkfirst=True)
|
||||
|
||||
|
||||
@@ -98,7 +98,7 @@ class Image(Base):
|
||||
|
||||
def __init__(self, filename=None):
|
||||
self.filename = filename
|
||||
self.file = file
|
||||
self.file = None
|
||||
|
||||
def __repr__(self):
|
||||
return "<Image('%s', %s)>" % (str(self.filename), str(self.id))
|
||||
|
||||
@@ -40,8 +40,10 @@ class Dialog(object):
|
||||
# Ofcourse, if something changes in the future, this could break
|
||||
# things.
|
||||
if self.ok_default:
|
||||
button = self.dialog.get_children()[0].get_children()[2]
|
||||
button.get_children()[self.ok_default].grab_default()
|
||||
# this is tricky: Ok/Yes buttons appears as first on the list, but
|
||||
# they are visibile in oposite order. This could be a bug.
|
||||
button = self.dialog.get_action_area().get_children()[0]
|
||||
button.grab_default()
|
||||
|
||||
retval = self.dialog.run()
|
||||
self.dialog.destroy()
|
||||
|
||||
@@ -9,6 +9,42 @@ import os
|
||||
import sys
|
||||
import logging
|
||||
|
||||
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
|
||||
|
||||
# The background is set with 40 plus the number of the color, and the
|
||||
# foreground with 30
|
||||
|
||||
#These are the sequences need to get colored ouput
|
||||
RESET_SEQ = "\033[0m"
|
||||
COLOR_SEQ = "\033[1;%dm"
|
||||
BOLD_SEQ = "\033[1m"
|
||||
|
||||
def formatter_message(message, use_color = True):
|
||||
if use_color:
|
||||
message = message.replace("$RESET", RESET_SEQ).replace("$BOLD",
|
||||
BOLD_SEQ)
|
||||
else:
|
||||
message = message.replace("$RESET", "").replace("$BOLD", "")
|
||||
return message
|
||||
|
||||
COLORS = {'WARNING': YELLOW,
|
||||
'INFO': WHITE,
|
||||
'DEBUG': BLUE,
|
||||
'CRITICAL': YELLOW,
|
||||
'ERROR': RED}
|
||||
|
||||
class ColoredFormatter(logging.Formatter):
|
||||
def __init__(self, msg, use_color = True):
|
||||
logging.Formatter.__init__(self, msg)
|
||||
self.use_color = use_color
|
||||
|
||||
def format(self, record):
|
||||
levelname = record.levelname
|
||||
if self.use_color and levelname in COLORS:
|
||||
levelname_color = COLOR_SEQ % (30 + COLORS[levelname]) \
|
||||
+ levelname + RESET_SEQ
|
||||
record.levelname = levelname_color
|
||||
return logging.Formatter.format(self, record)
|
||||
LEVEL = {'DEBUG': logging.DEBUG,
|
||||
'INFO': logging.INFO,
|
||||
'WARN': logging.WARN,
|
||||
@@ -44,9 +80,10 @@ def get_logger(module_name, level=None, to_file=False):
|
||||
"%(levelname)s - %(message)s")
|
||||
else:
|
||||
log_handler = logging.StreamHandler(sys.stderr)
|
||||
formatter = logging.Formatter("%(filename)s:%(lineno)s - "
|
||||
formatter = ColoredFormatter("%(filename)s:%(lineno)s - "
|
||||
"%(levelname)s - %(message)s")
|
||||
|
||||
log_handler.setFormatter(formatter)
|
||||
log.addHandler(log_handler)
|
||||
return log
|
||||
|
||||
|
||||
31
pygtktalog/models/details.py
Normal file
31
pygtktalog/models/details.py
Normal file
@@ -0,0 +1,31 @@
|
||||
"""
|
||||
Project: pyGTKtalog
|
||||
Description: Model(s) for details part of the application
|
||||
Type: core
|
||||
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
|
||||
Created: 2010-11-09
|
||||
"""
|
||||
import gtk
|
||||
import gobject
|
||||
|
||||
from gtkmvc import Model
|
||||
|
||||
|
||||
class DetailsModel(Model):
|
||||
"""
|
||||
Main model for application.
|
||||
It is responsible for communicate with database objects and I/O
|
||||
operations.
|
||||
"""
|
||||
|
||||
exif = gtk.ListStore(gobject.TYPE_PYOBJECT,
|
||||
gobject.TYPE_STRING,
|
||||
gobject.TYPE_STRING,
|
||||
gobject.TYPE_STRING,
|
||||
gobject.TYPE_UINT64,
|
||||
gobject.TYPE_STRING,
|
||||
gobject.TYPE_INT,
|
||||
str)
|
||||
|
||||
__observables__ = ['exif']
|
||||
|
||||
107
pygtktalog/models/discs.py
Normal file
107
pygtktalog/models/discs.py
Normal file
@@ -0,0 +1,107 @@
|
||||
"""
|
||||
Project: pyGTKtalog
|
||||
Description: Model for discs representation
|
||||
Type: core
|
||||
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
|
||||
Created: 2009-05-02
|
||||
"""
|
||||
import gtk
|
||||
import gobject
|
||||
|
||||
from gtkmvc import Model
|
||||
|
||||
from pygtktalog.dbobjects import File
|
||||
from pygtktalog.dbcommon import Session
|
||||
from pygtktalog.logger import get_logger
|
||||
|
||||
LOG = get_logger("discs model")
|
||||
|
||||
|
||||
class DiscsModel(Model):
|
||||
"""
|
||||
Model for discs representation.
|
||||
"""
|
||||
|
||||
currentdir = None
|
||||
|
||||
__observables__ = ("currentdir",)
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialization. Make some nice defaults.
|
||||
"""
|
||||
Model.__init__(self)
|
||||
self.discs = gtk.TreeStore(gobject.TYPE_PYOBJECT,
|
||||
gobject.TYPE_STRING,
|
||||
str)
|
||||
self.files_model = None
|
||||
|
||||
def clear(self):
|
||||
"""
|
||||
Make TreeStore empty
|
||||
"""
|
||||
self.discs.clear()
|
||||
|
||||
def refresh(self, session=Session()):
|
||||
"""
|
||||
Read objects from database, fill TreeStore model with discs
|
||||
information
|
||||
Arguments:
|
||||
@session current sqlalchemy.orm.session.Session object
|
||||
"""
|
||||
LOG.debug("session obj: %s" % str(session))
|
||||
dirs = session.query(File).filter(File.type == 1)
|
||||
dirs = dirs.order_by(File.filename).all()
|
||||
|
||||
def get_children(parent_id=1, iterator=None):
|
||||
"""
|
||||
Get all children of the selected parent.
|
||||
Arguments:
|
||||
@parent_id - integer with id of the parent (from db)
|
||||
@iterator - gtk.TreeIter, which points to a path inside model
|
||||
"""
|
||||
for fileob in dirs:
|
||||
if fileob.parent_id == parent_id:
|
||||
myiter = self.discs.insert_before(iterator, None)
|
||||
self.discs.set_value(myiter, 0, fileob)
|
||||
self.discs.set_value(myiter, 1, fileob.filename)
|
||||
if iterator is None:
|
||||
self.discs.set_value(myiter, 2, gtk.STOCK_CDROM)
|
||||
else:
|
||||
self.discs.set_value(myiter, 2, gtk.STOCK_DIRECTORY)
|
||||
get_children(fileob.id, myiter)
|
||||
return
|
||||
get_children()
|
||||
return True
|
||||
|
||||
def find_path(self, obj):
|
||||
"""
|
||||
Return path of specified File object (which should be the first one)
|
||||
"""
|
||||
path = None
|
||||
gtkiter = self.discs.get_iter_first()
|
||||
|
||||
def get_children(iterator):
|
||||
"""
|
||||
Iterate through entire TreeModel, and return path for specified in
|
||||
outter scope File object
|
||||
"""
|
||||
if self.discs.get_value(iterator, 0) == obj:
|
||||
return self.discs.get_path(iterator)
|
||||
|
||||
if self.discs.iter_has_child(iterator):
|
||||
path = get_children(self.discs.iter_children(iterator))
|
||||
if path:
|
||||
return path
|
||||
|
||||
iterator = self.discs.iter_next(iterator)
|
||||
if iterator is None:
|
||||
return None
|
||||
|
||||
return get_children(iterator)
|
||||
|
||||
path = get_children(gtkiter)
|
||||
LOG.debug("found path for object '%s': %s" % (str(obj), str(path)))
|
||||
return path
|
||||
|
||||
|
||||
79
pygtktalog/models/files.py
Normal file
79
pygtktalog/models/files.py
Normal file
@@ -0,0 +1,79 @@
|
||||
"""
|
||||
Project: pyGTKtalog
|
||||
Description: Model for files representation
|
||||
Type: core
|
||||
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
|
||||
Created: 2010-11-12
|
||||
"""
|
||||
import gtk
|
||||
import gobject
|
||||
|
||||
from gtkmvc import Model
|
||||
|
||||
from pygtktalog.dbcommon import Session
|
||||
from pygtktalog.logger import get_logger
|
||||
|
||||
LOG = get_logger("files model")
|
||||
|
||||
|
||||
class FilesModel(Model):
|
||||
"""
|
||||
Model for files representation
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialization. Make some nice defaults.
|
||||
"""
|
||||
Model.__init__(self)
|
||||
self.files = gtk.ListStore(gobject.TYPE_PYOBJECT,
|
||||
gobject.TYPE_STRING,
|
||||
gobject.TYPE_STRING,
|
||||
gobject.TYPE_STRING,
|
||||
gobject.TYPE_UINT64,
|
||||
gobject.TYPE_STRING,
|
||||
gobject.TYPE_INT,
|
||||
str)
|
||||
self.discs_model = None
|
||||
|
||||
def clear(self):
|
||||
"""
|
||||
Cleanup ListStore model
|
||||
"""
|
||||
self.files.clear()
|
||||
|
||||
def refresh(self, fileob):
|
||||
"""
|
||||
Update files ListStore
|
||||
Arguments:
|
||||
fileob - File object
|
||||
"""
|
||||
LOG.info("found %d files for File object: %s" % (len(fileob.children),
|
||||
str(fileob)))
|
||||
self.files.clear()
|
||||
|
||||
for child in fileob.children:
|
||||
myiter = self.files.insert_before(None, None)
|
||||
self.files.set_value(myiter, 0, child)
|
||||
self.files.set_value(myiter, 1, child.parent_id \
|
||||
if child.parent_id != 1 else None)
|
||||
self.files.set_value(myiter, 2, child.filename)
|
||||
self.files.set_value(myiter, 3, child.filepath)
|
||||
self.files.set_value(myiter, 4, child.size)
|
||||
self.files.set_value(myiter, 5, child.date)
|
||||
self.files.set_value(myiter, 6, 1)
|
||||
self.files.set_value(myiter, 7, gtk.STOCK_DIRECTORY \
|
||||
if child.type == 1 else gtk.STOCK_FILE)
|
||||
|
||||
def get_value(self, row=None, fiter=None, column=0):
|
||||
"""
|
||||
TODO:
|
||||
"""
|
||||
if row:
|
||||
fiter = self.files.get_iter(row)
|
||||
if not fiter:
|
||||
LOG.error("ERROR: there is no way to determine gtk_iter object!"
|
||||
" Please specify valid row or gtk_iter!")
|
||||
return None
|
||||
|
||||
return self.files.get_value(fiter, column)
|
||||
@@ -77,6 +77,10 @@ class MainModel(ModelMT):
|
||||
LOG.warn("db file '%s' doesn't exist.", filename)
|
||||
return False
|
||||
|
||||
if not os.path.isfile(filename):
|
||||
LOG.warn("db file '%s' is not a regular file.", filename)
|
||||
return False
|
||||
|
||||
self.cat_fname = filename
|
||||
|
||||
if self._open_or_decompress():
|
||||
|
||||
423
pygtktalog/scan.py
Normal file
423
pygtktalog/scan.py
Normal file
@@ -0,0 +1,423 @@
|
||||
"""
|
||||
Project: pyGTKtalog
|
||||
Description: Filesystem scan and file automation layer
|
||||
Type: core
|
||||
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
|
||||
Created: 2011-03-27
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime
|
||||
import magic
|
||||
|
||||
from pygtktalog.dbobjects import File
|
||||
from pygtktalog.logger import get_logger
|
||||
LOG = get_logger(__name__)
|
||||
|
||||
|
||||
class NoAccessError(Exception):
|
||||
pass
|
||||
|
||||
class Scan(object):
|
||||
"""
|
||||
Retrieve and identify all files recursively on given path
|
||||
"""
|
||||
def __init__(self, path):
|
||||
"""
|
||||
Initialize
|
||||
"""
|
||||
self.abort = False
|
||||
self.path = path.rstrip(os.path.sep)
|
||||
self._items = []
|
||||
self.magic = magic.open(magic.MIME)
|
||||
self.magic.load()
|
||||
|
||||
def add_files(self):
|
||||
"""
|
||||
Returns list, which contain object, modification date and file
|
||||
size.
|
||||
@Arguments:
|
||||
@path - string with initial root directory to scan
|
||||
"""
|
||||
self._items = []
|
||||
LOG.debug("given path: %s" % self.path)
|
||||
|
||||
# See, if file exists. If not it would raise OSError exception
|
||||
os.stat(self.path)
|
||||
|
||||
if not os.access(self.path, os.R_OK|os.X_OK) \
|
||||
or not os.path.isdir(self.path):
|
||||
raise NoAccessError("Access to %s is forbidden" % self.path)
|
||||
|
||||
directory = os.path.basename(self.path)
|
||||
path = os.path.dirname(self.path)
|
||||
if not self._recursive(None, directory, path, 0, 0, 1):
|
||||
return None
|
||||
|
||||
return self._items
|
||||
|
||||
def _get_dirsize(self, path):
|
||||
"""
|
||||
Returns sum of all files under specified path (also in subdirs)
|
||||
"""
|
||||
|
||||
size = 0
|
||||
|
||||
for root, dirs, files in os.walk(path):
|
||||
for fname in files:
|
||||
try:
|
||||
size += os.stat(os.path.join(root, fname)).st_size
|
||||
except OSError:
|
||||
LOG.info("Cannot access file %s" % \
|
||||
os.path.join(root, fname))
|
||||
|
||||
return size
|
||||
|
||||
def _gather_information(self, fobj):
|
||||
"""
|
||||
Try to guess type and gather information about File object if possible
|
||||
"""
|
||||
fp = os.path.join(fobj.filepath.encode(sys.getfilesystemencoding()),
|
||||
fobj.filename.encode(sys.getfilesystemencoding()))
|
||||
import mimetypes
|
||||
print mimetypes.guess_type(fp)
|
||||
|
||||
def _mk_file(self, fname, path, parent):
|
||||
"""
|
||||
Create and return File object
|
||||
"""
|
||||
fullpath = os.path.join(path, fname)
|
||||
|
||||
fname = fname.decode(sys.getfilesystemencoding())
|
||||
path = path.decode(sys.getfilesystemencoding())
|
||||
fob = File(filename=fname, path=path)
|
||||
fob.date = datetime.fromtimestamp(os.stat(fullpath).st_mtime)
|
||||
fob.size = os.stat(fullpath).st_size
|
||||
fob.parent = parent
|
||||
fob.type = 2
|
||||
|
||||
if not parent:
|
||||
fob.parent_id = 1
|
||||
|
||||
self._items.append(fob)
|
||||
return fob
|
||||
|
||||
def _recursive(self, parent, fname, path, date, size, ftype):
|
||||
"""
|
||||
Do the walk through the file system
|
||||
@Arguments:
|
||||
@parent - directory File object which is parent for the current
|
||||
scope
|
||||
@fname - string that hold filename
|
||||
@path - full path for further scanning
|
||||
@date -
|
||||
@size - size of the object
|
||||
@ftype -
|
||||
"""
|
||||
if self.abort:
|
||||
return False
|
||||
|
||||
LOG.debug("args: fname: %s, path: %s" % (fname, path))
|
||||
fullpath = os.path.join(path, fname)
|
||||
|
||||
parent = self._mk_file(fname, path, parent)
|
||||
parent.size = self._get_dirsize(fullpath)
|
||||
parent.type = 1
|
||||
|
||||
root, dirs, files = os.walk(fullpath).next()
|
||||
for fname in files:
|
||||
fpath = os.path.join(root, fname)
|
||||
fob = self._mk_file(fname, root, parent)
|
||||
if os.path.islink(fpath):
|
||||
fob.filename = fob.filename + " -> " + os.readlink(fpath)
|
||||
fob.type = 3
|
||||
size += fob.size
|
||||
else:
|
||||
self._gather_information(fob)
|
||||
|
||||
for dirname in dirs:
|
||||
dirpath = os.path.join(root, dirname)
|
||||
|
||||
if not os.access(dirpath, os.R_OK|os.X_OK):
|
||||
LOG.info("Cannot access directory %s" % dirpath)
|
||||
continue
|
||||
|
||||
if os.path.islink(dirpath):
|
||||
fob = self._mk_file(dirname, root, parent)
|
||||
fob.filename = fob.filename + " -> " + os.readlink(dirpath)
|
||||
fob.type = 3
|
||||
else:
|
||||
LOG.debug("going into %s" % dirname)
|
||||
self._recursive(parent, dirname, fullpath, date, size, ftype)
|
||||
|
||||
LOG.debug("size of items: %s" % parent.size)
|
||||
return True
|
||||
|
||||
class asdScan(object):
|
||||
"""
|
||||
Retrieve and identify all files recursively on given path
|
||||
"""
|
||||
def __init__(self, path, tree_model):
|
||||
LOG.debug("initialization")
|
||||
self.path = path
|
||||
self.abort = False
|
||||
self.label = None
|
||||
self.DIR = None
|
||||
self.source = None
|
||||
|
||||
def scan(self):
|
||||
"""
|
||||
scan content of the given path
|
||||
"""
|
||||
self.busy = True
|
||||
|
||||
# count files in directory tree
|
||||
LOG.info("Calculating number of files in directory tree...")
|
||||
|
||||
step = 0
|
||||
try:
|
||||
for root, dirs, files in os.walk(self.path):
|
||||
step += len(files)
|
||||
except Exception, ex:
|
||||
LOG.warning("exception on file %s: %s: %s" \
|
||||
% (self.path, ex.__class__.__name__, str(ex)))
|
||||
pass
|
||||
|
||||
step = 1 / float(step or 1)
|
||||
|
||||
self.count = 0
|
||||
|
||||
def _recurse(parent_id, name, path, date, size, filetype,
|
||||
discs_tree_iter=None):
|
||||
"""recursive scans given path"""
|
||||
if self.abort:
|
||||
return -1
|
||||
|
||||
_size = size
|
||||
|
||||
if parent_id == 1:
|
||||
sql = """INSERT INTO
|
||||
files(parent_id, filename, filepath, date,
|
||||
size, type, source)
|
||||
VALUES(?,?,?,?,?,?,?)"""
|
||||
print(sql, (parent_id, name, path, date, size,
|
||||
filetype, self.source))
|
||||
else:
|
||||
sql = """INSERT INTO
|
||||
files(parent_id, filename, filepath, date, size, type)
|
||||
VALUES(?,?,?,?,?,?)"""
|
||||
print(sql, (parent_id, name, path,
|
||||
date, size, filetype))
|
||||
|
||||
sql = """SELECT seq FROM sqlite_sequence WHERE name='files'"""
|
||||
print(sql)
|
||||
currentid = None #db_cursor.fetchone()[0]
|
||||
|
||||
try:
|
||||
root, dirs, files = os.walk(path).next()
|
||||
except:
|
||||
LOG.debug("cannot access ", path)
|
||||
return 0
|
||||
|
||||
#############
|
||||
# directories
|
||||
for i in dirs:
|
||||
j = i #j = self.__decode_filename(i)
|
||||
current_dir = os.path.join(root, i)
|
||||
|
||||
try:
|
||||
st = os.stat(current_dir)
|
||||
st_mtime = st.st_mtime
|
||||
except OSError:
|
||||
st_mtime = 0
|
||||
|
||||
# do NOT follow symbolic links
|
||||
if os.path.islink(current_dir):
|
||||
l = self.__decode_filename(os.readlink(current_dir))
|
||||
|
||||
sql = """INSERT INTO
|
||||
files(parent_id, filename, filepath, date, size, type)
|
||||
VALUES(?,?,?,?,?,?)"""
|
||||
print(sql, (currentid, j + " -> " + l,
|
||||
current_dir, st_mtime, 0,
|
||||
self.LIN))
|
||||
dirsize = 0
|
||||
else:
|
||||
myit = None
|
||||
dirsize = _recurse(currentid, j, current_dir,
|
||||
st_mtime, 0, self.DIR, myit)
|
||||
|
||||
if dirsize == -1:
|
||||
break
|
||||
else:
|
||||
_size = _size + dirsize
|
||||
|
||||
########
|
||||
# files:
|
||||
for i in files:
|
||||
if self.abort:
|
||||
break
|
||||
|
||||
self.count = self.count + 1
|
||||
current_file = os.path.join(root, i)
|
||||
|
||||
try:
|
||||
st = os.stat(current_file)
|
||||
st_mtime = st.st_mtime
|
||||
st_size = st.st_size
|
||||
except OSError:
|
||||
st_mtime = 0
|
||||
st_size = 0
|
||||
|
||||
_size = _size + st_size
|
||||
j = i #self.__decode_filename(i)
|
||||
|
||||
# do NOT follow symbolic links
|
||||
if os.path.islink(current_file):
|
||||
l = self.__decode_filename(os.readlink(current_file))
|
||||
sql = """INSERT INTO
|
||||
files(parent_id, filename, filepath, date, size, type)
|
||||
VALUES(?,?,?,?,?,?)"""
|
||||
print(sql, (currentid, j + " -> " + l,
|
||||
current_file, st_mtime, 0,
|
||||
self.LIN))
|
||||
else:
|
||||
sql = """INSERT INTO
|
||||
files(parent_id, filename, filepath, date, size, type)
|
||||
VALUES(?,?,?,?,?,?)"""
|
||||
print(sql, (currentid, j, current_file,
|
||||
st_mtime, st_size, self.FIL))
|
||||
|
||||
if self.count % 32 == 0:
|
||||
update = True
|
||||
else:
|
||||
update = False
|
||||
|
||||
###########################
|
||||
# fetch details about files
|
||||
if self.config.confd['retrive']:
|
||||
update = True
|
||||
exif = None
|
||||
|
||||
sql = """SELECT seq FROM sqlite_sequence
|
||||
WHERE name='files'"""
|
||||
print(sql)
|
||||
fileid = 1 # dummy!
|
||||
|
||||
ext = i.split('.')[-1].lower()
|
||||
|
||||
# Video
|
||||
if ext in self.MOV:
|
||||
v = Video(current_file)
|
||||
cfn = v.capture()
|
||||
img = Img(cfn, self.image_path)
|
||||
th = img.save()
|
||||
if th:
|
||||
sql = """INSERT INTO
|
||||
thumbnails(file_id, filename)
|
||||
VALUES(?, ?)"""
|
||||
print(sql, (fileid, th + "_t"))
|
||||
sql = """INSERT INTO images(file_id, filename)
|
||||
VALUES(?, ?)"""
|
||||
print(sql, (fileid, th))
|
||||
os.unlink(cfn)
|
||||
|
||||
# Images - thumbnails and exif data
|
||||
if self.config.confd['thumbs'] and ext in self.IMG:
|
||||
thumb = Thumbnail(current_file, self.image_path)
|
||||
th, exif = thumb.save()
|
||||
if th:
|
||||
sql = """INSERT INTO
|
||||
thumbnails(file_id, filename)
|
||||
VALUES(?, ?)"""
|
||||
print(sql, (fileid, th))
|
||||
|
||||
# exif - store data in exif table
|
||||
jpg = ['jpg', 'jpeg']
|
||||
if self.config.confd['exif'] and ext in jpg:
|
||||
p = None
|
||||
if self.config.confd['thumbs'] and exif:
|
||||
p = ParseExif(exif_dict=exif)
|
||||
else:
|
||||
p = ParseExif(exif_file=current_file)
|
||||
if not p.exif_dict:
|
||||
p = None
|
||||
if p:
|
||||
p = p.parse()
|
||||
p = list(p)
|
||||
p.insert(0, fileid)
|
||||
sql = """INSERT INTO exif (file_id,
|
||||
camera,
|
||||
date,
|
||||
aperture,
|
||||
exposure_program,
|
||||
exposure_bias,
|
||||
iso,
|
||||
focal_length,
|
||||
subject_distance,
|
||||
metering_mode,
|
||||
flash,
|
||||
light_source,
|
||||
resolution,
|
||||
orientation)
|
||||
values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"""
|
||||
print(sql, (tuple(p)))
|
||||
|
||||
# gthumb - save comments from gThumb program
|
||||
if self.config.confd['gthumb']:
|
||||
gt = GthumbCommentParser(root, i)
|
||||
cmnts = gt.parse()
|
||||
if cmnts:
|
||||
sql = """insert into gthumb(file_id,
|
||||
note,
|
||||
place,
|
||||
date)
|
||||
values(?,?,?,?)"""
|
||||
print(sql, (fileid,
|
||||
cmnts['note'],
|
||||
cmnts['place'],
|
||||
cmnts['date']))
|
||||
if 'keywords' in cmnts:
|
||||
# TODO: add gthumb keywords to tags
|
||||
pass
|
||||
|
||||
# Extensions - user defined actions
|
||||
if ext in self.config.confd['extensions'].keys():
|
||||
cmd = self.config.confd['extensions'][ext]
|
||||
arg = current_file.replace('"', '\\"')
|
||||
output = os.popen(cmd % arg).readlines()
|
||||
desc = ''
|
||||
for line in output:
|
||||
desc += line
|
||||
|
||||
sql = """UPDATE files SET description=?
|
||||
WHERE id=?"""
|
||||
print(sql, (desc, fileid))
|
||||
|
||||
### end of scan
|
||||
if update:
|
||||
self.statusmsg = "Scannig: %s" % current_file
|
||||
self.progress = step * self.count
|
||||
|
||||
sql = """UPDATE files SET size=? WHERE id=?"""
|
||||
print(sql, (_size, currentid))
|
||||
if self.abort:
|
||||
return -1
|
||||
else:
|
||||
return _size
|
||||
|
||||
if _recurse(1, self.label, self.path, 0, 0, self.DIR) == -1:
|
||||
LOG.debug("interrupted self.abort = True")
|
||||
else:
|
||||
LOG.debug("recursive goes without interrupt")
|
||||
if self.currentid:
|
||||
LOG.debug("removing old branch")
|
||||
self.statusmsg = "Removing old branch..."
|
||||
self.currentid = None
|
||||
|
||||
self.busy = False
|
||||
|
||||
# refresh discs tree
|
||||
self.statusmsg = "Idle"
|
||||
self.progress = 0
|
||||
self.abort = False
|
||||
8
test/unit/main_view_test.py
Normal file
8
test/unit/main_view_test.py
Normal file
@@ -0,0 +1,8 @@
|
||||
"""
|
||||
Project: pyGTKtalog
|
||||
Description: Tests for main view class.
|
||||
Type: test
|
||||
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
|
||||
Created: 2010-03-14
|
||||
"""
|
||||
|
||||
79
test/unit/scan_test.py
Normal file
79
test/unit/scan_test.py
Normal file
@@ -0,0 +1,79 @@
|
||||
"""
|
||||
Project: pyGTKtalog
|
||||
Description: Tests for scan files.
|
||||
Type: test
|
||||
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
|
||||
Created: 2011-03-26
|
||||
"""
|
||||
import os
|
||||
import unittest
|
||||
import logging
|
||||
|
||||
from pygtktalog import scan
|
||||
|
||||
|
||||
class TestScan(unittest.TestCase):
|
||||
"""
|
||||
Testcases for scan functionality
|
||||
|
||||
1. execution scan function:
|
||||
1.1 simple case - should pass
|
||||
1.2 non-existent directory passed
|
||||
1.3 file passed
|
||||
1.4 directory has permission that forbids file listing
|
||||
|
||||
2. rescan directory; looking for changes
|
||||
2.0 don't touch records for changed files (same directories, same
|
||||
filename, same type and size)
|
||||
2.1 search for files of the same type, same size.
|
||||
2.2 change parent node for moved files (don't insert new)
|
||||
|
||||
3. adding new directory tree which contains same files like already stored
|
||||
in the database
|
||||
"""
|
||||
|
||||
def test_happy_scenario(self):
|
||||
"""
|
||||
make scan, count items
|
||||
"""
|
||||
scanob = scan.Scan(os.path.abspath(os.path.join(__file__,
|
||||
"../../../mocks")))
|
||||
scanob = scan.Scan("/mnt/data/_test_/test_dir")
|
||||
result_list = scanob.add_files()
|
||||
self.assertEqual(len(result_list), 143)
|
||||
self.assertEqual(len(result_list[0].children), 8)
|
||||
# check soft links
|
||||
self.assertEqual(len([x for x in result_list if x.type == 3]), 2)
|
||||
|
||||
def test_wrong_and_nonexistent(self):
|
||||
"""
|
||||
Check for accessing non existent directory, regular file instead of
|
||||
the directory, or file.directory with no access to it.
|
||||
"""
|
||||
scanobj = scan.Scan('/nonexistent_directory_')
|
||||
self.assertRaises(OSError, scanobj.add_files)
|
||||
|
||||
scanobj.path = '/root'
|
||||
self.assertRaises(scan.NoAccessError, scanobj.add_files)
|
||||
|
||||
scanobj.path = '/bin/sh'
|
||||
self.assertRaises(scan.NoAccessError, scanobj.add_files)
|
||||
|
||||
|
||||
# dir contains some non accessable items. Should just pass, and on
|
||||
# logs should be messages about it
|
||||
logging.basicConfig(level=logging.CRITICAL)
|
||||
scanobj.path = "/mnt/data/_test_/test_dir_permissions/"
|
||||
scanobj.add_files()
|
||||
|
||||
def test_abort_functionality(self):
|
||||
scanobj = scan.Scan("/mnt/data/_test_/test_dir")
|
||||
scanobj.abort = True
|
||||
self.assertEqual(None, scanobj.add_files())
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
os.chdir(os.path.join(os.path.abspath(os.path.dirname(__file__)), "../"))
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user