#########################################################################
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
# License: MS-RSL – see LICENSE.md for details
#########################################################################

# PostgreSQL -- basic queries and database interface

exports.DEBUG = true

# If the database connection is non-responsive, but no error is raised directly
# by the db client, then we will know and fix it, rather than just sitting there...
DEFAULT_TIMEOUT_MS = 60000

# Do not test for non-responsiveness until a while after the initial connection is
# established, since things tend to work initially, *but* may also be much
# slower, due to tons of clients simultaneously connecting to the DB.
DEFAULT_TIMEOUT_DELAY_MS = DEFAULT_TIMEOUT_MS * 4

QUERY_ALERT_THRESH_MS = 5000

consts = require('./consts')
DEFAULT_STATEMENT_TIMEOUT_MS = consts.STATEMENT_TIMEOUT_MS

EventEmitter = require('events')

fs = require('fs')
async = require('async')
escapeString = require('sql-string-escape')
validator = require('validator')
{callback2} = require('@cocalc/util/async-utils')

LRU = require('lru-cache')

pg = require('pg')

winston = require('@cocalc/backend/logger').getLogger('postgres')
{do_query_with_pg_params} = require('./postgres/set-pg-params')

{ syncSchema } = require('./postgres/schema')
{ pgType } = require('./postgres/schema/pg-type')
{ quoteField } = require('./postgres/schema/util')
{ primaryKey, primaryKeys } = require('./postgres/schema/table')

misc_node = require('@cocalc/backend/misc_node')
{ sslConfigToPsqlEnv, pghost, pgdatabase, pguser, pgssl } = require("@cocalc/backend/data")

{ recordConnected, recordDisconnected } = require("./postgres/record-connect-error")

{defaults} = misc = require('@cocalc/util/misc')
required = defaults.required

{SCHEMA, client_db} = require('@cocalc/util/schema')

metrics = require('@cocalc/backend/metrics')

exports.PUBLIC_PROJECT_COLUMNS = ['project_id', 'last_edited', 'title', 'description', 'deleted', 'created', 'env']
exports.PROJECT_COLUMNS = ['users'].concat(exports.PUBLIC_PROJECT_COLUMNS)

dbPassword = require('@cocalc/database/pool/password').default

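###
Example (illustrative sketch, not part of the original module): constructing a
client and running a basic query, assuming the PG* environment variables (or
the defaults above) point at a reachable PostgreSQL server.

    {PostgreSQL} = require('./postgres-base')
    db = new PostgreSQL(debug: true)
    db.connect
        cb : (err) ->
            if err
                console.warn("could not connect -- #{err}")
            else
                db._query
                    query : 'SELECT NOW()'
                    cb    : (err, result) -> console.log(err ? result.rows)
###
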
# PostgreSQL emits a 'connect' event whenever we successfully connect to the
# database, and a 'disconnect' event when the connection to postgres fails.
class exports.PostgreSQL extends EventEmitter
    constructor: (opts) ->

        super()
        opts = defaults opts,
            host         : pghost       # DEPRECATED: or 'hostname:port' or 'host1,host2,...' (multiple hosts) -- TODO -- :port only works for one host.
            database     : pgdatabase
            user         : pguser
            ssl          : pgssl
            debug        : exports.DEBUG
            connect      : true
            password     : undefined
            cache_expiry : 5000   # expire cached queries after this many milliseconds
            # keep this very short; it's just meant to reduce the impact of a bunch of
            # identical permission checks in a single user query.
            cache_size   : 300    # cache this many queries; use @_query(cache:true, ...) to cache the result
            concurrent_warn : 500
            concurrent_heavily_loaded : 70   # when concurrent hits this, consider load "heavy"; this changes how some queries behave, to be faster but provide less info
            ensure_exists : true  # ensure the database exists on startup (runs psql in a shell)
            timeout_ms    : DEFAULT_TIMEOUT_MS  # **IMPORTANT: if *any* query takes this long, the entire connection is terminated and recreated!**
            timeout_delay_ms : DEFAULT_TIMEOUT_DELAY_MS  # Only reconnect on timeout this many ms after connect. Motivation: on initial startup queries may take much longer due to competition with other clients.
        @setMaxListeners(0)  # because of a potentially large number of changefeeds
        @_state = 'init'
        @_debug = opts.debug
        @_timeout_ms = opts.timeout_ms
        @_timeout_delay_ms = opts.timeout_delay_ms
        @_ensure_exists = opts.ensure_exists
        @_init_test_query()
        dbg = @_dbg("constructor")  # must be after setting @_debug above
        dbg(opts)
        i = opts.host.indexOf(':')
        if i != -1
            @_host = opts.host.slice(0, i)
            @_port = parseInt(opts.host.slice(i+1))
        else
            @_host = opts.host
            @_port = 5432
        @_concurrent_warn = opts.concurrent_warn
        @_concurrent_heavily_loaded = opts.concurrent_heavily_loaded
        @_user = opts.user
        @_database = opts.database
        @_ssl = opts.ssl
        @_password = opts.password ? dbPassword()
        @_init_metrics()

        if opts.cache_expiry and opts.cache_size
            @_query_cache = new LRU({max:opts.cache_size, ttl: opts.cache_expiry})
        if opts.connect
            @connect()  # start trying to connect

    clear_cache: =>
        @_query_cache?.clear()   # lru-cache v7+ API (matches the {max, ttl} options above)

    close: =>
        if @_state == 'closed'
            return  # nothing to do
        @_close_test_query()
        @_state = 'closed'
        @emit('close')
        @removeAllListeners()
        if @_clients?
            for client in @_clients
                client.removeAllListeners()
                client.end()
            delete @_clients

    ###
    If @_timeout_ms is set, then we periodically do a simple test query,
    to ensure that the database connection is working and responding to queries.
    If the query below times out, then the connection will get recreated.
    ###
    _do_test_query: =>
        dbg = @_dbg('test_query')
        dbg('starting')
        @_query
            query : 'SELECT NOW()'
            cb    : (err, result) =>
                dbg("finished", err, result)

    _init_test_query: =>
        if not @_timeout_ms
            return
        @_test_query = setInterval(@_do_test_query, @_timeout_ms)

    _close_test_query: =>
        if @_test_query?
            clearInterval(@_test_query)
            delete @_test_query

    engine: -> 'postgresql'

    connect: (opts) =>
        opts = defaults opts,
            max_time : undefined   # set to something shorter to not try forever
            # Only first max_time is used.
            cb       : undefined
        if @_state == 'closed'
            opts.cb?("closed")
            return
        dbg = @_dbg("connect")
        if @_clients?
            dbg("already connected")
            opts.cb?()
            return
        if @_connecting?
            dbg('already trying to connect')
            @_connecting.push(opts.cb)
            # keep several times the db-concurrent-warn limit of callbacks
            max_connecting = 5 * @_concurrent_warn
            while @_connecting.length > max_connecting
                @_connecting.shift()
                dbg("WARNING: still no DB available, dropping old callbacks (limit: #{max_connecting})")
            return
        dbg('will try to connect')
        @_state = 'init'
        if opts.max_time
            dbg("for up to #{opts.max_time}ms")
        else
            dbg("until successful")
        @_connecting = [opts.cb]
        misc.retry_until_success
            f           : @_connect
            max_delay   : 10000
            max_time    : opts.max_time
            start_delay : 500 + 500*Math.random()
            log         : dbg
            cb          : (err) =>
                v = @_connecting
                delete @_connecting
                for cb in v
                    cb?(err)
                if not err
                    @_state = 'connected'
                    @emit('connect')
                    recordConnected()

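    ###
    Example (illustrative sketch): bound how long we wait for an initial
    connection instead of retrying forever.

        db.connect
            max_time : 30*1000   # give up after 30s
            cb       : (err) ->
                if err
                    console.warn("DB unavailable -- #{err}")
    ###
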
    disconnect: () =>
        if @_clients?
            for client in @_clients
                client.end()
                client.removeAllListeners()
            delete @_clients

    is_connected: () =>
        return @_clients? and @_clients.length > 0

    _connect: (cb) =>
        dbg = @_dbg("_connect")
        dbg("connect to #{@_host}")
        @_clear_listening_state()   # definitely not listening
        if @_clients?
            @disconnect()
        locals =
            clients : []
            hosts   : []
        @_connect_time = 0
        @_concurrent_queries = 0   # can't be any going on now
        async.series([
            (cb) =>
                if @_ensure_exists
                    dbg("first make sure db exists")
                    @_ensure_database_exists(cb)
                else
                    dbg("assuming database exists")
                    cb()
            (cb) =>
                if not @_host   # undefined if @_host=''
                    locals.hosts = [undefined]
                    cb()
                    return
                if @_host.indexOf('/') != -1
                    dbg("using a local socket file (not a hostname)")
                    locals.hosts = [@_host]
                    cb()
                    return
                f = (host, cb) =>
                    hostname = host.split(':')[0]
                    winston.debug("Looking up ip addresses of #{hostname}")
                    require('dns').lookup hostname, {all:true}, (err, ips) =>
                        if err
                            winston.debug("Got #{hostname} --> err=#{err}")
                            # NON-FATAL -- we just don't include these and hope to
                            # have at least one working host in total...
                            cb()
                        else
                            winston.debug("Got #{hostname} --> #{JSON.stringify(ips)}")
                            # In kubernetes the stateful set service just has
                            # lots of ip addresses.  We connect to *all* of them,
                            # and spread queries across them equally.
                            for x in ips
                                locals.hosts.push(x.address)
                            cb()
                async.map(@_host.split(','), f, (err) => cb(err))
            (cb) =>
                dbg("connecting to #{JSON.stringify(locals.hosts)}...")
                if locals.hosts.length == 0
                    dbg("locals.hosts has length 0 -- no available db")
                    cb("no databases available")
                    return

                dbg("create client and start connecting...")
                locals.clients = []

                # Use a function to initialize the client, to avoid any issues with the scope of "client" below.
                # Ref: https://node-postgres.com/apis/client
                init_client = (host) =>
                    client = new pg.Client
                        user              : @_user
                        host              : host
                        port              : @_port
                        password          : @_password
                        database          : @_database
                        ssl               : @_ssl
                        statement_timeout : DEFAULT_STATEMENT_TIMEOUT_MS   # we set a statement_timeout, to avoid queries locking up PG
                    if @_notification?
                        client.on('notification', @_notification)
                    onError = (err) =>
                        # only listen once for error; after that we've
                        # killed the connection and don't care.
                        client.removeListener('error', onError)
                        if @_state == 'init'
                            # already started connecting
                            return
                        @emit('disconnect')
                        recordDisconnected()
                        dbg("error -- #{err}")
                        @disconnect()
                        @connect()   # start trying to reconnect
                    client.on('error', onError)
                    client.setMaxListeners(0)   # there is one emitter for each concurrent query... (see query_cb)
                    locals.clients.push(client)

                for host in locals.hosts
                    init_client(host)

                # Connect the clients.  If at least one succeeds, we use those that worked.
                # If none succeed, we declare failure.
                # Obviously, this is NOT optimal -- it's just hopefully sufficiently robust/works.
                # I'm going to redo this with experience.
                locals.clients_that_worked = []
                locals.errors = []
                f = (client, c) =>
                    try
                        await client.connect()
                        locals.clients_that_worked.push(client)
                    catch err
                        locals.errors.push(err)
                    c()
                async.map locals.clients, f, () =>
                    if locals.clients_that_worked.length == 0
                        console.warn("ALL clients failed", locals.errors)
                        dbg("ALL clients failed", locals.errors)
                        cb("ALL clients failed to connect")
                    else
                        # take what we got
                        if locals.clients.length == locals.clients_that_worked.length
                            dbg("ALL clients worked")
                        else
                            dbg("ONLY #{locals.clients_that_worked.length} clients worked")
                        locals.clients = locals.clients_that_worked
                        dbg("cb = ", cb)
                        cb()

            (cb) =>
                @_connect_time = new Date()
                locals.i = 0

                # Weird and unfortunate fact -- this query can and does **HANG**, never returning,
                # in some edge cases.  That's why we have to be paranoid about this entire _connect
                # function...
                f = (client, cb) =>
                    it_hung = =>
                        cb?("hung")
                        cb = undefined
                    timeout = setTimeout(it_hung, 15000)
                    dbg("now connected; checking if we can actually query the DB via client #{locals.i}")
                    locals.i += 1
                    client.query "SELECT NOW()", (err) =>
                        clearTimeout(timeout)
                        cb?(err)
                async.map(locals.clients, f, cb)
            (cb) =>
                dbg("checking if ANY db server is in recovery, i.e., we are doing standby queries only")
                @is_standby = false
                f = (client, cb) =>
                    # Is this a read/write or read-only connection?
                    client.query "SELECT pg_is_in_recovery()", (err, resp) =>
                        if err
                            cb(err)
                        else
                            # True if ANY db connection is read only.
                            if resp.rows[0].pg_is_in_recovery
                                @is_standby = true
                            cb()
                async.map(locals.clients, f, cb)
        ], (err) =>
            if err
                mesg = "Failed to connect to database -- #{err}"
                dbg(mesg)
                console.warn(mesg)   # make it clear for interactive users with debugging off -- a common mistake is the env not being set up right.
                # If we're unable to connect (or all clients fail), we are disconnected.  This tells postgres/record-connect-error.ts about this problem.
                # See https://github.com/sagemathinc/cocalc/issues/5997 for some logs related to that.
                @emit('disconnect')
                recordDisconnected()
                cb?(err)
            else
                @_clients = locals.clients
                @_concurrent_queries = 0
                dbg("connected!")
                cb?(undefined, @)
        )

    # Return a native pg client connection.  This will
    # round robin through all connections.  It returns
    # undefined if there are no connections.
    _client: =>
        if not @_clients?
            return
        if @_clients.length <= 1
            return @_clients[0]
        @_client_index ?= -1
        @_client_index = @_client_index + 1
        if @_client_index >= @_clients.length
            @_client_index = 0
        return @_clients[@_client_index]

    # Return the query function of a database connection.
    get_db_query: =>
        db = @_client()
        return db?.query.bind(db)

    _dbg: (f) =>
        if @_debug
            return (m) => winston.debug("PostgreSQL.#{f}: #{misc.trunc_middle(JSON.stringify(m), 250)}")
        else
            return ->

    _init_metrics: =>
        # initialize metrics
        try
            @query_time_histogram = metrics.newHistogram('db', 'query_ms_histogram', 'db queries',
                buckets : [1, 5, 10, 20, 50, 100, 200, 500, 1000, 5000, 10000]
                labels  : ['table']
            )
            @concurrent_counter = metrics.newCounter('db', 'concurrent_total',
                'Concurrent queries (started and finished)',
                ['state']
            )
        catch err
            @_dbg("_init_metrics")("WARNING -- #{err}")

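    ###
    Example (illustrative sketch; the table name is hypothetical): dropping
    down to the native pg driver for a one-off parameterized query.

        q = db.get_db_query()   # undefined if not connected
        if q?
            q 'SELECT COUNT(*) FROM accounts WHERE created > $1', [new Date('2024-01-01')], (err, result) ->
                console.log(err ? result.rows[0].count)
    ###
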
    async_query: (opts) =>
        return await callback2(@_query.bind(@), opts)

    _query: (opts) =>
        opts = defaults opts,
            query       : undefined   # can give select and table instead
            select      : undefined   # if given, should be a string or array of column names  -| can give these
            table       : undefined   # if given, name of table                                 -| two instead of query
            params      : []
            cache       : false       # Will cache results for a few seconds or use the cache.  Use this
            # when speed is very important, and results that are a few seconds
            # out of date are fine.
            where       : undefined   # Used for SELECT: if given, can be
            # - a map whose keys are clauses containing $::TYPE (not $1::TYPE!) and whose values are
            #   the corresponding params.  Also, WHERE must not be in the query already.
            #   If where[cond] is undefined, then cond is completely **ignored**.
            # - a string, which is inserted as-is as a normal WHERE condition.
            # - an array of maps or strings.
            set         : undefined   # Appends a SET clause to the query; same format as values.
            values      : undefined   # Used for INSERT: if given, then params and where must not be given.  Values is a map
            # {'field1::type1':value1, 'field2::type2':value2, ...} which gets converted to
            # ' (field1, field2, ...) VALUES ($1::type1, $2::type2, ...) '
            # with the corresponding params set.  Undefined-valued fields are ignored and types may
            # be omitted.  Javascript null is not ignored and converts to PostgreSQL NULL.
            conflict    : undefined   # If given, then values must also be given; appends this to the query:
            #     ON CONFLICT (name) DO UPDATE SET value=EXCLUDED.value
            # Or, if conflict starts with "ON CONFLICT", then just include it as-is, e.g.,
            #     "ON CONFLICT DO NOTHING"
            jsonb_set   : undefined   # Used for setting a field that contains a JSONB javascript map.
            # NOTE: This does some merging!  If you just want to replace the whole thing use the normal set above.
            # Give as input an object
            #
            #     { field1:{key1:val1, key2:val2, ...}, field2:{key3:val3,...}, ...}
            #
            # In each field, every key has the corresponding value set, unless val is undefined/null, in which
            # case that key is deleted from the JSONB object fieldi.  Simple as that!  This is much, much
            # cleaner to use than SQL.  Also, if the value of field itself is NULL, it gets
            # created automatically.
            jsonb_merge : undefined   # Exactly like jsonb_set, but when val1 (say) is an object, it merges that object in,
            # *instead of* setting field1[key1]=val1.  So afterwards field1[key1] has what was in it
            # and also what is in val1.  Obviously field1[key1] had better have been a map or NULL.
            order_by    : undefined
            limit       : undefined
            offset      : undefined
            safety_check: true
            retry_until_success : undefined   # if given, should be options to misc.retry_until_success
            pg_params   : undefined   # key/value map of postgres parameters, which will be set for the query in a single transaction
            timeout_s   : undefined   # by default, there is a "statement_timeout" set; set to 0 to disable, or to a number in seconds
            cb          : undefined

        # quick check for a write query against a read-only connection
        if @is_standby and (opts.set? or opts.jsonb_set? or opts.jsonb_merge?)
            opts.cb?("set queries against standby not allowed")
            return

        if opts.retry_until_success
            @_query_retry_until_success(opts)
            return

        if not @is_connected()
            dbg = @_dbg("_query")
            dbg("connecting first...")
            # 2022-06: below there was {max_time: 45000} set with the note
            # "don't try forever; queries could pile up."
            # but I think this is rather harmful, since the hub could stop
            # trying to connect to the database altogether.
            # Rather, hub/health-checks::checkDBConnectivity will
            # mark the hub as being bad if it can't connect to the database.
            @connect
                cb : (err) =>
                    if err
                        dbg("FAILED to connect -- #{err}")
                        opts.cb?("database is down (please try later)")
                    else
                        dbg("connected, now doing query")
                        @__do_query(opts)
        else
            @__do_query(opts)

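    ###
    Examples (illustrative sketches; table and column names are hypothetical):
    a SELECT built from table/select/where, and an upsert via values+conflict.

        db._query
            table  : 'accounts'
            select : ['account_id', 'email_address']
            where  : {"email_address = $::TEXT" : 'user@example.com'}
            cb     : (err, result) -> console.log(err ? result.rows)

        db._query
            query    : 'INSERT INTO server_settings'
            values   : {'name::TEXT' : 'site_name', 'value::TEXT' : 'My Site'}
            conflict : 'name'   # appends roughly: ON CONFLICT (name) DO UPDATE SET value=EXCLUDED.value
            cb       : (err) -> console.log(err ? 'saved')
    ###
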
    _query_retry_until_success: (opts) =>
        retry_opts = opts.retry_until_success
        orig_cb = opts.cb
        delete opts.retry_until_success

        # f just calls @_do_query, but with a different cb (same opts)
        args = undefined
        f = (cb) =>
            opts.cb = (args0...) =>
                args = args0
                cb(args[0])
            @_query(opts)

        retry_opts.f = f
        # When misc.retry_until_success finishes, it calls this, which just
        # calls the original cb.
        retry_opts.cb = (err) =>
            if err
                orig_cb?(err)
            else
                orig_cb?(args...)

        # OK, now start attempting.
        misc.retry_until_success(retry_opts)

    __do_query: (opts) =>
        dbg = @_dbg("__do_query('#{misc.trunc(opts.query?.replace(/\n/g, " "),250)}',id='#{misc.uuid().slice(0,6)}')")
        if not @is_connected()
            # TODO: should also check that the client is connected.
            opts.cb?("client not yet initialized")
            return
        if opts.params? and not misc.is_array(opts.params)
            opts.cb?("params must be an array")
            return
        if not opts.query?
            if not opts.table?
                opts.cb?("if query is not given, then table must be given")
                return
            if not opts.select?
                opts.select = '*'
            if misc.is_array(opts.select)
                opts.select = (quote_field(field) for field in opts.select).join(',')
            opts.query = "SELECT #{opts.select} FROM \"#{opts.table}\""
            delete opts.select

        push_param = (param, type) ->
            if type?.toUpperCase() == 'JSONB'
                param = misc.to_json(param)  # I don't understand why this is needed by the driver....
            opts.params.push(param)
            return opts.params.length

        if opts.jsonb_merge?
            if opts.jsonb_set?
                opts.cb?("if jsonb_merge is set then jsonb_set must not be set")
                return
            opts.jsonb_set = opts.jsonb_merge

        SET = []
        if opts.jsonb_set?
            # This little piece of very-hard-to-write (and clever?) code
            # makes it so we can set or **merge in at any nested level** (!)
            # arbitrary JSON objects.  We can also delete any key at any
            # level by making the value null or undefined!  This is amazingly
            # easy to use in queries -- basically making JSONB with postgres
            # as expressive as RethinkDB ReQL (even better in some ways).
            set = (field, data, path) =>
                obj = "COALESCE(#{field}#>'{#{path.join(',')}}', '{}'::JSONB)"
                for key, val of data
                    if not val?
                        # remove key from object
                        obj = "(#{obj} - '#{key}')"
                    else
                        if opts.jsonb_merge? and (typeof(val) == 'object' and not misc.is_date(val))
                            subobj = set(field, val, path.concat([key]))
                            obj = "JSONB_SET(#{obj}, '{#{key}}', #{subobj})"
                        else
                            # completely replace field[key] with val.
                            obj = "JSONB_SET(#{obj}, '{#{key}}', $#{push_param(val, 'JSONB')}::JSONB)"
                return obj
            v = ("#{field}=#{set(field, data, [])}" for field, data of opts.jsonb_set)
            SET.push(v...)

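        # Example (illustrative sketch, names hypothetical): a call such as
        #
        #     db._query
        #         query     : 'UPDATE accounts'
        #         jsonb_set : {other_settings : {first_steps : false}}
        #         where     : {"account_id = $::UUID" : account_id}
        #         cb        : cb
        #
        # produces a SET clause of roughly
        #
        #     other_settings=JSONB_SET(COALESCE(other_settings#>'{}', '{}'::JSONB), '{first_steps}', $1::JSONB)
        #
        # so only the first_steps key is changed and the rest of the JSONB map
        # is preserved.
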
        if opts.values?
            #dbg("values = #{misc.to_json(opts.values)}")
            if opts.where?
                opts.cb?("where must not be defined if opts.values is defined")
                return

            if misc.is_array(opts.values)
                # An array of numerous separate objects that we will insert all at once.
                # Determine the fields, which are the union of the keys of all values.
                fields = {}
                for x in opts.values
                    if not misc.is_object(x)
                        opts.cb?("if values is an array, every entry must be an object")
                        return
                    for k, p of x
                        fields[k] = true
                # convert to array
                fields = misc.keys(fields)
                fields_to_index = {}
                n = 0
                for field in fields
                    fields_to_index[field] = n
                    n += 1
                values = []
                for x in opts.values
                    value = []
                    for field, param of x
                        if field.indexOf('::') != -1
                            [field, type] = field.split('::')
                            type = type.trim()
                            y = "$#{push_param(param, type)}::#{type}"
                        else
                            y = "$#{push_param(param)}"
                        value[fields_to_index[field]] = y
                    values.push(value)
            else
                # A single entry that we'll insert.

                fields = []
                values = []
                for field, param of opts.values
                    if param == undefined
                        # ignore undefined fields -- makes code cleaner (and makes sense)
                        continue
                    if field.indexOf('::') != -1
                        [field, type] = field.split('::')
                        fields.push(quote_field(field.trim()))
                        type = type.trim()
                        values.push("$#{push_param(param, type)}::#{type}")
                        continue
                    else
                        fields.push(quote_field(field))
                        values.push("$#{push_param(param)}")
                values = [values]  # just one

            if values.length > 0
                opts.query += " (#{(quote_field(field) for field in fields).join(',')}) VALUES " + (" (#{value.join(',')}) " for value in values).join(',')

        if opts.set?
            v = []
            for field, param of opts.set
                if field.indexOf('::') != -1
                    [field, type] = field.split('::')
                    type = type.trim()
                    v.push("#{quote_field(field.trim())}=$#{push_param(param, type)}::#{type}")
                    continue
                else
                    v.push("#{quote_field(field.trim())}=$#{push_param(param)}")
            if v.length > 0
                SET.push(v...)

        if opts.conflict?
            if misc.is_string(opts.conflict) and misc.startswith(opts.conflict.toLowerCase().trim(), 'on conflict')
                # Straight string inclusion
                opts.query += ' ' + opts.conflict + ' '
            else
                if not opts.values?
                    opts.cb?("if conflict is specified then values must also be specified")
                    return
                if not misc.is_array(opts.conflict)
                    if typeof(opts.conflict) != 'string'
                        opts.cb?("conflict (='#{misc.to_json(opts.conflict)}') must be a string (the field name), for now")
                        return
                    else
                        conflict = [opts.conflict]
                else
                    conflict = opts.conflict
                v = ("#{quote_field(field)}=EXCLUDED.#{field}" for field in fields when field not in conflict)
                SET.push(v...)
                if SET.length == 0
                    opts.query += " ON CONFLICT (#{conflict.join(',')}) DO NOTHING "
                else
                    opts.query += " ON CONFLICT (#{conflict.join(',')}) DO UPDATE "

        if SET.length > 0
            opts.query += " SET " + SET.join(' , ')

        WHERE = []
        push_where = (x) =>
            if typeof(x) == 'string'
                WHERE.push(x)
            else if misc.is_array(x)
                for v in x
                    push_where(v)
            else if misc.is_object(x)
                for cond, param of x
                    if typeof(cond) != 'string'
                        opts.cb?("each condition must be a string but '#{cond}' isn't")
                        return
                    if not param?
                        # *IGNORE* where conditions whose value is explicitly undefined.
                        # Note that in SQL, NULL is not a value and there is no way to use it via a
                        # placeholder anyway, so this can never work.
                        continue
                    if cond.indexOf('$') == -1
                        # where condition is missing its $ parameter -- default to equality
                        cond += " = $"
                    WHERE.push(cond.replace('$', "$#{push_param(param)}"))

        if opts.where?
            push_where(opts.where)

        if WHERE.length > 0
            if opts.values?
                opts.cb?("values must not be given if a where clause is given")
                return
            opts.query += " WHERE #{WHERE.join(' AND ')}"

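        # Example (illustrative sketch): the where-map
        #
        #     where : {"project_id = $::UUID" : project_id, "deleted = $" : undefined}
        #
        # expands to "WHERE project_id = $1::UUID": the second condition is dropped
        # entirely because its value is undefined, and each "$" is replaced by the
        # next positional parameter.
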
        if opts.order_by?
            if opts.order_by.indexOf("'") >= 0
                err = "ERROR -- detected ' apostrophe in order_by='#{opts.order_by}'"
                dbg(err)
                opts.cb?(err)
                return
            opts.query += " ORDER BY #{opts.order_by}"

        if opts.limit?
            if not validator.isInt('' + opts.limit, min:0)
                err = "ERROR -- opts.limit = '#{opts.limit}' is not an integer"
                dbg(err)
                opts.cb?(err)
                return
            opts.query += " LIMIT #{opts.limit} "

        if opts.offset?
            if not validator.isInt('' + opts.offset, min:0)
                err = "ERROR -- opts.offset = '#{opts.offset}' is not an integer"
                dbg(err)
                opts.cb?(err)
                return
            opts.query += " OFFSET #{opts.offset} "


        if opts.safety_check
            safety_check = opts.query.toLowerCase().trim()
            if (safety_check.startsWith('update') or safety_check.startsWith('delete')) and (safety_check.indexOf('where') == -1 and safety_check.indexOf('trigger') == -1 and safety_check.indexOf('insert') == -1 and safety_check.indexOf('create') == -1)
                # This is always a bug.
                err = "ERROR -- Dangerous UPDATE or DELETE without a WHERE, TRIGGER, or INSERT: query='#{opts.query}'"
                dbg(err)
                opts.cb?(err)
                return

        if opts.cache and @_query_cache?
            # check for a cached result
            full_query_string = JSON.stringify([opts.query, opts.params])
            if (x = @_query_cache.get(full_query_string))?
                dbg("using cache for '#{opts.query}'")
                opts.cb?(x...)
                return

        # params can easily be huge, e.g., a blob.  But this may be
        # needed at some point for debugging.
        #dbg("query='#{opts.query}', params=#{misc.to_json(opts.params)}")
        client = @_client()
        if not client?
            opts.cb?("not connected")
            return
        @_concurrent_queries ?= 0
        @_concurrent_queries += 1
        dbg("query='#{opts.query}' (concurrent=#{@_concurrent_queries})")

        @concurrent_counter?.labels('started').inc(1)
        try
            start = new Date()
            if @_timeout_ms and @_timeout_delay_ms
                # Create a timer, so that if the query doesn't return within
                # timeout_ms time, then the entire connection is destroyed.
                # It then gets recreated automatically.  I tested this,
                # and all outstanding queries also get an error when this happens.
                timeout_error = =>
                    # Only disconnect with a timeout error if it has been sufficiently long
                    # since connecting.  This way when an error is triggered, all the
                    # outstanding timers at the moment of the error will just get ignored
                    # when they fire (since @_connect_time is 0 or too recent).
                    if @_connect_time and new Date() - @_connect_time > @_timeout_delay_ms
                        client.emit('error', 'timeout')
                timer = setTimeout(timeout_error, @_timeout_ms)

            # PAINFUL FACT: In client.query below, if the client is closed/killed/errored
            # (especially via client.emit above), then none of the callbacks from
            # client.query are called!
            finished = false
            error_listener = ->
                dbg("error_listener fired")
                query_cb('error')
            client.once('error', error_listener)
            query_cb = (err, result) =>
                if finished  # ensure that, no matter what, query_cb is called at most once.
                    dbg("called when finished (ignoring)")
                    return
                finished = true
                client.removeListener('error', error_listener)

                if @_timeout_ms
                    clearTimeout(timer)
                query_time_ms = new Date() - start
                @_concurrent_queries -= 1
                @query_time_histogram?.observe({table:opts.table ? ''}, query_time_ms)
                @concurrent_counter?.labels('ended').inc(1)
                if err
                    dbg("done (concurrent=#{@_concurrent_queries}) (query_time_ms=#{query_time_ms}) -- error: #{err}")
                    ## DANGER
                    # Only uncomment this for low-level debugging!
                    #### dbg("params = #{JSON.stringify(opts.params)}")
                    ##
                    err = 'postgresql ' + err
                else
                    dbg("done (concurrent=#{@_concurrent_queries}) (query_time_ms=#{query_time_ms}) -- success")
                if opts.cache and @_query_cache?
                    @_query_cache.set(full_query_string, [err, result])
                opts.cb?(err, result)
                if query_time_ms >= QUERY_ALERT_THRESH_MS
                    dbg("QUERY_ALERT_THRESH: query_time_ms=#{query_time_ms}\nQUERY_ALERT_THRESH: query='#{opts.query}'\nQUERY_ALERT_THRESH: params='#{misc.to_json(opts.params)}'")

            # set a timeout for one specific query (there is a default set when creating the pg.Client; see @_connect)
            if opts.timeout_s? and typeof opts.timeout_s == 'number' and opts.timeout_s >= 0
                dbg("set query timeout to #{opts.timeout_s}secs")
                opts.pg_params ?= {}
                # the actual param is in milliseconds
                # https://postgresqlco.nf/en/doc/param/statement_timeout/
                opts.pg_params.statement_timeout = 1000 * opts.timeout_s

            if opts.pg_params?
                dbg("run query with specific postgres parameters in a transaction")
                do_query_with_pg_params(client: client, query: opts.query, params: opts.params, pg_params:opts.pg_params, cb: query_cb)
            else
                client.query(opts.query, opts.params, query_cb)

        catch e
            # this should never ever happen
            dbg("EXCEPTION in client.query: #{e}")
            opts.cb?(e)
            @_concurrent_queries -= 1
            @concurrent_counter?.labels('ended').inc(1)
        return

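    ###
    Example (illustrative sketch): overriding the statement timeout for one
    expensive query, instead of relying on the default set in @_connect.

        db._query
            query     : 'SELECT COUNT(*) FROM projects'
            timeout_s : 120   # becomes pg_params.statement_timeout = 120000 for this query only
            cb        : count_result(cb)
    ###
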
    # Special case of query for counting entries in a table.
    _count: (opts) =>
        opts = defaults opts,
            table : required
            where : undefined   # as in _query
            cb    : required
        @_query
            query : "SELECT COUNT(*) FROM #{opts.table}"
            where : opts.where
            cb    : count_result(opts.cb)

    _validate_opts: (opts) =>
        for k, v of opts
            if k.slice(k.length-2) == 'id'
                if v? and not misc.is_valid_uuid_string(v)
                    opts.cb?("invalid #{k} -- #{v}")
                    return false
            if k.slice(k.length-3) == 'ids'
                for w in v
                    if not misc.is_valid_uuid_string(w)
                        opts.cb?("invalid uuid #{w} in #{k} -- #{misc.to_json(v)}")
                        return false
            if k == 'group' and v not in misc.PROJECT_GROUPS
                opts.cb?("unknown project group '#{v}'")
                return false
            if k == 'groups'
                for w in v
                    if w not in misc.PROJECT_GROUPS
                        opts.cb?("unknown project group '#{w}' in groups")
                        return false
        return true

    _ensure_database_exists: (cb) =>
        dbg = @_dbg("_ensure_database_exists")
        dbg("ensure database '#{@_database}' exists")
        args = ['--user', @_user, '--host', @_host.split(',')[0], '--port', @_port, '--list', '--tuples-only']
        sslEnv = sslConfigToPsqlEnv(@_ssl)
        dbg("psql #{args.join(' ')}")
        misc_node.execute_code
            command : 'psql'
            args    : args
            env     : Object.assign sslEnv,
                PGPASSWORD : @_password
            cb      : (err, output) =>
                if err
                    cb(err)
                    return
                databases = (x.split('|')[0].trim() for x in output.stdout.split('\n') when x)
                if @_database in databases
                    dbg("database '#{@_database}' already exists")
                    cb()
                    return
                dbg("creating database '#{@_database}'")
                misc_node.execute_code
                    command : 'createdb'
                    args    : ['--host', @_host, '--port', @_port, @_database]
                    env     :
                        PGPASSWORD : @_password
                    cb      : cb

    _confirm_delete: (opts) =>
        opts = defaults opts,
            confirm : 'no'
            cb      : required
        dbg = @_dbg("confirm")
        if opts.confirm != 'yes'
            err = "Really delete all data? -- you must explicitly pass in confirm='yes' (but confirm:'#{opts.confirm}')"
            dbg(err)
            opts.cb(err)
            return false
        else
            return true

    set_random_password: (opts) =>
        throw Error("NotImplementedError")

    # This will fail if any other clients have the db open.
    # This function is very important for automated testing.
    delete_entire_database: (opts) =>
        dbg = @_dbg("delete_entire_database")
        dbg("deleting database '#{@_database}'")
        if not @_confirm_delete(opts)
            dbg("failed confirmation")
            return
        async.series([
            (cb) =>
                dbg("disconnect from db")
                @disconnect()
                cb()
            (cb) =>
                misc_node.execute_code
                    command : 'dropdb'
                    args    : ['--host', @_host, '--port', @_port, @_database]
                    cb      : cb
        ], opts.cb)

    # Deletes all the contents of the tables in the database.  It doesn't
    # delete anything about the schema itself: indexes or tables.
    delete_all: (opts) =>
        dbg = @_dbg("delete_all")
        dbg("deleting all contents of tables in '#{@_database}'")
        if not @_confirm_delete(opts)
            return

        # If the cache is enabled, be sure to also clear it.
        @clear_cache()

        tables = undefined

        # Delete anything cached in the db object.  Obviously, not putting something here
        # is a natural place in which to cause bugs... but they will probably all be bugs
        # of the form "the test suite fails", so we'll find them.
        delete @_stats_cached

        # Actually delete tables
        async.series([
            (cb) =>
                @_get_tables (err, t) =>
                    tables = t
                    cb(err)
            (cb) =>
                f = (table, cb) =>
                    @_query
                        query        : "DELETE FROM #{table}"
                        safety_check : false
                        cb           : cb
                async.map(tables, f, cb)
        ], opts.cb)

    # return a list of tables in the database
    _get_tables: (cb) =>
        @_query
            query : "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"
            cb    : (err, result) =>
                if err
                    cb(err)
                else
                    cb(undefined, (row.table_name for row in result.rows))

    # Return a list of columns in a given table
    _get_columns: (table, cb) =>
        @_query
            query : "SELECT column_name FROM information_schema.columns"
            where :
                "table_name = $::text" : table
            cb    : (err, result) =>
                if err
                    cb(err)
                else
                    cb(undefined, (row.column_name for row in result.rows))

    _primary_keys: (table) =>
        return primaryKeys(table)

    # Return *the* primary key, assuming it is unique; otherwise raise an exception.
    _primary_key: (table) =>
        return primaryKey(table)

    _throttle: (name, time_s, key...) =>
        key = misc.to_json(key)
        x = "_throttle_#{name}"
        @[x] ?= {}
        if @[x][key]
            return true
        @[x][key] = true
        setTimeout((()=>delete @[x]?[key]), time_s*1000)
        return false

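    ###
    Example (illustrative sketch): rate-limit an expensive operation to at most
    once per 60 seconds per project_id.

        if @_throttle('touch_project', 60, project_id)
            return   # ran too recently for this project_id; do nothing
        # ... do the expensive work ...
    ###
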
    # Ensure that the actual schema in the database matches the one defined in SCHEMA.
    # This creates the initial schema, adds new columns, and in a VERY LIMITED
    # range of cases, *might* be able to change the data type of a column.
    update_schema: (opts) =>
        try
            await syncSchema(SCHEMA)
            opts.cb?()
        catch err
            opts.cb?(err)

    # Return the number of outstanding concurrent queries.
    concurrent: =>
        return @_concurrent_queries ? 0

    is_heavily_loaded: =>
        return @_concurrent_queries >= @_concurrent_heavily_loaded

    # Compute the sha1 hash (in hex) of the input arguments, which are
    # converted to strings (via json) if they are not strings, then concatenated.
    # This is used for computing compound primary keys in a way that is relatively
    # safe, and in situations where, if there were a highly unlikely collision, it
    # wouldn't be the end of the world.  There is a similar client-only slower version
    # of this function (in schema.coffee), so don't change it willy-nilly.
    sha1: (args...) ->
        v = ((if typeof(x) == 'string' then x else JSON.stringify(x)) for x in args).join('')
        return misc_node.sha1(v)

    # Go through every table in the schema with a column called "expire", and
    # delete every entry where expire is <= right now.
    # Note: this ignores rows where expire is NULL, since comparisons with NULL yield NULL.
    delete_expired: (opts) =>
        opts = defaults opts,
            count_only : false      # if true, only count the number of rows that would be deleted
            table      : undefined  # only delete from this table
            cb         : required
        dbg = @_dbg("delete_expired(...)")
        dbg()
        f = (table, cb) =>
            dbg("table='#{table}'")
            if opts.count_only
                @_query
                    query : "SELECT COUNT(*) FROM #{table} WHERE expire <= NOW()"
                    cb    : (err, result) =>
                        if not err
                            dbg("COUNT for table #{table} is #{result.rows[0].count}")
                        cb(err)
            else
                dbg("deleting expired entries from '#{table}'")
                @_query
                    query : "DELETE FROM #{table} WHERE expire <= NOW()"
                    cb    : (err) =>
                        dbg("finished deleting expired entries from '#{table}' -- #{err}")
                        cb(err)
        if opts.table
            tables = [opts.table]
        else
            tables = (k for k, v of SCHEMA when v.fields?.expire?.type == 'timestamp' and not v.virtual)
        async.map(tables, f, opts.cb)

    # count the number of entries in a table
    count: (opts) =>
        opts = defaults opts,
            table : required
            cb    : required
        @_query
            query : "SELECT COUNT(*) FROM #{opts.table}"
            cb    : count_result(opts.cb)

    # sanitize strings before inserting them into a query string
    sanitize: (s) =>
        escapeString(s)

###
Other misc functions
###

exports.pg_type = pg_type = (info) ->
    return pgType(info)

exports.quote_field = quote_field = (field) ->
    return quoteField(field)

# Timestamp the given number of seconds **in the future**.
exports.expire_time = expire_time = (ttl) ->
    if ttl then new Date((new Date() - 0) + ttl*1000)

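# Example (illustrative sketch): expire_time(3600) is a Date one hour from now,
# suitable for storing in an "expire" column; expire_time(0) returns undefined,
# i.e., no expiration.
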
# Returns a function that takes as input the output of doing a SQL query.
# If there are no results, returns undefined.
# If there is exactly one result, what is returned depends on pattern:
#     'a_field' --> returns the value of this field in the result
# If there is more than one result, this is an error.
exports.one_result = one_result = (pattern, cb) ->
    if not cb? and typeof(pattern) == 'function'
        cb = pattern
        pattern = undefined
    if not cb?
        return ->  # do nothing -- return a function that ignores the result
    return (err, result) ->
        if err
            cb(err)
            return
        if not result?.rows?
            cb()
            return
        switch result.rows.length
            when 0
                cb()
            when 1
                obj = misc.map_without_undefined_and_null(result.rows[0])
                if not pattern?
                    cb(undefined, obj)
                    return
                switch typeof(pattern)
                    when 'string'
                        x = obj[pattern]
                        if not x?  # null or undefined -- SQL returns null, but we want undefined
                            cb()
                        else
                            if obj.expire? and new Date() >= obj.expire
                                cb()
                            else
                                cb(undefined, x)
                    when 'object'
                        x = {}
                        for p in pattern
                            if obj[p]?
                                x[p] = obj[p]
                        cb(undefined, x)
                    else
                        cb("BUG: unknown pattern -- #{pattern}")
            else
                cb("more than one result")

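###
Example (illustrative sketch; table and column names are hypothetical):
extract a single field from a single-row query.

    db._query
        table  : 'accounts'
        select : ['email_address']
        where  : {"account_id = $::UUID" : account_id}
        cb     : one_result 'email_address', (err, email) ->
            console.log(err ? email)   # email is undefined if there is no match
###
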
exports.all_results = all_results = (pattern, cb) ->
    if not cb? and typeof(pattern) == 'function'
        cb = pattern
        pattern = undefined
    if not cb?
        return ->  # do nothing -- return a function that ignores the result
    return (err, result) ->
        if err
            cb(err)
        else
            rows = result.rows
            if not pattern?
                # TODO: we use stupid (?) misc.copy to unwrap from the pg driver type -- investigate better!
                # Maybe this is fine.  I don't know.
                cb(undefined, (misc.copy(x) for x in rows))
            else if typeof(pattern) == 'string'
                cb(undefined, ((x[pattern] ? undefined) for x in rows))
            else
                cb("unsupported pattern type '#{typeof(pattern)}'")

exports.count_result = count_result = (cb) ->
    if not cb?
        return ->  # do nothing -- return a function that ignores the result
    return (err, result) ->
        if err
            cb(err)
        else
            cb(undefined, parseInt(result?.rows?[0]?.count))