iOS

OSAtomic Atomic Operations

Atomic operations for multithreaded code

Posted by Jincc on January 23, 2018

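While reading the source of RACDisposable in ReactiveCocoa today, I noticed that it makes heavy use of the atomic operations in <libkern/OSAtomic.h>. I did not know this area well, so I wrote this post as a record.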

- (id)init {
	self = [super init];
	if (self == nil) return nil;

	_disposeBlock = (__bridge void *)self;
	OSMemoryBarrier();

	return self;
}

- (id)initWithBlock:(void (^)(void))block {
	NSCParameterAssert(block != nil);

	self = [super init];
	if (self == nil) return nil;

	_disposeBlock = (void *)CFBridgingRetain([block copy]); 
	OSMemoryBarrier();

	return self;
}

OSMemoryBarrier

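To get the best performance, compilers often reorder instructions to keep the processor's instruction pipeline as full as possible. If variables that look independent actually affect each other, this optimization can update them in the wrong order and produce incorrect results.

A memory barrier is a non-blocking synchronization tool that ensures memory operations happen in the correct order. It acts like a fence: the processor must complete all loads and stores positioned before the barrier before it is allowed to perform any loads and stores positioned after it. Memory barriers are typically used to ensure that memory operations performed by one thread (but visible to another) always complete in the expected order.

To use a memory barrier, simply call the OSMemoryBarrier function at the appropriate point in your code.

The code above makes sure that the assignment to _disposeBlock has completed before the initializer returns the instance.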
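
To make the "write, then barrier, then publish" idea concrete, here is a minimal sketch in portable C11. It uses atomic_thread_fence from <stdatomic.h> as a stand-in for OSMemoryBarrier(); the names payload, ready, publish and try_consume are made up for this illustration and do not come from ReactiveCocoa.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

static int payload;        // plain data, written before it is announced
static atomic_bool ready;  // flag that announces the payload (starts false)

// Writer side: store the data, issue a full barrier, then raise the flag.
void publish(int value) {
    payload = value;
    atomic_thread_fence(memory_order_seq_cst);  // plays the role of OSMemoryBarrier()
    atomic_store_explicit(&ready, true, memory_order_relaxed);
}

// Reader side: check the flag, issue a full barrier, then read the data.
// Returns false if nothing has been published yet.
bool try_consume(int *out) {
    assert(out != NULL);
    if (!atomic_load_explicit(&ready, memory_order_relaxed)) {
        return false;
    }
    atomic_thread_fence(memory_order_seq_cst);
    *out = payload;
    return true;
}
```

A reader that sees ready == true is guaranteed to see the payload written before the writer's fence. Without the two fences, the flag and the data could become visible in either order.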

OSAtomicCompareAndSwapPtrBarrier

- (void)dispose {
	void (^disposeBlock)(void) = NULL;

	while (YES) {
		void *blockPtr = _disposeBlock;
		if (OSAtomicCompareAndSwapPtrBarrier(blockPtr, NULL, &_disposeBlock)) {
			if (blockPtr != (__bridge void *)self) {
				disposeBlock = CFBridgingRelease(blockPtr);
			}

			break;
		}
	}

	if (disposeBlock != nil) disposeBlock();
}

When I first saw the code above, I was puzzled. Why not just run disposeBlock directly, as in the code below?

- (void)dispose {
	void (^disposeBlock)(void) = NULL;

	disposeBlock = CFBridgingRelease(_disposeBlock);
	if (disposeBlock != nil) disposeBlock();
}

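Under multithreading, if we do not synchronize, two threads can both read the same non-NULL _disposeBlock. The block could then run more than once, which wastes work at best, and it would also be released more than once. That is why the author uses OSAtomicCompareAndSwapPtrBarrier here for thread synchronization.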

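How OSAtomicCompareAndSwapPtrBarrier is used here:

  1. It compares blockPtr with _disposeBlock.
  2. If they are equal, it sets _disposeBlock = NULL and returns YES. Otherwise it leaves _disposeBlock unchanged and returns NO.
  3. On YES, execution enters the if body, takes ownership of the block as disposeBlock, and breaks out of the loop.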

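Putting this together: under multithreading, even if several threads have already read the same _disposeBlock value into their local blockPtr, their calls to OSAtomicCompareAndSwapPtrBarrier take effect one at a time. Suppose two threads enter this logic at the same moment. The first thread's compare-and-swap succeeds and sets _disposeBlock to NULL. When the second thread's compare-and-swap runs, it finds that _disposeBlock no longer equals its blockPtr (blockPtr still holds the old value, while _disposeBlock has been updated to NULL by the first thread), so it returns NO, the break is skipped, and the loop runs again. This time blockPtr picks up the new value NULL, the comparison matches, and _disposeBlock is set to NULL once more. Because blockPtr is NULL, the second thread has no block to run. The whole sequence achieves thread synchronization without entering the kernel or burning much CPU, and it keeps the logic consistent.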
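
The same "claim the pointer with compare-and-swap, then run it once" loop can be sketched in portable C11. This is an illustration under assumptions, not RACDisposable's implementation: it swaps OSAtomicCompareAndSwapPtrBarrier for atomic_compare_exchange_strong, and the types cleanup and disposable, as well as the callback bump, are hypothetical names.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

// Hypothetical cleanup record: a callback plus its context.
typedef struct {
    void (*run)(void *ctx);
    void *ctx;
} cleanup;

// Hypothetical disposable: `pending` is non-NULL until it has been disposed.
typedef struct {
    _Atomic(cleanup *) pending;
} disposable;

// Example callback used for illustration: counts how often it was run.
void bump(void *ctx) {
    ++*(int *)ctx;
}

void disposable_dispose(disposable *d) {
    assert(d != NULL);
    cleanup *claimed;
    for (;;) {
        claimed = atomic_load(&d->pending);
        // Succeeds only if `pending` still equals `claimed`; then it becomes NULL.
        if (atomic_compare_exchange_strong(&d->pending, &claimed, (cleanup *)NULL)) {
            break;
        }
    }
    // Only the caller that swapped out a non-NULL pointer runs the callback.
    if (claimed != NULL) {
        claimed->run(claimed->ctx);
    }
}
```

Calling disposable_dispose twice on the same object runs the callback only once, because the second call claims NULL. In C11 the loop could also be collapsed into a single atomic_exchange(&d->pending, NULL); the explicit loop is kept here to mirror the code discussed above.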

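Spin locks

As I understand it, a spin lock differs from a mutex. With a mutex, once a thread has acquired the lock, other threads that want it are blocked until the lock is released. With a spin lock, once a thread has acquired the lock, the other threads keep looping, checking whether the lock has been released. Spin locks are therefore best suited to cases where the holder keeps the lock only briefly.

You can think of it as a global state variable: the lock operation is a do-while loop that keeps spinning until that state becomes false.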
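
That mental model can be written down as a small sketch using C11's atomic_flag. It is not how OSSpinLock is implemented; the type spin_lock and the three functions are hypothetical names chosen to mirror lock, try, and unlock.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

// The "state variable": set while some thread holds the lock.
typedef struct {
    atomic_flag held;
} spin_lock;

#define SPIN_LOCK_INIT { ATOMIC_FLAG_INIT }

// Spin until the flag was previously clear, i.e. until we are the one who set it.
void spin_lock_acquire(spin_lock *l) {
    assert(l != NULL);
    while (atomic_flag_test_and_set_explicit(&l->held, memory_order_acquire)) {
        // Busy-wait: another thread holds the lock.
    }
}

// Single attempt: returns true if the lock was acquired, false if it was held.
bool spin_lock_try(spin_lock *l) {
    assert(l != NULL);
    return !atomic_flag_test_and_set_explicit(&l->held, memory_order_acquire);
}

// Clear the flag so that a spinning thread can get in.
void spin_lock_release(spin_lock *l) {
    assert(l != NULL);
    atomic_flag_clear_explicit(&l->held, memory_order_release);
}
```

A waiting thread never sleeps here, so the critical section between acquire and release should be as short as possible.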

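libkern/OSAtomic.h contains three spin lock functions: OSSpinLockLock, OSSpinLockTry, and OSSpinLockUnlock. (Note that Apple has deprecated OSSpinLock since iOS 10 in favor of os_unfair_lock.)

Using it is simple: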

- (void)initLock
{
    _spinLock = OS_SPINLOCK_INIT; //1.init
}
- (BOOL)isDisposed {
	OSSpinLockLock(&_spinLock); // 2.lock
	BOOL disposed = _disposed; // work
	OSSpinLockUnlock(&_spinLock); // 3.unlock
	return disposed;
}

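Atomic queue operations

The queue operations fall into two groups:

  • Lock-free FIFO (first in, first out) enqueue and dequeue, provided by OSAtomicFifoEnqueue and OSAtomicFifoDequeue.
  • Lock-free LIFO (last in, first out) enqueue and dequeue, provided by OSAtomicEnqueue and OSAtomicDequeue.

These functions are thread-safe, which makes them a powerful building block for code with strict precision requirements.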
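
To give a feel for what a lock-free LIFO looks like, here is a minimal compare-and-swap stack in C11. It is only a sketch under assumptions: node, lifo, lifo_push and lifo_pop are hypothetical names, and unlike a production implementation it does nothing about the ABA problem, so it is only safe when popped nodes are not freed or reused while other threads may still be popping.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

typedef struct node {
    struct node *next;  // link field, comparable to the `next` offset passed to OSAtomicEnqueue
    int value;
} node;

typedef struct {
    _Atomic(node *) head;
} lifo;

// Push: link the new node in front of the current head, retry if the head moved.
void lifo_push(lifo *q, node *n) {
    assert(q != NULL && n != NULL);
    node *old = atomic_load(&q->head);
    do {
        n->next = old;
    } while (!atomic_compare_exchange_weak(&q->head, &old, n));
}

// Pop: swing the head to the second node, retry if the head moved.
// Returns NULL when the stack is empty.
node *lifo_pop(lifo *q) {
    assert(q != NULL);
    node *old = atomic_load(&q->head);
    while (old != NULL &&
           !atomic_compare_exchange_weak(&q->head, &old, old->next)) {
        // A failed compare-and-swap reloads `old`; just try again.
    }
    return old;
}
```

Pushing two nodes and popping twice returns them in reverse order, which is the last-in, first-out behaviour described above.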

static void RACCheckActiveSignals(void) {
	// Clear this flag now, so another thread can re-dispatch to the main queue
	// as needed.
    //RACWillCheckActiveSignals = 0;
	OSAtomicAnd32Barrier(0, &RACWillCheckActiveSignals);

	RACSignalList * restrict elem;
	// -------------------- pop from the LIFO queue
	while ((elem = OSAtomicDequeue(&RACActiveSignalsToCheck, offsetof(RACSignalList, next))) != NULL) {
		RACDynamicSignal *signal = CFBridgingRelease(elem->retainedSignal);
		free(elem);
		// If the signal has subscribers, keep it in the global RACActiveSignals set; otherwise remove it.
		if (signal.hasSubscribers) {
			// We want to keep the signal around until all its subscribers are done
			CFSetAddValue(RACActiveSignals, (__bridge void *)signal);
		} else {
			CFSetRemoveValue(RACActiveSignals, (__bridge void *)signal);
		}
	}
}

- (void)invalidateGlobalRefIfNoNewSubscribersShowUp {
	// If no one subscribes in one pass of the main run loop, then we're free to
	// go. It's up to the caller to keep us alive if they still want us.
    
	RACSignalList *elem = malloc(sizeof(*elem));

	// This also serves to retain the signal until the next pass.
	// Retain the signal.
	elem->retainedSignal = CFBridgingRetain(self);
	// -------------------- push onto the LIFO queue
	OSAtomicEnqueue(&RACActiveSignalsToCheck, elem, offsetof(RACSignalList, next));
    
	// Not using a barrier because duplicate scheduling isn't erroneous, just
	// less optimized.
    
	// Sets RACWillCheckActiveSignals to 1 and returns its previous value.
	int32_t willCheck = OSAtomicOr32Orig(1, &RACWillCheckActiveSignals);

	// Only schedule a check if RACWillCheckActiveSignals was 0 before.
	if (willCheck == 0) {
		dispatch_async(dispatch_get_main_queue(), ^{
			// Run the check.
			RACCheckActiveSignals();
		});
	}
}

AND, OR and XOR

int32_t	OSAtomicOr32( uint32_t __theMask, volatile uint32_t *__theValue );

int32_t	OSAtomicOr32Barrier( uint32_t __theMask, volatile uint32_t *__theValue );

int32_t	OSAtomicOr32Orig( uint32_t __theMask, volatile uint32_t *__theValue );

int32_t	OSAtomicOr32OrigBarrier( uint32_t __theMask, volatile uint32_t *__theValue );

int32_t	OSAtomicAnd32( uint32_t __theMask, volatile uint32_t *__theValue );

int32_t	OSAtomicAnd32Barrier( uint32_t __theMask, volatile uint32_t *__theValue );

int32_t	OSAtomicAnd32Orig( uint32_t __theMask, volatile uint32_t *__theValue );

int32_t	OSAtomicAnd32OrigBarrier( uint32_t __theMask, volatile uint32_t *__theValue );

int32_t	OSAtomicXor32( uint32_t __theMask, volatile uint32_t *__theValue );

int32_t	OSAtomicXor32Barrier( uint32_t __theMask, volatile uint32_t *__theValue );

int32_t	OSAtomicXor32Orig( uint32_t __theMask, volatile uint32_t *__theValue );

int32_t	OSAtomicXor32OrigBarrier( uint32_t __theMask, volatile uint32_t *__theValue );
 

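These functions can be classified by two criteria:

  • whether they use a barrier
  • whether the return value is the original value or the value after the operation

Each function combines __theMask with the value pointed to by __theValue using AND, OR, or XOR, and stores the new value back into *__theValue.
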
- (void)setAllowsConcurrentExecution:(BOOL)allowed {
	[self willChangeValueForKey:@keypath(self.allowsConcurrentExecution)];
	if (allowed) {
		OSAtomicOr32Barrier(1, &_allowsConcurrentExecution);
	} else {
		OSAtomicAnd32Barrier(0, &_allowsConcurrentExecution);
	}

	[self didChangeValueForKey:@keypath(self.allowsConcurrentExecution)];
}
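
The difference between the "Orig" and non-"Orig" return values is easy to mix up, so here is a small C11 sketch of both. It is built on atomic_fetch_or and atomic_fetch_and rather than on the OSAtomic calls themselves, and the wrapper names or32, or32_orig, and32 and and32_orig are hypothetical.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

// "Orig" style: returns the value as it was before the OR.
uint32_t or32_orig(uint32_t mask, _Atomic uint32_t *value) {
    assert(value != NULL);
    return atomic_fetch_or(value, mask);
}

// Non-"Orig" style: returns the value after the OR.
uint32_t or32(uint32_t mask, _Atomic uint32_t *value) {
    assert(value != NULL);
    return atomic_fetch_or(value, mask) | mask;
}

// "Orig" style: returns the value as it was before the AND.
uint32_t and32_orig(uint32_t mask, _Atomic uint32_t *value) {
    assert(value != NULL);
    return atomic_fetch_and(value, mask);
}

// Non-"Orig" style: returns the value after the AND.
uint32_t and32(uint32_t mask, _Atomic uint32_t *value) {
    assert(value != NULL);
    return atomic_fetch_and(value, mask) & mask;
}
```

The "Orig" form is what makes a flag usable as a "schedule once" guard, as in invalidateGlobalRefIfNoNewSubscribersShowUp: only the caller that gets 0 back from or32_orig(1, &flag) was the one who flipped the flag from 0 to 1.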

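Arithmetic operations: Add, Increment, Decrement

This group of functions mainly includes (each also has a 64-bit variant):

  • Addition: OSAtomicAdd32, OSAtomicAdd32Barrier
  • Decrement: OSAtomicDecrement32, OSAtomicDecrement32Barrier
  • Increment: OSAtomicIncrement32, OSAtomicIncrement32Barrier

Sample code: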

- (RACSignal *)autoconnect {
	__block volatile int32_t subscriberCount = 0;

	return [[RACSignal
		createSignal:^(id<RACSubscriber> subscriber) {
		//1.
			OSAtomicIncrement32Barrier(&subscriberCount);

			RACDisposable *subscriptionDisposable = [self.signal subscribe:subscriber];
			RACDisposable *connectionDisposable = [self connect];

			return [RACDisposable disposableWithBlock:^{
				[subscriptionDisposable dispose];
               //2.
				if (OSAtomicDecrement32Barrier(&subscriberCount) == 0) {
					[connectionDisposable dispose];
				}
			}];
		}]
		setNameWithFormat:@"[%@] -autoconnect", self.signal.name];
}

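When connecting a multicast signal, the count is incremented each time a subscriber is created and decremented when its subscription is disposed. When subscriberCount reaches 0, the connection is disposed.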
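
The counting pattern itself can be sketched in C11 as follows. This is an illustration, not the autoconnect implementation: it uses atomic_fetch_add and atomic_fetch_sub in place of OSAtomicIncrement32Barrier and OSAtomicDecrement32Barrier, and the function names are hypothetical.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

// Returns the new count, the way OSAtomicIncrement32Barrier does.
// atomic_fetch_add itself returns the old value, hence the "+ 1".
int32_t counter_increment(_Atomic int32_t *count) {
    assert(count != NULL);
    return atomic_fetch_add(count, 1) + 1;
}

// Returns the new count, the way OSAtomicDecrement32Barrier does.
int32_t counter_decrement(_Atomic int32_t *count) {
    assert(count != NULL);
    return atomic_fetch_sub(count, 1) - 1;
}

// True only for the caller whose decrement brings the count down to zero,
// i.e. the one responsible for tearing down the shared connection.
bool subscriber_left(_Atomic int32_t *count) {
    return counter_decrement(count) == 0;
}
```

Because the decrement and the comparison are based on one atomic operation, exactly one caller observes the transition to zero, even if several subscriptions are disposed at the same time.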

The following lines are also interesting:

In RAC/RACDynamicSequence:
- (void)dealloc {
	static volatile int32_t directDeallocCount = 0;
//    int32_t MaxCount = 10000000000;
    int32_t MaxCount = DEALLOC_OVERFLOW_GUARD;
	
	if (OSAtomicIncrement32(&directDeallocCount) >= MaxCount) {
		OSAtomicAdd32(-MaxCount, &directDeallocCount);

		// Put this sequence's tail onto the autorelease pool so we stop
		// recursing.
		__autoreleasing RACSequence *tail __attribute__((unused)) = _tail;
	}
	
    _tail = nil;
}
// Determines how RACDynamicSequences will be deallocated before the next one is
// shifted onto the autorelease pool.
//
// This avoids stack overflows when deallocating long chains of dynamic
// sequences.
#define DEALLOC_OVERFLOW_GUARD 100

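When a long chain of sequences is deallocated directly, one dealloc triggering the next, the counter keeps growing. Once it reaches the threshold we set, the tail is put onto the autorelease pool instead, which stops the recursion and avoids a stack overflow. It is a neat optimization and worth borrowing.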