1
0
mirror of https://github.com/gryf/pygtktalog.git synced 2025-12-17 11:30:19 +01:00

Simplify db interactions. Follow KISS principle.

This commit is contained in:
2009-07-19 18:16:14 +00:00
parent 6b1fdb90e9
commit 71162da225

View File

@@ -5,9 +5,7 @@
Author: Roman 'gryf' Dobosz, gryf73@gmail.com
Created: 2009-05-20
"""
import os
from sqlite3 import dbapi2 as sqlite
from tempfile import mkstemp
class SchemaError(Exception):
@@ -111,8 +109,7 @@ class DataBase(object):
@classmethod
def open(self, filename=":memory:", force=False):
"""
Open connection, check database schema, if it looks ok, than copy it
to memory.
Open connection, check database schema, and alternatively create one.
If provided filename is different from current, current connection is
closed and new connection is set up.
@@ -121,7 +118,7 @@ class DataBase(object):
@filename - full path of database file.
@force - force schema creating.
Returns: cursor, if db open succeded, False in other cases.
Returns: True, if db open succeded, False in other cases.
"""
if DataBase.filename is not None:
@@ -129,7 +126,7 @@ class DataBase(object):
DataBase.filename = filename
DataBase.__connect()
DataBase.connect()
if DataBase.cur is None:
return False
@@ -140,12 +137,9 @@ class DataBase(object):
" with pyGTKtalog")
else:
# create schema
DataBase.__create_schema(DataBase.cur, DataBase.conn)
DataBase.create_schema(DataBase.cur, DataBase.conn)
if not DataBase.__copy_to_memory():
return False
return DataBase.cur
return True
@classmethod
def check_schema(self):
@@ -172,7 +166,7 @@ class DataBase(object):
return orig_tables == table_names
@classmethod
def __create_schema(self, cur, conn):
def create_schema(self, cur, conn):
"""
Create database schema.
Returns: True on success, False otherwise
@@ -197,123 +191,20 @@ class DataBase(object):
conn.commit()
return True
@classmethod
def __connect(self):
def connect(self):
"""
Connect to database.
Returns: cursor for connection.
"""
if DataBase.cur:
return DataBase.cur
types = sqlite.PARSE_DECLTYPES|sqlite.PARSE_COLNAMES
DataBase.conn = sqlite.connect(DataBase.filename, detect_types=types)
DataBase.cur = DataBase.conn.cursor()
return DataBase.cur
@classmethod
def __copy_to_memory(self):
"""
Copy database to :memory:
Returns: True if succeded, False otherwise
"""
if DataBase.cur is None:
return False
if not DataBase.check_schema():
return False
types = sqlite.PARSE_DECLTYPES|sqlite.PARSE_COLNAMES
mem_conn = sqlite.connect(":memory:", detect_types=types)
mem_cur = mem_conn.cursor()
DataBase.__create_schema(mem_cur, mem_conn)
# copy data
for tablename in DataBase.schema_order:
if tablename in ('files', 'groups'):
sql = "select * from %s where id!=1" % tablename
else:
sql = "select * from %s" % tablename
data = DataBase.cur.execute(sql).fetchall()
cols = 0
if data and data[0] and data[0][0]:
cols = (len(data[0]) * "?,")[:-1]
sql = "insert into %s values(%s)" % (tablename, cols)
for row in data:
mem_cur.execute(sql, row)
# update sequences
seqs = DataBase.cur.execute("select name, seq from "
"sqlite_sequence").fetchall()
for seq in seqs:
sql = "update sqlite_sequence set seq=? where name=?"
mem_cur.execute(sql, (seq[1], seq[0]))
mem_conn.commit()
DataBase.conn.commit()
DataBase.cur.close()
DataBase.conn.close()
DataBase.cur = mem_cur
DataBase.conn = mem_conn
return True
@classmethod
def __copy_to_file(self):
"""
Copy database from :memory:
Returns: True if succeded, False otherwise
"""
if DataBase.cur is None:
return False
file_desc, dbfilename = mkstemp()
os.close(file_desc)
types = sqlite.PARSE_DECLTYPES|sqlite.PARSE_COLNAMES
file_conn = sqlite.connect(dbfilename, detect_types=types)
file_cur = file_conn.cursor()
DataBase.__create_schema(file_cur, file_conn)
# copy data
for tablename in DataBase.schema_order:
if tablename in ('files', 'groups'):
sql = "select * from %s where id!=1" % tablename
else:
sql = "select * from %s" % tablename
data = DataBase.cur.execute(sql).fetchall()
cols = 0
if data and data[0] and data[0][0]:
cols = (len(data[0]) * "?,")[:-1]
sql = "insert into %s values(%s)" % (tablename, cols)
for row in data:
file_cur.execute(sql, row)
# update sequences
seqs = DataBase.cur.execute("select name, seq from "
"sqlite_sequence").fetchall()
for name, seq in seqs:
sql = "update sqlite_sequence set seq=? where name=?"
file_cur.execute(sql, (seq, name))
file_conn.commit()
file_cur.close()
file_conn.close()
return dbfilename
def create_database():
DataBase.open(":memory:", True)