from __future__ import annotations
import base64
import Cryptodome.PublicKey.DSA
import Cryptodome.Hash.SHA256
import Cryptodome.Signature.DSS
import re
import time
import threading
import deephaven_enterprise.proto.auth_service_pb2_grpc
import deephaven_enterprise.proto.auth_pb2
import grpc
import deephaven_enterprise.client.util
class RefreshThread(threading.Thread):
    """
    Background daemon thread that keeps an auth client's cookie alive by calling the client's ``ping()`` shortly
    before the cookie's deadline.
    """

    def __init__(self, auth_client: "AuthClient", slack_seconds: int = 60 * 2):
        """
        Create a thread to refresh the authentication cookie. If the cookie is not refreshed within the deadline,
        then we will become unauthenticated and not be able to generate any more tokens.

        :param auth_client: auth client to refresh
        :param slack_seconds: how long before the cookie's expiration, in seconds, should we begin the refresh.
          Defaults to two minutes.
        """
        super().__init__()
        self.daemon = True
        self.auth_client = auth_client
        self.slack = slack_seconds

    def run(self):
        """
        Refresh the cookie until the client has no cookie any more or the cookie's deadline has passed.

        Each pass sleeps until ``slack`` seconds before the deadline, pings, and then enforces a minimum pass
        duration so that a short (or unchanged) deadline cannot turn this into a tight loop against the server.
        """
        # We don't want to hit the server too hard
        minimum_sleeps = [1, 1, 2, 3, 5, 8, 13]
        sleep_index = 0

        while self.auth_client.cookie is not None:
            start_time = time.time()
            next_deadline_millis = self.auth_client.cookie_deadline_time_millis
            if next_deadline_millis is None:
                break

            # The deadline is in milliseconds while time.time() is in seconds; convert before comparing.
            next_deadline_seconds = next_deadline_millis / 1000
            if next_deadline_seconds < start_time:
                # we've failed to refresh, we could try re-authentication with a private key if that was our method
                break

            # When the deadline is closer than the slack the difference is negative; time.sleep() rejects
            # negative values, so refresh immediately in that case.
            to_sleep = next_deadline_seconds - start_time - self.slack
            time.sleep(max(0.0, to_sleep))

            self.auth_client.ping()
            if self.auth_client.cookie is None:
                # The client dropped its cookie while we were sleeping or pinging; nothing left to refresh.
                break

            # We expect the deadline to handle most sleeps, this logic prevents tight loops.
            duration_seconds = time.time() - start_time
            minimum_seconds = minimum_sleeps[sleep_index]
            if duration_seconds < minimum_seconds:
                time.sleep(minimum_seconds - duration_seconds)
                # Back off further on the next short pass, saturating at the last entry.
                sleep_index = min(len(minimum_sleeps) - 1, sleep_index + 1)
            else:
                sleep_index = 0
class AuthClient:
    """
    AuthClient authenticates to a Deephaven authentication server and produces tokens for use with other Deephaven
    services.

    Presently, password and private key authentication are provided.
    """
    # Cookie returned by the last successful authentication; None while logged out.
    cookie = None
    # Cookie deadline as reported by the server, in milliseconds.
    cookie_deadline_time_millis = None
    # Background thread that refreshes the cookie once authenticated.
    refresh_thread = None
    channel: grpc.Channel = None
    # True when this client created the channel itself and therefore closes it in close().
    opened_channel: bool = None
    rpc_timeout_secs: int = None

    def __init__(self, host: str = None, port: int = None, rpc_timeout_seconds: int = 120, channel: grpc.Channel = None):
        """
        Create an AuthClient and connect to the server.

        You may either specify the host and port to connect to, or provide your own grpc channel. The simplest case
        is to simply provide the host and port, but if you need advanced channel configuration or want to share the
        channel for several clients, then you can create it and pass it in.

        A ping is sent to the server before the constructor returns.

        :param host: the host to connect to, requires port
        :param port: the port to connect to, requires host
        :param rpc_timeout_seconds: the rpc timeout period to use, defaults to 120 seconds if not provided
        :param channel: a pre-created channel to use for the gRPC messages
        """
        self.rpc_timeout_secs = rpc_timeout_seconds
        (self.channel, self.me) = deephaven_enterprise.client.util.get_grpc_channel(
            channel=channel, host=host, port=port)
        # Only a channel created here is ours to close.
        self.opened_channel = channel is None
        self.clientID = deephaven_enterprise.client.util.make_client_id(
            self.me)
        self.stub = deephaven_enterprise.proto.auth_service_pb2_grpc.AuthApiStub(
            self.channel)
        self._send_ping()

    def _send_ping(self) -> float:
        """
        Sends a common ping request over this client's stub, identifying the sender as ``self.me``.

        :return: the RTT in milliseconds of the ping
        """
        # NOTE(review): common_pb2 is not imported explicitly by this file; presumably it is loaded as a
        # dependency of auth_pb2 -- confirm.
        pingreq = deephaven_enterprise.proto.common_pb2.PingRequest()
        send_time_ns = time.time_ns()
        pingreq.sender_send_time_millis = int(send_time_ns / 1_000_000)
        # "from" is a Python keyword, so the field cannot be assigned with attribute syntax.
        setattr(pingreq, "from", self.me)
        self.stub.ping(pingreq, timeout=self.rpc_timeout_secs,
                       wait_for_ready=True)
        return (time.time_ns() - send_time_ns) / 1_000_000

    def password(self, user: str, password: str, effective_user: str = None) -> None:
        """
        Authenticates to the server using a username and password.

        :param user: the user to authenticate
        :param password: the user's password
        :param effective_user: the user to operate as, defaults to the user to authenticate
        :raises AuthenticationFailedException: if the server rejects the credentials
        """
        uc = deephaven_enterprise.proto.auth_pb2.UserContext()
        uc.authenticatedUser = user
        uc.effectiveUser = user if effective_user is None else effective_user

        abp = deephaven_enterprise.proto.auth_pb2.AuthenticateByPasswordRequest()
        abp.client_id.CopyFrom(self.clientID)
        abp.user_context.CopyFrom(uc)
        abp.password = password

        ar = self.stub.authenticateByPassword(abp,
                                              timeout=self.rpc_timeout_secs, wait_for_ready=True)
        if not ar.result.authenticated:
            raise AuthenticationFailedException(
                "Failed to authenticate with password.")
        self.__set_cookie(ar)

    def private_key(self, file: str) -> None:
        """
        Authenticate to the server using a Deephaven format private key file.

        https://deephaven.io/enterprise/docs/resources/how-to/connect-from-java/#instructions-for-setting-up-private-keys

        :param file: the file containing the private key produced by generate-iris-keys
        :raises Exception: if the file is not a usable key file, or the nonce cannot be signed or verified
        :raises AuthenticationFailedException: if the server rejects the signed challenge
        """
        # Read the key information from the file: "<name> <value>" per line, '#' starts a comment.
        keydict = {}
        with open(file, 'r') as key_file:
            for line_number, line in enumerate(key_file, start=1):
                stripped = line.split("#", 1)[0].strip()
                if not stripped:
                    continue
                kv = re.split(r"\s+", stripped, maxsplit=2)
                if len(kv) < 2:
                    # Report by line number only; the line itself may contain key material.
                    raise Exception(
                        "'%s' is not a Deephaven private key file, line %d has no value" % (file, line_number))
                keydict[kv[0]] = kv[1]

        for field in ["public", "private", "user", "operateas"]:
            if field not in keydict:
                raise Exception(
                    "'%s' is not a Deephaven private key file, %s is not present" % (file, field))

        try:
            # pkcs8 encoded keyspec
            public_pkcs8_bytes = base64.b64decode(keydict["public"])
            private_pkcs8_bytes = base64.b64decode(keydict["private"])

            # Turn them into usable keys
            private_key = Cryptodome.PublicKey.DSA.importKey(
                private_pkcs8_bytes)
            public_key = Cryptodome.PublicKey.DSA.importKey(public_pkcs8_bytes)
        except Exception as e:
            raise Exception("Invalid private key file '%s'" % file) from e

        # Get the nonce, using the same timeout handling as every other RPC of this client
        nr = deephaven_enterprise.proto.auth_pb2.GetNonceRequest()
        nr.client_id.CopyFrom(self.clientID)
        gnr = self.stub.getNonce(nr,
                                 timeout=self.rpc_timeout_secs, wait_for_ready=True)

        # Sign the nonce
        try:
            hash_of_nonce = Cryptodome.Hash.SHA256.new(gnr.nonce)
            signer = Cryptodome.Signature.DSS.new(
                private_key, 'fips-186-3', encoding="der")
            signature = signer.sign(hash_of_nonce)
        except Exception as e:
            raise Exception(
                "Could not sign nonce with private key '%s'" % file) from e

        # Now verify our signature locally, just in case
        try:
            verifier = Cryptodome.Signature.DSS.new(
                public_key, 'fips-186-3', encoding="der")
            verifier.verify(hash_of_nonce, signature)
        except Exception as e:
            raise Exception(
                "Could not verify our own signature with private key '%s'" % file) from e

        # Produce the authentication message
        abpk = deephaven_enterprise.proto.auth_pb2.AuthenticateByPublicKeyRequest()
        abpk.user_context.authenticatedUser = keydict["user"]
        abpk.user_context.effectiveUser = keydict["operateas"]
        abpk.client_id.CopyFrom(self.clientID)
        abpk.public_key = public_pkcs8_bytes
        abpk.challenge_response = signature
        abpk.ip_address = gnr.ip_address

        # Do the authentication
        response = self.stub.authenticateByPublicKey(abpk,
                                                     timeout=self.rpc_timeout_secs, wait_for_ready=True)
        if not response.result.authenticated:
            raise AuthenticationFailedException(
                "Failed to authenticate with private key.")
        self.__set_cookie(response)

    def get_token(self, service: str) -> deephaven_enterprise.proto.auth_pb2.Token:
        """
        Get an authentication token to present to another Deephaven service. This token may only be used one time,
        as it is consumed by the authentication server during the verification process.

        :param service: the service that will verify the token (e.g., "PersistentQueryController")
        :return: the token
        """
        gtr = deephaven_enterprise.proto.auth_pb2.GetTokenRequest()
        gtr.service = service
        gtr.cookie = self.cookie
        response = self.stub.getToken(gtr,
                                      timeout=self.rpc_timeout_secs, wait_for_ready=True)
        return response.token

    def close(self) -> None:
        """
        Logout from the authentication server. No further tokens may be requested by this client.

        A channel that this client created itself is closed even when the logout request fails; a channel that
        was passed to the constructor is left open.
        """
        try:
            if self.cookie is not None:
                ivcr = deephaven_enterprise.proto.auth_pb2.InvalidateCookieRequest()
                ivcr.cookie = self.cookie
                # Clear local state before the RPC so this client is logged out locally even if the call fails.
                self.cookie = None
                self.cookie_deadline_time_millis = None
                self.stub.invalidateCookie(ivcr,
                                           timeout=self.rpc_timeout_secs, wait_for_ready=True)
        finally:
            if self.opened_channel:
                self.channel.close()

    def ping(self) -> bool:
        """
        Pings the server, refreshing our cookie.

        :returns: True if a ping was sent, False if there is no active cookie.
        """
        # Read the cookie once so the None check and the request use the same value.
        cookie = self.cookie
        if cookie is None:
            return False

        rcreq = deephaven_enterprise.proto.auth_pb2.RefreshCookieRequest()
        rcreq.cookie = cookie
        rcresp = self.stub.refreshCookie(rcreq,
                                         timeout=self.rpc_timeout_secs, wait_for_ready=True)
        self.cookie_deadline_time_millis = rcresp.cookie_deadline_time_millis
        return True

    def __set_cookie(self, ar):
        """
        Record the cookie and its deadline from a successful authentication response and make sure the cookie
        is being refreshed in the background.

        :param ar: an authentication response whose ``result`` carries the cookie and its deadline
        """
        self.cookie = ar.result.cookie
        self.cookie_deadline_time_millis = ar.result.cookie_deadline_time_millis
        self.__start_refresh()

    def __start_refresh(self):
        """
        Start the background cookie refresh thread unless one is already running.
        """
        # Authenticating again must not stack up additional refresh threads; a running thread picks up the
        # new cookie and deadline from this client on its next pass.
        if self.refresh_thread is not None and self.refresh_thread.is_alive():
            return
        self.refresh_thread = RefreshThread(self)
        self.refresh_thread.start()
class AuthenticationFailedException(Exception):
    """
    Raised when the server answers an authentication request with a failure (e.g. bad password or bad key).

    Other errors, such as the server not responding at all, are not reported through this exception.
    """