Source code for aiogremlin.remote.driver_remote_side_effects
from gremlin_python.driver import request
from gremlin_python.process import traversal
class AsyncRemoteTraversalSideEffects(traversal.TraversalSideEffects):
    """Asynchronous accessor for the side effects of a remote traversal.

    Side effect data is requested from the server through ``client`` on
    demand and cached on the instance. After :meth:`close` has been called
    no further requests are sent; only data already cached is returned.
    """

    def __init__(self, side_effect, client):
        """Store the side effect identifier and the client used to query it.

        :param side_effect: value sent as the ``sideEffect`` argument of
            every request issued by this object
        :param client: object providing an awaitable ``submit(message)``
            and an ``aliases`` attribute
        """
        self._side_effect = side_effect
        self._client = client
        # Keys known so far (refreshed by keys(), extended by get()).
        self._keys = set()
        # Cache of gathered side effect values, indexed by key.
        self._side_effects = {}
        self._closed = False

    async def __getitem__(self, key):
        """Awaitable item access; equivalent to ``await self.get(key)``.

        :raises TypeError: if ``key`` is a slice
        """
        if isinstance(key, slice):
            raise TypeError(
                'AsyncRemoteTraversalSideEffects does not support slicing')
        return await self.get(key)

    async def keys(self):
        """Get side effect keys associated with Traversal

        While the side effects are open the key set is refreshed from the
        server on every call; once closed, the last known set is returned.
        """
        if not self._closed:
            message = request.RequestMessage(
                'traversal', 'keys',
                {'sideEffect': self._side_effect,
                 'aliases': self._client.aliases})
            result_set = await self._client.submit(message)
            results = await result_set.all()
            self._keys = set(results)
        return self._keys

    async def get(self, key):
        """Get side effects associated with a specific key

        The value is fetched from the server the first time a key is
        requested and served from the local cache afterwards.

        :returns: the aggregated side effect value, or ``None`` when the
            key is not cached and the side effects have been closed
        """
        # Membership test rather than truthiness: an empty list/map/set or
        # a zero is a legitimate cached value and must not be treated as
        # "missing" (which would re-fetch it, or hide it after close()).
        if key not in self._side_effects:
            if self._closed:
                return None
            self._side_effects[key] = await self._get(key)
            self._keys.add(key)
        return self._side_effects[key]

    async def _get(self, key):
        """Request the value for ``key`` from the server and aggregate it."""
        message = request.RequestMessage(
            'traversal', 'gather',
            {'sideEffect': self._side_effect, 'sideEffectKey': key,
             'aliases': self._client.aliases})
        result_set = await self._client.submit(message)
        return await self._aggregate_results(result_set)

    async def close(self):
        """Release side effects

        :returns: the first item of the server's response to the close
            request, or ``None`` if the side effects were already closed
        """
        # Closing twice is a no-op; there is no response to read.
        if self._closed:
            return None
        message = request.RequestMessage(
            'traversal', 'close',
            {'sideEffect': self._side_effect,
             # Same alias argument as the 'keys' and 'gather' requests.
             'aliases': self._client.aliases})
        result_set = await self._client.submit(message)
        self._closed = True
        return await result_set.one()

    async def _aggregate_results(self, result_set):
        """Fold the messages of ``result_set`` into a single value.

        The container is selected from ``result_set.aggregate_to`` when the
        first message arrives: ``'list'`` -> list, ``'set'`` -> set,
        ``'map'``/``'bulkset'`` -> dict, ``'none'`` -> the message itself.
        Unrecognised values fall back to a list. An empty result set
        yields ``[]``.
        """
        aggregates = {'list': [], 'set': set(), 'map': {}, 'bulkset': {},
                      'none': None}
        results = None
        async for msg in result_set:
            # on first message, get the right result data structure
            if results is None:
                aggregate_to = result_set.aggregate_to
                results = aggregates.get(aggregate_to, [])
            # if there is no structure to update, the item is the result
            if results is None:
                results = msg
            # updating a map is different than a list or a set
            elif isinstance(results, dict):
                if aggregate_to == "map":
                    results.update(msg)
                else:
                    # bulkset: each message carries an object and its count
                    results[msg.object] = msg.bulk
            elif isinstance(results, set):
                results.update(msg)
            # each message becomes one element of the result list
            else:
                results.append(msg)
        if results is None:
            results = []
        return results