mirror of
https://github.com/gryf/pygtktalog.git
synced 2025-12-17 11:30:19 +01:00
Migration of database stuff into objects managed by SQLAlchemy ORM.
This commit is contained in:
@@ -1,211 +1,29 @@
|
||||
"""
|
||||
Project: pyGTKtalog
|
||||
Description: Pseudo ORM. Warning. This is a hack.
|
||||
Description: Common database operations.
|
||||
Type: core
|
||||
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
|
||||
Created: 2009-05-20
|
||||
Created: 2009-08-07
|
||||
"""
|
||||
from sqlite3 import dbapi2 as sqlite
|
||||
from sqlalchemy import MetaData, create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
|
||||
|
||||
class SchemaError(Exception):
|
||||
# Prepare SQLAlchemy objects
|
||||
Meta = MetaData()
|
||||
Base = declarative_base(metadata=Meta)
|
||||
Session = sessionmaker()
|
||||
|
||||
def connect(filename):
|
||||
"""
|
||||
Simple class for raising exceptions connected with DataBase class
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class DataBase(object):
|
||||
"""
|
||||
Main class for database common stuff
|
||||
"""
|
||||
|
||||
schema_order = ["files", "tags", "tags_files", "groups", "thumbnails",
|
||||
"images", "exif", "gthumb"]
|
||||
|
||||
schema = {"files": [("id", "INTEGER PRIMARY KEY AUTOINCREMENT"),
|
||||
("parent_id", "INTEGER"),
|
||||
("filename", "TEXT"),
|
||||
("filepath", "TEXT"),
|
||||
("date", "datetime"),
|
||||
("size", "INTEGER"),
|
||||
("type", "INTEGER"),
|
||||
("source", "INTEGER"),
|
||||
("note", "TEXT"),
|
||||
("description", "TEXT")],
|
||||
"tags": [("id", "INTEGER PRIMARY KEY AUTOINCREMENT"),
|
||||
("group_id", "INTEGER"),
|
||||
("tag", "TEXT")],
|
||||
"tags_files": [("file_id", "INTEGER"),
|
||||
("tag_id", "INTEGER")],
|
||||
"groups": [("id", "INTEGER PRIMARY KEY AUTOINCREMENT"),
|
||||
("name", "TEXT"),
|
||||
("color", "TEXT")],
|
||||
"thumbnails": [("id", "INTEGER PRIMARY KEY AUTOINCREMENT"),
|
||||
("file_id", "INTEGER"),
|
||||
("filename", "TEXT")],
|
||||
"images": [("id", "INTEGER PRIMARY KEY AUTOINCREMENT"),
|
||||
("file_id", "INTEGER"),
|
||||
("filename", "TEXT")],
|
||||
"exif": [("id", "INTEGER PRIMARY KEY AUTOINCREMENT"),
|
||||
("file_id", "INTEGER"),
|
||||
("camera", "TEXT"),
|
||||
("date", "TEXT"),
|
||||
("aperture", "TEXT"),
|
||||
("exposure_program", "TEXT"),
|
||||
("exposure_bias", "TEXT"),
|
||||
("iso", "TEXT"),
|
||||
("focal_length", "TEXT"),
|
||||
("subject_distance", "TEXT"),
|
||||
("metering_mode", "TEXT"),
|
||||
("flash", "TEXT"),
|
||||
("light_source", "TEXT"),
|
||||
("resolution", "TEXT"),
|
||||
("orientation", "TEXT")],
|
||||
"gthumb": [("id", "INTEGER PRIMARY KEY AUTOINCREMENT"),
|
||||
("file_id", "INTEGER"),
|
||||
("note", "TEXT"),
|
||||
("place", "TEXT"),
|
||||
("date", "datetime")]}
|
||||
|
||||
conn = None
|
||||
cur = None
|
||||
filename = None
|
||||
|
||||
@classmethod
|
||||
def get_connection(self):
|
||||
"""
|
||||
Returns: current connection or None if not connected.
|
||||
"""
|
||||
return DataBase.conn
|
||||
|
||||
@classmethod
|
||||
def get_cursor(self):
|
||||
"""
|
||||
Returns: current connection cursor, or None if not connected.
|
||||
"""
|
||||
return DataBase.cur
|
||||
|
||||
@classmethod
|
||||
def close(self):
|
||||
"""
|
||||
Close current connection. If there is no connection, do nothing.
|
||||
Returns: True, if db close was performed, False if there is no
|
||||
connection to close.
|
||||
"""
|
||||
if DataBase.cur is None:
|
||||
return False
|
||||
|
||||
DataBase.conn.commit()
|
||||
DataBase.cur.close()
|
||||
DataBase.conn.close()
|
||||
|
||||
DataBase.filename = None
|
||||
DataBase.cur = None
|
||||
DataBase.conn = None
|
||||
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def open(self, filename=":memory:", force=False):
|
||||
"""
|
||||
Open connection, check database schema, and alternatively create one.
|
||||
|
||||
If provided filename is different from current, current connection is
|
||||
closed and new connection is set up.
|
||||
|
||||
create engine and bind to Meta object.
|
||||
Arguments:
|
||||
@filename - full path of database file.
|
||||
@force - force schema creating.
|
||||
|
||||
Returns: True, if db open succeded, False in other cases.
|
||||
@filename - string with absolute or relative path to sqlite database
|
||||
file.
|
||||
"""
|
||||
|
||||
if DataBase.filename is not None:
|
||||
DataBase.close()
|
||||
|
||||
DataBase.filename = filename
|
||||
|
||||
DataBase.connect()
|
||||
|
||||
if DataBase.cur is None:
|
||||
return False
|
||||
|
||||
if not DataBase.check_schema():
|
||||
if not force:
|
||||
raise SchemaError("Schema for this database is not compatible"
|
||||
" with pyGTKtalog")
|
||||
else:
|
||||
# create schema
|
||||
DataBase.create_schema(DataBase.cur, DataBase.conn)
|
||||
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def check_schema(self):
|
||||
"""
|
||||
Check schema information.
|
||||
Returns: True, if schema is compatible with pyGTKtalog, False
|
||||
otherwise
|
||||
"""
|
||||
|
||||
if DataBase.cur is None:
|
||||
return False
|
||||
|
||||
tables = DataBase.cur.execute("select name, sql from sqlite_master "
|
||||
"where type='table' and "
|
||||
"name!='sqlite_sequence'").fetchall()
|
||||
table_names = []
|
||||
for table in tables:
|
||||
table_names.append(str(table[0]))
|
||||
|
||||
table_names.sort()
|
||||
orig_tables = DataBase.schema_order[:]
|
||||
orig_tables.sort()
|
||||
|
||||
return orig_tables == table_names
|
||||
|
||||
@classmethod
|
||||
def create_schema(self, cur, conn):
|
||||
"""
|
||||
Create database schema.
|
||||
Returns: True on success, False otherwise
|
||||
"""
|
||||
if cur is None:
|
||||
return False
|
||||
|
||||
for tablename in DataBase.schema_order:
|
||||
sql = "create table %s(" % tablename
|
||||
for column, coltype in DataBase.schema[tablename]:
|
||||
sql += "%s %s," % (column, coltype)
|
||||
sql = sql[:-1] + ")" # remove last comma and close definition
|
||||
#import pdb; pdb.set_trace()
|
||||
cur.execute(sql)
|
||||
|
||||
# add initial values
|
||||
cur.execute("INSERT INTO files VALUES"
|
||||
"(1, 1, 'root', null, 0, 0, 0, 0, null, null)")
|
||||
|
||||
cur.execute("INSERT INTO groups VALUES(1, 'default',"
|
||||
"'black')")
|
||||
conn.commit()
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def connect(self):
|
||||
"""
|
||||
Connect to database.
|
||||
Returns: cursor for connection.
|
||||
"""
|
||||
if DataBase.cur:
|
||||
return DataBase.cur
|
||||
|
||||
types = sqlite.PARSE_DECLTYPES|sqlite.PARSE_COLNAMES
|
||||
DataBase.conn = sqlite.connect(DataBase.filename, detect_types=types)
|
||||
DataBase.cur = DataBase.conn.cursor()
|
||||
return DataBase.cur
|
||||
|
||||
|
||||
def create_database():
|
||||
DataBase.open(":memory:", True)
|
||||
engine = create_engine("sqlite:///%s" % filename, echo=True)
|
||||
Meta.bind = engine
|
||||
Meta.create_all()
|
||||
|
||||
|
||||
157
pygtktalog/dbobjects.py
Normal file
157
pygtktalog/dbobjects.py
Normal file
@@ -0,0 +1,157 @@
|
||||
"""
|
||||
Project: pyGTKtalog
|
||||
Description: Definition of DB objects classes. Using SQLAlchemy.
|
||||
Type: core
|
||||
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
|
||||
Created: 2009-08-07
|
||||
"""
|
||||
from sqlalchemy import Column, Table, Integer, Text
|
||||
from sqlalchemy import DateTime, ForeignKey, Sequence
|
||||
from sqlalchemy.orm import relation, backref
|
||||
from pygtktalog.dbcommon import Base
|
||||
|
||||
|
||||
tags_files = Table("tags_files", Base.metadata,
|
||||
Column("file_id", Integer, ForeignKey("files.id")),
|
||||
Column("tag_id", Integer, ForeignKey("tags.id")))
|
||||
|
||||
class File(Base):
|
||||
__tablename__ = "files"
|
||||
id = Column(Integer, Sequence("file_id_seq"), primary_key=True)
|
||||
parent_id = Column(Integer, ForeignKey("files.id"))
|
||||
filename = Column(Text)
|
||||
filepath = Column(Text)
|
||||
date = Column(DateTime)
|
||||
size = Column(Integer)
|
||||
type = Column(Integer)
|
||||
source = Column(Integer)
|
||||
note = Column(Text)
|
||||
description = Column(Text)
|
||||
|
||||
children = relation('File', backref=backref('parent',
|
||||
remote_side="File.id"))
|
||||
tags = relation("Tag", secondary=tags_files)
|
||||
thumbnail = relation("Thumbnail", backref="file")
|
||||
images = relation("Image", backref="file")
|
||||
|
||||
def __init__(self, filename=None, path=None, date=None, size=None,
|
||||
ftype=None, src=None):
|
||||
"""Create file object with empty defaults"""
|
||||
self.filename = filename
|
||||
self.filepath = path
|
||||
self.date = date
|
||||
self.size = size
|
||||
self.type = ftype
|
||||
self.source = src
|
||||
|
||||
def __repr__(self):
|
||||
return "<File('%s', %s)>" % (str(self.filename), str(self.id))
|
||||
|
||||
class Group(Base):
|
||||
__tablename__ = "groups"
|
||||
id = Column(Integer, Sequence("group_id_seq"), primary_key=True)
|
||||
name = Column(Text)
|
||||
color = Column(Text)
|
||||
|
||||
def __init__(self, name=None, color=None):
|
||||
self.name = name
|
||||
self.color = color
|
||||
|
||||
def __repr__(self):
|
||||
return "<Group('%s', %s)>" % (str(self.name), str(self.id))
|
||||
|
||||
class Tag(Base):
|
||||
__tablename__ = "tags"
|
||||
id = Column(Integer, Sequence("tags_id_seq"), primary_key=True)
|
||||
group_id = Column(Integer, ForeignKey("groups.id"))
|
||||
tag = Column(Text)
|
||||
group = relation('Group', backref=backref('tags', remote_side="Group.id"))
|
||||
|
||||
files = relation("File", secondary=tags_files)
|
||||
|
||||
def __init__(self, tag=None, group=None):
|
||||
self.tag = tag
|
||||
self.group = group
|
||||
|
||||
def __repr__(self):
|
||||
return "<Tag('%s', %s)>" % (str(self.tag), str(self.id))
|
||||
|
||||
|
||||
class Thumbnail(Base):
|
||||
__tablename__ = "thumbnails"
|
||||
id = Column(Integer, Sequence("thumbnail_id_seq"), primary_key=True)
|
||||
file_id = Column(Integer, ForeignKey("files.id"))
|
||||
filename = Column(Text)
|
||||
|
||||
def __init__(self, filename=None):
|
||||
self.filename = None
|
||||
|
||||
def __repr__(self):
|
||||
return "<Thumbnail('%s', %s)>" % (str(self.filename), str(self.id))
|
||||
|
||||
class Image(Base):
|
||||
__tablename__ = "images"
|
||||
id = Column(Integer, Sequence("images_id_seq"), primary_key=True)
|
||||
file_id = Column(Integer, ForeignKey("files.id"))
|
||||
filename = Column(Text)
|
||||
|
||||
def __init__(self, filename=None):
|
||||
self.filename = filename
|
||||
self.file = file
|
||||
|
||||
def __repr__(self):
|
||||
return "<Image('%s', %s)>" % (str(self.filename), str(self.id))
|
||||
|
||||
class Exif(Base):
|
||||
__tablename__ = "exif"
|
||||
id = Column(Integer, Sequence("exif_id_seq"), primary_key=True)
|
||||
file_id = Column(Integer, ForeignKey("files.id"))
|
||||
camera = Column(Text)
|
||||
date = Column(Text)
|
||||
aperture = Column(Text)
|
||||
exposure_program = Column(Text)
|
||||
exposure_bias = Column(Text)
|
||||
iso = Column(Text)
|
||||
focal_length = Column(Text)
|
||||
subject_distance = Column(Text)
|
||||
metering_mode = Column(Text)
|
||||
flash = Column(Text)
|
||||
light_source = Column(Text)
|
||||
resolution = Column(Text)
|
||||
orientation = Column(Text)
|
||||
|
||||
def __init__(self):
|
||||
self.camera = None
|
||||
self.date = None
|
||||
self.aperture = None
|
||||
self.exposure_program = None
|
||||
self.exposure_bias = None
|
||||
self.iso = None
|
||||
self.focal_length = None
|
||||
self.subject_distance = None
|
||||
self.metering_mode = None
|
||||
self.flash = None
|
||||
self.light_source = None
|
||||
self.resolution = None
|
||||
self.orientation = None
|
||||
|
||||
def __repr__(self):
|
||||
return "<Exif('%s', %s)>" % (str(self.date), str(self.id))
|
||||
|
||||
class Gthumb(Base):
|
||||
__tablename__ = "gthumb"
|
||||
id = Column(Integer, Sequence("gthumb_id_seq"), primary_key=True)
|
||||
file_id = Column(Integer, ForeignKey("files.id"))
|
||||
note = Column(Text)
|
||||
place = Column(Text)
|
||||
date = Column(DateTime)
|
||||
|
||||
def __init__(self, note=None, place=None, date=None):
|
||||
self.note = note
|
||||
self.place = place
|
||||
self.date = date
|
||||
|
||||
def __repr__(self):
|
||||
return "<Gthumb('%s', '%s', %s)>" % (str(self.date), str(self.place),
|
||||
str(self.id))
|
||||
|
||||
@@ -9,7 +9,7 @@ import unittest
|
||||
import os
|
||||
from tempfile import mkstemp
|
||||
|
||||
from pygtktalog.dbcommon import DataBase, create_database
|
||||
from pygtktalog.dbcommon import connect, Meta, Session, Base
|
||||
|
||||
|
||||
class TestDataBase(unittest.TestCase):
|
||||
@@ -17,116 +17,11 @@ class TestDataBase(unittest.TestCase):
|
||||
Class responsible for database connection and schema creation
|
||||
"""
|
||||
|
||||
def tearDown(self):
|
||||
"""
|
||||
Tear down method. Close db connection.
|
||||
"""
|
||||
DataBase.close()
|
||||
|
||||
def test_connect(self):
|
||||
"""
|
||||
Test connection to database. Memory and file method will be tested.
|
||||
"""
|
||||
DataBase.filename = ":memory:"
|
||||
cursor = DataBase.connect()
|
||||
self.assertTrue(cursor)
|
||||
self.assertTrue(cursor == DataBase.connect())
|
||||
DataBase.close()
|
||||
|
||||
file_desc, dbfilename = mkstemp()
|
||||
os.close(file_desc)
|
||||
|
||||
DataBase.filename = dbfilename
|
||||
cursor = DataBase.connect()
|
||||
|
||||
self.assertTrue(cursor)
|
||||
self.assertTrue(cursor == DataBase.connect())
|
||||
DataBase.close()
|
||||
|
||||
os.unlink(dbfilename)
|
||||
|
||||
def test_close(self):
|
||||
"""
|
||||
Test close method
|
||||
"""
|
||||
DataBase.filename = ":memory:"
|
||||
DataBase.connect()
|
||||
|
||||
self.assertFalse(DataBase.cur is None)
|
||||
self.assertFalse(DataBase.conn is None)
|
||||
self.assertFalse(DataBase.filename is None)
|
||||
|
||||
result = DataBase.close()
|
||||
|
||||
self.assertTrue(result, "Should ne True, but was %s" % str(result))
|
||||
self.assertTrue(DataBase.cur is None)
|
||||
self.assertTrue(DataBase.conn is None)
|
||||
self.assertTrue(DataBase.filename is None)
|
||||
|
||||
self.assertFalse(DataBase.close())
|
||||
|
||||
def test_get_cursor(self):
|
||||
"""
|
||||
Test get_cursor() method.
|
||||
"""
|
||||
cur = DataBase.get_cursor()
|
||||
self.assertEqual(cur, None, "Cursor should be None")
|
||||
|
||||
DataBase.filename = ":memory:"
|
||||
DataBase.connect()
|
||||
|
||||
cur = DataBase.get_cursor()
|
||||
self.assertNotEqual(cur, None, "Cursor shouldn't be None")
|
||||
|
||||
def test_get_cconnection(self):
|
||||
"""
|
||||
Test get_connection() method.
|
||||
"""
|
||||
conn = DataBase.get_connection()
|
||||
self.assertEqual(conn, None, "Connection object should be None")
|
||||
|
||||
DataBase.filename = ":memory:"
|
||||
DataBase.connect()
|
||||
|
||||
conn = DataBase.get_connection()
|
||||
self.assertNotEqual(conn, None, "Connection object shouldn't be None")
|
||||
|
||||
def test_check_schema(self):
|
||||
"""
|
||||
Test check_schema() method.
|
||||
"""
|
||||
|
||||
self.assertFalse(DataBase.check_schema())
|
||||
|
||||
DataBase.filename = ":memory:"
|
||||
DataBase.connect()
|
||||
|
||||
self.assertFalse(DataBase.check_schema())
|
||||
|
||||
DataBase.create_schema(DataBase.cur, DataBase.conn)
|
||||
|
||||
self.assertTrue(DataBase.check_schema())
|
||||
|
||||
def test_create_schema(self):
|
||||
"""
|
||||
Test create_schema() method.
|
||||
"""
|
||||
self.assertFalse(DataBase.create_schema(DataBase.cur, DataBase.conn))
|
||||
|
||||
DataBase.filename = ":memory:"
|
||||
DataBase.connect()
|
||||
|
||||
result = DataBase.create_schema(DataBase.cur, DataBase.conn)
|
||||
self.assertTrue(result, "%s" % result)
|
||||
|
||||
self.assertTrue(DataBase.check_schema())
|
||||
|
||||
def test_create_database(self):
|
||||
"""
|
||||
Test create_database function
|
||||
"""
|
||||
create_database()
|
||||
self.assertTrue(True)
|
||||
connect(":memory:")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user