1
0
mirror of https://github.com/gryf/pygtktalog.git synced 2025-12-17 11:30:19 +01:00
Files
pygtktalog/pycatalog/__init__.py

396 lines
13 KiB
Python

"""
Fast and ugly CLI interface
"""
import argparse
import os
import re
from sqlalchemy import or_
from pycatalog import scan
from pycatalog import misc
from pycatalog import dbobjects as dbo
from pycatalog import dbcommon
from pycatalog import logger
# Module-wide logger obtained from the project's logger helper.
LOG = logger.get_logger()
# Maps the numeric type stored on a DB entry to the one-letter code used in
# listings: 'd' directory, 'f' file, 'l' link.  Type 0 is the root entry
# (see Iface.create) and is listed as a directory as well.
TYPE_MAP = {0: 'd', 1: 'd', 2: 'f', 3: 'l'}
class Iface(object):
    """Main class which interacts with the pyGTKtalog modules.

    It wraps a database session and provides the operations used by the
    command line interface: listing, updating, creating, adding and
    searching.
    """

    # Listing type code -> permission string printed in "mc" listing mode.
    _MC_PERMS = {'d': 'drwxrwxrwx', 'l': 'lrw-rw-rw-', 'f': '-rw-rw-rw-'}

    def __init__(self, dbname, pretend=False, debug=False, use_color=True):
        """Connect to the database and open a session.

        :param dbname: database name handed over to ``dbcommon.connect``
        :param pretend: when true, update/create/add only print what they
                        would do and skip the scanning step
        :param debug: when true, switch this module's and the scan module's
                      loggers to DEBUG level
        :param use_color: when true, top level entries in listings get a
                          colorized suffix with their source path
        """
        self.engine = dbcommon.connect(dbname)
        self.sess = dbcommon.Session()
        self.dry_run = pretend
        self.root = None
        self._dbname = dbname
        self.use_color = use_color

        if debug:
            scan.LOG.setLevel('DEBUG')
            LOG.setLevel('DEBUG')

    def _load_root(self):
        """Fetch the root entry from the DB, store it in self.root and
        return it"""
        query = self.sess.query(dbo.File)
        self.root = query.filter(dbo.File.type == dbo.TYPE['root']).first()
        return self.root

    def _resolve_path(self, path):
        """Identify path in the DB.

        Walk from the root entry down through the slash separated parts of
        the path and return the entry found. Empty parts (doubled or
        trailing slashes) are ignored, so "/" resolves to the root itself.

        :raises AttributeError: if path is not absolute or doesn't exist
        """
        if not path.startswith("/"):
            raise AttributeError('Path have to start with slash (/)')

        last_node = self.root
        for part in path.split('/'):
            if not part.strip():
                continue

            for node in last_node.children:
                if node.filename == part:
                    last_node = node
                    break
            else:
                # no child with such name on this level
                raise AttributeError('No such path: %s' % path)

        return last_node

    def _get_full_path(self, file_object):
        """Given the file object, return string with full path to it"""
        parts = [file_object.filename]
        parent = file_object.parent
        # the root entry has type 0, which ends the climb
        while parent.type:
            parts.insert(0, parent.filename)
            parent = parent.parent

        return u'/' + u'/'.join(parts)

    def _make_path(self, node):
        """Make the path to the item in the DB.

        Return a single item dictionary, which maps the path of the node to
        the (type code, size, date) tuple.
        """
        # only the root entry is its own parent
        if node.parent == node:
            return {'/': (u' ', 0, u' ')}

        ext = ''
        if node.parent.type == dbo.TYPE['root'] and self.use_color:
            # top level entries are followed by the path they came from
            ext = misc.colorize(' (%s)' % node.filepath, 'white')

        parts = [node.filename]
        current = node
        while current.parent != self.root:
            parts.append(current.parent.filename)
            current = current.parent

        # parts were gathered bottom-up, the leading '' gives the first slash
        path = '/'.join([''] + parts[::-1]) + ext

        return {path: (TYPE_MAP[node.type], node.size, node.date)}

    def _walk(self, dirnode):
        """Recursively go through the leaves of the node"""
        items = {}
        for node in dirnode.children:
            if node.type == dbo.TYPE['dir']:
                items.update(self._walk(node))
            items.update(self._make_path(node))

        return items

    def _list(self, node):
        """List only current node content"""
        items = {}
        for child in node.children:
            # the root entry is its own parent, don't list it within itself
            if child != self.root:
                items.update(self._make_path(child))

        return items

    def close(self):
        """Commit pending changes and close the session"""
        self.sess.commit()
        self.sess.close()

    @staticmethod
    def _size_column_width(items):
        """Return the number of characters of the biggest size in items,
        or the width of "0" for an empty listing"""
        sizes = [entry[1] for entry in items.values()]
        return len(str(max(sizes) if sizes else 0))

    def _format_mc(self, items):
        """Return the listing lines in format used by "mc" mode"""
        template = '{} 1 {} {} {:>%d} {} {}' % self._size_column_width(items)
        uid, gid = os.getuid(), os.getgid()

        lines = []
        for fname in sorted(items):
            type_, size, date = items[fname]
            perms = self._MC_PERMS.get(type_)
            if perms is None:
                # entries with unknown type code are left out
                continue
            lines.append(template.format(perms, uid, gid, size,
                                         date.strftime('%d/%m/%Y %H:%M:%S'),
                                         fname))
        return lines

    def _format_long(self, items):
        """Return the listing lines with type, size and date included"""
        if not items:
            # nothing to format for an empty listing
            return []

        template = '{} {:>%d,} {} {}' % _get_highest_size_length(items)
        lines = []
        for fname in sorted(items):
            type_, size, date = items[fname]
            lines.append(template.format(type_, size, date, fname))
        return lines

    def list(self, path=None, recursive=False, long_=False, mode='plain'):
        """Simulate ls command for the provided item path.

        :param path: absolute path within the catalog; root if not provided
        :param recursive: descend into the subdirectories
        :param long_: print type, size and date next to the names
        :param mode: 'mc' prints the "mc" style lines without the header and
                     without colors, any other value gives plain output
        """
        if mode == 'mc':
            self.use_color = False

        self._load_root()
        if path:
            node = self._resolve_path(path)
            msg = "Content of path `%s':" % path
        else:
            node = self.root
            msg = "Content of path `/':"

        if mode != 'mc':
            print(misc.colorize(msg, 'white'))

        items = self._walk(node) if recursive else self._list(node)

        if mode == 'mc':
            filenames = self._format_mc(items)
        elif long_:
            filenames = self._format_long(items)
        else:
            filenames = sorted(items)

        print('\n'.join(filenames))

    def update(self, path, dir_to_update=None):
        """
        Update the DB against provided path and optionally directory on the
        real filesystem.

        :param path: absolute path of the entry within the catalog
        :param dir_to_update: directory to scan; if not provided, it is
                              taken from the entry's filepath and filename
        :raises OSError: if the directory to scan doesn't exist
        """
        self._load_root()
        node = self._resolve_path(path)
        if node == self.root:
            print(misc.colorize('Cannot update entire db, since root was '
                                'provided as path.', 'red'))
            return

        if not dir_to_update:
            dir_to_update = os.path.join(node.filepath, node.filename)

        if not os.path.exists(dir_to_update):
            raise OSError("Path to update doesn't exist: %s" % dir_to_update)

        print(misc.colorize("Updating node `%s' against directory "
                            "`%s'" % (path, dir_to_update), 'white'))
        if not self.dry_run:
            scanob = scan.Scan(dir_to_update)
            scanob.update_files(node.id, self.engine)

    def create(self, dir_to_add, label=None):
        """Create new database: store the root entry, then scan the
        dir_to_add directory. Nothing is stored in pretend mode."""
        self.root = dbo.File()
        self.root.id = 1
        self.root.filename = 'root'
        self.root.size = 0
        self.root.source = 0
        self.root.type = 0
        # the root entry is its own parent
        self.root.parent_id = 1

        if not self.dry_run:
            self.sess.add(self.root)
            self.sess.commit()

        print(misc.colorize("Creating new db against directory `%s'" %
                            dir_to_add, 'white'))
        if not self.dry_run:
            scanob = scan.Scan(dir_to_add)
            scanob.add_files(label=label)

    def add(self, dir_to_add, label=None):
        """Add new directory to the db.

        :raises OSError: if the directory to add doesn't exist
        """
        self._load_root()

        if not os.path.exists(dir_to_add):
            raise OSError("Path to add doesn't exist: %s" % dir_to_add)

        print(misc.colorize("Adding directory `%s'" % dir_to_add, 'white'))
        if not self.dry_run:
            scanob = scan.Scan(dir_to_add)
            scanob.add_files(label=label)

    @staticmethod
    def _highlight_indexes(item, search_words):
        """Return set of the character positions of item, which are covered
        by a case insensitive occurrence of any of the search words"""
        marked = set()
        for word in search_words:
            # match against the item itself, not its lowercased copy, since
            # lowercasing may change the string length and shift positions
            for match in re.finditer(re.escape(word), item, re.IGNORECASE):
                marked.update(range(match.start(), match.end()))
        return marked

    def _annotate(self, item, search_words):
        """
        Find ranges to be highlighted in item, provide them and return result
        string
        """
        marked = self._highlight_indexes(item, search_words)

        result = []
        highlight = False
        for idx, char in enumerate(item):
            if (idx in marked) != highlight:
                # crossing the border of the highlighted range
                highlight = not highlight
                if highlight:
                    result.append(misc.COLOR_SEQ % misc.WHITE)
                else:
                    result.append(misc.RESET_SEQ)
            result.append(char)

        if highlight:
            # match reached the end of the item, don't leak the color
            result.append(misc.RESET_SEQ)

        return "".join(result)

    def find(self, search_words):
        """Print full paths of the files and links (types 2 and 3), whose
        names contain every word of search_words, with matches highlighted"""
        query = self.sess.query(dbo.File).filter(or_(dbo.File.type == 2,
                                                     dbo.File.type == 3))
        # consecutive filters narrow the result, so all words have to match
        for word in search_words:
            phrase = u'%%%s%%' % word
            query = query.filter(dbo.File.filename.like(phrase))

        result = [self._get_full_path(item) for item in query.all()]

        if not result:
            print("No results for `%s'" % ' '.join(search_words))
            return

        result.sort()
        for item in result:
            print(self._annotate(item, search_words))
def _get_highest_size_length(item_dict):
highest = len(str(sorted([i[1] for i in item_dict.values()])[-1]))
return highest + highest / 3
@misc.asserdb
def list_db(args):
    """Handle the "list" subcommand: print content of the catalog"""
    if args.mode == 'mc':
        # suppress logging, so only the listing gets printed
        LOG.setLevel(100)

    iface = Iface(args.db, pretend=False, debug=args.debug)
    iface.list(path=args.path,
               recursive=args.recursive,
               long_=args.long,
               mode=args.mode)
    iface.close()
@misc.asserdb
def update_db(args):
    """Handle the "update" subcommand: refresh an entry of the catalog"""
    iface = Iface(args.db, pretend=args.pretend, debug=args.debug)
    iface.update(path=args.path, dir_to_update=args.dir_to_update)
    iface.close()
@misc.asserdb
def add_dir(args):
    """Handle the "add" subcommand: add a directory to the catalog"""
    iface = Iface(args.db, pretend=args.pretend, debug=args.debug)
    iface.add(args.dir_to_add, label=args.label)
    iface.close()
def create_db(args):
    """Handle the "create" subcommand: create new catalog from a directory"""
    iface = Iface(args.db, pretend=args.pretend, debug=args.debug)
    iface.create(args.dir_to_add, label=args.label)
    iface.close()
@misc.asserdb
def search(args):
    """Handle the "find" subcommand: look for entries by the given words"""
    iface = Iface(args.db, pretend=False, debug=args.debug)
    iface.find(args.search_words)
    iface.close()
def main():
    """Build the command line parser and run the selected subcommand.

    Without a subcommand the help message is printed.
    """
    pretend_help = ("Don't do the action, just give the info what would "
                    "gonna to happen.")
    label_help = 'Add label as the root item of the added directory'

    def switch(command, short, long_name, text):
        """Register an on/off option, which is off by default"""
        command.add_argument(short, long_name, help=text,
                             action='store_true', default=False)

    def debug_switch(command):
        """Register the -d/--debug option"""
        switch(command, '-d', '--debug', 'Turn on debug')

    def pretend_switch(command):
        """Register the -p/--pretend option"""
        switch(command, '-p', '--pretend', pretend_help)

    parser = argparse.ArgumentParser()
    commands = parser.add_subparsers()

    # list
    cmd = commands.add_parser('list')
    cmd.add_argument('db')
    cmd.add_argument('path', nargs='?')
    cmd.add_argument('-m', '--mode', default='plain',
                     help='List items using mode. By default is simply '
                     'plain mode, other possibility is to use "mc" mode, '
                     'which is suitable to use with extfs plugin')
    switch(cmd, '-c', '--color', 'Use colors for listing')
    switch(cmd, '-l', '--long', 'Show size, date and type')
    switch(cmd, '-r', '--recursive', 'list items in subdirectories')
    debug_switch(cmd)
    cmd.set_defaults(func=list_db)

    # update
    cmd = commands.add_parser('update')
    cmd.add_argument('db')
    cmd.add_argument('path')
    cmd.add_argument('dir_to_update', nargs='?')
    pretend_switch(cmd)
    debug_switch(cmd)
    cmd.set_defaults(func=update_db)

    # create
    cmd = commands.add_parser('create')
    cmd.add_argument('db')
    cmd.add_argument('dir_to_add')
    cmd.add_argument('-l', '--label', help=label_help)
    pretend_switch(cmd)
    debug_switch(cmd)
    cmd.set_defaults(func=create_db)

    # add
    cmd = commands.add_parser('add')
    cmd.add_argument('db')
    cmd.add_argument('dir_to_add')
    pretend_switch(cmd)
    debug_switch(cmd)
    cmd.add_argument('-l', '--label', help=label_help)
    cmd.set_defaults(func=add_dir)

    # find
    cmd = commands.add_parser('find')
    cmd.add_argument('db')
    cmd.add_argument('search_words', nargs='+')
    debug_switch(cmd)
    cmd.set_defaults(func=search)

    args = parser.parse_args()

    if 'func' not in args:
        # no subcommand was given
        parser.print_help()
        return

    args.func(args)
# Start the command line interface only when run as a script.
if __name__ == '__main__':
    main()