# Golang 实战场景汇总:常见业务场景的实现方案
## 原生锁和分布式锁
### 原生锁
#### 互斥锁
“`go
import “sync”
var mu sync.Mutex
func criticalSection() {
mu.Lock()
defer mu.Unlock()
// 临界区代码
}
“`
#### 读写锁
“`go
import “sync”
var rwmu sync.RWMutex
// 读操作
func readOperation() {
rwmu.RLock()
defer rwmu.RUnlock()
// 读操作代码
}
// 写操作
func writeOperation() {
rwmu.Lock()
defer rwmu.Unlock()
// 写操作代码
}
“`
### 分布式锁
#### 基于 Redis 的分布式锁
“`go
import (
“context”
“fmt”
“time”
“github.com/go-redis/redis/v8”
)
func acquireLock(client *redis.Client, key string, expiration time.Duration) (bool, error) {
ctx := context.Background()
value := fmt.Sprintf(“%d”, time.Now().UnixNano())
result, err := client.SetNX(ctx, key, value, expiration).Result()
return result, err
}
func releaseLock(client *redis.Client, key string) error {
ctx := context.Background()
_, err := client.Del(ctx, key).Result()
return err
}
“`
#### 基于 etcd 的分布式锁
“`go
import (
“context”
“time”
“go.etcd.io/etcd/client/v3”
“go.etcd.io/etcd/client/v3/concurrency”
)
func acquireLock(client *clientv3.Client, key string, ttl int) (*concurrency.Mutex, error) {
ctx := context.Background()
session, err := concurrency.NewSession(client, concurrency.WithTTL(ttl))
if err != nil {
return nil, err
}
mutex := concurrency.NewMutex(session, key)
if err := mutex.Lock(ctx); err != nil {
return nil, err
}
return mutex, nil
}
func releaseLock(mutex *concurrency.Mutex) error {
return mutex.Unlock(context.Background())
}
“`
## 远程资源的读写并发控制
### 并发读取远程资源
“`go
import (
“sync”
“time”
)
func fetchRemoteResource(url string) (string, error) {
// 模拟网络请求
time.Sleep(100 * time.Millisecond)
return “data from ” + url, nil
}
func concurrentFetch(urls []string) []string {
var wg sync.WaitGroup
results := make([]string, len(urls))
for i, url := range urls {
wg.Add(1)
go func(i int, url string) {
defer wg.Done()
data, err := fetchRemoteResource(url)
if err == nil {
results[i] = data
}
}(i, url)
}
wg.Wait()
return results
}
“`
### 并发写入远程资源
“`go
import (
“sync”
“time”
)
func writeRemoteResource(url string, data string) error {
// 模拟网络请求
time.Sleep(100 * time.Millisecond)
return nil
}
func concurrentWrite(urls []string, data string) error {
var wg sync.WaitGroup
var mu sync.Mutex
var err error
for _, url := range urls {
wg.Add(1)
go func(url string) {
defer wg.Done()
if e := writeRemoteResource(url, data); e != nil {
mu.Lock()
if err == nil {
err = e
}
mu.Unlock()
}
}(url)
}
wg.Wait()
return err
}
“`
## 网关实现
### 基本网关结构
“`go
import (
“net/http”
“net/http/httputil”
“net/url”
)
type Gateway struct {
backends []*url.URL
}
func NewGateway(backendURLs []string) (*Gateway, error) {
var backends []*url.URL
for _, urlStr := range backendURLs {
parsedURL, err := url.Parse(urlStr)
if err != nil {
return nil, err
}
backends = append(backends, parsedURL)
}
return &Gateway{backends: backends}, nil
}
func (g *Gateway) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 简单的轮询负载均衡
backend := g.backends[0]
g.backends = append(g.backends[1:], g.backends[0])
proxy := httputil.NewSingleHostReverseProxy(backend)
proxy.ServeHTTP(w, r)
}
func main() {
backendURLs := []string{
“http://localhost:8081”,
“http://localhost:8082”,
“http://localhost:8083”,
}
gateway, err := NewGateway(backendURLs)
if err != nil {
panic(err)
}
http.ListenAndServe(“:8080”, gateway)
}
“`
### 高级网关功能
“`go
import (
“net/http”
“net/http/httputil”
“net/url”
“time”
)
type Gateway struct {
backends []*url.URL
healthCheck func(*url.URL) bool
}
func (g *Gateway) healthCheckWorker() {
for {
for i, backend := range g.backends {
if !g.healthCheck(backend) {
// 移除不健康的后端
g.backends = append(g.backends[:i], g.backends[i+1:]…)
}
}
time.Sleep(10 * time.Second)
}
}
func (g *Gateway) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if len(g.backends) == 0 {
http.Error(w, “No healthy backends available”, http.StatusServiceUnavailable)
return
}
// 轮询负载均衡
backend := g.backends[0]
g.backends = append(g.backends[1:], g.backends[0])
proxy := httputil.NewSingleHostReverseProxy(backend)
proxy.ServeHTTP(w, r)
}
“`
## 数据流转发处理
### 基本数据流转发
“`go
import (
“io”
“net/http”
)
func forwardData(src io.Reader, dst io.Writer) error {
_, err := io.Copy(dst, src)
return err
}
func handler(w http.ResponseWriter, r *http.Request) {
// 从请求体读取数据
// 转发到其他服务
// 将响应写回客户端
client := &http.Client{}
req, err := http.NewRequest(“POST”, “http://target-service/path”, r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp, err := client.Do(req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer resp.Body.Close()
w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body)
}
“`
### 带缓冲的数据流转发
“`go
import (
“bufio”
“io”
“net/http”
)
func forwardDataWithBuffer(src io.Reader, dst io.Writer) error {
bufSrc := bufio.NewReader(src)
bufDst := bufio.NewWriter(dst)
_, err := io.Copy(bufDst, bufSrc)
if err != nil {
return err
}
return bufDst.Flush()
}
“`
## 聊天系统中多种实现方式
### 基于 WebSocket 的聊天系统
“`go
import (
“log”
“net/http”
“sync”
“github.com/gorilla/websocket”
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type Client struct {
conn *websocket.Conn
send chan []byte
}
type Hub struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
mu sync.Mutex
}
func newHub() *Hub {
return &Hub{
clients: make(map[*Client]bool),
broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
}
}
func (h *Hub) run() {
for {
select {
case client := <-h.register:
h.mu.Lock()
h.clients[client] = true
h.mu.Unlock()
case client := <-h.unregister:
h.mu.Lock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
h.mu.Unlock()
case message := <-h.broadcast:
h.mu.Lock()
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
h.mu.Unlock()
}
}
}
func (c *Client) readPump(hub *Hub) {
defer func() {
hub.unregister <- c
c.conn.Close()
}()
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
hub.broadcast <- message
}
}
func (c *Client) writePump() {
defer func() {
c.conn.Close()
}()
for {
message, ok := <-c.send
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
c.conn.WriteMessage(websocket.TextMessage, message)
}
}
func handler(hub *Hub) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := &Client{
conn: conn,
send: make(chan []byte, 256),
}
hub.register <- client
go client.writePump()
go client.readPump(hub)
}
}
func main() {
hub := newHub()
go hub.run()
http.HandleFunc("/ws", handler(hub))
http.ListenAndServe(":8080", nil)
}
```
### 基于消息队列的聊天系统
```go
import (
"log"
"net/http"
"sync"
"github.com/gorilla/websocket"
"github.com/streadway/amqp"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type Client struct {
conn *websocket.Conn
send chan []byte
}
type Hub struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
mu sync.Mutex
}
func newHub() *Hub {
return &Hub{
clients: make(map[*Client]bool),
broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
}
}
func (h *Hub) run(ch *amqp.Channel, queueName string) {
// 消费消息
msgs, err := ch.Consume(
queueName, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
go func() {
for d := range msgs {
h.broadcast <- d.Body
}
}()
for {
select {
case client := <-h.register:
h.mu.Lock()
h.clients[client] = true
h.mu.Unlock()
case client := <-h.unregister:
h.mu.Lock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
h.mu.Unlock()
case message := <-h.broadcast:
h.mu.Lock()
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
h.mu.Unlock()
}
}
}
func (c *Client) readPump(ch *amqp.Channel, queueName string) {
defer func() {
c.conn.Close()
}()
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
// 发布消息到 RabbitMQ
err = ch.Publish(
"", // exchange
queueName, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: message,
}
)
if err != nil {
log.Printf("Failed to publish a message: %v", err)
}
}
}
func (c *Client) writePump() {
defer func() {
c.conn.Close()
}()
for {
message, ok := <-c.send
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
c.conn.WriteMessage(websocket.TextMessage, message)
}
}
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
queueName := "chat_messages"
_, err = ch.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
hub := newHub()
go hub.run(ch, queueName)
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := &Client{
conn: conn,
send: make(chan []byte, 256),
}
hub.register <- client
go client.writePump()
go client.readPump(ch, queueName)
})
http.ListenAndServe(":8080", nil)
}
```
## 定时任务的处理
### 基于 time 包的定时任务
```go
import (
"log"
"time"
)
func runTask() {
log.Println("Running task at", time.Now())
}
func main() {
// 立即执行一次
runTask()
// 每隔 5 分钟执行一次
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
<-ticker.C
runTask()
}
}
```
### 基于 cron 库的定时任务
```go
import (
"log"
"time"
"github.com/robfig/cron/v3"
)
func runTask() {
log.Println("Running task at", time.Now())
}
func main() {
c := cron.New()
// 每隔 5 分钟执行一次
_, err := c.AddFunc("*/5 * * * *", runTask)
if err != nil {
log.Fatalf("Failed to add cron job: %v", err)
}
// 每天凌晨 1 点执行
_, err = c.AddFunc("0 1 * * *", func() {
log.Println("Running daily task at", time.Now())
})
if err != nil {
log.Fatalf("Failed to add cron job: %v", err)
}
c.Start()
defer c.Stop()
// 保持程序运行
select {}
}
```
### 分布式定时任务
```go
import (
"context"
"log"
"time"
"github.com/robfig/cron/v3"
"github.com/go-redis/redis/v8"
)
func runTask(client *redis.Client, taskName string) {
ctx := context.Background()
// 尝试获取锁
lockKey := "lock:" + taskName
gotLock, err := client.SetNX(ctx, lockKey, time.Now().Unix(), 1*time.Minute).Result()
if err != nil {
log.Printf("Failed to acquire lock: %v", err)
return
}
if !gotLock {
log.Println("Another instance is running this task")
return
}
defer client.Del(ctx, lockKey)
log.Println("Running task", taskName, "at", time.Now())
// 执行任务
}
func main() {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
c := cron.New()
_, err := c.AddFunc("*/5 * * * *", func() {
runTask(client, "every_five_minutes")
})
if err != nil {
log.Fatalf("Failed to add cron job: %v", err)
}
c.Start()
defer c.Stop()
select {}
}
```
## 总结
本文介绍了 Go 语言在实际业务场景中的常见实现方案,包括原生锁和分布式锁、远程资源的读写并发控制、网关实现、数据流转发处理、聊天系统的多种实现方式以及定时任务的处理。这些场景覆盖了 Go 语言在并发、网络、分布式系统等方面的应用。通过合理运用这些实现方案,可以构建高效、可靠的 Go 应用程序。在实际开发中,我们应该根据具体业务需求和系统规模,选择合适的实现方式,以达到最佳的性能和可靠性。