diff --git a/pygtktalog/dbcommon.py b/pygtktalog/dbcommon.py index d14cd7d..9b22da4 100644 --- a/pygtktalog/dbcommon.py +++ b/pygtktalog/dbcommon.py @@ -1,211 +1,29 @@ """ Project: pyGTKtalog - Description: Pseudo ORM. Warning. This is a hack. + Description: Common database operations. Type: core Author: Roman 'gryf' Dobosz, gryf73@gmail.com - Created: 2009-05-20 + Created: 2009-08-07 """ -from sqlite3 import dbapi2 as sqlite +from sqlalchemy import MetaData, create_engine +from sqlalchemy.orm import sessionmaker +from sqlalchemy.ext.declarative import declarative_base -class SchemaError(Exception): +# Prepare SQLAlchemy objects +Meta = MetaData() +Base = declarative_base(metadata=Meta) +Session = sessionmaker() + +def connect(filename): """ - Simple class for raising exceptions connected with DataBase class - """ - pass - - -class DataBase(object): - """ - Main class for database common stuff + create engine and bind to Meta object. + Arguments: + @filename - string with absolute or relative path to sqlite database + file. """ - schema_order = ["files", "tags", "tags_files", "groups", "thumbnails", - "images", "exif", "gthumb"] - - schema = {"files": [("id", "INTEGER PRIMARY KEY AUTOINCREMENT"), - ("parent_id", "INTEGER"), - ("filename", "TEXT"), - ("filepath", "TEXT"), - ("date", "datetime"), - ("size", "INTEGER"), - ("type", "INTEGER"), - ("source", "INTEGER"), - ("note", "TEXT"), - ("description", "TEXT")], - "tags": [("id", "INTEGER PRIMARY KEY AUTOINCREMENT"), - ("group_id", "INTEGER"), - ("tag", "TEXT")], - "tags_files": [("file_id", "INTEGER"), - ("tag_id", "INTEGER")], - "groups": [("id", "INTEGER PRIMARY KEY AUTOINCREMENT"), - ("name", "TEXT"), - ("color", "TEXT")], - "thumbnails": [("id", "INTEGER PRIMARY KEY AUTOINCREMENT"), - ("file_id", "INTEGER"), - ("filename", "TEXT")], - "images": [("id", "INTEGER PRIMARY KEY AUTOINCREMENT"), - ("file_id", "INTEGER"), - ("filename", "TEXT")], - "exif": [("id", "INTEGER PRIMARY KEY AUTOINCREMENT"), - ("file_id", "INTEGER"), - ("camera", "TEXT"), - ("date", "TEXT"), - ("aperture", "TEXT"), - ("exposure_program", "TEXT"), - ("exposure_bias", "TEXT"), - ("iso", "TEXT"), - ("focal_length", "TEXT"), - ("subject_distance", "TEXT"), - ("metering_mode", "TEXT"), - ("flash", "TEXT"), - ("light_source", "TEXT"), - ("resolution", "TEXT"), - ("orientation", "TEXT")], - "gthumb": [("id", "INTEGER PRIMARY KEY AUTOINCREMENT"), - ("file_id", "INTEGER"), - ("note", "TEXT"), - ("place", "TEXT"), - ("date", "datetime")]} - - conn = None - cur = None - filename = None - - @classmethod - def get_connection(self): - """ - Returns: current connection or None if not connected. - """ - return DataBase.conn - - @classmethod - def get_cursor(self): - """ - Returns: current connection cursor, or None if not connected. - """ - return DataBase.cur - - @classmethod - def close(self): - """ - Close current connection. If there is no connection, do nothing. - Returns: True, if db close was performed, False if there is no - connection to close. - """ - if DataBase.cur is None: - return False - - DataBase.conn.commit() - DataBase.cur.close() - DataBase.conn.close() - - DataBase.filename = None - DataBase.cur = None - DataBase.conn = None - - return True - - @classmethod - def open(self, filename=":memory:", force=False): - """ - Open connection, check database schema, and alternatively create one. - - If provided filename is different from current, current connection is - closed and new connection is set up. - - Arguments: - @filename - full path of database file. - @force - force schema creating. - - Returns: True, if db open succeded, False in other cases. - """ - - if DataBase.filename is not None: - DataBase.close() - - DataBase.filename = filename - - DataBase.connect() - - if DataBase.cur is None: - return False - - if not DataBase.check_schema(): - if not force: - raise SchemaError("Schema for this database is not compatible" - " with pyGTKtalog") - else: - # create schema - DataBase.create_schema(DataBase.cur, DataBase.conn) - - return True - - @classmethod - def check_schema(self): - """ - Check schema information. - Returns: True, if schema is compatible with pyGTKtalog, False - otherwise - """ - - if DataBase.cur is None: - return False - - tables = DataBase.cur.execute("select name, sql from sqlite_master " - "where type='table' and " - "name!='sqlite_sequence'").fetchall() - table_names = [] - for table in tables: - table_names.append(str(table[0])) - - table_names.sort() - orig_tables = DataBase.schema_order[:] - orig_tables.sort() - - return orig_tables == table_names - - @classmethod - def create_schema(self, cur, conn): - """ - Create database schema. - Returns: True on success, False otherwise - """ - if cur is None: - return False - - for tablename in DataBase.schema_order: - sql = "create table %s(" % tablename - for column, coltype in DataBase.schema[tablename]: - sql += "%s %s," % (column, coltype) - sql = sql[:-1] + ")" # remove last comma and close definition - #import pdb; pdb.set_trace() - cur.execute(sql) - - # add initial values - cur.execute("INSERT INTO files VALUES" - "(1, 1, 'root', null, 0, 0, 0, 0, null, null)") - - cur.execute("INSERT INTO groups VALUES(1, 'default'," - "'black')") - conn.commit() - return True - - @classmethod - def connect(self): - """ - Connect to database. - Returns: cursor for connection. - """ - if DataBase.cur: - return DataBase.cur - - types = sqlite.PARSE_DECLTYPES|sqlite.PARSE_COLNAMES - DataBase.conn = sqlite.connect(DataBase.filename, detect_types=types) - DataBase.cur = DataBase.conn.cursor() - return DataBase.cur - - -def create_database(): - DataBase.open(":memory:", True) + engine = create_engine("sqlite:///%s" % filename, echo=True) + Meta.bind = engine + Meta.create_all() diff --git a/pygtktalog/dbobjects.py b/pygtktalog/dbobjects.py new file mode 100644 index 0000000..d1ca174 --- /dev/null +++ b/pygtktalog/dbobjects.py @@ -0,0 +1,157 @@ +""" + Project: pyGTKtalog + Description: Definition of DB objects classes. Using SQLAlchemy. + Type: core + Author: Roman 'gryf' Dobosz, gryf73@gmail.com + Created: 2009-08-07 +""" +from sqlalchemy import Column, Table, Integer, Text +from sqlalchemy import DateTime, ForeignKey, Sequence +from sqlalchemy.orm import relation, backref +from pygtktalog.dbcommon import Base + + +tags_files = Table("tags_files", Base.metadata, + Column("file_id", Integer, ForeignKey("files.id")), + Column("tag_id", Integer, ForeignKey("tags.id"))) + +class File(Base): + __tablename__ = "files" + id = Column(Integer, Sequence("file_id_seq"), primary_key=True) + parent_id = Column(Integer, ForeignKey("files.id")) + filename = Column(Text) + filepath = Column(Text) + date = Column(DateTime) + size = Column(Integer) + type = Column(Integer) + source = Column(Integer) + note = Column(Text) + description = Column(Text) + + children = relation('File', backref=backref('parent', + remote_side="File.id")) + tags = relation("Tag", secondary=tags_files) + thumbnail = relation("Thumbnail", backref="file") + images = relation("Image", backref="file") + + def __init__(self, filename=None, path=None, date=None, size=None, + ftype=None, src=None): + """Create file object with empty defaults""" + self.filename = filename + self.filepath = path + self.date = date + self.size = size + self.type = ftype + self.source = src + + def __repr__(self): + return "" % (str(self.filename), str(self.id)) + +class Group(Base): + __tablename__ = "groups" + id = Column(Integer, Sequence("group_id_seq"), primary_key=True) + name = Column(Text) + color = Column(Text) + + def __init__(self, name=None, color=None): + self.name = name + self.color = color + + def __repr__(self): + return "" % (str(self.name), str(self.id)) + +class Tag(Base): + __tablename__ = "tags" + id = Column(Integer, Sequence("tags_id_seq"), primary_key=True) + group_id = Column(Integer, ForeignKey("groups.id")) + tag = Column(Text) + group = relation('Group', backref=backref('tags', remote_side="Group.id")) + + files = relation("File", secondary=tags_files) + + def __init__(self, tag=None, group=None): + self.tag = tag + self.group = group + + def __repr__(self): + return "" % (str(self.tag), str(self.id)) + + +class Thumbnail(Base): + __tablename__ = "thumbnails" + id = Column(Integer, Sequence("thumbnail_id_seq"), primary_key=True) + file_id = Column(Integer, ForeignKey("files.id")) + filename = Column(Text) + + def __init__(self, filename=None): + self.filename = None + + def __repr__(self): + return "" % (str(self.filename), str(self.id)) + +class Image(Base): + __tablename__ = "images" + id = Column(Integer, Sequence("images_id_seq"), primary_key=True) + file_id = Column(Integer, ForeignKey("files.id")) + filename = Column(Text) + + def __init__(self, filename=None): + self.filename = filename + self.file = file + + def __repr__(self): + return "" % (str(self.filename), str(self.id)) + +class Exif(Base): + __tablename__ = "exif" + id = Column(Integer, Sequence("exif_id_seq"), primary_key=True) + file_id = Column(Integer, ForeignKey("files.id")) + camera = Column(Text) + date = Column(Text) + aperture = Column(Text) + exposure_program = Column(Text) + exposure_bias = Column(Text) + iso = Column(Text) + focal_length = Column(Text) + subject_distance = Column(Text) + metering_mode = Column(Text) + flash = Column(Text) + light_source = Column(Text) + resolution = Column(Text) + orientation = Column(Text) + + def __init__(self): + self.camera = None + self.date = None + self.aperture = None + self.exposure_program = None + self.exposure_bias = None + self.iso = None + self.focal_length = None + self.subject_distance = None + self.metering_mode = None + self.flash = None + self.light_source = None + self.resolution = None + self.orientation = None + + def __repr__(self): + return "" % (str(self.date), str(self.id)) + +class Gthumb(Base): + __tablename__ = "gthumb" + id = Column(Integer, Sequence("gthumb_id_seq"), primary_key=True) + file_id = Column(Integer, ForeignKey("files.id")) + note = Column(Text) + place = Column(Text) + date = Column(DateTime) + + def __init__(self, note=None, place=None, date=None): + self.note = note + self.place = place + self.date = date + + def __repr__(self): + return "" % (str(self.date), str(self.place), + str(self.id)) + diff --git a/test/unit/dbcommon_test.py b/test/unit/dbcommon_test.py index f6fdf76..60769b8 100644 --- a/test/unit/dbcommon_test.py +++ b/test/unit/dbcommon_test.py @@ -9,7 +9,7 @@ import unittest import os from tempfile import mkstemp -from pygtktalog.dbcommon import DataBase, create_database +from pygtktalog.dbcommon import connect, Meta, Session, Base class TestDataBase(unittest.TestCase): @@ -17,116 +17,11 @@ class TestDataBase(unittest.TestCase): Class responsible for database connection and schema creation """ - def tearDown(self): - """ - Tear down method. Close db connection. - """ - DataBase.close() - def test_connect(self): """ Test connection to database. Memory and file method will be tested. """ - DataBase.filename = ":memory:" - cursor = DataBase.connect() - self.assertTrue(cursor) - self.assertTrue(cursor == DataBase.connect()) - DataBase.close() - - file_desc, dbfilename = mkstemp() - os.close(file_desc) - - DataBase.filename = dbfilename - cursor = DataBase.connect() - - self.assertTrue(cursor) - self.assertTrue(cursor == DataBase.connect()) - DataBase.close() - - os.unlink(dbfilename) - - def test_close(self): - """ - Test close method - """ - DataBase.filename = ":memory:" - DataBase.connect() - - self.assertFalse(DataBase.cur is None) - self.assertFalse(DataBase.conn is None) - self.assertFalse(DataBase.filename is None) - - result = DataBase.close() - - self.assertTrue(result, "Should ne True, but was %s" % str(result)) - self.assertTrue(DataBase.cur is None) - self.assertTrue(DataBase.conn is None) - self.assertTrue(DataBase.filename is None) - - self.assertFalse(DataBase.close()) - - def test_get_cursor(self): - """ - Test get_cursor() method. - """ - cur = DataBase.get_cursor() - self.assertEqual(cur, None, "Cursor should be None") - - DataBase.filename = ":memory:" - DataBase.connect() - - cur = DataBase.get_cursor() - self.assertNotEqual(cur, None, "Cursor shouldn't be None") - - def test_get_cconnection(self): - """ - Test get_connection() method. - """ - conn = DataBase.get_connection() - self.assertEqual(conn, None, "Connection object should be None") - - DataBase.filename = ":memory:" - DataBase.connect() - - conn = DataBase.get_connection() - self.assertNotEqual(conn, None, "Connection object shouldn't be None") - - def test_check_schema(self): - """ - Test check_schema() method. - """ - - self.assertFalse(DataBase.check_schema()) - - DataBase.filename = ":memory:" - DataBase.connect() - - self.assertFalse(DataBase.check_schema()) - - DataBase.create_schema(DataBase.cur, DataBase.conn) - - self.assertTrue(DataBase.check_schema()) - - def test_create_schema(self): - """ - Test create_schema() method. - """ - self.assertFalse(DataBase.create_schema(DataBase.cur, DataBase.conn)) - - DataBase.filename = ":memory:" - DataBase.connect() - - result = DataBase.create_schema(DataBase.cur, DataBase.conn) - self.assertTrue(result, "%s" % result) - - self.assertTrue(DataBase.check_schema()) - - def test_create_database(self): - """ - Test create_database function - """ - create_database() - self.assertTrue(True) + connect(":memory:") if __name__ == "__main__":