lock.c
上传用户:blenddy
上传日期:2007-01-07
资源大小:6495k
文件大小:55k
- /*-------------------------------------------------------------------------
- *
- * lock.c
- * simple lock acquisition
- *
- * Copyright (c) 1994, Regents of the University of California
- *
- *
- * IDENTIFICATION
- * $Header: /usr/local/cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.55 1999/05/29 06:14:42 vadim Exp $
- *
- * NOTES
- * Outside modules can create a lock table and acquire/release
- * locks. A lock table is a shared memory hash table. When
- * a process tries to acquire a lock of a type that conflictRs
- * with existing locks, it is put to sleep using the routines
- * in storage/lmgr/proc.c.
- *
- * Interface:
- *
- * LockAcquire(), LockRelease(), LockMethodTableInit(),
- * LockMethodTableRename(), LockReleaseAll, LockOwners()
- * LockResolveConflicts(), GrantLock()
- *
- * NOTE: This module is used to define new lock tables. The
- * multi-level lock table (multi.c) used by the heap
- * access methods calls these routines. See multi.c for
- * examples showing how to use this interface.
- *
- *-------------------------------------------------------------------------
- */
- #include <stdio.h> /* for sprintf() */
- #include <string.h>
- #include <sys/types.h>
- #include <unistd.h>
- #include <signal.h>
- #include "postgres.h"
- #include "miscadmin.h"
- #include "storage/shmem.h"
- #include "storage/sinvaladt.h"
- #include "storage/spin.h"
- #include "storage/proc.h"
- #include "storage/lock.h"
- #include "utils/hsearch.h"
- #include "utils/memutils.h"
- #include "utils/palloc.h"
- #include "access/xact.h"
- #include "access/transam.h"
- #include "utils/trace.h"
- #include "utils/ps_status.h"
- static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
- /*
- * lockDebugRelation can be used to trace unconditionally a single relation,
- * for example pg_listener, if you suspect there are locking problems.
- *
- * lockDebugOidMin is is used to avoid tracing postgres relations, which
- * would produce a lot of output. Unfortunately most system relations are
- * created after bootstrap and have oid greater than BootstrapObjectIdData.
- * If you are using tprintf you should specify a value greater than the max
- * oid of system relations, which can be found with the following query:
- *
- * select max(int4in(int4out(oid))) from pg_class where relname ~ '^pg_';
- *
- * To get a useful lock trace you can use the following pg_options:
- *
- * -T "verbose,query,locks,userlocks,lock_debug_oidmin=17500"
- */
- #define LOCKDEBUG(lockmethod) (pg_options[TRACE_SHORTLOCKS+lockmethod])
- #define lockDebugRelation (pg_options[TRACE_LOCKRELATION])
- #define lockDebugOidMin (pg_options[TRACE_LOCKOIDMIN])
- #define lockReadPriority (pg_options[OPT_LOCKREADPRIORITY])
- #ifdef LOCK_MGR_DEBUG
- #define LOCK_PRINT(where,lock,type)
- if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1)
- && (lock->tag.relId >= lockDebugOidMin))
- ||
- (lockDebugRelation && (lock->tag.relId == lockDebugRelation)))
- LOCK_PRINT_AUX(where,lock,type)
- #define LOCK_PRINT_AUX(where,lock,type)
- TPRINTF(TRACE_ALL,
- "%s: lock(%x) tbl(%d) rel(%u) db(%u) obj(%u) mask(%x) "
- "hold(%d,%d,%d,%d,%d,%d,%d)=%d "
- "act(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
- where,
- MAKE_OFFSET(lock),
- lock->tag.lockmethod,
- lock->tag.relId,
- lock->tag.dbId,
- lock->tag.objId.blkno,
- lock->mask,
- lock->holders[1],
- lock->holders[2],
- lock->holders[3],
- lock->holders[4],
- lock->holders[5],
- lock->holders[6],
- lock->holders[7],
- lock->nHolding,
- lock->activeHolders[1],
- lock->activeHolders[2],
- lock->activeHolders[3],
- lock->activeHolders[4],
- lock->activeHolders[5],
- lock->activeHolders[6],
- lock->activeHolders[7],
- lock->nActive,
- lock->waitProcs.size,
- lock_types[type])
- #define XID_PRINT(where,xidentP)
- if (((LOCKDEBUG(XIDENT_LOCKMETHOD(*(xidentP))) >= 1)
- && (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId
- >= lockDebugOidMin))
- || (lockDebugRelation &&
- (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId
- == lockDebugRelation)))
- XID_PRINT_AUX(where,xidentP)
- #define XID_PRINT_AUX(where,xidentP)
- TPRINTF(TRACE_ALL,
- "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%u) "
- "hold(%d,%d,%d,%d,%d,%d,%d)=%d",
- where,
- MAKE_OFFSET(xidentP),
- xidentP->tag.lock,
- XIDENT_LOCKMETHOD(*(xidentP)),
- xidentP->tag.pid,
- xidentP->tag.xid,
- xidentP->holders[1],
- xidentP->holders[2],
- xidentP->holders[3],
- xidentP->holders[4],
- xidentP->holders[5],
- xidentP->holders[6],
- xidentP->holders[7],
- xidentP->nHolding)
- #else /* !LOCK_MGR_DEBUG */
- #define LOCK_PRINT(where,lock,type)
- #define LOCK_PRINT_AUX(where,lock,type)
- #define XID_PRINT(where,xidentP)
- #define XID_PRINT_AUX(where,xidentP)
- #endif /* !LOCK_MGR_DEBUG */
- static char *lock_types[] = {
- "INVALID",
- "AccessShareLock",
- "RowShareLock",
- "RowExclusiveLock",
- "ShareLock",
- "ShareRowExclusiveLock",
- "ExclusiveLock",
- "AccessExclusiveLock"
- };
- SPINLOCK LockMgrLock; /* in Shmem or created in
- * CreateSpinlocks() */
- /* This is to simplify/speed up some bit arithmetic */
- static MASK BITS_OFF[MAX_LOCKMODES];
- static MASK BITS_ON[MAX_LOCKMODES];
- /* -----------------
- * XXX Want to move this to this file
- * -----------------
- */
- static bool LockingIsDisabled;
- /* -------------------
- * map from lockmethod to the lock table structure
- * -------------------
- */
- static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];
- static int NumLockMethods;
- /* -------------------
- * InitLocks -- Init the lock module. Create a private data
- * structure for constructing conflict masks.
- * -------------------
- */
- void
- InitLocks()
- {
- int i;
- int bit;
- bit = 1;
- /* -------------------
- * remember 0th lockmode is invalid
- * -------------------
- */
- for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
- {
- BITS_ON[i] = bit;
- BITS_OFF[i] = ~bit;
- }
- #ifdef LOCK_MGR_DEBUG
- /*
- * If lockDebugOidMin value has not been specified in pg_options set a
- * default value.
- */
- if (!lockDebugOidMin)
- lockDebugOidMin = BootstrapObjectIdData;
- #endif
- }
- /* -------------------
- * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
- * ------------------
- */
- void
- LockDisable(int status)
- {
- LockingIsDisabled = status;
- }
- /*
- * LockMethodInit -- initialize the lock table's lock type
- * structures
- *
- * Notes: just copying. Should only be called once.
- */
- static void
- LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
- MASK *conflictsP,
- int *prioP,
- int numModes)
- {
- int i;
- lockMethodTable->ctl->numLockModes = numModes;
- numModes++;
- for (i = 0; i < numModes; i++, prioP++, conflictsP++)
- {
- lockMethodTable->ctl->conflictTab[i] = *conflictsP;
- lockMethodTable->ctl->prio[i] = *prioP;
- }
- }
- /*
- * LockMethodTableInit -- initialize a lock table structure
- *
- * Notes:
- * (a) a lock table has four separate entries in the shmem index
- * table. This is because every shared hash table and spinlock
- * has its name stored in the shmem index at its creation. It
- * is wasteful, in this case, but not much space is involved.
- *
- */
- LOCKMETHOD
- LockMethodTableInit(char *tabName,
- MASK *conflictsP,
- int *prioP,
- int numModes)
- {
- LOCKMETHODTABLE *lockMethodTable;
- char *shmemName;
- HASHCTL info;
- int hash_flags;
- bool found;
- int status = TRUE;
- if (numModes > MAX_LOCKMODES)
- {
- elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
- numModes, MAX_LOCKMODES);
- return INVALID_LOCKMETHOD;
- }
- /* allocate a string for the shmem index table lookup */
- shmemName = (char *) palloc((unsigned) (strlen(tabName) + 32));
- if (!shmemName)
- {
- elog(NOTICE, "LockMethodTableInit: couldn't malloc string %s n", tabName);
- return INVALID_LOCKMETHOD;
- }
- /* each lock table has a non-shared header */
- lockMethodTable = (LOCKMETHODTABLE *) palloc((unsigned) sizeof(LOCKMETHODTABLE));
- if (!lockMethodTable)
- {
- elog(NOTICE, "LockMethodTableInit: couldn't malloc lock table %sn", tabName);
- pfree(shmemName);
- return INVALID_LOCKMETHOD;
- }
- /* ------------------------
- * find/acquire the spinlock for the table
- * ------------------------
- */
- SpinAcquire(LockMgrLock);
- /* -----------------------
- * allocate a control structure from shared memory or attach to it
- * if it already exists.
- * -----------------------
- */
- sprintf(shmemName, "%s (ctl)", tabName);
- lockMethodTable->ctl = (LOCKMETHODCTL *)
- ShmemInitStruct(shmemName, (unsigned) sizeof(LOCKMETHODCTL), &found);
- if (!lockMethodTable->ctl)
- {
- elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
- status = FALSE;
- }
- /* -------------------
- * no zero-th table
- * -------------------
- */
- NumLockMethods = 1;
- /* ----------------
- * we're first - initialize
- * ----------------
- */
- if (!found)
- {
- MemSet(lockMethodTable->ctl, 0, sizeof(LOCKMETHODCTL));
- lockMethodTable->ctl->masterLock = LockMgrLock;
- lockMethodTable->ctl->lockmethod = NumLockMethods;
- }
- /* --------------------
- * other modules refer to the lock table by a lockmethod
- * --------------------
- */
- LockMethodTable[NumLockMethods] = lockMethodTable;
- NumLockMethods++;
- Assert(NumLockMethods <= MAX_LOCK_METHODS);
- /* ----------------------
- * allocate a hash table for the lock tags. This is used
- * to find the different locks.
- * ----------------------
- */
- info.keysize = SHMEM_LOCKTAB_KEYSIZE;
- info.datasize = SHMEM_LOCKTAB_DATASIZE;
- info.hash = tag_hash;
- hash_flags = (HASH_ELEM | HASH_FUNCTION);
- sprintf(shmemName, "%s (lock hash)", tabName);
- lockMethodTable->lockHash = (HTAB *) ShmemInitHash(shmemName,
- INIT_TABLE_SIZE, MAX_TABLE_SIZE,
- &info, hash_flags);
- Assert(lockMethodTable->lockHash->hash == tag_hash);
- if (!lockMethodTable->lockHash)
- {
- elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
- status = FALSE;
- }
- /* -------------------------
- * allocate an xid table. When different transactions hold
- * the same lock, additional information must be saved (locks per tx).
- * -------------------------
- */
- info.keysize = SHMEM_XIDTAB_KEYSIZE;
- info.datasize = SHMEM_XIDTAB_DATASIZE;
- info.hash = tag_hash;
- hash_flags = (HASH_ELEM | HASH_FUNCTION);
- sprintf(shmemName, "%s (xid hash)", tabName);
- lockMethodTable->xidHash = (HTAB *) ShmemInitHash(shmemName,
- INIT_TABLE_SIZE, MAX_TABLE_SIZE,
- &info, hash_flags);
- if (!lockMethodTable->xidHash)
- {
- elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
- status = FALSE;
- }
- /* init ctl data structures */
- LockMethodInit(lockMethodTable, conflictsP, prioP, numModes);
- SpinRelease(LockMgrLock);
- pfree(shmemName);
- if (status)
- return lockMethodTable->ctl->lockmethod;
- else
- return INVALID_LOCKMETHOD;
- }
- /*
- * LockMethodTableRename -- allocate another lockmethod to the same
- * lock table.
- *
- * NOTES: Both the lock module and the lock chain (lchain.c)
- * module use table id's to distinguish between different
- * kinds of locks. Short term and long term locks look
- * the same to the lock table, but are handled differently
- * by the lock chain manager. This function allows the
- * client to use different lockmethods when acquiring/releasing
- * short term and long term locks.
- */
- LOCKMETHOD
- LockMethodTableRename(LOCKMETHOD lockmethod)
- {
- LOCKMETHOD newLockMethod;
- if (NumLockMethods >= MAX_LOCK_METHODS)
- return INVALID_LOCKMETHOD;
- if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
- return INVALID_LOCKMETHOD;
- /* other modules refer to the lock table by a lockmethod */
- newLockMethod = NumLockMethods;
- NumLockMethods++;
- LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
- return newLockMethod;
- }
- /*
- * LockAcquire -- Check for lock conflicts, sleep if conflict found,
- * set lock if/when no conflicts.
- *
- * Returns: TRUE if parameters are correct, FALSE otherwise.
- *
- * Side Effects: The lock is always acquired. No way to abort
- * a lock acquisition other than aborting the transaction.
- * Lock is recorded in the lkchain.
- *
- #ifdef USER_LOCKS
- *
- * Note on User Locks:
- *
- * User locks are handled totally on the application side as
- * long term cooperative locks which extend beyond the normal
- * transaction boundaries. Their purpose is to indicate to an
- * application that someone is `working' on an item. So it is
- * possible to put an user lock on a tuple's oid, retrieve the
- * tuple, work on it for an hour and then update it and remove
- * the lock. While the lock is active other clients can still
- * read and write the tuple but they can be aware that it has
- * been locked at the application level by someone.
- * User locks use lock tags made of an uint16 and an uint32, for
- * example 0 and a tuple oid, or any other arbitrary pair of
- * numbers following a convention established by the application.
- * In this sense tags don't refer to tuples or database entities.
- * User locks and normal locks are completely orthogonal and
- * they don't interfere with each other, so it is possible
- * to acquire a normal lock on an user-locked tuple or user-lock
- * a tuple for which a normal write lock already exists.
- * User locks are always non blocking, therefore they are never
- * acquired if already held by another process. They must be
- * released explicitly by the application but they are released
- * automatically when a backend terminates.
- * They are indicated by a lockmethod 2 which is an alias for the
- * normal lock table, and are distinguished from normal locks
- * for the following differences:
- *
- * normal lock user lock
- *
- * lockmethod 1 2
- * tag.relId rel oid 0
- * tag.ItemPointerData.ip_blkid block id lock id2
- * tag.ItemPointerData.ip_posid tuple offset lock id1
- * xid.pid 0 backend pid
- * xid.xid xid or 0 0
- * persistence transaction user or backend
- *
- * The lockmode parameter can have the same values for normal locks
- * although probably only WRITE_LOCK can have some practical use.
- *
- * DZ - 22 Nov 1997
- #endif
- */
- bool
- LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
- {
- XIDLookupEnt *result,
- item;
- HTAB *xidTable;
- bool found;
- LOCK *lock = NULL;
- SPINLOCK masterLock;
- LOCKMETHODTABLE *lockMethodTable;
- int status;
- TransactionId xid;
- #ifdef USER_LOCKS
- int is_user_lock;
- is_user_lock = (lockmethod == USER_LOCKMETHOD);
- if (is_user_lock)
- {
- #ifdef USER_LOCKS_DEBUG
- TPRINTF(TRACE_USERLOCKS, "LockAcquire: user lock [%u] %s",
- locktag->objId.blkno,
- lock_types[lockmode]);
- #endif
- }
- #endif
- /* ???????? This must be changed when short term locks will be used */
- locktag->lockmethod = lockmethod;
- Assert(lockmethod < NumLockMethods);
- lockMethodTable = LockMethodTable[lockmethod];
- if (!lockMethodTable)
- {
- elog(NOTICE, "LockAcquire: bad lock table %d", lockmethod);
- return FALSE;
- }
- if (LockingIsDisabled)
- return TRUE;
- masterLock = lockMethodTable->ctl->masterLock;
- SpinAcquire(masterLock);
- /*
- * Find or create a lock with this tag
- */
- Assert(lockMethodTable->lockHash->hash == tag_hash);
- lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
- HASH_ENTER, &found);
- if (!lock)
- {
- SpinRelease(masterLock);
- elog(FATAL, "LockAcquire: lock table %d is corrupted", lockmethod);
- return FALSE;
- }
- /* --------------------
- * if there was nothing else there, complete initialization
- * --------------------
- */
- if (!found)
- {
- lock->mask = 0;
- lock->nHolding = 0;
- lock->nActive = 0;
- MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
- MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
- ProcQueueInit(&(lock->waitProcs));
- Assert(lock->tag.objId.blkno == locktag->objId.blkno);
- LOCK_PRINT("LockAcquire: new", lock, lockmode);
- }
- else
- {
- LOCK_PRINT("LockAcquire: found", lock, lockmode);
- Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
- Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
- Assert(lock->nActive <= lock->nHolding);
- }
- /* ------------------
- * add an element to the lock queue so that we can clear the
- * locks at end of transaction.
- * ------------------
- */
- xidTable = lockMethodTable->xidHash;
- /* ------------------
- * Zero out all of the tag bytes (this clears the padding bytes for long
- * word alignment and ensures hashing consistency).
- * ------------------
- */
- MemSet(&item, 0, XID_TAGSIZE); /* must clear padding, needed */
- item.tag.lock = MAKE_OFFSET(lock);
- #ifdef USE_XIDTAG_LOCKMETHOD
- item.tag.lockmethod = lockmethod;
- #endif
- #ifdef USER_LOCKS
- if (is_user_lock)
- {
- item.tag.pid = MyProcPid;
- item.tag.xid = xid = 0;
- }
- else
- {
- xid = GetCurrentTransactionId();
- TransactionIdStore(xid, &item.tag.xid);
- }
- #else
- xid = GetCurrentTransactionId();
- TransactionIdStore(xid, &item.tag.xid);
- #endif
- /*
- * Find or create an xid entry with this tag
- */
- result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
- HASH_ENTER, &found);
- if (!result)
- {
- elog(NOTICE, "LockAcquire: xid table corrupted");
- return STATUS_ERROR;
- }
- /*
- * If not found initialize the new entry
- */
- if (!found)
- {
- result->nHolding = 0;
- MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKMODES);
- ProcAddLock(&result->queue);
- XID_PRINT("LockAcquire: new", result);
- }
- else
- {
- XID_PRINT("LockAcquire: found", result);
- Assert((result->nHolding > 0) && (result->holders[lockmode] >= 0));
- Assert(result->nHolding <= lock->nActive);
- }
- /* ----------------
- * lock->nholding tells us how many processes have _tried_ to
- * acquire this lock, Regardless of whether they succeeded or
- * failed in doing so.
- * ----------------
- */
- lock->nHolding++;
- lock->holders[lockmode]++;
- Assert((lock->nHolding > 0) && (lock->holders[lockmode] > 0));
- /* --------------------
- * If I'm the only one holding a lock, then there
- * cannot be a conflict. The same is true if we already
- * hold this lock.
- * --------------------
- */
- if (result->nHolding == lock->nActive || result->holders[lockmode] != 0)
- {
- result->holders[lockmode]++;
- result->nHolding++;
- XID_PRINT("LockAcquire: owning", result);
- Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
- GrantLock(lock, lockmode);
- SpinRelease(masterLock);
- return TRUE;
- }
- /*
- * If lock requested conflicts with locks requested by waiters...
- */
- if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
- {
- int i = 1;
- /*
- * If I don't hold locks or my locks don't conflict with waiters
- * then force to sleep.
- */
- if (result->nHolding > 0)
- {
- for (; i <= lockMethodTable->ctl->numLockModes; i++)
- {
- if (result->holders[i] > 0 &&
- lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
- break; /* conflict */
- }
- }
- if (result->nHolding == 0 || i > lockMethodTable->ctl->numLockModes)
- {
- XID_PRINT("LockAcquire: higher priority proc waiting",
- result);
- status = STATUS_FOUND;
- }
- else
- status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
- }
- else
- status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
- if (status == STATUS_OK)
- GrantLock(lock, lockmode);
- else if (status == STATUS_FOUND)
- {
- #ifdef USER_LOCKS
- /*
- * User locks are non blocking. If we can't acquire a lock we must
- * remove the xid entry and return FALSE without waiting.
- */
- if (is_user_lock)
- {
- if (!result->nHolding)
- {
- SHMQueueDelete(&result->queue);
- result = (XIDLookupEnt *) hash_search(xidTable,
- (Pointer) result,
- HASH_REMOVE, &found);
- if (!result || !found)
- elog(NOTICE, "LockAcquire: remove xid, table corrupted");
- }
- else
- XID_PRINT_AUX("LockAcquire: NHOLDING", result);
- lock->nHolding--;
- lock->holders[lockmode]--;
- LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
- Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
- Assert(lock->nActive <= lock->nHolding);
- SpinRelease(masterLock);
- return FALSE;
- }
- #endif
- /*
- * Construct bitmask of locks we hold before going to sleep.
- */
- MyProc->holdLock = 0;
- if (result->nHolding > 0)
- {
- int i,
- tmpMask = 2;
- for (i = 1; i <= lockMethodTable->ctl->numLockModes;
- i++, tmpMask <<= 1)
- {
- if (result->holders[i] > 0)
- MyProc->holdLock |= tmpMask;
- }
- Assert(MyProc->holdLock != 0);
- }
- status = WaitOnLock(lockmethod, lock, lockmode);
- /*
- * Check the xid entry status, in case something in the ipc
- * communication doesn't work correctly.
- */
- if (!((result->nHolding > 0) && (result->holders[lockmode] > 0)))
- {
- XID_PRINT_AUX("LockAcquire: INCONSISTENT ", result);
- LOCK_PRINT_AUX("LockAcquire: INCONSISTENT ", lock, lockmode);
- /* Should we retry ? */
- return FALSE;
- }
- XID_PRINT("LockAcquire: granted", result);
- LOCK_PRINT("LockAcquire: granted", lock, lockmode);
- }
- SpinRelease(masterLock);
- return status == STATUS_OK;
- }
- /* ----------------------------
- * LockResolveConflicts -- test for lock conflicts
- *
- * NOTES:
- * Here's what makes this complicated: one transaction's
- * locks don't conflict with one another. When many processes
- * hold locks, each has to subtract off the other's locks when
- * determining whether or not any new lock acquired conflicts with
- * the old ones.
- *
- * ----------------------------
- */
- int
- LockResolveConflicts(LOCKMETHOD lockmethod,
- LOCK *lock,
- LOCKMODE lockmode,
- TransactionId xid,
- XIDLookupEnt *xidentP) /* xident ptr or NULL */
- {
- XIDLookupEnt *result,
- item;
- int *myHolders;
- int numLockModes;
- HTAB *xidTable;
- bool found;
- int bitmask;
- int i,
- tmpMask;
- #ifdef USER_LOCKS
- int is_user_lock;
- #endif
- numLockModes = LockMethodTable[lockmethod]->ctl->numLockModes;
- xidTable = LockMethodTable[lockmethod]->xidHash;
- if (xidentP)
- {
- /*
- * A pointer to the xid entry was supplied from the caller.
- * Actually only LockAcquire can do it.
- */
- result = xidentP;
- }
- else
- {
- /* ---------------------
- * read my own statistics from the xid table. If there
- * isn't an entry, then we'll just add one.
- *
- * Zero out the tag, this clears the padding bytes for long
- * word alignment and ensures hashing consistency.
- * ------------------
- */
- MemSet(&item, 0, XID_TAGSIZE);
- item.tag.lock = MAKE_OFFSET(lock);
- #ifdef USE_XIDTAG_LOCKMETHOD
- item.tag.lockmethod = lockmethod;
- #endif
- #ifdef USER_LOCKS
- is_user_lock = (lockmethod == 2);
- if (is_user_lock)
- {
- item.tag.pid = MyProcPid;
- item.tag.xid = 0;
- }
- else
- TransactionIdStore(xid, &item.tag.xid);
- #else
- TransactionIdStore(xid, &item.tag.xid);
- #endif
- /*
- * Find or create an xid entry with this tag
- */
- result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
- HASH_ENTER, &found);
- if (!result)
- {
- elog(NOTICE, "LockResolveConflicts: xid table corrupted");
- return STATUS_ERROR;
- }
- /*
- * If not found initialize the new entry. THIS SHOULD NEVER
- * HAPPEN, if we are trying to resolve a conflict we must already
- * have allocated an xid entry for this lock. dz 21-11-1997
- */
- if (!found)
- {
- /* ---------------
- * we're not holding any type of lock yet. Clear
- * the lock stats.
- * ---------------
- */
- MemSet(result->holders, 0, numLockModes * sizeof(*(lock->holders)));
- result->nHolding = 0;
- XID_PRINT_AUX("LockResolveConflicts: NOT FOUND", result);
- }
- else
- XID_PRINT("LockResolveConflicts: found", result);
- }
- Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
- /* ----------------------------
- * first check for global conflicts: If no locks conflict
- * with mine, then I get the lock.
- *
- * Checking for conflict: lock->mask represents the types of
- * currently held locks. conflictTable[lockmode] has a bit
- * set for each type of lock that conflicts with mine. Bitwise
- * compare tells if there is a conflict.
- * ----------------------------
- */
- if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask))
- {
- result->holders[lockmode]++;
- result->nHolding++;
- XID_PRINT("LockResolveConflicts: no conflict", result);
- Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
- return STATUS_OK;
- }
- /* ------------------------
- * Rats. Something conflicts. But it could still be my own
- * lock. We have to construct a conflict mask
- * that does not reflect our own locks.
- * ------------------------
- */
- myHolders = result->holders;
- bitmask = 0;
- tmpMask = 2;
- for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
- {
- if (lock->activeHolders[i] != myHolders[i])
- bitmask |= tmpMask;
- }
- /* ------------------------
- * now check again for conflicts. 'bitmask' describes the types
- * of locks held by other processes. If one of these
- * conflicts with the kind of lock that I want, there is a
- * conflict and I have to sleep.
- * ------------------------
- */
- if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask))
- {
- /* no conflict. Get the lock and go on */
- result->holders[lockmode]++;
- result->nHolding++;
- XID_PRINT("LockResolveConflicts: resolved", result);
- Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
- return STATUS_OK;
- }
- XID_PRINT("LockResolveConflicts: conflicting", result);
- return STATUS_FOUND;
- }
- /*
- * GrantLock -- update the lock data structure to show
- * the new lock holder.
- */
- void
- GrantLock(LOCK *lock, LOCKMODE lockmode)
- {
- lock->nActive++;
- lock->activeHolders[lockmode]++;
- lock->mask |= BITS_ON[lockmode];
- LOCK_PRINT("GrantLock", lock, lockmode);
- Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] > 0));
- Assert(lock->nActive <= lock->nHolding);
- }
- static int
- WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
- {
- PROC_QUEUE *waitQueue = &(lock->waitProcs);
- LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
- char old_status[64],
- new_status[64];
- Assert(lockmethod < NumLockMethods);
- /*
- * the waitqueue is ordered by priority. I insert myself according to
- * the priority of the lock I am acquiring.
- *
- * SYNC NOTE: I am assuming that the lock table spinlock is sufficient
- * synchronization for this queue. That will not be true if/when
- * people can be deleted from the queue by a SIGINT or something.
- */
- LOCK_PRINT_AUX("WaitOnLock: sleeping on lock", lock, lockmode);
- strcpy(old_status, PS_STATUS);
- strcpy(new_status, PS_STATUS);
- strcat(new_status, " waiting");
- PS_SET_STATUS(new_status);
- if (ProcSleep(waitQueue,
- lockMethodTable->ctl,
- lockmode,
- lock) != NO_ERROR)
- {
- /* -------------------
- * This could have happend as a result of a deadlock,
- * see HandleDeadLock().
- * Decrement the lock nHolding and holders fields as
- * we are no longer waiting on this lock.
- * -------------------
- */
- lock->nHolding--;
- lock->holders[lockmode]--;
- LOCK_PRINT_AUX("WaitOnLock: aborting on lock", lock, lockmode);
- Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
- Assert(lock->nActive <= lock->nHolding);
- if (lock->activeHolders[lockmode] == lock->holders[lockmode])
- lock->waitMask &= BITS_OFF[lockmode];
- SpinRelease(lockMethodTable->ctl->masterLock);
- elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction");
- /* not reached */
- }
- if (lock->activeHolders[lockmode] == lock->holders[lockmode])
- lock->waitMask &= BITS_OFF[lockmode];
- PS_SET_STATUS(old_status);
- LOCK_PRINT_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
- return STATUS_OK;
- }
- /*
- * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
- * release it.
- *
- * Side Effects: if the lock no longer conflicts with the highest
- * priority waiting process, that process is granted the lock
- * and awoken. (We have to grant the lock here to avoid a
- * race between the waking process and any new process to
- * come along and request the lock).
- */
- bool
- LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
- {
- LOCK *lock = NULL;
- SPINLOCK masterLock;
- bool found;
- LOCKMETHODTABLE *lockMethodTable;
- XIDLookupEnt *result,
- item;
- HTAB *xidTable;
- TransactionId xid;
- bool wakeupNeeded = true;
- int trace_flag;
- #ifdef USER_LOCKS
- int is_user_lock;
- is_user_lock = (lockmethod == USER_LOCKMETHOD);
- if (is_user_lock)
- {
- TPRINTF(TRACE_USERLOCKS, "LockRelease: user lock tag [%u] %d",
- locktag->objId.blkno,
- lockmode);
- }
- #endif
- /* ???????? This must be changed when short term locks will be used */
- locktag->lockmethod = lockmethod;
- #ifdef USER_LOCKS
- trace_flag =
- (lockmethod == USER_LOCKMETHOD) ? TRACE_USERLOCKS : TRACE_LOCKS;
- #else
- trace_flag = TRACE_LOCKS;
- #endif
- Assert(lockmethod < NumLockMethods);
- lockMethodTable = LockMethodTable[lockmethod];
- if (!lockMethodTable)
- {
- elog(NOTICE, "lockMethodTable is null in LockRelease");
- return FALSE;
- }
- if (LockingIsDisabled)
- return TRUE;
- masterLock = lockMethodTable->ctl->masterLock;
- SpinAcquire(masterLock);
- /*
- * Find a lock with this tag
- */
- Assert(lockMethodTable->lockHash->hash == tag_hash);
- lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
- HASH_FIND, &found);
- /*
- * let the caller print its own error message, too. Do not
- * elog(ERROR).
- */
- if (!lock)
- {
- SpinRelease(masterLock);
- elog(NOTICE, "LockRelease: locktable corrupted");
- return FALSE;
- }
- if (!found)
- {
- SpinRelease(masterLock);
- #ifdef USER_LOCKS
- if (is_user_lock)
- {
- TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag");
- return FALSE;
- }
- #endif
- elog(NOTICE, "LockRelease: locktable lookup failed, no lock");
- return FALSE;
- }
- LOCK_PRINT("LockRelease: found", lock, lockmode);
- Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
- Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
- Assert(lock->nActive <= lock->nHolding);
- /* ------------------
- * Zero out all of the tag bytes (this clears the padding bytes for long
- * word alignment and ensures hashing consistency).
- * ------------------
- */
- MemSet(&item, 0, XID_TAGSIZE);
- item.tag.lock = MAKE_OFFSET(lock);
- #ifdef USE_XIDTAG_LOCKMETHOD
- item.tag.lockmethod = lockmethod;
- #endif
- #ifdef USER_LOCKS
- if (is_user_lock)
- {
- item.tag.pid = MyProcPid;
- item.tag.xid = xid = 0;
- }
- else
- {
- xid = GetCurrentTransactionId();
- TransactionIdStore(xid, &item.tag.xid);
- }
- #else
- xid = GetCurrentTransactionId();
- TransactionIdStore(xid, &item.tag.xid);
- #endif
- /*
- * Find an xid entry with this tag
- */
- xidTable = lockMethodTable->xidHash;
- result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
- HASH_FIND_SAVE, &found);
- if (!result || !found)
- {
- SpinRelease(masterLock);
- #ifdef USER_LOCKS
- if (!found && is_user_lock)
- TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag");
- else
- #endif
- elog(NOTICE, "LockReplace: xid table corrupted");
- return FALSE;
- }
- XID_PRINT("LockRelease: found", result);
- Assert(result->tag.lock == MAKE_OFFSET(lock));
- /*
- * Check that we are actually holding a lock of the type we want to
- * release.
- */
- if (!(result->holders[lockmode] > 0))
- {
- SpinRelease(masterLock);
- XID_PRINT_AUX("LockAcquire: WRONGTYPE", result);
- elog(NOTICE, "LockRelease: you don't own a lock of type %s",
- lock_types[lockmode]);
- Assert(result->holders[lockmode] >= 0);
- return FALSE;
- }
- Assert(result->nHolding > 0);
- /*
- * fix the general lock stats
- */
- lock->nHolding--;
- lock->holders[lockmode]--;
- lock->nActive--;
- lock->activeHolders[lockmode]--;
- #ifdef NOT_USED
- /* --------------------------
- * If there are still active locks of the type I just released, no one
- * should be woken up. Whoever is asleep will still conflict
- * with the remaining locks.
- * --------------------------
- */
- if (lock->activeHolders[lockmode])
- wakeupNeeded = false;
- else
- #endif
- /*
- * Above is not valid any more (due to MVCC lock modes). Actually
- * we should compare activeHolders[lockmode] with number of
- * waiters holding lock of this type and try to wakeup only if
- * these numbers are equal (and lock released conflicts with locks
- * requested by waiters). For the moment we only check the last
- * condition.
- */
- if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
- wakeupNeeded = true;
- if (!(lock->activeHolders[lockmode]))
- {
- /* change the conflict mask. No more of this lock type. */
- lock->mask &= BITS_OFF[lockmode];
- }
- LOCK_PRINT("LockRelease: updated", lock, lockmode);
- Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
- Assert((lock->nActive >= 0) && (lock->activeHolders[lockmode] >= 0));
- Assert(lock->nActive <= lock->nHolding);
- if (!lock->nHolding)
- {
- /* ------------------
- * if there's no one waiting in the queue,
- * we just released the last lock.
- * Delete it from the lock table.
- * ------------------
- */
- Assert(lockMethodTable->lockHash->hash == tag_hash);
- lock = (LOCK *) hash_search(lockMethodTable->lockHash,
- (Pointer) &(lock->tag),
- HASH_REMOVE,
- &found);
- Assert(lock && found);
- wakeupNeeded = false;
- }
- /*
- * now check to see if I have any private locks. If I do, decrement
- * the counts associated with them.
- */
- result->holders[lockmode]--;
- result->nHolding--;
- XID_PRINT("LockRelease: updated", result);
- Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
- /*
- * If this was my last hold on this lock, delete my entry in the XID
- * table.
- */
- if (!result->nHolding)
- {
- if (result->queue.prev == INVALID_OFFSET)
- elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
- if (result->queue.next == INVALID_OFFSET)
- elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
- if (result->queue.next != INVALID_OFFSET)
- SHMQueueDelete(&result->queue);
- XID_PRINT("LockRelease: deleting", result);
- result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &result,
- HASH_REMOVE_SAVED, &found);
- if (!result || !found)
- {
- SpinRelease(masterLock);
- elog(NOTICE, "LockRelease: remove xid, table corrupted");
- return FALSE;
- }
- }
- if (wakeupNeeded)
- ProcLockWakeup(&(lock->waitProcs), lockmethod, lock);
- else
- {
- if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1)
- &&(lock->tag.relId >= lockDebugOidMin))
- ||
- (lockDebugRelation && (lock->tag.relId == lockDebugRelation)))
- TPRINTF(TRACE_ALL, "LockRelease: no wakeup needed");
- }
- SpinRelease(masterLock);
- return TRUE;
- }
- /*
- * LockReleaseAll -- Release all locks in a process lock queue.
- */
- bool
- LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
- {
- PROC_QUEUE *waitQueue;
- int done;
- XIDLookupEnt *xidLook = NULL;
- XIDLookupEnt *tmp = NULL;
- XIDLookupEnt *result;
- SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
- SPINLOCK masterLock;
- LOCKMETHODTABLE *lockMethodTable;
- int i,
- numLockModes;
- LOCK *lock;
- bool found;
- int trace_flag;
- int xidtag_lockmethod;
- #ifdef USER_LOCKS
- int is_user_lock_table,
- count,
- nleft;
- count = nleft = 0;
- is_user_lock_table = (lockmethod == USER_LOCKMETHOD);
- trace_flag = (lockmethod == 2) ? TRACE_USERLOCKS : TRACE_LOCKS;
- #else
- trace_flag = TRACE_LOCKS;
- #endif
- TPRINTF(trace_flag, "LockReleaseAll: lockmethod=%d, pid=%d",
- lockmethod, MyProcPid);
- Assert(lockmethod < NumLockMethods);
- lockMethodTable = LockMethodTable[lockmethod];
- if (!lockMethodTable)
- {
- elog(NOTICE, "LockAcquire: bad lockmethod %d", lockmethod);
- return FALSE;
- }
- if (SHMQueueEmpty(lockQueue))
- return TRUE;
- numLockModes = lockMethodTable->ctl->numLockModes;
- masterLock = lockMethodTable->ctl->masterLock;
- SpinAcquire(masterLock);
- SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
- for (;;)
- {
- bool wakeupNeeded = false;
- /*
- * Sometimes the queue appears to be messed up.
- */
- if (count++ > 1000)
- {
- elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
- nleft = 0;
- break;
- }
- /* ---------------------------
- * XXX Here we assume the shared memory queue is circular and
- * that we know its internal structure. Should have some sort of
- * macros to allow one to walk it. mer 20 July 1991
- * ---------------------------
- */
- done = (xidLook->queue.next == end);
- lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
- xidtag_lockmethod = XIDENT_LOCKMETHOD(*xidLook);
- if ((xidtag_lockmethod == lockmethod) && pg_options[trace_flag])
- {
- XID_PRINT("LockReleaseAll", xidLook);
- LOCK_PRINT("LockReleaseAll", lock, 0);
- }
- #ifdef USE_XIDTAG_LOCKMETHOD
- if (xidtag_lockmethod != LOCK_LOCKMETHOD(*lock))
- elog(NOTICE, "LockReleaseAll: xid/lock method mismatch: %d != %d",
- xidtag_lockmethod, lock->tag.lockmethod);
- #endif
- if ((xidtag_lockmethod != lockmethod) && (trace_flag >= 2))
- {
- TPRINTF(trace_flag, "LockReleaseAll: skipping other table");
- nleft++;
- goto next_item;
- }
- Assert(lock->nHolding > 0);
- Assert(lock->nActive > 0);
- Assert(lock->nActive <= lock->nHolding);
- Assert(xidLook->nHolding >= 0);
- Assert(xidLook->nHolding <= lock->nHolding);
- #ifdef USER_LOCKS
- if (is_user_lock_table)
- {
- if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0))
- {
- TPRINTF(TRACE_USERLOCKS,
- "LockReleaseAll: skiping normal lock [%d,%d,%d]",
- xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
- nleft++;
- goto next_item;
- }
- if (xidLook->tag.pid != MyProcPid)
- {
- /* Should never happen */
- elog(NOTICE,
- "LockReleaseAll: INVALID PID: [%u] [%d,%d,%d]",
- lock->tag.objId.blkno,
- xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
- nleft++;
- goto next_item;
- }
- TPRINTF(TRACE_USERLOCKS,
- "LockReleaseAll: releasing user lock [%u] [%d,%d,%d]",
- lock->tag.objId.blkno,
- xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
- }
- else
- {
- /*
- * Can't check xidLook->tag.xid, can be 0 also for normal
- * locks
- */
- if (xidLook->tag.pid != 0)
- {
- TPRINTF(TRACE_LOCKS,
- "LockReleaseAll: skiping user lock [%u] [%d,%d,%d]",
- lock->tag.objId.blkno,
- xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
- nleft++;
- goto next_item;
- }
- }
- #endif
- /* ------------------
- * fix the general lock stats
- * ------------------
- */
- if (lock->nHolding != xidLook->nHolding)
- {
- for (i = 1; i <= numLockModes; i++)
- {
- Assert(xidLook->holders[i] >= 0);
- lock->holders[i] -= xidLook->holders[i];
- lock->activeHolders[i] -= xidLook->holders[i];
- Assert((lock->holders[i] >= 0)
- &&(lock->activeHolders[i] >= 0));
- if (!lock->activeHolders[i])
- lock->mask &= BITS_OFF[i];
- /*
- * Read comments in LockRelease
- */
- if (!wakeupNeeded && xidLook->holders[i] > 0 &&
- lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
- wakeupNeeded = true;
- }
- lock->nHolding -= xidLook->nHolding;
- lock->nActive -= xidLook->nHolding;
- Assert((lock->nHolding >= 0) && (lock->nActive >= 0));
- Assert(lock->nActive <= lock->nHolding);
- }
- else
- {
- /* --------------
- * set nHolding to zero so that we can garbage collect the lock
- * down below...
- * --------------
- */
- lock->nHolding = 0;
- /* Fix the lock status, just for next LOCK_PRINT message. */
- for (i = 1; i <= numLockModes; i++)
- {
- Assert(lock->holders[i] == lock->activeHolders[i]);
- lock->holders[i] = lock->activeHolders[i] = 0;
- }
- }
- LOCK_PRINT("LockReleaseAll: updated", lock, 0);
- /*
- * Remove the xid from the process lock queue
- */
- SHMQueueDelete(&xidLook->queue);
- /* ----------------
- * always remove the xidLookup entry, we're done with it now
- * ----------------
- */
- XID_PRINT("LockReleaseAll: deleting", xidLook);
- result = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash,
- (Pointer) xidLook,
- HASH_REMOVE,
- &found);
- if (!result || !found)
- {
- SpinRelease(masterLock);
- elog(NOTICE, "LockReleaseAll: xid table corrupted");
- return FALSE;
- }
- if (!lock->nHolding)
- {
- /* --------------------
- * if there's no one waiting in the queue, we've just released
- * the last lock.
- * --------------------
- */
- LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
- Assert(lockMethodTable->lockHash->hash == tag_hash);
- lock = (LOCK *) hash_search(lockMethodTable->lockHash,
- (Pointer) &(lock->tag),
- HASH_REMOVE, &found);
- if ((!lock) || (!found))
- {
- SpinRelease(masterLock);
- elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
- return FALSE;
- }
- }
- else if (wakeupNeeded)
- {
- waitQueue = &(lock->waitProcs);
- ProcLockWakeup(waitQueue, lockmethod, lock);
- }
- #ifdef USER_LOCKS
- next_item:
- #endif
- if (done)
- break;
- SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
- xidLook = tmp;
- }
- /*
- * Reinitialize the queue only if nothing has been left in.
- */
- if (nleft == 0)
- {
- TPRINTF(trace_flag, "LockReleaseAll: reinitializing lockQueue");
- SHMQueueInit(lockQueue);
- }
- SpinRelease(masterLock);
- TPRINTF(trace_flag, "LockReleaseAll: done");
- return TRUE;
- }
- int
- LockShmemSize(int maxBackends)
- {
- int size = 0;
- size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
- size += MAXALIGN(maxBackends * sizeof(PROC)); /* each MyProc */
- size += MAXALIGN(maxBackends * sizeof(LOCKMETHODCTL)); /* each
- * lockMethodTable->ctl */
- /* lockHash table */
- size += hash_estimate_size(NLOCKENTS(maxBackends),
- SHMEM_LOCKTAB_KEYSIZE,
- SHMEM_LOCKTAB_DATASIZE);
- /* xidHash table */
- size += hash_estimate_size(NLOCKENTS(maxBackends),
- SHMEM_XIDTAB_KEYSIZE,
- SHMEM_XIDTAB_DATASIZE);
- /*
- * Since the lockHash entry count above is only an estimate, add 10%
- * safety margin.
- */
- size += size / 10;
- return size;
- }
- /* -----------------
- * Boolean function to determine current locking status
- * -----------------
- */
- bool
- LockingDisabled()
- {
- return LockingIsDisabled;
- }
- /*
- * DeadlockCheck -- Checks for deadlocks for a given process
- *
- * We can't block on user locks, so no sense testing for deadlock
- * because there is no blocking, and no timer for the block.
- *
- * This code takes a list of locks a process holds, and the lock that
- * the process is sleeping on, and tries to find if any of the processes
- * waiting on its locks hold the lock it is waiting for. If no deadlock
- * is found, it goes on to look at all the processes waiting on their locks.
- *
- * We have already locked the master lock before being called.
- */
- bool
- DeadLockCheck(void *proc, LOCK *findlock)
- {
- XIDLookupEnt *xidLook = NULL;
- XIDLookupEnt *tmp = NULL;
- PROC *thisProc = (PROC *) proc,
- *waitProc;
- SHM_QUEUE *lockQueue = &(thisProc->lockQueue);
- SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
- LOCK *lock;
- PROC_QUEUE *waitQueue;
- int i,
- j;
- bool first_run = (thisProc == MyProc),
- done;
- static PROC *checked_procs[MAXBACKENDS];
- static int nprocs;
- /* initialize at start of recursion */
- if (first_run)
- {
- checked_procs[0] = MyProc;
- nprocs = 1;
- }
- if (SHMQueueEmpty(lockQueue))
- return false;
- SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
- XID_PRINT("DeadLockCheck", xidLook);
- for (;;)
- {
- done = (xidLook->queue.next == end);
- lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
- LOCK_PRINT("DeadLockCheck", lock, 0);
- if (lock->tag.relId == 0) /* user' lock */
- goto nxtl;
- /*
- * waitLock is always in lockQueue of waiting proc, if !first_run
- * then upper caller will handle waitProcs queue of waitLock.
- */
- if (thisProc->waitLock == lock && !first_run)
- goto nxtl;
- /*
- * If we found proc holding findlock and sleeping on some my other
- * lock then we have to check does it block me or another waiters.
- */
- if (lock == findlock && !first_run)
- {
- LOCKMETHODCTL *lockctl =
- LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
- int lm;
- Assert(xidLook->nHolding > 0);
- for (lm = 1; lm <= lockctl->numLockModes; lm++)
- {
- if (xidLook->holders[lm] > 0 &&
- lockctl->conflictTab[lm] & findlock->waitMask)
- return true;
- }
- /*
- * Else - get the next lock from thisProc's lockQueue
- */
- goto nxtl;
- }
- waitQueue = &(lock->waitProcs);
- waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev);
- for (i = 0; i < waitQueue->size; i++)
- {
- if (waitProc == thisProc)
- {
- Assert(waitProc->waitLock == lock);
- Assert(waitProc == MyProc);
- waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
- continue;
- }
- if (lock == findlock) /* first_run also true */
- {
- LOCKMETHODCTL *lockctl =
- LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
- /*
- * If me blocked by his holdlock...
- */
- if (lockctl->conflictTab[MyProc->token] & waitProc->holdLock)
- {
- /* and he blocked by me -> deadlock */
- if (lockctl->conflictTab[waitProc->token] & MyProc->holdLock)
- return true;
- /* we shouldn't look at lockQueue of our blockers */
- waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
- continue;
- }
- /*
- * If he isn't blocked by me and we request
- * non-conflicting lock modes - no deadlock here because
- * of he isn't blocked by me in any sence (explicitle or
- * implicitly). Note that we don't do like test if
- * !first_run (when thisProc is holder and non-waiter on
- * lock) and so we call DeadLockCheck below for every
- * waitProc in thisProc->lockQueue, even for waitProc-s
- * un-blocked by thisProc. Should we? This could save us
- * some time...
- */
- if (!(lockctl->conflictTab[waitProc->token] & MyProc->holdLock) &&
- !(lockctl->conflictTab[waitProc->token] & (1 << MyProc->token)))
- {
- waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
- continue;
- }
- }
- /*
- * Look in lockQueue of this waitProc, if didn't do this
- * before.
- */
- for (j = 0; j < nprocs; j++)
- {
- if (checked_procs[j] == waitProc)
- break;
- }
- if (j >= nprocs)
- {
- Assert(nprocs < MAXBACKENDS);
- checked_procs[nprocs++] = waitProc;
- if (DeadLockCheck(waitProc, findlock))
- {
- LOCKMETHODCTL *lockctl =
- LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
- int holdLock;
- /*
- * Ok, but is waitProc waiting for me (thisProc) ?
- */
- if (thisProc->waitLock == lock)
- {
- Assert(first_run);
- holdLock = thisProc->holdLock;
- }
- else
- /* should we cache holdLock ? */
- {
- int lm,
- tmpMask = 2;
- Assert(xidLook->nHolding > 0);
- for (holdLock = 0, lm = 1;
- lm <= lockctl->numLockModes;
- lm++, tmpMask <<= 1)
- {
- if (xidLook->holders[lm] > 0)
- holdLock |= tmpMask;
- }
- Assert(holdLock != 0);
- }
- if (lockctl->conflictTab[waitProc->token] & holdLock)
- {
- /*
- * Last attempt to avoid deadlock - try to wakeup
- * myself.
- */
- if (first_run)
- {
- if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
- MyProc->waitLock,
- MyProc->token,
- MyProc->xid,
- NULL) == STATUS_OK)
- {
- GrantLock(MyProc->waitLock, MyProc->token);
- (MyProc->waitLock->waitProcs.size)--;
- ProcWakeup(MyProc, NO_ERROR);
- return false;
- }
- }
- return true;
- }
- /*
- * Hell! Is he blocked by any (other) holder ?
- */
- if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
- lock,
- waitProc->token,
- waitProc->xid,
- NULL) != STATUS_OK)
- {
- /*
- * Blocked by others - no deadlock...
- */
- #ifdef DEADLOCK_DEBUG
- LOCK_PRINT("DeadLockCheck: blocked by others",
- lock, waitProc->token);
- #endif
- waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
- continue;
- }
- /*
- * Well - wakeup this guy! This is the case of
- * implicit blocking: thisProc blocked someone who
- * blocked waitProc by the fact that he/someone is
- * already waiting for lock. We do this for
- * anti-starving.
- */
- GrantLock(lock, waitProc->token);
- waitQueue->size--;
- waitProc = ProcWakeup(waitProc, NO_ERROR);
- continue;
- }
- }
- waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
- }
- nxtl: ;
- if (done)
- break;
- SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
- xidLook = tmp;
- }
- /* if we got here, no deadlock */
- return false;
- }
- #ifdef NOT_USED
- /*
- * Return an array with the pids of all processes owning a lock.
- * This works only for user locks because normal locks have no
- * pid information in the corresponding XIDLookupEnt.
- */
- ArrayType *
- LockOwners(LOCKMETHOD lockmethod, LOCKTAG *locktag)
- {
- XIDLookupEnt *xidLook = NULL;
- SPINLOCK masterLock;
- LOCK *lock;
- SHMEM_OFFSET lock_offset;
- int count = 0;
- LOCKMETHODTABLE *lockMethodTable;
- HTAB *xidTable;
- bool found;
- int ndims,
- nitems,
- hdrlen,
- size;
- int lbounds[1],
- hbounds[1];
- ArrayType *array;
- int *data_ptr;
- /* Assume that no one will modify the result */
- static int empty_array[] = {20, 1, 0, 0, 0};
- #ifdef USER_LOCKS
- int is_user_lock;
- is_user_lock = (lockmethod == USER_LOCKMETHOD);
- if (is_user_lock)
- {
- TPRINTF(TRACE_USERLOCKS, "LockOwners: user lock tag [%u]",
- locktag->objId.blkno);
- }
- #endif
- /* This must be changed when short term locks will be used */
- locktag->lockmethod = lockmethod;
- Assert((lockmethod >= MIN_LOCKMETHOD) && (lockmethod < NumLockMethods));
- lockMethodTable = LockMethodTable[lockmethod];
- if (!lockMethodTable)
- {
- elog(NOTICE, "lockMethodTable is null in LockOwners");
- return (ArrayType *) &empty_array;
- }
- if (LockingIsDisabled)
- return (ArrayType *) &empty_array;
- masterLock = lockMethodTable->ctl->masterLock;
- SpinAcquire(masterLock);
- /*
- * Find a lock with this tag
- */
- Assert(lockMethodTable->lockHash->hash == tag_hash);
- lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
- HASH_FIND, &found);
- /*
- * let the caller print its own error message, too. Do not elog(WARN).
- */
- if (!lock)
- {
- SpinRelease(masterLock);
- elog(NOTICE, "LockOwners: locktable corrupted");
- return (ArrayType *) &empty_array;
- }
- if (!found)
- {
- SpinRelease(masterLock);
- #ifdef USER_LOCKS
- if (is_user_lock)
- {
- TPRINTF(TRACE_USERLOCKS, "LockOwners: no lock with this tag");
- return (ArrayType *) &empty_array;
- }
- #endif
- elog(NOTICE, "LockOwners: locktable lookup failed, no lock");
- return (ArrayType *) &empty_array;
- }
- LOCK_PRINT("LockOwners: found", lock, 0);
- Assert((lock->nHolding > 0) && (lock->nActive > 0));
- Assert(lock->nActive <= lock->nHolding);
- lock_offset = MAKE_OFFSET(lock);
- /* Construct a 1-dimensional array */
- ndims = 1;
- hdrlen = ARR_OVERHEAD(ndims);
- lbounds[0] = 0;
- hbounds[0] = lock->nActive;
- size = hdrlen + sizeof(int) * hbounds[0];
- array = (ArrayType *) palloc(size);
- MemSet(array, 0, size);
- memmove((char *) array, (char *) &size, sizeof(int));
- memmove((char *) ARR_NDIM_PTR(array), (char *) &ndims, sizeof(int));
- memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int));
- memmove((char *) ARR_LBOUND(array), (char *) lbounds, ndims * sizeof(int));
- SET_LO_FLAG(false, array);
- data_ptr = (int *) ARR_DATA_PTR(array);
- xidTable = lockMethodTable->xidHash;
- hash_seq(NULL);
- nitems = 0;
- while ((xidLook = (XIDLookupEnt *) hash_seq(xidTable)) &&
- (xidLook != (XIDLookupEnt *) TRUE))
- {
- if (count++ > 1000)
- {
- elog(NOTICE, "LockOwners: possible loop, giving up");
- break;
- }
- if (xidLook->tag.pid == 0)
- {
- XID_PRINT("LockOwners: no pid", xidLook);
- continue;
- }
- if (!xidLook->tag.lock)
- {
- XID_PRINT("LockOwners: NULL LOCK", xidLook);
- continue;
- }
- if (xidLook->tag.lock != lock_offset)
- {
- XID_PRINT("LockOwners: different lock", xidLook);
- continue;
- }
- if (LOCK_LOCKMETHOD(*lock) != lockmethod)
- {
- XID_PRINT("LockOwners: other table", xidLook);
- continue;
- }
- if (xidLook->nHolding <= 0)
- {
- XID_PRINT("LockOwners: not holding", xidLook);
- continue;
- }
- if (nitems >= hbounds[0])
- {
- elog(NOTICE, "LockOwners: array size exceeded");
- break;
- }
- /*
- * Check that the holding process is still alive by sending him an
- * unused (ignored) signal. If the kill fails the process is not
- * alive.
- */
- if ((xidLook->tag.pid != MyProcPid)
- &&(kill(xidLook->tag.pid, SIGCHLD)) != 0)
- {
- /* Return a negative pid to signal that process is dead */
- data_ptr[nitems++] = -(xidLook->tag.pid);
- XID_PRINT("LockOwners: not alive", xidLook);
- /* XXX - TODO: remove this entry and update lock stats */
- continue;
- }
- /* Found a process holding the lock */
- XID_PRINT("LockOwners: holding", xidLook);
- data_ptr[nitems++] = xidLook->tag.pid;
- }
- SpinRelease(masterLock);
- /* Adjust the actual size of the array */
- hbounds[0] = nitems;
- size = hdrlen + sizeof(int) * hbounds[0];
- memmove((char *) array, (char *) &size, sizeof(int));
- memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int));
- return array;
- }
- #endif
- #ifdef DEADLOCK_DEBUG
- /*
- * Dump all locks in the proc->lockQueue. Must have already acquired
- * the masterLock.
- */
- void
- DumpLocks()
- {
- SHMEM_OFFSET location;
- PROC *proc;
- SHM_QUEUE *lockQueue;
- int done;
- XIDLookupEnt *xidLook = NULL;
- XIDLookupEnt *tmp = NULL;
- SHMEM_OFFSET end;
- SPINLOCK masterLock;
- int numLockModes;
- LOCK *lock;
- int count = 0;
- int lockmethod = DEFAULT_LOCKMETHOD;
- LOCKMETHODTABLE *lockMethodTable;
- ShmemPIDLookup(MyProcPid, &location);
- if (location == INVALID_OFFSET)
- return;
- proc = (PROC *) MAKE_PTR(location);
- if (proc != MyProc)
- return;
- lockQueue = &proc->lockQueue;
- Assert(lockmethod < NumLockMethods);
- lockMethodTable = LockMethodTable[lockmethod];
- if (!lockMethodTable)
- return;
- numLockModes = lockMethodTable->ctl->numLockModes;
- masterLock = lockMethodTable->ctl->masterLock;
- if (SHMQueueEmpty(lockQueue))
- return;
- SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
- end = MAKE_OFFSET(lockQueue);
- if (MyProc->waitLock)
- LOCK_PRINT_AUX("DumpLocks: waiting on", MyProc->waitLock, 0);
- for (;;)
- {
- if (count++ > 2000)
- {
- elog(NOTICE, "DumpLocks: xid loop detected, giving up");
- break;
- }
- /* ---------------------------
- * XXX Here we assume the shared memory queue is circular and
- * that we know its internal structure. Should have some sort of
- * macros to allow one to walk it. mer 20 July 1991
- * ---------------------------
- */
- done = (xidLook->queue.next == end);
- lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
- XID_PRINT_AUX("DumpLocks", xidLook);
- LOCK_PRINT_AUX("DumpLocks", lock, 0);
- if (done)
- break;
- SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
- xidLook = tmp;
- }
- }
- /*
- * Dump all postgres locks. Must have already acquired the masterLock.
- */
- void
- DumpAllLocks()
- {
- SHMEM_OFFSET location;
- PROC *proc;
- XIDLookupEnt *xidLook = NULL;
- LOCK *lock;
- int pid;
- int count = 0;
- int lockmethod = DEFAULT_LOCKMETHOD;
- LOCKMETHODTABLE *lockMethodTable;
- HTAB *xidTable;
- pid = getpid();
- ShmemPIDLookup(pid, &location);
- if (location == INVALID_OFFSET)
- return;
- proc = (PROC *) MAKE_PTR(location);
- if (proc != MyProc)
- return;
- Assert(lockmethod < NumLockMethods);
- lockMethodTable = LockMethodTable[lockmethod];
- if (!lockMethodTable)
- return;
- xidTable = lockMethodTable->xidHash;
- if (MyProc->waitLock)
- LOCK_PRINT_AUX("DumpAllLocks: waiting on", MyProc->waitLock, 0);
- hash_seq(NULL);
- while ((xidLook = (XIDLookupEnt *) hash_seq(xidTable)) &&
- (xidLook != (XIDLookupEnt *) TRUE))
- {
- XID_PRINT_AUX("DumpAllLocks", xidLook);
- if (xidLook->tag.lock)
- {
- lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
- LOCK_PRINT_AUX("DumpAllLocks", lock, 0);
- }
- else
- elog(DEBUG, "DumpAllLocks: xidLook->tag.lock = NULL");
- if (count++ > 2000)
- {
- elog(NOTICE, "DumpAllLocks: possible loop, giving up");
- break;
- }
- }
- }
- #endif