Source code for anadama2.backends
# -*- coding: utf-8 -*-
import os
import sys
import json
import six
import leveldb
from .util import mkdirp
ENV_VAR = "ANADAMA_BACKEND_DIR"
LOCAL_DB_FOLDER = ".anadama"
_default_backend = None
[docs]def default(output_dir=None):
global _default_backend
if output_dir is not None:
return LevelDBBackend(
_try_dir(os.path.abspath(os.path.join(output_dir, LOCAL_DB_FOLDER, "db")))
)
if _default_backend is None:
_default_backend = LevelDBBackend()
return _default_backend
[docs]def discover_data_directory():
if ENV_VAR in os.environ:
return _try_dir(os.environ[ENV_VAR])
elif "HOME" in os.environ:
return _try_dir(os.path.join(os.environ['HOME'], ".config",
"anadama", "db"))
else:
return _fallback_datadir()
[docs]def auto(data_dir, *args, **kwargs):
"""Return the right type of backend for the given data_dir"""
# just one type of backend for now
return LevelDBBackend(data_dir, *args, **kwargs)
def _try_dir(maybe_datadir):
if os.path.isdir(maybe_datadir):
pass
else:
try:
mkdirp(maybe_datadir)
except Exception as e:
msg = six.u("Unable to create anadama "
"database directory `{}': \n"+str(e))
sys.stderr.write(msg.format(maybe_datadir))
fallback = _fallback_datadir()
sys.stderr.write(six.u("Using fallback directory: "+fallback+"\n"))
return fallback
return maybe_datadir
def _fallback_datadir():
try:
folder=os.path.join(LOCAL_DB_FOLDER,"db")
mkdirp(folder)
return os.path.abspath(folder)
except:
mkdirp("/tmp/anadama/db")
return "/tmp/anadama/db"
[docs]class BaseBackend(object):
def __init__(self, data_directory=None, autocreate=True):
self.data_directory = data_directory or discover_data_directory()
if autocreate and not self.exists():
self.create()
[docs] def lookup(self, dep):
raise NotImplementedError()
[docs] def lookup_many(self, deps):
raise NotImplementedError()
[docs] def save(self, dep_keys, dep_vals):
raise NotImplementedError()
[docs] def create(self):
raise NotImplementedError()
[docs] def exists(self):
raise NotImplementedError()
[docs] def keys(self):
raise NotImplementedError()
[docs] def delete(self, key):
raise NotImplementedError()
[docs] def delete_many(self, keys):
raise NotImplementedError()
[docs] def close(self):
raise NotImplementedError()
[docs]class LevelDBBackend(BaseBackend):
def __init__(self, *args, **kwargs):
self.db = None
super(LevelDBBackend, self).__init__(*args, **kwargs)
if not self.db:
self.db = leveldb.LevelDB(self.data_directory,
create_if_missing=False)
[docs] def exists(self):
return all([os.path.exists(os.path.join(self.data_directory, f))
for f in ("CURRENT", "LOCK", "LOG")])
[docs] def create(self):
self.db = leveldb.LevelDB(self.data_directory, create_if_missing=True,
error_if_exists=True)
def _get(self, key):
try:
val = self.db.Get(key.encode("utf-8"))
except KeyError:
return None
return json.loads(val.decode("utf-8"))
[docs] def lookup(self, dep):
return self._get(dep.name)
[docs] def lookup_many(self, deps):
return [self.lookup(dep) for dep in deps]
[docs] def save(self, dep_keys, dep_vals):
if not dep_keys:
return
batch = leveldb.WriteBatch()
for key, val in zip(dep_keys, dep_vals):
batch.Put(key.encode("utf-8"), json.dumps(val).encode("utf-8"))
self.db.Write(batch)
[docs] def keys(self):
return self.db.RangeIter(include_value=False)
[docs] def delete(self, key):
return self.db.Delete(key.encode("utf-8"))
[docs] def delete_many(self, keys):
batch = leveldb.WriteBatch()
for k in keys:
batch.Delete(k.encode("utf-8"))
self.db.Write(batch)
[docs] def close(self):
del self.db
self.db = None