Also create plugin UserStore data tables
[cascardo/ipsilon.git] / ipsilon / util / data.py
index e92aae4..65bf4b5 100644 (file)
@@ -6,12 +6,14 @@ from ipsilon.util.log import Log
 from sqlalchemy import create_engine
 from sqlalchemy import MetaData, Table, Column, Text
 from sqlalchemy.pool import QueuePool, SingletonThreadPool
-from sqlalchemy.schema import PrimaryKeyConstraint, Index
+from sqlalchemy.schema import (PrimaryKeyConstraint, Index, AddConstraint,
+                               CreateIndex)
 from sqlalchemy.sql import select, and_
 import ConfigParser
 import os
 import uuid
 import logging
+import time
 
 
 CURRENT_SCHEMA_VERSION = 2
@@ -29,7 +31,16 @@ class DatabaseError(Exception):
     pass
 
 
-class SqlStore(Log):
+class BaseStore(Log):
+    # Some helper functions used for upgrades
+    def add_constraint(self, constraint):
+        raise NotImplementedError()
+
+    def add_index(self, index):
+        raise NotImplementedError()
+
+
+class SqlStore(BaseStore):
     __instances = {}
 
     @classmethod
@@ -58,9 +69,24 @@ class SqlStore(Log):
             # It's not possible to share connections for SQLite between
             #  threads, so let's use the SingletonThreadPool for them
             pool_args = {'poolclass': SingletonThreadPool}
-        self._dbengine = create_engine(engine_name, **pool_args)
+        self._dbengine = create_engine(engine_name,
+                                       echo=cherrypy.config.get('db.echo',
+                                                                False),
+                                       **pool_args)
         self.is_readonly = False
 
+    def add_constraint(self, constraint):
+        if self._dbengine.dialect.name != 'sqlite':
+            # It is impossible to add constraints to a pre-existing table for
+            #  SQLite
+            # source: http://www.sqlite.org/omitted.html
+            create_constraint = AddConstraint(constraint, bind=self._dbengine)
+            create_constraint.execute()
+
+    def add_index(self, index):
+        add_index = CreateIndex(index, bind=self._dbengine)
+        add_index.execute()
+
     def debug(self, fact):
         if self.db_conn_log:
             super(SqlStore, self).debug(fact)
@@ -151,7 +177,7 @@ class SqlQuery(Log):
         self._con.execute(self._table.delete(self._where(kvfilter)))
 
 
-class FileStore(Log):
+class FileStore(BaseStore):
 
     def __init__(self, name):
         self._filename = name
@@ -174,6 +200,12 @@ class FileStore(Log):
             self._config.read(self._filename)
         return self._config
 
+    def add_constraint(self, constraint):
+        raise NotImplementedError()
+
+    def add_index(self, index):
+        raise NotImplementedError()
+
 
 class FileQuery(Log):
 
@@ -269,7 +301,13 @@ class FileQuery(Log):
 
 
 class Store(Log):
+    # Static, Store-level variables
     _is_upgrade = False
+    __cleanups = {}
+
+    # Static, class-level variables
+    # In child classes, either set this to False or implement _cleanup
+    _should_cleanup = True
 
     def __init__(self, config_name=None, database_url=None):
         if config_name is None and database_url is None:
@@ -290,6 +328,60 @@ class Store(Log):
 
         if not self._is_upgrade:
             self._check_database()
+            if self._should_cleanup:
+                self._schedule_cleanup()
+
+    def _schedule_cleanup(self):
+        store_name = self.__class__.__name__
+        if self.is_readonly:
+            # No use in cleanups on a readonly database
+            self.debug('Not scheduling cleanup for %s due to readonly' %
+                       store_name)
+            return
+        if store_name in Store.__cleanups:
+            # This class was already scheduled, skip
+            return
+        self.debug('Scheduling cleanups for %s' % store_name)
+        # Check once every minute whether we need to clean
+        task = cherrypy.process.plugins.BackgroundTask(
+            60, self._maybe_run_cleanup)
+        task.start()
+        Store.__cleanups[store_name] = task
+
+    def _maybe_run_cleanup(self):
+        # Let's see if we need to do cleanup
+        last_clean = self.load_options('dbinfo').get('%s_last_clean' %
+                                                     self.__class__.__name__,
+                                                     {})
+        time_diff = cherrypy.config.get('cleanup_interval', 30) * 60
+        next_ts = int(time.time()) - time_diff
+        self.debug('Considering cleanup for %s: %s. Next at: %s'
+                   % (self.__class__.__name__, last_clean, next_ts))
+        if ('timestamp' not in last_clean or
+                int(last_clean['timestamp']) <= next_ts):
+            # First store the current time so that other servers don't start
+            self.save_options('dbinfo', '%s_last_clean'
+                              % self.__class__.__name__,
+                              {'timestamp': int(time.time()),
+                               'removed_entries': -1})
+
+            # Cleanup has been long enough ago, let's run
+            self.debug('Cleaning up for %s' % self.__class__.__name__)
+            removed_entries = self._cleanup()
+            self.debug('Cleaned up %i entries for %s' %
+                       (removed_entries, self.__class__.__name__))
+            self.save_options('dbinfo', '%s_last_clean'
+                              % self.__class__.__name__,
+                              {'timestamp': int(time.time()),
+                               'removed_entries': removed_entries})
+
+    def _cleanup(self):
+        # The default cleanup is to do nothing
+        # This function should return the number of rows it cleaned up.
+        # This information may be used to automatically tune the clean period.
+        self.error('Cleanup for %s not implemented' %
+                   self.__class__.__name__)
+        return 0
 
     def _code_schema_version(self):
         # This function makes it possible for separate plugins to have
@@ -303,6 +395,7 @@ class Store(Log):
         #  the main codebase, and even in the same database.
         q = self._query(self._db, 'dbinfo', OPTIONS_TABLE, trans=False)
         q.create()
+        q._con.close()  # pylint: disable=protected-access
         cls_name = self.__class__.__name__
         current_version = self.load_options('dbinfo').get('%s_schema'
                                                           % cls_name, {})
@@ -314,7 +407,8 @@ class Store(Log):
             fallback_version = self.load_options('dbinfo').get('scheme',
                                                                {})
             if 'version' in fallback_version:
-                return int(fallback_version['version'])
+                # Explanation for this is in def upgrade_database(self)
+                return -1
             else:
                 return None
 
@@ -350,6 +444,7 @@ class Store(Log):
         #  themselves.
         # They might implement downgrading if that's feasible, or just throw
         #  NotImplementedError
+        # Should return the new schema version
         raise NotImplementedError()
 
     def upgrade_database(self):
@@ -359,10 +454,27 @@ class Store(Log):
             # Just initialize a new schema
             self._initialize_schema()
             self._store_new_schema_version(self._code_schema_version())
+        elif old_schema_version == -1:
+            # This is a special-case from 1.0: we only created tables at the
+            # first time they were actually used, but the upgrade code assumes
+            # that the tables exist. So let's fix this.
+            self._initialize_schema()
+            # The old version was schema version 1
+            self._store_new_schema_version(1)
+            self.upgrade_database()
         elif old_schema_version != self._code_schema_version():
             # Upgrade from old_schema_version to code_schema_version
-            self._upgrade_schema(old_schema_version)
-            self._store_new_schema_version(self._code_schema_version())
+            self.debug('Upgrading from schema version %i' % old_schema_version)
+            new_version = self._upgrade_schema(old_schema_version)
+            if not new_version:
+            error = ('Schema upgrade error: %s did not provide a '
+                     'new schema version number!' %
+                     self.__class__.__name__)
+                self.error(error)
+                raise Exception(error)
+            self._store_new_schema_version(new_version)
+            # Check if we are now up-to-date
+            self.upgrade_database()
 
     @property
     def is_readonly(self):
@@ -398,7 +510,8 @@ class Store(Log):
             q = self._query(self._db, table, columns, trans=False)
             rows = q.select(kvfilter)
         except Exception, e:  # pylint: disable=broad-except
-            self.error("Failed to load data for table %s: [%s]" % (table, e))
+            self.error("Failed to load data for table %s for store %s: [%s]"
+                       % (table, self.__class__.__name__, e))
         return self._rows_to_dict_tree(rows)
 
     def load_config(self):
@@ -532,6 +645,7 @@ class Store(Log):
 
 
 class AdminStore(Store):
+    _should_cleanup = False
 
     def __init__(self):
         super(AdminStore, self).__init__('admin.config.db')
@@ -561,12 +675,36 @@ class AdminStore(Store):
                       'provider_config']:
             q = self._query(self._db, table, OPTIONS_TABLE, trans=False)
             q.create()
+            q._con.close()  # pylint: disable=protected-access
 
     def _upgrade_schema(self, old_version):
-        raise NotImplementedError()
+        if old_version == 1:
+            # In schema version 2, we added indexes and primary keys
+            for table in ['config',
+                          'info_config',
+                          'login_config',
+                          'provider_config']:
+                # pylint: disable=protected-access
+                table = self._query(self._db, table, OPTIONS_TABLE,
+                                    trans=False)._table
+                self._db.add_constraint(table.primary_key)
+                for index in table.indexes:
+                    self._db.add_index(index)
+            return 2
+        else:
+            raise NotImplementedError()
+
+    def create_plugin_data_table(self, plugin_name):
+        if not self.is_readonly:
+            table = plugin_name+'_data'
+            q = self._query(self._db, table, UNIQUE_DATA_TABLE,
+                            trans=False)
+            q.create()
+            q._con.close()  # pylint: disable=protected-access
 
 
 class UserStore(Store):
+    _should_cleanup = False
 
     def __init__(self, path=None):
         super(UserStore, self).__init__('user.prefs.db')
@@ -586,23 +724,65 @@ class UserStore(Store):
     def _initialize_schema(self):
         q = self._query(self._db, 'users', OPTIONS_TABLE, trans=False)
         q.create()
+        q._con.close()  # pylint: disable=protected-access
 
     def _upgrade_schema(self, old_version):
-        raise NotImplementedError()
+        if old_version == 1:
+            # In schema version 2, we added indexes and primary keys
+            # pylint: disable=protected-access
+            table = self._query(self._db, 'users', OPTIONS_TABLE,
+                                trans=False)._table
+            self._db.add_constraint(table.primary_key)
+            for index in table.indexes:
+                self._db.add_index(index)
+            return 2
+        else:
+            raise NotImplementedError()
+
+    def create_plugin_data_table(self, plugin_name):
+        if not self.is_readonly:
+            table = plugin_name+'_data'
+            q = self._query(self._db, table, OPTIONS_TABLE,
+                            trans=False)
+            q.create()
+            q._con.close()  # pylint: disable=protected-access
 
 
 class TranStore(Store):
 
     def __init__(self, path=None):
         super(TranStore, self).__init__('transactions.db')
+        self.table = 'transactions'
 
     def _initialize_schema(self):
-        q = self._query(self._db, 'transactions', UNIQUE_DATA_TABLE,
+        q = self._query(self._db, self.table, UNIQUE_DATA_TABLE,
                         trans=False)
         q.create()
+        q._con.close()  # pylint: disable=protected-access
 
     def _upgrade_schema(self, old_version):
-        raise NotImplementedError()
+        if old_version == 1:
+            # In schema version 2, we added indexes and primary keys
+            # pylint: disable=protected-access
+            table = self._query(self._db, self.table, UNIQUE_DATA_TABLE,
+                                trans=False)._table
+            self._db.add_constraint(table.primary_key)
+            for index in table.indexes:
+                self._db.add_index(index)
+            return 2
+        else:
+            raise NotImplementedError()
+
+    def _cleanup(self):
+        # pylint: disable=protected-access
+        table = SqlQuery(self._db, self.table, UNIQUE_DATA_TABLE)._table
+        one_hour_ago = datetime.datetime.now() - datetime.timedelta(hours=1)
+        sel = select([table.columns.uuid]). \
+            where(and_(table.c.name == 'origintime',
+                       table.c.value <= one_hour_ago))
+        # pylint: disable=no-value-for-parameter
+        d = table.delete().where(table.c.uuid.in_(sel))
+        return d.execute().rowcount
 
 
 class SAML2SessionStore(Store):
@@ -630,7 +810,7 @@ class SAML2SessionStore(Store):
             raise ValueError("Multiple entries returned")
         return data.keys()[0]
 
-    def remove_expired_sessions(self):
+    def _cleanup(self):
         # pylint: disable=protected-access
         table = SqlQuery(self._db, self.table, UNIQUE_DATA_TABLE)._table
         sel = select([table.columns.uuid]). \
@@ -638,7 +818,7 @@ class SAML2SessionStore(Store):
                        table.c.value <= datetime.datetime.now()))
         # pylint: disable=no-value-for-parameter
         d = table.delete().where(table.c.uuid.in_(sel))
-        d.execute()
+        return d.execute().rowcount
 
     def get_data(self, idval=None, name=None, value=None):
         return self.get_unique_data(self.table, idval, name, value)
@@ -691,6 +871,17 @@ class SAML2SessionStore(Store):
         q = self._query(self._db, self.table, UNIQUE_DATA_TABLE,
                         trans=False)
         q.create()
+        q._con.close()  # pylint: disable=protected-access
 
     def _upgrade_schema(self, old_version):
-        raise NotImplementedError()
+        if old_version == 1:
+            # In schema version 2, we added indexes and primary keys
+            # pylint: disable=protected-access
+            table = self._query(self._db, self.table, UNIQUE_DATA_TABLE,
+                                trans=False)._table
+            self._db.add_constraint(table.primary_key)
+            for index in table.indexes:
+                self._db.add_index(index)
+            return 2
+        else:
+            raise NotImplementedError()