1 P2P網路
P2P網路用來在網路上傳輸區塊。區塊儲存為一個一個的檔案,所以相當於使用P2P網路來傳輸檔案。P2P網路有①帶伺服器的P2P網路和②沒有伺服器的P2P網路。此處使用的P2P網路參考了https://github。com/zhulinn/P2P-File-System-Python程式碼。將每個節點既設定為伺服器也設定為客戶機。
每個節點可以從其他節點下載區塊,同時自己又能夠為其他節點提供區塊。在產生新的區塊後,將新的區塊新增到列表中。如果有其他的節點請求該區塊,則將該區塊傳送給他。
Server在啟動時,將block資料夾下的所有檔案新增到列表中。
1。1 Code11-P2P
系統在執行過程中會產生交易、區塊等資料。這些資料要透過網路來傳輸。採用P2P網路來進行傳輸,各個節點可以傳送自己的交易和區塊資料,也能從其他節點進行接收。
import socket
import threading
import os
import sys
from collections import defaultdict
import platform
import mimetypes
import time
from pathlib import Path
#獲取目錄下的所有檔名
def GetAllBlocks(path,listName):
if os。path。isdir(path):
for file in os。listdir(path):
filePath = os。path。join(path, file)
if os。path。isdir(filePath):
GetAllBlocks(filePath,listName)
elif os。path。splitext(filePath)[1] == ‘。txt’: #os。path。splitext()函式將路徑拆分為檔名+副檔名
if ‘blockchain’ in os。path。splitext(filePath)[0]:
fpath, tmpFilename = os。path。split(filePath)
#shotname, extension = os。path。splitext(tmpfilename)
listName。append(tmpFilename)
class MyException(Exception):
pass
class Server(object):
def __init__(self, HOST=‘’, PORT=7777, Version=‘1。10’):
self。Host = HOST
self。Port = PORT
self。Version = Version
self。BlockList = []
self。lock = threading。Lock()
# start listenning
def start(self):
try:
self。s = socket。socket(socket。AF_INET, socket。SOCK_STREAM)
self。s。bind((self。Host, self。Port))
self。s。listen(5)
print(‘Server %s %s is listening on port %s’ % (socket。gethostbyname(socket。gethostname()),self。Version,self。Port))
while True:
soc, addr = self。s。accept()
print(‘%s:%s connected’ % (addr[0], addr[1]))
thread = threading。Thread(target=self。handler, args=(soc, addr))
thread。start()
except KeyboardInterrupt:
print(‘\nShutting down the server。。\nGood Bye!’)
try:
sys。exit(0)
except SystemExit:
os。_exit(0)
# connect with a client
def handler(self, soc, addr):
# keep Server received request from client
host = None
port = None
while True:
try:
#sleep(1)
req = soc。recv(1024)。decode()
#print(‘\nServer received request:\n%s’ % req)
lines = req。splitlines()
version = lines[0]。split()[-1]
if version != self。Version:
soc。sendall(str。encode(self。Version + ‘ 505 P2P-CI Version Not Supported\n’))
else:
method = lines[0]。split()[0]
if method == ‘ADD’:
host = lines[1]。split(None, 1)[1]
port = int(lines[2]。split(None, 1)[1])
blockName = lines[0]。split()[-2]
self。addToBlockList(soc, (host, port), blockName)
elif method == ‘LIST’:
self。getAllFromBlockList(soc)
else:
raise AttributeError(‘Method Not Match’)
except ConnectionError:
print(‘%s:%s left’ % (addr[0], addr[1]))
# Clean data if necessary
if host and port:
self。removeFailedConn(host,port)
soc。close()
break
except BaseException:
try:
soc。sendall(str。encode(self。Version + ‘ 400 Bad Request\n’))
except ConnectionError:
print(‘%s:%s left’ % (addr[0], addr[1]))
# Clean data if necessary
if host and port:
self。removeFailedConn(host,port)
soc。close()
break
def removeFailedConn(self, host, port):
self。lock。acquire()
for x in self。BlockList:
if (x[1],x[2]) == (host, port):
self。BlockList。remove(x)
self。lock。release()
def addToBlockList(self, soc, peer, blockName):
self。lock。acquire()
try:
x = [blockName, peer[0], peer[1]]
if x not in self。BlockList:
self。BlockList。append([blockName, peer[0], peer[1]])
finally:
self。lock。release()
header = self。Version + ‘ 200 OK\n’
header += ‘add block %s %s %s\n’ % (blockName, peer[0], peer[1])
soc。sendall(str。encode(header))
def getAllFromBlockList(self, soc):
self。lock。acquire()
try:
if len(self。BlockList) == 0:
header = self。Version + ‘ 404 Not Found\n’
else:
header = self。Version + ‘ 200 OK\n’
for x in self。BlockList:
header += ‘blockchain %s %s %s\n’ % (x[0],x[1],x[2]) #blcokname,hostname,port
finally:
self。lock。release()
soc。sendall(str。encode(header))
class Client(object):
def __init__(self, serverhost=‘localhost’, Version=‘1。10’):
self。SERVER_HOST = serverhost
self。SERVER_PORT = 7777
self。Version = Version
self。RootPath = os。path。abspath(‘。’) #current dir
self。BlockDir = self。RootPath + ‘/blockchain’ # block file directory
Path(self。BlockDir)。mkdir(exist_ok=True)
self。UPLOAD_PORT = None
self。shareable = True
def start(self):
# connect to server
print(‘Connecting to the server %s:%s’ % (self。SERVER_HOST, self。SERVER_PORT))
self。server = socket。socket(socket。AF_INET, socket。SOCK_STREAM)
try:
self。server。connect((self。SERVER_HOST, self。SERVER_PORT))
except Exception:
print(‘Server Not Available。’)
return
print(‘Connected’)
# upload
uploaderProcess = threading。Thread(target=self。initUploadToPeer)
uploaderProcess。start()
while self。UPLOAD_PORT is None:
# wait until upload port is initialized
pass
print(‘Listening on the upload port %s’ % self。UPLOAD_PORT)
#main task
self。mainTask()
def mainTask(self):
listUploaded = [] #已經上傳到server的blocklist
while True:
try:
time。sleep(1)
#將自己擁有的block清單,上傳到server
listLocalBlocName = []
GetAllBlocks(self。BlockDir,listLocalBlocName)
for blocName in listLocalBlocName:
if blocName not in listUploaded:
#不能有空格
if ‘ ’ not in blocName:
self。addBlockToServer(blocName)
listUploaded。append(blocName)
#獲取伺服器上的清單
blockListFromServer = self。getBlockListFromServer()
#如果具有本地沒有的block則下載
for blocDown in blockListFromServer:
if (blocDown[0] not in listLocalBlocName):
s1 = (socket。gethostname(), self。UPLOAD_PORT)
if((blocDown[1],blocDown[2]) != s1):
self。downloadBlockFromPeer(blocDown[0], blocDown[1],blocDown[2]) #download
pass
except Exception as e:
print(e)
except MyException as e:
print(e)
def initUploadToPeer(self):
# listen upload port
self。uploader = socket。socket(socket。AF_INET, socket。SOCK_STREAM)
self。uploader。bind((‘’, 0))
self。UPLOAD_PORT = self。uploader。getsockname()[1]
self。uploader。listen(5)
while self。shareable:
requester, addr = self。uploader。accept()
handler = threading。Thread(target=self。uploadHandler, args=(requester, addr))
handler。start()
self。uploader。close()
def uploadHandler(self, soc, addr):
header = soc。recv(1024)。decode()。splitlines()
try:
version = header[0]。split()[-1]
blockName = header[0]。split()[-2]
method = header[0]。split()[0]
path = ‘%s/%s’ % (self。BlockDir, blockName)
if version != self。Version:
soc。sendall(str。encode(self。Version + ‘ 505 P2P-CI Version Not Supported\n’))
elif not Path(path)。is_file():
soc。sendall(str。encode(self。Version + ‘ 404 Not Found\n’))
elif method == ‘GET’:
header = self。Version + ‘ 200 OK\n’
header += ‘Data: %s\n’ % (time。strftime(“%a, %d %b %Y %H:%M:%S GMT”, time。gmtime()))
header += ‘OS: %s\n’ % (platform。platform())
header += ‘Last-Modified: %s\n’ % (time。strftime(“%a, %d %b %Y %H:%M:%S GMT”, time。gmtime(os。path。getmtime(path))))
header += ‘Content-Length: %s\n’ % (os。path。getsize(path))
header += ‘Content-Type: %s\n’ % (mimetypes。MimeTypes()。guess_type(path)[0])
soc。sendall(header。encode())
# Uploading
try:
print(‘\nUploading。。。’)
send_length = 0
with open(path, ‘r’) as file:
to_send = file。read(1024)
while to_send:
send_length += len(to_send。encode())
soc。sendall(to_send。encode())
to_send = file。read(1024)
except Exception:
raise MyException(‘Uploading Failed’)
print(‘Uploading Completed。’)
else:
raise MyException(‘Bad Request。’)
except Exception as e:
print(e)
except Exception:
soc。sendall(str。encode(self。Version + ‘ 400 P2P Bad Request\n’))
finally:
soc。close()
def addBlockToServer(self, blocName):
msg = ‘ADD blockchain %s %s\n’ % (blocName, self。Version)
msg += ‘Host: %s\n’ % socket。gethostname()
msg += ‘Post: %s\n’ % self。UPLOAD_PORT
self。server。sendall(msg。encode())
res = self。server。recv(1024)。decode()
print(‘Client recieved response: \n%s’ % res)
def getBlockListFromServer(self):
l1 = ‘LIST ALL %s\n’ % self。Version
l2 = ‘Host: %s\n’ % socket。gethostname()
l3 = ‘Post: %s\n’ % self。UPLOAD_PORT
msg = l1 + l2 + l3
self。server。sendall(msg。encode())
res = self。server。recv(1024)。decode()
#print(‘Client recieved response: \n%s’ % res)
listSplit = res。strip()。split(‘\n’)
xList = []
for i in range(1,len(listSplit)):
blockName = listSplit[i]。split(‘ ’)[-3]
peer_host = listSplit[i]。split(‘ ’)[-2]
peer_port = int(listSplit[i]。split(‘ ’)[-1])
xList。append([blockName,peer_host,peer_port])
return xList
def downloadBlockFromPeer(self, blockName, peer_host, peer_port):
try:
# make connnection
soc = socket。socket(socket。AF_INET, socket。SOCK_STREAM)
# connect_ex return errors
if soc。connect_ex((peer_host, peer_port)):
# print(‘Try Local Network。。。’)
# if soc。connect_ex((‘localhost’, peer_port)):
raise MyException(‘Peer Not Available’)
# make request
msg = ‘GET blockchain %s %s\n’ % (blockName, self。Version)
msg += ‘Host: %s\n’ % socket。gethostname()
msg += ‘OS: %s\n’ % platform。platform()
soc。sendall(msg。encode())
# Downloading
header = soc。recv(1024)。decode()
print(‘Recieve response header: \n%s’ % header)
header = header。splitlines()
if header[0]。split()[-2] == ‘200’:
path = ‘%s/%s’ % (self。BlockDir, blockName)
print(‘Downloading。。。’)
try:
with open(path, ‘w’) as file:
content = soc。recv(1024)
while content:
file。write(content。decode())
content = soc。recv(1024)
except Exception:
raise MyException(‘Downloading Failed’)
total_length = int(header[4]。split()[1])
# print(‘write: %s | total: %s’ % (os。path。getsize(path),
# total_length))
if os。path。getsize(path) < total_length:
raise MyException(‘Downloading Failed’)
print(‘Downloading Completed。’)
elif header[0]。split()[1] == ‘400’:
raise MyException(‘Invalid Input。’)
elif header[0]。split()[1] == ‘404’:
raise MyException(‘File Not Available。’)
elif header[0]。split()[1] == ‘500’:
raise MyException(‘Version Not Supported。’)
finally:
soc。close()
def invalid_input(self):
raise MyException(‘Invalid Input。’)
def shutdown(self):
print(‘\nShutting Down。。。’)
self。server。close()
try:
sys。exit(0)
except SystemExit:
os。_exit(0)
req = input(‘\n1: Start server 2: Start client 3:Start Server and Client 4:exit\nEnter your request: ’)
#req = ‘3’
if int(req) == 1 or int(req) == 3:
#s = Server(HOST=‘ZYhome’)
s = Server()
#s。start()
thread = threading。Thread(target=s。start, args=())
thread。start()
time。sleep(2)
if int(req) == 2 or int(req) == 3:
reqServerAdd = input(‘\nInput server address to connect: ’)
if len(reqServerAdd。strip()) == 0:
reqServerAdd = ‘127。0。0。1’
#client = Client(serverhost=‘192。168。1。8’)
client = Client(serverhost=reqServerAdd)
threadClient = threading。Thread(target=client。start, args=())
threadClient。start()
1。2 Code11-P2P Output
各節點可以把自己的區塊共享出去,也可以從其他節點下載自己沒有的區塊。