using System;
using System.Threading;

namespace Cysharp.Threading.Tasks
{
    /// <summary>
    /// Read-only view of a reactive value: exposes the current value, can be enumerated
    /// asynchronously, and can be awaited for the next published value.
    /// </summary>
    /// <typeparam name="T">Type of the held value.</typeparam>
    public interface IReadOnlyAsyncReactiveProperty<T> : IUniTaskAsyncEnumerable<T>
    {
        /// <summary>Gets the most recently stored value.</summary>
        T Value { get; }

        /// <summary>
        /// Returns an enumerable whose enumerators do not yield the current value first.
        /// </summary>
        IUniTaskAsyncEnumerable<T> WithoutCurrent();

        /// <summary>Returns a task for a value published after this call.</summary>
        /// <param name="cancellationToken">Token used to cancel the wait.</param>
        UniTask<T> WaitAsync(CancellationToken cancellationToken = default);
    }

    /// <summary>
    /// Writable counterpart of <see cref="IReadOnlyAsyncReactiveProperty{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of the held value.</typeparam>
    public interface IAsyncReactiveProperty<T> : IReadOnlyAsyncReactiveProperty<T>
    {
        /// <summary>Gets or sets the current value.</summary>
        new T Value { get; set; }
    }

    /// <summary>
    /// Holds a value and publishes every assignment to registered handlers
    /// (async enumerators and <see cref="WaitAsync"/> awaiters) through its trigger event.
    /// </summary>
    /// <typeparam name="T">Type of the held value.</typeparam>
    [Serializable]
    public class AsyncReactiveProperty<T> : IAsyncReactiveProperty<T>, IDisposable
    {
        TriggerEvent<T> triggerEvent;

#if UNITY_2018_3_OR_NEWER
        [UnityEngine.SerializeField]
#endif
        T latestValue;

        /// <summary>
        /// Gets the latest value, or stores a new one and passes it to
        /// <c>triggerEvent.SetResult</c>. Every assignment is published; there is no
        /// equality check against the previous value.
        /// </summary>
        public T Value
        {
            get
            {
                return latestValue;
            }
            set
            {
                this.latestValue = value;
                triggerEvent.SetResult(value);
            }
        }

        /// <summary>Creates the property with an initial value and no handlers.</summary>
        /// <param name="value">Initial value.</param>
        public AsyncReactiveProperty(T value)
        {
            this.latestValue = value;
            this.triggerEvent = default;
        }

        /// <summary>
        /// Returns an enumerable whose enumerators wait for the next assignment instead of
        /// yielding the current value on the first <c>MoveNextAsync</c>.
        /// </summary>
        public IUniTaskAsyncEnumerable<T> WithoutCurrent()
        {
            return new WithoutCurrentEnumerable(this);
        }

        /// <summary>
        /// Returns an enumerator whose first <c>MoveNextAsync</c> completes immediately with
        /// the current value; later calls wait for subsequent assignments.
        /// </summary>
        /// <param name="cancellationToken">Token that disposes the enumerator when canceled.</param>
        public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
        {
            return new Enumerator(this, cancellationToken, true);
        }

        /// <summary>Signals completion to the trigger event via <c>SetCompleted</c>.</summary>
        public void Dispose()
        {
            triggerEvent.SetCompleted();
        }

        /// <summary>Unwraps the property to its current value.</summary>
        public static implicit operator T(AsyncReactiveProperty<T> value)
        {
            return value.Value;
        }

        /// <summary>
        /// Returns the string form of the current value; for reference types a null value
        /// yields null rather than throwing.
        /// </summary>
        public override string ToString()
        {
            // Value types can never be null, so the null-conditional call is skipped for them.
            if (isValueType) return latestValue.ToString();
            return latestValue?.ToString();
        }

        /// <summary>
        /// Returns a task that completes with the next value passed to the trigger event.
        /// The task is canceled if <paramref name="cancellationToken"/> is canceled or the
        /// property signals completion.
        /// </summary>
        /// <param name="cancellationToken">Token used to cancel the wait.</param>
        public UniTask<T> WaitAsync(CancellationToken cancellationToken = default)
        {
            return new UniTask<T>(WaitAsyncSource.Create(this, cancellationToken, out var token), token);
        }

        static bool isValueType;

        static AsyncReactiveProperty()
        {
            isValueType = typeof(T).IsValueType;
        }

        /// <summary>
        /// Pooled task source backing <see cref="WaitAsync"/>; registers itself as a trigger
        /// handler and completes on the first notification it receives.
        /// </summary>
        sealed class WaitAsyncSource : IUniTaskSource<T>, ITriggerHandler<T>, ITaskPoolNode<WaitAsyncSource>
        {
            static Action<object> cancellationCallback = CancellationCallback;

            static TaskPool<WaitAsyncSource> pool;
            WaitAsyncSource nextNode;
            ref WaitAsyncSource ITaskPoolNode<WaitAsyncSource>.NextNode => ref nextNode;

            static WaitAsyncSource()
            {
                TaskPool.RegisterSizeGetter(typeof(WaitAsyncSource), () => pool.Size);
            }

            AsyncReactiveProperty<T> parent;
            CancellationToken cancellationToken;
            CancellationTokenRegistration cancellationTokenRegistration;
            UniTaskCompletionSourceCore<T> core;

            WaitAsyncSource()
            {
            }

            /// <summary>
            /// Takes a source from the pool (or allocates one), hooks it to the token and to
            /// the parent's trigger event, and returns it with its version token.
            /// </summary>
            /// <param name="parent">Property whose next value is awaited.</param>
            /// <param name="cancellationToken">Token used to cancel the wait.</param>
            /// <param name="token">Version token to pair with the returned source.</param>
            public static IUniTaskSource<T> Create(AsyncReactiveProperty<T> parent, CancellationToken cancellationToken, out short token)
            {
                // Already canceled: hand back a canceled source without touching the pool.
                if (cancellationToken.IsCancellationRequested)
                {
                    return AutoResetUniTaskCompletionSource<T>.CreateFromCanceled(cancellationToken, out token);
                }

                if (!pool.TryPop(out var result))
                {
                    result = new WaitAsyncSource();
                }

                result.parent = parent;
                result.cancellationToken = cancellationToken;

                if (cancellationToken.CanBeCanceled)
                {
                    result.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, result);
                }

                result.parent.triggerEvent.Add(result);

                TaskTracker.TrackActiveTask(result, 3);

                token = result.core.Version;
                return result;
            }

            /// <summary>
            /// Resets this source, detaches it from the token and the parent, and tries to
            /// push it back to the pool.
            /// </summary>
            /// <returns>The result of <c>pool.TryPush</c>.</returns>
            bool TryReturn()
            {
                TaskTracker.RemoveTracking(this);
                core.Reset();
                cancellationTokenRegistration.Dispose();
                cancellationTokenRegistration = default;
                parent.triggerEvent.Remove(this);
                parent = null;
                cancellationToken = default;
                return pool.TryPush(this);
            }

            static void CancellationCallback(object state)
            {
                var self = (WaitAsyncSource)state;
                self.OnCanceled(self.cancellationToken);
            }

            // IUniTaskSource

            /// <summary>Returns the result and always recycles this source afterwards.</summary>
            public T GetResult(short token)
            {
                try
                {
                    return core.GetResult(token);
                }
                finally
                {
                    TryReturn();
                }
            }

            void IUniTaskSource.GetResult(short token)
            {
                GetResult(token);
            }

            public void OnCompleted(Action<object> continuation, object state, short token)
            {
                core.OnCompleted(continuation, state, token);
            }

            public UniTaskStatus GetStatus(short token)
            {
                return core.GetStatus(token);
            }

            public UniTaskStatus UnsafeGetStatus()
            {
                return core.UnsafeGetStatus();
            }

            // ITriggerHandler

            ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
            ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }

            public void OnCanceled(CancellationToken cancellationToken)
            {
                core.TrySetCanceled(cancellationToken);
            }

            public void OnCompleted()
            {
                // Complete as Cancel.
                core.TrySetCanceled(CancellationToken.None);
            }

            public void OnError(Exception ex)
            {
                core.TrySetException(ex);
            }

            public void OnNext(T value)
            {
                core.TrySetResult(value);
            }
        }

        /// <summary>Enumerable wrapper that creates enumerators which skip the current value.</summary>
        sealed class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
        {
            readonly AsyncReactiveProperty<T> parent;

            public WithoutCurrentEnumerable(AsyncReactiveProperty<T> parent)
            {
                this.parent = parent;
            }

            public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
            {
                return new Enumerator(parent, cancellationToken, false);
            }
        }

        /// <summary>
        /// Async enumerator over the property's values; registered as a trigger handler on
        /// the parent for its whole lifetime.
        /// </summary>
        sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, ITriggerHandler<T>
        {
            static Action<object> cancellationCallback = CancellationCallback;

            readonly AsyncReactiveProperty<T> parent;
            readonly CancellationToken cancellationToken;
            readonly CancellationTokenRegistration cancellationTokenRegistration;
            T value;
            bool isDisposed;
            bool firstCall;

            /// <summary>Registers the enumerator with the parent and with the token.</summary>
            /// <param name="parent">Property being enumerated.</param>
            /// <param name="cancellationToken">Token that disposes this enumerator when canceled.</param>
            /// <param name="publishCurrentValue">
            /// When true, the first <see cref="MoveNextAsync"/> yields the parent's current value.
            /// </param>
            public Enumerator(AsyncReactiveProperty<T> parent, CancellationToken cancellationToken, bool publishCurrentValue)
            {
                this.parent = parent;
                this.cancellationToken = cancellationToken;
                this.firstCall = publishCurrentValue;

                parent.triggerEvent.Add(this);
                TaskTracker.TrackActiveTask(this, 3);

                if (cancellationToken.CanBeCanceled)
                {
                    cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, this);
                }
            }

            public T Current => value;

            ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
            ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }

            /// <summary>
            /// Completes immediately with the current value on the first call (when enabled);
            /// otherwise resets the completion source and waits for the next notification.
            /// </summary>
            public UniTask<bool> MoveNextAsync()
            {
                // raise latest value on first call.
                if (firstCall)
                {
                    firstCall = false;
                    value = parent.Value;
                    return CompletedTasks.True;
                }

                completionSource.Reset();
                return new UniTask<bool>(this, completionSource.Version);
            }

            /// <summary>
            /// Detaches the enumerator from the token and the parent and cancels any pending
            /// <see cref="MoveNextAsync"/>. Safe to call more than once.
            /// </summary>
            public UniTask DisposeAsync()
            {
                if (!isDisposed)
                {
                    isDisposed = true;
                    TaskTracker.RemoveTracking(this);
                    // Release the token registration; otherwise a long-lived token keeps
                    // referencing this enumerator after it has been disposed.
                    cancellationTokenRegistration.Dispose();
                    completionSource.TrySetCanceled(cancellationToken);
                    parent.triggerEvent.Remove(this);
                }
                return default;
            }

            public void OnNext(T value)
            {
                this.value = value;
                completionSource.TrySetResult(true);
            }

            public void OnCanceled(CancellationToken cancellationToken)
            {
                DisposeAsync().Forget();
            }

            public void OnCompleted()
            {
                // End of sequence: pending MoveNextAsync resolves to false.
                completionSource.TrySetResult(false);
            }

            public void OnError(Exception ex)
            {
                completionSource.TrySetException(ex);
            }

            static void CancellationCallback(object state)
            {
                var self = (Enumerator)state;
                self.DisposeAsync().Forget();
            }
        }
    }

    /// <summary>
    /// Read-only reactive value fed by a source async enumerable: each item produced by the
    /// source becomes the current value and is published to registered handlers.
    /// </summary>
    /// <typeparam name="T">Type of the held value.</typeparam>
    public class ReadOnlyAsyncReactiveProperty<T> : IReadOnlyAsyncReactiveProperty<T>, IDisposable
    {
        TriggerEvent<T> triggerEvent;

        T latestValue;
        IUniTaskAsyncEnumerator<T> enumerator;

        /// <summary>Gets the most recent value taken from the source (or the initial value).</summary>
        public T Value
        {
            get
            {
                return latestValue;
            }
        }

        /// <summary>Starts consuming <paramref name="source"/>, exposing <paramref name="initialValue"/> until it yields.</summary>
        /// <param name="initialValue">Value reported before the source produces an item.</param>
        /// <param name="source">Sequence whose items become the property's value.</param>
        /// <param name="cancellationToken">Token passed to the source enumerator.</param>
        public ReadOnlyAsyncReactiveProperty(T initialValue, IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
        {
            latestValue = initialValue;
            ConsumeEnumerator(source, cancellationToken).Forget();
        }

        /// <summary>Starts consuming <paramref name="source"/>; the value is default(T) until it yields.</summary>
        /// <param name="source">Sequence whose items become the property's value.</param>
        /// <param name="cancellationToken">Token passed to the source enumerator.</param>
        public ReadOnlyAsyncReactiveProperty(IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
        {
            ConsumeEnumerator(source, cancellationToken).Forget();
        }

        /// <summary>
        /// Pumps the source: stores and publishes each item, then disposes the source
        /// enumerator and clears the field when the loop ends for any reason.
        /// </summary>
        async UniTaskVoid ConsumeEnumerator(IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
        {
            enumerator = source.GetAsyncEnumerator(cancellationToken);
            try
            {
                while (await enumerator.MoveNextAsync())
                {
                    var value = enumerator.Current;
                    this.latestValue = value;
                    triggerEvent.SetResult(value);
                }
            }
            finally
            {
                await enumerator.DisposeAsync();
                enumerator = null;
            }
        }

        /// <summary>
        /// Returns an enumerable whose enumerators wait for the next published value instead
        /// of yielding the current value on the first <c>MoveNextAsync</c>.
        /// </summary>
        public IUniTaskAsyncEnumerable<T> WithoutCurrent()
        {
            return new WithoutCurrentEnumerable(this);
        }

        /// <summary>
        /// Returns an enumerator whose first <c>MoveNextAsync</c> completes immediately with
        /// the current value; later calls wait for subsequent values.
        /// </summary>
        /// <param name="cancellationToken">Token that disposes the enumerator when canceled.</param>
        public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
        {
            return new Enumerator(this, cancellationToken, true);
        }

        /// <summary>
        /// Disposes the source enumerator if it is still active and signals completion to
        /// the trigger event.
        /// </summary>
        public void Dispose()
        {
            if (enumerator != null)
            {
                enumerator.DisposeAsync().Forget();
            }

            triggerEvent.SetCompleted();
        }

        /// <summary>Unwraps the property to its current value.</summary>
        public static implicit operator T(ReadOnlyAsyncReactiveProperty<T> value)
        {
            return value.Value;
        }

        /// <summary>
        /// Returns the string form of the current value; for reference types a null value
        /// yields null rather than throwing.
        /// </summary>
        public override string ToString()
        {
            // Value types can never be null, so the null-conditional call is skipped for them.
            if (isValueType) return latestValue.ToString();
            return latestValue?.ToString();
        }

        /// <summary>
        /// Returns a task that completes with the next value passed to the trigger event.
        /// The task is canceled if <paramref name="cancellationToken"/> is canceled or the
        /// property signals completion.
        /// </summary>
        /// <param name="cancellationToken">Token used to cancel the wait.</param>
        public UniTask<T> WaitAsync(CancellationToken cancellationToken = default)
        {
            return new UniTask<T>(WaitAsyncSource.Create(this, cancellationToken, out var token), token);
        }

        static bool isValueType;

        static ReadOnlyAsyncReactiveProperty()
        {
            isValueType = typeof(T).IsValueType;
        }

        /// <summary>
        /// Pooled task source backing <see cref="WaitAsync"/>; registers itself as a trigger
        /// handler and completes on the first notification it receives.
        /// </summary>
        sealed class WaitAsyncSource : IUniTaskSource<T>, ITriggerHandler<T>, ITaskPoolNode<WaitAsyncSource>
        {
            static Action<object> cancellationCallback = CancellationCallback;

            static TaskPool<WaitAsyncSource> pool;
            WaitAsyncSource nextNode;
            ref WaitAsyncSource ITaskPoolNode<WaitAsyncSource>.NextNode => ref nextNode;

            static WaitAsyncSource()
            {
                TaskPool.RegisterSizeGetter(typeof(WaitAsyncSource), () => pool.Size);
            }

            ReadOnlyAsyncReactiveProperty<T> parent;
            CancellationToken cancellationToken;
            CancellationTokenRegistration cancellationTokenRegistration;
            UniTaskCompletionSourceCore<T> core;

            WaitAsyncSource()
            {
            }

            /// <summary>
            /// Takes a source from the pool (or allocates one), hooks it to the token and to
            /// the parent's trigger event, and returns it with its version token.
            /// </summary>
            /// <param name="parent">Property whose next value is awaited.</param>
            /// <param name="cancellationToken">Token used to cancel the wait.</param>
            /// <param name="token">Version token to pair with the returned source.</param>
            public static IUniTaskSource<T> Create(ReadOnlyAsyncReactiveProperty<T> parent, CancellationToken cancellationToken, out short token)
            {
                // Already canceled: hand back a canceled source without touching the pool.
                if (cancellationToken.IsCancellationRequested)
                {
                    return AutoResetUniTaskCompletionSource<T>.CreateFromCanceled(cancellationToken, out token);
                }

                if (!pool.TryPop(out var result))
                {
                    result = new WaitAsyncSource();
                }

                result.parent = parent;
                result.cancellationToken = cancellationToken;

                if (cancellationToken.CanBeCanceled)
                {
                    result.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, result);
                }

                result.parent.triggerEvent.Add(result);

                TaskTracker.TrackActiveTask(result, 3);

                token = result.core.Version;
                return result;
            }

            /// <summary>
            /// Resets this source, detaches it from the token and the parent, and tries to
            /// push it back to the pool.
            /// </summary>
            /// <returns>The result of <c>pool.TryPush</c>.</returns>
            bool TryReturn()
            {
                TaskTracker.RemoveTracking(this);
                core.Reset();
                cancellationTokenRegistration.Dispose();
                cancellationTokenRegistration = default;
                parent.triggerEvent.Remove(this);
                parent = null;
                cancellationToken = default;
                return pool.TryPush(this);
            }

            static void CancellationCallback(object state)
            {
                var self = (WaitAsyncSource)state;
                self.OnCanceled(self.cancellationToken);
            }

            // IUniTaskSource

            /// <summary>Returns the result and always recycles this source afterwards.</summary>
            public T GetResult(short token)
            {
                try
                {
                    return core.GetResult(token);
                }
                finally
                {
                    TryReturn();
                }
            }

            void IUniTaskSource.GetResult(short token)
            {
                GetResult(token);
            }

            public void OnCompleted(Action<object> continuation, object state, short token)
            {
                core.OnCompleted(continuation, state, token);
            }

            public UniTaskStatus GetStatus(short token)
            {
                return core.GetStatus(token);
            }

            public UniTaskStatus UnsafeGetStatus()
            {
                return core.UnsafeGetStatus();
            }

            // ITriggerHandler

            ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
            ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }

            public void OnCanceled(CancellationToken cancellationToken)
            {
                core.TrySetCanceled(cancellationToken);
            }

            public void OnCompleted()
            {
                // Complete as Cancel.
                core.TrySetCanceled(CancellationToken.None);
            }

            public void OnError(Exception ex)
            {
                core.TrySetException(ex);
            }

            public void OnNext(T value)
            {
                core.TrySetResult(value);
            }
        }

        /// <summary>Enumerable wrapper that creates enumerators which skip the current value.</summary>
        sealed class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
        {
            readonly ReadOnlyAsyncReactiveProperty<T> parent;

            public WithoutCurrentEnumerable(ReadOnlyAsyncReactiveProperty<T> parent)
            {
                this.parent = parent;
            }

            public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
            {
                return new Enumerator(parent, cancellationToken, false);
            }
        }

        /// <summary>
        /// Async enumerator over the property's values; registered as a trigger handler on
        /// the parent for its whole lifetime.
        /// </summary>
        sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, ITriggerHandler<T>
        {
            static Action<object> cancellationCallback = CancellationCallback;

            readonly ReadOnlyAsyncReactiveProperty<T> parent;
            readonly CancellationToken cancellationToken;
            readonly CancellationTokenRegistration cancellationTokenRegistration;
            T value;
            bool isDisposed;
            bool firstCall;

            /// <summary>Registers the enumerator with the parent and with the token.</summary>
            /// <param name="parent">Property being enumerated.</param>
            /// <param name="cancellationToken">Token that disposes this enumerator when canceled.</param>
            /// <param name="publishCurrentValue">
            /// When true, the first <see cref="MoveNextAsync"/> yields the parent's current value.
            /// </param>
            public Enumerator(ReadOnlyAsyncReactiveProperty<T> parent, CancellationToken cancellationToken, bool publishCurrentValue)
            {
                this.parent = parent;
                this.cancellationToken = cancellationToken;
                this.firstCall = publishCurrentValue;

                parent.triggerEvent.Add(this);
                TaskTracker.TrackActiveTask(this, 3);

                if (cancellationToken.CanBeCanceled)
                {
                    cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, this);
                }
            }

            public T Current => value;

            ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
            ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }

            /// <summary>
            /// Completes immediately with the current value on the first call (when enabled);
            /// otherwise resets the completion source and waits for the next notification.
            /// </summary>
            public UniTask<bool> MoveNextAsync()
            {
                // raise latest value on first call.
                if (firstCall)
                {
                    firstCall = false;
                    value = parent.Value;
                    return CompletedTasks.True;
                }

                completionSource.Reset();
                return new UniTask<bool>(this, completionSource.Version);
            }

            /// <summary>
            /// Detaches the enumerator from the token and the parent and cancels any pending
            /// <see cref="MoveNextAsync"/>. Safe to call more than once.
            /// </summary>
            public UniTask DisposeAsync()
            {
                if (!isDisposed)
                {
                    isDisposed = true;
                    TaskTracker.RemoveTracking(this);
                    // Release the token registration; otherwise a long-lived token keeps
                    // referencing this enumerator after it has been disposed.
                    cancellationTokenRegistration.Dispose();
                    completionSource.TrySetCanceled(cancellationToken);
                    parent.triggerEvent.Remove(this);
                }
                return default;
            }

            public void OnNext(T value)
            {
                this.value = value;
                completionSource.TrySetResult(true);
            }

            public void OnCanceled(CancellationToken cancellationToken)
            {
                DisposeAsync().Forget();
            }

            public void OnCompleted()
            {
                // End of sequence: pending MoveNextAsync resolves to false.
                completionSource.TrySetResult(false);
            }

            public void OnError(Exception ex)
            {
                completionSource.TrySetException(ex);
            }

            static void CancellationCallback(object state)
            {
                var self = (Enumerator)state;
                self.DisposeAsync().Forget();
            }
        }
    }

    /// <summary>
    /// Extension methods that wrap an async enumerable in a
    /// <see cref="ReadOnlyAsyncReactiveProperty{T}"/>.
    /// </summary>
    public static class StateExtensions
    {
        /// <summary>Creates a read-only property fed by <paramref name="source"/> with no initial value.</summary>
        /// <param name="source">Sequence whose items become the property's value.</param>
        /// <param name="cancellationToken">Token passed to the source enumerator.</param>
        public static ReadOnlyAsyncReactiveProperty<T> ToReadOnlyAsyncReactiveProperty<T>(this IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
        {
            return new ReadOnlyAsyncReactiveProperty<T>(source, cancellationToken);
        }

        /// <summary>Creates a read-only property fed by <paramref name="source"/>, starting at <paramref name="initialValue"/>.</summary>
        /// <param name="source">Sequence whose items become the property's value.</param>
        /// <param name="initialValue">Value reported before the source produces an item.</param>
        /// <param name="cancellationToken">Token passed to the source enumerator.</param>
        public static ReadOnlyAsyncReactiveProperty<T> ToReadOnlyAsyncReactiveProperty<T>(this IUniTaskAsyncEnumerable<T> source, T initialValue, CancellationToken cancellationToken)
        {
            return new ReadOnlyAsyncReactiveProperty<T>(initialValue, source, cancellationToken);
        }
    }
}