Browse Source

Merge branch 'replicator' of CoBox/cobox-group into development

kyphae 2 years ago
parent
commit
a8000373e5
4 changed files with 92 additions and 218 deletions
  1. 31
    89
      index.js
  2. 9
    0
      lib/level.js
  3. 2
    1
      package.json
  4. 50
    128
      test/index.test.js

+ 31
- 89
index.js View File

@@ -4,71 +4,46 @@ const swarm = require('cobox-swarm')
4 4
 const mount = require('kappa-drive-mount')
5 5
 const Drive = require('kappa-drive')
6 6
 const Query = require('kappa-view-query')
7
-const mkdirp = require('mkdirp')
8 7
 const fs = require('fs')
9 8
 const path = require('path')
10
-const level = require('level')
11
-const memdb = require('memdb')
12
-const { EventEmitter } = require('events')
13 9
 const thunky = require('thunky')
14 10
 const debug = require('debug')('cobox-group')
15
-const liveStream = require('level-live-stream')
16
-const Base = require('cobox-group-base')
11
+const { Replicator } = require('cobox-replicator')
17 12
 const assert = require('assert')
13
+const constants = require('cobox-constants')
18 14
 
15
+const { setupLevel } = require('cobox-replicator/lib/level')
16
+const { setupLiveStream } = require('./lib/level')
19 17
 const query = require('./query')
20 18
 
21 19
 const LOG_ID = 3
22 20
 
23
-module.exports = (storage, address, opts) => new Group(storage, address, opts)
24
-
25
-class Group extends Base {
21
+class Group extends Replicator {
26 22
   constructor (storage, address, opts = {}) {
27 23
     super(storage, address, opts)
28 24
 
29 25
     this.encryptionKey = crypto.encryptionKey(opts.encryptionKey)
30
-    if (!crypto.isKey(this.encryptionKey)) throw new Error('invalid: encryption key format')
26
+    assert(crypto.isKey(this.encryptionKey), 'invalid: encryption_key format')
31 27
 
32 28
     // this.encryptionKey = opts.encryptionKey
33 29
     //   ? crypto.encryptionKey(opts.encryptionKey)
34 30
     //   : this._loadEncryptionKey() || crypto.encryptionKey()
35 31
     // assert(this._saveEncryptionKey(), 'Problem writing encryptionKey')
36 32
 
37
-    this.deriveKeyPair = crypto.keyPair.bind(null, this.masterKey)
38
-
39 33
     var groupOpts = this.config.groups.get(this.address)
40 34
     this.name = opts.name || (groupOpts && groupOpts.name)
41 35
 
42 36
     this.db = {}
37
+    this.deriveKeyPair = crypto.keyPair.bind(null, this.masterKey)
43 38
 
44
-    this._isMounted = false
45 39
     this.location = null
46
-    this._mountCallback = noop
47
-    this._connection = null
48
-
40
+    this._unmount = async () => {}
49 41
     this._readyCallback = thunky(this._ready.bind(this))
50 42
   }
51 43
 
52
-  swarm (opts = {}) {
53
-    this._connection = swarm(this, Object.assign(opts, {
54
-      logger: this.config.logger
55
-    }))
56
-    this._isSwarming = true
57
-    return true
58
-  }
59
-
60
-  unswarm () {
61
-    assert(this._isSwarming, 'already swarming')
62
-    this._connection.leave(this.discoveryKey)
63
-    this._connection.destroy()
64
-    this._connection = null
65
-    this._isSwarming = false
66
-    return true
67
-  }
68
-
69 44
   async mount (opts = {}) {
70 45
     assert(opts.location, 'provide a mount location')
71
-    this._mountCallback = await mount(this.drive, opts.location, opts)
46
+    this._unmount = await mount(this.drive, opts.location, opts)
72 47
     this._isMounted = true
73 48
     this.location = opts.location
74 49
     return this.location
@@ -76,17 +51,17 @@ class Group extends Base {
76 51
 
77 52
   async unmount () {
78 53
     assert(this._isMounted, 'not mounted')
79
-    await this._mountCallback()
54
+    await this._unmount()
80 55
     this._isMounted = false
81
-    this._mountCallback = noop
56
+    this._unmount = async () => {}
82 57
     var location = this.location
83 58
     this.location = null
84 59
     return location
85 60
   }
86 61
 
87 62
   async ls (subdir = '/') {
88
-    assert(typeof subdir === 'string', 'Path must be a string')
89
-    assert(this.drive._isReady, 'Drive must be ready')
63
+    assert(typeof subdir === 'string', 'path must be a string')
64
+    assert(this.drive._isReady, 'drive must be ready')
90 65
 
91 66
     const files = await new Promise((resolve, reject) => {
92 67
       this.drive.readdir(subdir, (err, files) => {
@@ -132,28 +107,6 @@ class Group extends Base {
132 107
     }
133 108
   }
134 109
 
135
-  _addConnection (peer, callback) {
136
-    debug('group', this.address.toString('hex'), 'add-connection', peer)
137
-
138
-    var payload = query.connection.schema.encode({
139
-      connected: true,
140
-      timestamp: Date.now()
141
-    })
142
-
143
-    this.db.network.put(peer, payload, callback)
144
-  }
145
-
146
-  _dropConnection (peer, callback) {
147
-    debug('group', this.address.toString('hex'), 'drop-connection', peer)
148
-
149
-    var payload = query.connection.schema.encode({
150
-      connected: false,
151
-      timestamp: Date.now()
152
-    })
153
-
154
-    this.db.network.put(peer, payload, callback)
155
-  }
156
-
157 110
   // ----------------------------------------------------------------------------- //
158 111
 
159 112
   _ready (callback) {
@@ -163,32 +116,30 @@ class Group extends Base {
163 116
       }, this._opts))
164 117
     })
165 118
 
119
+    this.db.connections = setupLiveStream(this.db.connections)
166 120
     this.core = kappa(this.storage, { multifeed: this.multifeed })
167 121
     // this.core.use('logs', Query(setupLevel(path.join(this.path, 'views', 'log')), query.log))
168 122
     // this.core.use('state', Query(setupLevel(path.join(this.path, 'views', 'state')), query.state))
169 123
     // this.logs = this.core.api.logs
170 124
     // this.state = this.core.api.state
171 125
 
172
-    this.drive = Drive(this.storage, this.address, {
173
-      core: this.core,
174
-      db: setupLevel(path.join(this.path, 'views', 'drive')),
175
-      logger: this.config.logger ? this.config.logger('kappa-drive') : null,
176
-      keyPair: this.deriveKeyPair
177
-    })
126
+    setupLevel(path.join(this.path, 'views', 'drive'), (err, db) => {
127
+      this.drive = Drive(this.storage, this.address, {
128
+        core: this.core,
129
+        db,
130
+        logger: this.config.logger ? this.config.logger('kappa-drive') : null,
131
+        keyPair: this.deriveKeyPair
132
+      })
178 133
 
179
-    this.db.network = setupLiveStream(setupLevel(path.join(this.path, 'network')))
180
-
181
-    this.drive.ready((err) => {
182
-      if (err) return callback(err)
183
-      // const keypair = this.deriveKeyPair(LOG_ID, this.address)
184
-      // this.multifeed.writer('log', { keypair }, (err, feed) => {
185
-      //   if (err) return callback(err)
186
-      //   this.log = feed
187
-      this.core.ready(() => {
188
-        this.discoveryKey = this.multifeed._root.discoveryKey
189
-        callback()
134
+      this.drive.ready((err) => {
135
+        if (err) return callback(err)
136
+        // const keypair = this.deriveKeyPair(LOG_ID, this.address)
137
+        // this.multifeed.writer('log', { keypair }, (err, feed) => {
138
+        //   if (err) return callback(err)
139
+        //   this.log = feed
140
+        this.core.ready(callback)
141
+        // })
190 142
       })
191
-      // })
192 143
     })
193 144
   }
194 145
 
@@ -217,14 +168,5 @@ class Group extends Base {
217 168
   // }
218 169
 }
219 170
 
220
-function setupLevel (levelPath) {
221
-  mkdirp.sync(levelPath)
222
-  return level(levelPath)
223
-}
224
-
225
-function setupLiveStream (db) {
226
-  liveStream.install(db)
227
-  return db
228
-}
229
-
230
-async function noop () {}
171
+module.exports = (storage, address, opts) => new Group(storage, address, opts)
172
+module.exports.Group = Group

+ 9
- 0
lib/level.js View File

@@ -0,0 +1,9 @@
1
+const liveStream = require('level-live-stream')
2
+
3
+function setupLiveStream (db) {
4
+  if (!db) return
5
+  liveStream.install(db)
6
+  return db
7
+}
8
+
9
+module.exports = { setupLiveStream }

+ 2
- 1
package.json View File

@@ -8,7 +8,7 @@
8 8
     "cobox-config": "^2.1.0",
9 9
     "cobox-constants": "^1.0.0",
10 10
     "cobox-crypto": "^1.2.0",
11
-    "cobox-group-base": "^1.0.0",
11
+    "cobox-replicator": "^1.0.0",
12 12
     "cobox-swarm": "^1.0.0",
13 13
     "kappa-core": "^6.0.0",
14 14
     "kappa-drive": "^1.2.0",
@@ -24,6 +24,7 @@
24 24
     "collect-stream": "^1.2.1",
25 25
     "nyc": "^14.1.1",
26 26
     "random-access-memory": "^3.1.1",
27
+    "random-words": "^1.1.0",
27 28
     "rimraf": "^3.0.0",
28 29
     "tap-spec": "^5.0.0",
29 30
     "tape": "^4.11.0",

+ 50
- 128
test/index.test.js View File

@@ -6,6 +6,7 @@ const Config = require('cobox-config')
6 6
 const path = require('path')
7 7
 const fs = require('fs')
8 8
 const mkdirp = require('mkdirp')
9
+const randomWords = require('random-words')
9 10
 
10 11
 const Group = require('../')
11 12
 const query = require('../query')
@@ -16,8 +17,11 @@ describe('basic', (context) => {
16 17
   context('create a group', (assert, next) => {
17 18
     var storage = tmp()
18 19
     var address = crypto.address().toString('hex')
19
-    var group = Group(storage, address)
20
+    var name = randomWords(1).pop()
21
+    var group = Group(storage, address, { name })
20 22
     assert.ok(group, 'group created')
23
+    assert.same(group.name, name, 'group has a name')
24
+    assert.ok(group.encryptionKey, 'group has an encryption key')
21 25
     cleanup(storage, next)
22 26
   })
23 27
 
@@ -36,7 +40,7 @@ describe('basic', (context) => {
36 40
     var storage = tmp()
37 41
     var address = crypto.address().toString('hex')
38 42
     var encryptionKey = crypto.encryptionKey().toString('hex')
39
-    var group = Group(storage, address, { encryptionKey })
43
+    var group = Group(storage, address, { encryptionKey, name: randomWords(1).pop() })
40 44
     assert.ok(group, 'group loaded')
41 45
     assert.ok(group.address, address, 'same address')
42 46
     assert.ok(group.encryptionKey, encryptionKey, 'same encryptionKey')
@@ -44,10 +48,11 @@ describe('basic', (context) => {
44 48
   })
45 49
 
46 50
   // context('Group stores encryption key', (assert, next) => {
51
+  //   const storage = tmp()
47 52
   //   const config = Config(storage)
48 53
   //   const address = crypto.address()
49 54
   //   const encryptionKey = crypto.encryptionKey()
50
-  //   const group = Group(storage, address, { config, encryptionKey })
55
+  //   const group = Group(storage, address, { config, encryptionKey, name: randomWords(1).pop() })
51 56
   //   assert.ok(group, 'group loaded')
52 57
   //   const encryptionKeyPath = path.join(config.root, 'groups', address.toString('hex'), 'encryption_key')
53 58
   //   assert.equal(
@@ -59,13 +64,14 @@ describe('basic', (context) => {
59 64
   // })
60 65
   //
61 66
   // context('Group loads encryption key', (assert, next) => {
67
+  //   const storage = tmp()
62 68
   //   const config = Config(storage)
63 69
   //   const address = crypto.address()
64 70
   //   const encryptionKey = crypto.encryptionKey()
65 71
   //   const encryptionKeyPath = path.join(config.root, 'groups', address.toString('hex'), 'encryption_key')
66 72
   //   mkdirp.sync(path.dirname(encryptionKeyPath))
67 73
   //   fs.writeFileSync(encryptionKeyPath, encryptionKey, { mode: fs.constants.S_IRUSR })
68
-  //   const group = Group(storage, address, { config })
74
+  //   const group = Group(storage, address, { config, name: randomWords(1).pop()  })
69 75
   //   assert.ok(group, 'group loaded')
70 76
   //   assert.equal(
71 77
   //     group.encryptionKey.toString('hex'),
@@ -75,20 +81,6 @@ describe('basic', (context) => {
75 81
   //   cleanup(storage, next)
76 82
   // })
77 83
 
78
-  context('returns a name', (assert, next) => {
79
-    var storage = tmp()
80
-    var config = Config(storage)
81
-    var address = crypto.address()
82
-    var encryptionKey = crypto.encryptionKey()
83
-    let group = { name: 'magma', address, encryptionKey }
84
-    config.groups.set(group.address, group)
85
-    config.save()
86
-
87
-    group = Group(storage, address, { config, encryptionKey })
88
-    assert.same(group.name, 'magma', 'returns a name')
89
-    cleanup(storage, next)
90
-  })
91
-
92 84
   // context('append to a group log and check its size', async (assert, next) => {
93 85
   //   var storage = tmp()
94 86
   //   var address = crypto.address()
@@ -144,7 +136,7 @@ describe('basic', (context) => {
144 136
   context('check derived keys', (assert, next) => {
145 137
     var storage = tmp()
146 138
     var address = crypto.address()
147
-    var group = Group(storage, address)
139
+    var group = Group(storage, address, { name: randomWords(1).pop() })
148 140
 
149 141
     group.ready(() => {
150 142
       assert.same(group.drive.state.key, crypto.keyPair(group.masterKey, 0, group.address).publicKey, 'State key derived correctly')
@@ -183,11 +175,13 @@ describe('basic', (context) => {
183 175
   // })
184 176
 
185 177
   context('ls drive', (assert, next) => {
186
-    const storage = tmp()
187
-    const config = Config(storage)
188
-    const address = crypto.address()
189
-    const group = Group(storage, address, { config })
178
+    var storage = tmp()
179
+    var config = Config(storage)
180
+    var address = crypto.address()
181
+    var group = Group(storage, address, { config, name: randomWords(1).pop() })
182
+
190 183
     assert.ok(group, 'group loaded')
184
+
191 185
     group.ready(() => {
192 186
       group.drive.writeFile('/hello.txt', 'world', (err) => {
193 187
         assert.error(err, 'no error on writeFile')
@@ -195,9 +189,6 @@ describe('basic', (context) => {
195 189
           assert.equal(Object.keys(dirObj)[0], 'hello.txt', 'lists file')
196 190
           assert.true(dirObj['hello.txt'].mtime > 10000, 'file has a timestamp')
197 191
           cleanup(storage, next)
198
-        }, (err) => {
199
-          assert.error(err, 'no error')
200
-          cleanup(storage, next)
201 192
         })
202 193
       })
203 194
     })
@@ -205,80 +196,15 @@ describe('basic', (context) => {
205 196
 })
206 197
 
207 198
 describe('connection', (context) => {
208
-  context('swarm', async (assert, next) => {
209
-    var storage = tmp()
210
-    var address = crypto.address()
211
-    var group = Group(storage, address)
212
-    assert.notOk(group._connection, 'no connecton')
213
-    assert.notOk(group._isSwarming, 'is not swarming')
214
-
215
-    await group.ready()
216
-
217
-    group.swarm()
218
-
219
-    assert.ok(group._connection, 'opens a connection')
220
-    assert.ok(group._isSwarming, 'is swarming')
221
-
222
-    group.unswarm()
223
-
224
-    assert.notOk(group._connection, 'closes the connection')
225
-    assert.notOk(group._isSwarming, 'is not swarming')
226
-
227
-    cleanup(storage, next)
228
-  })
229
-
230
-  context('add-connection', (assert, next) => {
231
-    var storage = tmp()
232
-    var address = crypto.address()
233
-    var group = Group(storage, address)
234
-    var peerID = crypto.randomBytes(16).toString('hex')
235
-
236
-    group.ready(() => {
237
-      group._addConnection(peerID, (err) => {
238
-        assert.error(err, 'no error')
239
-
240
-        group.db.network.get(peerID, (err, payload) => {
241
-          assert.error(err, 'no error')
242
-          var buffer = Buffer.from(payload)
243
-          var msg = query.connection.schema.decode(buffer)
244
-          assert.ok(msg.connected, 'stored connected as true')
245
-          assert.ok(msg.timestamp, 'stored a timestamp for last seen at')
246
-          cleanup(storage, next)
247
-        })
248
-      })
249
-    })
250
-  })
251
-
252
-  context('drop-connection', (assert, next) => {
253
-    var storage = tmp()
254
-    var address = crypto.address()
255
-    var group = Group(storage, address)
256
-    var peerID = crypto.randomBytes(16).toString('hex')
257
-
258
-    group.ready(() => {
259
-      group._dropConnection(peerID, (err) => {
260
-        assert.error(err, 'no error')
261
-        group.db.network.get(peerID, (err, payload) => {
262
-          assert.error(err, 'no error')
263
-          var buffer = Buffer.from(payload)
264
-          var msg = query.connection.schema.decode(buffer)
265
-          assert.notOk(msg.connected, 'stored connected as false')
266
-          assert.ok(msg.timestamp, 'stored a timestamp for last seen at')
267
-          cleanup(storage, next)
268
-        })
269
-      })
270
-    })
271
-  })
272
-
273 199
   context('stream', async (assert, next) => {
274 200
     var storage = tmp()
275 201
     var address = crypto.address()
276
-    var group = Group(storage, address)
202
+    var group = Group(storage, address, { name: randomWords(1).pop() })
277 203
     var peerID = 'magma'
278 204
     var count = 0
279 205
 
280 206
     group.ready(() => {
281
-      var stream = group.db.network.createLiveStream()
207
+      var stream = group.db.connections.createLiveStream()
282 208
 
283 209
       stream.on('data', (payload) => {
284 210
         if (payload && payload.sync) return done()
@@ -289,10 +215,10 @@ describe('connection', (context) => {
289 215
         done()
290 216
       })
291 217
 
292
-      group._addConnection(peerID, (err) => {
218
+      group._onPeerConnection(peerID, (err) => {
293 219
         assert.error(err, 'no error')
294 220
 
295
-        group._dropConnection(peerID, (err) => {
221
+        group._onPeerDisconnection(peerID, (err) => {
296 222
           assert.error(err, 'no error')
297 223
         })
298 224
       })
@@ -309,10 +235,12 @@ describe('replication', (context) => {
309 235
     var storage1 = tmp(),
310 236
       storage2 = tmp(),
311 237
       address = crypto.address(),
238
+      name1 = randomWords(1).pop(),
239
+      name2 = randomWords(1).pop(),
312 240
       encryptionKey = crypto.encryptionKey()
313 241
 
314
-    var base1 = Group(storage1, address, { encryptionKey })
315
-    var base2 = Group(storage2, address, { encryptionKey })
242
+    var base1 = Group(storage1, address, { encryptionKey, name: name1 })
243
+    var base2 = Group(storage2, address, { encryptionKey, name: name2 })
316 244
 
317 245
     await base1.ready()
318 246
     await base2.ready()
@@ -467,54 +395,48 @@ describe('replication', (context) => {
467 395
   // })
468 396
 
469 397
   context('defaults to latest value test from kappa-drive', async (assert, next) => {
470
-    var storage1 = tmp()
471
-    var storage2 = tmp()
472
-    var config1 = Config(storage1)
473
-    var config2 = Config(storage2)
474
-    const address = crypto.address()
475
-    const encryptionKey = crypto.encryptionKey()
398
+    var storage1 = tmp(),
399
+      storage2 = tmp(),
400
+      address = crypto.address(),
401
+      encryptionKey = crypto.encryptionKey(),
402
+      name1 = randomWords(1).pop(),
403
+      name2 = randomWords(1).pop(),
404
+      config1 = Config(storage1),
405
+      config2 = Config(storage2)
476 406
 
477
-    var group1 = Group(storage1, address, { encryptionKey, config: config1 })
478
-    var group2 = Group(storage2, address, { encryptionKey, config: config2 })
407
+    var group1 = Group(storage1, address, { encryptionKey, name: name1, config: config1 }),
408
+      group2 = Group(storage2, address, { encryptionKey, name: name2, config: config2 })
479 409
 
480 410
     await group1.ready()
481 411
     await group2.ready()
482 412
 
483
-    const drive = group1.drive
484
-    const drive2 = group2.drive
485
-
486
-    drive.ready((err) => {
487
-      assert.error(err, 'No error on drive.ready')
488
-      drive.writeFile('/hello.txt', 'world', (err) => {
413
+    group1.drive.writeFile('/hello.txt', 'world', (err) => {
414
+      assert.error(err, 'no error')
415
+      group1.drive.writeFile('/hello.txt', 'mundo', (err) => {
489 416
         assert.error(err, 'no error')
490
-        drive.writeFile('/hello.txt', 'mundo', (err) => {
491
-          assert.error(err, 'no error')
492
-          sync()
493
-        })
417
+        sync()
494 418
       })
495 419
     })
496 420
 
497 421
     function writeSecond (cb) {
498
-      drive2.writeFile('/hello.txt', 'verden', (err) => {
422
+      group2.drive.writeFile('/hello.txt', 'verden', (err) => {
499 423
         assert.error(err, 'no error')
500 424
         cb()
501 425
       })
502 426
     }
503 427
 
504 428
     function sync () {
505
-      drive2.ready(() => {
506
-        drive2.writeFile('test.txt', 'testing', (err) => {
507
-          assert.error(err, 'no error')
508
-          replicate(drive, drive2, (err) => {
509
-            assert.error(err, 'no error')
510
-            writeSecond(() => {
511
-              replicate(drive, drive2, (err) => {
429
+      replicate(group1, group2, (err) => {
430
+        assert.error(err, 'no error')
431
+        group2.drive.readFile('/hello.txt', (err, data) => {
432
+          assert.same(data, Buffer.from('mundo'), 'gets latest value')
433
+          writeSecond(() => {
434
+            replicate(group1, group2, (err) => {
435
+              assert.error(err, 'no error')
436
+              group1.drive.readFile('/hello.txt', (err, data) => {
512 437
                 assert.error(err, 'no error')
513
-                drive.readFile('/hello.txt', (err, data) => {
514
-                  assert.error(err, 'no error')
515
-                  assert.same(data, Buffer.from('verden'), 'gets latest value')
516
-                  cleanup([storage1, storage2], next)
517
-                })
438
+                assert.same(data, Buffer.from('verden'), 'gets latest value')
439
+                cleanup([storage1, storage2], next)
518 440
               })
519 441
             })
520 442
           })