Go Concurrency

Processes

Every process has a unique process ID, a non-negative integer. An existing process can call the fork function to create a new process, the child.

Some memory regions are shared read-only between the parent and the child; when either needs to modify them, copy-on-write is used.

One property of fork is that all of the parent's open file descriptors are duplicated into the child, so there are two common ways of handling the descriptors after fork:

  • The parent waits for the child to complete.
  • The parent and the child each go their own way, executing different sections of the program.

wait and waitpid are the functions a parent uses to wait for its children. When a process terminates, normally or abnormally, the kernel sends its parent the SIGCHLD signal.

After fork creates a new child process, the child usually calls one of the exec functions to execute another program.
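Since this article's code is Go, here is a minimal sketch of the same fork + exec + wait pattern as Go exposes it through os/exec, which performs the fork and exec internally on Unix; the date command is only an assumed example and must exist on the system.

package main

import (
    "fmt"
    "os/exec"
)

func main() {
    // exec.Command + Output performs the classic pattern: the runtime
    // forks, the child execs the program ("date" here is just an assumed
    // example), and the parent waits for it, reaping the child much as
    // wait/waitpid would, while collecting its stdout.
    out, err := exec.Command("date").Output()
    if err != nil {
        fmt.Println("child failed:", err)
        return
    }
    fmt.Printf("child output: %s", out)
}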

Threads

A typical UNIX process can be viewed as having a single thread of control: at any moment the process is doing only one thing. With multiple threads of control, a program can be designed so that the process does more than one thing at a time, each thread handling its own independent task.

When multiple threads of control share the same memory, each thread must see a consistent view of the data. When a variable that one thread can modify is also read or modified by other threads, the threads must be synchronized so that none of them sees an invalid value when accessing the variable.

To solve this problem, threads have to use locks that allow only one thread at a time to access the variable: mutexes, reader-writer locks, and spin locks.
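As a minimal sketch of this kind of synchronization in Go, sync.Mutex serializes access to a shared counter:

package main

import (
    "fmt"
    "sync"
)

// Ten goroutines share one counter; the mutex ensures only one of them
// mutates it at a time, so every increment is observed.
func main() {
    var (
        mu      sync.Mutex
        counter int
        wg      sync.WaitGroup
    )
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            counter++ // protected critical section
            mu.Unlock()
        }()
    }
    wg.Wait()
    fmt.Println(counter) // always 10; without the lock this would be a data race
}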

Inter-Process Communication

  • Pipes (half duplex; usable only between two processes that share a common ancestor; see the sketch after this list)
  • FIFOs (remove the second limitation)
  • UNIX domain sockets (remove both limitations)
  • Semaphores
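Below is a minimal Go sketch of the pipe mechanism. A portable Go program does not fork, so two goroutines stand in for the parent and child processes; os.Pipe still hands back the same half-duplex kernel pipe.

package main

import (
    "fmt"
    "os"
)

// os.Pipe returns a connected half-duplex pair: bytes written to w can
// be read from r. In a fork-based C program the two ends would live in
// the parent and the child.
func main() {
    r, w, err := os.Pipe()
    if err != nil {
        panic(err)
    }
    go func() {
        defer w.Close()
        w.WriteString("hello over a pipe\n")
    }()
    buf := make([]byte, 64)
    n, _ := r.Read(buf)
    fmt.Print(string(buf[:n]))
}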

Theoretical Foundations of Go Concurrency

"Do not communicate by sharing memory; instead, share memory by communicating." This is a general piece of concurrent-programming guidance rather than something specific to Go; Go puts it into practice through channels, which are first-class citizens in the language.
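A minimal sketch of what "share memory by communicating" looks like with a channel: rather than guarding a shared result variable with a lock, the worker sends the value over a channel and ownership travels with it.

package main

import "fmt"

func main() {
    results := make(chan int)
    go func() {
        sum := 0
        for i := 1; i <= 100; i++ {
            sum += i
        }
        results <- sum // hand the value to the receiver
    }()
    fmt.Println(<-results) // 5050; no lock needed
}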

Go's GMP Scheduling Model

Interviewer: "In the GMP model, why is there a P?"

Before Go 1.1, Go's scheduling model was in fact the GM model, that is, there was no P. Let's first look back at that earlier design.

The key steps at the core of the Go 1.0.1 GM scheduler source are:

  • Call schedlock to acquire the global scheduler lock.

  • Once the global lock is held, change the current goroutine's status from Running (being scheduled) to Runnable (eligible for scheduling).

  • Call gput to save the current goroutine's execution state and related information for later use.

  • Call nextgandunlock to find the next runnable goroutine and release the global lock for other scheduling work.

  • Once the next goroutine to run has been obtained, set its status to Running.

  • Call runtime·gogo to run the goroutine just obtained, entering the next round of scheduling.

The scheduler runs this flow over and over: when GoroutineA completes, it looks for GoroutineB; once B is found, the scheduling slot that the finished A held is handed to B, and GoroutineB starts being scheduled, that is, running.

Of course, there are also Gs that are Blocked. Suppose a G is making a system or network call; the G stalls, and the M (the OS thread) is put back into the kernel's queue, waiting to be woken for a new round.
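The following is a toy Go model of the loop just described, not the actual Go 1.0 runtime (which is written in C); the names gput and nextgandunlock mirror the real functions, but the bodies are illustrative only. It makes the key property visible: every M serializes on one global lock.

package main

import (
    "fmt"
    "sync"
)

type G struct{ id int }

var (
    schedlock sync.Mutex // the single global scheduler lock
    runq      []*G       // the single global runnable queue
)

func gput(g *G) { // park a runnable G on the global queue
    schedlock.Lock()
    runq = append(runq, g)
    schedlock.Unlock()
}

func nextgandunlock() *G { // take the next runnable G, if any
    schedlock.Lock()
    defer schedlock.Unlock()
    if len(runq) == 0 {
        return nil
    }
    g := runq[0]
    runq = runq[1:]
    return g
}

func schedule(m int, wg *sync.WaitGroup) { // each "M" runs this loop
    defer wg.Done()
    for g := nextgandunlock(); g != nil; g = nextgandunlock() {
        fmt.Printf("M%d runs G%d\n", m, g.id) // stands in for runtime·gogo
    }
}

func main() {
    for i := 0; i < 8; i++ {
        gput(&G{id: i})
    }
    var wg sync.WaitGroup
    for m := 0; m < 2; m++ {
        wg.Add(1)
        go schedule(m, &wg)
    }
    wg.Wait()
}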

Shortcomings of GM

On the surface, the GM model looks rock solid, with no flaws at all. So why was it changed?

In 2012, Dmitry Vyukov published "Scalable Go Scheduler Design Doc", which named four problems:

  1. Single global mutex (Sched.Lock) and centralized state. The mutex protects all goroutine-related operations (creation, completion, rescheduling, etc).
  2. Goroutine (G) hand-off (G.nextg). Worker threads (M's) frequently hand-off runnable goroutines between each other, this may lead to increased latencies and additional overheads. Every M must be able to execute any runnable G, in particular the M that just created the G.
  3. Per-M memory cache (M.mcache). Memory cache and other caches (stack alloc) are associated with all M's, while they need to be associated only with M's running Go code (an M blocked inside of syscall does not need mcache). A ratio between M's running Go code and all M's can be as high as 1:100. This leads to excessive resource consumption (each MCache can suck up to 2M) and poor data locality.
  4. Aggressive thread blocking/unblocking. In presence of syscalls worker threads are frequently blocked and unblocked. This adds a lot of overhead.

GM -> GMP

Processors The general idea is to introduce a notion of P (Processors) into runtime and implement work-stealing scheduler on top of Processors. M represents OS thread (as it is now). P represents a resource that is required to execute Go code. When M executes Go code, it has an associated P. When M is idle or in syscall, it does not need P. There is exactly GOMAXPROCS P’s. All P’s are organized into an array, that is a requirement of work-stealing. GOMAXPROCS change involves stop/start the world to resize array of P’s. Some variables from sched are de-centralized and moved to P. Some variables from M are moved to P (the ones that relate to active execution of Go code).

struct P
{
    Lock;
    G *gfree;              // freelist, moved from sched
    G *ghead;              // runnable, moved from sched
    G *gtail;
    MCache *mcache;        // moved from M
    FixAlloc *stackalloc;  // moved from M
    uint64 ncgocall;
    GCStats gcstats;
    // etc
    ...
};
  • Each P has its own local run queue, which drastically reduces direct dependence on the global queue; the result is far less lock contention, and lock contention was the dominant cost in the GM model.

  • To keep the Ps balanced, the GMP model also implements the work-stealing algorithm: when a P's local queue is empty, it steals runnable Gs from the global queue or from other Ps' local queues, cutting idle time and improving resource utilization (see the sketch below).
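Below is a minimal sketch of the stealing step itself, using plain slices as hypothetical queues (the real runtime uses fixed-size ring buffers and atomics): an idle P takes half of a victim P's queue.

package main

import "fmt"

// stealHalf moves half of the victim's queued G ids to the caller.
// Illustrative only; names and the slice representation are assumptions.
func stealHalf(victim *[]int) []int {
    n := len(*victim) / 2 // take half, rounded down
    if n == 0 {
        return nil
    }
    stolen := (*victim)[len(*victim)-n:]
    *victim = (*victim)[:len(*victim)-n]
    return stolen
}

func main() {
    busy := []int{1, 2, 3, 4, 5, 6} // G ids queued on a busy P
    idle := []int{}                 // an idle P with an empty queue
    idle = append(idle, stealHalf(&busy)...)
    fmt.Println("busy:", busy, "idle:", idle) // busy: [1 2 3] idle: [4 5 6]
}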

GMP

// Goroutine scheduler
// The scheduler's job is to distribute ready-to-run goroutines over worker threads.
//
// The main concepts are:
// G - goroutine.
// M - worker thread, or machine.
// P - processor, a resource that is required to execute Go code.
// M must have an associated P to execute Go code, however it can be
// blocked or in a syscall w/o an associated P.

  • M is the abstraction of an OS thread; G is a goroutine; P is the processor that bridges the two.

  • Before an M can schedule Gs, it must be bound to a P.

  • Globally there are many Ms and many Ps, but the number of Gs executing in parallel at any instant is at most the number of Ps (demonstrated in the snippet after this list).

  • When an M schedules a G, it takes from its P's local queue first, then from the global queue, and finally from the wait queue. The benefit is that taking from the local queue is close to lock-free, reducing contention on the global lock.

  • To keep busy and idle Ps from diverging too much, a work-stealing mechanism is in place: a P whose local queue is empty can try to steal half of the Gs from another P's local queue.
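The P count is observable and adjustable from user code via runtime.GOMAXPROCS, which reads or resizes the array of Ps described in the design doc (resizing stops the world). A small sketch:

package main

import (
    "fmt"
    "runtime"
)

func main() {
    fmt.Println("Ps:", runtime.GOMAXPROCS(0)) // argument 0 = query without changing
    prev := runtime.GOMAXPROCS(2)             // shrink to 2 Ps: at most 2 Gs run in parallel
    fmt.Println("was:", prev, "now:", runtime.GOMAXPROCS(0))
}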

G


type g struct {
// Stack parameters.
// stack describes the actual stack memory: [stack.lo, stack.hi).
// stackguard0 is the stack pointer compared in the Go stack growth prologue.
// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
// stackguard1 is the stack pointer compared in the C stack growth prologue.
// It is stack.lo+StackGuard on g0 and gsignal stacks.
// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
stack stack // offset known to runtime/cgo
stackguard0 uintptr // offset known to liblink
stackguard1 uintptr // offset known to liblink

_panic *_panic // innermost panic - offset known to liblink
_defer *_defer // innermost defer
m *m // current m; offset known to arm liblink
sched gobuf
syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc
syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc
stktopsp uintptr // expected sp at top of stack, to check in traceback
// param is a generic pointer parameter field used to pass
// values in particular contexts where other storage for the
// parameter would be difficult to find. It is currently used
// in three ways:
// 1. When a channel operation wakes up a blocked goroutine, it sets param to
// point to the sudog of the completed blocking operation.
// 2. By gcAssistAlloc1 to signal back to its caller that the goroutine completed
// the GC cycle. It is unsafe to do so in any other way, because the goroutine's
// stack may have moved in the meantime.
// 3. By debugCallWrap to pass parameters to a new goroutine because allocating a
// closure in the runtime is forbidden.
param unsafe.Pointer
atomicstatus atomic.Uint32
stackLock uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
goid uint64
schedlink guintptr
waitsince int64 // approx time when the g become blocked
waitreason waitReason // if status==Gwaiting

preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
preemptStop bool // transition to _Gpreempted on preemption; otherwise, just deschedule
preemptShrink bool // shrink stack at synchronous safe point

// asyncSafePoint is set if g is stopped at an asynchronous
// safe point. This means there are frames on the stack
// without precise pointer information.
asyncSafePoint bool

paniconfault bool // panic (instead of crash) on unexpected fault address
gcscandone bool // g has scanned stack; protected by _Gscan bit in status
throwsplit bool // must not split stack
// activeStackChans indicates that there are unlocked channels
// pointing into this goroutine's stack. If true, stack
// copying needs to acquire channel locks to protect these
// areas of the stack.
activeStackChans bool
// parkingOnChan indicates that the goroutine is about to
// park on a chansend or chanrecv. Used to signal an unsafe point
// for stack shrinking.
parkingOnChan atomic.Bool

raceignore int8 // ignore race detection events
tracking bool // whether we're tracking this G for sched latency statistics
trackingSeq uint8 // used to decide whether to track this G
trackingStamp int64 // timestamp of when the G last started being tracked
runnableTime int64 // the amount of time spent runnable, cleared when running, only used when tracking
lockedm muintptr
sig uint32
writebuf []byte
sigcode0 uintptr
sigcode1 uintptr
sigpc uintptr
parentGoid uint64 // goid of goroutine that created this goroutine
gopc uintptr // pc of go statement that created this goroutine
ancestors *[]ancestorInfo // ancestor information goroutine(s) that created this goroutine (only used if debug.tracebackancestors)
startpc uintptr // pc of goroutine function
racectx uintptr
waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
cgoCtxt []uintptr // cgo traceback context
labels unsafe.Pointer // profiler labels
timer *timer // cached timer for time.Sleep
selectDone atomic.Uint32 // are we participating in a select and did someone win the race?

// goroutineProfiled indicates the status of this goroutine's stack for the
// current in-progress goroutine profile
goroutineProfiled goroutineProfileStateHolder

// Per-G tracer state.
trace gTraceState

// Per-G GC state

// gcAssistBytes is this G's GC assist credit in terms of
// bytes allocated. If this is positive, then the G has credit
// to allocate gcAssistBytes bytes without assisting. If this
// is negative, then the G must correct this by performing
// scan work. We track this in bytes to make it fast to update
// and check for debt in the malloc hot path. The assist ratio
// determines how this corresponds to scan work debt.
gcAssistBytes int64
}

// defined constants
const (
// G status
//
// Beyond indicating the general state of a G, the G status
// acts like a lock on the goroutine's stack (and hence its
// ability to execute user code).
//
// If you add to this list, add to the list
// of "okay during garbage collection" status
// in mgcmark.go too.
//
// TODO(austin): The _Gscan bit could be much lighter-weight.
// For example, we could choose not to run _Gscanrunnable
// goroutines found in the run queue, rather than CAS-looping
// until they become _Grunnable. And transitions like
// _Gscanwaiting -> _Gscanrunnable are actually okay because
// they don't affect stack ownership.

// _Gidle means this goroutine was just allocated and has not
// yet been initialized.
_Gidle = iota // 0

// _Grunnable means this goroutine is on a run queue. It is
// not currently executing user code. The stack is not owned.
_Grunnable // 1

// _Grunning means this goroutine may execute user code. The
// stack is owned by this goroutine. It is not on a run queue.
// It is assigned an M and a P (g.m and g.m.p are valid).
_Grunning // 2

// _Gsyscall means this goroutine is executing a system call.
// It is not executing user code. The stack is owned by this
// goroutine. It is not on a run queue. It is assigned an M.
_Gsyscall // 3

// _Gwaiting means this goroutine is blocked in the runtime.
// It is not executing user code. It is not on a run queue,
// but should be recorded somewhere (e.g., a channel wait
// queue) so it can be ready()d when necessary. The stack is
// not owned *except* that a channel operation may read or
// write parts of the stack under the appropriate channel
// lock. Otherwise, it is not safe to access the stack after a
// goroutine enters _Gwaiting (e.g., it may get moved).
_Gwaiting // 4

// _Gmoribund_unused is currently unused, but hardcoded in gdb
// scripts.
_Gmoribund_unused // 5

// _Gdead means this goroutine is currently unused. It may be
// just exited, on a free list, or just being initialized. It
// is not executing user code. It may or may not have a stack
// allocated. The G and its stack (if any) are owned by the M
// that is exiting the G or that obtained the G from the free
// list.
_Gdead // 6

// _Genqueue_unused is currently unused.
_Genqueue_unused // 7

// _Gcopystack means this goroutine's stack is being moved. It
// is not executing user code and is not on a run queue. The
// stack is owned by the goroutine that put it in _Gcopystack.
_Gcopystack // 8

// _Gpreempted means this goroutine stopped itself for a
// suspendG preemption. It is like _Gwaiting, but nothing is
// yet responsible for ready()ing it. Some suspendG must CAS
// the status to _Gwaiting to take responsibility for
// ready()ing this G.
_Gpreempted // 9

// _Gscan combined with one of the above states other than
// _Grunning indicates that GC is scanning the stack. The
// goroutine is not executing user code and the stack is owned
// by the goroutine that set the _Gscan bit.
//
// _Gscanrunning is different: it is used to briefly block
// state transitions while GC signals the G to scan its own
// stack. This is otherwise like _Grunning.
//
// atomicstatus&~Gscan gives the state the goroutine will
// return to when the scan completes.
_Gscan = 0x1000
_Gscanrunnable = _Gscan + _Grunnable // 0x1001
_Gscanrunning = _Gscan + _Grunning // 0x1002
_Gscansyscall = _Gscan + _Gsyscall // 0x1003
_Gscanwaiting = _Gscan + _Gwaiting // 0x1004
_Gscanpreempted = _Gscan + _Gpreempted // 0x1009
)

M

type m struct {
g0 *g // goroutine with scheduling stack
morebuf gobuf // gobuf arg to morestack
divmod uint32 // div/mod denominator for arm - known to liblink
_ uint32 // align next field to 8 bytes

// Fields not known to debuggers.
procid uint64 // for debuggers, but offset not hard-coded
gsignal *g // signal-handling g
goSigStack gsignalStack // Go-allocated signal handling stack
sigmask sigset // storage for saved signal mask
tls [tlsSlots]uintptr // thread-local storage (for x86 extern register)
mstartfn func()
curg *g // current running goroutine
caughtsig guintptr // goroutine running during fatal signal
p puintptr // attached p for executing go code (nil if not executing go code)
nextp puintptr
oldp puintptr // the p that was attached before executing a syscall
id int64
mallocing int32
throwing throwType
preemptoff string // if != "", keep curg running on this m
locks int32
dying int32
profilehz int32
spinning bool // m is out of work and is actively looking for work
blocked bool // m is blocked on a note
newSigstack bool // minit on C thread called sigaltstack
printlock int8
incgo bool // m is executing a cgo call
isextra bool // m is an extra m
isExtraInC bool // m is an extra m that is not executing Go code
freeWait atomic.Uint32 // Whether it is safe to free g0 and delete m (one of freeMRef, freeMStack, freeMWait)
fastrand uint64
needextram bool
traceback uint8
ncgocall uint64 // number of cgo calls in total
ncgo int32 // number of cgo calls currently in progress
cgoCallersUse atomic.Uint32 // if non-zero, cgoCallers in use temporarily
cgoCallers *cgoCallers // cgo traceback if crashing in cgo call
park note
alllink *m // on allm
schedlink muintptr
lockedg guintptr
createstack [32]uintptr // stack that created this thread.
lockedExt uint32 // tracking for external LockOSThread
lockedInt uint32 // tracking for internal lockOSThread
nextwaitm muintptr // next m waiting for lock

// wait* are used to carry arguments from gopark into park_m, because
// there's no stack to put them on. That is their sole purpose.
waitunlockf func(*g, unsafe.Pointer) bool
waitlock unsafe.Pointer
waitTraceBlockReason traceBlockReason
waitTraceSkip int

syscalltick uint32
freelink *m // on sched.freem
trace mTraceState

// these are here because they are too large to be on the stack
// of low-level NOSPLIT functions.
libcall libcall
libcallpc uintptr // for cpu profiler
libcallsp uintptr
libcallg guintptr
syscall libcall // stores syscall parameters on windows

vdsoSP uintptr // SP for traceback while in VDSO call (0 if not in call)
vdsoPC uintptr // PC for traceback while in VDSO call

// preemptGen counts the number of completed preemption
// signals. This is used to detect when a preemption is
// requested, but fails.
preemptGen atomic.Uint32

// Whether this is a pending preemption signal on this M.
signalPending atomic.Uint32

dlogPerM

mOS

// Up to 10 locks held by this m, maintained by the lock ranking code.
locksHeldLen int
locksHeld [10]heldLockInfo
}

g0 is a special kind of scheduling goroutine: it does not execute user functions, but performs the switching between gs. Its relationship to m is 1:1.

P

type p struct {
id int32
status uint32 // one of pidle/prunning/...
link puintptr
schedtick uint32 // incremented on every scheduler call
syscalltick uint32 // incremented on every system call
sysmontick sysmontick // last tick observed by sysmon
m muintptr // back-link to associated m (nil if idle)
mcache *mcache
pcache pageCache
raceprocctx uintptr

deferpool []*_defer // pool of available defer structs (see panic.go)
deferpoolbuf [32]*_defer

// Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
goidcache uint64
goidcacheend uint64

// Queue of runnable goroutines. Accessed without lock.
runqhead uint32
runqtail uint32
runq [256]guintptr
// runnext, if non-nil, is a runnable G that was ready'd by
// the current G and should be run next instead of what's in
// runq if there's time remaining in the running G's time
// slice. It will inherit the time left in the current time
// slice. If a set of goroutines is locked in a
// communicate-and-wait pattern, this schedules that set as a
// unit and eliminates the (potentially large) scheduling
// latency that otherwise arises from adding the ready'd
// goroutines to the end of the run queue.
//
// Note that while other P's may atomically CAS this to zero,
// only the owner P can CAS it to a valid G.
runnext guintptr

// Available G's (status == Gdead)
gFree struct {
gList
n int32
}

sudogcache []*sudog
sudogbuf [128]*sudog

// Cache of mspan objects from the heap.
mspancache struct {
// We need an explicit length here because this field is used
// in allocation codepaths where write barriers are not allowed,
// and eliminating the write barrier/keeping it eliminated from
// slice updates is tricky, more so than just managing the length
// ourselves.
len int
buf [128]*mspan
}

// Cache of a single pinner object to reduce allocations from repeated
// pinner creation.
pinnerCache *pinner

trace pTraceState

palloc persistentAlloc // per-P to avoid mutex

// The when field of the first entry on the timer heap.
// This is 0 if the timer heap is empty.
timer0When atomic.Int64

// The earliest known nextwhen field of a timer with
// timerModifiedEarlier status. Because the timer may have been
// modified again, there need not be any timer with this value.
// This is 0 if there are no timerModifiedEarlier timers.
timerModifiedEarliest atomic.Int64

// Per-P GC state
gcAssistTime int64 // Nanoseconds in assistAlloc
gcFractionalMarkTime int64 // Nanoseconds in fractional mark worker (atomic)

// limiterEvent tracks events for the GC CPU limiter.
limiterEvent limiterEvent

// gcMarkWorkerMode is the mode for the next mark worker to run in.
// That is, this is used to communicate with the worker goroutine
// selected for immediate execution by
// gcController.findRunnableGCWorker. When scheduling other goroutines,
// this field must be set to gcMarkWorkerNotWorker.
gcMarkWorkerMode gcMarkWorkerMode
// gcMarkWorkerStartTime is the nanotime() at which the most recent
// mark worker started.
gcMarkWorkerStartTime int64

// gcw is this P's GC work buffer cache. The work buffer is
// filled by write barriers, drained by mutator assists, and
// disposed on certain GC state transitions.
gcw gcWork

// wbBuf is this P's GC write barrier buffer.
//
// TODO: Consider caching this in the running G.
wbBuf wbBuf

runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point

// statsSeq is a counter indicating whether this P is currently
// writing any stats. Its value is even when not, odd when it is.
statsSeq atomic.Uint32

// Lock for timers. We normally access the timers while running
// on this P, but the scheduler can also do it from a different P.
timersLock mutex

// Actions to take at some time. This is used to implement the
// standard library's time package.
// Must hold timersLock to access.
timers []*timer

// Number of timers in P's heap.
numTimers atomic.Uint32

// Number of timerDeleted timers in P's heap.
deletedTimers atomic.Uint32

// Race context used while executing timer functions.
timerRaceCtx uintptr

// maxStackScanDelta accumulates the amount of stack space held by
// live goroutines (i.e. those eligible for stack scanning).
// Flushed to gcController.maxStackScan once maxStackScanSlack
// or -maxStackScanSlack is reached.
maxStackScanDelta int64

// gc-time statistics about current goroutines
// Note that this differs from maxStackScan in that this
// accumulates the actual stack observed to be used at GC time (hi - sp),
// not an instantaneous measure of the total stack size that might need
// to be scanned (hi - lo).
scannedStackSize uint64 // stack size of goroutines scanned by this P
scannedStacks uint64 // number of goroutines scanned by this P

// preempt is set to indicate that this P should be enter the
// scheduler ASAP (regardless of what G is running on it).
preempt bool

// pageTraceBuf is a buffer for writing out page allocation/free/scavenge traces.
//
// Used only if GOEXPERIMENT=pagetrace.
pageTraceBuf pageTraceBuf

// Padding is no longer needed. False sharing is now not a worry because p is large enough
// that its size class is an integer multiple of the cache line size (for any of our architectures).
}

runq: the P-local run queue of goroutines, with a maximum length of 256.

Schedt


type schedt struct {
goidgen atomic.Uint64
lastpoll atomic.Int64 // time of last network poll, 0 if currently polling
pollUntil atomic.Int64 // time to which current poll is sleeping

lock mutex

// When increasing nmidle, nmidlelocked, nmsys, or nmfreed, be
// sure to call checkdead().

midle muintptr // idle m's waiting for work
nmidle int32 // number of idle m's waiting for work
nmidlelocked int32 // number of locked m's waiting for work
mnext int64 // number of m's that have been created and next M ID
maxmcount int32 // maximum number of m's allowed (or die)
nmsys int32 // number of system m's not counted for deadlock
nmfreed int64 // cumulative number of freed m's

ngsys atomic.Int32 // number of system goroutines

pidle puintptr // idle p's
npidle atomic.Int32
nmspinning atomic.Int32 // See "Worker thread parking/unparking" comment in proc.go.
needspinning atomic.Uint32 // See "Delicate dance" comment in proc.go. Boolean. Must hold sched.lock to set to 1.

// Global runnable queue.
runq gQueue
runqsize int32

// disable controls selective disabling of the scheduler.
//
// Use schedEnableUser to control this.
//
// disable is protected by sched.lock.
disable struct {
// user disables scheduling of user goroutines.
user bool
runnable gQueue // pending runnable Gs
n int32 // length of runnable
}

// Global cache of dead G's.
gFree struct {
lock mutex
stack gList // Gs with stacks
noStack gList // Gs without stacks
n int32
}

// Central cache of sudog structs.
sudoglock mutex
sudogcache *sudog

// Central pool of available defer structs.
deferlock mutex
deferpool *_defer

// freem is the list of m's waiting to be freed when their
// m.exited is set. Linked through m.freelink.
freem *m

gcwaiting atomic.Bool // gc is waiting to run
stopwait int32
stopnote note
sysmonwait atomic.Bool
sysmonnote note

// safepointFn should be called on each P at the next GC
// safepoint if p.runSafePointFn is set.
safePointFn func(*p)
safePointWait int32
safePointNote note

profilehz int32 // cpu profiling rate

procresizetime int64 // nanotime() of last change to gomaxprocs
totaltime int64 // ∫gomaxprocs dt up to procresizetime

// sysmonlock protects sysmon's actions on the runtime.
//
// Acquire and hold this mutex to block sysmon from interacting
// with the rest of the runtime.
sysmonlock mutex

// timeToRun is a distribution of scheduling latencies, defined
// as the sum of time a G spends in the _Grunnable state before
// it transitions to _Grunning.
timeToRun timeHistogram

// idleTime is the total CPU time Ps have "spent" idle.
//
// Reset on each GC cycle.
idleTime atomic.Int64

// totalMutexWaitTime is the sum of time goroutines have spent in _Gwaiting
// with a waitreason of the form waitReasonSync{RW,}Mutex{R,}Lock.
totalMutexWaitTime atomic.Int64
}