並列処理・並行処理

Go言語のgoroutineの使いどころについて解説

この記事では、Go言語のgoroutineをどのような場面で活用するかに注目します。

基本的な実行方法は既にご存知の方向けに、実際の開発シーンで役立つ例を交えながら、使いどころを分かりやすく解説します。

非同期処理における活用シーン

Go言語のgoroutineは、軽量なスレッドとして非同期処理を容易に実現できるため、さまざまなシーンで活用されています。

ここではネットワーク関連やバッチ処理、リアルタイムなデータ処理の例を中心に解説します。

ネットワークアクセスの並行実行

ネットワークアクセスはレスポンスの待ち時間が発生するため、並行実行することで全体の処理時間を大幅に短縮できます。

特に、複数のHTTPリクエストやソケット通信を同時に実行する場合にgoroutineが持つ強みが発揮されます。

HTTPリクエストの同時処理

HTTPリクエストを複数同時に送信し、レスポンスを待つケースでは、goroutineを利用することで各リクエストを非同期に処理できます。

下記のサンプルコードは、複数のURLに対してリクエストを送り、結果を出力する例です。

package main
import (
	"fmt"
	"net/http"
	"sync"
)
// fetchURLは指定したURLにHTTPリクエストを送り、ステータスコードを出力する
func fetchURL(url string, wg *sync.WaitGroup) {
	defer wg.Done() // goroutineの終了を待機グループに通知
	resp, err := http.Get(url)
	if err != nil {
		fmt.Printf("エラーが発生しました(URL: %s): %v\n", url, err)
		return
	}
	defer resp.Body.Close()
	fmt.Printf("URL: %s, ステータスコード: %d\n", url, resp.StatusCode)
}
func main() {
	urls := []string{
		"https://example.com",
		"https://golang.org",
		"https://github.com",
	}
	var wg sync.WaitGroup
	// 各URLに対してgoroutineを起動
	for _, url := range urls {
		wg.Add(1)
		go fetchURL(url, &wg)
	}
	wg.Wait() // 全てのgoroutineが終了するまで待機
	fmt.Println("全てのHTTPリクエストの処理が完了しました")
}
URL: https://example.com, ステータスコード: 200
URL: https://golang.org, ステータスコード: 200
URL: https://github.com, ステータスコード: 200
全てのHTTPリクエストの処理が完了しました

ソケット通信での利用例

ソケット通信では、クライアントとサーバ間の双方向通信を行います。

goroutineを使用することで、複数のクライアントからの接続を同時に処理でき、サーバのレスポンス性能を向上させることが可能です。

次のサンプルコードは、簡易なTCPサーバを実装した例です。

package main
import (
	"bufio"
	"fmt"
	"net"
)
// handleConnectionはクライアントからの接続を処理する
func handleConnection(conn net.Conn) {
	defer conn.Close()
	reader := bufio.NewReader(conn)
	// クライアントから受信した文字列を1行読み込み、出力する
	message, _ := reader.ReadString('\n')
	fmt.Printf("クライアントからのメッセージ: %s", message)
	// クライアントへ返信
	conn.Write([]byte("メッセージを受信しました\n"))
}
func main() {
	ln, err := net.Listen("tcp", ":8080")
	if err != nil {
		fmt.Println("サーバの起動に失敗しました:", err)
		return
	}
	defer ln.Close()
	fmt.Println("TCPサーバがポート8080で稼働中です")
	// クライアントからの接続を待ち受け、goroutineで処理
	for {
		conn, err := ln.Accept()
		if err != nil {
			fmt.Println("接続受け入れエラー:", err)
			continue
		}
		go handleConnection(conn)
	}
}
TCPサーバがポート8080で稼働中です
// (クライアントが接続し、メッセージ送信後にサーバ側で出力される内容)
クライアントからのメッセージ: こんにちは、サーバです

バッチ処理とリアルタイムデータ処理

バッチ処理やリアルタイムデータ処理では、大量のデータやイベントが短時間に発生するケースが多く、その際にgoroutineを活用することで処理の分散化や並列実行が期待できます。

複数タスクの同時実行

複数の独立したタスクを同時に実行する場合、各タスクをgoroutineで実行することで、全体の処理時間を短縮できます。

以下は、複数の計算処理を同時に実行するサンプルコードです。

package main
import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)
// processTaskはランダムな時間の待機後、結果を出力するタスクをシミュレーションする
func processTask(taskID int, wg *sync.WaitGroup) {
	defer wg.Done()
	// \( t = rand(1,3) \)秒待機することで処理時間をシミュレーション
	waitTime := time.Duration(rand.Intn(3)+1) * time.Second
	time.Sleep(waitTime)
	fmt.Printf("タスク %d の処理が完了(待機時間: %s)\n", taskID, waitTime)
}
func main() {
	rand.Seed(time.Now().UnixNano())
	var wg sync.WaitGroup
	taskCount := 5
	for i := 1; i <= taskCount; i++ {
		wg.Add(1)
		go processTask(i, &wg)
	}
	wg.Wait()
	fmt.Println("全てのタスクの処理が完了しました")
}
タスク 2 の処理が完了(待機時間: 1s)
タスク 4 の処理が完了(待機時間: 1s)
タスク 1 の処理が完了(待機時間: 2s)
タスク 3 の処理が完了(待機時間: 2s)
タスク 5 の処理が完了(待機時間: 3s)
全てのタスクの処理が完了しました

パイプライン処理の構築

パイプライン処理では、データを段階的に処理していくために、各ステージを独立したgoroutineで実装することが効果的です。

各ステージはchannelを介してデータをやり取りし、全体として効率的な処理が可能となります。

以下は、簡単なパイプライン処理の例です。

package main
import (
	"fmt"
)
// generatorは数値を生成し、チャネル経由で送り出す
func generator(nums []int) <-chan int {
	out := make(chan int)
	go func() {
		for _, num := range nums {
			out <- num // 数値をチャネルに送信
		}
		close(out) // 全ての数値を送信後チャネルを閉じる
	}()
	return out
}
// squareは受信した数値の二乗を計算し、チャネルで出力する
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for num := range in {
			out <- num * num
		}
		close(out)
	}()
	return out
}
func main() {
	numbers := []int{1, 2, 3, 4, 5}
	// パイプライン処理の各ステージを連結
	gen := generator(numbers)
	sqr := square(gen)
	for result := range sqr {
		fmt.Printf("計算結果: %d\n", result)
	}
}
計算結果: 1
計算結果: 4
計算結果: 9
計算結果: 16
計算結果: 25

goroutine導入時の注意事項

goroutineを活用する際には、適切な排他制御やライフサイクル管理を行わないと、データ競合や不整合が発生する恐れがあります。

ここでは、MutexやChannelを利用した排他制御、並びにContextやWaitGroupを使ったライフサイクル管理の方法について解説します。

データ競合と排他制御

複数のgoroutineが同じデータにアクセスする場合、意図しないデータ競合が発生する可能性があります。

これを防ぐために、Mutexchannelを利用して排他制御を行います。

MutexやChannelの適切な使用

Mutexは共有変数への同時アクセスを制御するための手法です。

また、channelを用いることで、データの受け渡しと同時に同期処理を行うことも可能です。

下記のサンプルコードは、複数のgoroutineでカウンタをインクリメントする場合にMutexを使用する例です。

package main
import (
	"fmt"
	"sync"
)
func main() {
	var counter int
	var mu sync.Mutex
	var wg sync.WaitGroup
	taskCount := 100
	for i := 0; i < taskCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()   // 排他制御開始
			counter++   // カウンタをインクリメント
			mu.Unlock() // 排他制御終了
		}()
	}
	wg.Wait()
	fmt.Printf("最終カウンタの値: %d\n", counter)
}
最終カウンタの値: 100

また、チャネルを用いた例として、複数のgoroutineからの結果を集約するケースを以下に示します。

package main
import (
	"fmt"
	"sync"
)
// processDataは受信したデータを処理し、結果をチャネルへ送信する
func processData(id int, in int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	// データ処理の例として、数値を倍にする
	result := in * 2
	fmt.Printf("タスク %d: %d -> %d\n", id, in, result)
	out <- result
}
func main() {
	data := []int{1, 2, 3, 4, 5}
	out := make(chan int, len(data))
	var wg sync.WaitGroup
	for i, num := range data {
		wg.Add(1)
		go processData(i+1, num, out, &wg)
	}
	wg.Wait()
	close(out)
	for res := range out {
		fmt.Printf("集約結果: %d\n", res)
	}
}
タスク 1: 1 -> 2
タスク 2: 2 -> 4
タスク 3: 3 -> 6
タスク 4: 4 -> 8
タスク 5: 5 -> 10
集約結果: 2
集約結果: 4
集約結果: 6
集約結果: 8
集約結果: 10

ライフサイクル管理

goroutineのライフサイクル管理は、プログラム全体の安定性に大きく影響します。

ライフサイクルを適切に管理できないと、不要な処理が残存したり、リソースが解放されなかったりする可能性があります。

Contextの活用法

Contextは、キャンセル信号やタイムアウトといった制御パラメータをgoroutine間で伝達するために便利なツールです。

下記のサンプルでは、タイムアウトを設定してgoroutineの処理キャンセルを行っています。

package main
import (
	"context"
	"fmt"
	"time"
)
// performTaskは処理に時間がかかるタスクの例で、Contextのキャンセルを監視する
func performTask(ctx context.Context, taskName string) {
	select {
	case <-time.After(3 * time.Second):
		fmt.Printf("%s の処理が完了しました\n", taskName)
	case <-ctx.Done():
		fmt.Printf("%s の処理がキャンセルされました: %v\n", taskName, ctx.Err())
	}
}
func main() {
	// タイムアウト付きのContextを生成
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	go performTask(ctx, "タスク1")
	// メイン処理はキャンセル信号を待機
	time.Sleep(4 * time.Second)
}
タスク1 の処理がキャンセルされました: context deadline exceeded

WaitGroupによる同期処理

複数のgoroutineが終了するのを待つ場合、sync.WaitGroupが有効です。

これにより、全ての処理が完了してから次の処理へ移ることができます。

以下は、複数の非同期処理をWaitGroupで同期するサンプルコードです。

package main
import (
	"fmt"
	"sync"
	"time"
)
func asyncProcess(id int, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(1 * time.Second)
	fmt.Printf("非同期処理 %d の処理が完了しました\n", id)
}
func main() {
	var wg sync.WaitGroup
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go asyncProcess(i, &wg)
	}
	wg.Wait()
	fmt.Println("全ての非同期処理が終了しました")
}
非同期処理 1 の処理が完了しました
非同期処理 2 の処理が完了しました
非同期処理 3 の処理が完了しました
非同期処理 4 の処理が完了しました
非同期処理 5 の処理が完了しました
全ての非同期処理が終了しました

実践事例の紹介

ここでは、実際の開発現場で利用されるgoroutineの使いどころについて、シンプルなコード例とエラーハンドリングの工夫、さらにレスポンス改善を目指した実例をご紹介します。

コード例で見る使いどころ

シンプルな非同期処理例

以下は、シンプルにgoroutineを使って複数の処理を非同期に実行する例です。

このコードは、各処理が独立して動作し、処理結果を標準出力に表示します。

package main
import (
	"fmt"
	"time"
)
// simpleTaskは指定した待機時間後に結果を出力するタスクの例
func simpleTask(taskName string, waitTime time.Duration) {
	time.Sleep(waitTime)
	fmt.Printf("%s の処理が完了しました(待機時間: %s)\n", taskName, waitTime)
}
func main() {
	// 複数のタスクをgoroutineで実行
	go simpleTask("タスクA", 1*time.Second)
	go simpleTask("タスクB", 2*time.Second)
	go simpleTask("タスクC", 1*time.Second)
	// 全goroutineが実行完了するのを簡易的に待機(実際の環境ではWaitGroupを推奨)
	time.Sleep(3 * time.Second)
}
タスクA の処理が完了しました(待機時間: 1s)
タスクC の処理が完了しました(待機時間: 1s)
タスクB の処理が完了しました(待機時間: 2s)

エラーハンドリングの工夫

非同期処理では、エラーが発生した場合の情報を適切にキャッチすることが重要です。

以下は、非同期に実行した処理の中でエラーを発生させ、チャネルでエラー情報を集約するサンプルコードです。

package main
import (
	"errors"
	"fmt"
	"sync"
	"time"
)
// errorProneTaskはエラーを発生する可能性のあるタスクの例
func errorProneTask(taskID int, wg *sync.WaitGroup, errCh chan<- error) {
	defer wg.Done()
	// 擬似的なエラー発生条件(偶数のタスクIDの場合エラーを返す)
	time.Sleep(500 * time.Millisecond)
	if taskID%2 == 0 {
		errCh <- errors.New(fmt.Sprintf("タスク %d でエラーが発生しました", taskID))
		return
	}
	fmt.Printf("タスク %d は正常に完了しました\n", taskID)
}
func main() {
	taskCount := 4
	var wg sync.WaitGroup
	errCh := make(chan error, taskCount)
	for i := 1; i <= taskCount; i++ {
		wg.Add(1)
		go errorProneTask(i, &wg, errCh)
	}
	wg.Wait()
	close(errCh)
	// エラーがある場合は全て出力する
	for err := range errCh {
		fmt.Println("エラー情報:", err)
	}
}
タスク 1 は正常に完了しました
タスク 3 は正常に完了しました
エラー情報: タスク 2 でエラーが発生しました
エラー情報: タスク 4 でエラーが発生しました

開発現場での応用ケース

レスポンス改善の実例

たとえば、ウェブサービスのバックエンドでは、ユーザーからのリクエストに対して複数の内部サービスへの問い合わせなどを並行して実施することで、レスポンス待ち時間を短縮することが可能です。

次のサンプルコードは、複数の内部API呼び出しを同時実行し、その結果を統合して最終的なレスポンスを生成する例です。

package main
import (
	"fmt"
	"sync"
	"time"
)
// fetchDataは内部APIからデータを取得するシミュレーション
func fetchData(apiName string, wg *sync.WaitGroup, dataCh chan<- string) {
	defer wg.Done()
	time.Sleep(1 * time.Second) // 擬似的なAPI応答待ち
	result := fmt.Sprintf("%s のデータ", apiName)
	dataCh <- result
}
func main() {
	apis := []string{"API_A", "API_B", "API_C"}
	var wg sync.WaitGroup
	dataCh := make(chan string, len(apis))
	for _, api := range apis {
		wg.Add(1)
		go fetchData(api, &wg, dataCh)
	}
	wg.Wait()
	close(dataCh)
	// 各APIの返却データを集約する
	var combinedResult []string
	for data := range dataCh {
		combinedResult = append(combinedResult, data)
	}
	fmt.Printf("最終的なレスポンス: %v\n", combinedResult)
}
最終的なレスポンス: [API_A のデータ API_B のデータ API_C のデータ]

拡張的な活用方法の提案

goroutineの活用は基本的な非同期処理にとどまらず、より拡張的なユースケースへも応用できます。

ここではリソース管理やパフォーマンス最適化、監視のための手法について解説します。

リソース管理の工夫

複数のgoroutineを利用する際、限られたメモリやCPUリソースの中で効率的に処理を行う工夫が重要です。

効率的なメモリ利用

goroutineはスタックサイズが動的に増加するため、通常のスレッドに比べて効率的です。

ただし、必要以上に大量のgoroutineを生成するとシステム全体のオーバーヘッドが発生する可能性があるため、制御用のチャネルやセマフォを設けることで、同時実行数を制限する工夫が有効です。

以下は、同時に実行可能なgoroutineの数を制限する例です。

package main
import (
	"fmt"
	"sync"
	"time"
)
// limitedTaskは同時実行数を制御されたタスクの例
func limitedTask(id int, wg *sync.WaitGroup, sem chan struct{}) {
	defer wg.Done()
	// 実行開始前にセマフォからスロットを取得
	sem <- struct{}{}
	defer func() { <-sem }() // タスク完了後スロットを解放
	time.Sleep(1 * time.Second)
	fmt.Printf("タスク %d が完了しました\n", id)
}
func main() {
	taskCount := 10
	workerLimit := 3 // 同時に実行可能なgoroutine数
	sem := make(chan struct{}, workerLimit)
	var wg sync.WaitGroup
	for i := 1; i <= taskCount; i++ {
		wg.Add(1)
		go limitedTask(i, &wg, sem)
	}
	wg.Wait()
	fmt.Println("全てのタスクが完了しました")
}
タスク 1 が完了しました
タスク 2 が完了しました
タスク 3 が完了しました
タスク 4 が完了しました
タスク 5 が完了しました
タスク 6 が完了しました
タスク 7 が完了しました
タスク 8 が完了しました
タスク 9 が完了しました
タスク 10 が完了しました
全てのタスクが完了しました

パフォーマンス最適化と監視

大規模なシステムでは、実行中のgoroutineの状態やリソースの利用状況を適切にモニタリングし、パフォーマンスの最適化を図ることが重要です。

プロファイリングのポイント

Goには標準パッケージのnet/http/pprofが用意されており、プログラムの実行中に簡単にプロファイリング情報を取得することができます。

たとえば、以下のようにしてWebサーバとしてプロファイリング情報を公開し、goroutineの状態やメモリ使用量を確認することが可能です。

package main
import (
	"fmt"
	"net/http"
	_ "net/http/pprof" // pprofをインポートすることでエンドポイントが有効化される
	"time"
)
func main() {
	// 定期的にシンプルなタスクを実行する例
	go func() {
		for {
			fmt.Println("定期タスク実行中...")
			time.Sleep(2 * time.Second)
		}
	}()
	// pprofのデフォルトエンドポイントをポート6060で起動
	fmt.Println("pprofサーバをポート6060で起動中")
	http.ListenAndServe(":6060", nil)
}
定期タスク実行中...
定期タスク実行中...
// コンソールには定期タスクのログが出力され、pprofエンドポイントで詳細情報を参照可能

ログ管理とトレースの手法

非同期処理が増えると、デバッグやトラブルシューティングの際にログ管理が重要となります。

goroutineで処理状況を一意に識別できるIDを付与し、ログに記録することで、問題発生時の原因究明がスムーズに進められます。

以下は、単純なトレースログを出力する例です。

package main
import (
	"fmt"
	"sync"
	"time"
)
// tracedTaskはタスクIDをログに出力しながら処理を行う例
func tracedTask(taskID int, wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Printf("タスク %d 開始\n", taskID)
	time.Sleep(1 * time.Second)
	fmt.Printf("タスク %d 完了\n", taskID)
}
func main() {
	taskCount := 3
	var wg sync.WaitGroup
	for i := 1; i <= taskCount; i++ {
		wg.Add(1)
		go tracedTask(i, &wg)
	}
	wg.Wait()
	fmt.Println("全タスクの処理終了")
}
タスク 1 開始
タスク 2 開始
タスク 3 開始
タスク 1 完了
タスク 2 完了
タスク 3 完了
全タスクの処理終了

まとめ

この記事では、Go言語のgoroutineの活用シーン、注意事項、実践事例、拡張的な活用方法について解説しました。

全体として、非同期処理の実装例や排他制御、ライフサイクル管理の手法を具体的なコードと共に示し、効率的な開発手法を理解できる内容となっています。

ぜひ、この記事の内容を参考に、実際の開発現場でgoroutineを取り入れてみてください。

関連記事

Back to top button
目次へ