#!/usr/bin/env python """ Fast and ugly CLI interface for pyGTKtalog """ from argparse import ArgumentParser import errno import os import re import sys from sqlalchemy import or_ from pygtktalog import scan from pygtktalog import misc from pygtktalog.dbobjects import File, Config from pygtktalog.dbcommon import connect, Session BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(30, 38) RESET_SEQ = "\033[0m" COLOR_SEQ = "\033[1;%dm" BOLD_SEQ = "\033[1m" def colorize(txt, color): """Pretty print with colors to console.""" color_map = {"black": BLACK, "red": RED, "green": GREEN, "yellow": YELLOW, "blue": BLUE, "magenta": MAGENTA, "cyan": CYAN, "white": WHITE} return COLOR_SEQ % color_map[color] + txt + RESET_SEQ class Iface(object): """Main class which interacts with the pyGTKtalog modules""" def __init__(self, dbname, pretend=False, debug=False): """Init""" self.engine = connect(dbname) self.sess = Session() self.dry_run = pretend self.root = None self._dbname = dbname if debug: scan.LOG.setLevel("DEBUG") def _resolve_path(self, path): """Identify path in the DB""" if not path.startswith("/"): raise AttributeError("Path have to start with slash (/)") last_node = self.root for part in path.split("/"): if not part.strip(): continue for node in last_node.children: if node.filename == part: last_node = node break else: raise AttributeError("No such path: %s" % path) return last_node def _get_full_path(self, file_object): """given the file object, return string with full path to it""" parent = file_object.parent path = [file_object.filename] while parent.type: path.insert(0, parent.filename) parent = parent.parent return u"/" + u"/".join(path) def _make_path(self, node): """Make the path to the item in the DB""" if node.parent == node: return "/" ext = "" if node.parent.type == 0: ext = colorize(" (%s)" % node.filepath, "white") path = [] path.append(node.filename) while node.parent != self.root: path.append(node.parent.filename) node = node.parent return "/".join([""] + path[::-1]) + ext def _walk(self, dirnode): """Recursively go through the leaves of the node""" items = [] for node in dirnode.children: if node.type == 1: items += self._walk(node) items.append(" " + self._make_path(node)) items.sort() return items def _list(self, node): """List only current node content""" items = [] for node in node.children: if node != self.root: items.append(" " + self._make_path(node)) items.sort() return items def close(self): """Close the session""" self.sess.commit() self.sess.close() def list(self, path=None, recursive=False): """Simulate ls command for the provided item path""" self.root = self.sess.query(File).filter(File.type==0).first() if path: node = self._resolve_path(path) msg = "Content of path `%s':" % path else: node = self.root msg = "Content of path `/':" print colorize(msg, "white") if recursive: items = self._walk(node) else: items = self._list(node) print "\n".join(items) def update(self, path, dir_to_update=None): """ Update the DB against provided path and optionally directory on the real filesystem """ self.root = self.sess.query(File).filter(File.type==0).first() node = self._resolve_path(path) if node == self.root: print colorize("Cannot update entire db, since root was provided " "as path.", "red") return if not dir_to_update: dir_to_update = os.path.join(node.filepath, node.filename) if not os.path.exists(dir_to_update): raise OSError("Path to updtate doesn't exists: %s", dir_to_update) print colorize("Updating node `%s' against directory " "`%s'" % (path, dir_to_update), "white") if not self.dry_run: scanob = scan.Scan(dir_to_update) # scanob.update_files(node.id) scanob.update_files(node.id, self.engine) def create(self, dir_to_add, data_dir): """Create new database""" self.root = File() self.root.id = 1 self.root.filename = 'root' self.root.size = 0 self.root.source = 0 self.root.type = 0 self.root.parent_id = 1 config = Config() config.key = "image_path" config.value = data_dir if not self.dry_run: self.sess.add(self.root) self.sess.add(config) self.sess.commit() print colorize("Creating new db against directory `%s'" % dir_to_add, "white") if not self.dry_run: if data_dir == ":same_as_db:": misc.calculate_image_path(None, True) else: misc.calculate_image_path(data_dir, True) scanob = scan.Scan(dir_to_add) scanob.add_files(self.engine) def add(self, dir_to_add): """Add new directory to the db""" self.root = self.sess.query(File).filter(File.type==0).first() if not os.path.exists(dir_to_add): raise OSError("Path to add doesn't exists: %s", dir_to_add) print colorize("Adding directory `%s'" % dir_to_add, "white") if not self.dry_run: scanob = scan.Scan(dir_to_add) scanob.add_files() def _annotate(self, item, search_words): """ Find ranges to be highlighted in item, provide them and return result string """ indexes = [] for word in search_words: for match in re.finditer(re.escape(word.lower()), item.lower()): for index in range(match.start(), match.end()): indexes.append(index) highlight = False result = [] for idx, char in enumerate(item): if idx in indexes: if not highlight: highlight = True result.append(COLOR_SEQ % WHITE) result.append(char) else: if highlight: highlight = False result.append(RESET_SEQ) result.append(char) return "".join(result) def find(self, search_words): query = self.sess.query(File).filter(or_(File.type == 2, File.type == 3)) result = [] for word in search_words: phrase = u"%%%s%%" % word.decode('utf-8') query = query.filter(File.filename.like(phrase)) for item in query.all(): result.append(self._get_full_path(item)) if not result: print "No results for `%s'" % " ".join(search_words) return result.sort() for item in result: print self._annotate(item, search_words) def list_db(args): """List""" if not os.path.exists(args.db): print colorize("File `%s' does not exists!" % args.db, "red") sys.exit(1) obj = Iface(args.db, False, args.debug) obj.list(path=args.path, recursive=args.recursive) obj.close() def update_db(args): """Update""" if not os.path.exists(args.db): print colorize("File `%s' does not exists!" % args.db, "red") sys.exit(1) obj = Iface(args.db, args.pretend, args.debug) obj.update(args.path, dir_to_update=args.dir_to_update) obj.close() def add_dir(args): """Add""" if not os.path.exists(args.db): print colorize("File `%s' does not exists!" % args.db, "red") sys.exit(1) obj = Iface(args.db, args.pretend, args.debug) obj.add(args.dir_to_add) obj.close() def create_db(args): """List""" if os.path.exists(args.db): print colorize("File `%s' exists!" % args.db, "yellow") obj = Iface(args.db, args.pretend, args.debug) obj.create(args.dir_to_add, args.imagedir) obj.close() def search(args): if not os.path.exists(args.db): print colorize("File `%s' does not exists!" % args.db, "red") sys.exit(1) obj = Iface(args.db, False, args.debug) obj.find(args.search_words) obj.close() def main(): """Main""" parser = ArgumentParser() subparser = parser.add_subparsers() list_ = subparser.add_parser("list") list_.add_argument("db") list_.add_argument("path", nargs="?") list_.add_argument("-r", "--recursive", help="list items in " "subdirectories", action="store_true", default=False) list_.add_argument("-d", "--debug", help="Turn on debug", action="store_true", default=False) list_.set_defaults(func=list_db) update = subparser.add_parser("update") update.add_argument("db") update.add_argument("path") update.add_argument("dir_to_update", nargs="?") update.add_argument("-p", "--pretend", help="Don't do the action, just " "give the info what would gonna to happen.", action="store_true", default=False) update.add_argument("-d", "--debug", help="Turn on debug", action="store_true", default=False) update.set_defaults(func=update_db) create = subparser.add_parser("create") create.add_argument("db") create.add_argument("dir_to_add") create.add_argument("-i", "--imagedir", help="Directory where to put " "images for the database. Popular, but deprecated " "choice is `~/.pygtktalog/images'. Currnet default " "is special string `:same_as_db:' which will try to " "create directory with the same name as the db with " "data suffix", default=":same_as_db:") create.add_argument("-p", "--pretend", help="Don't do the action, just " "give the info what would gonna to happen.", action="store_true", default=False) create.add_argument("-d", "--debug", help="Turn on debug", action="store_true", default=False) create.set_defaults(func=create_db) add = subparser.add_parser("add") add.add_argument("db") add.add_argument("dir_to_add") add.add_argument("-p", "--pretend", help="Don't do the action, just " "give the info what would gonna to happen.", action="store_true", default=False) add.add_argument("-d", "--debug", help="Turn on debug", action="store_true", default=False) add.set_defaults(func=add_dir) find = subparser.add_parser("find") find.add_argument("db") find.add_argument("search_words", nargs="+") find.add_argument("-d", "--debug", help="Turn on debug", action="store_true", default=False) find.set_defaults(func=search) args = parser.parse_args() args.func(args) if __name__ == "__main__": main()