Implement automatic database cleanup
[cascardo/ipsilon.git] / ipsilon / util / data.py
1 # Copyright (C) 2013 Ipsilon project Contributors, for license see COPYING
2
3 import cherrypy
4 import datetime
5 from ipsilon.util.log import Log
6 from sqlalchemy import create_engine
7 from sqlalchemy import MetaData, Table, Column, Text
8 from sqlalchemy.pool import QueuePool, SingletonThreadPool
9 from sqlalchemy.schema import (PrimaryKeyConstraint, Index, AddConstraint,
10                                CreateIndex)
11 from sqlalchemy.sql import select, and_
12 import ConfigParser
13 import os
14 import uuid
15 import logging
16 import time
17
18
# Bump this whenever the expected database layout changes; Store subclasses
# compare it against the per-class version recorded in the 'dbinfo' table.
CURRENT_SCHEMA_VERSION = 2
# Key/value-per-owner table layout: one row per (name, option) pair.
OPTIONS_TABLE = {'columns': ['name', 'option', 'value'],
                 'primary_key': ('name', 'option'),
                 'indexes': [('name',)]
                 }
# Layout for uuid-identified data: one row per (uuid, name) pair.
UNIQUE_DATA_TABLE = {'columns': ['uuid', 'name', 'value'],
                     'primary_key': ('uuid', 'name'),
                     'indexes': [('uuid',)]
                     }
28
29
class DatabaseError(Exception):
    """Raised when the backing database needs initialization or upgrade."""
32
33
class BaseStore(Log):
    """Common base for storage backends (SQL or config file).

    Concrete backends implement the schema-upgrade helpers below.
    """

    # Some helper functions used for upgrades
    def add_constraint(self, table):
        # Add a pre-built constraint object to an existing table.
        raise NotImplementedError()

    def add_index(self, index):
        # Add a pre-built index object to an existing table.
        raise NotImplementedError()
41
42
class SqlStore(BaseStore):
    """SQLAlchemy-backed store; one pooled engine per database name."""

    # Process-wide cache of SqlStore objects keyed by connection string,
    # so all Store objects share connection pools per database.
    __instances = {}

    @classmethod
    def get_connection(cls, name):
        """Return the shared SqlStore for *name*, creating it on first use."""
        if name not in cls.__instances:
            if cherrypy.config.get('db.conn.log', False):
                logging.debug('SqlStore new: %s', name)
            cls.__instances[name] = SqlStore(name)
        return cls.__instances[name]

    def __init__(self, name):
        self.db_conn_log = cherrypy.config.get('db.conn.log', False)
        self.debug('SqlStore init: %s' % name)
        self.name = name
        engine_name = name
        # A bare path (no URL scheme) is treated as a SQLite database file.
        if '://' not in engine_name:
            engine_name = 'sqlite:///' + engine_name
        # This pool size is per configured database. The minimum needed,
        #  determined by binary search, is 23. We're using 25 so we have a bit
        #  more playroom, and then the overflow should make sure things don't
        #  break when we suddenly need more.
        pool_args = {'poolclass': QueuePool,
                     'pool_size': 25,
                     'max_overflow': 50}
        if engine_name.startswith('sqlite://'):
            # It's not possible to share connections for SQLite between
            #  threads, so let's use the SingletonThreadPool for them
            pool_args = {'poolclass': SingletonThreadPool}
        self._dbengine = create_engine(engine_name, **pool_args)
        self.is_readonly = False

    def add_constraint(self, constraint):
        """Add *constraint* to its pre-existing table, where the dialect
        supports it."""
        if self._dbengine.dialect.name != 'sqlite':
            # It is impossible to add constraints to a pre-existing table for
            #  SQLite
            # source: http://www.sqlite.org/omitted.html
            create_constraint = AddConstraint(constraint, bind=self._dbengine)
            create_constraint.execute()

    def add_index(self, index):
        """Create *index* on its table."""
        add_index = CreateIndex(index, bind=self._dbengine)
        add_index.execute()

    def debug(self, fact):
        # Connection-level debugging is noisy, so it is gated behind the
        # 'db.conn.log' cherrypy config flag.
        if self.db_conn_log:
            super(SqlStore, self).debug(fact)

    def engine(self):
        """Return the underlying SQLAlchemy engine."""
        return self._dbengine

    def connection(self):
        """Check out a connection; it is closed automatically at the end
        of the current cherrypy request."""
        self.debug('SqlStore connect: %s' % self.name)
        conn = self._dbengine.connect()

        def cleanup_connection():
            self.debug('SqlStore cleanup: %s' % self.name)
            conn.close()
        cherrypy.request.hooks.attach('on_end_request', cleanup_connection)
        return conn
103
104
class SqlQuery(Log):
    """Thin CRUD wrapper around one SQLAlchemy table.

    When *trans* is True a transaction is begun immediately and must be
    finished with commit() or rollback(); with trans=False the query runs
    in autocommit mode and commit()/rollback() become no-ops (previously
    they raised AttributeError on the None transaction, masking the
    original error in callers' except blocks).
    """

    def __init__(self, db_obj, table, table_def, trans=True):
        self._db = db_obj
        self._con = self._db.connection()
        self._trans = self._con.begin() if trans else None
        self._table = self._get_table(table, table_def)

    def _get_table(self, name, table_def):
        """Build the Table object; *table_def* is either a plain column
        list or a dict with 'columns', 'indexes' and 'primary_key'."""
        if isinstance(table_def, list):
            table_def = {'columns': table_def,
                         'indexes': [],
                         'primary_key': None}
        table_creation = []
        for col_name in table_def['columns']:
            # All columns are stored as free-form text.
            table_creation.append(Column(col_name, Text()))
        if table_def['primary_key']:
            table_creation.append(PrimaryKeyConstraint(
                *table_def['primary_key']))
        for index in table_def['indexes']:
            idx_name = 'idx_%s_%s' % (name, '_'.join(index))
            table_creation.append(Index(idx_name, *index))
        table = Table(name, MetaData(self._db.engine()), *table_creation)
        return table

    def _where(self, kvfilter):
        """Build an AND-ed equality clause from the key/value filter dict,
        or None when there is no filter."""
        where = None
        if kvfilter is not None:
            for k in kvfilter:
                w = self._table.columns[k] == kvfilter[k]
                if where is None:
                    where = w
                else:
                    where = where & w
        return where

    def _columns(self, columns=None):
        """Resolve a list of column names to column objects; default to
        all columns."""
        cols = None
        if columns is not None:
            cols = []
            for c in columns:
                cols.append(self._table.columns[c])
        else:
            cols = self._table.columns
        return cols

    def rollback(self):
        # No-op when constructed with trans=False.
        if self._trans is not None:
            self._trans.rollback()

    def commit(self):
        # No-op when constructed with trans=False.
        if self._trans is not None:
            self._trans.commit()

    def create(self):
        """Create the table if it does not exist yet."""
        self._table.create(checkfirst=True)

    def drop(self):
        """Drop the table if it exists."""
        self._table.drop(checkfirst=True)

    def select(self, kvfilter=None, columns=None):
        """Return a result proxy for the filtered/projected SELECT."""
        return self._con.execute(select(self._columns(columns),
                                        self._where(kvfilter)))

    def insert(self, values):
        self._con.execute(self._table.insert(values))

    def update(self, values, kvfilter):
        self._con.execute(self._table.update(self._where(kvfilter), values))

    def delete(self, kvfilter):
        self._con.execute(self._table.delete(self._where(kvfilter)))
175
176
class FileStore(BaseStore):
    """Read-only store backed by an ini-style configuration file."""

    def __init__(self, name):
        self._filename = name
        self.is_readonly = True
        # mtime of the file when it was last parsed; None means not yet read.
        self._timestamp = None
        self._config = None

    def get_config(self):
        """Return the parsed configuration, re-reading the file only when
        its mtime indicates it changed on disk.

        :raises OSError: if the file cannot be stat'ed
        """
        try:
            stat = os.stat(self._filename)
        except OSError as e:
            self.error("Unable to check config file %s: [%s]" % (
                self._filename, e))
            self._config = None
            raise
        timestamp = stat.st_mtime
        if self._config is None or timestamp > self._timestamp:
            self._config = ConfigParser.RawConfigParser()
            # Keep option names case-sensitive.
            self._config.optionxform = str
            self._config.read(self._filename)
            # Record the parsed mtime; without this the check above was
            # always true and the file got re-parsed on every call.
            self._timestamp = timestamp
        return self._config

    def add_constraint(self, table):
        # Schema changes make no sense on a read-only config file.
        raise NotImplementedError()

    def add_index(self, index):
        # Schema changes make no sense on a read-only config file.
        raise NotImplementedError()
205
206
class FileQuery(Log):
    """Read-only query interface over one section of a FileStore config.

    Supports the same 2- or 3-column layouts as SqlQuery, where the last
    column must be named 'value'. All mutating operations are unsupported.
    """

    def __init__(self, fstore, table, table_def, trans=True):
        # We don't need indexes in a FileQuery, so drop that info
        if isinstance(table_def, dict):
            columns = table_def['columns']
        else:
            columns = table_def
        self._fstore = fstore
        self._config = fstore.get_config()
        self._section = table
        if len(columns) > 3 or columns[-1] != 'value':
            raise ValueError('Unsupported configuration format')
        self._columns = columns

    def rollback(self):
        # Nothing to roll back on a read-only store.
        return

    def commit(self):
        # Nothing to commit on a read-only store.
        return

    def create(self):
        raise NotImplementedError

    def drop(self):
        raise NotImplementedError

    def select(self, kvfilter=None, columns=None):
        """Return matching rows as lists, optionally projected to *columns*.

        A None kvfilter is treated as an empty filter; previously it
        crashed with a TypeError on the membership tests below.
        """
        if kvfilter is None:
            kvfilter = {}

        if self._section not in self._config.sections():
            return []

        opts = self._config.options(self._section)

        # For 3-column tables, options are stored as '<prefix> <name>'.
        prefix = None
        prefix_ = ''
        if self._columns[0] in kvfilter:
            prefix = kvfilter[self._columns[0]]
            prefix_ = prefix + ' '

        name = None
        if len(self._columns) == 3 and self._columns[1] in kvfilter:
            name = kvfilter[self._columns[1]]

        value = None
        if self._columns[-1] in kvfilter:
            value = kvfilter[self._columns[-1]]

        res = []
        for o in opts:
            if len(self._columns) == 3:
                # 3 cols
                if prefix and not o.startswith(prefix_):
                    continue

                col1, col2 = o.split(' ', 1)
                if name and col2 != name:
                    continue

                col3 = self._config.get(self._section, o)
                if value and col3 != value:
                    continue

                r = [col1, col2, col3]
            else:
                # 2 cols
                if prefix and o != prefix:
                    continue
                r = [o, self._config.get(self._section, o)]

            if columns:
                # Project the row to the requested column order.
                s = []
                for c in columns:
                    s.append(r[self._columns.index(c)])
                res.append(s)
            else:
                res.append(r)

        self.debug('SELECT(%s, %s, %s) -> %s' % (self._section,
                                                 repr(kvfilter),
                                                 repr(columns),
                                                 repr(res)))
        return res

    def insert(self, values):
        raise NotImplementedError

    def update(self, values, kvfilter):
        raise NotImplementedError

    def delete(self, kvfilter):
        raise NotImplementedError
298
299
class Store(Log):
    """Base class for all Ipsilon data stores.

    Handles schema-version checking/upgrades and periodic background
    cleanup, delegating actual storage to a SqlStore or FileStore backend
    chosen from the database name.
    """

    # Static, Store-level variables
    _is_upgrade = False
    # Running cleanup tasks, keyed by subclass name, so that each store
    # class is scheduled at most once per process.
    __cleanups = {}

    # Static, class-level variables
    # Either set this to False, or implement _cleanup, in child classes
    _should_cleanup = True

    def __init__(self, config_name=None, database_url=None):
        """Open a store from a cherrypy config key or a database URL.

        Exactly one of *config_name*/*database_url* must be given.

        :raises ValueError: if neither argument is provided
        :raises NameError: if config_name is not in the cherrypy config
        :raises DatabaseError: if the schema needs initialization/upgrade
        """
        if config_name is None and database_url is None:
            raise ValueError('config_name or database_url must be provided')
        if config_name:
            if config_name not in cherrypy.config:
                raise NameError('Unknown database %s' % config_name)
            name = cherrypy.config[config_name]
        else:
            name = database_url
        # 'configfile://<path>' selects the read-only file backend.
        if name.startswith('configfile://'):
            _, filename = name.split('://')
            self._db = FileStore(filename)
            self._query = FileQuery
        else:
            self._db = SqlStore.get_connection(name)
            self._query = SqlQuery

        if not self._is_upgrade:
            self._check_database()
            if self._should_cleanup:
                self._schedule_cleanup()

    def _schedule_cleanup(self):
        """Start the periodic cleanup task for this store class (once)."""
        store_name = self.__class__.__name__
        if self.is_readonly:
            # No use in cleanups on a readonly database
            self.debug('Not scheduling cleanup for %s due to readonly' %
                       store_name)
            return
        if store_name in Store.__cleanups:
            # This class was already scheduled, skip
            return
        self.debug('Scheduling cleanups for %s' % store_name)
        # Check once every minute whether we need to clean
        task = cherrypy.process.plugins.BackgroundTask(
            60, self._maybe_run_cleanup)
        task.start()
        Store.__cleanups[store_name] = task

    def _maybe_run_cleanup(self):
        """Run _cleanup() if the configured interval has elapsed since the
        last recorded cleanup for this class."""
        # Let's see if we need to do cleanup
        last_clean = self.load_options('dbinfo').get('%s_last_clean' %
                                                     self.__class__.__name__,
                                                     {})
        time_diff = cherrypy.config.get('cleanup_interval', 30) * 60
        next_ts = int(time.time()) - time_diff
        self.debug('Considering cleanup for %s: %s. Next at: %s'
                   % (self.__class__.__name__, last_clean, next_ts))
        if ('timestamp' not in last_clean or
                int(last_clean['timestamp']) <= next_ts):
            # First store the current time so that other servers don't start
            self.save_options('dbinfo', '%s_last_clean'
                              % self.__class__.__name__,
                              {'timestamp': int(time.time()),
                               'removed_entries': -1})

            # Cleanup has been long enough ago, let's run
            self.debug('Cleaning up for %s' % self.__class__.__name__)
            removed_entries = self._cleanup()
            self.debug('Cleaned up %i entries for %s' %
                       (removed_entries, self.__class__.__name__))
            self.save_options('dbinfo', '%s_last_clean'
                              % self.__class__.__name__,
                              {'timestamp': int(time.time()),
                               'removed_entries': removed_entries})

    def _cleanup(self):
        # The default cleanup is to do nothing
        # This function should return the number of rows it cleaned up.
        # This information may be used to automatically tune the clean period.
        self.error('Cleanup for %s not implemented' %
                   self.__class__.__name__)
        return 0

    def _code_schema_version(self):
        # This function makes it possible for separate plugins to have
        #  different schema versions. We default to the global schema
        #  version.
        return CURRENT_SCHEMA_VERSION

    def _get_schema_version(self):
        """Return the stored schema version for this class.

        Returns None when the database was never initialized, or -1 when
        only the legacy 1.0 'scheme' record exists (see upgrade_database).
        """
        # We are storing multiple versions: one per class
        # That way, we can support plugins with differing schema versions from
        #  the main codebase, and even in the same database.
        q = self._query(self._db, 'dbinfo', OPTIONS_TABLE, trans=False)
        q.create()
        q._con.close()  # pylint: disable=protected-access
        cls_name = self.__class__.__name__
        current_version = self.load_options('dbinfo').get('%s_schema'
                                                          % cls_name, {})
        if 'version' in current_version:
            return int(current_version['version'])
        else:
            # Also try the old table name.
            # "scheme" was a typo, but we need to retain that now for compat
            fallback_version = self.load_options('dbinfo').get('scheme',
                                                               {})
            if 'version' in fallback_version:
                # Explanation for this is in def upgrade_database(self)
                return -1
            else:
                return None

    def _check_database(self):
        """Raise DatabaseError unless the stored schema is current."""
        if self.is_readonly:
            # If the database is readonly, we cannot do anything to the
            #  schema. Let's just return, and assume people checked the
            #  upgrade notes
            return

        current_version = self._get_schema_version()
        if current_version is None:
            self.error('Database initialization required! ' +
                       'Please run ipsilon-upgrade-database')
            raise DatabaseError('Database initialization required for %s' %
                                self.__class__.__name__)
        if current_version != self._code_schema_version():
            self.error('Database upgrade required! ' +
                       'Please run ipsilon-upgrade-database')
            raise DatabaseError('Database upgrade required for %s' %
                                self.__class__.__name__)

    def _store_new_schema_version(self, new_version):
        """Record *new_version* as this class's schema version."""
        cls_name = self.__class__.__name__
        self.save_options('dbinfo', '%s_schema' % cls_name,
                          {'version': new_version})

    def _initialize_schema(self):
        raise NotImplementedError()

    def _upgrade_schema(self, old_version):
        # Datastores need to figure out what to do with bigger old_versions
        #  themselves.
        # They might implement downgrading if that's feasible, or just throw
        #  NotImplementedError
        # Should return the new schema version
        raise NotImplementedError()

    def upgrade_database(self):
        """Bring the schema up to the current code version, stepwise."""
        # Do whatever is needed to get schema to current version
        old_schema_version = self._get_schema_version()
        if old_schema_version is None:
            # Just initialize a new schema
            self._initialize_schema()
            self._store_new_schema_version(self._code_schema_version())
        elif old_schema_version == -1:
            # This is a special-case from 1.0: we only created tables at the
            # first time they were actually used, but the upgrade code assumes
            # that the tables exist. So let's fix this.
            self._initialize_schema()
            # The old version was schema version 1
            self._store_new_schema_version(1)
            self.upgrade_database()
        elif old_schema_version != self._code_schema_version():
            # Upgrade from old_schema_version to code_schema_version
            self.debug('Upgrading from schema version %i' % old_schema_version)
            new_version = self._upgrade_schema(old_schema_version)
            if not new_version:
                # Apply the %-formatting to the whole message; previously it
                # bound only to the final fragment (which has no %s) and
                # raised a TypeError instead of reporting the error.
                error = ('Schema upgrade error: %s did not provide a '
                         'new schema version number!' %
                         self.__class__.__name__)
                self.error(error)
                raise Exception(error)
            self._store_new_schema_version(new_version)
            # Check if we are now up-to-date
            self.upgrade_database()

    @property
    def is_readonly(self):
        """True when the backend cannot be written to (FileStore)."""
        return self._db.is_readonly

    def _row_to_dict_tree(self, data, row):
        """Fold one row into the nested dict *data*.

        Leading row columns become nested dict keys; the final column is
        the value. Duplicate keys collect their values into a list.
        """
        name = row[0]
        if len(row) > 2:
            if name not in data:
                data[name] = dict()
            d2 = data[name]
            self._row_to_dict_tree(d2, row[1:])
        else:
            value = row[1]
            if name in data:
                # "data[name] is list" (identity with the list type) was
                # never true, so a third duplicate nested the earlier list
                # instead of appending to it.
                if isinstance(data[name], list):
                    data[name].append(value)
                else:
                    v = data[name]
                    data[name] = [v, value]
            else:
                data[name] = value

    def _rows_to_dict_tree(self, rows):
        """Convert an iterable of rows into a nested dict tree."""
        data = dict()
        for r in rows:
            self._row_to_dict_tree(data, r)
        return data

    def _load_data(self, table, columns, kvfilter=None):
        """Select rows from *table* and return them as a dict tree; errors
        are logged and yield an empty result (best-effort read)."""
        rows = []
        try:
            q = self._query(self._db, table, columns, trans=False)
            rows = q.select(kvfilter)
        except Exception as e:  # pylint: disable=broad-except
            self.error("Failed to load data for table %s: [%s]" % (table, e))
        return self._rows_to_dict_tree(rows)

    def load_config(self):
        """Load the whole 'config' table as a name -> value dict."""
        table = 'config'
        columns = ['name', 'value']
        return self._load_data(table, columns)

    def load_options(self, table, name=None):
        """Load options from *table*; with *name* given, return only that
        entry's option dict."""
        kvfilter = dict()
        if name:
            kvfilter['name'] = name
        options = self._load_data(table, OPTIONS_TABLE, kvfilter)
        if name and name in options:
            return options[name]
        return options

    def save_options(self, table, name, options):
        """Insert or update *options* (a dict) for *name* in *table*,
        transactionally. Re-raises on failure after rollback."""
        curvals = dict()
        q = None
        try:
            q = self._query(self._db, table, OPTIONS_TABLE)
            rows = q.select({'name': name}, ['option', 'value'])
            for row in rows:
                curvals[row[0]] = row[1]

            for opt in options:
                if opt in curvals:
                    q.update({'value': options[opt]},
                             {'name': name, 'option': opt})
                else:
                    q.insert((name, opt, options[opt]))

            q.commit()
        except Exception as e:  # pylint: disable=broad-except
            if q:
                q.rollback()
            self.error("Failed to save options: [%s]" % e)
            raise

    def delete_options(self, table, name, options=None):
        """Delete all options for *name*, or only those listed in
        *options*. Re-raises on failure after rollback."""
        kvfilter = {'name': name}
        q = None
        try:
            q = self._query(self._db, table, OPTIONS_TABLE)
            if options is None:
                q.delete(kvfilter)
            else:
                for opt in options:
                    kvfilter['option'] = opt
                    q.delete(kvfilter)
            q.commit()
        except Exception as e:  # pylint: disable=broad-except
            if q:
                q.rollback()
            self.error("Failed to delete from %s: [%s]" % (table, e))
            raise

    def new_unique_data(self, table, data):
        """Store *data* (a name -> value dict) under a fresh uuid and
        return that uuid. Re-raises on failure after rollback."""
        newid = str(uuid.uuid4())
        q = None
        try:
            q = self._query(self._db, table, UNIQUE_DATA_TABLE)
            for name in data:
                q.insert((newid, name, data[name]))
            q.commit()
        except Exception as e:  # pylint: disable=broad-except
            if q:
                q.rollback()
            self.error("Failed to store %s data: [%s]" % (table, e))
            raise
        return newid

    def get_unique_data(self, table, uuidval=None, name=None, value=None):
        """Fetch uuid-keyed data from *table*, optionally filtered by
        uuid, name and/or value."""
        kvfilter = dict()
        if uuidval:
            kvfilter['uuid'] = uuidval
        if name:
            kvfilter['name'] = name
        if value:
            kvfilter['value'] = value
        return self._load_data(table, UNIQUE_DATA_TABLE, kvfilter)

    def save_unique_data(self, table, data):
        """Sync *data* ({uuid: {name: value}}) into *table*: a None value
        deletes the row, an existing name updates, a new name inserts.
        Re-raises on failure after rollback."""
        q = None
        try:
            q = self._query(self._db, table, UNIQUE_DATA_TABLE)
            for uid in data:
                curvals = dict()
                rows = q.select({'uuid': uid}, ['name', 'value'])
                for r in rows:
                    curvals[r[0]] = r[1]

                datum = data[uid]
                for name in datum:
                    if name in curvals:
                        if datum[name] is None:
                            q.delete({'uuid': uid, 'name': name})
                        else:
                            q.update({'value': datum[name]},
                                     {'uuid': uid, 'name': name})
                    else:
                        if datum[name] is not None:
                            q.insert((uid, name, datum[name]))

            q.commit()
        except Exception as e:  # pylint: disable=broad-except
            if q:
                q.rollback()
            self.error("Failed to store data in %s: [%s]" % (table, e))
            raise

    def del_unique_data(self, table, uuidval):
        """Best-effort delete of all rows for *uuidval*; failures are
        logged, not raised."""
        kvfilter = {'uuid': uuidval}
        try:
            q = self._query(self._db, table, UNIQUE_DATA_TABLE, trans=False)
            q.delete(kvfilter)
        except Exception as e:  # pylint: disable=broad-except
            self.error("Failed to delete data from %s: [%s]" % (table, e))

    def _reset_data(self, table):
        """Drop and recreate *table*, erasing all its data; failures are
        logged after rollback."""
        q = None
        try:
            q = self._query(self._db, table, UNIQUE_DATA_TABLE)
            q.drop()
            q.create()
            q.commit()
        except Exception as e:  # pylint: disable=broad-except
            if q:
                q.rollback()
            self.error("Failed to erase all data from %s: [%s]" % (table, e))
641
642
class AdminStore(Store):
    """Store holding the administrative configuration tables."""

    _should_cleanup = False

    def __init__(self):
        super(AdminStore, self).__init__('admin.config.db')

    def get_data(self, plugin, idval=None, name=None, value=None):
        """Fetch rows from the plugin's data table, optionally filtered."""
        return self.get_unique_data(plugin+"_data", idval, name, value)

    def save_data(self, plugin, data):
        """Persist *data* into the plugin's data table."""
        return self.save_unique_data(plugin+"_data", data)

    def new_datum(self, plugin, datum):
        """Insert *datum* under a fresh uuid; returns the new uuid."""
        return self.new_unique_data(plugin+"_data", datum)

    def del_datum(self, plugin, idval):
        """Remove the datum identified by *idval*."""
        return self.del_unique_data(plugin+"_data", idval)

    def wipe_data(self, plugin):
        """Drop and recreate the plugin's data table."""
        self._reset_data(plugin+"_data")

    def _initialize_schema(self):
        # Create every admin configuration table up front.
        for tbl_name in ('config',
                         'info_config',
                         'login_config',
                         'provider_config'):
            q = self._query(self._db, tbl_name, OPTIONS_TABLE, trans=False)
            q.create()
            q._con.close()  # pylint: disable=protected-access

    def _upgrade_schema(self, old_version):
        if old_version != 1:
            raise NotImplementedError()
        # In schema version 2, we added indexes and primary keys
        for tbl_name in ('config',
                         'info_config',
                         'login_config',
                         'provider_config'):
            # pylint: disable=protected-access
            tbl = self._query(self._db, tbl_name, OPTIONS_TABLE,
                              trans=False)._table
            self._db.add_constraint(tbl.primary_key)
            for idx in tbl.indexes:
                self._db.add_index(idx)
        return 2

    def create_plugin_data_table(self, plugin_name):
        """Ensure the per-plugin data table exists (no-op when readonly)."""
        if self.is_readonly:
            return
        q = self._query(self._db, plugin_name+'_data', UNIQUE_DATA_TABLE,
                        trans=False)
        q.create()
        q._con.close()  # pylint: disable=protected-access
700
701
class UserStore(Store):
    """Store for per-user preferences and per-plugin user data."""

    _should_cleanup = False

    def __init__(self, path=None):
        # 'path' is accepted for API compatibility but not used.
        super(UserStore, self).__init__('user.prefs.db')

    def save_user_preferences(self, user, options):
        """Persist the preference dict *options* for *user*."""
        self.save_options('users', user, options)

    def load_user_preferences(self, user):
        """Return the preference dict stored for *user*."""
        return self.load_options('users', user)

    def save_plugin_data(self, plugin, user, options):
        """Persist per-user *options* in the plugin's data table."""
        self.save_options(plugin+"_data", user, options)

    def load_plugin_data(self, plugin, user):
        """Return per-user options from the plugin's data table."""
        return self.load_options(plugin+"_data", user)

    def _initialize_schema(self):
        q = self._query(self._db, 'users', OPTIONS_TABLE, trans=False)
        q.create()
        q._con.close()  # pylint: disable=protected-access

    def _upgrade_schema(self, old_version):
        if old_version != 1:
            raise NotImplementedError()
        # Schema version 2 added indexes and primary keys.
        # pylint: disable=protected-access
        tbl = self._query(self._db, 'users', OPTIONS_TABLE,
                          trans=False)._table
        self._db.add_constraint(tbl.primary_key)
        for idx in tbl.indexes:
            self._db.add_index(idx)
        return 2
737
738
class TranStore(Store):
    """Store tracking in-progress authentication transactions."""

    def __init__(self, path=None):
        # 'path' is accepted for API compatibility but not used.
        super(TranStore, self).__init__('transactions.db')

    def _initialize_schema(self):
        q = self._query(self._db, 'transactions', UNIQUE_DATA_TABLE,
                        trans=False)
        q.create()
        q._con.close()  # pylint: disable=protected-access

    def _upgrade_schema(self, old_version):
        if old_version != 1:
            raise NotImplementedError()
        # Schema version 2 added indexes and primary keys.
        # pylint: disable=protected-access
        tbl = self._query(self._db, 'transactions', UNIQUE_DATA_TABLE,
                          trans=False)._table
        self._db.add_constraint(tbl.primary_key)
        for idx in tbl.indexes:
            self._db.add_index(idx)
        return 2
762
763
class SAML2SessionStore(Store):
    """Store holding active SAML2 sessions; expired ones are removed by
    the periodic cleanup task."""

    def __init__(self, database_url):
        super(SAML2SessionStore, self).__init__(database_url=database_url)
        self.table = 'saml2_sessions'
        # Ensure the table exists before the first session is stored.
        # pylint: disable=protected-access
        table = SqlQuery(self._db, self.table, UNIQUE_DATA_TABLE)._table
        table.create(checkfirst=True)

    def _get_unique_id_from_column(self, name, value):
        """
        The query is going to return only the column in the query.
        Use this method to get the uuidval which can be used to fetch
        the entire entry.

        Returns None or the uuid of the first value found.

        :raises ValueError: if more than one entry matches
        """
        data = self.get_unique_data(self.table, name=name, value=value)
        count = len(data)
        if count == 0:
            return None
        elif count != 1:
            raise ValueError("Multiple entries returned")
        # next(iter(...)) rather than data.keys()[0]: dict views are not
        # indexable on Python 3, and this avoids building a key list.
        return next(iter(data))

    def _cleanup(self):
        """Delete all sessions whose expiration_time has passed; returns
        the number of removed rows."""
        # pylint: disable=protected-access
        table = SqlQuery(self._db, self.table, UNIQUE_DATA_TABLE)._table
        sel = select([table.columns.uuid]). \
            where(and_(table.c.name == 'expiration_time',
                       table.c.value <= datetime.datetime.now()))
        # pylint: disable=no-value-for-parameter
        d = table.delete().where(table.c.uuid.in_(sel))
        return d.execute().rowcount

    def get_data(self, idval=None, name=None, value=None):
        """Fetch session rows, optionally filtered by id, name and value."""
        return self.get_unique_data(self.table, idval, name, value)

    def new_session(self, datum):
        """Store a new session and return its uuid.

        NOTE: mutates *datum* in place when flattening the
        'supported_logout_mechs' list into a comma-separated string.
        """
        if 'supported_logout_mechs' in datum:
            datum['supported_logout_mechs'] = ','.join(
                datum['supported_logout_mechs']
            )
        return self.new_unique_data(self.table, datum)

    def get_session(self, session_id=None, request_id=None):
        """Look up one session by session_id or request_id.

        Returns (uuid, data) or (None, None) when not found.

        :raises ValueError: if neither id is provided
        """
        if session_id:
            uuidval = self._get_unique_id_from_column('session_id', session_id)
        elif request_id:
            uuidval = self._get_unique_id_from_column('request_id', request_id)
        else:
            raise ValueError("Unable to find session")
        if not uuidval:
            return None, None
        data = self.get_unique_data(self.table, uuidval=uuidval)
        return uuidval, data[uuidval]

    def get_user_sessions(self, user):
        """
        Return a list of all sessions for a given user.
        """
        rows = self.get_unique_data(self.table, name='user', value=user)

        # We have a list of sessions for this user, now get the details
        logged_in = []
        for r in rows:
            data = self.get_unique_data(self.table, uuidval=r)
            # Undo the comma-join done by new_session().
            data[r]['supported_logout_mechs'] = data[r].get(
                'supported_logout_mechs', '').split(',')
            logged_in.append(data)

        return logged_in

    def update_session(self, datum):
        """Persist changes to existing session data."""
        self.save_unique_data(self.table, datum)

    def remove_session(self, uuidval):
        """Best-effort delete of the session identified by *uuidval*."""
        self.del_unique_data(self.table, uuidval)

    def wipe_data(self):
        """Drop and recreate the whole sessions table."""
        self._reset_data(self.table)

    def _initialize_schema(self):
        q = self._query(self._db, self.table, UNIQUE_DATA_TABLE,
                        trans=False)
        q.create()
        q._con.close()  # pylint: disable=protected-access

    def _upgrade_schema(self, old_version):
        if old_version == 1:
            # In schema version 2, we added indexes and primary keys
            # pylint: disable=protected-access
            table = self._query(self._db, self.table, UNIQUE_DATA_TABLE,
                                trans=False)._table
            self._db.add_constraint(table.primary_key)
            for index in table.indexes:
                self._db.add_index(index)
            return 2
        else:
            raise NotImplementedError()