# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Simple node-assignment backend using a single, static node.

This is a greatly-simplified node-assignment backend.  It keeps user records
in an SQL database, but does not attempt to do any node management.  All users
are implicitly assigned to a single, static node.

"""
import time

try:
    from urlparse import urlparse
except ImportError:
    from urllib.parse import urlparse

from mozsvc.exceptions import BackendError

from sqlalchemy import Column, Integer, String, BigInteger, Index
from sqlalchemy import create_engine, Table, MetaData
from sqlalchemy.pool import QueuePool
from sqlalchemy.sql import text as sqltext

from tokenserver.assignment import INodeAssignment
from zope.interface import implements


metadata = MetaData()


users = Table(
    "users",
    metadata,
    Column("uid", Integer(), primary_key=True, autoincrement=True,
           nullable=False),
    Column("service", String(32), nullable=False),
    Column("email", String(255), nullable=False),
    Column("generation", BigInteger(), nullable=False),
    Column("client_state", String(32), nullable=False),
    Column("created_at", BigInteger(), nullable=False),
    Column("replaced_at", BigInteger(), nullable=True),
    Column("keys_changed_at", BigInteger(), nullable=True),
    Column("node", String(255), nullable=True),
    Index('lookup_idx', 'email', 'service', 'created_at'),
)


_GET_USER_RECORDS = sqltext("""\
select
    uid, generation, client_state, created_at, replaced_at,
    keys_changed_at, node
from
    users
where
    email = :email
and
    service = :service
order by
    created_at desc, uid desc
limit
    20
""")


_CREATE_USER_RECORD = sqltext("""\
insert into
    users
    (service, email, generation, client_state, created_at, replaced_at,
     keys_changed_at, node)
values
    (:service, :email, :generation, :client_state, :timestamp, NULL,
     :keys_changed_at, :node)
""")


_UPDATE_GENERATION_NUMBER = sqltext("""\
update
    users
set
    generation = :generation
where
    service = :service and email = :email and
    generation < :generation and replaced_at is null
""")


_REPLACE_USER_RECORDS = sqltext("""\
update
    users
set
    replaced_at = :timestamp
where
    service = :service and email = :email
    and replaced_at is null and created_at < :timestamp
""")


def get_timestamp():
    """Get current timestamp in milliseconds."""
    return int(time.time() * 1000)


class StaticNodeAssignment(object):
    implements(INodeAssignment)

    def __init__(self, sqluri, node_url, **kw):
        self.sqluri = sqluri
        self.node_url = node_url
        self.driver = urlparse(sqluri).scheme.lower()
        sqlkw = {
            "logging_name": "syncserver",
            "connect_args": {},
            "poolclass": QueuePool,
            "pool_reset_on_return": True,
        }
        if self.driver == "sqlite":
            # We must mark it as safe to share sqlite connections between
            # threads.  The pool will ensure there's no race conditions.
            sqlkw["connect_args"]["check_same_thread"] = False
            # If using a :memory: database, we must use a QueuePool of
            # size 1 so that a single connection is shared by all threads.
            if urlparse(sqluri).path.lower() in ("/", "/:memory:"):
                sqlkw["pool_size"] = 1
                sqlkw["max_overflow"] = 0
        if "mysql" in self.driver:
            # Guard against the db closing idle conections.
            sqlkw["pool_recycle"] = kw.get("pool_recycle", 3600)
        self._engine = create_engine(sqluri, **sqlkw)
        users.create(self._engine, checkfirst=True)

    def get_user(self, service, email, **kw):
        params = {'service': service, 'email': email}
        res = self._engine.execute(_GET_USER_RECORDS, **params)
        try:
            row = res.fetchone()
            if row is None:
                return None
            # The first row is the most up-to-date user record.
            user = {
                'email': email,
                'uid': row.uid,
                'node': self.node_url,
                'generation': row.generation,
                'client_state': row.client_state,
                'first_seen_at': row.created_at,
                'old_client_states': {},
                'keys_changed_at': row.keys_changed_at,
            }
            # Any subsequent rows are due to old client-state values.
            old_row = res.fetchone()
            update_replaced_at = False
            while old_row is not None:
                if old_row.client_state != user['client_state']:
                    user['old_client_states'][old_row.client_state] = True
                # Make sure each old row is marked as replaced.
                # They might not be, due to races in row creation.
                if old_row.replaced_at is None:
                    update_replaced_at = True
                old_row = res.fetchone()
            if update_replaced_at:
                self._engine.execute(_REPLACE_USER_RECORDS, {
                    'service': service,
                    'email': user['email'],
                    'timestamp': row.created_at,
                }).close()
            return user
        finally:
            res.close()

    def allocate_user(self, service, email, generation=0, client_state='',
                      keys_changed_at=0, **kw):
        now = get_timestamp()
        params = {
            'service': service, 'email': email, 'generation': generation,
            'client_state': client_state, 'timestamp': now,
            'keys_changed_at': keys_changed_at, 'node': self.node_url,
        }
        res = self._engine.execute(_CREATE_USER_RECORD, **params)
        res.close()
        return {
            'email': email,
            'uid': res.lastrowid,
            'node': self.node_url,
            'generation': generation,
            'client_state': client_state,
            'first_seen_at': now,
            'old_client_states': {},
            'keys_changed_at': keys_changed_at,
        }

    def update_user(self, service, user, generation=None, client_state=None,
                    keys_changed_at=0, node=None, **kw):
        if client_state is None:
            # uid can stay the same, just update the generation number.
            if generation is not None:
                params = {
                    'service': service,
                    'email': user['email'],
                    'generation': generation,
                }
                res = self._engine.execute(_UPDATE_GENERATION_NUMBER, **params)
                res.close()
                user['generation'] = max(generation, user['generation'])
        else:
            # reject previously-seen client-state strings.
            if client_state == user['client_state']:
                raise BackendError('previously seen client-state string')
            if client_state in user['old_client_states']:
                raise BackendError('previously seen client-state string')
            # need to create a new record for new client_state.
            if generation is not None:
                generation = max(user['generation'], generation)
            else:
                generation = user['generation']
            now = get_timestamp()
            params = {
                'service': service, 'email': user['email'],
                'generation': generation, 'client_state': client_state,
                'timestamp': now,
                'keys_changed_at': keys_changed_at, 'node': node,
            }
            res = self._engine.execute(_CREATE_USER_RECORD, **params)
            res.close()
            user['uid'] = res.lastrowid
            user['generation'] = generation
            user['old_client_states'][user['client_state']] = True
            user['client_state'] = client_state
            user['keys_changed_at'] = keys_changed_at
            user['node'] = node
            # Mark old records as having been replaced.
            # If we crash here, they are unmarked and we may fail to
            # garbage collect them for a while, but the active state
            # will be undamaged.
            params = {
                'service': service, 'email': user['email'], 'timestamp': now
            }
            res = self._engine.execute(_REPLACE_USER_RECORDS, **params)
            res.close()
