ArcGIS Pro 3.4 API Reference Guide
ArcGIS.Core.Data.Realtime Namespace / RealtimeCursorBase Class / WaitForRowsAsync Method / WaitForRowsAsync(CancellationToken) Method

WaitForRowsAsync(CancellationToken) Method
Asynchronously waits for new rows to be available in the internal queue of this real-time cursor. The returned System.Threading.Tasks.Task will also complete if the state of this RealtimeCursor (see GetState) changes from RealtimeCursorState.Subscribed. This method can be called on any thread.
Syntax
public Task<bool> WaitForRowsAsync( 
   CancellationToken cancellationToken
)

Parameters

cancellationToken
A System.Threading.CancellationToken used to control cancellation behavior for the returned task.

Return Value

A System.Threading.Tasks.Task that will complete once this cursor becomes unsubscribed or new rows are available in the internal queue of this real-time cursor. The returned System.Boolean value is false if this cursor is unsubscribed and there are no more rows to be read. Otherwise it's true.
Exceptions
Exception | Description
ArcGIS.Core.Data.Exceptions.GeodatabaseException | A geodatabase-related exception has occurred.
Example
Submit a Graph Query
//On the QueuedTask...
//and assuming you have established a connection to a knowledge graph
//...
//Construct an openCypher query - return the first 10 entities (whatever
//they are...)
var query = "MATCH (n) RETURN n LIMIT 10";//default limit is 100 if not specified
//other examples...
//query = "MATCH (a:Person) RETURN [a.name, a.age] ORDER BY a.age DESC LIMIT 50";
//query = "MATCH (b:Person) RETURN { Xperson: { Xname: b.name, Xage: b.age } } ORDER BY b.name DESC";
//query = "MATCH p = (c:Person)-[:HasCar]-() RETURN p ORDER BY c.name DESC";

//Create a query filter
//Note: OutputSpatialReference is currently ignored
var kg_qf = new KnowledgeGraphQueryFilter()
{
  QueryText = query
};
//Optionally - you can choose to include provenance in the results
//(_if_ the KG has provenance - otherwise the query will fail)
if (includeProvenanceIfPresent)
{
  //see "Get Whether KG Supports Provenance" snippet
  if (KnowledgeGraphSupportsProvenance(kg))
  {
    //Only include if the KG has provenance
    kg_qf.ProvenanceBehavior =
      KnowledgeGraphProvenanceBehavior.Include;//default is exclude
  }
}
//submit the query - returns a KnowledgeGraphCursor
using (var kg_rc = kg.SubmitQuery(kg_qf))
{
  //wait for rows to be returned from the server
  //note the "await"...
  while (await kg_rc.WaitForRowsAsync())
  {
    //Rows have been retrieved - process this "batch"...
    while (kg_rc.MoveNext())
    {
      //Get the current KnowledgeGraphRow
      using (var graph_row = kg_rc.Current)
      {
        //Graph row is an array, process all returned values...
        var val_count = (int)graph_row.GetCount();
        for (int i = 0; i < val_count; i++)
        {
          var retval = graph_row[i];
          //Process row value (note: recursive)
          //See "Process a KnowledgeGraphRow Value" snippet
          ProcessKnowledgeGraphRowValue(retval);
        }
      }
    }
  }//WaitForRowsAsync
}//SubmitQuery
Call WaitForRowsAsync With Cancellation
//On the QueuedTask...
//and assuming you have established a connection to a knowledge graph
//...
//submit query or search to return a KnowledgeGraphCursor
//using (var kg_rc = kg.SubmitQuery(kg_qf)) {
//using (var kg_rc = kg.SubmitSearch(kg_sf)) {
//...
//wait for rows to be returned from the server
//"auto-cancel" after 20 seconds
var cancel = new CancellationTokenSource(new TimeSpan(0, 0, 20));
//catch TaskCanceledException
try
{
  while (await kg_rc.WaitForRowsAsync(cancel.Token))
  {
    //check for row events
    while (kg_rc.MoveNext())
    {
      using (var graph_row = kg_rc.Current)
      {
        //Graph row is an array, process all returned values...
        var val_count = (int)graph_row.GetCount();
        for (int i = 0; i < val_count; i++)
        {
          var retval = graph_row[i];
          //Process row value (note: recursive)
          //See "Process a KnowledgeGraphRow Value" snippet
          ProcessKnowledgeGraphRowValue(retval);
        }
      }
    }
  }
}
//Timeout expired
catch (TaskCanceledException)
{
  //Handle cancellation as needed
}
finally
{
  //Release the token source even if a different exception escapes
  cancel.Dispose();
}
Search And Subscribe for Streaming Data
await QueuedTask.Run(async () =>
{
  //query filter can be null to search and retrieve all rows
  //true means recycling cursor
  using (var rc = streamLayer.SearchAndSubscribe(qfilter, true))
  {
    //waiting for new features to be streamed
    //default is no cancellation
    while (await rc.WaitForRowsAsync())
    {
      while (rc.MoveNext())
      {
        using (var row = rc.Current)
        {
          //determine the origin of the row event
          switch (row.GetRowSource())
          {
            case RealtimeRowSource.PreExisting:
              //pre-existing row at the time of subscribe
              continue;
            case RealtimeRowSource.EventInsert:
              //row was inserted after subscribe
              continue;
            case RealtimeRowSource.EventDelete:
              //row was deleted after subscribe
              continue;
          }
        }
      }
    }
  }//row cursor is disposed. row cursor is unsubscribed

  //....or....
  //Use the feature class instead of the layer
  using (var rfc = streamLayer.GetFeatureClass())
  {
    //non-recycling cursor - 2nd param "false"
    using (var rc = rfc.SearchAndSubscribe(qfilter, false))
    {
      //waiting for new features to be streamed
      //default is no cancellation
      while (await rc.WaitForRowsAsync())
      {
        //etc
      }
    }
  }
});
Search And Subscribe With Cancellation
await QueuedTask.Run(async () =>
{
  //Recycling cursor - 2nd param "true"
  //or streamLayer.Subscribe(qfilter, true) to just subscribe
  using (var rc = streamLayer.SearchAndSubscribe(qfilter, true))
  {
    //auto-cancel after 20 seconds
    var cancel = new CancellationTokenSource(new TimeSpan(0, 0, 20));
    //catch TaskCanceledException
    try
    {
      while (await rc.WaitForRowsAsync(cancel.Token))
      {
        //check for row events
        while (rc.MoveNext())
        {
          using (var row = rc.Current)
          {
            //etc
          }
        }
      }
    }
    catch (TaskCanceledException)
    {
      //Handle cancellation as needed
    }
    finally
    {
      //Release the token source even if a different exception escapes
      cancel.Dispose();
    }
  }
});
Explicitly Cancel WaitForRowsAsync
//somewhere in our code we create a CancellationTokenSource
var cancel = new CancellationTokenSource();
//...

//call cancel on the CancellationTokenSource anywhere in
//the add-in, assuming the CancellationTokenSource is in scope
if (SomeConditionForCancel)
  cancel.Cancel();//<-- will cancel the token

//Within QueuedTask we are subscribed! streamLayer.Subscribe() or SearchAndSubscribe()
try
{
  //TaskCanceledException will be thrown when the token is cancelled
  while (await rc.WaitForRowsAsync(cancel.Token))
  {
    //check for row events
    while (rc.MoveNext())
    {
      using (var row = rc.Current)
      {
        //etc
      }
    }
  }
}
catch (TaskCanceledException)
{
  //Handle cancellation as needed
}
finally
{
  //Release the token source even if a different exception escapes
  cancel.Dispose();
}
Subscribe to Streaming Data
//Note: with feature class we can also use a System Task to subscribe and
//process rows
await QueuedTask.Run(async () =>
{
  // or var rfc = realtimeDatastore.OpenTable(name) as RealtimeFeatureClass
  using (var rfc = streamLayer.GetFeatureClass())
  {
    //non-recycling cursor - 2nd param "false"
    //subscribe, pre-existing rows are not searched
    using (var rc = rfc.Subscribe(qfilter, false))
    {
      SpatialQueryFilter spatialFilter = new SpatialQueryFilter();
      //waiting for new features to be streamed
      //default is no cancellation
      while (await rc.WaitForRowsAsync())
      {
        while (rc.MoveNext())
        {
          using (var row = rc.Current)
          {
            switch (row.GetRowSource())
            {
              case RealtimeRowSource.EventInsert:
                //getting geometry from new events as they arrive
                Polygon poly = ((RealtimeFeature)row).GetShape() as Polygon;

                //using the geometry to select features from another feature layer
                spatialFilter.FilterGeometry = poly;//project poly if needed...
                countyFeatureLayer.Select(spatialFilter);
                continue;
              default:
                continue;
            }
          }
        }
      }
    }//row cursor is disposed. row cursor is unsubscribed
  }
});
Search Existing Data and Subscribe for Streaming Data
//Note we can use System Task with the Realtime feature class
//for subscribe
await System.Threading.Tasks.Task.Run(async () =>
// or use ... QueuedTask.Run()
{
  using (var rfc = streamLayer.GetFeatureClass())
  {
    //non-recycling cursor - 2nd param "false"
    using (var rc = rfc.SearchAndSubscribe(qfilter, false))
    {
      //waiting for new features to be streamed
      //default is no cancellation
      while (await rc.WaitForRowsAsync())
      {
        //pre-existing rows will be retrieved that were searched
        while (rc.MoveNext())
        {
          using (var row = rc.Current)
          {
            var row_source = row.GetRowSource();
            switch (row_source)
            {
              case RealtimeRowSource.EventDelete:
                //TODO - handle deletes
                break;
              case RealtimeRowSource.EventInsert:
                //TODO handle inserts
                break;
              case RealtimeRowSource.PreExisting:
                //TODO handle pre-existing rows
                break;
            }
          }
        }
      }
    }//row cursor is disposed. row cursor is unsubscribed
  }
});
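Trying the wait loop without ArcGIS Pro
The return-value contract described under Return Value (true while rows are queued, false once the cursor is unsubscribed and drained, an exception when the token is cancelled) can be tried outside ArcGIS Pro with a stand-in. The following is only a sketch under stated assumptions: FakeCursor, Push, Unsubscribe, Demo and DrainAsync are hypothetical names that are not part of the ArcGIS Pro SDK, and the queue is simulated with System.Threading.Channels, which may differ from how the real cursor is implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for a real-time cursor; NOT an ArcGIS Pro SDK type.
public sealed class FakeCursor
{
  private readonly Channel<string> _queue = Channel.CreateUnbounded<string>();

  public string Current { get; private set; } = "";

  // Producer side: enqueue a row, or mark the cursor as "unsubscribed".
  public void Push(string row) => _queue.Writer.TryWrite(row);
  public void Unsubscribe() => _queue.Writer.TryComplete();

  // true: rows are queued; false: unsubscribed and fully drained.
  // Awaiting the task throws if the token is cancelled first.
  public Task<bool> WaitForRowsAsync(CancellationToken token) =>
    _queue.Reader.WaitToReadAsync(token).AsTask();

  // Advances to the next queued row, if there is one.
  public bool MoveNext()
  {
    if (!_queue.Reader.TryRead(out var row)) return false;
    Current = row;
    return true;
  }
}

public static class Demo
{
  // Reads rows until the cursor is unsubscribed or the token is cancelled.
  public static async Task<List<string>> DrainAsync(
    FakeCursor cursor, CancellationToken token)
  {
    var rows = new List<string>();
    try
    {
      while (await cursor.WaitForRowsAsync(token))
      {
        while (cursor.MoveNext())
          rows.Add(cursor.Current);
      }
    }
    catch (OperationCanceledException)
    {
      // Also catches TaskCanceledException, which derives from it.
    }
    return rows;
  }

  public static async Task Main()
  {
    var cursor = new FakeCursor();
    cursor.Push("row 1");
    cursor.Push("row 2");
    cursor.Unsubscribe();

    // "auto-cancel" after 20 seconds, as in the snippets above
    using var cancel = new CancellationTokenSource(TimeSpan.FromSeconds(20));
    var rows = await DrainAsync(cursor, cancel.Token);
    Console.WriteLine(rows.Count); // prints 2
  }
}
```

Catching OperationCanceledException rather than TaskCanceledException is a design choice in this sketch: it is the base type, so the handler still runs whichever of the two is thrown.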
Requirements

Target Platforms: Windows 11, Windows 10

ArcGIS Pro version: 3.2 or higher.