71 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			C#
		
	
	
	
			
		
		
	
	
			71 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			C#
		
	
	
	
| using System;
 | |
| 
 | |
| namespace UniRx
 | |
| {
 | |
|     public interface IConnectableObservable<T> : IObservable<T>
 | |
|     {
 | |
|         IDisposable Connect();
 | |
|     }
 | |
| 
 | |
|     public static partial class Observable
 | |
|     {
 | |
|         class ConnectableObservable<T> : IConnectableObservable<T>
 | |
|         {
 | |
|             readonly IObservable<T> source;
 | |
|             readonly ISubject<T> subject;
 | |
|             readonly object gate = new object();
 | |
|             Connection connection;
 | |
| 
 | |
|             public ConnectableObservable(IObservable<T> source, ISubject<T> subject)
 | |
|             {
 | |
|                 this.source = source.AsObservable();
 | |
|                 this.subject = subject;
 | |
|             }
 | |
| 
 | |
|             public IDisposable Connect()
 | |
|             {
 | |
|                 lock (gate)
 | |
|                 {
 | |
|                     // don't subscribe twice
 | |
|                     if (connection == null)
 | |
|                     {
 | |
|                         var subscription = source.Subscribe(subject);
 | |
|                         connection = new Connection(this, subscription);
 | |
|                     }
 | |
| 
 | |
|                     return connection;
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             public IDisposable Subscribe(IObserver<T> observer)
 | |
|             {
 | |
|                 return subject.Subscribe(observer);
 | |
|             }
 | |
| 
 | |
|             class Connection : IDisposable
 | |
|             {
 | |
|                 readonly ConnectableObservable<T> parent;
 | |
|                 IDisposable subscription;
 | |
| 
 | |
|                 public Connection(ConnectableObservable<T> parent, IDisposable subscription)
 | |
|                 {
 | |
|                     this.parent = parent;
 | |
|                     this.subscription = subscription;
 | |
|                 }
 | |
| 
 | |
|                 public void Dispose()
 | |
|                 {
 | |
|                     lock (parent.gate)
 | |
|                     {
 | |
|                         if (subscription != null)
 | |
|                         {
 | |
|                             subscription.Dispose();
 | |
|                             subscription = null;
 | |
|                             parent.connection = null;
 | |
|                         }
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| } |