pg.py
上传用户:blenddy
上传日期:2007-01-07
资源大小:6495k
文件大小:7k
- # pgutil.py
- # Written by D'Arcy J.M. Cain
- # This library implements some basic database management stuff
- # It includes the pg module and builds on it
- from _pg import *
- import string, re, sys
- # utility function
- # We expect int, seq, decimal, text or date (more later)
- def _quote(d, t):
- if t in ['int', 'decimal', 'seq']:
- if d == "": return 0
- return "%s" % d
- if t == 'money':
- if d == "": return '0.00'
- return "'%.2f'" % d
- if t == 'bool':
- if string.upper(d) in ['T', 'TRUE', 'Y', 'YES', 1, '1', 'ON']:
- return "'t'"
- else:
- return "'f'"
- if d == "": return "null"
- return "'%s'" % string.strip(re.sub("'", "''", "%s" % d))
- class DB:
- """This class wraps the pg connection type"""
- def __init__(self, *args, **kw):
- self.db = apply(connect, args, kw)
- # Create convience methods, in a way that is still overridable.
- for e in ( 'query', 'reset', 'close', 'getnotify', 'inserttable',
- 'putline', 'getline', 'endcopy',
- 'host', 'port', 'db', 'options',
- 'tty', 'error', 'status', 'user',
- 'locreate', 'getlo', 'loimport' ):
- if not hasattr(self,e) and hasattr(self.db,e):
- exec 'self.%s = self.db.%s' % ( e, e )
- self.attnames = {}
- self.pkeys = {}
- self.debug = None # For debugging scripts, set to output format
- # that takes a single string arg. For example
- # in a CGI set to "%s<BR>"
- # Get all the primary keys at once
- for rel, att in self.db.query("""SELECT
- pg_class.relname, pg_attribute.attname
- FROM pg_class, pg_attribute, pg_index
- WHERE pg_class.oid = pg_attribute.attrelid AND
- pg_class.oid = pg_index.indrelid AND
- pg_index.indkey[0] = pg_attribute.attnum AND
- pg_index.indisprimary = 't'""").getresult():
- self.pkeys[rel] = att
- def pkey(self, cl):
- # will raise an exception if primary key doesn't exist
- return self.pkeys[cl]
- def get_databases(self):
- l = []
- for n in self.db.query("SELECT datname FROM pg_database").getresult():
- l.append(n[0])
- return l
- def get_tables(self):
- l = []
- for n in self.db.query("""SELECT relname FROM pg_class
- WHERE relkind = 'r' AND
- relname !~ '^Inv' AND
- relname !~ '^pg_'""").getresult():
- l.append(n[0])
- return l
- def get_attnames(self, cl):
- # May as well cache them
- if self.attnames.has_key(cl):
- return self.attnames[cl]
- query = """SELECT pg_attribute.attname, pg_type.typname
- FROM pg_class, pg_attribute, pg_type
- WHERE pg_class.relname = '%s' AND
- pg_attribute.attnum > 0 AND
- pg_attribute.attrelid = pg_class.oid AND
- pg_attribute.atttypid = pg_type.oid"""
- l = {}
- for attname, typname in self.db.query(query % cl).getresult():
- if re.match("^int", typname):
- l[attname] = 'int'
- elif re.match("^oid", typname):
- l[attname] = 'int'
- elif re.match("^text", typname):
- l[attname] = 'text'
- elif re.match("^char", typname):
- l[attname] = 'text'
- elif re.match("^name", typname):
- l[attname] = 'text'
- elif re.match("^abstime", typname):
- l[attname] = 'date'
- elif re.match("^date", typname):
- l[attname] = 'date'
- elif re.match("^bool", typname):
- l[attname] = 'bool'
- elif re.match("^float", typname):
- l[attname] = 'decimal'
- elif re.match("^money", typname):
- l[attname] = 'money'
- else:
- l[attname] = 'text'
- self.attnames[cl] = l
- return self.attnames[cl]
- # return a tuple from a database
- def get(self, cl, arg, keyname = None):
- if keyname == None: # use the primary key by default
- keyname = self.pkeys[cl]
- fnames = self.get_attnames(cl)
- if type(arg) == type({}):
- # To allow users to work with multiple tables we munge the
- # name when the key is "oid"
- if keyname == 'oid': k = arg['oid_%s' % cl]
- else: k = arg[keyname]
- else:
- k = arg
- arg = {}
- # We want the oid for later updates if that isn't the key
- if keyname == 'oid':
- q = "SELECT * FROM %s WHERE oid = %s" % (cl, k)
- else:
- q = "SELECT oid AS oid_%s, %s FROM %s WHERE %s = %s" %
- (cl, string.join(fnames.keys(), ','),
- cl, keyname, _quote(k, fnames[keyname]))
- if self.debug != None: print self.debug % q
- res = self.db.query(q).dictresult()
- if res == []:
- raise error,
- "No such record in %s where %s is %s" %
- (cl, keyname, _quote(k, fnames[keyname]))
- return None
- for k in res[0].keys():
- arg[k] = res[0][k]
- return arg
- # Inserts a new tuple into a table
- def insert(self, cl, a):
- fnames = self.get_attnames(cl)
- l = []
- n = []
- for f in fnames.keys():
- if a.has_key(f):
- if a[f] == "": l.append("null")
- else: l.append(_quote(a[f], fnames[f]))
- n.append(f)
- try:
- q = "INSERT INTO %s (%s) VALUES (%s)" %
- (cl, string.join(n, ','), string.join(l, ','))
- if self.debug != None: print self.debug % q
- a['oid_%s' % cl] = self.db.query(q)
- except:
- raise error, "Error inserting into %s: %s" % (cl, sys.exc_value)
- # reload the dictionary to catch things modified by engine
- # note that get() changes 'oid' below to oid_table
- # if no read perms (it can and does happen) return None
- try: return self.get(cl, a, 'oid')
- except: return None
- # Update always works on the oid which get returns if available
- # otherwise use the primary key. Fail if neither.
- def update(self, cl, a):
- foid = 'oid_%s' % cl
- pk = self.pkeys[cl]
- if a.has_key(foid):
- where = "oid = %s" % a[foid]
- elif a.has_key(pk):
- where = "%s = '%s'" % (pk, a[pk])
- else:
- raise error, "Update needs key (%s) or oid as %s" % (pk, foid)
- q = "SELECT oid FROM %s WHERE %s" % (cl, where)
- if self.debug != None: print self.debug % q
- res = self.db.query(q).getresult()
- if len(res) < 1:
- raise error, "No record in %s where %s (%s)" %
- (cl, where, sys.exc_value)
- else: a[foid] = res[0][0]
- v = []
- k = 0
- fnames = self.get_attnames(cl)
- for ff in fnames.keys():
- if a.has_key(ff) and a[ff] != res[0][k]:
- v.append("%s = %s" % (ff, _quote(a[ff], fnames[ff])))
- if v == []:
- return None
- try:
- q = "UPDATE %s SET %s WHERE oid = %s" %
- (cl, string.join(v, ','), a[foid])
- if self.debug != None: print self.debug % q
- self.db.query(q)
- except:
- raise error, "Can't update %s: %s" % (cl, sys.exc_value)
- # reload the dictionary to catch things modified by engine
- return self.get(cl, a, 'oid')
- # At some point we will need a way to get defaults from a table
- def clear(self, cl, a = {}):
- fnames = self.get_attnames(cl)
- for ff in fnames.keys():
- if fnames[ff] in ['int', 'decimal', 'seq', 'money']:
- a[ff] = 0
- elif fnames[ff] == 'date':
- a[ff] = 'TODAY'
- else:
- a[ff] = ""
- a['oid'] = 0
- return a
- # Like update, delete works on the oid
- # one day we will be testing that the record to be deleted
- # isn't referenced somewhere (or else PostgreSQL will)
- def delete(self, cl, a):
- try:
- q = "DELETE FROM %s WHERE oid = %s" % (cl, a['oid_%s' % cl])
- if self.debug != None: print self.debug % q
- self.db.query(q)
- except:
- raise error, "Can't delete %s: %s" % (cl, sys.exc_value)
- return None