gparray。py依賴的python包(datetime、copy、traceback、os),依賴的gp包(gplog、utils、db、gpversion、commands。unix)
from datetime import date
import copy
import traceback
from gppylib。utils import checkNotNone, checkIsInt
from gppylib import gplog
from gppylib。db import dbconn
from gppylib。gpversion import GpVersion
from gppylib。commands。unix import *
import os
程式碼分析
QD(Query Dispatcher)包含master和standby master,QE(Query Executor)包含primary和mirror。每個posgres資料庫的資訊使用GpDB物件表示。Segment物件代表primaryDB和其對應的零個、一個或多個mirrorDB。GpArray物件就是master、standbyMaster和多個Segmnet物件的組合。
圖 | GpArray類的成員
GpDB類
GpDB類是單個dbid所指的postgres資料庫例項的配置資訊。其餘成員都可以很好地理解,這裡說說__filespaces成員是存放key為資料庫物件oid,value為其資料庫物件的檔案目錄路徑的字典。因此GpDB類建構函式的datadir是SYSTEM_FILESPACE(oid為3052)所處的檔案路徑。
class GpDB:
def __init__(self, content, preferred_role, dbid, role, mode, status, hostname, address, port, datadir, replicationPort):
self。content=content
self。preferred_role=preferred_role
self。dbid=dbid
self。role=role
self。mode=mode
self。status=status
self。hostname=hostname
self。address=address
self。port=port
self。datadir=datadir
self。replicationPort=replicationPort
# Filespace mappings for this segment
self。__filespaces = { SYSTEM_FILESPACE: datadir } # SYSTEM_FILESPACE oid of the system filespace 3052
# Pending filespace creation
self。__pending_filespace = None
# Catalog directory for each database in this segment
self。catdirs = None
# Todo: Remove old dead code
self。valid = (status == ‘u’)
def __str__(self): # 構造GpDB類可列印的字串表示
def __repr__(self):
fsOids = [oid for oid in self。__filespaces] # 取出__filespaces中所有的key,及資料庫物件對應的oid
fsOids。sort() # sort for determinism
filespaces = []
for fsoid in fsOids:
if fsoid not in [SYSTEM_FILESPACE]:
filespaces。append(“%d:%s” % (fsoid, self。__filespaces[fsoid])) # 以oid:datadir字串為item,放入filespaces
return ‘%d|%d|%s|%s|%s|%s|%s|%s|%d|%s|%s|%s|%s’ % (self。dbid,self。content,self。role,self。preferred_role,self。mode,self。status,self。hostname,self。address,self。port,self。replicationPort,self。datadir,‘,’。join(filespaces),‘,’。join(self。catdirs) if self。catdirs else [])
def __cmp__(self,other): # 使用__reper__函式序列化GpDB物件,並進行比較
def equalIgnoringModeAndStatusAndReplicationPort(self, other): # 如果核心屬性(比如filespace)都相同則返回true,該方法在updateSystemConfig函式呼叫(在移除mirror segment或再新增mirror segmnet時會造成catalog改變)
def copy(self):
def isSegmentQD(self):
def isSegmentMaster(self, current_role=False):
。。。
def isSegmentModeInResynchronization(self):
def getSegmentDbId(self):
def getSegmentContentId(self):
。。。
def getSegmentFilespaces(self):
def setSegmentDbId(self, dbId):
def setSegmentContentId(self, contentId):
。。。
def setSegmentDataDirectory(self, dataDirectory):
def addSegmentFilespace(self, oid, path):
def getSegmentPendingFilespace(self):
@staticmethod
def getDataDirPrefix(datadir):
retValue = “”
retValue = datadir[:datadir。rfind(‘/’)]
return retValue
成員變數createTemplate函式建立GpDB的資訊的模板,第一步確保dstDir有足夠的空間存放segment和其filespace(透過fillespaces中存放的oid和dirpath,查詢各資料庫物件所對應的空間佔用大小);第二步獲取磁碟空閒空間(DiskFree。get_size_local(name = “Check for available free space for segment template”, directory = dstDir));第三步使用LocalDirCopy類物件將segment資料目錄複製到目標目錄dstDir;第四步先判別__filespaces中除了SYSTEM_FILESPACE(oid為3052)之外是否還有其他資料庫物件,如果有,先判別dstDir + “/fs_directory”目錄是否存在,不斷將fillespaces中存放的dirpath中的目錄在目標路徑進行建立,資料庫物件檔案進行複製;第五步,刪除目標路徑下的gp_dbid檔案(dstDir + ‘/gp_dbid’),對dstDir設定0700許可權。
def createTemplate(self, dstDir):
# Make sure we have enough room in the dstDir to fit the segment and its filespaces。
duCmd = DiskUsage(name = “srcDir”, directory = dstDir)
duCmd。run(validateAfter=True)
requiredSize = duCmd。get_bytes_used()
name = “segcopy filespace get_size”
for oid in self。__filespaces:
if oid == SYSTEM_FILESPACE:
continue
dir = self。__filespaces[oid]
duCmd = DiskUsage(name, dir)
duCmd。run(validateAfter=True)
size = duCmd。get_bytes_used()
requiredSize = requiredSize + size
dstBytesAvail = DiskFree。get_size_local(name = “Check for available free space for segment template”, directory = dstDir)
if dstBytesAvail <= requiredSize:
raise Exception(“Not enough space on directory: ‘%s’。 Currently %d bytes free but need %d bytes。” % (dstDir, int(dstBytesAvail), int(requiredSize)))
logger。info(“Starting copy of segment dbid %d to location %s” % (int(self。getSegmentDbId()), dstDir))
cpCmd = LocalDirCopy(“Copy system data directory”, self。getSegmentDataDirectory(), dstDir)
cpCmd。run(validateAfter = True)
res = cpCmd。get_results()
if len(self。__filespaces) > 1:
“”“ Make directory to hold file spaces ”“”
fullPathFsDir = dstDir + “/” + DESTINATION_FILE_SPACES_DIRECTORY # DESTINATION_FILE_SPACES_DIRECTORY = “fs_directory”
cmd = FileDirExists( name = “check for existance of template filespace directory”, directory = fullPathFsDir)
cmd。run(validateAfter = True)
MakeDirectory。local(“gpexpand make directory to hold file spaces”, fullPathFsDir)
for oid in self。__filespaces:
MakeDirectory。local(“gpexpand make directory to hold file space oid: ” + str(oid), fullPathFsDir)
dir = self。__filespaces[oid]
destDir = fullPathFsDir + “/” + str(oid)
MakeDirectory。local(“gpexpand make directory to hold file space: ” + destDir, destDir)
name = “GpSegCopy %s to %s” % (dir, destDir)
cpCmd = LocalDirCopy(name, dir, destDir)
cpCmd。run(validateAfter = True)
res = cpCmd。get_results()
# Remove the gp_dbid file from the data dir
RemoveFile。local(‘Remove gp_dbid file’, os。path。normpath(dstDir + ‘/gp_dbid’))
logger。info(“Cleaning up catalog for schema only copy on destination”)
# We need 700 permissions or postgres won‘t start
Chmod。local(’set template permissions‘, dstDir, ’0700‘)
靜態成員函式initFromString(s)為工廠函式,從字串中初始化GpDB物件,該字串和repr()輸出相容。
@staticmethod
def initFromString(s):
tup = s。strip()。split(’|‘)
# Old format: 8 fields Todo: remove the need for this, or rework it to be cleaner
if len(tup) == 8:
# This describes the gp_configuration catalog (pre 3。4)
content = int(tup[0])
。。。
datadir = tup[7]
# Calculate new fields from old ones
# Note: this should be kept in sync with the code in
# GpArray。InitFromCatalog() code for initializing old catalog
# formats。
preferred_role = ROLE_PRIMARY if definedprimary else ROLE_MIRROR
role = ROLE_PRIMARY if isprimary else ROLE_MIRROR
hostname = None
mode = MODE_SYNCHRONIZED # ???
status = STATUS_UP if valid else STATUS_DOWN
replicationPort = None
filespaces = “”
catdirs = “”
# Catalog 3。4 format: 12 fields
elif len(tup) == 12:
# This describes the gp_segment_configuration catalog (3。4)
dbid = int(tup[0])
。。。
catdirs = “”
# Catalog 4。0+: 13 fields
elif len(tup) == 13:
# This describes the gp_segment_configuration catalog (3。4+)
dbid = int(tup[0])
。。。
catdirs = tup[12]
else:
raise Exception(“GpDB unknown input format: %s” % s)
# Initialize segment without filespace information
gpdb = GpDB(content=content,preferred_role=preferred_role,dbid=dbid,role=role,mode=mode,status=status,hostname=hostname,address=address,port=port,datadir=datadir,replicationPort=replicationPort)
# Add in filespace information, if present
for fs in filespaces。split(“,”):
if fs == “”:
continue
(fsoid, fselocation) = fs。split(“:”)
gpdb。addSegmentFilespace(fsoid, fselocation)
# Add Catalog Dir, if present
gpdb。catdirs = []
for d in catdirs。split(“,”):
if d == “”:
continue
gpdb。catdirs。append(d)
# Return the completed segment
return gpdb
Segment類
Segment類代表相同contentID的SegmentDBs,目前至多一個primary SegDB和單個mirror SegDB,在後續版本中會支援多mirror SegDB。
class Segment:
primaryDB=None #primary (GpDB例項)
mirrorDBs =None
def __init__(self):
self。mirrorDBs = [] #mirror (GpDB例項)
pass
def addPrimary(self,segDB) #設定primary
def addMirror(self,segDB) #追加mirror
def get_dbs(self) #返回Primary和Mirror例項組成的列表(GpDB例項列表)
def get_hosts(self) #返回Primary和Mirror所在主機的主機名的列表
def is_segment_pair_valid(self):
“”“Validates that the primary/mirror pair are in a valid state”“”
for mirror_db in self。mirrorDBs:
prim_status = self。primaryDB。getSegmentStatus()
prim_mode = self。primaryDB。getSegmentMode()
mirror_status = mirror_db。getSegmentStatus()
mirror_role = mirror_db。getSegmentMode()
if (prim_status, prim_mode, mirror_status, mirror_role) not in VALID_SEGMENT_STATES:
return False
return True
primary和mirror對的合法狀態如下,各個欄位含義如下:primaryDB。getSegmentStatus、primaryDB。getSegmentMode、mirror_db。getSegmentStatus、mirror_db。getSegmentMode。
VALID_SEGMENT_STATES = [
(STATUS_UP, MODE_CHANGELOGGING, STATUS_DOWN, MODE_SYNCHRONIZED),
(STATUS_UP, MODE_CHANGELOGGING, STATUS_DOWN, MODE_RESYNCHRONIZATION),
(STATUS_UP, MODE_RESYNCHRONIZATION, STATUS_UP, MODE_RESYNCHRONIZATION),
(STATUS_UP, MODE_SYNCHRONIZED, STATUS_UP, MODE_SYNCHRONIZED)
]
primaryDB狀態為up,模式為CHANGELOGGING,mirrorDB狀態為down,模式可以為SYNCHRONIZED、RESYNCHRONIZATION
primaryDB狀態為up,模式為RESYNCHRONIZATION,mirrorDB狀態為up,模式為RESYNCHRONIZATION
primaryDB狀態為up,模式為SYNCHRONIZED,mirrorDB狀態為up,模式為SYNCHRONIZED
如果要返回primaryDB的主機名,可使用segment1。primaryDB。getSegmentHostName()。
GpArray類
GpArray類建構函式接受包含QD和QE的GpDB的列表segments
class GpArray:
def __init__(self, segments, segmentsAsLoadedFromDb=None, strategyLoadedFromDb=None):
self。master =None #GpDB例項
self。standbyMaster = None #GpDB例項
self。segments = [] #Segment例項列表
self。expansionSegments=[]
self。numPrimarySegments = 0
self。recoveredSegmentDbids = []
self。__version = None
self。__segmentsAsLoadedFromDb = segmentsAsLoadedFromDb
self。__strategyLoadedFromDb = strategyLoadedFromDb
self。__strategy = FAULT_STRATEGY_NONE # FAULT_STRATEGY_NONE = ’n‘ # mirrorless systems 無mirror系統
self。setFilespaces([])
for segdb in segments:
# Handle QD nodes # 處理QD節點
if segdb。isSegmentMaster(True):
if self。master != None:
logger。error(“multiple master dbs defined”)
raise Exception(“GpArray - multiple master dbs defined”)
self。master = segdb
elif segdb。isSegmentStandby(True):
if self。standbyMaster != None:
logger。error(“multiple standby master dbs defined”)
raise Exception(“GpArray - multiple standby master dbs defined”)
self。standbyMaster = segdb
# Handle regular segments # 處理QE節點
elif segdb。isSegmentQE():
if segdb。isSegmentMirror():
self。__strategy = FAULT_STRATEGY_FILE_REPLICATION # FAULT_STRATEGY_FILE_REPLICATION = ’f‘ # valid for versions 4。0+ # 有mirror節點
self。addSegmentDb(segdb)
else:
# Not a master, standbymaster, primary, or mirror?
# shouldn’t even be possible。
logger。error(“FATAL - invalid dbs defined”)
raise Exception(“Error: GpArray() - invalid dbs defined”)
# Make sure we have a master db
if self。master is None:
logger。error(“FATAL - no master dbs defined!”)
raise Exception(“Error: GpArray() - no master dbs defined”)
def __str__(self):
def hasStandbyMaster(self):
def addSegmentDb(self, segdb): # segdb是GpDB例項,向self。segments中加入新的segment或向原有的segment物件新增GpDB例項(addPrimary或addMirror)
def isStandardArray(self):
def is_array_valid(self):
def dumpToFile(self, filename):
def setFaultStrategy(self, strategy):
def getFaultStrategy(self):
。。。。
initFromCatalog從資料庫中獲取GpArray物件的資料成員的資料,形參為資料庫URL,設定utility模式。主要是一些查詢資料庫狀態資訊的SQL,作為DBA需要收集學習這些SQL,以備後續學習運維使用。
@staticmethod
def initFromCatalog(dbURL, utility=False):
conn = dbconn。connect(dbURL, utility)
# Get the version from the database:
version_str = None
for row in dbconn。execSQL(conn, “SELECT version()”):
version_str = row[0]
version = GpVersion(version_str)
if version。getVersionRelease() in (“3。0”, “3。1”, “3。2”, “3。3”):
# In older releases we get the fault strategy using the
# gp_fault_action guc。
strategy_rows = dbconn。execSQL(conn, “show gp_fault_action”)
# Note: Mode may not be “right”, certainly 4。0 concepts of mirroring
# mode do not apply to 3。x, so it depends on how the scripts are
# making use of mode。 For now it is initialized to synchronized。
#
# Note: hostname is initialized to null since the catalog does not
# contain this information。 Initializing a hostcache using the
# resulting gparray will automatically fill in a value for hostname。
#
# Note: this should be kept in sync with the code in
# GpDB。InitFromString() code for initializing old catalog formats。
config_rows = dbconn。execSQL(conn, ‘’‘
SELECT dbid, content,case when isprimary then ’p‘ else ’m‘ end as role,
case when definedprimary then ’p‘ else ’m‘ end as preferred_role,
’s‘ as mode,case when valid then ’u‘ else ’d‘ end as status,
null as hostname,hostname as address,port,null as replication_port,
%s as fsoid,datadir as fselocation FROM pg_catalog。gp_configuration
ORDER BY content, preferred_role DESC
’‘’ % str(SYSTEM_FILESPACE))
# no filespace support in older releases。
filespaceArr = []
else:
strategy_rows = dbconn。execSQL(conn, ‘’‘
SELECT fault_strategy FROM gp_fault_strategy
’‘’)
config_rows = dbconn。execSQL(conn, ‘’‘
SELECT dbid, content, role, preferred_role, mode, status,
hostname, address, port, replication_port, fs。oid,
fselocation
FROM pg_catalog。gp_segment_configuration
JOIN pg_catalog。pg_filespace_entry on (dbid = fsedbid)
JOIN pg_catalog。pg_filespace fs on (fsefsoid = fs。oid)
ORDER BY content, preferred_role DESC, fs。oid
’‘’)
filespaceRows = dbconn。execSQL(conn, ‘’‘
SELECT oid, fsname FROM pg_filespace ORDER BY fsname;
’‘’)
filespaceArr = [GpFilespaceObj(fsRow[0], fsRow[1]) for fsRow in filespaceRows]
# Todo: add checks that all segments should have the same filespaces?
recoveredSegmentDbids = []
segments = []
seg = None
for row in config_rows:
# Extract fields from the row
(dbid, content, role, preferred_role, mode, status, hostname,
address, port, replicationPort, fsoid, fslocation) = row
# If we have segments which have recovered, record them。
if preferred_role != role and content >= 0:
if mode == MODE_SYNCHRONIZED and status == STATUS_UP:
recoveredSegmentDbids。append(dbid)
# The query returns all the filespaces for a segment on separate
# rows。 If this row is the same dbid as the previous row simply
# add this filespace to the existing list, otherwise create a
# new segment。
if seg and seg。getSegmentDbId() == dbid:
seg。addSegmentFilespace(fsoid, fslocation)
else:
seg = GpDB(content, preferred_role, dbid, role, mode, status,
hostname, address, port, fslocation, replicationPort)
segments。append(seg)
datcatloc = dbconn。execSQL(conn, ‘’‘
select fsloc。dbid, fsloc。fselocation || ’/‘ || case when db。dattablespace = 1663
then ’base‘ else db。dattablespace::text end || ’/‘||db。oid as catloc
from pg_Database db, pg_tablespace ts,
(SELECT dbid, fs。oid, fselocation
FROM pg_catalog。gp_segment_configuration
JOIN pg_catalog。pg_filespace_entry on (dbid = fsedbid)
JOIN pg_catalog。pg_filespace fs on (fsefsoid = fs。oid)) fsloc
where db。dattablespace = ts。oid
and ts。spcfsoid = fsloc。oid’‘’)
conn。close()
catlocmap = {}
for row in datcatloc:
if catlocmap。has_key(row[0]):
catlocmap[row[0]]。append(row[1])
else:
catlocmap[row[0]] = [row[1]]
for seg in segments:
seg。catdirs = catlocmap[seg。dbid]
origSegments = [seg。copy() for seg in segments]
if strategy_rows。rowcount == 0:
raise Exception(“Database does not contain gp_fault_strategy entry”)
if strategy_rows。rowcount > 1:
raise Exception(“Database has too many gp_fault_strategy entries”)
strategy = strategy_rows。fetchone()[0]
array = GpArray(segments, origSegments, strategy)
array。__version = version
array。recoveredSegmentDbids = recoveredSegmentDbids
array。setFaultStrategy(strategy) # override the preliminary default `__strategy` with the database state, if available
array。setFilespaces(filespaceArr)
return array
initFromFile函式從檔案中讀取GpArray的資訊,透過GpDB的initFromString函式,並使用GpArray建構函式建立GpArray物件。
@staticmethod
def initFromFile(filename):
segdbs=[]
fp = open(filename, ‘r’)
for line in fp:
segdbs。append(GpDB。initFromString(line))
fp。close()
return GpArray(segdbs)
如何使用
透過gppylib的system資料夾下提供的configurationInterface介面,註冊配置Provider,並初始化Provider,透過呼叫loadSystemConfig函式載入GpArray物件。get_gparray_from_config函式返回GpArray物件。
def get_gparray_from_config():
# imports below, when moved to the top, seem to cause an import error in a unit test because of dependency issue
from gppylib。system import configurationInterface
from gppylib。system import configurationImplGpdb
from gppylib。system。environment import GpMasterEnvironment
master_data_dir = os。environ[‘MASTER_DATA_DIRECTORY’]
gpEnv = GpMasterEnvironment(master_data_dir, False)
configurationInterface。registerConfigurationProvider(configurationImplGpdb。GpConfigurationProviderUsingGpdbCatalog())
confProvider = configurationInterface。getConfigurationProvider()。initializeProvider(gpEnv。getMasterPort())
return confProvider。loadSystemConfig(useUtilityMode=True)
程式碼來自於greenplum-db-5。27。1原始碼