/usr/share/pyshared/ZEO/tests/registerDB.test is in python-zodb 1:3.10.5-0ubuntu3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
Storage Servers should call registerDB on storages to propagate invalidations
=============================================================================
Storage servers propagate invalidations from their storages. Among
other things, this allows client storages to be used in storage
servers, allowing storage-server fan out, spreading read load over
multiple storage servers.
We'll create a Faux storage that has a registerDB method.
>>> class FauxStorage:
...     invalidations = [('trans0', ['ob0']),
...                      ('trans1', ['ob0', 'ob1']),
...                      ]
...     def registerDB(self, db):
...         self.db = db
...     def isReadOnly(self):
...         return False
...     def getName(self):
...         return 'faux'
...     def lastTransaction(self):
...         return self.invalidations[-1][0]
...     def lastInvalidations(self, size):
...         return list(self.invalidations)
We don't want the storage server to try to bind to a socket. We'll
subclass it and give it a do-nothing dispatcher "class":
>>> import ZEO.StorageServer
>>> class StorageServer(ZEO.StorageServer.StorageServer):
...     DispatcherClass = lambda *a, **k: None
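The stubbing trick used here can be shown in isolation. The following is a minimal sketch with hypothetical names (Service, QuietService); it does not use ZEO and only illustrates how a subclass can swap a factory attribute for a callable that accepts anything and returns None.

```python
class Service(object):
    # Hypothetical stand-in for a server that builds a dispatcher on startup.
    DispatcherClass = dict  # any callable factory will do for the sketch

    def __init__(self, addr):
        # The factory is looked up on the instance, so subclasses can replace it.
        self.dispatcher = self.DispatcherClass(addr=addr)


class QuietService(Service):
    # Accepts any arguments (including the bound instance) and builds nothing.
    DispatcherClass = lambda *a, **k: None


service = QuietService('addr')
```

Because the lambda takes `*a`, it tolerates being called as a bound method, which is why no `staticmethod` wrapper is needed in this sketch.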
We'll create a storage instance and a storage server using it:
>>> storage = FauxStorage()
>>> server = StorageServer('addr', dict(t=storage))
Our storage now has a db attribute that provides IStorageDB. Its
references method is just the referencesf function from ZODB.serialize:
>>> import ZODB.serialize
>>> storage.db.references is ZODB.serialize.referencesf
True
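The registerDB handshake can be pictured as a callback registration. This is a rough sketch under stated assumptions: DBAdapter, SketchStorage and SketchServer are hypothetical names, and the code is not ZEO's implementation. It only shows a server handing each storage an object whose invalidate method reports back to the server.

```python
class DBAdapter(object):
    """Hypothetical object handed to a storage; forwards invalidations."""

    def __init__(self, on_invalidate):
        self._on_invalidate = on_invalidate

    def invalidate(self, tid, oids):
        self._on_invalidate(tid, oids)


class SketchStorage(object):
    def registerDB(self, db):
        # The storage keeps the adapter so it can report changes later.
        self.db = db


class SketchServer(object):
    def __init__(self, storages):
        self.received = []
        for name, storage in storages.items():
            # Bind the storage name so each report says where it came from.
            def report(tid, oids, name=name):
                self.received.append((name, tid, list(oids)))
            storage.registerDB(DBAdapter(report))


storage = SketchStorage()
server = SketchServer({'t': storage})
storage.db.invalidate('trans2', ['ob1', 'ob2'])
```

After the last call, `server.received` holds one entry for storage 't', which is the point at which a real server would notify its connected clients.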
To see the effects of the invalidation messages, we'll create a client
stub that implements the client invalidation calls:
>>> class Client:
...     def __init__(self, name):
...         self.name = name
...     def invalidateTransaction(self, tid, invalidated):
...         print 'invalidateTransaction', tid, self.name
...         print invalidated
>>> class Connection:
...     def __init__(self, mgr, obj):
...         self.mgr = mgr
...         self.obj = obj
...     def should_close(self):
...         print 'closed', self.obj.name
...         self.mgr.close_conn(self)
...     def poll(self):
...         pass
...
...     @property
...     def trigger(self):
...         return self
...
...     def pull_trigger(self):
...         pass
>>> class ZEOStorage:
...     def __init__(self, server, name):
...         self.name = name
...         self.connection = Connection(server, self)
...         self.client = Client(name)
Now, we'll register the clients with the storage server:
>>> _ = server.register_connection('t', ZEOStorage(server, 1))
>>> _ = server.register_connection('t', ZEOStorage(server, 2))
Now, if we call invalidate, we'll see it propagate to the clients:
>>> storage.db.invalidate('trans2', ['ob1', 'ob2'])
invalidateTransaction trans2 1
['ob1', 'ob2']
invalidateTransaction trans2 2
['ob1', 'ob2']
>>> storage.db.invalidate('trans3', ['ob1', 'ob2'])
invalidateTransaction trans3 1
['ob1', 'ob2']
invalidateTransaction trans3 2
['ob1', 'ob2']
The storage server's queue will reflect the invalidations, newest first:
>>> for tid, invalidated in server.invq['t']:
...     print repr(tid), invalidated
'trans3' ['ob1', 'ob2']
'trans2' ['ob1', 'ob2']
'trans1' ['ob0', 'ob1']
'trans0' ['ob0']
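The ordering shown above is consistent with a queue that is seeded from the storage's last invalidations and then receives each new invalidation at the front. Here is a minimal sketch of that idea; the helper names and the `bound` parameter are assumptions made for illustration, not the server's actual code.

```python
def seed_queue(last_invalidations):
    """Return a newest-first copy of an oldest-first invalidation list."""
    queue = list(last_invalidations)
    queue.reverse()
    return queue


def record_invalidation(queue, tid, oids, bound=100):
    """Put the newest entry at the front and trim the queue to `bound` items."""
    queue.insert(0, (tid, list(oids)))
    del queue[bound:]


queue = seed_queue([('trans0', ['ob0']), ('trans1', ['ob0', 'ob1'])])
record_invalidation(queue, 'trans2', ['ob1', 'ob2'])
record_invalidation(queue, 'trans3', ['ob1', 'ob2'])
tids = [tid for tid, _ in queue]
```

Keeping the newest entry first means a reconnecting client's last known transaction can be searched for from the front of the list.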
If we call invalidateCache, the storage server will close each of its
connections:
>>> storage.db.invalidateCache()
closed 1
closed 2
The connections will then reopen and revalidate their caches.
The server's invalidation queue will be reset:
>>> for tid, invalidated in server.invq['t']:
...     print repr(tid), invalidated
'trans1' ['ob0', 'ob1']
'trans0' ['ob0']
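The reset behaviour can be sketched the same way: close every connection, then rebuild the queue from whatever the storage reports as its last invalidations. SketchConnection and invalidate_cache are hypothetical names, and this is only an illustration of the observable effect, not the server's implementation.

```python
class SketchConnection(object):
    """Hypothetical connection that just records that it was asked to close."""

    def __init__(self, name):
        self.name = name
        self.closed = False

    def should_close(self):
        self.closed = True


def invalidate_cache(connections, last_invalidations):
    """Close every connection and return a freshly seeded, newest-first queue."""
    for conn in list(connections):
        conn.should_close()
    return list(reversed(last_invalidations))


conns = [SketchConnection(1), SketchConnection(2)]
queue = invalidate_cache(conns, [('trans0', ['ob0']), ('trans1', ['ob0', 'ob1'])])
```

Entries recorded before the reset (such as trans2 and trans3 in the example above) are dropped because the queue is rebuilt rather than trimmed.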