/usr/share/pyshared/ZEO/tests/zeo-fan-out.test is in python-zodb 1:3.10.5-0ubuntu3.
This file is owned by root:root, with mode 0o644.
ZEO Fan Out
===========

A ZEO server should be able to use a ZEO client as its storage, so
that several servers can fan out from a single source server. Let's
see if we can make it work.

We'll use two helper functions: one that starts ZEO servers for us
and one that picks ports.

We'll start the first server:

>>> (_, port0), adminaddr0 = start_server(
...     '<filestorage>\npath fs\nblob-dir blobs\n</filestorage>', keep=1)

Then we'll start 2 others that use this one:

>>> addr1, _ = start_server(
...     '<zeoclient>\nserver %s\nblob-dir b1\n</zeoclient>' % port0)
>>> addr2, _ = start_server(
...     '<zeoclient>\nserver %s\nblob-dir b2\n</zeoclient>' % port0)
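
The two storage configurations above differ only in their blob
directory. As a minimal sketch, the same strings could be built with a
small helper. The name zeoclient_section is hypothetical (it is not
part of ZEO or of the test helpers), and the port 8100 is an arbitrary
placeholder standing in for port0:

```python
def zeoclient_section(server, blob_dir):
    """Return a <zeoclient> section like the ones used above.

    Hypothetical helper for illustration only; not part of ZEO.
    """
    return '<zeoclient>\nserver %s\nblob-dir %s\n</zeoclient>' % (
        server, blob_dir)

# 8100 is a placeholder for the source server's port (port0 above).
sections = [zeoclient_section(8100, 'b%d' % n) for n in (1, 2)]
```

Each element of sections could then be passed to start_server in place
of the literal strings.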

Now, let's create some client storages that connect to these:

>>> import os, ZEO, ZODB.blob, ZODB.POSException, transaction
>>> db0 = ZEO.DB(port0, blob_dir='cb0')
>>> db1 = ZEO.DB(addr1, blob_dir='cb1')
>>> tm1 = transaction.TransactionManager()
>>> c1 = db1.open(transaction_manager=tm1)
>>> r1 = c1.root()
>>> r1
{}
>>> db2 = ZEO.DB(addr2, blob_dir='cb2')
>>> tm2 = transaction.TransactionManager()
>>> c2 = db2.open(transaction_manager=tm2)
>>> r2 = c2.root()
>>> r2
{}

If we update c1, we'll eventually see the change in c2:

>>> import persistent.mapping
>>> r1[1] = persistent.mapping.PersistentMapping()
>>> r1[1].v = 1000
>>> r1[2] = persistent.mapping.PersistentMapping()
>>> r1[2].v = -1000
>>> r1[3] = ZODB.blob.Blob('x'*4111222)
>>> for i in range(1000, 2000):
...     r1[i] = persistent.mapping.PersistentMapping()
...     r1[i].v = 0
>>> tm1.commit()
>>> blob_id = r1[3]._p_oid, r1[1]._p_serial
>>> import time
>>> for i in range(100):
...     t = tm2.begin()
...     if 1 in r2:
...         break
...     time.sleep(0.01)
>>> tm2.abort()
>>> r2[1].v
1000
>>> r2[2].v
-1000
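
The polling loop above is an instance of a general "retry until
visible" pattern. A minimal sketch follows, assuming a hypothetical
helper named wait_until (not part of ZEO, ZODB, or the test helpers).
In the test itself the predicate would also have to begin a new
transaction on tm2 before each check, as the loop above does:

```python
import time

def wait_until(predicate, attempts=100, delay=0.01):
    """Call predicate() up to `attempts` times, sleeping between tries.

    Returns True as soon as predicate() is true, False if it never is.
    Hypothetical helper for illustration only.
    """
    for _ in range(attempts):
        if predicate():
            return True
        time.sleep(delay)
    return False

# Stand-in for "1 in r2": becomes true on the third check.
checks = []

def seen():
    checks.append(None)
    return len(checks) >= 3

result = wait_until(seen)
```

Returning a boolean, rather than silently falling through as the loop
above does, lets the caller tell a timeout apart from success.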

Now, let's see if we can break it. :)

>>> def f():
...     c = db1.open(transaction.TransactionManager())
...     r = c.root()
...     i = 0
...     while i < 100:
...         r[1].v -= 1
...         r[2].v += 1
...         try:
...             c.transaction_manager.commit()
...             i += 1
...         except ZODB.POSException.ConflictError:
...             c.transaction_manager.abort()
...     c.close()
>>> import threading
>>> threadf = threading.Thread(target=f)
>>> threadg = threading.Thread(target=f)
>>> threadf.start()
>>> threadg.start()
>>> s2 = db2.storage
>>> start_time = time.time()
>>> while time.time() - start_time < 999:
...     t = tm2.begin()
...     if r2[1].v + r2[2].v:
...         print 'oops', r2[1], r2[2]
...     if r2[1].v == 800:
...         break # we caught up
...     path = s2.fshelper.getBlobFilename(*blob_id)
...     if os.path.exists(path):
...         ZODB.blob.remove_committed(path)
...         s2._server.sendBlob(*blob_id)
... else: print 'Dang'
>>> threadf.join()
>>> threadg.join()
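
The function f() above retries each change until it has committed
100 times, aborting whenever a conflict is reported. Two such threads
therefore take r[1].v from 1000 to 800, which is the value the loop
above waits for. The retry logic can be sketched on its own without a
database. Everything below is a stand-in for illustration: the
ConflictError class replaces ZODB.POSException.ConflictError, and
change, commit, and abort simulate a transaction in which every third
commit attempt conflicts:

```python
class ConflictError(Exception):
    """Stand-in for ZODB.POSException.ConflictError (illustration only)."""

def commit_n_times(change, commit, abort, n):
    """Repeat change() + commit() until n commits have succeeded.

    On ConflictError the attempt is aborted and retried, as in f().
    Returns the number of conflicts encountered.
    """
    done = conflicts = 0
    while done < n:
        change()
        try:
            commit()
            done += 1
        except ConflictError:
            abort()
            conflicts += 1
    return conflicts

# Simulated transaction state; every third commit attempt conflicts.
state = {'v1': 1000, 'v2': -1000, 'attempts': 0}
pending = {}

def change():
    pending['v1'] = state['v1'] - 1
    pending['v2'] = state['v2'] + 1

def commit():
    state['attempts'] += 1
    if state['attempts'] % 3 == 0:
        raise ConflictError()
    state.update(pending)

def abort():
    pending.clear()

conflicts = commit_n_times(change, commit, abort, 100)
```

Because both values change inside the same committed step, their sum
stays zero after every successful commit, which is the invariant the
test checks from the other client.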

If we shut down and restart the source server, the variables will be
invalidated:

>>> stop_server(adminaddr0)
>>> _ = start_server('<filestorage 1>\npath fs\n</filestorage>\n',
...                  port=port0)
>>> for i in range(1000):
...     c1.sync()
...     c2.sync()
...     if (
...         (r1[1]._p_changed is None)
...         and
...         (r1[2]._p_changed is None)
...         and
...         (r2[1]._p_changed is None)
...         and
...         (r2[2]._p_changed is None)
...         ):
...         print 'Cool'
...         break
...     time.sleep(0.01)
... else:
...     print 'Dang'
Cool

Cleanup:

>>> db0.close()
>>> db1.close()
>>> db2.close()