diff --git a/mr_db.py b/mr_db.py index 18ae13e1a72f6e0c4dd6df9bb1a81742307fd7ec..c83b67dfddfbb8b7ddc2eb93d343ee8a9e360082 100755 --- a/mr_db.py +++ b/mr_db.py @@ -1,14 +1,38 @@ #!/usr/bin/env python - +import os +from os.path import expanduser import time +import logging +import atexit +import signal from uuid import uuid4, UUID +import psutil +from psutil import Popen import ZODB, ZODB.FileStorage +from ZEO import ClientStorage from BTrees.OOBTree import OOBTree from BTrees.IOBTree import IOBTree import transaction from persistent import Persistent from persistent.mapping import PersistentMapping +from persistent.list import PersistentList + +root_logger = logging.getLogger("mr_db") +root_logger.setLevel(level=logging.INFO) +log_handler = logging.StreamHandler() +log_formatter = logging.Formatter( + fmt='%(asctime)s [%(name)s](%(levelname)s) %(message)s', + datefmt='%H:%M:%S' + ) +log_handler.setFormatter(log_formatter) +root_logger.addHandler(log_handler) +logging.getLogger('ZODB').addHandler(log_handler) +logging.getLogger('ZEO').addHandler(log_handler) +logger = logging.getLogger(__name__) + +package_directory = os.path.dirname(os.path.abspath(__file__)) +conf_file = os.path.join(package_directory, 'server_conf.xml') class InputError(Exception): """Exception raised for errors in the input. @@ -33,7 +57,9 @@ class KeyExistsError(InputError): class Patient(Persistent): """class for patient data storage. Each application should write data as - experiment entries, defining their own data storage classes. + experiment entries, defining their own data storage objects. N.B. if + classes are used to store data, class definitions must be available when + retrieving object. ---- Arguments: pid: patient id (should be the same as db key)---If not provided, @@ -47,54 +73,112 @@ class Patient(Persistent): self.metadata = PersistentMapping(kwargs) self.experiments = OOBTree() - def add_experiment(self, id=None, timestamp=None, **kwargs): - """Add a new experiment. Will raise KeyExistsError if id is already - in db to avoid unintended collisions. - ---- - Arguments: - id: experiment id---UUID will be generated if not supplied - timestamp: current time---Will be generated if not supplied - kwargs: additional keyword arguments that will be saved in experiment. + def link_experiment(self, db, uuid): + """Hard links an experiment with uuid from db. db should be a OOBTree + or PersistentMapping already in the ZODB. """ - if id is None: - id = uuid4() - - if timestamp is None: - timestamp = time.time() - - if id in self.experiments: - raise InputError(id, - "Experiment ID already exists. Access directly to update") - - kwargs.update({'id':id, 'timestamp':timestamp}) - - self.experiments[id.int] = PersistentMapping(kwargs) - - return self.experiments[id.int] - + self.experiments[uuid] = db[uuid] + class DbConnection(object): """Class for DB connection. Multiple threads/processes can access db simultaneously, but each must have its own connection. Must call commit - method to save data to database. + method to save data to database. Throws ClientStorage.ClientDisconnected + if connection can't be made. ---- Arguments: - database: path to database file + port: port to connect to """ - def __init__(self, database='data.fs'): - storage = ZODB.FileStorage.FileStorage(database) - db = ZODB.DB(storage) - connection = db.open() - self.root = connection.root() + def __init__(self, port=9998, data_path=expanduser('~/.mr_db')): + addr = ('localhost', port) + + full_path = os.path.abspath("%s/blob" % data_path) + if not os.path.exists(full_path): + os.makedirs(full_path) + + self.storage = ClientStorage.ClientStorage(addr, + blob_dir="%s/blob" % data_path, + shared_blob_dir=True, + client_label='dstat-interface', + max_disconnect_poll=4, + wait=False, + wait_timeout=5 + ) + + self.db = ZODB.DB(self.storage) + self.connection = self.db.open() + self.databases = self.connection.root() - if not self.root.has_key('patientdb'): - self.root['patientdb'] = OOBTree() + if not self.databases.has_key('patients'): + self.databases['patients'] = OOBTree() - self.patientdb = self.root['patientdb'] + self.patients = self.databases['patients'] def commit(self): transaction.commit() def abort(self): transaction.abort() - \ No newline at end of file + +def start_server(root_dir=None, port=9998, + conf_file=conf_file): + + if root_dir is None: + root_dir = expanduser('~/.mr_db') + data_dir = "%s/data" % root_dir + log_dir = "%s/logs" % root_dir + pid_file = "%s/zeo.pid" % root_dir + + full_path = os.path.abspath(data_dir) + if not os.path.exists(full_path): + os.makedirs(full_path) + + full_path = os.path.abspath(log_dir) + if not os.path.exists(full_path): + os.makedirs(full_path) + + global pid_f + pid_f = pid_file + + if os.path.isfile(pid_file): + try: + with open(pid_file, 'r') as f: + pid = int(f.readline().strip()) + + if psutil.pid_exists(pid): + p = psutil.Process(pid) + p.terminate() + p.wait(timeout=3) + if p.is_running(): + p.kill() + except ValueError: + pass + + global logfile + logfile = open('%s/zeo.log' % log_dir, 'w') + + global process + process = Popen(['python', '-m', 'ZEO.runzeo', '-C', conf_file], + env=dict(os.environ, PORT=str(port), DATA_DIR=data_dir), + stdout=logfile, + stderr=logfile + ) + + with open(pid_file, 'w') as f: + f.write(str(process.pid)) + + atexit.register(stop_server) # Make sure server is cleaned up on quit + + return process + +def stop_server(): + if process is not None: + if process.is_running(): + process.terminate() + process.wait(timeout=3) + if process.is_running(): + process.kill() + process.wait() + +if __name__ == '__main__': + start_server() \ No newline at end of file diff --git a/server_conf.xml b/server_conf.xml new file mode 100644 index 0000000000000000000000000000000000000000..4ebcedbfa62cdc1444d852a3cfdf29df86e56276 --- /dev/null +++ b/server_conf.xml @@ -0,0 +1,8 @@ + + address localhost:$(PORT) + + + + path $(DATA_DIR)/data.fs + blob-dir $(DATA_DIR)/blob + \ No newline at end of file