# CloudWeGo Eino实战案例:构建微服务系统
CloudWeGo Eino作为一款现代化的高性能云原生RPC框架,非常适合构建微服务系统。本文将通过一个实战案例,详细介绍如何使用Eino构建一个完整的微服务系统,包括服务设计、实现、部署和监控等方面,帮助开发者了解如何在实际项目中使用Eino。
## 项目背景
我们将构建一个简单的电商系统,包含以下微服务:
1. **用户服务**:处理用户注册、登录、信息管理等功能
2. **商品服务**:处理商品管理、查询等功能
3. **订单服务**:处理订单创建、查询、状态更新等功能
4. **支付服务**:处理支付流程、回调等功能
5. **网关服务**:作为系统的入口,路由请求到各个微服务
## 技术栈选择
– **RPC框架**:CloudWeGo Eino
– **服务发现**:Consul
– **配置管理**:etcd
– **数据库**:MySQL
– **缓存**:Redis
– **消息队列**:Kafka
– **监控**:Prometheus + Grafana
– **追踪**:Jaeger
– **容器编排**:Kubernetes
## 服务设计
### 1. 接口定义
使用Protobuf定义服务接口:
#### 用户服务接口
“`protobuf
syntax = “proto3”;
package user;
option go_package = “github.com/cloudwego/eino-examples/user”;
service UserService {
rpc Register(RegisterRequest) returns (RegisterResponse);
rpc Login(LoginRequest) returns (LoginResponse);
rpc GetUserInfo(GetUserInfoRequest) returns (GetUserInfoResponse);
rpc UpdateUserInfo(UpdateUserInfoRequest) returns (UpdateUserInfoResponse);
}
message RegisterRequest {
string username = 1;
string password = 2;
string email = 3;
string phone = 4;
}
message RegisterResponse {
int64 user_id = 1;
string token = 2;
}
message LoginRequest {
string username = 1;
string password = 2;
}
message LoginResponse {
int64 user_id = 1;
string token = 2;
}
message GetUserInfoRequest {
int64 user_id = 1;
}
message GetUserInfoResponse {
int64 user_id = 1;
string username = 2;
string email = 3;
string phone = 4;
}
message UpdateUserInfoRequest {
int64 user_id = 1;
string email = 2;
string phone = 3;
}
message UpdateUserInfoResponse {
bool success = 1;
}
“`
#### 商品服务接口
“`protobuf
syntax = “proto3”;
package product;
option go_package = “github.com/cloudwego/eino-examples/product”;
service ProductService {
rpc CreateProduct(CreateProductRequest) returns (CreateProductResponse);
rpc GetProduct(GetProductRequest) returns (GetProductResponse);
rpc ListProducts(ListProductsRequest) returns (ListProductsResponse);
rpc UpdateProduct(UpdateProductRequest) returns (UpdateProductResponse);
rpc DeleteProduct(DeleteProductRequest) returns (DeleteProductResponse);
}
message CreateProductRequest {
string name = 1;
string description = 2;
double price = 3;
int32 stock = 4;
string category = 5;
}
message CreateProductResponse {
int64 product_id = 1;
}
message GetProductRequest {
int64 product_id = 1;
}
message GetProductResponse {
int64 product_id = 1;
string name = 2;
string description = 3;
double price = 4;
int32 stock = 5;
string category = 6;
}
message ListProductsRequest {
string category = 1;
int32 page = 2;
int32 page_size = 3;
}
message ListProductsResponse {
repeated Product products = 1;
int32 total = 2;
}
message Product {
int64 product_id = 1;
string name = 2;
string description = 3;
double price = 4;
int32 stock = 5;
string category = 6;
}
message UpdateProductRequest {
int64 product_id = 1;
string name = 2;
string description = 3;
double price = 4;
int32 stock = 5;
string category = 6;
}
message UpdateProductResponse {
bool success = 1;
}
message DeleteProductRequest {
int64 product_id = 1;
}
message DeleteProductResponse {
bool success = 1;
}
“`
#### 订单服务接口
“`protobuf
syntax = “proto3”;
package order;
option go_package = “github.com/cloudwego/eino-examples/order”;
service OrderService {
rpc CreateOrder(CreateOrderRequest) returns (CreateOrderResponse);
rpc GetOrder(GetOrderRequest) returns (GetOrderResponse);
rpc ListOrders(ListOrdersRequest) returns (ListOrdersResponse);
rpc UpdateOrderStatus(UpdateOrderStatusRequest) returns (UpdateOrderStatusResponse);
}
message CreateOrderRequest {
int64 user_id = 1;
repeated OrderItem items = 2;
string address = 3;
string payment_method = 4;
}
message OrderItem {
int64 product_id = 1;
int32 quantity = 2;
double price = 3;
}
message CreateOrderResponse {
int64 order_id = 1;
string payment_url = 2;
}
message GetOrderRequest {
int64 order_id = 1;
}
message GetOrderResponse {
int64 order_id = 1;
int64 user_id = 2;
repeated OrderItem items = 3;
double total_amount = 4;
string status = 5;
string address = 6;
string payment_method = 7;
string created_at = 8;
}
message ListOrdersRequest {
int64 user_id = 1;
int32 page = 2;
int32 page_size = 3;
}
message ListOrdersResponse {
repeated Order orders = 1;
int32 total = 2;
}
message Order {
int64 order_id = 1;
double total_amount = 2;
string status = 3;
string created_at = 4;
}
message UpdateOrderStatusRequest {
int64 order_id = 1;
string status = 2;
}
message UpdateOrderStatusResponse {
bool success = 1;
}
“`
#### 支付服务接口
“`protobuf
syntax = “proto3”;
package payment;
option go_package = “github.com/cloudwego/eino-examples/payment”;
service PaymentService {
rpc CreatePayment(CreatePaymentRequest) returns (CreatePaymentResponse);
rpc HandlePaymentCallback(HandlePaymentCallbackRequest) returns (HandlePaymentCallbackResponse);
rpc GetPaymentStatus(GetPaymentStatusRequest) returns (GetPaymentStatusResponse);
}
message CreatePaymentRequest {
int64 order_id = 1;
double amount = 2;
string payment_method = 3;
string return_url = 4;
}
message CreatePaymentResponse {
string payment_url = 1;
string payment_id = 2;
}
message HandlePaymentCallbackRequest {
string payment_id = 1;
string status = 2;
string transaction_id = 3;
map
}
message HandlePaymentCallbackResponse {
bool success = 1;
}
message GetPaymentStatusRequest {
string payment_id = 1;
}
message GetPaymentStatusResponse {
string status = 1;
string transaction_id = 2;
}
“`
### 2. 服务依赖关系
– **用户服务**:依赖数据库
– **商品服务**:依赖数据库和缓存
– **订单服务**:依赖数据库、用户服务、商品服务和支付服务
– **支付服务**:依赖数据库和订单服务
– **网关服务**:依赖所有其他服务
## 服务实现
### 1. 用户服务实现
“`go
package main
import (
“context”
“log”
“time”
“github.com/cloudwego/eino”
“github.com/cloudwego/eino-examples/user/proto”
“github.com/golang-jwt/jwt/v4”
“gorm.io/driver/mysql”
“gorm.io/gorm”
)
// User 用户模型
type User struct {
ID int64 `gorm:”primaryKey”`
Username string `gorm:”size:50;uniqueIndex”`
Password string `gorm:”size:100″`
Email string `gorm:”size:100;uniqueIndex”`
Phone string `gorm:”size:20″`
CreatedAt time.Time `gorm:”autoCreateTime”`
UpdatedAt time.Time `gorm:”autoUpdateTime”`
}
// UserService 用户服务实现
type UserService struct {
db *gorm.DB
}
// Register 注册用户
func (s *UserService) Register(ctx context.Context, req *proto.RegisterRequest) (*proto.RegisterResponse, error) {
// 检查用户是否已存在
var existingUser User
if err := s.db.Where(“username = ?”, req.Username).First(&existingUser).Error; err == nil {
return nil, errors.New(“user already exists”)
}
// 创建新用户
user := User{
Username: req.Username,
Password: req.Password, // 实际应用中应该加密存储
Email: req.Email,
Phone: req.Phone,
}
if err := s.db.Create(&user).Error; err != nil {
return nil, err
}
// 生成JWT令牌
token, err := generateToken(user.ID)
if err != nil {
return nil, err
}
return &proto.RegisterResponse{
UserId: user.ID,
Token: token,
}, nil
}
// Login 用户登录
func (s *UserService) Login(ctx context.Context, req *proto.LoginRequest) (*proto.LoginResponse, error) {
// 查找用户
var user User
if err := s.db.Where(“username = ?”, req.Username).First(&user).Error; err != nil {
return nil, errors.New(“invalid username or password”)
}
// 验证密码
if user.Password != req.Password { // 实际应用中应该使用密码哈希验证
return nil, errors.New(“invalid username or password”)
}
// 生成JWT令牌
token, err := generateToken(user.ID)
if err != nil {
return nil, err
}
return &proto.LoginResponse{
UserId: user.ID,
Token: token,
}, nil
}
// GetUserInfo 获取用户信息
func (s *UserService) GetUserInfo(ctx context.Context, req *proto.GetUserInfoRequest) (*proto.GetUserInfoResponse, error) {
var user User
if err := s.db.First(&user, req.UserId).Error; err != nil {
return nil, errors.New(“user not found”)
}
return &proto.GetUserInfoResponse{
UserId: user.ID,
Username: user.Username,
Email: user.Email,
Phone: user.Phone,
}, nil
}
// UpdateUserInfo 更新用户信息
func (s *UserService) UpdateUserInfo(ctx context.Context, req *proto.UpdateUserInfoRequest) (*proto.UpdateUserInfoResponse, error) {
var user User
if err := s.db.First(&user, req.UserId).Error; err != nil {
return nil, errors.New(“user not found”)
}
// 更新用户信息
if req.Email != “” {
user.Email = req.Email
}
if req.Phone != “” {
user.Phone = req.Phone
}
if err := s.db.Save(&user).Error; err != nil {
return nil, err
}
return &proto.UpdateUserInfoResponse{Success: true}, nil
}
// generateToken 生成JWT令牌
func generateToken(userID int64) (string, error) {
claims := jwt.MapClaims{
“user_id”: userID,
“exp”: time.Now().Add(time.Hour * 24 * 7).Unix(),
}
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return token.SignedString([]byte(“your-secret-key”))
}
func main() {
// 连接数据库
db, err := gorm.Open(mysql.Open(“user:password@tcp(localhost:3306)/eino_example?charset=utf8mb4&parseTime=True&loc=Local”), &gorm.Config{})
if err != nil {
log.Fatalf(“Failed to connect to database: %v”, err)
}
// 自动迁移数据库
db.AutoMigrate(&User{})
// 创建服务器
server := eino.NewServer()
proto.RegisterUserService(server, &UserService{db: db})
// 启动服务器
log.Println(“User service starting on :8081”)
if err := server.ListenAndServe(“:8081”); err != nil {
log.Fatalf(“Failed to start server: %v”, err)
}
}
“`
### 2. 商品服务实现
“`go
package main
import (
“context”
“log”
“time”
“github.com/cloudwego/eino”
“github.com/cloudwego/eino-examples/product/proto”
“github.com/redis/go-redis/v9”
“gorm.io/driver/mysql”
“gorm.io/gorm”
)
// Product 商品模型
type Product struct {
ID int64 `gorm:”primaryKey”`
Name string `gorm:”size:100″`
Description string `gorm:”size:500″`
Price float64 `gorm:”type:decimal(10,2)”`
Stock int32 `gorm:”default:0″`
Category string `gorm:”size:50;index”`
CreatedAt time.Time `gorm:”autoCreateTime”`
UpdatedAt time.Time `gorm:”autoUpdateTime”`
}
// ProductService 商品服务实现
type ProductService struct {
db *gorm.DB
redis *redis.Client
}
// CreateProduct 创建商品
func (s *ProductService) CreateProduct(ctx context.Context, req *proto.CreateProductRequest) (*proto.CreateProductResponse, error) {
product := Product{
Name: req.Name,
Description: req.Description,
Price: req.Price,
Stock: req.Stock,
Category: req.Category,
}
if err := s.db.Create(&product).Error; err != nil {
return nil, err
}
return &proto.CreateProductResponse{ProductId: product.ID}, nil
}
// GetProduct 获取商品详情
func (s *ProductService) GetProduct(ctx context.Context, req *proto.GetProductRequest) (*proto.GetProductResponse, error) {
// 尝试从缓存获取
cacheKey := fmt.Sprintf(“product:%d”, req.ProductId)
cachedProduct, err := s.redis.Get(ctx, cacheKey).Result()
if err == nil {
var product proto.GetProductResponse
if err := json.Unmarshal([]byte(cachedProduct), &product); err == nil {
return &product, nil
}
}
// 从数据库获取
var product Product
if err := s.db.First(&product, req.ProductId).Error; err != nil {
return nil, errors.New(“product not found”)
}
response := &proto.GetProductResponse{
ProductId: product.ID,
Name: product.Name,
Description: product.Description,
Price: product.Price,
Stock: product.Stock,
Category: product.Category,
}
// 缓存商品信息
if data, err := json.Marshal(response); err == nil {
s.redis.Set(ctx, cacheKey, data, 10*time.Minute)
}
return response, nil
}
// ListProducts 列出商品
func (s *ProductService) ListProducts(ctx context.Context, req *proto.ListProductsRequest) (*proto.ListProductsResponse, error) {
var products []Product
var total int64
query := s.db.Model(&Product{})
if req.Category != “” {
query = query.Where(“category = ?”, req.Category)
}
// 计算总数
query.Count(&total)
// 分页查询
offset := (req.Page – 1) * req.PageSize
query.Offset(int(offset)).Limit(int(req.PageSize)).Find(&products)
// 构建响应
var productList []*proto.Product
for _, p := range products {
productList = append(productList, &proto.Product{
ProductId: p.ID,
Name: p.Name,
Description: p.Description,
Price: p.Price,
Stock: p.Stock,
Category: p.Category,
})
}
return &proto.ListProductsResponse{
Products: productList,
Total: int32(total),
}, nil
}
// UpdateProduct 更新商品
func (s *ProductService) UpdateProduct(ctx context.Context, req *proto.UpdateProductRequest) (*proto.UpdateProductResponse, error) {
var product Product
if err := s.db.First(&product, req.ProductId).Error; err != nil {
return nil, errors.New(“product not found”)
}
// 更新商品信息
if req.Name != “” {
product.Name = req.Name
}
if req.Description != “” {
product.Description = req.Description
}
if req.Price > 0 {
product.Price = req.Price
}
if req.Stock > 0 {
product.Stock = req.Stock
}
if req.Category != “” {
product.Category = req.Category
}
if err := s.db.Save(&product).Error; err != nil {
return nil, err
}
// 清除缓存
cacheKey := fmt.Sprintf(“product:%d”, req.ProductId)
s.redis.Del(ctx, cacheKey)
return &proto.UpdateProductResponse{Success: true}, nil
}
// DeleteProduct 删除商品
func (s *ProductService) DeleteProduct(ctx context.Context, req *proto.DeleteProductRequest) (*proto.DeleteProductResponse, error) {
if err := s.db.Delete(&Product{}, req.ProductId).Error; err != nil {
return nil, err
}
// 清除缓存
cacheKey := fmt.Sprintf(“product:%d”, req.ProductId)
s.redis.Del(ctx, cacheKey)
return &proto.DeleteProductResponse{Success: true}, nil
}
func main() {
// 连接数据库
db, err := gorm.Open(mysql.Open(“user:password@tcp(localhost:3306)/eino_example?charset=utf8mb4&parseTime=True&loc=Local”), &gorm.Config{})
if err != nil {
log.Fatalf(“Failed to connect to database: %v”, err)
}
// 自动迁移数据库
db.AutoMigrate(&Product{})
// 连接Redis
redisClient := redis.NewClient(&redis.Options{
Addr: “localhost:6379”,
})
// 创建服务器
server := eino.NewServer()
proto.RegisterProductService(server, &ProductService{db: db, redis: redisClient})
// 启动服务器
log.Println(“Product service starting on :8082”)
if err := server.ListenAndServe(“:8082”); err != nil {
log.Fatalf(“Failed to start server: %v”, err)
}
}
“`
### 3. 订单服务实现
“`go
package main
import (
“context”
“fmt”
“log”
“time”
“github.com/cloudwego/eino”
“github.com/cloudwego/eino-examples/order/proto”
“github.com/cloudwego/eino-examples/payment/proto/payment”
“gorm.io/driver/mysql”
“gorm.io/gorm”
)
// Order 订单模型
type Order struct {
ID int64 `gorm:”primaryKey”`
UserID int64 `gorm:”index”`
TotalAmount float64 `gorm:”type:decimal(10,2)”`
Status string `gorm:”size:20;default:’pending'”`
Address string `gorm:”size:200″`
PaymentMethod string `gorm:”size:50″`
CreatedAt time.Time `gorm:”autoCreateTime”`
UpdatedAt time.Time `gorm:”autoUpdateTime”`
}
// OrderItem 订单项模型
type OrderItem struct {
ID int64 `gorm:”primaryKey”`
OrderID int64 `gorm:”index”`
ProductID int64 `gorm:”index”`
Quantity int32 `gorm:”default:1″`
Price float64 `gorm:”type:decimal(10,2)”`
}
// OrderService 订单服务实现
type OrderService struct {
db *gorm.DB
paymentClient payment.PaymentServiceClient
productClient product.ProductServiceClient
userClient user.UserServiceClient
}
// CreateOrder 创建订单
func (s *OrderService) CreateOrder(ctx context.Context, req *proto.CreateOrderRequest) (*proto.CreateOrderResponse, error) {
// 验证用户是否存在
_, err := s.userClient.GetUserInfo(ctx, &user.GetUserInfoRequest{UserId: req.UserId})
if err != nil {
return nil, errors.New(“user not found”)
}
// 验证商品库存
var totalAmount float64
for _, item := range req.Items {
productResp, err := s.productClient.GetProduct(ctx, &product.GetProductRequest{ProductId: item.ProductId})
if err != nil {
return nil, fmt.Errorf(“product not found: %v”, err)
}
if productResp.Stock \u003c item.Quantity {
return nil, fmt.Errorf(“insufficient stock for product %d”, item.ProductId)
}
totalAmount += float64(item.Quantity) * item.Price
}
// 创建订单
order := Order{
UserID: req.UserId,
TotalAmount: totalAmount,
Status: “pending”,
Address: req.Address,
PaymentMethod: req.PaymentMethod,
}
if err := s.db.Create(&order).Error; err != nil {
return nil, err
}
// 创建订单项
for _, item := range req.Items {
orderItem := OrderItem{
OrderID: order.ID,
ProductID: item.ProductId,
Quantity: item.Quantity,
Price: item.Price,
}
if err := s.db.Create(&orderItem).Error; err != nil {
// 回滚订单
s.db.Delete(&order)
return nil, err
}
}
// 创建支付
paymentResp, err := s.paymentClient.CreatePayment(ctx, &payment.CreatePaymentRequest{
OrderId: order.ID,
Amount: totalAmount,
PaymentMethod: req.PaymentMethod,
ReturnUrl: “http://localhost:8080/payment/callback”,
})
if err != nil {
// 回滚订单
s.db.Delete(&order)
return nil, err
}
return &proto.CreateOrderResponse{
OrderId: order.ID,
PaymentUrl: paymentResp.PaymentUrl,
}, nil
}
// GetOrder 获取订单详情
func (s *OrderService) GetOrder(ctx context.Context, req *proto.GetOrderRequest) (*proto.GetOrderResponse, error) {
var order Order
if err := s.db.First(&order, req.OrderId).Error; err != nil {
return nil, errors.New(“order not found”)
}
var orderItems []OrderItem
if err := s.db.Where(“order_id = ?”, req.OrderId).Find(&orderItems).Error; err != nil {
return nil, err
}
var items []*proto.OrderItem
for _, item := range orderItems {
items = append(items, &proto.OrderItem{
ProductId: item.ProductID,
Quantity: item.Quantity,
Price: item.Price,
})
}
return &proto.GetOrderResponse{
OrderId: order.ID,
UserId: order.UserID,
Items: items,
TotalAmount: order.TotalAmount,
Status: order.Status,
Address: order.Address,
PaymentMethod: order.PaymentMethod,
CreatedAt: order.CreatedAt.Format(time.RFC3339),
}, nil
}
// ListOrders 列出订单
func (s *OrderService) ListOrders(ctx context.Context, req *proto.ListOrdersRequest) (*proto.ListOrdersResponse, error) {
var orders []Order
var total int64
query := s.db.Model(&Order{}).Where(“user_id = ?”, req.UserId)
query.Count(&total)
offset := (req.Page – 1) * req.PageSize
query.Offset(int(offset)).Limit(int(req.PageSize)).Order(“created_at DESC”).Find(&orders)
var orderList []*proto.Order
for _, o := range orders {
orderList = append(orderList, &proto.Order{
OrderId: o.ID,
TotalAmount: o.TotalAmount,
Status: o.Status,
CreatedAt: o.CreatedAt.Format(time.RFC3339),
})
}
return &proto.ListOrdersResponse{
Orders: orderList,
Total: int32(total),
}, nil
}
// UpdateOrderStatus 更新订单状态
func (s *OrderService) UpdateOrderStatus(ctx context.Context, req *proto.UpdateOrderStatusRequest) (*proto.UpdateOrderStatusResponse, error) {
var order Order
if err := s.db.First(&order, req.OrderId).Error; err != nil {
return nil, errors.New(“order not found”)
}
order.Status = req.Status
if err := s.db.Save(&order).Error; err != nil {
return nil, err
}
return &proto.UpdateOrderStatusResponse{Success: true}, nil
}
func main() {
// 连接数据库
db, err := gorm.Open(mysql.Open(“user:password@tcp(localhost:3306)/eino_example?charset=utf8mb4&parseTime=True&loc=Local”), &gorm.Config{})
if err != nil {
log.Fatalf(“Failed to connect to database: %v”, err)
}
// 自动迁移数据库
db.AutoMigrate(&Order{}, &OrderItem{})
// 创建客户端
client := eino.NewClient()
paymentClient := payment.NewPaymentServiceClient(client)
productClient := product.NewProductServiceClient(client)
userClient := user.NewUserServiceClient(client)
// 创建服务器
server := eino.NewServer()
proto.RegisterOrderService(server, &OrderService{
db: db,
paymentClient: paymentClient,
productClient: productClient,
userClient: userClient,
})
// 启动服务器
log.Println(“Order service starting on :8083”)
if err := server.ListenAndServe(“:8083”); err != nil {
log.Fatalf(“Failed to start server: %v”, err)
}
}
“`
### 4. 支付服务实现
“`go
package main
import (
“context”
“log”
“time”
“github.com/cloudwego/eino”
“github.com/cloudwego/eino-examples/order/proto/order”
“github.com/cloudwego/eino-examples/payment/proto”
“gorm.io/driver/mysql”
“gorm.io/gorm”
)
// Payment 支付模型
type Payment struct {
ID string `gorm:”primaryKey;size:50″`
OrderID int64 `gorm:”index”`
Amount float64 `gorm:”type:decimal(10,2)”`
PaymentMethod string `gorm:”size:50″`
Status string `gorm:”size:20;default:’pending'”`
TransactionID string `gorm:”size:100″`
CreatedAt time.Time `gorm:”autoCreateTime”`
UpdatedAt time.Time `gorm:”autoUpdateTime”`
}
// PaymentService 支付服务实现
type PaymentService struct {
db *gorm.DB
orderClient order.OrderServiceClient
}
// CreatePayment 创建支付
func (s *PaymentService) CreatePayment(ctx context.Context, req *proto.CreatePaymentRequest) (*proto.CreatePaymentResponse, error) {
// 生成支付ID
paymentID := fmt.Sprintf(“PAY_%d_%d”, req.OrderId, time.Now().Unix())
// 创建支付记录
payment := Payment{
ID: paymentID,
OrderID: req.OrderId,
Amount: req.Amount,
PaymentMethod: req.PaymentMethod,
Status: “pending”,
}
if err := s.db.Create(&payment).Error; err != nil {
return nil, err
}
// 生成支付URL(实际应用中应该调用支付网关API)
paymentURL := fmt.Sprintf(“http://payment-gateway.com/pay?payment_id=%s&amount=%.2f”, paymentID, req.Amount)
return &proto.CreatePaymentResponse{
PaymentUrl: paymentURL,
PaymentId: paymentID,
}, nil
}
// HandlePaymentCallback 处理支付回调
func (s *PaymentService) HandlePaymentCallback(ctx context.Context, req *proto.HandlePaymentCallbackRequest) (*proto.HandlePaymentCallbackResponse, error) {
// 查找支付记录
var payment Payment
if err := s.db.Where(“id = ?”, req.PaymentId).First(&payment).Error; err != nil {
return nil, errors.New(“payment not found”)
}
// 更新支付状态
payment.Status = req.Status
payment.TransactionID = req.TransactionId
if err := s.db.Save(&payment).Error; err != nil {
return nil, err
}
// 更新订单状态
orderStatus := “pending”
if req.Status == “success” {
orderStatus = “paid”
} else if req.Status == “failed” {
orderStatus = “cancelled”
}
_, err := s.orderClient.UpdateOrderStatus(ctx, &order.UpdateOrderStatusRequest{
OrderId: payment.OrderID,
Status: orderStatus,
})
if err != nil {
log.Printf(“Failed to update order status: %v”, err)
}
return &proto.HandlePaymentCallbackResponse{Success: true}, nil
}
// GetPaymentStatus 获取支付状态
func (s *PaymentService) GetPaymentStatus(ctx context.Context, req *proto.GetPaymentStatusRequest) (*proto.GetPaymentStatusResponse, error) {
var payment Payment
if err := s.db.Where(“id = ?”, req.PaymentId).First(&payment).Error; err != nil {
return nil, errors.New(“payment not found”)
}
return &proto.GetPaymentStatusResponse{
Status: payment.Status,
TransactionId: payment.TransactionID,
}, nil
}
func main() {
// 连接数据库
db, err := gorm.Open(mysql.Open(“user:password@tcp(localhost:3306)/eino_example?charset=utf8mb4&parseTime=True&loc=Local”), &gorm.Config{})
if err != nil {
log.Fatalf(“Failed to connect to database: %v”, err)
}
// 自动迁移数据库
db.AutoMigrate(&Payment{})
// 创建客户端
client := eino.NewClient()
orderClient := order.NewOrderServiceClient(client)
// 创建服务器
server := eino.NewServer()
proto.RegisterPaymentService(server, &PaymentService{
db: db,
orderClient: orderClient,
})
// 启动服务器
log.Println(“Payment service starting on :8084”)
if err := server.ListenAndServe(“:8084”); err != nil {
log.Fatalf(“Failed to start server: %v”, err)
}
}
“`
### 5. 网关服务实现
“`go
package main
import (
“context”
“log”
“net/http”
“github.com/cloudwego/eino”
“github.com/cloudwego/eino-examples/user/proto/user”
“github.com/cloudwego/eino-examples/product/proto/product”
“github.com/cloudwego/eino-examples/order/proto/order”
“github.com/cloudwego/eino-examples/payment/proto/payment”
“github.com/cloudwego/hertz/pkg/app”
“github.com/cloudwego/hertz/pkg/app/server”
“github.com/golang-jwt/jwt/v4”
)
// Gateway 网关服务
type Gateway struct {
userClient user.UserServiceClient
productClient product.ProductServiceClient
orderClient order.OrderServiceClient
paymentClient payment.PaymentServiceClient
}
// Register 用户注册
func (g *Gateway) Register(c context.Context, ctx *app.RequestContext) {
var req user.RegisterRequest
if err := ctx.BindAndValidate(&req); err != nil {
ctx.JSON(http.StatusBadRequest, map[string]string{“error”: err.Error()})
return
}
resp, err := g.userClient.Register(c, &req)
if err != nil {
ctx.JSON(http.StatusInternalServerError, map[string]string{“error”: err.Error()})
return
}
ctx.JSON(http.StatusOK, resp)
}
// Login 用户登录
func (g *Gateway) Login(c context.Context, ctx *app.RequestContext) {
var req user.LoginRequest
if err := ctx.BindAndValidate(&req); err != nil {
ctx.JSON(http.StatusBadRequest, map[string]string{“error”: err.Error()})
return
}
resp, err := g.userClient.Login(c, &req)
if err != nil {
ctx.JSON(http.StatusUnauthorized, map[string]string{“error”: err.Error()})
return
}
ctx.JSON(http.StatusOK, resp)
}
// GetUserInfo 获取用户信息
func (g *Gateway) GetUserInfo(c context.Context, ctx *app.RequestContext) {
// 从JWT令牌获取用户ID
userID, err := getUserIDFromToken(ctx)
if err != nil {
ctx.JSON(http.StatusUnauthorized, map[string]string{“error”: “unauthorized”})
return
}
resp, err := g.userClient.GetUserInfo(c, &user.GetUserInfoRequest{UserId: userID})
if err != nil {
ctx.JSON(http.StatusInternalServerError, map[string]string{“error”: err.Error()})
return
}
ctx.JSON(http.StatusOK, resp)
}
// CreateProduct 创建商品
func (g *Gateway) CreateProduct(c context.Context, ctx *app.RequestContext) {
// 验证管理员权限(简化处理)
if !isAdmin(ctx) {
ctx.JSON(http.StatusForbidden, map[string]string{“error”: “forbidden”})
return
}
var req product.CreateProductRequest
if err := ctx.BindAndValidate(&req); err != nil {
ctx.JSON(http.StatusBadRequest, map[string]string{“error”: err.Error()})
return
}
resp, err := g.productClient.CreateProduct(c, &req)
if err != nil {
ctx.JSON(http.StatusInternalServerError, map[string]string{“error”: err.Error()})
return
}
ctx.JSON(http.StatusOK, resp)
}
// GetProduct 获取商品详情
func (g *Gateway) GetProduct(c context.Context, ctx *app.RequestContext) {
productID := ctx.Param(“id”)
id, err := strconv.ParseInt(productID, 10, 64)
if err != nil {
ctx.JSON(http.StatusBadRequest, map[string]string{“error”: “invalid product id”})
return
}
resp, err := g.productClient.GetProduct(c, &product.GetProductRequest{ProductId: id})
if err != nil {
ctx.JSON(http.StatusNotFound, map[string]string{“error”: err.Error()})
return
}
ctx.JSON(http.StatusOK, resp)
}
// ListProducts 列出商品
func (g *Gateway) ListProducts(c context.Context, ctx *app.RequestContext) {
category := ctx.Query(“category”)
page, _ := strconv.Atoi(ctx.DefaultQuery(“page”, “1”))
pageSize, _ := strconv.Atoi(ctx.DefaultQuery(“page_size”, “10”))
resp, err := g.productClient.ListProducts(c, &product.ListProductsRequest{
Category: category,
Page: int32(page),
PageSize: int32(pageSize),
})
if err != nil {
ctx.JSON(http.StatusInternalServerError, map[string]string{“error”: err.Error()})
return
}
ctx.JSON(http.StatusOK, resp)
}
// CreateOrder 创建订单
func (g *Gateway) CreateOrder(c context.Context, ctx *app.RequestContext) {
// 从JWT令牌获取用户ID
userID, err := getUserIDFromToken(ctx)
if err != nil {
ctx.JSON(http.StatusUnauthorized, map[string]string{“error”: “unauthorized”})
return
}
var req order.CreateOrderRequest
if err := ctx.BindAndValidate(&req); err != nil {
ctx.JSON(http.StatusBadRequest, map[string]string{“error”: err.Error()})
return
}
req.UserId = userID
resp, err := g.orderClient.CreateOrder(c, &req)
if err != nil {
ctx.JSON(http.StatusInternalServerError, map[string]string{“error”: err.Error()})
return
}
ctx.JSON(http.StatusOK, resp)
}
// GetOrder 获取订单详情
func (g *Gateway) GetOrder(c context.Context, ctx *app.RequestContext) {
// 从JWT令牌获取用户ID
userID, err := getUserIDFromToken(ctx)
if err != nil {
ctx.JSON(http.StatusUnauthorized, map[string]string{“error”: “unauthorized”})
return
}
orderID := ctx.Param(“id”)
id, err := strconv.ParseInt(orderID, 10, 64)
if err != nil {
ctx.JSON(http.StatusBadRequest, map[string]string{“error”: “invalid order id”})
return
}
resp, err := g.orderClient.GetOrder(c, &order.GetOrderRequest{OrderId: id})
if err != nil {
ctx.JSON(http.StatusNotFound, map[string]string{“error”: err.Error()})
return
}
// 验证订单所属用户
if resp.UserId != userID && !isAdmin(ctx) {
ctx.JSON(http.StatusForbidden, map[string]string{“error”: “forbidden”})
return
}
ctx.JSON(http.StatusOK, resp)
}
// ListOrders 列出订单
func (g *Gateway) ListOrders(c context.Context, ctx *app.RequestContext) {
// 从JWT令牌获取用户ID
userID, err := getUserIDFromToken(ctx)
if err != nil {
ctx.JSON(http.StatusUnauthorized, map[string]string{“error”: “unauthorized”})
return
}
page, _ := strconv.Atoi(ctx.DefaultQuery(“page”, “1”))
pageSize, _ := strconv.Atoi(ctx.DefaultQuery(“page_size”, “10”))
resp, err := g.orderClient.ListOrders(c, &order.ListOrdersRequest{
UserId: userID,
Page: int32(page),
PageSize: int32(pageSize),
})
if err != nil {
ctx.JSON(http.StatusInternalServerError, map[string]string{“error”: err.Error()})
return
}
ctx.JSON(http.StatusOK, resp)
}
// HandlePaymentCallback 处理支付回调
func (g *Gateway) HandlePaymentCallback(c context.Context, ctx *app.RequestContext) {
var req payment.HandlePaymentCallbackRequest
if err := ctx.BindAndValidate(&req); err != nil {
ctx.JSON(http.StatusBadRequest, map[string]string{“error”: err.Error()})
return
}
resp, err := g.paymentClient.HandlePaymentCallback(c, &req)
if err != nil {
ctx.JSON(http.StatusInternalServerError, map[string]string{“error”: err.Error()})
return
}
ctx.JSON(http.StatusOK, resp)
}
// getUserIDFromToken 从JWT令牌获取用户ID
func getUserIDFromToken(ctx *app.RequestContext) (int64, error) {
tokenString := ctx.GetHeader(“Authorization”)
if tokenString == “” {
return 0, errors.New(“missing authorization header”)
}
// 移除Bearer前缀
tokenString = strings.TrimPrefix(tokenString, “Bearer “)
// 解析JWT令牌
token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
return []byte(“your-secret-key”), nil
})
if err != nil {
return 0, err
}
if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid {
userID := int64(claims[“user_id”].(float64))
return userID, nil
}
return 0, errors.New(“invalid token”)
}
// isAdmin 检查是否为管理员
func isAdmin(ctx *app.RequestContext) bool {
// 简化处理,实际应用中应该从用户角色判断
return false
}
func main() {
// 创建客户端
client := eino.NewClient()
userClient := user.NewUserServiceClient(client)
productClient := product.NewProductServiceClient(client)
orderClient := order.NewOrderServiceClient(client)
paymentClient := payment.NewPaymentServiceClient(client)
// 创建网关
gateway := &Gateway{
userClient: userClient,
productClient: productClient,
orderClient: orderClient,
paymentClient: paymentClient,
}
// 创建Hertz服务器
h := server.Default()
// 注册路由
h.POST(“/api/users/register”, gateway.Register)
h.POST(“/api/users/login”, gateway.Login)
h.GET(“/api/users/me”, gateway.GetUserInfo)
h.POST(“/api/products”, gateway.CreateProduct)
h.GET(“/api/products/:id”, gateway.GetProduct)
h.GET(“/api/products”, gateway.ListProducts)
h.POST(“/api/orders”, gateway.CreateOrder)
h.GET(“/api/orders/:id”, gateway.GetOrder)
h.GET(“/api/orders”, gateway.ListOrders)
h.POST(“/api/payments/callback”, gateway.HandlePaymentCallback)
// 启动服务器
log.Println(“Gateway service starting on :8080”)
h.Spin()
}
“`
## 服务部署
### 1. 容器化
为每个服务创建Dockerfile:
“`dockerfile
# 用户服务Dockerfile
FROM golang:1.20-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o user-service .
FROM alpine:latest
RUN apk –no-cache add ca-certificates
WORKDIR /app
COPY –from=builder /app/user-service .
EXPOSE 8081
CMD [“./user-service”]
“`
### 2. Kubernetes部署
创建Kubernetes部署配置:
“`yaml
# user-service-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: user-service
namespace: eino-example
spec:
replicas: 2
selector:
matchLabels:
app: user-service
template:
metadata:
labels:
app: user-service
spec:
containers:
– name: user-service
image: user-service:latest
ports:
– containerPort: 8081
env:
– name: DATABASE_URL
value: “user:password@tcp(mysql:3306)/eino_example”
—
apiVersion: v1
kind: Service
metadata:
name: user-service
namespace: eino-example
spec:
selector:
app: user-service
ports:
– port: 8081
targetPort: 8081
type: ClusterIP
“`
### 3. 服务发现
使用Consul进行服务发现:
“`yaml
# consul-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: consul
namespace: eino-example
spec:
replicas: 1
selector:
matchLabels:
app: consul
template:
metadata:
labels:
app: consul
spec:
containers:
– name: consul
image: consul:latest
ports:
– containerPort: 8500
—
apiVersion: v1
kind: Service
metadata:
name: consul
namespace: eino-example
spec:
selector:
app: consul
ports:
– port: 8500
targetPort: 8500
type: ClusterIP
“`
### 4. 配置管理
使用etcd进行配置管理:
“`yaml
# etcd-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: etcd
namespace: eino-example
spec:
replicas: 1
selector:
matchLabels:
app: etcd
template:
metadata:
labels:
app: etcd
spec:
containers:
– name: etcd
image: bitnami/etcd:latest
ports:
– containerPort: 2379
env:
– name: ETCD_ROOT_PASSWORD
value: “password”
—
apiVersion: v1
kind: Service
metadata:
name: etcd
namespace: eino-example
spec:
selector:
app: etcd
ports:
– port: 2379
targetPort: 2379
type: ClusterIP
“`
## 监控与可观测性
### 1. 监控
使用Prometheus和Grafana监控服务:
“`yaml
# prometheus-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: prometheus
namespace: monitoring
spec:
replicas: 1
selector:
matchLabels:
app: prometheus
template:
metadata:
labels:
app: prometheus
spec:
containers:
– name: prometheus
image: prom/prometheus:latest
ports:
– containerPort: 9090
volumeMounts:
– name: prometheus-config
mountPath: /etc/prometheus
volumes:
– name: prometheus-config
configMap:
name: prometheus-config
—
apiVersion: v1
kind: Service
metadata:
name: prometheus
namespace: monitoring
spec:
selector:
app: prometheus
ports:
– port: 9090
targetPort: 9090
type: ClusterIP
“`
### 2. 追踪
使用Jaeger进行分布式追踪:
“`yaml
# jaeger-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: jaeger
namespace: monitoring
spec:
replicas: 1
selector:
matchLabels:
app: jaeger
template:
metadata:
labels:
app: jaeger
spec:
containers:
– name: jaeger
image: jaegertracing/all-in-one:latest
ports:
– containerPort: 16686
– containerPort: 9411
—
apiVersion: v1
kind: Service
metadata:
name: jaeger
namespace: monitoring
spec:
selector:
app: jaeger
ports:
– port: 16686
targetPort: 16686
– port: 9411
targetPort: 9411
type: ClusterIP
“`
## 测试与验证
### 1. 单元测试
为每个服务编写单元测试:
“`go
// user_service_test.go
func TestRegister(t *testing.T) {
// 模拟数据库
db, err := gorm.Open(sqlite.Open(“:memory:”), &gorm.Config{})
if err != nil {
t.Fatalf(“Failed to connect to database: %v”, err)
}
db.AutoMigrate(&User{})
service := &UserService{db: db}
req := &user.RegisterRequest{
Username: “test”,
Password: “password”,
Email: “test@example.com”,
Phone: “1234567890”,
}
resp, err := service.Register(context.Background(), req)
if err != nil {
t.Fatalf(“Register failed: %v”, err)
}
if resp.UserId == 0 {
t.Fatalf(“Expected user ID > 0, got %d”, resp.UserId)
}
if resp.Token == “” {
t.Fatalf(“Expected token, got empty string”)
}
}
“`
### 2. 集成测试
编写集成测试,测试服务间的交互:
“`go
// integration_test.go
func TestCreateOrder(t *testing.T) {
// 启动模拟服务
userService := startMockUserService()
productService := startMockProductService()
paymentService := startMockPaymentService()
orderService := startOrderService(userService, productService, paymentService)
req := &order.CreateOrderRequest{
UserId: 1,
Items: []*order.OrderItem{
{
ProductId: 1,
Quantity: 1,
Price: 100.0,
},
},
Address: “123 Main St”,
PaymentMethod: “credit_card”,
}
resp, err := orderService.CreateOrder(context.Background(), req)
if err != nil {
t.Fatalf(“CreateOrder failed: %v”, err)
}
if resp.OrderId == 0 {
t.Fatalf(“Expected order ID > 0, got %d”, resp.OrderId)
}
if resp.PaymentUrl == “” {
t.Fatalf(“Expected payment URL, got empty string”)
}
}
“`
### 3. 负载测试
使用wrk进行负载测试:
“`bash
# 测试用户服务登录接口
wrk -t12 -c400 -d30s http://localhost:8081/api/users/login –header “Content-Type: application/json” –body ‘{“username”: “test”, “password”: “password”}’
# 测试商品服务列表接口
wrk -t12 -c400 -d30s http://localhost:8082/api/products
# 测试订单服务创建接口
wrk -t12 -c400 -d30s http://localhost:8083/api/orders –header “Content-Type: application/json” –header “Authorization: Bearer
“`
## 总结
通过本文的实战案例,我们展示了如何使用CloudWeGo Eino构建一个完整的微服务系统,包括服务设计、实现、部署和监控等方面。
在实际项目中,开发者可以根据具体的业务需求和技术栈,调整和扩展这个示例系统。Eino的高性能、可靠性和灵活性使其成为构建微服务系统的理想选择。
通过合理的服务设计、模块化的代码结构、完善的监控和可观测性,开发者可以构建出高性能、可靠、可维护的微服务系统,满足现代应用的需求。
同时,Eino的生态系统和社区支持也为开发者提供了丰富的资源和工具,帮助开发者更快地构建和部署微服务系统。
我们相信,随着CloudWeGo Eino的不断发展和完善,它将成为构建云原生微服务系统的重要工具,为开发者提供更好的开发体验和系统性能。