Path: blob/master/src/packages/database/postgres-base.coffee
#########################################################################
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
# License: MS-RSL – see LICENSE.md for details
#########################################################################

# PostgreSQL -- basic queries and database interface

exports.DEBUG = true

# If database connection is non-responsive but no error raised directly
# by db client, then we will know and fix, rather than just sitting there...
DEFAULT_TIMEOUT_MS = 60000

# Do not test for non-responsiveness until a while after initial connection
# established, since things tend to work initially, *but* may also be much
# slower, due to tons of clients simultaneously connecting to DB.
DEFAULT_TIMEOUT_DELAY_MS = DEFAULT_TIMEOUT_MS * 4

QUERY_ALERT_THRESH_MS = 5000

consts = require('./consts')
DEFAULT_STATEMENT_TIMEOUT_MS = consts.STATEMENT_TIMEOUT_MS

EventEmitter = require('events')

fs = require('fs')
async = require('async')
escapeString = require('sql-string-escape')
validator = require('validator')
{callback2} = require('@cocalc/util/async-utils')

LRU = require('lru-cache')

pg = require('pg')

winston = require('@cocalc/backend/logger').getLogger('postgres')
{do_query_with_pg_params} = require('./postgres/set-pg-params')

{ syncSchema } = require('./postgres/schema')
{ pgType } = require('./postgres/schema/pg-type')
{ quoteField } = require('./postgres/schema/util')
{ primaryKey, primaryKeys } = require('./postgres/schema/table')

misc_node = require('@cocalc/backend/misc_node')
{ sslConfigToPsqlEnv, pghost, pgdatabase, pguser, pgssl } = require("@cocalc/backend/data")

{ recordConnected, recordDisconnected } = require("./postgres/record-connect-error")

{defaults} = misc = require('@cocalc/util/misc')
required = defaults.required

{SCHEMA, client_db} = require('@cocalc/util/schema')

metrics = require('@cocalc/backend/metrics')

exports.PUBLIC_PROJECT_COLUMNS = ['project_id', 'last_edited', 'title', 'description', 'deleted', 'created', 'env']
exports.PROJECT_COLUMNS = ['users'].concat(exports.PUBLIC_PROJECT_COLUMNS)

dbPassword = require('@cocalc/database/pool/password').default
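
# Illustrative sketch (not part of this file): typical instantiation, assuming
# the standard PG* environment variables point at a running server:
#
#     {PostgreSQL} = require('./postgres-base')
#     db = new PostgreSQL(debug: true)   # starts connecting immediately (connect: true)
#     db.once 'connect', ->
#         db._query
#             query : 'SELECT NOW()'
#             cb    : (err, result) -> console.log(err, result?.rows)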

class exports.PostgreSQL extends EventEmitter    # emits a 'connect' event whenever we successfully connect to the database and 'disconnect' when connection to postgres fails
    constructor: (opts) ->

        super()
        opts = defaults opts,
            host            : pghost     # DEPRECATED: or 'hostname:port' or 'host1,host2,...' (multiple hosts) -- TODO -- :port only works for one host.
            database        : pgdatabase
            user            : pguser
            ssl             : pgssl
            debug           : exports.DEBUG
            connect         : true
            password        : undefined
            cache_expiry    : 5000   # expire cached queries after this many milliseconds
                                     # keep this very short; it's just meant to reduce impact of a bunch of
                                     # identical permission checks in a single user query.
            cache_size      : 300    # cache this many queries; use @_query(cache:true, ...) to cache result
            concurrent_warn : 500
            concurrent_heavily_loaded : 70  # when concurrent hits this, consider load "heavy"; this changes how some queries behave to be faster but provide less info
            ensure_exists   : true   # ensure database exists on startup (runs psql in a shell)
            timeout_ms      : DEFAULT_TIMEOUT_MS  # **IMPORTANT: if *any* query takes this long, entire connection is terminated and recreated!**
            timeout_delay_ms : DEFAULT_TIMEOUT_DELAY_MS  # Only reconnect on timeout this many ms after connect.  Motivation: on initial startup queries may take much longer due to competition with other clients.
        @setMaxListeners(0)  # because of a potentially large number of changefeeds
        @_state = 'init'
        @_debug = opts.debug
        @_timeout_ms = opts.timeout_ms
        @_timeout_delay_ms = opts.timeout_delay_ms
        @_ensure_exists = opts.ensure_exists
        @_init_test_query()
        dbg = @_dbg("constructor")  # must be after setting @_debug above
        dbg(opts)
        i = opts.host.indexOf(':')
        if i != -1
            @_host = opts.host.slice(0, i)
            @_port = parseInt(opts.host.slice(i+1))
        else
            @_host = opts.host
            @_port = 5432
        @_concurrent_warn = opts.concurrent_warn
        @_concurrent_heavily_loaded = opts.concurrent_heavily_loaded
        @_user = opts.user
        @_database = opts.database
        @_ssl = opts.ssl
        @_password = opts.password ? dbPassword()
        @_init_metrics()

        if opts.cache_expiry and opts.cache_size
            @_query_cache = new LRU({max:opts.cache_size, ttl: opts.cache_expiry})
        if opts.connect
            @connect()  # start trying to connect

    clear_cache: =>
        @_query_cache?.reset()

    close: =>
        if @_state == 'closed'
            return  # nothing to do
        @_close_test_query()
        @_state = 'closed'
        @emit('close')
        @removeAllListeners()
        if @_clients?
            for client in @_clients
                client.removeAllListeners()
                client.end()
            delete @_clients

    ###
    If @_timeout_ms is set, then we periodically do a simple test query,
    to ensure that the database connection is working and responding to queries.
    If the query below times out, then the connection will get recreated.
    ###
    _do_test_query: =>
        dbg = @_dbg('test_query')
        dbg('starting')
        @_query
            query : 'SELECT NOW()'
            cb    : (err, result) =>
                dbg("finished", err, result)

    _init_test_query: =>
        if not @_timeout_ms
            return
        @_test_query = setInterval(@_do_test_query, @_timeout_ms)

    _close_test_query: =>
        if @_test_query?
            clearInterval(@_test_query)
            delete @_test_query

    engine: -> 'postgresql'

    connect: (opts) =>
        opts = defaults opts,
            max_time : undefined   # set to something shorter to not try forever
                                   # Only first max_time is used.
            cb       : undefined
        if @_state == 'closed'
            opts.cb?("closed")
            return
        dbg = @_dbg("connect")
        if @_clients?
            dbg("already connected")
            opts.cb?()
            return
        if @_connecting?
            dbg('already trying to connect')
            @_connecting.push(opts.cb)
            # keep several times the db-concurrent-warn limit of callbacks
            max_connecting = 5 * @_concurrent_warn
            while @_connecting.length > max_connecting
                @_connecting.shift()
                dbg("WARNING: still no DB available, dropping old callbacks (limit: #{max_connecting})")
            return
        dbg('will try to connect')
        @_state = 'init'
        if opts.max_time
            dbg("for up to #{opts.max_time}ms")
        else
            dbg("until successful")
        @_connecting = [opts.cb]
        misc.retry_until_success
            f           : @_connect
            max_delay   : 10000
            max_time    : opts.max_time
            start_delay : 500 + 500*Math.random()
            log         : dbg
            cb          : (err) =>
                v = @_connecting
                delete @_connecting
                for cb in v
                    cb?(err)
                if not err
                    @_state = 'connected'
                    @emit('connect')
                    recordConnected()

    disconnect: () =>
        if @_clients?
            for client in @_clients
                client.end()
                client.removeAllListeners()
            delete @_clients

    is_connected: () =>
        return @_clients? and @_clients.length > 0

    _connect: (cb) =>
        dbg = @_dbg("_connect")
        dbg("connect to #{@_host}")
        @_clear_listening_state()   # definitely not listening
        if @_clients?
            @disconnect()
        locals =
            clients : []
            hosts   : []
        @_connect_time = 0
        @_concurrent_queries = 0  # can't be any going on now.
        async.series([
            (cb) =>
                if @_ensure_exists
                    dbg("first make sure db exists")
                    @_ensure_database_exists(cb)
                else
                    dbg("assuming database exists")
                    cb()
            (cb) =>
                if not @_host   # undefined if @_host=''
                    locals.hosts = [undefined]
                    cb()
                    return
                if @_host.indexOf('/') != -1
                    dbg("using a local socket file (not a hostname)")
                    locals.hosts = [@_host]
                    cb()
                    return
                f = (host, cb) =>
                    hostname = host.split(':')[0]
                    winston.debug("Looking up ip addresses of #{hostname}")
                    require('dns').lookup hostname, {all:true}, (err, ips) =>
                        if err
                            winston.debug("Got #{hostname} --> err=#{err}")
                            # NON-FATAL -- we just don't include these and hope to
                            # have at least one total working host...
                            cb()
                        else
                            winston.debug("Got #{hostname} --> #{JSON.stringify(ips)}")
                            # In kubernetes the stateful set service just has
                            # lots of ip addresses.  We connect to *all* of them,
                            # and spread queries across them equally.
                            for x in ips
                                locals.hosts.push(x.address)
                            cb()
                async.map(@_host.split(','), f, (err) => cb(err))
            (cb) =>
                dbg("connecting to #{JSON.stringify(locals.hosts)}...")
                if locals.hosts.length == 0
                    dbg("locals.hosts has length 0 -- no available db")
                    cb("no databases available")
                    return

                dbg("create client and start connecting...")
                locals.clients = []

                # Use a function to initialize the client, to avoid any issues with scope of "client" below.
                # Ref: https://node-postgres.com/apis/client
                init_client = (host) =>
                    client = new pg.Client
                        user     : @_user
                        host     : host
                        port     : @_port
                        password : @_password
                        database : @_database
                        ssl      : @_ssl
                        statement_timeout: DEFAULT_STATEMENT_TIMEOUT_MS  # we set a statement_timeout, to avoid queries locking up PG
                    if @_notification?
                        client.on('notification', @_notification)
                    onError = (err) =>
                        # only listen once for error; after that we've
                        # killed connection and don't care.
                        client.removeListener('error', onError)
                        if @_state == 'init'
                            # already started connecting
                            return
                        @emit('disconnect')
                        recordDisconnected()
                        dbg("error -- #{err}")
                        @disconnect()
                        @connect()  # start trying to reconnect
                    client.on('error', onError)
                    client.setMaxListeners(0)  # there is one emitter for each concurrent query... (see query_cb)
                    locals.clients.push(client)

                for host in locals.hosts
                    init_client(host)

                # Connect the clients.  If at least one succeeds, we use this.
                # If none succeed, we declare failure.
                # Obviously, this is NOT optimal -- it's just hopefully sufficiently robust/works.
                # I'm going to redo this with experience.
                locals.clients_that_worked = []
                locals.errors = []
                f = (client, c) =>
                    try
                        await client.connect()
                        locals.clients_that_worked.push(client)
                    catch err
                        locals.errors.push(err)
                    c()
                async.map locals.clients, f, () =>
                    if locals.clients_that_worked.length == 0
                        console.warn("ALL clients failed", locals.errors)
                        dbg("ALL clients failed", locals.errors)
                        cb("ALL clients failed to connect")
                    else
                        # take what we got
                        if locals.clients.length == locals.clients_that_worked.length
                            dbg("ALL clients worked")
                        else
                            dbg("ONLY #{locals.clients_that_worked.length} clients worked")
                        locals.clients = locals.clients_that_worked
                        dbg("cb = ", cb)
                        cb()

            (cb) =>
                @_connect_time = new Date()
                locals.i = 0

                # Weird and unfortunate fact -- this query can and does **HANG** never returning
                # in some edge cases.  That's why we have to be paranoid about this entire _connect
                # function...
                f = (client, cb) =>
                    it_hung = =>
                        cb?("hung")
                        cb = undefined
                    timeout = setTimeout(it_hung, 15000)
                    dbg("now connected; checking if we can actually query the DB via client #{locals.i}")
                    locals.i += 1
                    client.query "SELECT NOW()", (err) =>
                        clearTimeout(timeout)
                        cb?(err)
                async.map(locals.clients, f, cb)
            (cb) =>
                dbg("checking if ANY db server is in recovery, i.e., we are doing standby queries only")
                @is_standby = false
                f = (client, cb) =>
                    # Is this a read/write or read-only connection?
                    client.query "SELECT pg_is_in_recovery()", (err, resp) =>
                        if err
                            cb(err)
                        else
                            # True if ANY db connection is read only.
                            if resp.rows[0].pg_is_in_recovery
                                @is_standby = true
                            cb()
                async.map(locals.clients, f, cb)
        ], (err) =>
            if err
                mesg = "Failed to connect to database -- #{err}"
                dbg(mesg)
                console.warn(mesg)  # make it clear for interactive users with debugging off -- common mistake with env not setup right.
                # If we're unable to connect (or all clients fail), we are disconnected.  This tells postgres/record-connect-error.ts about this problem.
                # See https://github.com/sagemathinc/cocalc/issues/5997 for some logs related to that.
                @emit('disconnect')
                recordDisconnected()
                cb?(err)
            else
                @_clients = locals.clients
                @_concurrent_queries = 0
                dbg("connected!")
                cb?(undefined, @)
        )

    # Return a native pg client connection.  This will
    # round robin through all connections.  It returns
    # undefined if there are no connections.
    _client: =>
        if not @_clients?
            return
        if @_clients.length <= 1
            return @_clients[0]
        @_client_index ?= -1
        @_client_index = @_client_index + 1
        if @_client_index >= @_clients.length
            @_client_index = 0
        return @_clients[@_client_index]

    # Return query function of a database connection.
    get_db_query: =>
        db = @_client()
        return db?.query.bind(db)

    _dbg: (f) =>
        if @_debug
            return (m) => winston.debug("PostgreSQL.#{f}: #{misc.trunc_middle(JSON.stringify(m), 250)}")
        else
            return ->

    _init_metrics: =>
        # initialize metrics
        try
            @query_time_histogram = metrics.newHistogram('db', 'query_ms_histogram', 'db queries',
                buckets : [1, 5, 10, 20, 50, 100, 200, 500, 1000, 5000, 10000]
                labels  : ['table']
            )
            @concurrent_counter = metrics.newCounter('db', 'concurrent_total',
                'Concurrent queries (started and finished)',
                ['state']
            )
        catch err
            @_dbg("_init_metrics")("WARNING -- #{err}")

    async_query: (opts) =>
        return await callback2(@_query.bind(@), opts)
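
    # Illustrative sketch (not from this file; table and column names are
    # hypothetical) of how the options of _query below combine:
    #
    #     # upsert one row, overwriting the count on conflict:
    #     @_query
    #         query    : 'INSERT INTO counts'
    #         values   : {'id::UUID': id, 'n::INTEGER': 10}
    #         conflict : 'id'
    #         cb       : (err) -> ...
    #
    #     # select with a parameterized where clause (note $::TYPE, not $1::TYPE):
    #     @_query
    #         table  : 'accounts'
    #         select : ['account_id', 'email_address']
    #         where  : {'email_address = $::TEXT': email}
    #         limit  : 1
    #         cb     : one_result((err, row) -> ...)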

    _query: (opts) =>
        opts = defaults opts,
            query       : undefined    # can give select and table instead
            select      : undefined    # if given, should be string or array of column names  -| can give these
            table       : undefined    # if given, name of table                              -| two instead of query
            params      : []
            cache       : false        # Will cache results for a few seconds or use cache.  Use this
                                       # when speed is very important, and results that are a few seconds
                                       # out of date are fine.
            where       : undefined    # Used for SELECT: If given, can be
                                       #   - a map whose keys are clauses containing $::TYPE (not $1::TYPE!) and whose
                                       #     values are the corresponding params.  Also, WHERE must not be in the query already.
                                       #     If where[cond] is undefined, then cond is completely **ignored**.
                                       #   - a string, which is inserted as is as a normal WHERE condition.
                                       #   - an array of maps or strings.
            set         : undefined    # Appends a SET clause to the query; same format as values.
            values      : undefined    # Used for INSERT: If given, then params and where must not be given.  Values is a map
                                       # {'field1::type1':value1, 'field2::type2':value2, ...} which gets converted to
                                       # ' (field1, field2, ...) VALUES ($1::type1, $2::type2, ...) '
                                       # with corresponding params set.  Undefined valued fields are ignored and types may
                                       # be omitted.  Javascript null is not ignored and converts to PostgreSQL NULL.
            conflict    : undefined    # If given, then values must also be given; appends this to query:
                                       #     ON CONFLICT (name) DO UPDATE SET value=EXCLUDED.value
                                       # Or, if conflict starts with "ON CONFLICT", then just include as is, e.g.,
                                       # "ON CONFLICT DO NOTHING"
            jsonb_set   : undefined    # Used for setting a field that contains a JSONB javascript map.
                                       # NOTE: This does some merging!  If you just want to replace the whole thing use the normal set above.
                                       # Give as input an object
                                       #
                                       #     { field1:{key1:val1, key2:val2, ...}, field2:{key3:val3,...}, ...}
                                       #
                                       # In each field, every key has the corresponding value set, unless val is undefined/null, in which
                                       # case that key is deleted from the JSONB object fieldi.  Simple as that!  This is much, much
                                       # cleaner to use than SQL.  Also, if the value in field itself is NULL, it gets
                                       # created automatically.
            jsonb_merge : undefined    # Exactly like jsonb_set, but when val1 (say) is an object, it merges that object in,
                                       # *instead of* setting field1[key1]=val1.  So after this field1[key1] has what was in it
                                       # and also what is in val1.  Obviously field1[key1] had better have been an array or NULL.
            order_by    : undefined
            limit       : undefined
            offset      : undefined
            safety_check: true
            retry_until_success : undefined  # if given, should be options to misc.retry_until_success
            pg_params   : undefined  # key/value map of postgres parameters, which will be set for the query in a single transaction
            timeout_s   : undefined  # by default, there is a "statement_timeout" set (see @_connect); set to 0 to disable, or give a number in seconds
            cb          : undefined

        # quick check for write query against read-only connection
        if @is_standby and (opts.set? or opts.jsonb_set? or opts.jsonb_merge?)
            opts.cb?("set queries against standby not allowed")
            return

        if opts.retry_until_success
            @_query_retry_until_success(opts)
            return

        if not @is_connected()
            dbg = @_dbg("_query")
            dbg("connecting first...")
            # 2022-06: below there was {max_time: 45000} set with the note
            # "don't try forever; queries could pile up."
            # but I think this is rather harmful, since the hub could stop
            # trying to connect to the database altogether.
            # Rather, hub/health-checks::checkDBConnectivity will
            # mark the hub as being bad if it can't connect to the database.
            @connect
                cb : (err) =>
                    if err
                        dbg("FAILED to connect -- #{err}")
                        opts.cb?("database is down (please try later)")
                    else
                        dbg("connected, now doing query")
                        @__do_query(opts)
        else
            @__do_query(opts)

    _query_retry_until_success: (opts) =>
        retry_opts = opts.retry_until_success
        orig_cb = opts.cb
        delete opts.retry_until_success

        # f just calls @_do_query, but with a different cb (same opts)
        args = undefined
        f = (cb) =>
            opts.cb = (args0...) =>
                args = args0
                cb(args[0])
            @_query(opts)

        retry_opts.f = f
        # When misc.retry_until_success finishes, it calls this, which just
        # calls the original cb.
        retry_opts.cb = (err) =>
            if err
                orig_cb?(err)
            else
                orig_cb?(args...)

        # OK, now start it attempting.
        misc.retry_until_success(retry_opts)

    __do_query: (opts) =>
        dbg = @_dbg("__do_query('#{misc.trunc(opts.query?.replace(/\n/g, " "),250)}',id='#{misc.uuid().slice(0,6)}')")
        if not @is_connected()
            # TODO: should also check that client is connected.
            opts.cb?("client not yet initialized")
            return
        if opts.params? and not misc.is_array(opts.params)
            opts.cb?("params must be an array")
            return
        if not opts.query?
            if not opts.table?
                opts.cb?("if query not given, then table must be given")
                return
            if not opts.select?
                opts.select = '*'
            if misc.is_array(opts.select)
                opts.select = (quote_field(field) for field in opts.select).join(',')
            opts.query = "SELECT #{opts.select} FROM \"#{opts.table}\""
            delete opts.select

        push_param = (param, type) ->
            if type?.toUpperCase() == 'JSONB'
                param = misc.to_json(param)  # I don't understand why this is needed by the driver....
            opts.params.push(param)
            return opts.params.length

        if opts.jsonb_merge?
            if opts.jsonb_set?
                opts.cb?("if jsonb_merge is set then jsonb_set must not be set")
                return
            opts.jsonb_set = opts.jsonb_merge

        SET = []
        if opts.jsonb_set?
            # This little piece of very hard to write (and clever?) code
            # makes it so we can set or **merge in at any nested level** (!)
            # arbitrary JSON objects.  We can also delete any key at any
            # level by making the value null or undefined!  This is amazingly
            # easy to use in queries -- basically making JSONB with postgres
            # as expressive as RethinkDB ReQL (even better in some ways).
            set = (field, data, path) =>
                obj = "COALESCE(#{field}#>'{#{path.join(',')}}', '{}'::JSONB)"
                for key, val of data
                    if not val?
                        # remove key from object
                        obj = "(#{obj} - '#{key}')"
                    else
                        if opts.jsonb_merge? and (typeof(val) == 'object' and not misc.is_date(val))
                            subobj = set(field, val, path.concat([key]))
                            obj = "JSONB_SET(#{obj}, '{#{key}}', #{subobj})"
                        else
                            # completely replace field[key] with val.
                            obj = "JSONB_SET(#{obj}, '{#{key}}', $#{push_param(val, 'JSONB')}::JSONB)"
                return obj
            v = ("#{field}=#{set(field, data, [])}" for field, data of opts.jsonb_set)
            SET.push(v...)
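
            # Illustrative sketch (not from this file): jsonb_set : {counter : {a : 5, b : null}}
            # generates roughly
            #     counter=(JSONB_SET(COALESCE(counter#>'{}', '{}'::JSONB), '{a}', $1::JSONB) - 'b')
            # in the SET clause, with $1 = '5': key "a" is set and key "b" is deleted.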

        if opts.values?
            #dbg("values = #{misc.to_json(opts.values)}")
            if opts.where?
                opts.cb?("where must not be defined if opts.values is defined")
                return

            if misc.is_array(opts.values)
                # An array of numerous separate objects that we will insert all at once.
                # Determine the fields, which are the union of the keys of all values.
                fields = {}
                for x in opts.values
                    if not misc.is_object(x)
                        opts.cb?("if values is an array, every entry must be an object")
                        return
                    for k, p of x
                        fields[k] = true
                # convert to array
                fields = misc.keys(fields)
                fields_to_index = {}
                n = 0
                for field in fields
                    fields_to_index[field] = n
                    n += 1
                values = []
                for x in opts.values
                    value = []
                    for field, param of x
                        if field.indexOf('::') != -1
                            [field, type] = field.split('::')
                            type = type.trim()
                            y = "$#{push_param(param, type)}::#{type}"
                        else
                            y = "$#{push_param(param)}"
                        value[fields_to_index[field]] = y
                    values.push(value)
            else
                # A single entry that we'll insert.

                fields = []
                values = []
                for field, param of opts.values
                    if param == undefined
                        # ignore undefined fields -- makes code cleaner (and makes sense)
                        continue
                    if field.indexOf('::') != -1
                        [field, type] = field.split('::')
                        fields.push(quote_field(field.trim()))
                        type = type.trim()
                        values.push("$#{push_param(param, type)}::#{type}")
                        continue
                    else
                        fields.push(quote_field(field))
                        values.push("$#{push_param(param)}")
                values = [values]  # just one

            if values.length > 0
                opts.query += " (#{(quote_field(field) for field in fields).join(',')}) VALUES " + (" (#{value.join(',')}) " for value in values).join(',')

        if opts.set?
            v = []
            for field, param of opts.set
                if field.indexOf('::') != -1
                    [field, type] = field.split('::')
                    type = type.trim()
                    v.push("#{quote_field(field.trim())}=$#{push_param(param, type)}::#{type}")
                    continue
                else
                    v.push("#{quote_field(field.trim())}=$#{push_param(param)}")
            if v.length > 0
                SET.push(v...)

        if opts.conflict?
            if misc.is_string(opts.conflict) and misc.startswith(opts.conflict.toLowerCase().trim(), 'on conflict')
                # Straight string inclusion
                opts.query += ' ' + opts.conflict + ' '
            else
                if not opts.values?
                    opts.cb?("if conflict is specified then values must also be specified")
                    return
                if not misc.is_array(opts.conflict)
                    if typeof(opts.conflict) != 'string'
                        opts.cb?("conflict (='#{misc.to_json(opts.conflict)}') must be a string (the field name), for now")
                        return
                    else
                        conflict = [opts.conflict]
                else
                    conflict = opts.conflict
                v = ("#{quote_field(field)}=EXCLUDED.#{field}" for field in fields when field not in conflict)
                SET.push(v...)
                if SET.length == 0
                    opts.query += " ON CONFLICT (#{conflict.join(',')}) DO NOTHING "
                else
                    opts.query += " ON CONFLICT (#{conflict.join(',')}) DO UPDATE "

        if SET.length > 0
            opts.query += " SET " + SET.join(' , ')
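
        # Illustrative sketch (not from this file): with the hypothetical call
        #     query    : 'INSERT INTO counts'
        #     values   : {'id::UUID': id, 'n::INTEGER': 10}
        #     conflict : 'id'
        # the code above expands opts.query to roughly
        #     INSERT INTO counts ("id","n") VALUES ($1::UUID,$2::INTEGER)
        #         ON CONFLICT (id) DO UPDATE SET "id"=EXCLUDED."id" , "n"=EXCLUDED."n"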

        WHERE = []
        push_where = (x) =>
            if typeof(x) == 'string'
                WHERE.push(x)
            else if misc.is_array(x)
                for v in x
                    push_where(v)
            else if misc.is_object(x)
                for cond, param of x
                    if typeof(cond) != 'string'
                        opts.cb?("each condition must be a string but '#{cond}' isn't")
                        return
                    if not param?
                        # *IGNORE* where conditions where value is explicitly undefined.
                        # Note that in SQL, NULL is not a value and there is no way to use it in a placeholder
                        # anyways, so this can never work.
                        continue
                    if cond.indexOf('$') == -1
                        # where condition is missing its $ parameter -- default to equality
                        cond += " = $"
                    WHERE.push(cond.replace('$', "$#{push_param(param)}"))

        if opts.where?
            push_where(opts.where)

        if WHERE.length > 0
            if opts.values?
                opts.cb?("values must not be given if where clause given")
                return
            opts.query += " WHERE #{WHERE.join(' AND ')}"

        if opts.order_by?
            if opts.order_by.indexOf("'") >= 0
                err = "ERROR -- detected ' apostrophe in order_by='#{opts.order_by}'"
                dbg(err)
                opts.cb?(err)
                return
            opts.query += " ORDER BY #{opts.order_by}"

        if opts.limit?
            if not validator.isInt('' + opts.limit, min:0)
                err = "ERROR -- opts.limit = '#{opts.limit}' is not an integer"
                dbg(err)
                opts.cb?(err)
                return
            opts.query += " LIMIT #{opts.limit} "

        if opts.offset?
            if not validator.isInt('' + opts.offset, min:0)
                err = "ERROR -- opts.offset = '#{opts.offset}' is not an integer"
                dbg(err)
                opts.cb?(err)
                return
            opts.query += " OFFSET #{opts.offset} "

        if opts.safety_check
            safety_check = opts.query.toLowerCase().trim()
            if (safety_check.startsWith('update') or safety_check.startsWith('delete')) and (safety_check.indexOf('where') == -1 and safety_check.indexOf('trigger') == -1 and safety_check.indexOf('insert') == -1 and safety_check.indexOf('create') == -1)
                # This is always a bug.
                err = "ERROR -- Dangerous UPDATE or DELETE without a WHERE, TRIGGER, or INSERT: query='#{opts.query}'"
                dbg(err)
                opts.cb?(err)
                return

        if opts.cache and @_query_cache?
            # check for cached result
            full_query_string = JSON.stringify([opts.query, opts.params])
            if (x = @_query_cache.get(full_query_string))?
                dbg("using cache for '#{opts.query}'")
                opts.cb?(x...)
                return

        # params can easily be huge, e.g., a blob.  But this may be
        # needed at some point for debugging.
        #dbg("query='#{opts.query}', params=#{misc.to_json(opts.params)}")
        client = @_client()
        if not client?
            opts.cb?("not connected")
            return
        @_concurrent_queries ?= 0
        @_concurrent_queries += 1
        dbg("query='#{opts.query}' (concurrent=#{@_concurrent_queries})")

        @concurrent_counter?.labels('started').inc(1)
        try
            start = new Date()
            if @_timeout_ms and @_timeout_delay_ms
                # Create a timer, so that if the query doesn't return within
                # timeout_ms time, then the entire connection is destroyed.
                # It then gets recreated automatically.  I tested
                # and all outstanding queries also get an error when this happens.
                timeout_error = =>
                    # Only disconnect with timeout error if it has been sufficiently long
                    # since connecting.  This way when an error is triggered, all the
                    # outstanding timers at the moment of the error will just get ignored
                    # when they fire (since @_connect_time is 0 or too recent).
                    if @_connect_time and new Date() - @_connect_time > @_timeout_delay_ms
                        client.emit('error', 'timeout')
                timer = setTimeout(timeout_error, @_timeout_ms)

            # PAINFUL FACT: In client.query below, if the client is closed/killed/errored
            # (especially via client.emit above), then none of the callbacks from
            # client.query are called!
            finished = false
            error_listener = ->
                dbg("error_listener fired")
                query_cb('error')
            client.once('error', error_listener)
            query_cb = (err, result) =>
                if finished  # ensure no matter what that query_cb is called at most once.
                    dbg("called when finished (ignoring)")
                    return
                finished = true
                client.removeListener('error', error_listener)

                if @_timeout_ms
                    clearTimeout(timer)
                query_time_ms = new Date() - start
                @_concurrent_queries -= 1
                @query_time_histogram?.observe({table:opts.table ? ''}, query_time_ms)
                @concurrent_counter?.labels('ended').inc(1)
                if err
                    dbg("done (concurrent=#{@_concurrent_queries}), (query_time_ms=#{query_time_ms}) -- error: #{err}")
                    ## DANGER
                    # Only uncomment this for low level debugging!
                    #### dbg("params = #{JSON.stringify(opts.params)}")
                    ##
                    err = 'postgresql ' + err
                else
                    dbg("done (concurrent=#{@_concurrent_queries}) (query_time_ms=#{query_time_ms}) -- success")
                if opts.cache and @_query_cache?
                    @_query_cache.set(full_query_string, [err, result])
                opts.cb?(err, result)
                if query_time_ms >= QUERY_ALERT_THRESH_MS
                    dbg("QUERY_ALERT_THRESH: query_time_ms=#{query_time_ms}\nQUERY_ALERT_THRESH: query='#{opts.query}'\nQUERY_ALERT_THRESH: params='#{misc.to_json(opts.params)}'")

            # set a timeout for one specific query (there is a default when creating the pg.Client, see @_connect)
            if opts.timeout_s? and typeof opts.timeout_s == 'number' and opts.timeout_s >= 0
                dbg("set query timeout to #{opts.timeout_s}secs")
                opts.pg_params ?= {}
                # the actual param is in milliseconds
                # https://postgresqlco.nf/en/doc/param/statement_timeout/
                opts.pg_params.statement_timeout = 1000 * opts.timeout_s

            if opts.pg_params?
                dbg("run query with specific postgres parameters in a transaction")
                do_query_with_pg_params(client: client, query: opts.query, params: opts.params, pg_params:opts.pg_params, cb: query_cb)
            else
                client.query(opts.query, opts.params, query_cb)

        catch e
            # this should never ever happen
            dbg("EXCEPTION in client.query: #{e}")
            opts.cb?(e)
            @_concurrent_queries -= 1
            @concurrent_counter?.labels('ended').inc(1)
        return

    # Special case of query for counting entries in a table.
    _count: (opts) =>
        opts = defaults opts,
            table : required
            where : undefined   # as in _query
            cb    : required
        @_query
            query : "SELECT COUNT(*) FROM #{opts.table}"
            where : opts.where
            cb    : count_result(opts.cb)
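
    # Illustrative sketch (not from this file; table and column are hypothetical):
    #     @_count
    #         table : 'accounts'
    #         where : {'created >= $::TIMESTAMP': cutoff}
    #         cb    : (err, n) -> dbg("#{n} accounts")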

    _validate_opts: (opts) =>
        for k, v of opts
            if k.slice(k.length-2) == 'id'
                if v? and not misc.is_valid_uuid_string(v)
                    opts.cb?("invalid #{k} -- #{v}")
                    return false
            if k.slice(k.length-3) == 'ids'
                for w in v
                    if not misc.is_valid_uuid_string(w)
                        opts.cb?("invalid uuid #{w} in #{k} -- #{misc.to_json(v)}")
                        return false
            if k == 'group' and v not in misc.PROJECT_GROUPS
                opts.cb?("unknown project group '#{v}'"); return false
            if k == 'groups'
                for w in v
                    if w not in misc.PROJECT_GROUPS
                        opts.cb?("unknown project group '#{w}' in groups"); return false
        return true

    _ensure_database_exists: (cb) =>
        dbg = @_dbg("_ensure_database_exists")
        dbg("ensure database '#{@_database}' exists")
        args = ['--user', @_user, '--host', @_host.split(',')[0], '--port', @_port, '--list', '--tuples-only']
        sslEnv = sslConfigToPsqlEnv(@_ssl)
        dbg("psql #{args.join(' ')}")
        misc_node.execute_code
            command : 'psql'
            args    : args
            env     : Object.assign sslEnv,
                PGPASSWORD : @_password
            cb      : (err, output) =>
                if err
                    cb(err)
                    return
                databases = (x.split('|')[0].trim() for x in output.stdout.split('\n') when x)
                if @_database in databases
                    dbg("database '#{@_database}' already exists")
                    cb()
                    return
                dbg("creating database '#{@_database}'")
                misc_node.execute_code
                    command : 'createdb'
                    args    : ['--host', @_host, '--port', @_port, @_database]
                    env     :
                        PGPASSWORD : @_password
                    cb      : cb

    _confirm_delete: (opts) =>
        opts = defaults opts,
            confirm : 'no'
            cb      : required
        dbg = @_dbg("confirm")
        if opts.confirm != 'yes'
            err = "Really delete all data? -- you must explicitly pass in confirm='yes' (but confirm:'#{opts.confirm}')"
            dbg(err)
            opts.cb(err)
            return false
        else
            return true

    set_random_password: (opts) =>
        throw Error("NotImplementedError")

    # This will fail if any other clients have db open.
    # This function is very important for automated testing.
    delete_entire_database: (opts) =>
        dbg = @_dbg("delete_entire_database")
        dbg("deleting database '#{@_database}'")
        if not @_confirm_delete(opts)
            dbg("failed confirmation")
            return
        async.series([
            (cb) =>
                dbg("disconnect from db")
                @disconnect()
                cb()
            (cb) =>
                misc_node.execute_code
                    command : 'dropdb'
                    args    : ['--host', @_host, '--port', @_port, @_database]
                    cb      : cb
        ], opts.cb)

    # Deletes all the contents of the tables in the database.  It doesn't
    # delete anything about the schema itself: indexes or tables.
    delete_all: (opts) =>
        dbg = @_dbg("delete_all")
        dbg("deleting all contents of tables in '#{@_database}'")
        if not @_confirm_delete(opts)
            return

        # If the cache is enabled, be sure to also clear it.
        @clear_cache()

        tables = undefined

        # Delete anything cached in the db object.  Obviously, not putting something here
        # is a natural place in which to cause bugs... but they will probably all be bugs
        # of the form "the test suite fails", so we'll find them.
        delete @_stats_cached

        # Actually delete tables
        async.series([
            (cb) =>
                @_get_tables (err, t) =>
                    tables = t; cb(err)
            (cb) =>
                f = (table, cb) =>
                    @_query
                        query        : "DELETE FROM #{table}"
                        safety_check : false
                        cb           : cb
                async.map(tables, f, cb)
        ], opts.cb)

    # return list of tables in the database
    _get_tables: (cb) =>
        @_query
            query : "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"
            cb    : (err, result) =>
                if err
                    cb(err)
                else
                    cb(undefined, (row.table_name for row in result.rows))

    # Return list of columns in a given table
    _get_columns: (table, cb) =>
        @_query
            query : "SELECT column_name FROM information_schema.columns"
            where :
                "table_name = $::text" : table
            cb    : (err, result) =>
                if err
                    cb(err)
                else
                    cb(undefined, (row.column_name for row in result.rows))

    _primary_keys: (table) =>
        return primaryKeys(table)

    # Return *the* primary key, assuming unique; otherwise raise an exception.
    _primary_key: (table) =>
        return primaryKey(table)

    _throttle: (name, time_s, key...) =>
        key = misc.to_json(key)
        x = "_throttle_#{name}"
        @[x] ?= {}
        if @[x][key]
            return true
        @[x][key] = true
        setTimeout((()=>delete @[x]?[key]), time_s*1000)
        return false
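
    # Illustrative sketch (not from this file; the name and key are hypothetical):
    # run an expensive operation at most once per 30s per project:
    #     return if @_throttle('update_stats', 30, project_id)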

    # Ensure that the actual schema in the database matches the one defined in SCHEMA.
    # This creates the initial schema, adds new columns, and in a VERY LIMITED
    # range of cases, *might be* able to change the data type of a column.
    update_schema: (opts) =>
        try
            await syncSchema(SCHEMA)
            opts.cb?()
        catch err
            opts.cb?(err)

    # Return the number of outstanding concurrent queries.
    concurrent: =>
        return @_concurrent_queries ? 0

    is_heavily_loaded: =>
        return @_concurrent_queries >= @_concurrent_heavily_loaded

    # Compute the sha1 hash (in hex) of the input arguments, which are
    # converted to strings (via json) if they are not strings, then concatenated.
    # This is used for computing compound primary keys in a way that is relatively
    # safe, and in situations where if there were a highly unlikely collision, it
    # wouldn't be the end of the world.  There is a similar client-only slower version
    # of this function (in schema.coffee), so don't change it willy nilly.
    sha1: (args...) ->
        v = ((if typeof(x) == 'string' then x else JSON.stringify(x)) for x in args).join('')
        return misc_node.sha1(v)

    # Go through every table in the schema with a column called "expire", and
    # delete every entry where expire is <= right now.
    # Note: this ignores those rows where expire is NULL, because comparisons with NULL are NULL.
    delete_expired: (opts) =>
        opts = defaults opts,
            count_only : false      # if true, only count the number of rows that would be deleted
            table      : undefined  # only delete from this table
            cb         : required
        dbg = @_dbg("delete_expired(...)")
        dbg()
        f = (table, cb) =>
            dbg("table='#{table}'")
            if opts.count_only
                @_query
                    query : "SELECT COUNT(*) FROM #{table} WHERE expire <= NOW()"
                    cb    : (err, result) =>
                        if not err
                            dbg("COUNT for table #{table} is #{result.rows[0].count}")
                        cb(err)
            else
                dbg("deleting expired entries from '#{table}'")
                @_query
                    query : "DELETE FROM #{table} WHERE expire <= NOW()"
                    cb    : (err) =>
                        dbg("finished deleting expired entries from '#{table}' -- #{err}")
                        cb(err)
        if opts.table
            tables = [opts.table]
        else
            tables = (k for k, v of SCHEMA when v.fields?.expire?.type == 'timestamp' and not v.virtual)
        async.map(tables, f, opts.cb)

    # count number of entries in a table
    count: (opts) =>
        opts = defaults opts,
            table : required
            cb    : required
        @_query
            query : "SELECT COUNT(*) FROM #{opts.table}"
            cb    : count_result(opts.cb)

    # sanitize strings before inserting them into a query string
    sanitize: (s) =>
        escapeString(s)

###
Other misc functions
###

exports.pg_type = pg_type = (info) ->
    return pgType(info)

exports.quote_field = quote_field = (field) ->
    return quoteField(field)

# Timestamp the given number of seconds **in the future**.
exports.expire_time = expire_time = (ttl) ->
    if ttl then new Date((new Date() - 0) + ttl*1000)
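
# Illustrative sketch (not from this file; the table is hypothetical): rows that
# delete_expired should clean up one day from now can be written with
#     values : {'id::UUID': id, 'expire::TIMESTAMP': expire_time(24*60*60)}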

# Returns a function that takes as input the output of doing a SQL query.
# If there are no results, returns undefined.
# If there is exactly one result, what is returned depends on pattern:
#     'a_field'      --> returns the value of this field in the result
#     ['f1', 'f2']   --> returns a map {f1:..., f2:...} with just those fields (null/undefined values dropped)
# If more than one result, an error.
exports.one_result = one_result = (pattern, cb) ->
    if not cb? and typeof(pattern) == 'function'
        cb = pattern
        pattern = undefined
    if not cb?
        return ->  # do nothing -- return function that ignores result
    return (err, result) ->
        if err
            cb(err)
            return
        if not result?.rows?
            cb()
            return
        switch result.rows.length
            when 0
                cb()
            when 1
                obj = misc.map_without_undefined_and_null(result.rows[0])
                if not pattern?
                    cb(undefined, obj)
                    return
                switch typeof(pattern)
                    when 'string'
                        x = obj[pattern]
                        if not x?  # null or undefined -- SQL returns null, but we want undefined
                            cb()
                        else
                            if obj.expire? and new Date() >= obj.expire
                                cb()
                            else
                                cb(undefined, x)
                    when 'object'
                        x = {}
                        for p in pattern
                            if obj[p]?
                                x[p] = obj[p]
                        cb(undefined, x)
                    else
                        cb("BUG: unknown pattern -- #{pattern}")
            else
                cb("more than one result")

exports.all_results = all_results = (pattern, cb) ->
    if not cb? and typeof(pattern) == 'function'
        cb = pattern
        pattern = undefined
    if not cb?
        return ->  # do nothing -- return function that ignores result
    return (err, result) ->
        if err
            cb(err)
        else
            rows = result.rows
            if not pattern?
                # TODO: we use stupid (?) misc.copy to unwrap from pg driver type -- investigate better!
                # Maybe this is fine.  I don't know.
                cb(undefined, (misc.copy(x) for x in rows))
            else if typeof(pattern) == 'string'
                cb(undefined, ((x[pattern] ? undefined) for x in rows))
            else
                cb("unsupported pattern type '#{typeof(pattern)}'")

exports.count_result = count_result = (cb) ->
    if not cb?
        return ->  # do nothing -- return function that ignores result
    return (err, result) ->
        if err
            cb(err)
        else
            cb(undefined, parseInt(result?.rows?[0]?.count))
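
# Illustrative sketch (not from this file; table and field names are hypothetical):
# adapting raw query results with the helpers above:
#
#     db._query
#         table  : 'accounts'
#         select : ['email_address']
#         where  : {'account_id = $::UUID': account_id}
#         cb     : one_result('email_address', (err, email) -> console.log(err, email))
#
#     db._query
#         query : 'SELECT project_id FROM projects'
#         cb    : all_results('project_id', (err, ids) -> console.log(err, ids))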