package couchbase

import (
	"log"
	"sync"
	"time"

	"fmt"
	"github.com/couchbase/gomemcached"
	"github.com/couchbase/gomemcached/client"
	"github.com/couchbase/goutils/logging"
)

// A UprFeed streams mutation events from a bucket.
//
// Events from the bucket can be read from the channel 'C'.  Remember
// to call Close() on it when you're done, unless its channel has
// closed itself already.
type UprFeed struct {
	// C is the receive-only view of output; it is assigned once the feed
	// has been started.
	C <-chan *memcached.UprEvent

	// bucket is the bucket whose nodes this feed connects to.
	bucket          *Bucket
	nodeFeeds       map[string]*FeedInfo     // The UPR feeds of the individual nodes
	output          chan *memcached.UprEvent // Same as C but writeably-typed
	// outputClosed records that output has been closed, so that it is not
	// closed a second time.
	outputClosed    bool
	// quit is closed by Close to stop the run goroutine.
	quit            chan bool
	name            string // name of this UPR feed
	sequence        uint32 // sequence number for this feed
	// connected is set once the initial connection to the nodes succeeded.
	connected       bool
	// killSwitch receives a value from a forwarding goroutine when its
	// node feed channel is closed; the run goroutine listens on it.
	killSwitch      chan bool
	// closing is set by Close before the node feeds are torn down.
	closing         bool
	// wg tracks the forwarding goroutines so that Close can wait for them.
	wg              sync.WaitGroup
	// dcp_buffer_size and data_chan_size are passed to every per-node
	// StartUprFeed call; data_chan_size is also the capacity of output.
	dcp_buffer_size uint32
	data_chan_size  int
}

// FeedInfo holds the UPR feed of a single node connection together with the
// state used to stop the goroutine that forwards its events.
type FeedInfo struct {
	uprFeed   *memcached.UprFeed // UPR feed handle
	host      string             // hostname
	connected bool               // true once the feed is started; cleared when quit is observed
	quit      chan bool          // closed by closeNodeFeeds to stop the forwarding goroutine
}

// FailoverLog maps a vbucket id to the failover log reported for that vbucket.
type FailoverLog map[uint16]memcached.FailoverLog

// GetFailoverLogs fetches the failover logs for a set of vbucket ids.
//
// The vbuckets are grouped by the host of their master node (the first entry
// of the vbucket's row in the server map) and each involved host is queried
// once for the logs of all of its vbuckets.
//
// An error is returned when a vbucket id is outside the vbucket map, when no
// master can be determined for a vbucket, when no connection to a host can be
// obtained, or when a host fails to return its failover logs. On error the
// returned map is nil.
func (b *Bucket) GetFailoverLogs(vBuckets []uint16) (FailoverLog, error) {

	// map vbids to their corresponding hosts
	vbHostList := make(map[string][]uint16)
	vbm := b.VBServerMap()
	if len(vbm.VBucketMap) < len(vBuckets) {
		return nil, fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
			vbm.VBucketMap, vBuckets)
	}

	for _, vb := range vBuckets {
		// The length comparison above does not bound the individual ids, so
		// validate each one before indexing into the map.
		if int(vb) >= len(vbm.VBucketMap) {
			return nil, fmt.Errorf("Invalid vbucket id %d", vb)
		}
		nodes := vbm.VBucketMap[vb]
		if len(nodes) == 0 {
			return nil, fmt.Errorf("No master found for vb %d", vb)
		}

		master := b.getMasterNode(nodes[0])
		if master == "" {
			return nil, fmt.Errorf("No master found for vb %d", vb)
		}

		// append allocates the per-host list on first use.
		vbHostList[master] = append(vbHostList[master], vb)
	}

	failoverLogMap := make(FailoverLog)
	for _, serverConn := range b.getConnPools(false /* not already locked */) {

		vbList := vbHostList[serverConn.host]
		if vbList == nil {
			continue
		}

		mc, err := serverConn.Get()
		if err != nil {
			logging.Infof("No Free connections for vblist %v", vbList)
			return nil, fmt.Errorf("No Free connections for host %s",
				serverConn.host)
		}

		failoverlogs, err := mc.UprGetFailoverLog(vbList)
		// close the connection so that it doesn't get reused for upr data
		// connection. This is done right away rather than deferred so that
		// connections do not pile up until every host has been queried.
		mc.Close()
		if err != nil {
			return nil, fmt.Errorf("Error getting failover log %s host %s",
				err.Error(), serverConn.host)
		}

		for vb, flog := range failoverlogs {
			failoverLogMap[vb] = *flog
		}
	}

	return failoverLogMap, nil
}

// StartUprFeed creates and starts a new UPR feed with the default
// configuration: an output channel buffered for 10 events and a DCP buffer
// size of DEFAULT_WINDOW_SIZE. See StartUprFeedWithConfig.
func (b *Bucket) StartUprFeed(name string, sequence uint32) (*UprFeed, error) {
	const defaultDataChanSize = 10
	return b.StartUprFeedWithConfig(name, sequence, defaultDataChanSize, DEFAULT_WINDOW_SIZE)
}

// StartUprFeedWithConfig creates and starts a new UPR feed.
//
// data_chan_size is the buffer size of the aggregate event channel and is
// also handed to every per-node feed; dcp_buffer_size is handed to every
// per-node feed. No data will be sent on the channel unless vbucket streams
// are requested.
//
// An error is returned when the nodes of the bucket cannot be connected to.
func (b *Bucket) StartUprFeedWithConfig(name string, sequence uint32, data_chan_size int, dcp_buffer_size uint32) (*UprFeed, error) {
	events := make(chan *memcached.UprEvent, data_chan_size)

	feed := &UprFeed{
		bucket:          b,
		output:          events,
		quit:            make(chan bool),
		nodeFeeds:       make(map[string]*FeedInfo),
		name:            name,
		sequence:        sequence,
		killSwitch:      make(chan bool),
		dcp_buffer_size: dcp_buffer_size,
		data_chan_size:  data_chan_size,
	}

	if err := feed.connectToNodes(); err != nil {
		return nil, fmt.Errorf("Cannot connect to bucket %s", err.Error())
	}

	feed.connected = true
	go feed.run()

	// Expose the output channel to consumers as receive-only.
	feed.C = events
	return feed, nil
}

// UprRequestStream starts a stream for a vb on a feed.
//
// The request is routed to the node feed of the vbucket's master node (the
// first entry of the vbucket's row in the server map); the remaining
// arguments are passed through unchanged to that node feed.
//
// An error is returned when vb is outside the vbucket map, when no master
// node can be determined, when there is no feed for the master's host, or
// when the node feed rejects the request. A panic raised while handling the
// request is logged and re-raised.
func (feed *UprFeed) UprRequestStream(vb uint16, opaque uint16, flags uint32,
	vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error {

	defer func() {
		if r := recover(); r != nil {
			// Include the recovered value; without it the cause of the
			// panic is lost when it is re-raised through the logger.
			log.Panicf("Panic in UprRequestStream: %v. Feed %v Bucket %v", r, feed, feed.bucket)
		}
	}()

	vbm := feed.bucket.VBServerMap()
	if int(vb) >= len(vbm.VBucketMap) {
		return fmt.Errorf("Invalid vbucket id %d", vb)
	}

	nodes := vbm.VBucketMap[vb]
	if len(nodes) == 0 {
		return fmt.Errorf("Master node not found for vbucket %d", vb)
	}

	master := feed.bucket.getMasterNode(nodes[0])
	if master == "" {
		return fmt.Errorf("Master node not found for vbucket %d", vb)
	}
	singleFeed := feed.nodeFeeds[master]
	if singleFeed == nil {
		return fmt.Errorf("UprFeed for this host not found")
	}

	return singleFeed.uprFeed.UprRequestStream(vb, opaque, flags,
		vuuid, startSequence, endSequence, snapStart, snapEnd)
}

// UprCloseStream ends a vbucket stream.
//
// The request is routed to the node feed of the vbucket's master node (the
// first entry of the vbucket's row in the server map).
//
// An error is returned when vb is outside the vbucket map, when no master
// node can be determined, when there is no feed for the master's host, or
// when the node feed fails to close the stream. A panic raised while handling
// the request is logged and re-raised.
func (feed *UprFeed) UprCloseStream(vb, opaqueMSB uint16) error {

	defer func() {
		if r := recover(); r != nil {
			// Include the recovered value; without it the cause of the
			// panic is lost when it is re-raised through the logger.
			log.Panicf("Panic in UprCloseStream: %v. Feed %v Bucket %v", r, feed, feed.bucket)
		}
	}()

	vbm := feed.bucket.VBServerMap()
	if int(vb) >= len(vbm.VBucketMap) {
		return fmt.Errorf("Invalid vbucket id %d", vb)
	}

	nodes := vbm.VBucketMap[vb]
	if len(nodes) == 0 {
		return fmt.Errorf("Master node not found for vbucket %d", vb)
	}

	master := feed.bucket.getMasterNode(nodes[0])
	if master == "" {
		return fmt.Errorf("Master node not found for vbucket %d", vb)
	}
	singleFeed := feed.nodeFeeds[master]
	if singleFeed == nil {
		return fmt.Errorf("UprFeed for this host not found")
	}

	return singleFeed.uprFeed.CloseStream(vb, opaqueMSB)
}

// run is the goroutine that supervises the feed.
//
// It waits until a forwarding goroutine signals on killSwitch (or the feed is
// closed via quit), then refreshes the bucket and connects to any node that is
// not connected. If the refresh or the reconnect fails, the output channel is
// closed, the node feeds are torn down and the goroutine exits.
func (feed *UprFeed) run() {
	// abort tears the feed down after an unrecoverable reconnect failure.
	abort := func() {
		close(feed.output)
		feed.outputClosed = true
		feed.closeNodeFeeds()
	}

	backoff := initialRetryInterval
	healthy := true
	for {
		if healthy {
			// Block until one of the sub-feeds fails or the feed is closed.
			select {
			case <-feed.killSwitch:
			case <-feed.quit:
				return
			}
			//feed.closeNodeFeeds()
			backoff = initialRetryInterval
		}

		if feed.closing {
			// we have been asked to shut down
			return
		}

		// On error, try to refresh the bucket in case the list of nodes changed:
		logging.Infof("go-couchbase: UPR connection lost; reconnecting to bucket %q in %v",
			feed.bucket.Name, backoff)

		if refreshErr := feed.bucket.Refresh(); refreshErr != nil {
			// if we fail to refresh the bucket, exit the feed
			// MB-14917
			logging.Infof("Unable to refresh bucket %s ", refreshErr.Error())
			abort()
			return
		}

		// this will only connect to nodes that are not connected or changed
		// user will have to reconnect the stream
		if connectErr := feed.connectToNodes(); connectErr != nil {
			logging.Infof("Unable to connect to nodes..exit ")
			abort()
			return
		}
		// Reaching this point means the reconnect succeeded.
		healthy = true

		select {
		case <-time.After(backoff):
		case <-feed.quit:
			return
		}

		backoff *= 2
		if backoff > maximumRetryInterval {
			backoff = maximumRetryInterval
		}
	}
}

// connectToNodes starts a UPR feed on every node of the bucket that does not
// already have a connected feed, and launches a forwardUprEvents goroutine for
// each feed it starts.
//
// It is used for the initial connection as well as for reconnection. If
// starting the feed on any node fails, all node feeds are closed and that
// error is returned. An error is also returned when no new feed was started.
func (feed *UprFeed) connectToNodes() (err error) {
	name := feed.name
	if name == "" {
		name = "DefaultUprClient"
	}

	// closeNodeFeeds leaves nodeFeeds nil. A reconnect attempted after such
	// a teardown would otherwise panic when it records the new feed below.
	if feed.nodeFeeds == nil {
		feed.nodeFeeds = make(map[string]*FeedInfo)
	}

	nodeCount := 0
	for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) {

		// this maybe a reconnection, so check if the connection to the node
		// already exists. Connect only if the node is not found in the list
		// or connected == false
		if nodeFeed := feed.nodeFeeds[serverConn.host]; nodeFeed != nil && nodeFeed.connected {
			continue
		}

		singleFeed, startErr := serverConn.StartUprFeed(name, feed.sequence, feed.dcp_buffer_size, feed.data_chan_size)
		if startErr != nil {
			logging.Errorf("go-couchbase: Error connecting to upr feed of %s: %v", serverConn.host, startErr)
			feed.closeNodeFeeds()
			return startErr
		}

		// add the node to the connection map
		feedInfo := &FeedInfo{
			uprFeed:   singleFeed,
			connected: true,
			host:      serverConn.host,
			quit:      make(chan bool),
		}
		feed.nodeFeeds[serverConn.host] = feedInfo

		// Register with the WaitGroup before starting the goroutine, so that
		// its Done can never run ahead of this Add and a concurrent Wait
		// cannot miss it.
		feed.wg.Add(1)
		go feed.forwardUprEvents(feedInfo, feed.killSwitch, serverConn.host)
		nodeCount++
	}
	if nodeCount == 0 {
		return fmt.Errorf("No connection to bucket")
	}

	return nil
}

// forwardUprEvents is the goroutine that forwards Upr events from a single
// node's feed to the aggregate feed.
//
// It exits when nodeFeed.quit is closed, when the node's event channel is
// closed (after signalling killSwitch), when the aggregate output has been
// closed, or when recovering from a NOT_MY_VBUCKET event fails. The feed's
// WaitGroup is released on exit. A panic is re-raised unless the feed is
// closing or its output is already closed.
func (feed *UprFeed) forwardUprEvents(nodeFeed *FeedInfo, killSwitch chan bool, host string) {
	singleFeed := nodeFeed.uprFeed

	defer func() {
		feed.wg.Done()
		if r := recover(); r != nil {
			//if feed is not closing, re-throw the panic
			if !feed.outputClosed && !feed.closing {
				panic(r)
			}
			logging.Errorf("Panic is recovered. Since feed is closed, exit gracefully")
		}
	}()

	for {
		select {
		case <-nodeFeed.quit:
			nodeFeed.connected = false
			return

		case event, ok := <-singleFeed.C:
			if !ok {
				if singleFeed.Error != nil {
					logging.Errorf("go-couchbase: Upr feed from %s failed: %v", host, singleFeed.Error)
				}
				// killSwitch is unbuffered: if the supervising goroutine has
				// already exited, a bare send would block forever and Close
				// would hang waiting for this goroutine. Give up as soon as
				// the node feed or the whole feed is shut down.
				select {
				case killSwitch <- true:
				case <-nodeFeed.quit:
					nodeFeed.connected = false
				case <-feed.quit:
				}
				return
			}
			if feed.outputClosed {
				// someone closed the node feed
				logging.Infof("Node need closed, returning from forwardUprEvent")
				return
			}

			// Do not block on a consumer that has stopped reading once this
			// node feed has been asked to quit; otherwise Close would wait
			// for this goroutine forever.
			select {
			case feed.output <- event:
			case <-nodeFeed.quit:
				nodeFeed.connected = false
				return
			}

			if event.Status == gomemcached.NOT_MY_VBUCKET {
				logging.Infof(" Got a not my vbucket error !! ")
				if err := feed.bucket.Refresh(); err != nil {
					logging.Errorf("Unable to refresh bucket %s ", err.Error())
					feed.closeNodeFeeds()
					return
				}
				// this will only connect to nodes that are not connected or changed
				// user will have to reconnect the stream
				if err := feed.connectToNodes(); err != nil {
					logging.Errorf("Unable to connect to nodes %s", err.Error())
					return
				}
			}
		}
	}
}

// closeNodeFeeds shuts down every per-node feed: it closes the node's quit
// channel to stop its forwarding goroutine, closes the underlying UPR feed,
// and finally drops the whole node feed table.
func (feed *UprFeed) closeNodeFeeds() {
	for _, nodeFeed := range feed.nodeFeeds {
		logging.Infof(" Sending close to forwardUprEvent ")
		close(nodeFeed.quit)
		nodeFeed.uprFeed.Close()
	}

	// Forget all node feeds; the table is left nil.
	feed.nodeFeeds = nil
}

// Close shuts down a Upr feed.
//
// It marks the feed as closing, closes all node feeds, signals the run
// goroutine through quit, waits for the forwarding goroutines to finish and
// then closes the output channel unless it has been closed already. Calling
// Close on a feed whose quit channel is already closed is a no-op. The
// returned error is always nil.
func (feed *UprFeed) Close() error {
	select {
	case <-feed.quit:
		// Already closed; nothing left to do.
	default:
		feed.closing = true
		feed.closeNodeFeeds()
		close(feed.quit)

		feed.wg.Wait()
		if !feed.outputClosed {
			feed.outputClosed = true
			close(feed.output)
		}
	}

	return nil
}