Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/database/postgres-user-queries.coffee
1496 views
1
#########################################################################
2
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
# License: MS-RSL – see LICENSE.md for details
4
#########################################################################
5
6
"""
7
User (and project) client queries
8
9
COPYRIGHT : (c) 2017 SageMath, Inc.
10
LICENSE : MS-RSL
11
"""
12
13
MAX_CHANGEFEEDS_PER_CLIENT = 2000
14
15
# Reject all patches that have timestamp that is more than 3 minutes in the future.
16
MAX_PATCH_FUTURE_MS = 1000*60*3
17
18
EventEmitter = require('events')
19
async = require('async')
20
lodash = require('lodash')
21
22
{one_result, all_results, count_result, pg_type, quote_field} = require('./postgres-base')
23
24
{UserQueryQueue} = require('./postgres-user-query-queue')
25
26
{defaults} = misc = require('@cocalc/util/misc')
27
required = defaults.required
28
29
{PROJECT_UPGRADES, SCHEMA, OPERATORS, isToOperand} = require('@cocalc/util/schema')
30
{queryIsCmp, userGetQueryFilter} = require("./user-query/user-get-query")
31
32
{updateRetentionData} = require('./postgres/retention')
33
34
{ checkProjectName } = require("@cocalc/util/db-schema/name-rules");
35
{callback2} = require('@cocalc/util/async-utils')
36
37
38
exports.extend_PostgreSQL = (ext) -> class PostgreSQL extends ext
39
# Drop every query the given client still has waiting in the queue.
# If no queue exists yet, nothing was ever queued, so there is nothing to cancel.
cancel_user_queries: (opts) =>
    opts = defaults(opts, {client_id: required})
    @_user_query_queue?.cancel_user_queries(opts)
44
45
# Entry point for a single client query.  Handles admin "sudo" impersonation,
# then either runs the query immediately (no client_id) or routes it through
# the per-client UserQueryQueue so one client can't monopolize the hub.
user_query: (opts) =>
    opts = defaults opts,
        client_id  : undefined  # if given, uses to control number of queries at once by one client.
        priority   : undefined  # (NOT IMPLEMENTED) priority for this query (an integer [-10,...,19] like in UNIX)
        account_id : undefined
        project_id : undefined
        query      : required
        options    : []
        changes    : undefined
        cb         : undefined

    if opts.account_id?
        # Check for "sudo" by admin to query as a different user, which is done by specifying
        # options = [..., {account_id:'uuid'}, ...].
        for x in opts.options
            if x.account_id?
                # Check user is an admin, then change opts.account_id
                @get_account
                    columns    : ['groups']
                    account_id : opts.account_id
                    cb         : (err, r) =>
                        if err
                            opts.cb?(err)
                        else if r['groups']? and 'admin' in r['groups']
                            # Impersonate the requested account and strip the sudo
                            # option so the recursive call below does not loop.
                            opts.account_id = x.account_id
                            opts.options = (y for y in opts.options when not y['account_id']?)
                            # now do query with new opts and options not including account_id sudo.
                            @user_query(opts)
                        else
                            opts.cb?('user must be admin to sudo')
                return

    if not opts.client_id?
        # No client_id given, so do not use query queue.
        delete opts.priority
        delete opts.client_id
        @_user_query(opts)
        return

    # Lazily create the queue the first time a client_id shows up.
    if not @_user_query_queue?
        o =
            do_query   : @_user_query
            dbg        : @_dbg('user_query_queue')
            concurrent : @concurrent
        @_user_query_queue ?= new UserQueryQueue(o)

    @_user_query_queue.user_query(opts)
92
93
# Core dispatcher for a single (non-queued) user query.  Decides whether the
# request is an array of queries, a set (write) query, or a get (read) query
# -- possibly with a changefeed -- and forwards to the matching helper.
# cb(err, {table: result}); with changes set, cb fires once per change.
_user_query: (opts) =>
    opts = defaults opts,
        account_id : undefined
        project_id : undefined
        query      : required
        options    : []         # used for initial query; **IGNORED** by changefeed!;
                                #  - Use [{set:true}] or [{set:false}] to force get or set query
                                #  - For a set query, use {delete:true} to delete instead of set.  This is the only way
                                #    to delete a record, and won't work unless delete:true is set in the schema
                                #    for the table to explicitly allow deleting.
        changes    : undefined  # id of change feed
        cb         : undefined  # cb(err, result)  # WARNING -- this *will* get called multiple times when changes is true!
    id = misc.uuid().slice(0,6)
    dbg = @_dbg("_user_query(id=#{id})")
    dbg(misc.to_json(opts.query))
    if misc.is_array(opts.query)
        dbg('array query instead')
        @_user_query_array(opts)
        return

    # Placeholders that may appear as keys in the query get substituted below.
    subs =
        '{account_id}' : opts.account_id
        '{project_id}' : opts.project_id
        '{now}'        : new Date()

    if opts.changes?
        changes =
            id : opts.changes
            cb : opts.cb

    v = misc.keys(opts.query)
    # FIX: was `v.length > 1`, which let an *empty* query fall through with
    # table=undefined and produce a confusing "invalid user_query of 'undefined'"
    # error; reject anything that is not exactly one table up front.
    if v.length != 1
        dbg("FATAL query must have exactly one key")
        opts.cb?('FATAL: must specify exactly one key in the query')
        return
    table = v[0]
    query = opts.query[table]
    if misc.is_array(query)
        if query.length > 1
            dbg("FATAL not implemented")
            opts.cb?("FATAL: array of length > 1 not yet implemented")
            return
        multi = true
        query = query[0]
    else
        multi = false
    is_set_query = undefined
    if opts.options?
        if not misc.is_array(opts.options)
            dbg("FATAL options")
            opts.cb?("FATAL: options (=#{misc.to_json(opts.options)}) must be an array")
            return
        for x in opts.options
            if x.set?
                is_set_query = !!x.set
        options = (x for x in opts.options when not x.set?)
    else
        options = []

    if misc.is_object(query)
        query = misc.deep_copy(query)
        misc.obj_key_subs(query, subs)
        if not is_set_query?
            # Heuristic: a query with a null leaf is a template for a get.
            is_set_query = not misc.has_null_leaf(query)
        if is_set_query
            dbg("do a set query")
            if changes
                dbg("FATAL: changefeed")
                opts.cb?("FATAL: changefeeds only for read queries")
                return
            if not opts.account_id? and not opts.project_id?
                dbg("FATAL: anon set")
                opts.cb?("FATAL: no anonymous set queries")
                return
            dbg("user_set_query")
            @user_set_query
                account_id : opts.account_id
                project_id : opts.project_id
                table      : table
                query      : query
                options    : opts.options
                cb         : (err, x) =>
                    dbg("returned #{err}")
                    opts.cb?(err, {"#{table}":x})
        else
            # do a get query
            if changes and not multi
                dbg("FATAL: changefeed multi")
                opts.cb?("FATAL: changefeeds only implemented for multi-document queries")
                return

            if changes
                err = @_inc_changefeed_count(opts.account_id, opts.project_id, table, changes.id)
                if err
                    dbg("err changefeed count -- #{err}")
                    opts.cb?(err)
                    return

            dbg("user_get_query")
            @user_get_query
                account_id : opts.account_id
                project_id : opts.project_id
                table      : table
                query      : query
                options    : options
                multi      : multi
                changes    : changes
                cb         : (err, x) =>
                    dbg("returned #{err}")
                    if err and changes
                        # didn't actually make the changefeed, so don't count it.
                        @_dec_changefeed_count(changes.id, table)
                    opts.cb?(err, if not err then {"#{table}":x})
    else
        dbg("FATAL - invalid table")
        opts.cb?("FATAL: invalid user_query of '#{table}' -- query must be an object")
209
210
###
TRACK CHANGEFEED COUNTS

_inc and _dec below are evidently broken: it is CRITICAL that increments and
decrements match up exactly, and when they drift apart users get locked out
until they just happen to switch to another hub with different tracking,
which is silly.

TODO: DISABLED FOR NOW!
###
218
219
# Increment a count of the number of changefeeds by a given client so we can cap it.
# Returns an error string when the client is over MAX_CHANGEFEEDS_PER_CLIENT,
# and false on success.
# NOTE: the bare `return` on the first line deliberately DISABLES this method
# (see the "TODO: DISABLED FOR NOW!" note above); everything below is dead code
# kept for when the accounting is fixed.
_inc_changefeed_count: (account_id, project_id, table, changefeed_id) =>
    return
    client_name = "#{account_id}-#{project_id}"
    cnt = @_user_get_changefeed_counts ?= {}
    ids = @_user_get_changefeed_id_to_user ?= {}
    if not cnt[client_name]?
        cnt[client_name] = 1
    else if cnt[client_name] >= MAX_CHANGEFEEDS_PER_CLIENT
        return "user may create at most #{MAX_CHANGEFEEDS_PER_CLIENT} changefeeds; please close files, refresh browser, restart project"
    else
        # increment before successfully making get_query to prevent huge bursts causing trouble!
        cnt[client_name] += 1
    @_dbg("_inc_changefeed_count(table='#{table}')")("{#{client_name}:#{cnt[client_name]} ...}")
    ids[changefeed_id] = client_name
    return false
235
236
# Corresponding decrement of count of the number of changefeeds by a given client.
# NOTE: like _inc_changefeed_count above, the bare `return` on the first line
# deliberately DISABLES this method; the rest is dead code kept for reference.
_dec_changefeed_count: (id, table) =>
    return
    client_name = @_user_get_changefeed_id_to_user[id]
    if client_name?
        @_user_get_changefeed_counts?[client_name] -= 1
        delete @_user_get_changefeed_id_to_user[id]
        cnt = @_user_get_changefeed_counts
        if table?
            t = "(table='#{table}')"
        else
            t = ""
        @_dbg("_dec_changefeed_count#{t}")("counts={#{client_name}:#{cnt[client_name]} ...}")
249
250
# Handle user_query when opts.query is an array.  opts below are as for user_query.
# Runs the sub-queries strictly in order and calls opts.cb(err, results) once.
_user_query_array: (opts) =>
    if opts.changes and opts.query.length > 1
        opts.cb?("FATAL: changefeeds only implemented for single table")
        return
    result = []
    f = (query, cb) =>
        @user_query
            account_id : opts.account_id
            project_id : opts.project_id
            query      : query
            options    : opts.options
            cb         : (err, x) =>
                result.push(x); cb(err)
    # FIX: guard the final callback with ?. -- cb is documented as optional for
    # user_query, and the early-return above already treats it that way.
    async.mapSeries(opts.query, f, (err) => opts.cb?(err, result))
265
266
# Close the changefeed with the given id, if it is still open.
# Safe to call repeatedly; opts.cb always fires (it's not really asynchronous).
user_query_cancel_changefeed: (opts) =>
    opts = defaults(opts, {id: required, cb: undefined})
    dbg = @_dbg("user_query_cancel_changefeed(id='#{opts.id}')")
    feed = @_changefeeds?[opts.id]
    if not feed?
        dbg("already canceled before (no such feed)")
    else
        dbg("actually canceling feed")
        @_dec_changefeed_count(opts.id)
        delete @_changefeeds[opts.id]
        feed.close()
    opts.cb?()
280
281
# Column names requested by a get query: the keys of the query object,
# minus any names listed in remove_from_query (an array of strings, if given).
_user_get_query_columns: (query, remove_from_query) =>
    columns = misc.keys(query)
    if not remove_from_query?
        return columns
    return lodash.difference(columns, remove_from_query)
288
289
# Call cb() only if account_id belongs to an admin; otherwise cb(err).
_require_is_admin: (account_id, cb) =>
    if not account_id?
        cb("FATAL: user must be an admin")
        return
    @is_admin
        account_id : account_id
        cb         : (err, is_admin) =>
            if err
                cb(err); return
            if not is_admin
                cb("FATAL: user must be an admin"); return
            cb()
302
303
# Ensure that each project_id in project_ids is such that the account is in one of the given
# groups for the project, or that the account is an admin.  If not, cb(err).
_require_project_ids_in_groups: (account_id, project_ids, groups, cb) =>
    # FIX: removed unused local `s = {"#{account_id}": true}` -- nothing read it.
    require_admin = false
    @_query
        query : "SELECT project_id, users#>'{#{account_id}}' AS user FROM projects"
        where : "project_id = ANY($)":project_ids
        cache : true
        cb    : all_results (err, x) =>
            if err
                cb(err)
            else
                known_project_ids = {}  # we use this to ensure that each of the given project_ids exists.
                for p in x
                    known_project_ids[p.project_id] = true
                    # Not in an allowed group for even one project => admin rights required.
                    if p.user?.group not in groups
                        require_admin = true
                # If any of the project_ids don't exist, reject the query.
                for project_id in project_ids
                    if not known_project_ids[project_id]
                        cb("FATAL: unknown project_id '#{misc.trunc(project_id,100)}'")
                        return
                if require_admin
                    @_require_is_admin(account_id, cb)
                else
                    cb()
330
331
# Parse the client-supplied options array (e.g. [{limit:5}, {order_by:'-time'}])
# into a single object r with normalized fields; unknown options set r.err.
_query_parse_options: (options) =>
    r = {}
    for x in options
        for name, value of x
            switch name
                when 'only_changes'
                    r.only_changes = !!value
                when 'limit'
                    # FIX: always pass radix 10 so strings like "0x10" aren't parsed as hex.
                    r.limit = parseInt(value, 10)
                when 'slice'
                    r.slice = value
                when 'order_by'
                    # A leading '-' means descending order on that column.
                    if value[0] == '-'
                        value = value.slice(1) + " DESC "
                    if r.order_by
                        r.order_by = r.order_by + ', ' + value
                    else
                        r.order_by = value
                when 'delete'
                    null
                    # ignore delete here - is parsed elsewhere
                when 'heartbeat'
                    @_dbg("_query_parse_options")("TODO/WARNING -- ignoring heartbeat option from old client")
                else
                    r.err = "unknown option '#{name}'"
    # Guard rails: no matter what, all queries are capped with a limit of 100000.
    # TODO: If somehow somebody has, e.g., more than 100K projects, or maybe more
    # than 100K edits of a single file, they could hit this and not realize it.  I
    # had this set at 1000 for a few minutes and it caused me to randomly not have
    # some of my projects.
    MAX_LIMIT = 100000
    # (simplified: isFinite never throws, so the old try/catch was dead code;
    # isFinite also rejects undefined/NaN, which covers a missing limit option)
    if not isFinite(r.limit) or r.limit > MAX_LIMIT
        r.limit = MAX_LIMIT
    return r
370
371
###
372
SET QUERIES
373
###
374
# Validate and normalize a user set query against the table's schema.
# Returns: undefined when there is provably nothing to modify (no-op shortcut),
# {err: ...} on any validation failure, or otherwise a record r carrying the
# rewritten query, permission requirements, and the table's set-query hooks,
# which user_set_query then executes.
_parse_set_query_opts: (opts) =>
    r = {}

    if opts.project_id?
        dbg = r.dbg = @_dbg("user_set_query(project_id='#{opts.project_id}', table='#{opts.table}')")
    else if opts.account_id?
        dbg = r.dbg = @_dbg("user_set_query(account_id='#{opts.account_id}', table='#{opts.table}')")
    else
        return {err:"FATAL: account_id or project_id must be specified to set query on table='#{opts.table}'"}

    if not SCHEMA[opts.table]?
        return {err:"FATAL: table '#{opts.table}' does not exist"}

    dbg(misc.to_json(opts.query))

    if opts.options
        dbg("options=#{misc.to_json(opts.options)}")

    r.query      = misc.copy(opts.query)
    r.table      = opts.table
    # Virtual tables map to an underlying real db table.
    r.db_table   = SCHEMA[opts.table].virtual ? opts.table
    r.account_id = opts.account_id
    r.project_id = opts.project_id

    s = SCHEMA[opts.table]

    # Which side is making the query determines which schema spec applies.
    if opts.account_id?
        r.client_query = s?.user_query
    else
        r.client_query = s?.project_query

    if not r.client_query?.set?.fields?
        return {err:"FATAL: user set queries not allowed for table '#{opts.table}'"}

    if not @_mod_fields(opts.query, r.client_query)
        dbg("shortcut -- no fields will be modified, so nothing to do")
        return

    # Fill in / validate each schema-controlled field.  String values are magic
    # tokens; function values compute the field from the query itself.
    for field in misc.keys(r.client_query.set.fields)
        if r.client_query.set.fields[field] == undefined
            return {err: "FATAL: user set query not allowed for #{opts.table}.#{field}"}
        val = r.client_query.set.fields[field]

        if typeof(val) == 'function'
            try
                r.query[field] = val(r.query, @)
            catch err
                return {err:"FATAL: error setting '#{field}' -- #{err}"}
        else
            switch val
                when 'account_id'
                    if not r.account_id?
                        return {err: "FATAL: account_id must be specified -- make sure you are signed in"}
                    r.query[field] = r.account_id
                when 'project_id'
                    if not r.project_id?
                        return {err: "FATAL: project_id must be specified"}
                    r.query[field] = r.project_id
                when 'time_id'
                    # NOTE(review): `uuid` is not required in this chunk of the file --
                    # presumably imported elsewhere in the module; confirm.
                    r.query[field] = uuid.v1()
                when 'project_write'
                    if not r.query[field]?
                        return {err: "FATAL: must specify #{opts.table}.#{field}"}
                    r.require_project_ids_write_access = [r.query[field]]
                when 'project_owner'
                    if not r.query[field]?
                        return {err:"FATAL: must specify #{opts.table}.#{field}"}
                    r.require_project_ids_owner = [r.query[field]]

    if r.client_query.set.admin
        r.require_admin = true

    r.primary_keys = @_primary_keys(r.db_table)

    r.json_fields = @_json_fields(r.db_table, r.query)

    # Reject any field the client is not allowed to change; admin-only fields
    # are allowed but flag the query as requiring admin rights.
    for k, v of r.query
        if k in r.primary_keys
            continue
        if r.client_query?.set?.fields?[k] != undefined
            continue
        if s.admin_query?.set?.fields?[k] != undefined
            r.require_admin = true
            continue
        return {err: "FATAL: changing #{r.table}.#{k} not allowed"}

    # HOOKS which allow for running arbitrary code in response to
    # user set queries.  In each case, new_val below is only the part
    # of the object that the user requested to change.

    # 0. CHECK: Runs before doing any further processing; has callback, so this
    # provides a generic way to quickly check whether or not this query is allowed
    # for things that can't be done declaratively.  The check_hook can also
    # mutate the obj (the user query), e.g., to enforce limits on input size.
    r.check_hook = r.client_query.set.check_hook

    # 1. BEFORE: If before_change is set, it is called with input
    #   (database, old_val, new_val, account_id, cb)
    # before the actual change to the database is made.
    r.before_change_hook = r.client_query.set.before_change

    # 2. INSTEAD OF: If instead_of_change is set, then instead_of_change_hook
    # is called with input
    #   (database, old_val, new_val, account_id, cb)
    # *instead* of actually doing the update/insert to
    # the database.  This makes it possible to run arbitrary
    # code whenever the user does a certain type of set query.
    # Obviously, if that code doesn't set the new_val in the
    # database, then new_val won't be the new val.
    r.instead_of_change_hook = r.client_query.set.instead_of_change

    # 3. AFTER:  If set, the on_change_hook is called with
    #   (database, old_val, new_val, account_id, cb)
    # after everything the database has been modified.
    r.on_change_hook = r.client_query.set.on_change

    # 4. instead of query
    r.instead_of_query = r.client_query.set.instead_of_query

    #dbg("on_change_hook=#{on_change_hook?}, #{misc.to_json(misc.keys(client_query.set))}")

    # Set the query options -- order doesn't matter for set queries (unlike for get), so we
    # just merge the options into a single dictionary.
    # NOTE: As I write this, there is just one supported option: {delete:true}.
    r.options = {}
    if r.client_query.set.options?
        for x in r.client_query.set.options
            for y, z of x
                r.options[y] = z
    if opts.options?
        for x in opts.options
            for y, z of x
                r.options[y] = z
    dbg("options = #{misc.to_json(r.options)}")

    if r.options.delete and not r.client_query.set.delete
        # delete option is set, but deletes aren't explicitly allowed on this table.  ERROR.
        return {err: "FATAL: delete from #{r.table} not allowed"}

    return r
514
515
# Enforce the permission requirements that _parse_set_query_opts recorded on r:
# admin rights, write access to referenced projects, and/or project ownership.
# All three checks run in parallel; cb(err) fires on the first failure.
_user_set_query_enforce_requirements: (r, cb) =>
    async.parallel([
        (cb) =>
            if r.require_admin
                @_require_is_admin(r.account_id, cb)
            else
                cb()
        (cb) =>
            if r.require_project_ids_write_access?
                if r.project_id?
                    # A project can only write to itself -- no cross-project sets.
                    err = undefined
                    for x in r.require_project_ids_write_access
                        if x != r.project_id
                            err = "FATAL: can only query same project"
                            break
                    cb(err)
                else
                    @_require_project_ids_in_groups(r.account_id, r.require_project_ids_write_access,\
                                     ['owner', 'collaborator'], cb)
            else
                cb()
        (cb) =>
            if r.require_project_ids_owner?
                @_require_project_ids_in_groups(r.account_id, r.require_project_ids_owner,\
                                 ['owner'], cb)
            else
                cb()
    ], cb)
543
544
# Build the WHERE clause (a map from "key=$::TYPE" to value) that pins down the
# row a set query refers to, using the table's primary key(s).
_user_set_query_where: (r) =>
    where  = {}
    fields = SCHEMA[r.db_table].fields
    for pk in @_primary_keys(r.db_table)
        value = r.query[pk]
        spec  = fields[pk]
        if spec.noCoerce
            where["#{pk}=$"] = value
            continue
        type = pg_type(spec)
        if type == 'TIMESTAMP' and not misc.is_date(value)
            # Javascript is better at parsing its own dates than PostgreSQL;
            # the isNaN test keeps special strings like NOW() working.
            d = new Date(value)
            value = d unless isNaN(d)
        where["#{pk}=$::#{type}"] = value
    return where
560
561
# Build the values map for the set query, coercing each field to its Postgres
# type ("key::TYPE") unless the value is null, untyped, or marked noCoerce.
_user_set_query_values: (r) =>
    values = {}
    schema = SCHEMA[r.db_table]
    for key, value of r.query
        spec = schema?.fields?[key]
        type = pg_type(spec)
        if not value? or not type? or spec?.noCoerce
            values[key] = value
            continue
        if type == 'TIMESTAMP' and not misc.is_date(value)
            # (as above) Javascript is better at parsing its own dates than PostgreSQL
            d = new Date(value)
            value = d unless isNaN(d)
        values["#{key}::#{type}"] = value
    return values
576
577
# If any change hook is configured, fetch the current row into r.old_val so the
# hooks can see the value being replaced.  Otherwise (or when a primary key is
# missing, i.e. the row is being created) just call cb() with r.old_val unset.
_user_set_query_hooks_prepare: (r, cb) =>
    if r.on_change_hook? or r.before_change_hook? or r.instead_of_change_hook?
        for primary_key in r.primary_keys
            if not r.query[primary_key]?
                # this is fine -- it just means the old_val isn't defined.
                # this can happen, e.g., when creating a new object with a primary key that is a generated id.
                cb()
                return
        # get the old value before changing it
        # TODO: optimization -- can we restrict columns below?
        @_query
            query : "SELECT * FROM #{r.db_table}"
            where : @_user_set_query_where(r)
            cb    : one_result (err, x) =>
                r.old_val = x; cb(err)
    else
        cb()
594
595
# Count how many rows already match this set query's primary key (0 or 1).
_user_query_set_count: (r, cb) =>
    @_query({query: "SELECT COUNT(*) FROM #{r.db_table}", where: @_user_set_query_where(r), cb: count_result(cb)})
600
601
# Delete the row(s) matching this set query's primary key.
_user_query_set_delete: (r, cb) =>
    @_query({query: "DELETE FROM #{r.db_table}", where: @_user_set_query_where(r), cb: cb})
606
607
# ON CONFLICT columns for the upsert: exactly the table's primary keys.
_user_set_query_conflict: (r) =>
    r.primary_keys
609
610
# Insert the row, updating it in place if the primary key already exists.
_user_query_set_upsert: (r, cb) =>
    @_query({
        query    : "INSERT INTO #{r.db_table}"
        values   : @_user_set_query_values(r)
        conflict : @_user_set_query_conflict(r)
        cb       : cb
    })
617
618
# Record is already in the DB, so we update it.  This handles the case that
# involves both a jsonb_merge (for JSONB columns) and a plain UPDATE (for
# everything else except the primary keys).
_user_query_set_upsert_and_jsonb_merge: (r, cb) =>
    # Partition the query: JSONB fields get merged, the rest get set directly.
    to_merge = {}
    for field of r.json_fields
        value = r.query[field]
        to_merge[field] = value if value?
    plain = {}
    for field, value of r.query
        continue if field in r.primary_keys
        continue if to_merge[field]?
        plain[field] = value
    @_query
        query       : "UPDATE #{r.db_table}"
        jsonb_merge : to_merge
        set         : plain
        where       : @_user_set_query_where(r)
        cb          : cb
637
638
# Execute the actual database write for a set query: a delete, the table's
# instead_of_change hook, a plain upsert, or (when JSONB merge fields are
# involved) a count followed by insert-or-merge.
_user_set_query_main_query: (r, cb) =>
    r.dbg("_user_set_query_main_query")

    if not r.client_query.set.allow_field_deletes
        # allow_field_deletes not set, so remove any null/undefined
        # fields from the query
        for key of r.query
            if not r.query[key]?
                delete r.query[key]

    if r.options.delete
        for primary_key in r.primary_keys
            if not r.query[primary_key]?
                cb("FATAL: delete query must set primary key")
                return
        r.dbg("delete based on primary key")
        @_user_query_set_delete(r, cb)
        return
    if r.instead_of_change_hook?
        # The hook fully replaces the database write.
        r.instead_of_change_hook(@, r.old_val, r.query, r.account_id, cb)
    else
        if misc.len(r.json_fields) == 0
            # easy case -- there are no jsonb merge fields; just do an upsert.
            @_user_query_set_upsert(r, cb)
            return
        # HARD CASE -- there are json_fields... so we are doing an insert
        # if the object isn't already in the database, and an update
        # if it is.  This is ugly because I don't know how to do both
        # a JSON merge as an upsert.
        cnt = undefined  # will equal number of records having the primary key (so 0 or 1)
        async.series([
            (cb) =>
                @_user_query_set_count r, (err, n) =>
                    cnt = n; cb(err)
            (cb) =>
                r.dbg("do the set query")
                if cnt == 0
                    # Just insert (do as upsert to avoid error in case of race)
                    @_user_query_set_upsert(r, cb)
                else
                    # Do as an update -- record is definitely already in db since cnt > 0.
                    # This would fail in the unlikely (but possible) case that somebody deletes
                    # the record between the above count and when we do the UPDATE.
                    # Using a transaction could avoid this.
                    # Maybe such an error is reasonable and it's good to report it as such.
                    @_user_query_set_upsert_and_jsonb_merge(r, cb)
        ], cb)
685
686
# Public entry point for a user set (write) query.  Parses/validates the query
# against the schema, enforces permissions, then runs the hook pipeline:
# check_hook -> before_change -> (instead_of_query | main query) -> on_change.
# A before_change hook may return stop=true to short-circuit the remaining steps.
user_set_query: (opts) =>
    opts = defaults opts,
        account_id : undefined
        project_id : undefined
        table      : required
        query      : required
        options    : undefined   # options=[{delete:true}] is the only supported nontrivial option here.
        cb         : required    # cb(err)

    # TODO: it would be nice to return the primary key part of the created object on creation.
    # That's not implemented and will be somewhat nontrivial, and will use the RETURNING clause
    # of postgres's INSERT - https://www.postgresql.org/docs/current/sql-insert.html

    if @is_standby
        opts.cb("set queries against standby not allowed")
        return
    r = @_parse_set_query_opts(opts)

    # Only uncomment for debugging -- too big/verbose/dangerous
    # r.dbg("parsed query opts = #{JSON.stringify(r)}")

    if not r?  # nothing to do
        opts.cb()
        return
    if r.err
        opts.cb(r.err)
        return

    async.series([
        (cb) =>
            @_user_set_query_enforce_requirements(r, cb)
        (cb) =>
            if r.check_hook?
                r.check_hook(@, r.query, r.account_id, r.project_id, cb)
            else
                cb()
        (cb) =>
            # Load r.old_val if any change hook will need it.
            @_user_set_query_hooks_prepare(r, cb)
        (cb) =>
            if r.before_change_hook?
                r.before_change_hook @, r.old_val, r.query, r.account_id, (err, stop) =>
                    # stop=true means the hook fully handled the query.
                    r.done = stop
                    cb(err)
            else
                cb()
        (cb) =>
            if r.done
                cb()
                return
            if r.instead_of_query?
                opts1 = misc.copy_without(opts, ['cb', 'changes', 'table'])
                r.instead_of_query(@, opts1, cb)
            else
                @_user_set_query_main_query(r, cb)
        (cb) =>
            if r.done
                cb()
                return
            if r.on_change_hook?
                r.on_change_hook(@, r.old_val, r.query, r.account_id, cb)
            else
                cb()
    ], (err) => opts.cb(err))
749
750
# mod_fields reports whether the query contains any field that might actually
# get modified in the database; e.g., account_id won't since it gets filled in
# with the user's account_id, and project_write won't since it must refer to an
# existing project.  Used **only** to skip doing no-op queries -- just an
# optimization.
_mod_fields: (query, client_query) =>
    auto_filled = ['account_id', 'project_write']
    for field in misc.keys(query)
        return true if client_query.set.fields[field] not in auto_filled
    return false
760
761
# obj is an object returned from the database via a query.
# Postgres JSONB doesn't support timestamps, so we convert
# every json leaf node of obj that looks like JSON of a timestamp
# to a Javascript Date.
# NOTE: mutates obj in place; fields maps column name -> the date-field spec
# handed to misc.fix_json_dates.  Columns with no entry in fields are skipped.
_user_get_query_json_timestamps: (obj, fields) =>
    for k, v of obj
        if fields[k]
            obj[k] = misc.fix_json_dates(v, fields[k])
770
# fill in the default values for obj using the client_query spec.
# obj may be a single object or an array of objects pulled from the database;
# each is mutated in place.  fields is the list of requested column names.
_user_get_query_set_defaults: (client_query, obj, fields) =>
    if not misc.is_array(obj)
        obj = [obj]
    else if obj.length == 0
        return
    s = client_query?.get?.fields ? {}
    for k in fields
        v = s[k]
        if v?
            # k is a field for which a default value (=v) is provided in the schema
            for x in obj
                # For each obj pulled from the database that is defined...
                if x?
                    # We check to see if the field k was set on that object.
                    y = x[k]
                    if not y?
                        # It was NOT set, so we deep copy the default value for the field k.
                        x[k] = misc.deep_copy(v)
                    else if typeof(v) == 'object' and typeof(y) == 'object' and not misc.is_array(v)
                        # y *is* defined and is an object, so we merge in the provided defaults.
                        for k0, v0 of v
                            if not y[k0]?
                                y[k0] = v0
795
# Validate the users map of a project set query.  Returns a sanitized copy:
#   - ensures all keys of users are valid uuid's (though not that they are valid users);
#     entries with invalid uuid keys are silently dropped.
#   - and format is:
#      {group:'owner' or 'collaborator', hide:bool, upgrades:{a map}, ssh_keys:{...}}
#     with valid upgrade fields.
# Throws Error on any malformed entry.
_user_set_query_project_users: (obj, account_id) =>
    dbg = @_dbg("_user_set_query_project_users")
    if not obj.users?
        # nothing to do -- not changing users.
        return
    ##dbg("disabled")
    ##return obj.users
    upgrade_fields = PROJECT_UPGRADES.params
    users = {}
    # TODO: we obviously should check that a user is only changing the part
    # of this object involving themselves... or adding/removing collaborators.
    # That is not currently done below.  TODO TODO TODO  SECURITY.
    for id, x of obj.users
        if misc.is_valid_uuid_string(id)
            for key in misc.keys(x)
                if key not in ['group', 'hide', 'upgrades', 'ssh_keys']
                    # FIX: message previously read "unknown field '#{key}" -- the
                    # closing quote around the field name was missing.
                    throw Error("unknown field '#{key}'")
            if x.group? and (x.group not in ['owner', 'collaborator'])
                throw Error("invalid value for field 'group'")
            if x.hide? and typeof(x.hide) != 'boolean'
                throw Error("invalid type for field 'hide'")
            if x.upgrades?
                if not misc.is_object(x.upgrades)
                    throw Error("invalid type for field 'upgrades'")
                for k,_ of x.upgrades
                    if not upgrade_fields[k]
                        throw Error("invalid upgrades field '#{k}'")
            if x.ssh_keys
                # do some checks.
                if not misc.is_object(x.ssh_keys)
                    throw Error("ssh_keys must be an object")
                for fingerprint, key of x.ssh_keys
                    if not key  # deleting
                        continue
                    if not misc.is_object(key)
                        throw Error("each key in ssh_keys must be an object")
                    for k, v of key
                        # the two dates are just numbers not actual timestamps...
                        if k not in ['title', 'value', 'creation_date', 'last_use_date']
                            throw Error("invalid ssh_keys field '#{k}'")
            users[id] = x
    return users
841
842
# Carry out an action_request ({action:?, time:?}) on a project: record that it
# started in the projects table, run the action via @projectControl, then
# record the outcome (err/finished) back into the table.  cb fires after the
# final bookkeeping write.
project_action: (opts) =>
    opts = defaults opts,
        project_id     : required
        action_request : required   # action is object {action:?, time:?}
        cb             : required
    if opts.action_request.action == 'test'
        # used for testing -- shouldn't trigger anything to happen.
        opts.cb()
        return
    dbg = @_dbg("project_action(project_id='#{opts.project_id}',action_request=#{misc.to_json(opts.action_request)})")
    dbg()
    project = undefined
    action_request = misc.copy(opts.action_request)
    # Persist the current state of action_request into the project's row.
    set_action_request = (cb) =>
        dbg("set action_request to #{misc.to_json(action_request)}")
        @_query
            query     : "UPDATE projects"
            where     : 'project_id = $::UUID':opts.project_id
            jsonb_set : {action_request : action_request}
            cb        : cb
    async.series([
        (cb) =>
            action_request.started = new Date()
            set_action_request(cb)
        (cb) =>
            dbg("get project")
            try
                project = await @projectControl(opts.project_id)
                cb()
            catch err
                cb(err)
        (cb) =>
            dbg("doing action")
            try
                switch action_request.action
                    when 'restart'
                        await project.restart()
                    when 'stop'
                        await project.stop()
                    when 'start'
                        await project.start()
                    else
                        throw Error("FATAL: action '#{opts.action_request.action}' not implemented")
                cb()
            catch err
                cb(err)
    ], (err) =>
        # Success or failure, record the final state so clients see the outcome.
        if err
            action_request.err = err
        action_request.finished = new Date()
        dbg("finished!")
        set_action_request(opts.cb)
    )
895
896
# This hook is called *before* the user commits a change to a project in the database
# via a user set query.  It validates project renames, dispatches action requests,
# and prevents a user from changing anybody else's upgrade allocation.
# TODO: Add a pre-check here as well that total upgrade isn't going to be exceeded.
# This will avoid a possible subtle edge case if user is cheating and always somehow
# crashes server...?
_user_set_query_project_change_before: (old_val, new_val, account_id, cb) =>
    #dbg = @_dbg("_user_set_query_project_change_before #{account_id}, #{misc.to_json(old_val)} --> #{misc.to_json(new_val)}")
    # I've seen MASSIVE OUTPUT from this, e.g., when setting avatar.
    dbg = @_dbg("_user_set_query_project_change_before #{account_id}")
    dbg()

    if new_val?.name and (new_val?.name != old_val?.name)
        # Changing or setting the name of the project to something nontrivial.
        try
            checkProjectName(new_val.name);
        catch err
            cb(err.toString())
            return
        if new_val.name
            # Setting name to something nontrivial, so we must check uniqueness
            # among all projects this user owns.
            result = await callback2 @_query,
                query : 'SELECT COUNT(*) FROM projects'
                where :
                    "users#>>'{#{account_id},group}' = $::TEXT" : 'owner'
                    "project_id != $::UUID" : new_val.project_id
                    "LOWER(name) = $::TEXT":new_val.name.toLowerCase()
            if result.rows[0].count > 0
                cb("There is already a project with the same owner as this project and name='#{new_val.name}'. Names are not case sensitive.")
                return
            # A second constraint is that only the project owner can change the project name.
            result = await callback2 @_query,
                query : 'SELECT COUNT(*) FROM projects'
                where :
                    "users#>>'{#{account_id},group}' = $::TEXT" : 'owner'
                    "project_id = $::UUID" : new_val.project_id
            if result.rows[0].count == 0
                cb("Only the owner of the project can currently change the project name.")
                return

    if new_val?.action_request? and JSON.stringify(new_val.action_request.time) != JSON.stringify(old_val?.action_request?.time)
        # Requesting an action, e.g., save, restart, etc.
        dbg("action_request -- #{misc.to_json(new_val.action_request)}")
        #
        # WARNING: Above, we take the difference of times below, since != doesn't work as we want with
        # separate Date objects, as it will say equal dates are not equal.  Example:
        # coffee> x = JSON.stringify(new Date()); {from_json}=require('misc'); a=from_json(x); b=from_json(x); [a!=b, a-b]
        # [ true, 0 ]

        # Launch the action -- success or failure communicated back to all clients through changes to state.
        # Also, we don't have to worry about permissions here; that this function got called at all means
        # the user has write access to the projects table entry with given project_id, which gives them permission
        # to do any action with the project.
        @project_action
            project_id     : new_val.project_id
            action_request : misc.copy_with(new_val.action_request, ['action', 'time'])
            cb             : (err) =>
                dbg("action_request #{misc.to_json(new_val.action_request)} completed -- #{err}")
                # true means -- do nothing further.  We don't want to the user to
                # set this same thing since we already dealt with it properly.
                cb(err, true)
        return

    if not new_val.users?  # not changing users
        cb(); return
    old_val = old_val?.users ? {}
    new_val = new_val?.users ? {}
    # FIX: was misc.keys(old_val).concat(new_val), which appends the new_val
    # *object itself* as a single array element instead of its keys -- so users
    # present only in new_val (i.e. newly added) were never checked below.
    for id in misc.keys(old_val).concat(misc.keys(new_val))
        if account_id != id
            # make sure user doesn't change anybody else's allocation
            if not lodash.isEqual(old_val?[id]?.upgrades, new_val?[id]?.upgrades)
                err = "FATAL: user '#{account_id}' tried to change user '#{id}' allocation toward a project"
                dbg(err)
                cb(err)
                return
    cb()
972
973
# This hook is called *after* the user commits a change to a project in the database
974
# via a user set query. It could undo changes the user isn't allowed to make, which
975
# might require doing various async calls, or take actions (e.g., setting quotas,
976
# starting projects, etc.).
977
_user_set_query_project_change_after: (old_val, new_val, account_id, cb) =>
978
dbg = @_dbg("_user_set_query_project_change_after #{account_id}, #{misc.to_json(old_val)} --> #{misc.to_json(new_val)}")
979
dbg()
980
old_upgrades = old_val.users?[account_id]?.upgrades
981
new_upgrades = new_val.users?[account_id]?.upgrades
982
if new_upgrades? and not lodash.isEqual(old_upgrades, new_upgrades)
983
dbg("upgrades changed for #{account_id} from #{misc.to_json(old_upgrades)} to #{misc.to_json(new_upgrades)}")
984
project = undefined
985
async.series([
986
(cb) =>
987
@ensure_user_project_upgrades_are_valid
988
account_id : account_id
989
cb : cb
990
(cb) =>
991
if not @projectControl?
992
cb()
993
else
994
dbg("get project")
995
try
996
project = await @projectControl(new_val.project_id)
997
cb()
998
catch err
999
cb(err)
1000
(cb) =>
1001
if not project?
1002
cb()
1003
else
1004
dbg("determine total quotas and apply")
1005
try
1006
await project.setAllQuotas()
1007
cb()
1008
catch err
1009
cb(err)
1010
], cb)
1011
else
1012
cb()
1013
1014
###
1015
GET QUERIES
1016
###
1017
1018
# Make any functional substitutions defined by the schema.
1019
# This may mutate query in place.
1020
_user_get_query_functional_subs: (query, fields) =>
1021
if fields?
1022
for field, val of fields
1023
if typeof(val) == 'function'
1024
query[field] = val(query, @)
1025
1026
_parse_get_query_opts: (opts) =>
1027
if opts.changes? and not opts.changes.cb?
1028
return {err: "FATAL: user_get_query -- if opts.changes is specified, then opts.changes.cb must also be specified"}
1029
1030
r = {}
1031
# get data about user queries on this table
1032
if opts.project_id?
1033
r.client_query = SCHEMA[opts.table]?.project_query
1034
else
1035
r.client_query = SCHEMA[opts.table]?.user_query
1036
1037
if not r.client_query?.get?
1038
return {err: "FATAL: get queries not allowed for table '#{opts.table}'"}
1039
1040
if not opts.account_id? and not opts.project_id? and not SCHEMA[opts.table].anonymous
1041
return {err: "FATAL: anonymous get queries not allowed for table '#{opts.table}'"}
1042
1043
r.table = SCHEMA[opts.table].virtual ? opts.table
1044
1045
r.primary_keys = @_primary_keys(opts.table)
1046
1047
# Are only admins allowed any get access to this table?
1048
r.require_admin = !!r.client_query.get.admin
1049
1050
# Verify that all requested fields may be read by users
1051
for field in misc.keys(opts.query)
1052
if r.client_query.get.fields?[field] == undefined
1053
return {err: "FATAL: user get query not allowed for #{opts.table}.#{field}"}
1054
1055
# Functional substitutions defined by schema
1056
@_user_get_query_functional_subs(opts.query, r.client_query.get?.fields)
1057
1058
if r.client_query.get?.instead_of_query?
1059
return r
1060
1061
# Sanity check: make sure there is something in the query
1062
# that gets only things in this table that this user
1063
# is allowed to see, or at least a check_hook. This is not required
1064
# for admins.
1065
if not r.client_query.get.pg_where? and not r.client_query.get.check_hook? and not r.require_admin
1066
return {err: "FATAL: user get query not allowed for #{opts.table} (no getAll filter - pg_where or check_hook)"}
1067
1068
# Apply default options to the get query (don't impact changefeed)
1069
# The user can override these, e.g., if they were to want to explicitly increase a limit
1070
# to get more file use history.
1071
user_options = {}
1072
for x in opts.options
1073
for y, z of x
1074
user_options[y] = true
1075
1076
get_options = undefined
1077
if @is_heavily_loaded() and r.client_query.get.options_load?
1078
get_options = r.client_query.get.options_load
1079
else if r.client_query.get.options?
1080
get_options = r.client_query.get.options
1081
if get_options?
1082
# complicated since options is a list of {opt:val} !
1083
for x in get_options
1084
for y, z of x
1085
if not user_options[y]
1086
opts.options.push(x)
1087
break
1088
1089
r.json_fields = @_json_fields(opts.table, opts.query)
1090
return r
1091
1092
# _json_fields: map from field names to array of fields that should be parsed as timestamps
1093
# These keys of his map are also used by _user_query_set_upsert_and_jsonb_merge to determine
1094
# JSON deep merging for set queries.
1095
_json_fields: (table, query) =>
1096
json_fields = {}
1097
for field, info of SCHEMA[table].fields
1098
if (query[field]? or query[field] == null) and (info.type == 'map' or info.pg_type == 'JSONB')
1099
json_fields[field] = info.date ? []
1100
return json_fields
1101
1102
_user_get_query_where: (client_query, account_id, project_id, user_query, table, cb) =>
1103
dbg = @_dbg("_user_get_query_where")
1104
dbg()
1105
1106
pg_where = client_query.get.pg_where
1107
1108
if @is_heavily_loaded() and client_query.get.pg_where_load?
1109
# use a different query if load is heavy
1110
pg_where = client_query.get.pg_where_load
1111
1112
if not pg_where?
1113
pg_where = []
1114
if pg_where == 'projects'
1115
pg_where = ['projects']
1116
1117
if typeof(pg_where) == 'function'
1118
pg_where = pg_where(user_query, @)
1119
if not misc.is_array(pg_where)
1120
cb("FATAL: pg_where must be an array (of strings or objects)")
1121
return
1122
1123
# Do NOT mutate the schema itself!
1124
pg_where = misc.deep_copy(pg_where)
1125
1126
# expand 'projects' in query, depending on whether project_id is specified or not.
1127
# This is just a convenience to make the db schema simpler.
1128
for i in [0...pg_where.length]
1129
if pg_where[i] == 'projects'
1130
if user_query.project_id
1131
pg_where[i] = {"project_id = $::UUID" : 'project_id'}
1132
else
1133
pg_where[i] = {"project_id = ANY(select project_id from projects where users ? $::TEXT)" : 'account_id'}
1134
1135
# Now we fill in all the parametrized substitutions in the pg_where list.
1136
subs = {}
1137
for x in pg_where
1138
if misc.is_object(x)
1139
for _, value of x
1140
subs[value] = value
1141
1142
sub_value = (value, cb) =>
1143
switch value
1144
when 'account_id'
1145
if not account_id?
1146
cb('FATAL: account_id must be given')
1147
return
1148
subs[value] = account_id
1149
cb()
1150
when 'project_id'
1151
if project_id?
1152
subs[value] = project_id
1153
cb()
1154
else if not user_query.project_id
1155
cb("FATAL: must specify project_id")
1156
else if SCHEMA[table].anonymous
1157
subs[value] = user_query.project_id
1158
cb()
1159
else
1160
@user_is_in_project_group
1161
account_id : account_id
1162
project_id : user_query.project_id
1163
groups : ['owner', 'collaborator']
1164
cb : (err, in_group) =>
1165
if err
1166
cb(err)
1167
else if in_group
1168
subs[value] = user_query.project_id
1169
cb()
1170
else
1171
cb("FATAL: you do not have read access to this project -- account_id=#{account_id}, project_id_=#{project_id}")
1172
when 'project_id-public'
1173
if not user_query.project_id?
1174
cb("FATAL: must specify project_id")
1175
else
1176
if SCHEMA[table].anonymous
1177
@has_public_path
1178
project_id : user_query.project_id
1179
cb : (err, has_public_path) =>
1180
if err
1181
cb(err)
1182
else if not has_public_path
1183
cb("project does not have any public paths")
1184
else
1185
subs[value] = user_query.project_id
1186
cb()
1187
else
1188
cb("FATAL: table must allow anonymous queries")
1189
else
1190
cb()
1191
1192
async.map misc.keys(subs), sub_value, (err) =>
1193
if err
1194
cb(err)
1195
return
1196
for x in pg_where
1197
if misc.is_object(x)
1198
for key, value of x
1199
x[key] = subs[value]
1200
1201
# impose further restrictions (more where conditions)
1202
pg_where.push(userGetQueryFilter(user_query, client_query))
1203
1204
cb(undefined, pg_where)
1205
1206
_user_get_query_options: (options, multi, schema_options) =>
1207
r = {}
1208
1209
if schema_options?
1210
options = options.concat(schema_options)
1211
1212
# Parse option part of the query
1213
{limit, order_by, slice, only_changes, err} = @_query_parse_options(options)
1214
1215
if err
1216
return {err: err}
1217
if only_changes
1218
r.only_changes = true
1219
if limit?
1220
r.limit = limit
1221
else if not multi
1222
r.limit = 1
1223
if order_by?
1224
r.order_by = order_by
1225
if slice?
1226
return {err: "slice not implemented"}
1227
return r
1228
1229
_user_get_query_do_query: (query_opts, client_query, user_query, multi, json_fields, cb) =>
1230
query_opts.cb = all_results (err, x) =>
1231
if err
1232
cb(err)
1233
else
1234
if misc.len(json_fields) > 0
1235
# Convert timestamps to Date objects, if **explicitly** specified in the schema
1236
for obj in x
1237
@_user_get_query_json_timestamps(obj, json_fields)
1238
1239
if not multi
1240
x = x[0]
1241
# Fill in default values and remove null's
1242
@_user_get_query_set_defaults(client_query, x, misc.keys(user_query))
1243
# Get rid of undefined fields -- that's the default and wastes memory and bandwidth
1244
if x?
1245
for obj in x
1246
misc.map_mutate_out_undefined_and_null(obj)
1247
cb(undefined, x)
1248
@_query(query_opts)
1249
1250
_user_get_query_query: (table, user_query, remove_from_query) =>
1251
return "SELECT #{(quote_field(field) for field in @_user_get_query_columns(user_query, remove_from_query)).join(',')} FROM #{table}"
1252
1253
_user_get_query_satisfied_by_obj: (user_query, obj, possible_time_fields) =>
1254
#dbg = @_dbg("_user_get_query_satisfied_by_obj)
1255
#dbg(user_query, obj)
1256
for field, value of obj
1257
date_keys = possible_time_fields[field]
1258
if date_keys
1259
value = misc.fix_json_dates(value, date_keys)
1260
if (q = user_query[field])?
1261
if (op = queryIsCmp(q))
1262
#dbg(value:value, op: op, q:q)
1263
x = q[op]
1264
switch op
1265
when '=='
1266
if value != x
1267
return false
1268
when '!='
1269
if value == x
1270
return false
1271
when '>='
1272
if value < x
1273
return false
1274
when '<='
1275
if value > x
1276
return false
1277
when '>'
1278
if value <= x
1279
return false
1280
when '<'
1281
if value >= x
1282
return false
1283
else if value != q
1284
return false
1285
return true
1286
1287
_user_get_query_handle_field_deletes: (client_query, new_val) =>
1288
if client_query.get.allow_field_deletes
1289
# leave in the nulls that might be in new_val
1290
return
1291
# remove all nulls from new_val. Right now we
1292
# just can't support this due to default values.
1293
# TODO: completely get rid of default values (?) or
1294
# maybe figure out how to implement this. The symptom
1295
# of not doing this is a normal user will do things like
1296
# delete the users field of their projects. Not good.
1297
for key of new_val
1298
if not new_val[key]?
1299
delete new_val[key]
1300
1301
_user_get_query_changefeed: (changes, table, primary_keys, user_query,
1302
where, json_fields, account_id, client_query, orig_table, cb) =>
1303
dbg = @_dbg("_user_get_query_changefeed(table='#{table}')")
1304
dbg()
1305
# WARNING: always call changes.cb! Do not do something like f = changes.cb, then call f!!!!
1306
# This is because the value of changes.cb may be changed by the caller.
1307
if not misc.is_object(changes)
1308
cb("FATAL: changes must be an object with keys id and cb")
1309
return
1310
if not misc.is_valid_uuid_string(changes.id)
1311
cb("FATAL: changes.id must be a uuid")
1312
return
1313
if typeof(changes.cb) != 'function'
1314
cb("FATAL: changes.cb must be a function")
1315
return
1316
for primary_key in primary_keys
1317
if not user_query[primary_key]? and user_query[primary_key] != null
1318
cb("FATAL: changefeed MUST include primary key (='#{primary_key}') in query")
1319
return
1320
watch = []
1321
select = {}
1322
init_tracker = tracker = free_tracker = undefined
1323
possible_time_fields = misc.deep_copy(json_fields)
1324
feed = undefined
1325
1326
changefeed_keys = SCHEMA[orig_table]?.changefeed_keys ? SCHEMA[table]?.changefeed_keys ? []
1327
for field, val of user_query
1328
type = pg_type(SCHEMA[table]?.fields?[field])
1329
if type == 'TIMESTAMP'
1330
possible_time_fields[field] = 'all'
1331
if val == null and field not in primary_keys and field not in changefeed_keys
1332
watch.push(field)
1333
else
1334
select[field] = type
1335
1336
if misc.len(possible_time_fields) > 0
1337
# Convert (likely) timestamps to Date objects; fill in defaults for inserts
1338
process = (x) =>
1339
if not x?
1340
return
1341
if x.new_val?
1342
@_user_get_query_json_timestamps(x.new_val, possible_time_fields)
1343
if x.action == 'insert' # do not do this for delete or update actions!
1344
@_user_get_query_set_defaults(client_query, x.new_val, misc.keys(user_query))
1345
else if x.action == 'update'
1346
@_user_get_query_handle_field_deletes(client_query, x.new_val)
1347
if x.old_val?
1348
@_user_get_query_json_timestamps(x.old_val, possible_time_fields)
1349
else
1350
process = (x) =>
1351
if not x?
1352
return
1353
if x.new_val?
1354
if x.action == 'insert' # do not do this for delete or update actions!
1355
@_user_get_query_set_defaults(client_query, x.new_val, misc.keys(user_query))
1356
else if x.action == 'update'
1357
@_user_get_query_handle_field_deletes(client_query, x.new_val)
1358
1359
async.series([
1360
(cb) =>
1361
# check for alternative where test for changefeed.
1362
pg_changefeed = client_query?.get?.pg_changefeed
1363
if not pg_changefeed?
1364
cb(); return
1365
1366
if pg_changefeed == 'projects'
1367
tracker_add = (project_id) => feed.insert({project_id:project_id})
1368
tracker_remove = (project_id) => feed.delete({project_id:project_id})
1369
1370
# Any tracker error means this changefeed is now broken and
1371
# has to be recreated.
1372
tracker_error = () => changes.cb("tracker error - ${err}")
1373
1374
pg_changefeed = (db, account_id) =>
1375
where : (obj) =>
1376
# Check that this is a project we have read access to
1377
if not db._project_and_user_tracker?.get_projects(account_id)[obj.project_id]
1378
return false
1379
# Now check our actual query conditions on the object.
1380
# This would normally be done by the changefeed, but since
1381
# we are passing in a custom where, we have to do it.
1382
if not @_user_get_query_satisfied_by_obj(user_query, obj, possible_time_fields)
1383
return false
1384
return true
1385
1386
select : {'project_id':'UUID'}
1387
1388
init_tracker : (tracker) =>
1389
tracker.on "add_user_to_project-#{account_id}", tracker_add
1390
tracker.on "remove_user_from_project-#{account_id}", tracker_remove
1391
tracker.once 'error', tracker_error
1392
1393
1394
free_tracker : (tracker) =>
1395
dbg("freeing project tracker events")
1396
tracker.removeListener("add_user_to_project-#{account_id}", tracker_add)
1397
tracker.removeListener("remove_user_from_project-#{account_id}", tracker_remove)
1398
tracker.removeListener("error", tracker_error)
1399
1400
1401
else if pg_changefeed == 'news'
1402
pg_changefeed = ->
1403
where : (obj) ->
1404
if obj.date?
1405
date_obj = new Date(obj.date)
1406
# we send future news items to the frontend, but filter it based on the server time
1407
return date_obj >= misc.months_ago(3)
1408
else
1409
return true
1410
select : {id: 'SERIAL UNIQUE', date: 'TIMESTAMP'}
1411
1412
else if pg_changefeed == 'one-hour'
1413
pg_changefeed = ->
1414
where : (obj) ->
1415
if obj.time?
1416
return new Date(obj.time) >= misc.hours_ago(1)
1417
else
1418
return true
1419
select : {id:'UUID', time:'TIMESTAMP'}
1420
1421
else if pg_changefeed == 'five-minutes'
1422
pg_changefeed = ->
1423
where : (obj) ->
1424
if obj.time?
1425
return new Date(obj.time) >= misc.minutes_ago(5)
1426
else
1427
return true
1428
select : {id:'UUID', time:'TIMESTAMP'}
1429
1430
else if pg_changefeed == 'collaborators'
1431
if not account_id?
1432
cb("FATAL: account_id must be given")
1433
return
1434
tracker_add = (collab_id) => feed.insert({account_id:collab_id})
1435
tracker_remove = (collab_id) => feed.delete({account_id:collab_id})
1436
tracker_error = () => changes.cb("tracker error - ${err}")
1437
pg_changefeed = (db, account_id) ->
1438
shared_tracker = undefined
1439
where : (obj) -> # test of "is a collab with me"
1440
return shared_tracker.get_collabs(account_id)?[obj.account_id]
1441
init_tracker : (tracker) =>
1442
shared_tracker = tracker
1443
tracker.on "add_collaborator-#{account_id}", tracker_add
1444
tracker.on "remove_collaborator-#{account_id}", tracker_remove
1445
tracker.once 'error', tracker_error
1446
free_tracker : (tracker) =>
1447
dbg("freeing collab tracker events")
1448
tracker.removeListener("add_collaborator-#{account_id}", tracker_add)
1449
tracker.removeListener("remove_collaborator-#{account_id}", tracker_remove)
1450
tracker.removeListener("error", tracker_error)
1451
1452
1453
x = pg_changefeed(@, account_id)
1454
if x.init_tracker?
1455
init_tracker = x.init_tracker
1456
if x.free_tracker?
1457
free_tracker = x.free_tracker
1458
if x.select?
1459
for k, v of x.select
1460
select[k] = v
1461
1462
if x.where? or x.init_tracker?
1463
where = x.where
1464
if not account_id?
1465
cb()
1466
return
1467
# initialize user tracker is needed for where tests...
1468
@project_and_user_tracker cb : (err, _tracker) =>
1469
if err
1470
cb(err)
1471
else
1472
tracker = _tracker
1473
try
1474
await tracker.register(account_id)
1475
cb()
1476
catch err
1477
cb(err)
1478
else
1479
cb()
1480
(cb) =>
1481
@changefeed
1482
table : table
1483
select : select
1484
where : where
1485
watch : watch
1486
cb : (err, _feed) =>
1487
# there *is* a glboal variable feed that we set here:
1488
feed = _feed
1489
if err
1490
cb(err)
1491
return
1492
feed.on 'change', (x) ->
1493
process(x)
1494
changes.cb(undefined, x)
1495
feed.on 'close', ->
1496
changes.cb(undefined, {action:'close'})
1497
dbg("feed close")
1498
if tracker? and free_tracker?
1499
dbg("free_tracker")
1500
free_tracker(tracker)
1501
else
1502
dbg("do NOT free_tracker")
1503
feed.on 'error', (err) ->
1504
changes.cb("feed error - #{err}")
1505
@_changefeeds ?= {}
1506
@_changefeeds[changes.id] = feed
1507
init_tracker?(tracker)
1508
cb()
1509
], cb)
1510
1511
user_get_query: (opts) =>
1512
opts = defaults opts,
1513
account_id : undefined
1514
project_id : undefined
1515
table : required
1516
query : required
1517
multi : required
1518
options : required # used for initial query; **IGNORED** by changefeed,
1519
# which ensures that *something* is sent every n minutes, in case no
1520
# changes are coming out of the changefeed. This is an additional
1521
# measure in case the client somehow doesn't get a "this changefeed died" message.
1522
# Use [{delete:true}] to instead delete the selected records (must
1523
# have delete:true in schema).
1524
changes : undefined # {id:?, cb:?}
1525
cb : required # cb(err, result)
1526
###
1527
The general idea is that user get queries are of the form
1528
1529
SELECT [columns] FROM table WHERE [get_all] AND [further restrictions] LIMIT/slice
1530
1531
Using the whitelist rules specified in SCHEMA, we
1532
determine each of the above, then run the query.
1533
1534
If no error in query, and changes is a given uuid, set up a change
1535
feed that calls opts.cb on changes as well.
1536
###
1537
id = misc.uuid().slice(0,6)
1538
#dbg = @_dbg("user_get_query(id=#{id})")
1539
dbg = -> # Logging below is just too verbose, and turns out to not be useful...
1540
dbg("account_id='#{opts.account_id}', project_id='#{opts.project_id}', query=#{misc.to_json(opts.query)}, multi=#{opts.multi}, options=#{misc.to_json(opts.options)}, changes=#{misc.to_json(opts.changes)}")
1541
{err, table, client_query, require_admin, primary_keys, json_fields} = @_parse_get_query_opts(opts)
1542
1543
if err
1544
dbg("error parsing query opts -- #{err}")
1545
opts.cb(err)
1546
return
1547
1548
_query_opts = {} # this will be the input to the @_query command.
1549
locals =
1550
result : undefined
1551
changes_cb : undefined
1552
1553
async.series([
1554
(cb) =>
1555
if client_query.get.check_hook?
1556
dbg("do check hook")
1557
client_query.get.check_hook(@, opts.query, opts.account_id, opts.project_id, cb)
1558
else
1559
cb()
1560
(cb) =>
1561
if require_admin
1562
dbg('require admin')
1563
@_require_is_admin(opts.account_id, cb)
1564
else
1565
cb()
1566
(cb) =>
1567
# NOTE: _user_get_query_where may mutate opts.query (for 'null' params)
1568
# so it is important that this is called before @_user_get_query_query below.
1569
# See the TODO in userGetQueryFilter.
1570
dbg("get_query_where")
1571
@_user_get_query_where client_query, opts.account_id, opts.project_id, opts.query, opts.table, (err, where) =>
1572
_query_opts.where = where
1573
cb(err)
1574
(cb) =>
1575
if client_query.get.instead_of_query?
1576
cb();
1577
return
1578
_query_opts.query = @_user_get_query_query(table, opts.query, client_query.get.remove_from_query)
1579
x = @_user_get_query_options(opts.options, opts.multi, client_query.options)
1580
if x.err
1581
dbg("error in get_query_options, #{x.err}")
1582
cb(x.err)
1583
return
1584
misc.merge(_query_opts, x)
1585
1586
nestloop = SCHEMA[opts.table]?.pg_nestloop # true, false or undefined
1587
if typeof nestloop == 'boolean'
1588
val = if nestloop then 'on' else 'off'
1589
_query_opts.pg_params = {enable_nestloop : val}
1590
1591
indexscan = SCHEMA[opts.table]?.pg_indexscan # true, false or undefined
1592
if typeof indexscan == 'boolean'
1593
val = if indexscan then 'on' else 'off'
1594
_query_opts.pg_params = {enable_indexscan : val}
1595
1596
if opts.changes?
1597
locals.changes_cb = opts.changes.cb
1598
locals.changes_queue = []
1599
# see note about why we do the following at the bottom of this file
1600
opts.changes.cb = (err, obj) ->
1601
locals.changes_queue.push({err:err, obj:obj})
1602
dbg("getting changefeed")
1603
@_user_get_query_changefeed(opts.changes, table, primary_keys,
1604
opts.query, _query_opts.where, json_fields,
1605
opts.account_id, client_query, opts.table,
1606
cb)
1607
else
1608
cb()
1609
1610
(cb) =>
1611
if client_query.get.instead_of_query?
1612
if opts.changes?
1613
cb("changefeeds are not supported for querying this table")
1614
return
1615
# Custom version: instead of doing a full query, we instead
1616
# call a function and that's it.
1617
dbg("do instead_of_query instead")
1618
opts1 = misc.copy_without(opts, ['cb', 'changes', 'table'])
1619
client_query.get.instead_of_query @, opts1, (err, result) =>
1620
locals.result = result
1621
cb(err)
1622
return
1623
1624
if _query_opts.only_changes
1625
dbg("skipping query")
1626
locals.result = undefined
1627
cb()
1628
else
1629
dbg("finally doing query")
1630
@_user_get_query_do_query _query_opts, client_query, opts.query, opts.multi, json_fields, (err, result) =>
1631
if err
1632
cb(err)
1633
return
1634
locals.result = result
1635
cb()
1636
], (err) =>
1637
if err
1638
dbg("series failed -- err=#{err}")
1639
opts.cb(err)
1640
return
1641
dbg("series succeeded")
1642
opts.cb(undefined, locals.result)
1643
if opts.changes?
1644
dbg("sending change queue")
1645
opts.changes.cb = locals.changes_cb
1646
##dbg("sending queued #{JSON.stringify(locals.changes_queue)}")
1647
for {err, obj} in locals.changes_queue
1648
##dbg("sending queued changes #{JSON.stringify([err, obj])}")
1649
opts.changes.cb(err, obj)
1650
)
1651
1652
###
1653
Synchronized strings
1654
###
1655
_user_set_query_syncstring_change_after: (old_val, new_val, account_id, cb) =>
1656
dbg = @_dbg("_user_set_query_syncstring_change_after")
1657
cb() # return immediately -- stuff below can happen as side effect in the background.
1658
# Now do the following reactions to this syncstring change in the background:
1659
# 1. Awaken the relevant project.
1660
project_id = old_val?.project_id ? new_val?.project_id
1661
if project_id? and (new_val?.save?.state == 'requested' or (new_val?.last_active? and new_val?.last_active != old_val?.last_active))
1662
dbg("awakening project #{project_id}")
1663
awaken_project(@, project_id)
1664
1665
1666
# Verify that writing a patch is allowed.
1667
_user_set_query_patches_check: (obj, account_id, project_id, cb) =>
1668
# Reject any patch that is too new
1669
if obj.time - new Date() > MAX_PATCH_FUTURE_MS
1670
cb("clock") # this exact error is assumed in synctable!
1671
return
1672
# Write access
1673
@_syncstring_access_check(obj.string_id, account_id, project_id, cb)
1674
1675
# Verify that writing a patch is allowed.
1676
_user_get_query_patches_check: (obj, account_id, project_id, cb) =>
1677
# Write access (no notion of read only yet -- will be easy to add later)
1678
@_syncstring_access_check(obj.string_id, account_id, project_id, cb)
1679
1680
# Verify that writing a patch is allowed.
1681
_user_set_query_cursors_check: (obj, account_id, project_id, cb) =>
1682
@_syncstring_access_check(obj.string_id, account_id, project_id, cb)
1683
1684
# Verify that writing a patch is allowed.
1685
_user_get_query_cursors_check: (obj, account_id, project_id, cb) =>
1686
@_syncstring_access_check(obj.string_id, account_id, project_id, cb)
1687
1688
_syncstring_access_check: (string_id, account_id, project_id, cb) =>
1689
# Check that string_id is the id of a syncstring the given account_id or
1690
# project_id is allowed to write to. NOTE: We do not concern ourselves (for now at least)
1691
# with proof of identity (i.e., one user with full read/write access to a project
1692
# claiming they are another users of that SAME project), since our security model
1693
# is that any user of a project can edit anything there. In particular, the
1694
# synctable lets any user with write access to the project edit the users field.
1695
if string_id?.length != 40
1696
cb("FATAL: string_id (='#{string_id}') must be a string of length 40")
1697
return
1698
@_query
1699
query : "SELECT project_id FROM syncstrings"
1700
where : "string_id = $::CHAR(40)" : string_id
1701
cache : false # *MUST* leave as false (not true), since unfortunately, if this returns no, due to FATAL below this would break opening the file until cache clears.
1702
cb : one_result 'project_id', (err, x) =>
1703
if err
1704
cb(err)
1705
else if not x
1706
# There is no such syncstring with this id -- fail
1707
cb("FATAL: no such syncstring")
1708
else if account_id?
1709
# Attempt to read or write by a user browser client
1710
@_require_project_ids_in_groups(account_id, [x], ['owner', 'collaborator'], cb)
1711
else if project_id?
1712
# Attempt to read or write by a *project*
1713
if project_id == x
1714
cb()
1715
else
1716
cb("FATAL: project not allowed to write to syncstring in different project")
1717
1718
1719
# Check permissions for querying for syncstrings in a project
1720
_syncstrings_check: (obj, account_id, project_id, cb) =>
1721
#dbg = @dbg("_syncstrings_check")
1722
#dbg(misc.to_json([obj, account_id, project_id]))
1723
if not misc.is_valid_uuid_string(obj?.project_id)
1724
cb("FATAL: project_id (='#{obj?.project_id}') must be a valid uuid")
1725
return
1726
if project_id?
1727
if project_id == obj.project_id
1728
# The project can access its own syncstrings
1729
cb()
1730
else
1731
cb("FATAL: projects can only access their own syncstrings") # for now at least!
1732
return
1733
if account_id?
1734
# Access request by a client user
1735
@_require_project_ids_in_groups(account_id, [obj.project_id], ['owner', 'collaborator'], cb)
1736
else
1737
cb("FATAL: only users and projects can access syncstrings")
1738
1739
# Other functions that are needed to implement various use queries,
1740
# e.g., for virtual queries like file_use_times.
1741
# ASYNC FUNCTION with no callback.
1742
updateRetentionData: (opts) =>
1743
return await updateRetentionData(opts)
1744
1745
# Map from project_id to the Date of the last awaken attempt, used for throttling.
_last_awaken_time = {}

# Start the given project (and ensure a hub connection to it), throttled to at
# most once per 30s per project.  NOTE(review): on the throttle and
# no-projectControl early returns, cb is never called -- callers currently
# invoke this fire-and-forget without a cb, so this appears intentional; confirm
# before passing a cb that must fire.
awaken_project = (db, project_id, cb) ->
    # throttle so that this gets called *for a given project* at most once every 30s.
    now = new Date()
    if _last_awaken_time[project_id]? and now - _last_awaken_time[project_id] < 30000
        return
    _last_awaken_time[project_id] = now
    dbg = db._dbg("_awaken_project(project_id=#{project_id})")
    if not db.projectControl?
        dbg("skipping since no projectControl defined")
        return
    dbg("doing it...")
    async.series([
        (cb) ->
            try
                project = db.projectControl(project_id)
                await project.start()
                cb()
            catch err
                cb("error starting project = #{err}")
        (cb) ->
            if not db.ensure_connection_to_project?
                cb()
                return
            dbg("also make sure there is a connection from hub to project")
            # This is so the project can find out that the user wants to save a file (etc.)
            db.ensure_connection_to_project(project_id, cb)
    ], (err) ->
        if err
            dbg("awaken project error -- #{err}")
        else
            dbg("success awakening project")
        cb?(err)
    )
1779
###
1780
Note about opts.changes.cb:
1781
1782
Regarding sync, what was happening I think is:
1783
- (a) https://github.com/sagemathinc/cocalc/blob/master/src/packages/hub/postgres-user-queries.coffee#L1384 starts sending changes
1784
- (b) https://github.com/sagemathinc/cocalc/blob/master/src/packages/hub/postgres-user-queries.coffee#L1393 sends the full table.
1785
1786
(a) could result in changes actually getting to the client before the table itself has been initialized. The client code assumes that it only gets changes *after* the table is initialized. The browser client seems to be smart enough that it detects this situation and resets itself, so the browser never gets messed up as a result.
1787
However, the project definitely does NOT do so well, and it can get messed up. Then it has a broken version of the table, missing some last-minute change. It is broken until the project forgets about that table entirely, which can be a pretty long time (or until a project restart).
1788
1789
My fix is to queue up those changes on the server, then only start sending them to the client **after** the (b) query is done. I tested this by using setTimeout to manually delay (b) for a few seconds, and fully seeing the "file won't save problem". The other approach would make it so clients are more robust against getting changes first. However, it would take a long time for all clients to update (restart all projects), and it's an annoying assumption to make in general -- we may have entirely new clients later and they could make the same bad assumptions about order...
1790
###
1791
1792