Golang 实战场景汇总:常见业务场景的实现方案

# Golang 实战场景汇总:常见业务场景的实现方案

## 原生锁和分布式锁

### 原生锁

#### 互斥锁

“`go
import “sync”

var mu sync.Mutex

func criticalSection() {
mu.Lock()
defer mu.Unlock()
// 临界区代码
}
“`

#### 读写锁

“`go
import “sync”

var rwmu sync.RWMutex

// 读操作
func readOperation() {
rwmu.RLock()
defer rwmu.RUnlock()
// 读操作代码
}

// 写操作
func writeOperation() {
rwmu.Lock()
defer rwmu.Unlock()
// 写操作代码
}
“`

### 分布式锁

#### 基于 Redis 的分布式锁

“`go
import (
“context”
“fmt”
“time”

“github.com/go-redis/redis/v8”
)

func acquireLock(client *redis.Client, key string, expiration time.Duration) (bool, error) {
ctx := context.Background()
value := fmt.Sprintf(“%d”, time.Now().UnixNano())
result, err := client.SetNX(ctx, key, value, expiration).Result()
return result, err
}

func releaseLock(client *redis.Client, key string) error {
ctx := context.Background()
_, err := client.Del(ctx, key).Result()
return err
}
“`

#### 基于 etcd 的分布式锁

“`go
import (
“context”
“time”

“go.etcd.io/etcd/client/v3”
“go.etcd.io/etcd/client/v3/concurrency”
)

func acquireLock(client *clientv3.Client, key string, ttl int) (*concurrency.Mutex, error) {
ctx := context.Background()
session, err := concurrency.NewSession(client, concurrency.WithTTL(ttl))
if err != nil {
return nil, err
}
mutex := concurrency.NewMutex(session, key)
if err := mutex.Lock(ctx); err != nil {
return nil, err
}
return mutex, nil
}

func releaseLock(mutex *concurrency.Mutex) error {
return mutex.Unlock(context.Background())
}
“`

## 远程资源的读写并发控制

### 并发读取远程资源

“`go
import (
“sync”
“time”
)

func fetchRemoteResource(url string) (string, error) {
// 模拟网络请求
time.Sleep(100 * time.Millisecond)
return “data from ” + url, nil
}

func concurrentFetch(urls []string) []string {
var wg sync.WaitGroup
results := make([]string, len(urls))

for i, url := range urls {
wg.Add(1)
go func(i int, url string) {
defer wg.Done()
data, err := fetchRemoteResource(url)
if err == nil {
results[i] = data
}
}(i, url)
}

wg.Wait()
return results
}
“`

### 并发写入远程资源

“`go
import (
“sync”
“time”
)

func writeRemoteResource(url string, data string) error {
// 模拟网络请求
time.Sleep(100 * time.Millisecond)
return nil
}

func concurrentWrite(urls []string, data string) error {
var wg sync.WaitGroup
var mu sync.Mutex
var err error

for _, url := range urls {
wg.Add(1)
go func(url string) {
defer wg.Done()
if e := writeRemoteResource(url, data); e != nil {
mu.Lock()
if err == nil {
err = e
}
mu.Unlock()
}
}(url)
}

wg.Wait()
return err
}
“`

## 网关实现

### 基本网关结构

“`go
import (
“net/http”
“net/http/httputil”
“net/url”
)

type Gateway struct {
backends []*url.URL
}

func NewGateway(backendURLs []string) (*Gateway, error) {
var backends []*url.URL
for _, urlStr := range backendURLs {
parsedURL, err := url.Parse(urlStr)
if err != nil {
return nil, err
}
backends = append(backends, parsedURL)
}
return &Gateway{backends: backends}, nil
}

func (g *Gateway) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 简单的轮询负载均衡
backend := g.backends[0]
g.backends = append(g.backends[1:], g.backends[0])

proxy := httputil.NewSingleHostReverseProxy(backend)
proxy.ServeHTTP(w, r)
}

func main() {
backendURLs := []string{
“http://localhost:8081”,
“http://localhost:8082”,
“http://localhost:8083”,
}

gateway, err := NewGateway(backendURLs)
if err != nil {
panic(err)
}

http.ListenAndServe(“:8080”, gateway)
}
“`

### 高级网关功能

“`go
import (
“net/http”
“net/http/httputil”
“net/url”
“time”
)

type Gateway struct {
backends []*url.URL
healthCheck func(*url.URL) bool
}

func (g *Gateway) healthCheckWorker() {
for {
for i, backend := range g.backends {
if !g.healthCheck(backend) {
// 移除不健康的后端
g.backends = append(g.backends[:i], g.backends[i+1:]…)
}
}
time.Sleep(10 * time.Second)
}
}

func (g *Gateway) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if len(g.backends) == 0 {
http.Error(w, “No healthy backends available”, http.StatusServiceUnavailable)
return
}

// 轮询负载均衡
backend := g.backends[0]
g.backends = append(g.backends[1:], g.backends[0])

proxy := httputil.NewSingleHostReverseProxy(backend)
proxy.ServeHTTP(w, r)
}
“`

## 数据流转发处理

### 基本数据流转发

“`go
import (
“io”
“net/http”
)

func forwardData(src io.Reader, dst io.Writer) error {
_, err := io.Copy(dst, src)
return err
}

func handler(w http.ResponseWriter, r *http.Request) {
// 从请求体读取数据
// 转发到其他服务
// 将响应写回客户端
client := &http.Client{}
req, err := http.NewRequest(“POST”, “http://target-service/path”, r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

resp, err := client.Do(req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer resp.Body.Close()

w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body)
}
“`

### 带缓冲的数据流转发

“`go
import (
“bufio”
“io”
“net/http”
)

func forwardDataWithBuffer(src io.Reader, dst io.Writer) error {
bufSrc := bufio.NewReader(src)
bufDst := bufio.NewWriter(dst)

_, err := io.Copy(bufDst, bufSrc)
if err != nil {
return err
}
return bufDst.Flush()
}
“`

## 聊天系统中多种实现方式

### 基于 WebSocket 的聊天系统

“`go
import (
“log”
“net/http”
“sync”

“github.com/gorilla/websocket”
)

var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}

type Client struct {
conn *websocket.Conn
send chan []byte
}

type Hub struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
mu sync.Mutex
}

func newHub() *Hub {
return &Hub{
clients: make(map[*Client]bool),
broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
}
}

func (h *Hub) run() {
for {
select {
case client := <-h.register: h.mu.Lock() h.clients[client] = true h.mu.Unlock() case client := <-h.unregister: h.mu.Lock() if _, ok := h.clients[client]; ok { delete(h.clients, client) close(client.send) } h.mu.Unlock() case message := <-h.broadcast: h.mu.Lock() for client := range h.clients { select { case client.send <- message: default: close(client.send) delete(h.clients, client) } } h.mu.Unlock() } } } func (c *Client) readPump(hub *Hub) { defer func() { hub.unregister <- c c.conn.Close() }() for { _, message, err := c.conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("error: %v", err) } break } hub.broadcast <- message } } func (c *Client) writePump() { defer func() { c.conn.Close() }() for { message, ok := <-c.send if !ok { c.conn.WriteMessage(websocket.CloseMessage, []byte{}) return } c.conn.WriteMessage(websocket.TextMessage, message) } } func handler(hub *Hub) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Println(err) return } client := &Client{ conn: conn, send: make(chan []byte, 256), } hub.register <- client go client.writePump() go client.readPump(hub) } } func main() { hub := newHub() go hub.run() http.HandleFunc("/ws", handler(hub)) http.ListenAndServe(":8080", nil) } ``` ### 基于消息队列的聊天系统 ```go import ( "log" "net/http" "sync" "github.com/gorilla/websocket" "github.com/streadway/amqp" ) var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } type Client struct { conn *websocket.Conn send chan []byte } type Hub struct { clients map[*Client]bool broadcast chan []byte register chan *Client unregister chan *Client mu sync.Mutex } func newHub() *Hub { return &Hub{ clients: make(map[*Client]bool), broadcast: make(chan []byte), register: make(chan *Client), unregister: make(chan *Client), } } func (h *Hub) run(ch *amqp.Channel, queueName string) { // 消费消息 msgs, err := ch.Consume( queueName, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { log.Fatalf("Failed to register a consumer: %v", err) } go func() { for d := range msgs { h.broadcast <- d.Body } }() for { select { case client := <-h.register: h.mu.Lock() h.clients[client] = true h.mu.Unlock() case client := <-h.unregister: h.mu.Lock() if _, ok := h.clients[client]; ok { delete(h.clients, client) close(client.send) } h.mu.Unlock() case message := <-h.broadcast: h.mu.Lock() for client := range h.clients { select { case client.send <- message: default: close(client.send) delete(h.clients, client) } } h.mu.Unlock() } } } func (c *Client) readPump(ch *amqp.Channel, queueName string) { defer func() { c.conn.Close() }() for { _, message, err := c.conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("error: %v", err) } break } // 发布消息到 RabbitMQ err = ch.Publish( "", // exchange queueName, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: message, } ) if err != nil { log.Printf("Failed to publish a message: %v", err) } } } func (c *Client) writePump() { defer func() { c.conn.Close() }() for { message, ok := <-c.send if !ok { c.conn.WriteMessage(websocket.CloseMessage, []byte{}) return } c.conn.WriteMessage(websocket.TextMessage, message) } } func main() { // 连接 RabbitMQ conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() queueName := "chat_messages" _, err = ch.QueueDeclare( queueName, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } hub := newHub() go hub.run(ch, queueName) http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Println(err) return } client := &Client{ conn: conn, send: make(chan []byte, 256), } hub.register <- client go client.writePump() go client.readPump(ch, queueName) }) http.ListenAndServe(":8080", nil) } ``` ## 定时任务的处理 ### 基于 time 包的定时任务 ```go import ( "log" "time" ) func runTask() { log.Println("Running task at", time.Now()) } func main() { // 立即执行一次 runTask() // 每隔 5 分钟执行一次 ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() for { <-ticker.C runTask() } } ``` ### 基于 cron 库的定时任务 ```go import ( "log" "time" "github.com/robfig/cron/v3" ) func runTask() { log.Println("Running task at", time.Now()) } func main() { c := cron.New() // 每隔 5 分钟执行一次 _, err := c.AddFunc("*/5 * * * *", runTask) if err != nil { log.Fatalf("Failed to add cron job: %v", err) } // 每天凌晨 1 点执行 _, err = c.AddFunc("0 1 * * *", func() { log.Println("Running daily task at", time.Now()) }) if err != nil { log.Fatalf("Failed to add cron job: %v", err) } c.Start() defer c.Stop() // 保持程序运行 select {} } ``` ### 分布式定时任务 ```go import ( "context" "log" "time" "github.com/robfig/cron/v3" "github.com/go-redis/redis/v8" ) func runTask(client *redis.Client, taskName string) { ctx := context.Background() // 尝试获取锁 lockKey := "lock:" + taskName gotLock, err := client.SetNX(ctx, lockKey, time.Now().Unix(), 1*time.Minute).Result() if err != nil { log.Printf("Failed to acquire lock: %v", err) return } if !gotLock { log.Println("Another instance is running this task") return } defer client.Del(ctx, lockKey) log.Println("Running task", taskName, "at", time.Now()) // 执行任务 } func main() { client := redis.NewClient(&redis.Options{ Addr: "localhost:6379", }) c := cron.New() _, err := c.AddFunc("*/5 * * * *", func() { runTask(client, "every_five_minutes") }) if err != nil { log.Fatalf("Failed to add cron job: %v", err) } c.Start() defer c.Stop() select {} } ``` ## 总结 本文介绍了 Go 语言在实际业务场景中的常见实现方案,包括原生锁和分布式锁、远程资源的读写并发控制、网关实现、数据流转发处理、聊天系统的多种实现方式以及定时任务的处理。这些场景覆盖了 Go 语言在并发、网络、分布式系统等方面的应用。通过合理运用这些实现方案,可以构建高效、可靠的 Go 应用程序。在实际开发中,我们应该根据具体业务需求和系统规模,选择合适的实现方式,以达到最佳的性能和可靠性。

Scroll to Top