ArcGIS Pro 3.2 API Reference Guide
ArcGIS.Core.Data.Realtime Namespace / RealtimeCursorBase Class / WaitForRowsAsync Method / WaitForRowsAsync(CancellationToken) Method
A System.Threading.CancellationToken used to control cancellation behavior for the returned task.
Example

In This Topic
    WaitForRowsAsync(CancellationToken) Method
    In This Topic
    Asynchronously waits for new rows to be available in the internal queue of this real-time cursor. The returned System.Threading.Tasks.Task will also complete if the state of this RealtimeCursor (see GetState) changes from RealtimeCursorState.Subscribed. This method can be called on any thread.
    Syntax
    public Task<bool> WaitForRowsAsync( 
       CancellationToken cancellationToken
    )
    Public Overloads Function WaitForRowsAsync( _
       ByVal cancellationToken As CancellationToken _
    ) As Task(Of Boolean)

    Parameters

    cancellationToken
    A System.Threading.CancellationToken used to control cancellation behavior for the returned task.

    Return Value

    A System.Threading.Tasks.Task that will complete once this cursor becomes unsubscribed or new rows are available in the internal queue of this real-time cursor. The returned System.Boolean value is false if this cursor is unsubscribed and there are no more rows to be read. Otherwise it's true.
    Exceptions
    ExceptionDescription
    A geodatabase-related exception has occurred.
    Example
    Submit a Graph Query
    //On the QueuedTask...
    //and assuming you have established a connection to a knowledge graph
    //...
    //Construct an openCypher query - return the first 10 entities (whatever
    //they are...)
    var query = "MATCH (n) RETURN n LIMIT 10";//default limit is 100 if not specified
    //other examples...
    //query = "MATCH (a:Person) RETURN [a.name, a.age] ORDER BY a.age DESC LIMIT 50";
    //query = "MATCH (b:Person) RETURN { Xperson: { Xname: b.name, Xage: b.age } } ORDER BY b.name DESC";
    //query = "MATCH p = (c:Person)-[:HasCar]-() RETURN p ORDER BY c.name DESC";
    
    //Create a query filter
    //Note: OutputSpatialReference is currently ignored
    var kg_qf = new KnowledgeGraphQueryFilter()
    {
      QueryText = query
    };
    //Optionally - u can choose to include provenance in the results
    //(_if_ the KG has provenance - otherwise the query will fail)
    if (includeProvenanceIfPresent)
    {
      //see "Get Whether KG Supports Provenance" snippet
      if (KnowledgeGraphSupportsProvenance(kg))
      {
        //Only include if the KG has provenance
        kg_qf.ProvenanceBehavior = 
          KnowledgeGraphProvenanceBehavior.Include;//default is exclude
      }
    }
    //submit the query - returns a KnowledgeGraphCursor
    using (var kg_rc = kg.SubmitQuery(kg_qf))
    {
      //wait for rows to be returned from the server
      //note the "await"...
      while (await kg_rc.WaitForRowsAsync())
      {
        //Rows have been retrieved - process this "batch"...
        while (kg_rc.MoveNext())
        {
          //Get the current KnowledgeGraphRow
          using (var graph_row = kg_rc.Current)
          {
            //Graph row is an array, process all returned values...
            var val_count = (int)graph_row.GetCount();
            for (int i = 0; i < val_count; i++)
            {
              var retval = graph_row[i];
              //Process row value (note: recursive)
              //See "Process a KnowledgeGraphRow Value" snippet
              ProcessKnowledgeGraphRowValue(retval);
            }
          }
        }
      }//WaitForRowsAsync
    }//SubmitQuery
    Submit a Text Search
    //On the QueuedTask...
    //and assuming you have established a connection to a knowledge graph
    //...
    //Construct a KG search filter. Search text uses Apache Lucene Query Parser
    //syntax - https://lucene.apache.org/core/2_9_4/queryparsersyntax.html
    var kg_sf = new KnowledgeGraphSearchFilter()
    {
      SearchTarget = KnowledgeGraphNamedTypeCategory.Entity,
      SearchText = "Acme Electric Co.",
      ReturnSearchContext = true,
      MaxRowCount = 10 //Default is 100 if not specified
    };
          
    //submit the search - returns a KnowledgeGraphCursor
    using (var kg_rc = kg.SubmitSearch(kg_sf))
    {
      //wait for rows to be returned from the server
      //note the "await"...
      while (await kg_rc.WaitForRowsAsync())
      {
        //Rows have been retrieved - process this "batch"...
        while (kg_rc.MoveNext())
        {
          //Get the current KnowledgeGraphRow
          using (var graph_row = kg_rc.Current)
          {
            //Graph row is an array, process all returned values...
            var val_count = (int)graph_row.GetCount();
            for (int i = 0; i < val_count; i++)
            {
              var retval = graph_row[i];
              //Process row value (note: recursive)
              //See "Process a KnowledgeGraphRow Value" snippet
              ProcessKnowledgeGraphRowValue(retval);
            }
          }
        }
      }//WaitForRowsAsync
    }//SubmitSearch
    Call WaitForRowsAsync With Cancellation
    //On the QueuedTask...
    //and assuming you have established a connection to a knowledge graph
    //...
    //submit query or search to return a KnowledgeGraphCursor
    //using (var kg_rc = kg.SubmitQuery(kg_qf)) {
    //using (var kg_rc = kg.SubmitSearch(kg_sf)) {
    //...
    //wait for rows to be returned from the server
    //"auto-cancel" after 20 seconds
    var cancel = new CancellationTokenSource(new TimeSpan(0, 0, 20));
    //catch TaskCanceledException
    try
    {
      while (await kg_rc.WaitForRowsAsync(cancel.Token))
      {
        //check for row events
        while (kg_rc.MoveNext())
        {
          using (var graph_row = kg_rc.Current)
          {
            //Graph row is an array, process all returned values...
            var val_count = (int)graph_row.GetCount();
            for (int i = 0; i < val_count; i++)
            {
              var retval = graph_row[i];
              //Process row value (note: recursive)
              //See "Process a KnowledgeGraphRow Value" snippet
              ProcessKnowledgeGraphRowValue(retval);
            }
          }
        }
      }
    }
    //Timeout expired
    catch (TaskCanceledException tce)
    {
      //Handle cancellation as needed
    }
    cancel.Dispose();
    Search And Subscribe for Streaming Data
    await QueuedTask.Run(async () =>
    {
      //query filter can be null to search and retrieve all rows
      //true means recycling cursor
      using (var rc = streamLayer.SearchAndSubscribe(qfilter, true))
      {
        //waiting for new features to be streamed
        //default is no cancellation
        while (await rc.WaitForRowsAsync())
        {
          while (rc.MoveNext())
          {
            using (var row = rc.Current)
            {
              //determine the origin of the row event
              switch (row.GetRowSource())
              {
                case RealtimeRowSource.PreExisting:
                  //pre-existing row at the time of subscribe
                  continue;
                case RealtimeRowSource.EventInsert:
                  //row was inserted after subscribe
                  continue;
                case RealtimeRowSource.EventDelete:
                  //row was deleted after subscribe
                  continue;
              }
            }
          }
        }
      }//row cursor is disposed. row cursor is unsubscribed
    
      //....or....
      //Use the feature class instead of the layer
      using (var rfc = streamLayer.GetFeatureClass())
      {
        //non-recycling cursor - 2nd param "false"
        using (var rc = rfc.SearchAndSubscribe(qfilter, false))
        {
          //waiting for new features to be streamed
          //default is no cancellation
          while (await rc.WaitForRowsAsync())
          {
            //etc
          }
        }
      }
    });
    Search And Subscribe With Cancellation
    await QueuedTask.Run(async () =>
    {
      //Recycling cursor - 2nd param "true"
      //or streamLayer.Subscribe(qfilter, true) to just subscribe
      using (var rc = streamLayer.SearchAndSubscribe(qfilter, true))
      {
        //auto-cancel after 20 seconds
        var cancel = new CancellationTokenSource(new TimeSpan(0, 0, 20));
        //catch TaskCanceledException
        try
        {
          while (await rc.WaitForRowsAsync(cancel.Token))
          {
            //check for row events
            while (rc.MoveNext())
            {
              using (var row = rc.Current)
              {
                //etc
              }
            }
          }
        }
        catch (TaskCanceledException )
        {
          //Handle cancellation as needed
        }
        cancel.Dispose();
      }
    });
    Explicitly Cancel WaitForRowsAsync
    //somewhere in our code we create a CancellationTokenSource
    var cancel = new CancellationTokenSource();
    //...
    
    //call cancel on the CancellationTokenSource anywhere in
    //the add-in, assuming the CancellationTokenSource is in scope
    if (SomeConditionForCancel)
      cancel.Cancel();//<-- will cancel the token
    
    //Within QueuedTask we are subscribed! streamLayer.Subscribe() or SearchAndSubscribe()
    try
    {
      //TaskCanceledException will be thrown when the token is cancelled
      while (await rc.WaitForRowsAsync(cancel.Token))
      {
        //check for row events
        while (rc.MoveNext())
        {
          using (var row = rc.Current)
          {
            //etc
          }
        }
      }
    }
    catch (TaskCanceledException )
    {
      //Handle cancellation as needed
    }
    cancel.Dispose();
    
    Subscribe to Streaming Data
    //Note: with feature class we can also use a System Task to subscribe and
    //process rows
    await QueuedTask.Run(async () =>
    {
      // or var rfc = realtimeDatastore.OpenTable(name) as RealtimeFeatureClass
      using (var rfc = streamLayer.GetFeatureClass())
      {
        //non-recycling cursor - 2nd param "false"
        //subscribe, pre-existing rows are not searched
        using (var rc = rfc.Subscribe(qfilter, false))
        {
          SpatialQueryFilter spatialFilter = new SpatialQueryFilter();
          //waiting for new features to be streamed
          //default is no cancellation
          while (await rc.WaitForRowsAsync())
          {
            while (rc.MoveNext())
            {
              using (var row = rc.Current)
              {
                switch (row.GetRowSource())
                {
                  case RealtimeRowSource.EventInsert:
                    //getting geometry from new events as they arrive
                    Polygon poly = ((RealtimeFeature)row).GetShape() as Polygon;
    
                    //using the geometry to select features from another feature layer
                    spatialFilter.FilterGeometry = poly;//project poly if needed...
                    countyFeatureLayer.Select(spatialFilter);
                    continue;
                  default:
                    continue;
                }
              }                  
            }
          }
        }//row cursor is disposed. row cursor is unsubscribed
      }
    });
    Search Existing Data and Subscribe for Streaming Data
    //Note we can use System Task with the Realtime feature class
    //for subscribe
    await System.Threading.Tasks.Task.Run(async () =>
    // or use ... QueuedTask.Run()
    {
      using (var rfc = streamLayer.GetFeatureClass())
      {
        //non-recycling cursor - 2nd param "false"
        using (var rc = rfc.SearchAndSubscribe(qfilter, false))
        {
          //waiting for new features to be streamed
          //default is no cancellation
          while (await rc.WaitForRowsAsync())
          {
            //pre-existing rows will be retrieved that were searched
            while (rc.MoveNext())
            {
              using (var row = rc.Current)
              {
                var row_source = row.GetRowSource();
                switch (row_source)
                {
                  case RealtimeRowSource.EventDelete:
                    //TODO - handle deletes
                    break;
                  case RealtimeRowSource.EventInsert:
                    //TODO handle inserts
                    break;
                  case RealtimeRowSource.PreExisting:
                    //TODO handle pre-existing rows
                    break;
                }
              }
            }
          }
        }//row cursor is disposed. row cursor is unsubscribed
      }
    });
    
    Requirements

    Target Platforms: Windows 11, Windows 10

    ArcGIS Pro version: 3.2 or higher.
    See Also