Files
RasadDam_Backend/apps/warehouse/services/transaction_dashboard_service.py

111 lines
4.6 KiB
Python
Raw Normal View History

from collections import defaultdict
from django.db.models import Sum, Count, Case, When, Q, Value
from django.db.models.functions import Coalesce
from apps.authentication.models import Organization
from apps.authentication.services.service import get_all_org_child
from apps.warehouse.models import InventoryQuotaSaleTransaction, InventoryQuotaSaleItem
class TransactionDashboardService:
    """Builds aggregate sale statistics for an organization's dashboard."""

    @staticmethod
    def get_dashboard(org: Organization):
        """Return transaction and per-product sale statistics visible to ``org``.

        Organizations whose ``type.key`` is ``'ADM'`` see every transaction and
        item; any other organization sees only rows whose seller organization
        is itself or one of the organizations returned by
        ``get_all_org_child``.

        Args:
            org: The organization requesting the dashboard.

        Returns:
            A dict with two keys:

            * ``"transaction_summary"``: counts per transaction status, the
              summed ``price_paid`` and item ``weight`` of successful
              transactions, and the number of distinct ranchers.
            * ``"product_summary"``: one dict per product (ordered by
              ``total_sales`` descending) with sale/payment counters and an
              ``"item_share_stats"`` list sorted by ``total_price``
              descending.
        """
        if org.type.key == 'ADM':
            transactions = InventoryQuotaSaleTransaction.objects.all()
            items = InventoryQuotaSaleItem.objects.all()
        else:
            # Build a fresh list rather than appending to the helper's return
            # value, so a shared/cached list owned by the helper is never
            # modified from here.
            org_scope = [*get_all_org_child(org=org), org]
            transactions = InventoryQuotaSaleTransaction.objects.filter(
                seller_organization__in=org_scope
            )
            items = InventoryQuotaSaleItem.objects.filter(
                transaction__seller_organization__in=org_scope
            )
        items = items.select_related("gov_product", "free_product")

        transaction_stats = transactions.aggregate(
            total_transactions=Count("id"),
            success_transactions=Count("id", filter=Q(transaction_status="success")),
            failed_transactions=Count("id", filter=Q(transaction_status="failed")),
            waiting_transactions=Count("id", filter=Q(transaction_status="waiting")),
            total_amount=Coalesce(Sum("price_paid", filter=Q(transaction_status="success")), 0),
            unique_ranchers=Count("rancher", distinct=True),
        )
        transaction_stats['total_weight'] = items.aggregate(
            total_weight=Coalesce(
                Sum("weight", filter=Q(transaction__transaction_status="success")),
                0,
            )
        )['total_weight']

        # An item references either a gov product or a free product; the Case
        # expressions collapse both into one (id, name, type) grouping key.
        products_stats = items.values(
            product_id=Case(
                When(gov_product__isnull=False, then="gov_product_id"),
                When(free_product__isnull=False, then="free_product_id"),
            ),
            product_name=Case(
                When(gov_product__isnull=False, then="gov_product__name"),
                When(free_product__isnull=False, then="free_product__product__name"),
            ),
            product_type=Case(
                When(gov_product__isnull=False, then=Value("gov")),
                When(free_product__isnull=False, then=Value("free")),
            ),
        ).annotate(
            total_sales=Count("id"),
            total_weight=Coalesce(Sum("weight"), 0),
            # NOTE(review): the price aggregates below were already disabled;
            # the reason is not visible from this file.
            # total_price=Coalesce(Sum("total_price"), 0),
            # avg_unit_price=Coalesce(Sum("total_price") / Sum("weight"), 0),
            success_sales=Count("id", filter=Q(transaction__transaction_status="success")),
            failed_sales=Count("id", filter=Q(transaction__transaction_status="failed")),
            waiting_sales=Count("id", filter=Q(transaction__transaction_status="waiting")),
            card_payments=Count("id", filter=Q(transaction__price_type="card")),
            cash_payments=Count("id", filter=Q(transaction__price_type="cash")),
            check_payments=Count("id", filter=Q(transaction__price_type="check")),
            credit_payments=Count("id", filter=Q(transaction__price_type="credit")),
            extra_items=Count("id", filter=Q(is_extra=True)),
            pre_sale_items=Count("id", filter=Q(is_pre_sale=True)),
        ).order_by("-total_sales")

        share_totals = TransactionDashboardService._collect_share_totals(items)

        # Materialise the rows once and decorate that list, instead of
        # mutating dicts yielded by the queryset and relying on its internal
        # result cache to hand the same objects back later.
        product_summary = list(products_stats)
        for product in product_summary:
            product_key = (product["product_type"], product["product_id"])
            product["item_share_stats"] = sorted(
                (
                    {"name": name, **totals}
                    for name, totals in share_totals.get(product_key, {}).items()
                ),
                key=lambda entry: entry["total_price"],
                reverse=True,
            )

        return {
            "transaction_summary": transaction_stats,
            "product_summary": product_summary,
        }

    @staticmethod
    def _collect_share_totals(items):
        """Sum ``item_share`` prices per product and per share name in one pass.

        The result is keyed by ``(product_type, product_id)`` rather than by
        the bare id: gov and free products are separate relations, so the same
        numeric id can occur in both and must not be merged.

        Args:
            items: Iterable of sale items exposing ``gov_product_id``,
                ``free_product_id`` and ``item_share``.

        Returns:
            Mapping ``{(product_type, product_id): {share_name:
            {"total_price": ..., "count": ...}}}``.
        """
        totals = defaultdict(
            lambda: defaultdict(lambda: {"total_price": 0, "count": 0})
        )
        for item in items:
            if not item.item_share:
                continue
            # Mirror the precedence of the SQL Case expressions: gov product
            # first, then free product, otherwise the all-NULL group.
            if item.gov_product_id is not None:
                product_key = ("gov", item.gov_product_id)
            elif item.free_product_id is not None:
                product_key = ("free", item.free_product_id)
            else:
                product_key = (None, None)
            for share in item.item_share:
                # share: {"name": ..., "price": ..., "shaba": ...}
                entry = totals[product_key][share.get("name")]
                # A missing or null price contributes nothing to the sum but
                # the share is still counted.
                entry["total_price"] += share.get("price") or 0
                entry["count"] += 1
        return totals