035 Python實現一個區塊鏈 11 P2P網路

1 P2P網路

P2P網路用來在網路上傳輸區塊。區塊儲存為一個一個的檔案,所以相當於使用P2P網路來傳輸檔案。P2P網路有①帶伺服器的P2P網路和②沒有伺服器的P2P網路。此處使用的P2P網路參考了https://github。com/zhulinn/P2P-File-System-Python程式碼。將每個節點既設定為伺服器也設定為客戶機。

每個節點可以從其他節點下載區塊,同時自己又能夠為其他節點提供區塊。在產生新的區塊後,將新的區塊新增到列表中。如果有其他的節點請求該區塊,則將該區塊傳送給他。

Server在啟動時,將block資料夾下的所有檔案新增到列表中。

1。1 Code11-P2P

系統在執行過程中會產生交易、區塊等資料。這些資料要透過網路來傳輸。採用P2P網路來進行傳輸,各個節點可以傳送自己的交易和區塊資料,也能從其他節點進行接收。

import socket

import threading

import os

import sys

from collections import defaultdict

import platform

import mimetypes

import time

from pathlib import Path

#獲取目錄下的所有檔名

def GetAllBlocks(path,listName):

if os。path。isdir(path):

for file in os。listdir(path):

filePath = os。path。join(path, file)

if os。path。isdir(filePath):

GetAllBlocks(filePath,listName)

elif os。path。splitext(filePath)[1] == ‘。txt’: #os。path。splitext()函式將路徑拆分為檔名+副檔名

if ‘blockchain’ in os。path。splitext(filePath)[0]:

fpath, tmpFilename = os。path。split(filePath)

#shotname, extension = os。path。splitext(tmpfilename)

listName。append(tmpFilename)

class MyException(Exception):

pass

class Server(object):

def __init__(self, HOST=‘’, PORT=7777, Version=‘1。10’):

self。Host = HOST

self。Port = PORT

self。Version = Version

self。BlockList = []

self。lock = threading。Lock()

# start listenning

def start(self):

try:

self。s = socket。socket(socket。AF_INET, socket。SOCK_STREAM)

self。s。bind((self。Host, self。Port))

self。s。listen(5)

print(‘Server %s %s is listening on port %s’ % (socket。gethostbyname(socket。gethostname()),self。Version,self。Port))

while True:

soc, addr = self。s。accept()

print(‘%s:%s connected’ % (addr[0], addr[1]))

thread = threading。Thread(target=self。handler, args=(soc, addr))

thread。start()

except KeyboardInterrupt:

print(‘\nShutting down the server。。\nGood Bye!’)

try:

sys。exit(0)

except SystemExit:

os。_exit(0)

# connect with a client

def handler(self, soc, addr):

# keep Server received request from client

host = None

port = None

while True:

try:

#sleep(1)

req = soc。recv(1024)。decode()

#print(‘\nServer received request:\n%s’ % req)

lines = req。splitlines()

version = lines[0]。split()[-1]

if version != self。Version:

soc。sendall(str。encode(self。Version + ‘ 505 P2P-CI Version Not Supported\n’))

else:

method = lines[0]。split()[0]

if method == ‘ADD’:

host = lines[1]。split(None, 1)[1]

port = int(lines[2]。split(None, 1)[1])

blockName = lines[0]。split()[-2]

self。addToBlockList(soc, (host, port), blockName)

elif method == ‘LIST’:

self。getAllFromBlockList(soc)

else:

raise AttributeError(‘Method Not Match’)

except ConnectionError:

print(‘%s:%s left’ % (addr[0], addr[1]))

# Clean data if necessary

if host and port:

self。removeFailedConn(host,port)

soc。close()

break

except BaseException:

try:

soc。sendall(str。encode(self。Version + ‘ 400 Bad Request\n’))

except ConnectionError:

print(‘%s:%s left’ % (addr[0], addr[1]))

# Clean data if necessary

if host and port:

self。removeFailedConn(host,port)

soc。close()

break

def removeFailedConn(self, host, port):

self。lock。acquire()

for x in self。BlockList:

if (x[1],x[2]) == (host, port):

self。BlockList。remove(x)

self。lock。release()

def addToBlockList(self, soc, peer, blockName):

self。lock。acquire()

try:

x = [blockName, peer[0], peer[1]]

if x not in self。BlockList:

self。BlockList。append([blockName, peer[0], peer[1]])

finally:

self。lock。release()

header = self。Version + ‘ 200 OK\n’

header += ‘add block %s %s %s\n’ % (blockName, peer[0], peer[1])

soc。sendall(str。encode(header))

def getAllFromBlockList(self, soc):

self。lock。acquire()

try:

if len(self。BlockList) == 0:

header = self。Version + ‘ 404 Not Found\n’

else:

header = self。Version + ‘ 200 OK\n’

for x in self。BlockList:

header += ‘blockchain %s %s %s\n’ % (x[0],x[1],x[2]) #blcokname,hostname,port

finally:

self。lock。release()

soc。sendall(str。encode(header))

class Client(object):

def __init__(self, serverhost=‘localhost’, Version=‘1。10’):

self。SERVER_HOST = serverhost

self。SERVER_PORT = 7777

self。Version = Version

self。RootPath = os。path。abspath(‘。’) #current dir

self。BlockDir = self。RootPath + ‘/blockchain’ # block file directory

Path(self。BlockDir)。mkdir(exist_ok=True)

self。UPLOAD_PORT = None

self。shareable = True

def start(self):

# connect to server

print(‘Connecting to the server %s:%s’ % (self。SERVER_HOST, self。SERVER_PORT))

self。server = socket。socket(socket。AF_INET, socket。SOCK_STREAM)

try:

self。server。connect((self。SERVER_HOST, self。SERVER_PORT))

except Exception:

print(‘Server Not Available。’)

return

print(‘Connected’)

# upload

uploaderProcess = threading。Thread(target=self。initUploadToPeer)

uploaderProcess。start()

while self。UPLOAD_PORT is None:

# wait until upload port is initialized

pass

print(‘Listening on the upload port %s’ % self。UPLOAD_PORT)

#main task

self。mainTask()

def mainTask(self):

listUploaded = [] #已經上傳到server的blocklist

while True:

try:

time。sleep(1)

#將自己擁有的block清單,上傳到server

listLocalBlocName = []

GetAllBlocks(self。BlockDir,listLocalBlocName)

for blocName in listLocalBlocName:

if blocName not in listUploaded:

#不能有空格

if ‘ ’ not in blocName:

self。addBlockToServer(blocName)

listUploaded。append(blocName)

#獲取伺服器上的清單

blockListFromServer = self。getBlockListFromServer()

#如果具有本地沒有的block則下載

for blocDown in blockListFromServer:

if (blocDown[0] not in listLocalBlocName):

s1 = (socket。gethostname(), self。UPLOAD_PORT)

if((blocDown[1],blocDown[2]) != s1):

self。downloadBlockFromPeer(blocDown[0], blocDown[1],blocDown[2]) #download

pass

except Exception as e:

print(e)

except MyException as e:

print(e)

def initUploadToPeer(self):

# listen upload port

self。uploader = socket。socket(socket。AF_INET, socket。SOCK_STREAM)

self。uploader。bind((‘’, 0))

self。UPLOAD_PORT = self。uploader。getsockname()[1]

self。uploader。listen(5)

while self。shareable:

requester, addr = self。uploader。accept()

handler = threading。Thread(target=self。uploadHandler, args=(requester, addr))

handler。start()

self。uploader。close()

def uploadHandler(self, soc, addr):

header = soc。recv(1024)。decode()。splitlines()

try:

version = header[0]。split()[-1]

blockName = header[0]。split()[-2]

method = header[0]。split()[0]

path = ‘%s/%s’ % (self。BlockDir, blockName)

if version != self。Version:

soc。sendall(str。encode(self。Version + ‘ 505 P2P-CI Version Not Supported\n’))

elif not Path(path)。is_file():

soc。sendall(str。encode(self。Version + ‘ 404 Not Found\n’))

elif method == ‘GET’:

header = self。Version + ‘ 200 OK\n’

header += ‘Data: %s\n’ % (time。strftime(“%a, %d %b %Y %H:%M:%S GMT”, time。gmtime()))

header += ‘OS: %s\n’ % (platform。platform())

header += ‘Last-Modified: %s\n’ % (time。strftime(“%a, %d %b %Y %H:%M:%S GMT”, time。gmtime(os。path。getmtime(path))))

header += ‘Content-Length: %s\n’ % (os。path。getsize(path))

header += ‘Content-Type: %s\n’ % (mimetypes。MimeTypes()。guess_type(path)[0])

soc。sendall(header。encode())

# Uploading

try:

print(‘\nUploading。。。’)

send_length = 0

with open(path, ‘r’) as file:

to_send = file。read(1024)

while to_send:

send_length += len(to_send。encode())

soc。sendall(to_send。encode())

to_send = file。read(1024)

except Exception:

raise MyException(‘Uploading Failed’)

print(‘Uploading Completed。’)

else:

raise MyException(‘Bad Request。’)

except Exception as e:

print(e)

except Exception:

soc。sendall(str。encode(self。Version + ‘ 400 P2P Bad Request\n’))

finally:

soc。close()

def addBlockToServer(self, blocName):

msg = ‘ADD blockchain %s %s\n’ % (blocName, self。Version)

msg += ‘Host: %s\n’ % socket。gethostname()

msg += ‘Post: %s\n’ % self。UPLOAD_PORT

self。server。sendall(msg。encode())

res = self。server。recv(1024)。decode()

print(‘Client recieved response: \n%s’ % res)

def getBlockListFromServer(self):

l1 = ‘LIST ALL %s\n’ % self。Version

l2 = ‘Host: %s\n’ % socket。gethostname()

l3 = ‘Post: %s\n’ % self。UPLOAD_PORT

msg = l1 + l2 + l3

self。server。sendall(msg。encode())

res = self。server。recv(1024)。decode()

#print(‘Client recieved response: \n%s’ % res)

listSplit = res。strip()。split(‘\n’)

xList = []

for i in range(1,len(listSplit)):

blockName = listSplit[i]。split(‘ ’)[-3]

peer_host = listSplit[i]。split(‘ ’)[-2]

peer_port = int(listSplit[i]。split(‘ ’)[-1])

xList。append([blockName,peer_host,peer_port])

return xList

def downloadBlockFromPeer(self, blockName, peer_host, peer_port):

try:

# make connnection

soc = socket。socket(socket。AF_INET, socket。SOCK_STREAM)

# connect_ex return errors

if soc。connect_ex((peer_host, peer_port)):

# print(‘Try Local Network。。。’)

# if soc。connect_ex((‘localhost’, peer_port)):

raise MyException(‘Peer Not Available’)

# make request

msg = ‘GET blockchain %s %s\n’ % (blockName, self。Version)

msg += ‘Host: %s\n’ % socket。gethostname()

msg += ‘OS: %s\n’ % platform。platform()

soc。sendall(msg。encode())

# Downloading

header = soc。recv(1024)。decode()

print(‘Recieve response header: \n%s’ % header)

header = header。splitlines()

if header[0]。split()[-2] == ‘200’:

path = ‘%s/%s’ % (self。BlockDir, blockName)

print(‘Downloading。。。’)

try:

with open(path, ‘w’) as file:

content = soc。recv(1024)

while content:

file。write(content。decode())

content = soc。recv(1024)

except Exception:

raise MyException(‘Downloading Failed’)

total_length = int(header[4]。split()[1])

# print(‘write: %s | total: %s’ % (os。path。getsize(path),

# total_length))

if os。path。getsize(path) < total_length:

raise MyException(‘Downloading Failed’)

print(‘Downloading Completed。’)

elif header[0]。split()[1] == ‘400’:

raise MyException(‘Invalid Input。’)

elif header[0]。split()[1] == ‘404’:

raise MyException(‘File Not Available。’)

elif header[0]。split()[1] == ‘500’:

raise MyException(‘Version Not Supported。’)

finally:

soc。close()

def invalid_input(self):

raise MyException(‘Invalid Input。’)

def shutdown(self):

print(‘\nShutting Down。。。’)

self。server。close()

try:

sys。exit(0)

except SystemExit:

os。_exit(0)

req = input(‘\n1: Start server 2: Start client 3:Start Server and Client 4:exit\nEnter your request: ’)

#req = ‘3’

if int(req) == 1 or int(req) == 3:

#s = Server(HOST=‘ZYhome’)

s = Server()

#s。start()

thread = threading。Thread(target=s。start, args=())

thread。start()

time。sleep(2)

if int(req) == 2 or int(req) == 3:

reqServerAdd = input(‘\nInput server address to connect: ’)

if len(reqServerAdd。strip()) == 0:

reqServerAdd = ‘127。0。0。1’

#client = Client(serverhost=‘192。168。1。8’)

client = Client(serverhost=reqServerAdd)

threadClient = threading。Thread(target=client。start, args=())

threadClient。start()

1。2 Code11-P2P Output

各節點可以把自己的區塊共享出去,也可以從其他節點下載自己沒有的區塊。