Source code for deephaven_enterprise.edge_acl
import jpy
from deephaven import DHError
from deephaven.table import Table
from deephaven.jcompat import j_array_list
from deephaven._wrapper import unwrap
# Java class handles, resolved through jpy once at import time.
# _j_provider_type backs the EdgeAclProvider wrapper (builder creation, addAcl, and casting).
_j_provider_type = jpy.get_type("io.deephaven.enterprise.acl.EdgeAclProvider")
# _j_generator_type is used in this module as the annotation for ACL generator arguments.
_j_generator_type = jpy.get_type("io.deephaven.enterprise.acl.AclFilterGenerator")
class EdgeAclProvider:
    """
    This object provides ACLs for tables based on the requesting user at request time.
    """

    @staticmethod
    def builder() -> "EdgeAclProviderBuilder":
        """
        Create a builder to construct an EdgeAclProvider

        :return: a new builder
        """
        return EdgeAclProviderBuilder(_j_provider_type.builder())

    @staticmethod
    def add_object_acl(group: str, value) -> None:
        """
        Call the Java ``EdgeAclProvider.addAcl`` with the given group and value.

        :param group: the group handed to the Java ``addAcl`` call
        :param value: the object handed to the Java ``addAcl`` call; it is passed through
            ``unwrap`` first, so a Python wrapper object is replaced by the Java object it wraps
        """
        _j_provider_type.addAcl(group, unwrap(value))

    def __init__(self, j_provider: jpy.JType):
        """
        Wrap an existing Java EdgeAclProvider.

        :param j_provider: the Java object to wrap
        :raises DHError: if j_provider cannot be cast to io.deephaven.enterprise.acl.EdgeAclProvider
        """
        self.j_provider = jpy.cast(j_provider, _j_provider_type)
        if self.j_provider is None:
            # DHError's first positional parameter is the cause, so the text must be passed
            # by keyword to end up as the error message.
            raise DHError(message="j_provider type is not io.deephaven.enterprise.acl.EdgeAclProvider")

    def apply_to(self, table: Table) -> Table:
        """
        Attach this ACL Provider to a table

        :param table: the table to attach to
        :return: the table with the ACLs attached
        """
        return Table(self.j_provider.applyTo(table.j_table))
class EdgeAclProviderBuilder:
    """
    Python wrapper around a Java ``EdgeAclProvider$Builder``.

    The ACL methods return this builder so calls can be chained; ``build`` produces the
    finished EdgeAclProvider.
    """

    j_object_type = jpy.get_type("io.deephaven.enterprise.acl.EdgeAclProvider$Builder")

    def __init__(self, j_builder: j_object_type):
        """
        Wrap an existing Java builder.

        :param j_builder: the Java builder that receives every call made through this wrapper
        """
        self.j_builder = j_builder

    def row_acl(self, group: str, acl: _j_generator_type):
        """
        Add a Row ACL for the specified group to the table.

        :param group: the group the ACL applies to
        :param acl: the generator that will produce a filter for a given user
        :return: this builder
        """
        self.j_builder.rowAcl(group, acl)
        return self

    def column_acl(self, group: str, columns: list, acl: _j_generator_type):
        """
        Add a Column ACL for the specified group and columns to the table.

        :param group: the group the ACL applies to
        :param columns: the names of the columns the ACL applies to; a single column name
            given as a str is also accepted
        :param acl: the generator that will produce a filter for a given user
        :return: this builder
        """
        # A str is itself iterable, so converting it directly would produce one entry per
        # character instead of a single column name.
        if isinstance(columns, str):
            columns = [columns]
        self.j_builder.columnAcl(group, j_array_list(columns), acl)
        return self

    def build(self):
        """
        Construct the completed EdgeAclProvider object from this builder state

        :return: a new EdgeAclProvider
        """
        return EdgeAclProvider(self.j_builder.build())