1
0
mirror of https://github.com/gryf/pygtktalog.git synced 2025-12-17 11:30:19 +01:00

Moved pygtktalog to pycatalog.

Also, clean up setup things and imports.
This commit is contained in:
2022-09-24 18:32:16 +02:00
parent 10e7e87031
commit a1a17158bb
17 changed files with 96 additions and 493 deletions

520
pycatalog/__init__.py Normal file
View File

@@ -0,0 +1,520 @@
"""
Fast and ugly CLI interface
"""
import argparse
import os
import re
import sys
from sqlalchemy import or_
from pycatalog import scan
from pycatalog import misc
from pycatalog import dbobjects as dbo
from pycatalog.dbcommon import connect, Session
from pycatalog import logger
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(30, 38)
RESET_SEQ = '\033[0m'
COLOR_SEQ = '\033[1;%dm'
BOLD_SEQ = '\033[1m'
LOG = logger.get_logger(__name__)
def colorize(txt, color):
"""Pretty print with colors to console."""
color_map = {'black': BLACK,
'red': RED,
'green': GREEN,
'yellow': YELLOW,
'blue': BLUE,
'magenta': MAGENTA,
'cyan': CYAN,
'white': WHITE}
return COLOR_SEQ % color_map[color] + txt + RESET_SEQ
def asserdb(func):
def wrapper(args):
if not os.path.exists(args.db):
print(colorize("File `%s' does not exists!" % args.db, 'red'))
sys.exit(1)
func(args)
return wrapper
TYPE_MAP = {0: 'd', 1: 'd', 2: 'f', 3: 'l'}
class Iface(object):
"""Main class which interacts with the pyGTKtalog modules"""
def __init__(self, dbname, pretend=False, debug=False):
"""Init"""
self.engine = connect(dbname)
self.sess = Session()
self.dry_run = pretend
self.root = None
self._dbname = dbname
if debug:
scan.LOG.setLevel('DEBUG')
LOG.setLevel('DEBUG')
def _resolve_path(self, path):
"""Identify path in the DB"""
if not path.startswith("/"):
raise AttributeError('Path have to start with slash (/)')
last_node = self.root
for part in path.split('/'):
if not part.strip():
continue
for node in last_node.children:
if node.filename == part:
last_node = node
break
else:
raise AttributeError('No such path: %s' % path)
return last_node
def _get_full_path(self, file_object):
"""given the file object, return string with full path to it"""
parent = file_object.parent
path = [file_object.filename]
while parent.type:
path.insert(0, parent.filename)
parent = parent.parent
return u'/' + u'/'.join(path)
def _make_path(self, node):
"""Make the path to the item in the DB"""
orig_node = node
if node.parent == node:
return {u'/': (u' ', 0, u' ')}
ext = ''
if node.parent.type == dbo.TYPE['root']:
ext = colorize(' (%s)' % node.filepath, 'white')
path = []
path.append(node.filename)
while node.parent != self.root:
path.append(node.parent.filename)
node = node.parent
path = '/'.join([''] + path[::-1]) + ext
return {path: (TYPE_MAP[orig_node.type],
orig_node.size,
orig_node.date)}
def _walk(self, dirnode):
"""Recursively go through the leaves of the node"""
items = {}
for node in dirnode.children:
if node.type == dbo.TYPE['dir']:
items.update(self._walk(node))
items.update(self._make_path(node))
return items
def _list(self, node):
"""List only current node content"""
items = {}
for node in node.children:
if node != self.root:
items.update(self._make_path(node))
return items
def close(self):
"""Close the session"""
self.sess.commit()
self.sess.close()
def list(self, path=None, recursive=False, long_=False):
"""Simulate ls command for the provided item path"""
self.root = self.sess.query(dbo.File)
self.root = self.root.filter(dbo.File.type == dbo.TYPE['root']).first()
if path:
node = self._resolve_path(path)
msg = "Content of path `%s':" % path
else:
node = self.root
msg = "Content of path `/':"
print(colorize(msg, 'white'))
if recursive:
items = self._walk(node)
else:
items = self._list(node)
if long_:
filenames = []
format_str = (u'{} {:>%d,} {} {}' %
_get_highest_size_length(items))
for fname in sorted(items.keys()):
type_, size, date = items[fname]
filenames.append(format_str.format(type_, size, date, fname))
else:
filenames = sorted(items.keys())
print('\n'.join(filenames))
def update(self, path, dir_to_update=None):
"""
Update the DB against provided path and optionally directory on the
real filesystem
"""
self.root = self.sess.query(dbo.File)
self.root = self.root.filter(dbo.File.type == dbo.TYPE['root']).first()
node = self._resolve_path(path)
if node == self.root:
print(colorize('Cannot update entire db, since root was provided '
'as path.', 'red'))
return
if not dir_to_update:
dir_to_update = os.path.join(node.filepath, node.filename)
if not os.path.exists(dir_to_update):
raise OSError("Path to updtate doesn't exists: %s", dir_to_update)
print(colorize("Updating node `%s' against directory "
"`%s'" % (path, dir_to_update), 'white'))
if not self.dry_run:
scanob = scan.Scan(dir_to_update)
# scanob.update_files(node.id)
scanob.update_files(node.id, self.engine)
def create(self, dir_to_add, data_dir):
"""Create new database"""
self.root = dbo.File()
self.root.id = 1
self.root.filename = 'root'
self.root.size = 0
self.root.source = 0
self.root.type = 0
self.root.parent_id = 1
config = dbo.Config()
config.key = 'image_path'
config.value = data_dir
if not self.dry_run:
self.sess.add(self.root)
self.sess.add(config)
self.sess.commit()
print(colorize("Creating new db against directory `%s'" % dir_to_add,
'white'))
if not self.dry_run:
if data_dir == ':same_as_db:':
misc.calculate_image_path(None, True)
else:
misc.calculate_image_path(data_dir, True)
scanob = scan.Scan(dir_to_add)
scanob.add_files(self.engine)
def add(self, dir_to_add):
"""Add new directory to the db"""
self.root = self.sess.query(dbo.File)
self.root = self.root.filter(dbo.File.type == 0).first()
if not os.path.exists(dir_to_add):
raise OSError("Path to add doesn't exists: %s", dir_to_add)
print(colorize("Adding directory `%s'" % dir_to_add, 'white'))
if not self.dry_run:
scanob = scan.Scan(dir_to_add)
scanob.add_files()
def _annotate(self, item, search_words):
"""
Find ranges to be highlighted in item, provide them and return result
string
"""
indexes = []
for word in search_words:
for match in re.finditer(re.escape(word.lower()), item.lower()):
for index in range(match.start(), match.end()):
indexes.append(index)
highlight = False
result = []
for idx, char in enumerate(item):
if idx in indexes:
if not highlight:
highlight = True
result.append(COLOR_SEQ % WHITE)
result.append(char)
else:
if highlight:
highlight = False
result.append(RESET_SEQ)
result.append(char)
return "".join(result)
def find(self, search_words):
query = self.sess.query(dbo.File).filter(or_(dbo.File.type == 2,
dbo.File.type == 3))
result = []
for word in search_words:
phrase = u'%%%s%%' % word
query = query.filter(dbo.File.filename.like(phrase))
for item in query.all():
result.append(self._get_full_path(item))
if not result:
print("No results for `%s'" % ' '.join(search_words))
return
result.sort()
for item in result:
print(self._annotate(item, search_words))
def fsck(self):
"""Fsck orphaned images/thumbs"""
image_path = (self.sess.query(dbo.Config)
.filter(dbo.Config.key == 'image_path')).one().value
if image_path == ':same_as_db:':
image_path = misc.calculate_image_path(None, False)
files_to_remove = []
# remove images/thumbnails which doesn't have file relation
for name, obj in (("images", dbo.Image),
("thumbnails", dbo.Thumbnail)):
self._purge_orphaned_objects(obj, "Scanning %s " % name)
# find all image files not associate with either Image (image/thumb)
# or Thumbnail (thumb) objects
sys.stdout.write(40 * " " + "\r")
count = 0
for root, dirs, files in os.walk(image_path):
for fname in files:
sys.stdout.write("Scanning files " +
"| / - \\".split()[count % 4] + "\r")
sys.stdout.flush()
count += 1
fname_ = os.path.join(root.split(image_path)[1],
fname).lstrip('/')
if '_t' in fname:
obj = (self.sess.query(dbo.Thumbnail)
.filter(dbo.Thumbnail.filename == fname_)).all()
if obj:
continue
obj = (self.sess.query(dbo.Image)
.filter(dbo.Image.filename ==
fname_.replace('_t.', '.'))).all()
if obj:
continue
else:
obj = (self.sess.query(dbo.Image)
.filter(dbo.Image.filename == fname_)).all()
if obj:
continue
files_to_remove.append(os.path.join(root, fname))
LOG.debug("Found %d orphaned files", len(files_to_remove))
sys.stdout.write(40 * " " + "\r")
sys.stdout.flush()
if self.dry_run:
print("Following files are not associated to any items in the DB:")
for filename in sorted(files_to_remove):
print(filename)
self.sess.rollback()
else:
_remove_files(image_path, files_to_remove)
self.sess.commit()
def _purge_orphaned_objects(self, sa_class, msg):
"""Return tuple of lists of images that are orphaned"""
ids_to_remove = []
for count, item in enumerate(self.sess.query(sa_class).all()):
sys.stdout.write(msg + "| / - \\".split()[count % 4] + "\r")
if not item.file:
self.sess.delete(item)
ids_to_remove.append(item.id)
del item
sys.stdout.flush()
LOG.debug("Found %d orphaned object of class %s",
len(ids_to_remove), sa_class.__name__)
self.sess.flush()
def _remove_files(image_path, filenames):
"""Remove files and empty directories in provided location"""
count = 0
for count, fname in enumerate(filenames, start=1):
os.unlink(fname)
LOG.info("Removed %d orphaned files", count)
count = 0
for root, dirs, _ in os.walk(image_path):
for dirname in dirs:
try:
os.rmdir(os.path.join(root, dirname))
count += 1
except OSError:
pass
LOG.info("Removed %d empty directories", count)
def _get_highest_size_length(item_dict):
highest = len(str(sorted([i[1] for i in item_dict.values()])[-1]))
return highest + highest / 3
@asserdb
def list_db(args):
"""List"""
obj = Iface(args.db, False, args.debug)
obj.list(path=args.path, recursive=args.recursive, long_=args.long)
obj.close()
@asserdb
def update_db(args):
"""Update"""
obj = Iface(args.db, args.pretend, args.debug)
obj.update(args.path, dir_to_update=args.dir_to_update)
obj.close()
@asserdb
def add_dir(args):
"""Add"""
obj = Iface(args.db, args.pretend, args.debug)
obj.add(args.dir_to_add)
obj.close()
def create_db(args):
"""List"""
__import__('pdb').set_trace()
obj = Iface(args.db, args.pretend, args.debug)
obj.create(args.dir_to_add, args.imagedir)
obj.close()
@asserdb
def search(args):
"""Find"""
obj = Iface(args.db, False, args.debug)
obj.find(args.search_words)
obj.close()
@asserdb
def cleanup(args):
"""Cleanup"""
obj = Iface(args.db, False, args.debug)
obj.fsck()
obj.close()
def main():
"""Main"""
parser = argparse.ArgumentParser()
subparser = parser.add_subparsers()
list_ = subparser.add_parser('list')
list_.add_argument('db')
list_.add_argument('path', nargs='?')
list_.add_argument('-l', '--long', help='Show size, date and type',
action='store_true', default=False)
list_.add_argument('-r', '--recursive', help='list items in '
'subdirectories', action='store_true', default=False)
list_.add_argument('-d', '--debug', help='Turn on debug',
action='store_true', default=False)
list_.set_defaults(func=list_db)
update = subparser.add_parser('update')
update.add_argument('db')
update.add_argument('path')
update.add_argument('dir_to_update', nargs='?')
update.add_argument('-p', '--pretend', help="Don't do the action, just "
"give the info what would gonna to happen.",
action='store_true', default=False)
update.add_argument('-d', '--debug', help='Turn on debug',
action='store_true', default=False)
update.set_defaults(func=update_db)
create = subparser.add_parser('create')
create.add_argument('db')
create.add_argument('dir_to_add')
create.add_argument('-i', '--imagedir', help="Directory where to put "
"images for the database. Popular, but deprecated "
"choice is `~/.pygtktalog/images'. Current default "
"is special string `:same_as_db:' which will try to "
"create directory with the same name as the db with "
"data suffix", default=':same_as_db:')
create.add_argument('-p', '--pretend', help="Don't do the action, just "
"give the info what would gonna to happen.",
action='store_true', default=False)
create.add_argument('-d', '--debug', help='Turn on debug',
action='store_true', default=False)
create.set_defaults(func=create_db)
add = subparser.add_parser('add')
add.add_argument('db')
add.add_argument('dir_to_add')
add.add_argument('-p', '--pretend', help="Don't do the action, just "
"give the info what would gonna to happen.",
action='store_true', default=False)
add.add_argument('-d', '--debug', help='Turn on debug',
action='store_true', default=False)
add.set_defaults(func=add_dir)
find = subparser.add_parser('find')
find.add_argument('db')
find.add_argument('search_words', nargs='+')
find.add_argument('-d', '--debug', help='Turn on debug',
action='store_true', default=False)
find.set_defaults(func=search)
fsck = subparser.add_parser('fsck')
fsck.add_argument('db')
fsck.add_argument('-p', '--pretend', help="Don't do the action, just give"
" the info what would gonna to happen.",
action='store_true', default=False)
fsck.add_argument('-d', '--debug', help='Turn on debug',
action='store_true', default=False)
fsck.set_defaults(func=cleanup)
args = parser.parse_args()
if 'func' in args:
args.func(args)
else:
parser.print_help()
if __name__ == '__main__':
main()

43
pycatalog/dbcommon.py Normal file
View File

@@ -0,0 +1,43 @@
"""
Project: pyGTKtalog
Description: Common database operations.
Type: core
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
Created: 2009-08-07
"""
from sqlalchemy import MetaData, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from pycatalog.logger import get_logger
# Prepare SQLAlchemy objects
Meta = MetaData()
Base = declarative_base(metadata=Meta)
Session = sessionmaker()
DbFilename = None
LOG = get_logger("dbcommon")
def connect(filename=None):
"""
create engine and bind to Meta object.
Arguments:
@filename - string with absolute or relative path to sqlite database
file. If None, db in-memory will be created
"""
global DbFilename
if not filename:
filename = ':memory:'
LOG.info("db filename: %s" % filename)
DbFilename = filename
connect_string = "sqlite:///%s" % filename
engine = create_engine(connect_string)
Meta.bind = engine
Meta.create_all(checkfirst=True)
return engine

295
pycatalog/dbobjects.py Normal file
View File

@@ -0,0 +1,295 @@
"""
Project: pyGTKtalog
Description: Definition of DB objects classes. Using SQLAlchemy.
Type: core
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
Created: 2009-08-07
"""
import os
import shutil
from sqlalchemy import Column, Table, Integer, Text
from sqlalchemy import DateTime, ForeignKey, Sequence
from sqlalchemy.orm import relation, backref
from pycatalog.dbcommon import Base
from pycatalog.thumbnail import ThumbCreator
from pycatalog.logger import get_logger
from pycatalog.misc import mk_paths
LOG = get_logger(__name__)
tags_files = Table("tags_files", Base.metadata,
Column("file_id", Integer, ForeignKey("files.id")),
Column("tag_id", Integer, ForeignKey("tags.id")))
TYPE = {'root': 0, 'dir': 1, 'file': 2, 'link': 3}
class File(Base):
"""
File mapping. Instances of this object can reference other File object
which make the structure to be tree-like
"""
__tablename__ = "files"
id = Column(Integer, Sequence("file_id_seq"), primary_key=True)
parent_id = Column(Integer, ForeignKey("files.id"), index=True)
filename = Column(Text)
filepath = Column(Text)
date = Column(DateTime)
size = Column(Integer)
type = Column(Integer, index=True)
source = Column(Integer)
note = Column(Text)
description = Column(Text)
# checksum = Column(Text)
children = relation('File',
backref=backref('parent', remote_side="File.id"),
order_by=[type, filename])
tags = relation("Tag", secondary=tags_files, order_by="Tag.tag")
thumbnail = relation("Thumbnail", backref="file")
images = relation("Image", backref="file", order_by="Image.filename")
def __init__(self, filename=None, path=None, date=None, size=None,
ftype=None, src=None):
"""Create file object with empty defaults"""
self.filename = filename
self.filepath = path
self.date = date
self.size = size
self.type = ftype
self.source = src
def __repr__(self):
return "<File('%s', %s)>" % (self.filename, str(self.id))
def get_all_children(self):
"""
Return list of all node direct and indirect children
"""
def _recursive(node):
children = []
if node.children:
for child in node.children:
children += _recursive(child)
if node != self:
children.append(node)
return children
if self.children:
return _recursive(self)
else:
return []
class Group(Base):
"""TODO: what is this class for?"""
__tablename__ = "groups"
id = Column(Integer, Sequence("group_id_seq"), primary_key=True)
name = Column(Text)
color = Column(Text)
def __init__(self, name=None, color=None):
self.name = name
self.color = color
def __repr__(self):
return "<Group('%s', %s)>" % (str(self.name), str(self.id))
class Tag(Base):
"""Tag mapping"""
__tablename__ = "tags"
id = Column(Integer, Sequence("tags_id_seq"), primary_key=True)
group_id = Column(Integer, ForeignKey("groups.id"), index=True)
tag = Column(Text)
group = relation('Group', backref=backref('tags', remote_side="Group.id"))
files = relation("File", secondary=tags_files)
def __init__(self, tag=None, group=None):
self.tag = tag
self.group = group
def __repr__(self):
return "<Tag('%s', %s)>" % (str(self.tag), str(self.id))
class Thumbnail(Base):
"""Thumbnail for the file"""
__tablename__ = "thumbnails"
id = Column(Integer, Sequence("thumbnail_id_seq"), primary_key=True)
file_id = Column(Integer, ForeignKey("files.id"), index=True)
filename = Column(Text)
def __init__(self, filename=None, img_path=None, file_obj=None):
self.filename = filename
self.file = file_obj
self.img_path = img_path
if filename and file_obj and img_path:
self.save(self.filename, img_path)
def save(self, fname, img_path):
"""
Create file related thumbnail, add it to the file object.
"""
new_name = mk_paths(fname, img_path)
ext = os.path.splitext(self.filename)[1]
if ext:
new_name.append("".join([new_name.pop(), ext]))
thumb = ThumbCreator(self.filename).generate()
name, ext = os.path.splitext(new_name.pop())
new_name.append("".join([name, "_t", ext]))
self.filename = os.path.sep.join(new_name)
if not os.path.exists(os.path.join(img_path, *new_name)):
shutil.move(thumb, os.path.join(img_path, *new_name))
else:
LOG.info("Thumbnail already exists (%s: %s)",
fname, "/".join(new_name))
os.unlink(thumb)
def __repr__(self):
return "<Thumbnail('%s', %s)>" % (str(self.filename), str(self.id))
class Image(Base):
"""Images and their thumbnails"""
__tablename__ = "images"
id = Column(Integer, Sequence("images_id_seq"), primary_key=True)
file_id = Column(Integer, ForeignKey("files.id"), index=True)
filename = Column(Text)
def __init__(self, filename=None, img_path=None, file_obj=None, move=True):
self.filename = None
self.file = file_obj
self.img_path = img_path
if filename and img_path:
self.filename = filename
self.save(filename, img_path, move)
def save(self, fname, img_path, move=True):
"""
Save and create coressponding thumbnail (note: it differs from file
related thumbnail!)
"""
new_name = mk_paths(fname, img_path)
ext = os.path.splitext(self.filename)[1]
if ext:
new_name.append("".join([new_name.pop(), ext]))
if not os.path.exists(os.path.join(img_path, *new_name)):
if move:
shutil.move(self.filename, os.path.join(img_path, *new_name))
else:
shutil.copy(self.filename, os.path.join(img_path, *new_name))
else:
LOG.warning("Image with same CRC already exists "
"('%s', '%s')" % (self.filename, "/".join(new_name)))
self.filename = os.path.sep.join(new_name)
name, ext = os.path.splitext(new_name.pop())
new_name.append("".join([name, "_t", ext]))
if not os.path.exists(os.path.join(img_path, *new_name)):
thumb = ThumbCreator(os.path.join(img_path, self.filename))
shutil.move(thumb.generate(), os.path.join(img_path, *new_name))
else:
LOG.info("Thumbnail already generated %s" % "/".join(new_name))
def get_copy(self):
"""
Create the very same object as self with exception of id field
"""
img = Image()
img.filename = self.filename
return img
@property
def thumbnail(self):
"""
Return path to thumbnail for this image
"""
path, fname = os.path.split(self.filename)
base, ext = os.path.splitext(fname)
return os.path.join(path, base + "_t" + ext)
def __repr__(self):
return "<Image('%s', %s)>" % (str(self.filename), str(self.id))
class Exif(Base):
"""Selected EXIF information"""
__tablename__ = "exif"
id = Column(Integer, Sequence("exif_id_seq"), primary_key=True)
file_id = Column(Integer, ForeignKey("files.id"), index=True)
camera = Column(Text)
date = Column(Text)
aperture = Column(Text)
exposure_program = Column(Text)
exposure_bias = Column(Text)
iso = Column(Text)
focal_length = Column(Text)
subject_distance = Column(Text)
metering_mode = Column(Text)
flash = Column(Text)
light_source = Column(Text)
resolution = Column(Text)
orientation = Column(Text)
def __init__(self):
self.camera = None
self.date = None
self.aperture = None
self.exposure_program = None
self.exposure_bias = None
self.iso = None
self.focal_length = None
self.subject_distance = None
self.metering_mode = None
self.flash = None
self.light_source = None
self.resolution = None
self.orientation = None
def __repr__(self):
return "<Exif('%s', %s)>" % (str(self.date), str(self.id))
class Gthumb(Base):
"""Gthumb information"""
__tablename__ = "gthumb"
id = Column(Integer, Sequence("gthumb_id_seq"), primary_key=True)
file_id = Column(Integer, ForeignKey("files.id"), index=True)
note = Column(Text)
place = Column(Text)
date = Column(DateTime)
def __init__(self, note=None, place=None, date=None):
self.note = note
self.place = place
self.date = date
def __repr__(self):
return "<Gthumb('%s', '%s', %s)>" % (str(self.date), str(self.place),
str(self.id))
class Config(Base):
"""Per-database configuration"""
__tablename__ = "config"
id = Column(Integer, Sequence("config_id_seq"), primary_key=True)
key = Column(Text)
value = Column(Text)
def __init__(self, key=None, value=None):
self.key = key
self.value = value
def __repr__(self):
return "<Config('%s', '%s')>" % (str(self.key), str(self.value))

107
pycatalog/logger.py Normal file
View File

@@ -0,0 +1,107 @@
"""
Project: pyGTKtalog
Description: Logging functionality
Type: core
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
Created: 2009-09-02
"""
import os
import sys
import logging
LEVEL = {'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARN': logging.WARN,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL}
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
BOLD_SEQ = "\033[1m"
COLORS = {'WARNING': YELLOW,
'INFO': GREEN,
'DEBUG': BLUE,
'CRITICAL': WHITE,
'ERROR': RED}
def cprint(txt, color):
color_map = {"black": BLACK,
"red": RED,
"green": GREEN,
"yellow": YELLOW,
"blue": BLUE,
"magenta": MAGENTA,
"cyan": CYAN,
"white": WHITE}
print(COLOR_SEQ % (30 + color_map[color]) + txt + RESET_SEQ)
class DummyFormater(logging.Formatter):
"""Just don't output anything"""
def format(self, record):
return ""
class ColoredFormatter(logging.Formatter):
def __init__(self, msg, use_color=True):
logging.Formatter.__init__(self, msg)
self.use_color = use_color
def format(self, record):
levelname = record.levelname
if self.use_color and levelname in COLORS:
levelname_color = COLOR_SEQ % (30 + COLORS[levelname]) \
+ levelname + RESET_SEQ
record.levelname = levelname_color
return logging.Formatter.format(self, record)
log_obj = None
def get_logger(module_name, level='INFO', to_file=True, to_console=True):
"""
Prepare and return log object. Standard formatting is used for all logs.
Arguments:
@module_name - String name for Logger object.
@level - Log level (as string), one of DEBUG, INFO, WARN, ERROR and
CRITICAL.
@to_file - If True, additionally stores full log in file inside
.pycatalog config directory and to stderr, otherwise log
is only redirected to stderr.
Returns: object of logging.Logger class
"""
path = os.path.join(os.path.expanduser("~"), ".pycatalog", "app.log")
log = logging.getLogger(module_name)
log.setLevel(LEVEL[level])
if to_console:
console_handler = logging.StreamHandler(sys.stderr)
console_formatter = ColoredFormatter("%(filename)s:%(lineno)s - "
"%(levelname)s - %(message)s")
console_handler.setFormatter(console_formatter)
log.addHandler(console_handler)
elif to_file:
file_handler = logging.FileHandler(path)
file_formatter = logging.Formatter("%(asctime)s %(levelname)6s "
"%(filename)s: %(lineno)s - "
"%(message)s")
file_handler.setFormatter(file_formatter)
file_handler.setLevel(LEVEL[level])
log.addHandler(file_handler)
else:
devnull = open(os.devnull, "w")
dummy_handler = logging.StreamHandler(devnull)
dummy_formatter = DummyFormater("")
dummy_handler.setFormatter(dummy_formatter)
log.addHandler(dummy_handler)
return log

77
pycatalog/misc.py Normal file
View File

@@ -0,0 +1,77 @@
"""
Project: pyGTKtalog
Description: Misc functions used more than once in src
Type: lib
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
Created: 2009-04-05
"""
import os
import errno
from zlib import crc32
import pycatalog.dbcommon
from pycatalog.logger import get_logger
LOG = get_logger(__name__)
def float_to_string(float_length):
"""
Parse float digit into time string
Arguments:
@number - digit to be converted into time.
Returns HH:MM:SS formatted string
"""
hour = int(float_length / 3600)
float_length -= hour*3600
minutes = int(float_length / 60)
float_length -= minutes * 60
sec = int(float_length)
return "%02d:%02d:%02d" % (hour, minutes, sec)
def calculate_image_path(dbpath=None, create=False):
"""Calculate image path out of provided path or using current connection"""
if not dbpath:
dbpath = pycatalog.dbcommon.DbFilename
if dbpath == ":memory:":
raise OSError("Cannot create image path out of in-memory db!")
dir_, file_ = (os.path.dirname(dbpath), os.path.basename(dbpath))
file_base, dummy = os.path.splitext(file_)
images_dir = os.path.join(dir_, file_base + "_images")
else:
if dbpath and "~" in dbpath:
dbpath = os.path.expanduser(dbpath)
if dbpath and "$" in dbpath:
dbpath = os.path.expandvars(dbpath)
images_dir = dbpath
if create:
if not os.path.exists(images_dir):
try:
os.mkdir(images_dir)
except OSError as err:
if err.errno != errno.EEXIST:
raise
elif not os.path.exists(images_dir):
raise OSError("%s: No such directory" % images_dir)
return os.path.abspath(images_dir)
def mk_paths(fname, img_path):
"""Make path for provided pathname by calculating crc32 out of file"""
with open(fname, 'r+b') as fobj:
new_path = "%x" % (crc32(fobj.read(10*1024*1024)) & 0xffffffff)
new_path = [new_path[i:i + 2] for i in range(0, len(new_path), 2)]
full_path = os.path.join(img_path, *new_path[:-1])
try:
os.makedirs(full_path)
except OSError as exc:
if exc.errno != errno.EEXIST:
LOG.debug("Directory %s already exists." % full_path)
return new_path

25
pycatalog/pygtkutils.py Normal file
View File

@@ -0,0 +1,25 @@
"""
Project: pyGTKtalog
Description: pyGTK common utility functions
Type: utility
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
Created: 2010-11-07 13:30:37
"""
def get_tv_item_under_cursor(treeview):
"""
Get item (most probably id of the row) form tree view under cursor.
Arguments:
@treeview - gtk.TreeView
Returns:
Item in first column of TreeModel, which TreeView is connected with,
None in other cases
"""
path, column = treeview.get_cursor()
if path and column:
model = treeview.get_model()
tm_iter = model.get_iter(path)
item_id = model.get_value(tm_iter, 0)
return item_id
return None

501
pycatalog/scan.py Normal file
View File

@@ -0,0 +1,501 @@
"""
Project: pyGTKtalog
Description: Filesystem scan and file automation layer
Type: core
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
Created: 2011-03-27
"""
import os
import re
from datetime import datetime
import mimetypes
import pycatalog.misc
from pycatalog.dbobjects import File, Image, Thumbnail, Config, TYPE
from pycatalog.dbcommon import Session
from pycatalog.logger import get_logger
from pycatalog.video import Video
LOG = get_logger(__name__)
RE_FN_START = re.compile(r'(?P<fname_start>'
r'(\[[^\]]*\]\s)?'
r'([^(]*)\s'
r'((\(\d{4}\))\s)?).*'
r'(\[[A-Fa-f0-9]{8}\])\..*')
class NoAccessError(Exception):
"""No access exception"""
pass
class Scan(object):
"""
Retrieve and identify all files recursively on given path
"""
def __init__(self, path):
"""
Initialize
@Arguments:
@path - string with path to be added to topmost node (root)
"""
self.abort = False
self.path = path.rstrip(os.path.sep)
self._files = []
self._existing_files = [] # for re-use purpose in adding
self._existing_branch = [] # for branch storage, mainly for updating
self._session = Session()
self.files_count = self._get_files_count()
self.current_count = 0
self._set_image_path()
def add_files(self, engine=None):
"""
Returns list, which contain object, modification date and file
size.
"""
self._files = []
self._existing_branch = []
LOG.debug("given path: %s", self.path)
# See, if file exists. If not it would raise OSError exception
os.stat(self.path)
if not os.access(self.path, os.R_OK | os.X_OK) \
or not os.path.isdir(self.path):
raise NoAccessError("Access to %s is forbidden" % self.path)
directory = os.path.basename(self.path)
path = os.path.dirname(self.path)
if not self._recursive(None, directory, path, 0):
return None
# add only first item from _files, because it is a root of the other,
# so other will be automatically added aswell.
self._session.add(self._files[0])
self._session.commit()
return self._files
def get_all_children(self, node_id, engine):
"""
Get children by pure SQL
Starting from sqlite 3.8.3 it is possile to do this operation as a
one query using WITH statement. For now on it has to be done in
application.
"""
query = "select id from files where parent_id=? and type=1"
query2 = "select id from files where parent_id in (%s)"
row = ((node_id,),)
all_ids = []
def req(obj):
"""Requrisve function for gathering all child ids for given node"""
for line in obj:
all_ids.append(line[0])
res = engine.execute(query, (line[0],)).fetchall()
if res:
req(res)
req(row)
sql = query2 % ",".join("?" * len(all_ids))
all_ids = [row_[0] for row_ in engine
.execute(sql, tuple(all_ids))
.fetchall()]
all_obj = []
# number of objects to retrieve at once. Limit is 999. Let's do a
# little bit below.
num = 900
steps = len(all_ids) // num + 1
for step in range(steps):
all_obj.extend(self._session
.query(File)
.filter(File.id
.in_(all_ids[step * num:step * num + num]))
.all())
return all_obj
def update_files(self, node_id, engine=None):
"""
Updtate DB contents of provided node.
"""
self.current_count = 0
old_node = self._session.query(File).get(node_id)
if old_node is None:
LOG.warning("No such object in db: %s", node_id)
return
parent = old_node.parent
self._files = []
if engine:
LOG.debug("Getting all File objects via SQL")
self._existing_branch = self.get_all_children(node_id, engine)
else:
LOG.debug("Getting all File objects via ORM (yeah, it SLOW)")
self._existing_branch = old_node.get_all_children()
self._existing_branch.insert(0, old_node)
# Break the chain of parent-children relations
LOG.debug("Make them orphans")
for fobj in self._existing_branch:
fobj.parent = None
update_path = os.path.join(old_node.filepath, old_node.filename)
# gimme a string. unicode can't handle strange filenames in paths, so
# in case of such, better get me a byte string. It is not perfect
# though, since it WILL crash if the update_path would contain some
# unconvertable characters.
update_path = update_path
# refresh objects
LOG.debug("Refreshing objects")
self._get_all_files()
LOG.debug("path for update: %s", update_path)
# See, if file exists. If not it would raise OSError exception
os.stat(update_path)
if not os.access(update_path, os.R_OK | os.X_OK) \
or not os.path.isdir(update_path):
LOG.error("Access to %s is forbidden", update_path)
raise NoAccessError("Access to %s is forbidden" % update_path)
directory = os.path.basename(update_path)
path = os.path.dirname(update_path)
if not self._recursive(parent, directory, path, 0):
return None
# update branch
# self._session.merge(self._files[0])
LOG.debug("Deleting objects whitout parent: %s",
str(self._session.query(File)
.filter(File.parent.is_(None)).all()))
self._session.query(File).filter(File.parent.is_(None)).delete()
self._session.commit()
return self._files
def _gather_information(self, fobj):
"""
Try to guess type and gather information about File object if possible
"""
mimedict = {'audio': self._audio,
'video': self._video,
'image': self._image}
extdict = {'.mkv': 'video', # TODO: move this to config/plugin(?)
'.rmvb': 'video',
'.ogm': 'video',
'.ogv': 'video'}
fp = os.path.join(fobj.filepath, fobj.filename)
mimeinfo = mimetypes.guess_type(fp)
if mimeinfo[0]:
mimeinfo = mimeinfo[0].split("/")[0]
ext = os.path.splitext(fp)[1]
if mimeinfo and mimeinfo in mimedict:
mimedict[mimeinfo](fobj, fp)
elif ext and ext in extdict:
mimedict[extdict[ext]](fobj, fp)
else:
LOG.debug("Filetype not supported %s %s", str(mimeinfo), fp)
pass
def _audio(self, fobj, filepath):
# LOG.warning('audio')
return
def _image(self, fobj, filepath):
# LOG.warning('image')
return
def _video(self, fobj, filepath):
"""
Make captures for a movie. Save it under uniq name.
"""
result = RE_FN_START.match(fobj.filename)
if result:
self._check_related(fobj, result.groupdict()['fname_start'])
vid = Video(filepath)
fobj.description = vid.get_formatted_tags()
preview_fn = vid.capture()
if preview_fn:
Image(preview_fn, self.img_path, fobj)
def _check_related(self, fobj, filename_start):
"""
Try to search for related files which belongs to specified File
object and pattern. If found, additional File objects are created.
For example, if we have movie file named like:
[aXXo] Batman (1989) [D3ADBEEF].avi
[aXXo] Batman (1989) trailer [B00B1337].avi
Batman (1989) [D3ADBEEF].avi
Batman [D3ADBEEF].avi
And for example file '[aXXo] Batman (1989) [D3ADBEEF].avi' might have
some other accompanied files, like:
[aXXo] Batman (1989) [D3ADBEEF].avi.conf
[aXXo] Batman (1989) [DEADC0DE].nfo
[aXXo] Batman (1989) cover [BEEFD00D].jpg
[aXXo] Batman (1989) poster [FEEDD00D].jpg
Which can be atuomatically asociated with the movie.
This method find such files, and for some of them (currently images)
will perform extra actions - like creating corresponding Image objects.
"""
for fname in os.listdir(fobj.filepath):
extension = os.path.splitext(fname)[1]
if fname.startswith(filename_start) and \
extension in ('.jpg', '.gif', '.png'):
full_fname = os.path.join(fobj.filepath, fname)
LOG.debug('found corresponding image file: %s', full_fname)
Image(full_fname, self.img_path, fobj, False)
if not fobj.thumbnail:
Thumbnail(full_fname, self.img_path, fobj)
def _get_all_files(self):
"""Gather all File objects"""
self._existing_files = self._session.query(File).all()
def _mk_file(self, fname, path, parent, ftype=TYPE['file']):
"""
Create and return File object
"""
fullpath = os.path.join(path, fname)
if ftype == TYPE['link']:
fname = fname + " -> " + os.readlink(fullpath)
fob = {'filename': fname,
'path': path,
'ftype': ftype}
try:
fob['date'] = datetime.fromtimestamp(os.stat(fullpath).st_mtime)
fob['size'] = os.stat(fullpath).st_size
except OSError:
# in case of dead softlink, we will have no time and size
fob['date'] = None
fob['size'] = 0
fobj = self._get_old_file(fob, ftype)
if fobj:
LOG.debug("found existing file in db: %s", str(fobj))
# TODO: update whole tree sizes (for directories/discs)
fobj.size = fob['size']
fobj.filepath = fob['path']
fobj.type = fob['ftype']
else:
fobj = File(**fob)
# SLOW. Don't do this. Checksums has no value eventually
# fobj.mk_checksum()
if parent is None:
fobj.parent_id = 1
else:
fobj.parent = parent
self._files.append(fobj)
return fobj
def _non_recursive(self, parent, fname, path, size):
"""
Do the walk through the file system. Non recursively, since it's
slow as hell.
@Arguments:
@parent - directory File object which is parent for the current
scope
@fname - string that hold filename
@path - full path for further scanning
@size - size of the object
"""
fullpath = os.path.join(path, fname)
parent = self._mk_file(fname, path, parent, TYPE['dir'])
parent.size = 0
parent.type = TYPE['dir']
for root, dirs, files in os.walk(fullpath):
for dir_ in dirs:
pass
for file_ in files:
self.current_count += 1
stat = os.lstat(os.path.join(root, file_))
parent.size += stat.st_size
# TODO: finish that up
def _recursive(self, parent, fname, path, size):
"""
Do the walk through the file system
@Arguments:
@parent - directory File object which is parent for the current
scope
@fname - string that hold filename
@path - full path for further scanning
@size - size of the object
"""
if self.abort:
return False
fullpath = os.path.join(path, fname)
parent = self._mk_file(fname, path, parent, TYPE['dir'])
parent.size = _get_dirsize(fullpath)
parent.type = TYPE['dir']
LOG.info("Scanning `%s' [%s/%s]", fullpath, self.current_count,
self.files_count)
root, dirs, files = next(os.walk(fullpath))
for fname in files:
fpath = os.path.join(root, fname)
extension = os.path.splitext(fname)[1]
self.current_count += 1
LOG.debug("Processing %s [%s/%s]", fname, self.current_count,
self.files_count)
result = RE_FN_START.match(fname)
test_ = False
if result and extension in ('.jpg', '.gif', '.png'):
startfrom = result.groupdict()['fname_start']
matching_files = []
for fn_ in os.listdir(root):
if fn_.startswith(startfrom):
matching_files.append(fn_)
if len(matching_files) > 1:
LOG.debug('found image "%s" in group: %s, skipping', fname,
str(matching_files))
test_ = True
if test_:
continue
if os.path.islink(fpath):
fob = self._mk_file(fname, root, parent, TYPE['link'])
else:
fob = self._mk_file(fname, root, parent)
existing_obj = self._object_exists(fob)
if existing_obj:
existing_obj.parent = fob.parent
fob = existing_obj
else:
LOG.debug("gather information for %s",
os.path.join(root, fname))
self._gather_information(fob)
size += fob.size
if fob not in self._existing_files:
self._existing_files.append(fob)
for dirname in dirs:
dirpath = os.path.join(root, dirname)
if not os.access(dirpath, os.R_OK | os.X_OK):
LOG.info("Cannot access directory %s", dirpath)
continue
if os.path.islink(dirpath):
fob = self._mk_file(dirname, root, parent, TYPE['link'])
else:
LOG.debug("going into %s", os.path.join(root, dirname))
self._recursive(parent, dirname, fullpath, size)
LOG.debug("size of items: %s", parent.size)
return True
def _get_old_file(self, fdict, ftype):
"""
Search for object with provided data in dictionary in stored branch
(which is updating). Return such object on success, remove it from
list.
"""
for index, obj in enumerate(self._existing_branch):
if ftype == TYPE['link'] and fdict['filename'] == obj.filename:
return self._existing_branch.pop(index)
elif fdict['filename'] == obj.filename and \
fdict['date'] == obj.date and \
ftype == TYPE['file'] and \
fdict['size'] in (obj.size, 0):
obj = self._existing_branch.pop(index)
obj.size = fdict['size']
return obj
elif fdict['filename'] == obj.filename:
obj = self._existing_branch.pop(index)
obj.size = fdict['date']
return obj
return False
def _object_exists(self, fobj):
"""
Perform check if current File object already exists in collection. If
so, return first matching one, None otherwise.
"""
for efobj in self._existing_files:
if efobj.size == fobj.size \
and efobj.type == fobj.type \
and efobj.date == fobj.date \
and efobj.filename == fobj.filename:
return efobj
return None
def _get_files_count(self):
"""return size in bytes"""
count = 0
for _, _, files in os.walk(str(self.path)):
count += len(files)
LOG.debug("count of files: %s", count)
return count
def _set_image_path(self):
"""Get or calculate the images path"""
image_path = (self._session.query(Config)
.filter(Config.key == "image_path")).one()
if image_path.value == ":same_as_db:":
image_path = pycatalog.misc.calculate_image_path()
else:
image_path = pycatalog.misc.calculate_image_path(image_path.value)
self.img_path = image_path
def _get_dirsize(path):
"""
Returns sum of all files under specified path (also in subdirs)
"""
size = 0
for root, _, files in os.walk(path):
for fname in files:
try:
size += os.lstat(os.path.join(root, fname)).st_size
except OSError:
LOG.warning("Cannot access file %s",
os.path.join(root, fname))
LOG.debug("_get_dirsize, %s: %d", path, size)
return size

114
pycatalog/thumbnail.py Normal file
View File

@@ -0,0 +1,114 @@
"""
Project: pyGTKtalog
Description: Create thumbnail for sepcified image
Type: lib
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
Created: 2011-05-15
"""
import os
from tempfile import mkstemp
import shutil
from PIL import Image
import exifread
from pycatalog.logger import get_logger
LOG = get_logger(__name__)
class ThumbCreator(object):
"""
Class for generate/extract thumbnail from image file
"""
def __init__(self, filename):
self.thumb_x = 160
self.thumb_y = 160
self.filename = filename
def generate(self):
"""
Save thumbnail into temporary file
"""
exif = {}
orientations = {2: Image.FLIP_LEFT_RIGHT, # Mirrored horizontal
3: Image.ROTATE_180, # Rotated 180
4: Image.FLIP_TOP_BOTTOM, # Mirrored vertical
5: Image.ROTATE_90, # Mirrored horizontal then
# rotated 90 CCW
6: Image.ROTATE_270, # Rotated 90 CW
7: Image.ROTATE_270, # Mirrored horizontal then
# rotated 90 CW
8: Image.ROTATE_90} # Rotated 90 CCW
flips = {7: Image.FLIP_LEFT_RIGHT, 5: Image.FLIP_LEFT_RIGHT}
exif = self._get_exif()
file_desc, thumb_fn = mkstemp(suffix=".jpg")
os.close(file_desc)
if exif and 'JPEGThumbnail' in exif and exif['JPEGThumbnail']:
LOG.debug("exif thumb for filename %s" % self.filename)
exif_thumbnail = exif['JPEGThumbnail']
thumb = open(thumb_fn, 'wb')
thumb.write(exif_thumbnail)
thumb.close()
else:
LOG.debug("no exif thumb")
if self.is_image_smaller():
shutil.copyfile(self.filename, thumb_fn)
else:
thumb = self._scale_image()
if thumb:
thumb.save(thumb_fn, "JPEG")
if exif and 'Image Orientation' in exif:
orient = exif['Image Orientation'].values[0]
if orient > 1 and orient in orientations:
thumb_image = Image.open(thumb_fn)
tmp_thumb_img = thumb_image.transpose(orientations[orient])
if orient in flips:
tmp_thumb_img = tmp_thumb_img.transpose(flips[orient])
tmp_thumb_img.save(thumb_fn, 'JPEG')
return thumb_fn
def is_image_smaller(self):
"""Check if image is smaller than desired dimention, return boolean"""
image = Image.open(self.filename)
im_x, im_y = image.size
image.close()
return im_x <= self.thumb_x and im_y <= self.thumb_y
def _get_exif(self):
"""
Get exif (if available), return as a dict
"""
image_file = open(self.filename, 'rb')
try:
exif = exifread.process_file(image_file)
except Exception:
exif = {}
LOG.info("Exif crashed on '%s'." % self.filename)
finally:
image_file.close()
return exif
def _scale_image(self):
"""
Create thumbnail. returns image object or None
"""
try:
image_thumb = Image.open(self.filename).convert('RGB')
except Exception:
return None
it_x, it_y = image_thumb.size
if it_x > self.thumb_x or it_y > self.thumb_y:
image_thumb.thumbnail((self.thumb_x, self.thumb_y),
Image.ANTIALIAS)
return image_thumb

283
pycatalog/video.py Normal file
View File

@@ -0,0 +1,283 @@
"""
Project: pyGTKtalog
Description: Gather video file information, make "screenshot" with content
of the movie file. Uses external tools like mplayer.
Type: lib
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
Created: 2009-04-04
"""
import math
import os
import shutil
import tempfile
from PIL import Image
from pygtktalog.misc import float_to_string
from pygtktalog.logger import get_logger
LOG = get_logger("Video")
class Video(object):
"""Class for retrive midentify script output and put it in dict.
Usually there is no need for such a detailed movie/clip information.
Midentify script belongs to mplayer package.
"""
def __init__(self, filename, out_width=1024):
"""
Init class instance.
Arguments:
@filename - Filename of a video file (required).
@out_width - width of final image to be scaled to.
"""
self.filename = filename
self.out_width = out_width
self.tags = {}
output = self._get_movie_info()
attrs = {'ID_VIDEO_WIDTH': ['width', int],
'ID_VIDEO_HEIGHT': ['height', int],
# length is in seconds
'ID_LENGTH': ['length', lambda x: int(x.split(".")[0])],
'ID_START_TIME': ['start', self._get_start_pos],
'ID_DEMUXER': ['container', self._return_lower],
'ID_VIDEO_FORMAT': ['video_format', self._return_lower],
'ID_VIDEO_CODEC': ['video_codec', self._return_lower],
'ID_AUDIO_CODEC': ['audio_codec', self._return_lower],
'ID_AUDIO_FORMAT': ['audio_format', self._return_lower],
'ID_AUDIO_NCH': ['audio_no_channels', int]}
# TODO: what about audio/subtitle language/existence?
for key in output:
if key in attrs:
self.tags[attrs[key][0]] = attrs[key][1](output[key])
if 'length' in self.tags and self.tags['length'] > 0:
start = self.tags.get('start', 0)
length = self.tags['length'] - start
hours = length // 3600
seconds = length - hours * 3600
minutes = seconds // 60
seconds -= minutes * 60
length_str = "%02d:%02d:%02d" % (hours, minutes, seconds)
self.tags['duration'] = length_str
def capture(self):
"""
Extract images for given video filename and montage it into one, big
picture, similar to output from Windows Media Player thing, but without
captions and time (who need it anyway?).
Returns: image filename or None
NOTE: You should remove returned file manually, or move it in some
other place, otherwise it stays in filesystem.
"""
if not ('length' in self.tags and 'width' in self.tags):
# no length or width
return None
if not (self.tags['length'] > 0 and self.tags['width'] > 0):
# zero length or wight
return None
# Calculate number of pictures. Base is equivalent 72 pictures for
# 1:30:00 movie length
scale = int(10 * math.log(self.tags['length'], math.e) - 11)
if scale < 1:
return None
no_pictures = self.tags['length'] // scale
if no_pictures > 8:
no_pictures = (no_pictures // 8) * 8 # only multiple of 8, please.
else:
# for really short movies
no_pictures = 4
tempdir = tempfile.mkdtemp()
file_desc, image_fn = tempfile.mkstemp(suffix=".jpg")
os.close(file_desc)
self._make_captures(tempdir, no_pictures)
self._make_montage(tempdir, image_fn, no_pictures)
shutil.rmtree(tempdir)
return image_fn
def get_formatted_tags(self):
"""
Return formatted tags as a string
"""
out_tags = ''
if 'container' in self.tags:
out_tags += "Container: %s\n" % self.tags['container']
if 'width' in self.tags and 'height' in self.tags:
out_tags += "Resolution: %sx%s\n" % (self.tags['width'],
self.tags['height'])
if 'duration' in self.tags:
out_tags += "Duration: %s\n" % self.tags['duration']
if 'video_codec' in self.tags:
out_tags += "Video codec: %s\n" % self.tags['video_codec']
if 'video_format' in self.tags:
out_tags += "Video format: %s\n" % self.tags['video_format']
if 'audio_codec' in self.tags:
out_tags += "Audio codec: %s\n" % self.tags['audio_codec']
if 'audio_format' in self.tags:
out_tags += "Audio format: %s\n" % self.tags['audio_format']
if 'audio_no_channels' in self.tags:
out_tags += "Audio channels: %s\n" % self.tags['audio_no_channels']
return out_tags
def _get_movie_info(self):
"""
Gather movie file information with midentify shell command.
Returns: dict of command output. Each dict element represents pairs:
variable=value, for example output from midentify will be:
ID_VIDEO_ID=0
ID_AUDIO_ID=1
....
ID_AUDIO_CODEC=mp3
ID_EXIT=EOF
so method returns dict:
{'ID_VIDEO_ID': '0',
'ID_AUDIO_ID': 1,
....
'ID_AUDIO_CODEC': 'mp3',
'ID_EXIT': 'EOF'}
"""
output = os.popen('midentify "%s"' % self.filename).readlines()
return_dict = {}
for line in output:
line = line.strip()
key = line.split('=')
if len(key) > 1:
return_dict[key[0]] = line.replace("%s=" % key[0], "")
return return_dict
def _make_captures(self, directory, no_pictures):
"""
Make screens with mplayer into given directory
Arguments:
@directory - full output directory name
@no_pictures - number of pictures to take
"""
step = self.tags['length'] / (no_pictures + 1)
current_time = 0
for dummy in range(1, no_pictures + 1):
current_time += step
time = float_to_string(current_time)
cmd = ('mplayer "%s" -ao null -brightness 0 -hue 0 '
'-saturation 0 -contrast 0 -mc 0 -vf-clr '
'-vo jpeg:outdir="%s" -ss %s -frames 1 2>/dev/null')
os.popen(cmd % (self.filename, directory, time)).readlines()
try:
shutil.move(os.path.join(directory, "00000001.jpg"),
os.path.join(directory, "picture_%s.jpg" % time))
except IOError as exc:
errno, strerror = exc.args
LOG.error('error capturing file from movie "%s" at position '
'%s. Errors: %s, %s', self.filename, time, errno,
strerror)
def _make_montage(self, directory, image_fn, no_pictures):
"""
Generate one big image from screnshots and optionally resize it. Uses
PIL package to create output image.
Arguments:
@directory - source directory containing images
@image_fn - destination final image
@no_pictures - number of pictures
timeit result:
python /usr/lib/python2.6/timeit.py -n 1 -r 1 'from \
pycatalog.video import Video; v = Video("/home/gryf/t/a.avi"); \
v.capture()'
1 loops, best of 1: 18.8 sec per loop
"""
row_length = 4
if no_pictures < 8:
row_length = 2
if not (self.tags['width'] * row_length) > self.out_width:
for i in [8, 6, 5]:
if ((no_pictures % i) == 0 and
(i * self.tags['width']) <= self.out_width):
row_length = i
break
coef = (float(self.out_width - row_length - 1) /
(self.tags['width'] * row_length))
if coef < 1:
dim = (int(self.tags['width'] * coef),
int(self.tags['height'] * coef))
else:
dim = int(self.tags['width']), int(self.tags['height'])
ifn_list = os.listdir(directory)
ifn_list.sort()
img_list = [Image.open(os.path.join(directory, fn)).resize(dim)
for fn in ifn_list]
rows = no_pictures // row_length
cols = row_length
isize = (cols * dim[0] + cols + 1,
rows * dim[1] + rows + 1)
inew = Image.new('RGB', isize, (80, 80, 80))
for irow in range(no_pictures * row_length):
for icol in range(row_length):
left = 1 + icol * (dim[0] + 1)
right = left + dim[0]
upper = 1 + irow * (dim[1] + 1)
lower = upper + dim[1]
bbox = (left, upper, right, lower)
try:
img = img_list.pop(0)
except Exception:
break
inew.paste(img, bbox)
inew.save(image_fn, 'JPEG')
def _return_lower(self, chain):
"""
Return lowercase version of provided string argument
Arguments:
@chain string to be lowered
Returns:
@string with lowered string
"""
return str(chain).lower()
def _get_start_pos(self, chain):
"""
Return integer for starting point of the movie
"""
try:
return int(chain.split(".")[0])
except Exception:
return 0
def __str__(self):
str_out = ''
for key in self.tags:
str_out += "%20s: %s\n" % (key, self.tags[key])
return str_out