Greenplum Python專用庫gppylib學習——GpArray

gparray.py依賴的Python標準庫模組(datetime、copy、traceback、os),以及依賴的gppylib模組(gplog、utils、db、gpversion、commands.unix)。

from datetime import date

import copy

import traceback

from gppylib。utils import checkNotNone, checkIsInt

from gppylib import gplog

from gppylib。db import dbconn

from gppylib。gpversion import GpVersion

from gppylib。commands。unix import *

import os

程式碼分析

QD(Query Dispatcher)包含master和standby master,QE(Query Executor)包含primary和mirror。每個postgres資料庫例項的資訊使用GpDB物件表示。Segment物件代表一個primary DB及其對應的零個、一個或多個mirror DB。GpArray物件就是master、standby master和多個Segment物件的組合。

Greenplum Python專用庫gppylib學習——GpArray

圖 | GpArray類的成員

GpDB類

GpDB類是單個dbid所指的postgres資料庫例項的配置資訊。其餘成員都很容易理解,這裡只說明__filespaces成員:它是一個字典,key為filespace的oid,value為該filespace在此例項上的目錄路徑。建構函式以{SYSTEM_FILESPACE: datadir}初始化該字典,因此GpDB類建構函式的datadir就是SYSTEM_FILESPACE(oid為3052)所在的檔案路徑。

class GpDB:

def __init__(self, content, preferred_role, dbid, role, mode, status, hostname, address, port, datadir, replicationPort):
    """Record the configuration of the single postgres instance ``dbid``.

    Args:
        content: content id of the instance.
        preferred_role: role the instance was configured with.
        dbid: id of this database instance.
        role: current role of the instance.
        mode: current mode of the instance.
        status: current status; the legacy ``valid`` flag is True only for 'u'.
        hostname: host name of the instance.
        address: network address of the instance.
        port: port the instance listens on.
        datadir: data directory; also recorded as the SYSTEM_FILESPACE location.
        replicationPort: replication port of the instance.
    """
    self.content = content
    self.preferred_role = preferred_role
    self.dbid = dbid
    self.role = role
    self.mode = mode
    self.status = status
    self.hostname = hostname
    self.address = address
    self.port = port
    self.datadir = datadir
    self.replicationPort = replicationPort

    # Filespace mappings for this segment: filespace oid -> location.
    # The data directory is the location of the system filespace
    # (SYSTEM_FILESPACE, oid 3052).
    self.__filespaces = {SYSTEM_FILESPACE: datadir}

    # Pending filespace creation
    self.__pending_filespace = None

    # Catalog directory for each database in this segment
    self.catdirs = None

    # Todo: Remove old dead code
    self.valid = (status == 'u')

def __str__(self): # 構造GpDB類可列印的字串表示

def __repr__(self):
    """Serialize this GpDB as one '|'-separated line.

    Field order:
    dbid|content|role|preferred_role|mode|status|hostname|address|port|
    replicationPort|datadir|filespaces|catdirs

    ``filespaces`` is a comma-separated list of ``oid:location`` pairs for
    every filespace except SYSTEM_FILESPACE (whose location is ``datadir``).
    ``catdirs`` is a comma-separated list of catalog directories, or an
    empty field when there are none.
    """
    # Sort the oids so the output is deterministic.
    filespaces = ["%d:%s" % (fsoid, self.__filespaces[fsoid])
                  for fsoid in sorted(self.__filespaces)
                  if fsoid != SYSTEM_FILESPACE]

    # Missing/empty catdirs must serialize as an empty field. Formatting a
    # literal [] here would write "[]", which the string parser would read
    # back as a catalog directory named "[]".
    catdirs = ','.join(self.catdirs) if self.catdirs else ''

    return '%d|%d|%s|%s|%s|%s|%s|%s|%d|%s|%s|%s|%s' % (
        self.dbid, self.content, self.role, self.preferred_role,
        self.mode, self.status, self.hostname, self.address, self.port,
        self.replicationPort, self.datadir, ','.join(filespaces), catdirs)

def __cmp__(self,other): # 使用__reper__函式序列化GpDB物件,並進行比較

def equalIgnoringModeAndStatusAndReplicationPort(self, other): # 如果核心屬性(比如filespace)都相同則返回true,該方法在updateSystemConfig函式呼叫(在移除mirror segment或再新增mirror segmnet時會造成catalog改變)

def copy(self):

def isSegmentQD(self):

def isSegmentMaster(self, current_role=False):

。。。

def isSegmentModeInResynchronization(self):

def getSegmentDbId(self):

def getSegmentContentId(self):

。。。

def getSegmentFilespaces(self):

def setSegmentDbId(self, dbId):

def setSegmentContentId(self, contentId):

。。。

def setSegmentDataDirectory(self, dataDirectory):

def addSegmentFilespace(self, oid, path):

def getSegmentPendingFilespace(self):

@staticmethod
def getDataDirPrefix(datadir):
    """Return the part of ``datadir`` before its last '/'.

    Example: "/data/primary/gpseg0" -> "/data/primary".
    A path containing no '/' has no prefix and yields "".
    """
    slash = datadir.rfind('/')
    if slash < 0:
        # rfind returns -1 when there is no '/'; slicing with it would
        # silently drop the last character of the name instead.
        return ""
    return datadir[:slash]

成員函式createTemplate根據GpDB的資訊建立segment模板。第一步確保dstDir有足夠的空間存放segment資料目錄及其filespace(依據__filespaces中存放的oid和目錄路徑,用DiskUsage統計各filespace的空間佔用大小);第二步獲取dstDir所在磁碟的空閒空間(DiskFree.get_size_local(name = "Check for available free space for segment template", directory = dstDir)),空間不足時拋出異常;第三步使用LocalDirCopy類物件將segment資料目錄複製到目標目錄dstDir;第四步判別__filespaces中除了SYSTEM_FILESPACE(oid為3052)之外是否還有其他filespace,如果有,先檢查dstDir + "/fs_directory"目錄是否存在並建立該目錄,再為__filespaces中的每個oid建立dstDir/fs_directory/<oid>目錄,並將對應的filespace目錄複製過去;第五步,刪除目標路徑下的gp_dbid檔案(dstDir + '/gp_dbid'),並將dstDir的權限設為0700。

def createTemplate(self, dstDir):
    """Copy this segment's data directory and filespaces into a template at ``dstDir``.

    After copying, the gp_dbid file is removed from ``dstDir`` and the
    directory is given 0700 permissions.

    Args:
        dstDir: destination directory for the template.

    Raises:
        Exception: if the free space at ``dstDir`` is not larger than the
            space used by the data directory plus the non-system filespaces.
    """
    # Make sure we have enough room in the dstDir to fit the segment and its filespaces.
    # The size that matters is that of the *source* data directory (what is
    # about to be copied), not whatever already sits at the destination.
    duCmd = DiskUsage(name="srcDir", directory=self.getSegmentDataDirectory())
    duCmd.run(validateAfter=True)
    requiredSize = duCmd.get_bytes_used()

    name = "segcopy filespace get_size"
    for oid, fsDir in self.__filespaces.items():
        if oid == SYSTEM_FILESPACE:
            # Its location is the data directory, already measured above.
            continue
        duCmd = DiskUsage(name, fsDir)
        duCmd.run(validateAfter=True)
        requiredSize += duCmd.get_bytes_used()

    dstBytesAvail = DiskFree.get_size_local(name="Check for available free space for segment template", directory=dstDir)
    if dstBytesAvail <= requiredSize:
        raise Exception("Not enough space on directory: '%s'. Currently %d bytes free but need %d bytes." % (dstDir, int(dstBytesAvail), int(requiredSize)))

    logger.info("Starting copy of segment dbid %d to location %s" % (int(self.getSegmentDbId()), dstDir))

    cpCmd = LocalDirCopy("Copy system data directory", self.getSegmentDataDirectory(), dstDir)
    cpCmd.run(validateAfter=True)

    if len(self.__filespaces) > 1:
        # Make directory to hold file spaces
        fullPathFsDir = dstDir + "/" + DESTINATION_FILE_SPACES_DIRECTORY
        # NOTE(review): the result of this existence check is never inspected;
        # the directory is created unconditionally just below.
        cmd = FileDirExists(name="check for existance of template filespace directory", directory=fullPathFsDir)
        cmd.run(validateAfter=True)
        MakeDirectory.local("gpexpand make directory to hold file spaces", fullPathFsDir)

        # NOTE(review): unlike the size check above, this loop does not skip
        # SYSTEM_FILESPACE, so the data directory is copied a second time under
        # fs_directory/<SYSTEM_FILESPACE> and that copy is not covered by the
        # space check. Confirm whether gpexpand relies on it before changing.
        for oid, fsDir in self.__filespaces.items():
            destDir = fullPathFsDir + "/" + str(oid)
            MakeDirectory.local("gpexpand make directory to hold file space: " + destDir, destDir)
            cpCmd = LocalDirCopy("GpSegCopy %s to %s" % (fsDir, destDir), fsDir, destDir)
            cpCmd.run(validateAfter=True)

    # Remove the gp_dbid file from the data dir
    RemoveFile.local('Remove gp_dbid file', os.path.normpath(dstDir + '/gp_dbid'))

    logger.info("Cleaning up catalog for schema only copy on destination")

    # We need 700 permissions or postgres won't start
    Chmod.local('set template permissions', dstDir, '0700')

靜態成員函式initFromString(s)為工廠函式,從字串中初始化GpDB物件,該字串和repr()輸出相容。

@staticmethod

def initFromString(s):

tup = s。strip()。split(’|‘)

# Old format: 8 fields Todo: remove the need for this, or rework it to be cleaner

if len(tup) == 8:

# This describes the gp_configuration catalog (pre 3。4)

content = int(tup[0])

。。。

datadir = tup[7]

# Calculate new fields from old ones

# Note: this should be kept in sync with the code in

# GpArray。InitFromCatalog() code for initializing old catalog

# formats。

preferred_role = ROLE_PRIMARY if definedprimary else ROLE_MIRROR

role = ROLE_PRIMARY if isprimary else ROLE_MIRROR

hostname = None

mode = MODE_SYNCHRONIZED # ???

status = STATUS_UP if valid else STATUS_DOWN

replicationPort = None

filespaces = “”

catdirs = “”

# Catalog 3。4 format: 12 fields

elif len(tup) == 12:

# This describes the gp_segment_configuration catalog (3。4)

dbid = int(tup[0])

。。。

catdirs = “”

# Catalog 4。0+: 13 fields

elif len(tup) == 13:

# This describes the gp_segment_configuration catalog (3。4+)

dbid = int(tup[0])

。。。

catdirs = tup[12]

else:

raise Exception(“GpDB unknown input format: %s” % s)

# Initialize segment without filespace information

gpdb = GpDB(content=content,preferred_role=preferred_role,dbid=dbid,role=role,mode=mode,status=status,hostname=hostname,address=address,port=port,datadir=datadir,replicationPort=replicationPort)

# Add in filespace information, if present

for fs in filespaces。split(“,”):

if fs == “”:

continue

(fsoid, fselocation) = fs。split(“:”)

gpdb。addSegmentFilespace(fsoid, fselocation)

# Add Catalog Dir, if present

gpdb。catdirs = []

for d in catdirs。split(“,”):

if d == “”:

continue

gpdb。catdirs。append(d)

# Return the completed segment

return gpdb

Segment類

Segment類代表相同contentID的SegmentDBs,目前至多一個primary SegDB和單個mirror SegDB,在後續版本中會支援多mirror SegDB。

class Segment:

primaryDB=None #primary (GpDB例項)

mirrorDBs =None

def __init__(self):
    """Create a Segment with no mirrors.

    ``primaryDB`` keeps its class-level default (None) until a primary is set.
    """
    # Per-instance list of mirror GpDB objects; the class-level
    # ``mirrorDBs = None`` is only a placeholder and is never shared.
    self.mirrorDBs = []

def addPrimary(self,segDB) #設定primary

def addMirror(self,segDB) #追加mirror

def get_dbs(self) #返回Primary和Mirror例項組成的列表(GpDB例項列表)

def get_hosts(self) #返回Primary和Mirror所在主機的主機名的列表

def is_segment_pair_valid(self):
    """Validates that the primary/mirror pair are in a valid state

    Returns True when, for every mirror, the tuple
    (primary status, primary mode, mirror status, mirror mode)
    is one of VALID_SEGMENT_STATES. A segment without mirrors is valid.
    """
    if not self.mirrorDBs:
        return True

    # The primary's state is the same for every mirror, so read it once.
    primary_state = (self.primaryDB.getSegmentStatus(),
                     self.primaryDB.getSegmentMode())

    for mirror_db in self.mirrorDBs:
        mirror_state = (mirror_db.getSegmentStatus(),
                        mirror_db.getSegmentMode())
        if primary_state + mirror_state not in VALID_SEGMENT_STATES:
            return False
    return True

primary和mirror對的合法狀態如下,每個元組的四個欄位依次為primaryDB.getSegmentStatus()、primaryDB.getSegmentMode()、mirror_db.getSegmentStatus()、mirror_db.getSegmentMode()的返回值。

# Allowed (primary status, primary mode, mirror status, mirror mode)
# combinations for a primary/mirror pair, as checked by
# Segment.is_segment_pair_valid.
VALID_SEGMENT_STATES = [
    (STATUS_UP, MODE_CHANGELOGGING, STATUS_DOWN, MODE_SYNCHRONIZED),
    (STATUS_UP, MODE_CHANGELOGGING, STATUS_DOWN, MODE_RESYNCHRONIZATION),
    (STATUS_UP, MODE_RESYNCHRONIZATION, STATUS_UP, MODE_RESYNCHRONIZATION),
    (STATUS_UP, MODE_SYNCHRONIZED, STATUS_UP, MODE_SYNCHRONIZED),
]

primaryDB狀態為up,模式為CHANGELOGGING,mirrorDB狀態為down,模式可以為SYNCHRONIZED、RESYNCHRONIZATION

primaryDB狀態為up,模式為RESYNCHRONIZATION,mirrorDB狀態為up,模式為RESYNCHRONIZATION

primaryDB狀態為up,模式為SYNCHRONIZED,mirrorDB狀態為up,模式為SYNCHRONIZED

如果要返回primaryDB的主機名,可使用segment1.primaryDB.getSegmentHostName()。

GpArray類

GpArray類建構函式接受包含QD和QE的GpDB的列表segments

class GpArray:

def __init__(self, segments, segmentsAsLoadedFromDb=None, strategyLoadedFromDb=None):
    """Build the array from a flat list of GpDB objects (QD and QE instances).

    Args:
        segments: GpDB objects for the master, an optional standby master
            and the segment (QE) instances.
        segmentsAsLoadedFromDb: stored unchanged on the instance
            (initFromCatalog passes copies of the loaded segments).
        strategyLoadedFromDb: stored unchanged on the instance
            (initFromCatalog passes the fault strategy read from the catalog).

    Raises:
        Exception: if more than one master or standby master is given, if a
            GpDB is neither master, standby master nor QE, or if no master
            is given.
    """
    self.master = None           # GpDB of the master
    self.standbyMaster = None    # GpDB of the standby master
    self.segments = []           # Segment objects
    self.expansionSegments = []
    self.numPrimarySegments = 0
    self.recoveredSegmentDbids = []
    self.__version = None
    self.__segmentsAsLoadedFromDb = segmentsAsLoadedFromDb
    self.__strategyLoadedFromDb = strategyLoadedFromDb
    # FAULT_STRATEGY_NONE ('n'): mirrorless system; upgraded below if a
    # mirror is found.
    self.__strategy = FAULT_STRATEGY_NONE
    self.setFilespaces([])

    for segdb in segments:
        # Handle QD nodes (classified by their current role)
        if segdb.isSegmentMaster(True):
            if self.master is not None:
                logger.error("multiple master dbs defined")
                raise Exception("GpArray - multiple master dbs defined")
            self.master = segdb

        elif segdb.isSegmentStandby(True):
            if self.standbyMaster is not None:
                logger.error("multiple standby master dbs defined")
                raise Exception("GpArray - multiple standby master dbs defined")
            self.standbyMaster = segdb

        # Handle regular segments (QE nodes)
        elif segdb.isSegmentQE():
            if segdb.isSegmentMirror():
                # FAULT_STRATEGY_FILE_REPLICATION ('f', valid for 4.0+):
                # the array has mirrors.
                self.__strategy = FAULT_STRATEGY_FILE_REPLICATION
            self.addSegmentDb(segdb)

        else:
            # Not a master, standbymaster, primary, or mirror?
            # shouldn't even be possible.
            logger.error("FATAL - invalid dbs defined")
            raise Exception("Error: GpArray() - invalid dbs defined")

    # Make sure we have a master db
    if self.master is None:
        logger.error("FATAL - no master dbs defined!")
        raise Exception("Error: GpArray() - no master dbs defined")

def __str__(self):

def hasStandbyMaster(self):

def addSegmentDb(self, segdb): # segdb是GpDB例項,向self。segments中加入新的segment或向原有的segment物件新增GpDB例項(addPrimary或addMirror)

def isStandardArray(self):

def is_array_valid(self):

def dumpToFile(self, filename):

def setFaultStrategy(self, strategy):

def getFaultStrategy(self):

。。。。

initFromCatalog從資料庫的catalog中讀取資料來構造GpArray物件,形參為資料庫URL以及是否以utility模式連線的旗標。函式主體是一些查詢資料庫狀態資訊的SQL,作為DBA值得收集學習這些SQL,以備後續運維使用。

@staticmethod
def initFromCatalog(dbURL, utility=False):
    """Build a GpArray from the configuration stored in the database catalog.

    Args:
        dbURL: connection URL handed to dbconn.connect.
        utility: second argument handed to dbconn.connect (utility-mode flag).

    Returns:
        A GpArray with every configured instance, plus the database version,
        recovered segment dbids, fault strategy and filespaces.

    Raises:
        Exception: if the fault-strategy query returns no row or more than
            one row.
    """
    conn = dbconn.connect(dbURL, utility)
    # All queries and all cursor reads happen inside this try so that a
    # failing query cannot leak the connection.
    try:
        # Get the version from the database:
        version_str = None
        for row in dbconn.execSQL(conn, "SELECT version()"):
            version_str = row[0]
        version = GpVersion(version_str)

        if version.getVersionRelease() in ("3.0", "3.1", "3.2", "3.3"):
            # In older releases we get the fault strategy using the
            # gp_fault_action guc.
            strategy_rows = dbconn.execSQL(conn, "show gp_fault_action")

            # Note: Mode may not be "right", certainly 4.0 concepts of mirroring
            # mode do not apply to 3.x, so it depends on how the scripts are
            # making use of mode. For now it is initialized to synchronized.
            #
            # Note: hostname is initialized to null since the catalog does not
            # contain this information. Initializing a hostcache using the
            # resulting gparray will automatically fill in a value for hostname.
            #
            # Note: this should be kept in sync with the code in
            # GpDB.InitFromString() code for initializing old catalog formats.
            config_rows = dbconn.execSQL(conn, '''
                SELECT dbid, content,case when isprimary then 'p' else 'm' end as role,
                case when definedprimary then 'p' else 'm' end as preferred_role,
                's' as mode,case when valid then 'u' else 'd' end as status,
                null as hostname,hostname as address,port,null as replication_port,
                %s as fsoid,datadir as fselocation FROM pg_catalog.gp_configuration
                ORDER BY content, preferred_role DESC
                ''' % str(SYSTEM_FILESPACE))

            # no filespace support in older releases.
            filespaceArr = []
        else:
            strategy_rows = dbconn.execSQL(conn, '''
                SELECT fault_strategy FROM gp_fault_strategy
                ''')

            config_rows = dbconn.execSQL(conn, '''
                SELECT dbid, content, role, preferred_role, mode, status,
                hostname, address, port, replication_port, fs.oid,
                fselocation
                FROM pg_catalog.gp_segment_configuration
                JOIN pg_catalog.pg_filespace_entry on (dbid = fsedbid)
                JOIN pg_catalog.pg_filespace fs on (fsefsoid = fs.oid)
                ORDER BY content, preferred_role DESC, fs.oid
                ''')

            filespaceRows = dbconn.execSQL(conn, '''
                SELECT oid, fsname FROM pg_filespace ORDER BY fsname;
                ''')

            filespaceArr = [GpFilespaceObj(fsRow[0], fsRow[1]) for fsRow in filespaceRows]

        # Todo: add checks that all segments should have the same filespaces?
        recoveredSegmentDbids = []
        segments = []
        seg = None
        for row in config_rows:
            # Extract fields from the row
            (dbid, content, role, preferred_role, mode, status, hostname,
             address, port, replicationPort, fsoid, fslocation) = row

            # If we have segments which have recovered, record them.
            if preferred_role != role and content >= 0:
                if mode == MODE_SYNCHRONIZED and status == STATUS_UP:
                    recoveredSegmentDbids.append(dbid)

            # The query returns all the filespaces for a segment on separate
            # rows. If this row is the same dbid as the previous row simply
            # add this filespace to the existing list, otherwise create a
            # new segment.
            if seg and seg.getSegmentDbId() == dbid:
                seg.addSegmentFilespace(fsoid, fslocation)
            else:
                seg = GpDB(content, preferred_role, dbid, role, mode, status,
                           hostname, address, port, fslocation, replicationPort)
                segments.append(seg)

        datcatloc = dbconn.execSQL(conn, '''
            select fsloc.dbid, fsloc.fselocation || '/' || case when db.dattablespace = 1663
            then 'base' else db.dattablespace::text end || '/'||db.oid as catloc
            from pg_Database db, pg_tablespace ts,
            (SELECT dbid, fs.oid, fselocation
            FROM pg_catalog.gp_segment_configuration
            JOIN pg_catalog.pg_filespace_entry on (dbid = fsedbid)
            JOIN pg_catalog.pg_filespace fs on (fsefsoid = fs.oid)) fsloc
            where db.dattablespace = ts.oid
            and ts.spcfsoid = fsloc.oid''')

        # dbid -> list of catalog directories of that instance.
        # (setdefault replaces the Python-2-only dict.has_key.)
        catlocmap = {}
        for row in datcatloc:
            catlocmap.setdefault(row[0], []).append(row[1])

        # Read the strategy row while the connection is still open.
        if strategy_rows.rowcount == 0:
            raise Exception("Database does not contain gp_fault_strategy entry")
        if strategy_rows.rowcount > 1:
            raise Exception("Database has too many gp_fault_strategy entries")
        strategy = strategy_rows.fetchone()[0]
    finally:
        conn.close()

    for seg in segments:
        seg.catdirs = catlocmap[seg.dbid]

    origSegments = [seg.copy() for seg in segments]

    array = GpArray(segments, origSegments, strategy)
    array.__version = version
    array.recoveredSegmentDbids = recoveredSegmentDbids
    array.setFaultStrategy(strategy)  # override the preliminary default `__strategy` with the database state, if available
    array.setFilespaces(filespaceArr)

    return array

initFromFile函式從檔案中讀取GpArray的資訊,透過GpDB的initFromString函式,並使用GpArray建構函式建立GpArray物件。

@staticmethod
def initFromFile(filename):
    """Build a GpArray from a file holding one serialized GpDB per line.

    Each non-blank line is parsed with GpDB.initFromString. Whitespace-only
    lines (e.g. an empty last line) are skipped instead of aborting with
    "unknown input format". The file is closed even if parsing fails.
    """
    with open(filename, 'r') as fp:
        segdbs = [GpDB.initFromString(line) for line in fp if line.strip()]
    return GpArray(segdbs)

如何使用

透過gppylib的system資料夾下提供的configurationInterface介面,註冊配置Provider,並初始化Provider,透過呼叫loadSystemConfig函式載入GpArray物件。get_gparray_from_config函式返回GpArray物件。

def get_gparray_from_config():
    """Load the cluster configuration through the catalog configuration provider.

    Registers GpConfigurationProviderUsingGpdbCatalog, initializes it with
    the master port of the master data directory named by the
    MASTER_DATA_DIRECTORY environment variable, and returns the result of
    loadSystemConfig(useUtilityMode=True).

    Raises:
        KeyError: if MASTER_DATA_DIRECTORY is not set in the environment.
    """
    # imports below, when moved to the top, seem to cause an import error in a unit test because of dependency issue
    from gppylib.system import configurationInterface
    from gppylib.system import configurationImplGpdb
    from gppylib.system.environment import GpMasterEnvironment

    master_data_dir = os.environ['MASTER_DATA_DIRECTORY']
    gpEnv = GpMasterEnvironment(master_data_dir, False)

    configurationInterface.registerConfigurationProvider(
        configurationImplGpdb.GpConfigurationProviderUsingGpdbCatalog())
    provider = configurationInterface.getConfigurationProvider()
    confProvider = provider.initializeProvider(gpEnv.getMasterPort())

    return confProvider.loadSystemConfig(useUtilityMode=True)

程式碼來自於greenplum-db-5.27.1原始碼。