基於 gRPC 和 .NET Core 的伺服器流

原文:bit。ly/3lpz8Ll

作者:Chandan Rauniyar

翻譯:精緻碼農-王亮

早在 2019 年,我寫過《用 Mapbox 繪製位置資料》一文,詳細介紹了我如何透過簡單的檔案上傳,用 Mapbox 繪製約 230 萬個位置點。本文介紹我是如何透過使用 gRPC 和 。NET Core 的伺服器流來快速獲取所有位置歷史資料的。

https://chandankkrr。medium。com/mapping-location-data-with-mapbox-9b256f64d569

什麼是 gRPC

gRPC 是一個現代開源的高效能 RPC 框架,可以在任何環境下執行。它可以有效地連線資料中心內和跨資料中心的服務,並對負載平衡、跟蹤、健康檢查和認證提供可插拔的支援。gRPC 最初是由谷歌建立的,該公司使用一個名為 Stubby 的單一通用 RPC 基礎設施來連線其資料中心內和跨資料中心執行的大量微服務,使用已經超過十年。2015 年 3 月,谷歌決定建立 Stubby 的下一個版本,並將其開源,結果就是現在的 gRPC,被許多企業或組織使用。

https://grpc。io/

gRPC 伺服器流

伺服器流式(Server Streaming)RPC,客戶端向伺服器傳送請求,並獲得一個流來讀取一連串的訊息。客戶端從返回的流中讀取資訊,直到沒有訊息為止。gRPC 保證在單個 RPC 呼叫中的資訊是有序的。

rpc GetLocationData (GetLocationRequest) returns (stream GetLocationResponse);

協議緩衝區(Protobuf)

gRPC 使用協議緩衝區(protocol buffers)作為介面定義語言(IDL)來定義客戶端和伺服器之間的契約。在下面的

proto

檔案中,定義了一個 RPC 方法

GetLocations

,它接收

GetLocationsRequest

訊息型別並返回

GetLocationsResponse

訊息型別。響應訊息型別前面的

stream

關鍵字表示響應是流型別,而不是單個響應。

syntax = “proto3”;option csharp_namespace = “GPRCStreaming”;package location_data;service LocationData { rpc GetLocations (GetLocationsRequest) returns (stream GetLocationsResponse);}message GetLocationsRequest { int32 dataLimit = 1;}message GetLocationsResponse { int32 latitudeE7 = 1; int32 longitudeE7 = 2;}

建立 gRPC 服務

我們可以使用

dotnet new grpc -n threemillion

命令輕鬆建立一個 。NET gRPC 服務。更多關於在 ASP。NET Core 中建立 gRPC 伺服器和客戶端的資訊可在微軟文件中找到。

Create a gRPC client and server in ASP。NET Corehttps://docs。microsoft。com/en-us/aspnet/core/tutorials/grpc/grpc-start?view=aspnetcore-5。0&tabs=visual-studio-code

在添加了 proto 檔案並生成了 gRPC 服務資原始檔後,接下來我添加了

LocationService

類。在下面的程式碼片段中,我有一個

LocationService

類,它繼承了從

Location。proto

檔案中生成的

LocationDataBase

型別。客戶端可以透過

Startup。cs

檔案中

Configure

方法中的

endpoints。MapGrpcService()

來訪問

LocationService

。當伺服器收到

GetLocations

請求時,它首先透過

GetLocationData

方法呼叫讀取 Data 資料夾中

LocationHistory。json

檔案中的所有資料(未包含在原始碼庫)。該方法返回

RootLocation

型別,其中包含

List

型別的

Location

屬性。

Location

類由兩個內部屬性

Longitude

Latitude

組成。接下來,我迴圈瀏覽每個位置,然後將它們寫入

responseStream

中,返回給客戶端。伺服器將訊息寫入流中,直到客戶在

GetLocationRequest

物件中指定的

dataLimit

using System。Threading。Tasks;using Grpc。Core;using Microsoft。Extensions。Logging;using System。IO;using System;using System。Linq;namespace GPRCStreaming{ public class LocationService : LocationData。LocationDataBase { private readonly FileReader _fileReader; private readonly ILogger _logger; public LocationService(FileReader fileReader, ILogger logger) { _fileReader = fileReader; _logger = logger; } public override async Task GetLocations( GetLocationsRequest request, IServerStreamWriter responseStream, ServerCallContext context) { try { _logger。LogInformation(“Incoming request for GetLocationData”); var locationData = await GetLocationData(); var locationDataCount = locationData。Locations。Count; var dataLimit = request。DataLimit > locationDataCount ? locationDataCount : request。DataLimit; for (var i = 0; i <= dataLimit - 1; i++) { var item = locationData。Locations[i]; await responseStream。WriteAsync(new GetLocationsResponse { LatitudeE7 = item。LatitudeE7, LongitudeE7 = item。LongitudeE7 }); } } catch (Exception exception) { _logger。LogError(exception, “Error occurred”); throw; } } private async Task GetLocationData() { var currentDirectory = Directory。GetCurrentDirectory(); var filePath = $“{currentDirectory}/Data/Location_History。json”; var locationData = await _fileReader。ReadAllLinesAsync(filePath); return locationData; } }}

現在,讓我們執行該服務併發送一個請求。我將使用一個叫

grpcurl

的命令列工具,它可以讓你與 gRPC 伺服器互動。它基本上是針對 gRPC 伺服器的

curl

https://github。com/fullstorydev/grpcurl

透過

grpcurl

與 gRPC 端點(endpoint)互動只有在 gRPC 反射服務被啟用時才可用。這允許服務可以被查詢,以發現伺服器上的 gRPC 服務。擴充套件方法

MapGrpcReflectionService

需要引入

Microsoft。AspNetCore。Builder

的名稱空間:

public void Configure(IApplicationBuilder app, IWebHostEnvironment env){ app。UseEndpoints(endpoints => { endpoints。MapGrpcService(); if (env。IsDevelopment()) { endpoints。MapGrpcReflectionService(); } endpoints。MapGet(“/”, async context => { await context。Response。WriteAsync(“Communication with gRPC endpoints must be made through a gRPC client。 To learn how to create a client, visit: https://go。microsoft。com/fwlink/?linkid=2086909”); }); });}

grpcurl -plaintext -d ‘{“dataLimit”: “100000”}’ localhost:80 location_data。LocationData/GetLocations

一旦伺服器收到請求,它就會讀取檔案,然後在位置列表中迴圈,直到達到

dataLimit

計數,並將位置資料返回給客戶端。

基於 gRPC 和 .NET Core 的伺服器流

接下來,讓我們建立一個 Blazor 客戶端來呼叫 gRPC 服務。我們可以使用

IServiceCollection

介面上的

AddGrpcClient

擴充套件方法設定一個 gRPC 客戶端:

public void ConfigureServices(IServiceCollection services){ services。AddRazorPages(); services。AddServerSideBlazor(); services。AddSingleton(); services。AddGrpcClient(client => { client。Address = new Uri(“http://localhost:80”); });}

我使用

Virtualize

Blazor 元件來渲染這些位置。

Virtualize

元件不是一次性渲染列表中的每個專案,只有當前可見的專案才會被渲染。

ASP。NET Core Blazor component virtualizationhttps://docs。microsoft。com/en-us/aspnet/core/blazor/components/virtualization?view=aspnetcore-5。0

相關程式碼:

@page “/locationdata”@using Grpc。Core@using GPRCStreaming@using threemillion。Data@using System。Diagnostics@using Microsoft。AspNetCore。Components。Web。Virtualization@inject IJSRuntime JSRuntime;@inject System。Net。Http。IHttpClientFactory _clientFactory@inject GPRCStreaming。LocationData。LocationDataClient _locationDataClient

Total records: @_locations。Count

Time taken: @_stopWatch。ElapsedMilliseconds milliseconds

Longitude Latitude
@locations。LongitudeE7 @locations。LatitudeE7
@code { private int _dataLimit = 1000; private List _locations = new List(); private Stopwatch _stopWatch = new Stopwatch(); protected override async Task OnInitializedAsync() { await FetchData(); } private async Task FetchData() { ResetState(); _stopWatch。Start(); using (var call = _locationDataClient。GetLocations(new GetLocationsRequest { DataLimit = _dataLimit })) { await foreach (var response in call。ResponseStream。ReadAllAsync()) { _locations。Add(new Location { LongitudeE7 = response。LongitudeE7, LatitudeE7 = response。LatitudeE7 }); StateHasChanged(); } } _stopWatch。Stop(); } private void ResetState() { _locations。Clear(); _stopWatch。Reset(); StateHasChanged(); }}

基於 gRPC 和 .NET Core 的伺服器流

透過在本地執行的流呼叫,從 gRPC 伺服器接收 2,876,679 個單獨的響應大約需要 8 秒鐘。讓我們也在 Mapbox 中載入資料:

@page “/mapbox”@using Grpc。Core@using GPRCStreaming@using System。Diagnostics@inject IJSRuntime JSRuntime;@inject System。Net。Http。IHttpClientFactory _clientFactory@inject GPRCStreaming。LocationData。LocationDataClient LocationDataClient

Total records: @_locations。Count

Time taken: @_stopWatch。ElapsedMilliseconds milliseconds

@code { private int _dataLimit = 100; private List _locations = new List(); private Stopwatch _stopWatch = new Stopwatch(); protected override async Task OnAfterRenderAsync(bool firstRender) { if (!firstRender) { return; } await JSRuntime。InvokeVoidAsync(“mapBoxFunctions。initMapBox”); } private async Task LoadMap() { ResetState(); _stopWatch。Start(); using (var call = LocationDataClient。GetLocations(new GetLocationsRequest { DataLimit = _dataLimit })) { await foreach (var response in call。ResponseStream。ReadAllAsync()) { var pow = Math。Pow(10, 7); var longitude = response。LongitudeE7 / pow; var latitude = response。LatitudeE7 / pow; _locations。Add(new { type = “Feature”, geometry = new { type = “Point”, coordinates = new double[] { longitude, latitude } } }); StateHasChanged(); } _stopWatch。Stop(); await JSRuntime。InvokeVoidAsync(“mapBoxFunctions。addClusterData”, _locations); } } private void ResetState() { JSRuntime。InvokeVoidAsync(“mapBoxFunctions。clearClusterData”); _locations。Clear(); _stopWatch。Reset(); StateHasChanged(); }}

基於 gRPC 和 .NET Core 的伺服器流

原始碼在我的 GitHub 上:

https://github。com/Chandankkrr/threemillion