nyasukun's picture
update app.py
bec1fe0
raw
history blame
16.6 kB
import math, json
import gradio as gr
import torch, pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
# ZeroGPU support
try:
import spaces
ZEROGPU_AVAILABLE = True
print("ZeroGPU support enabled")
except ImportError:
ZEROGPU_AVAILABLE = False
print("ZeroGPU not available, running in standard mode")
# Create dummy decorator for local development
def spaces_gpu_decorator(duration=60):
def decorator(func):
return func
return decorator
spaces = type('spaces', (), {'GPU': spaces_gpu_decorator})
# Model configuration - can be replaced with other models
MODEL_NAME = "fdtn-ai/Foundation-Sec-8B"
#MODEL_NAME = "sshleifer/tiny-gpt2"
# Initialize tokenizer and model using pipeline approach
print(f"Loading model: {MODEL_NAME}")
try:
print(f"Initializing text generation model: {MODEL_NAME}")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
text_pipeline = pipeline(
"text-generation",
model=MODEL_NAME,
tokenizer=tokenizer,
torch_dtype=torch.bfloat16,
device_map="auto",
trust_remote_code=True
)
print(f"Model initialized successfully: {MODEL_NAME}")
# Extract model and tokenizer from pipeline for direct access
model = text_pipeline.model
tok = text_pipeline.tokenizer
except Exception as e:
print(f"Error initializing model {MODEL_NAME}: {str(e)}")
print("Falling back to tiny-gpt2...")
MODEL_NAME = "sshleifer/tiny-gpt2"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
text_pipeline = pipeline(
"text-generation",
model=MODEL_NAME,
tokenizer=tokenizer,
torch_dtype=torch.bfloat16,
device_map="auto",
trust_remote_code=True
)
model = text_pipeline.model
tok = text_pipeline.tokenizer
print(f"Fallback model loaded: {MODEL_NAME}")
# Log device information
if hasattr(model, 'device'):
print(f"Model loaded on device: {model.device}")
else:
device_info = next(model.parameters()).device
print(f"Model parameters on device: {device_info}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
print(f"CUDA device count: {torch.cuda.device_count()}")
print(f"Current CUDA device: {torch.cuda.current_device()}")
print(f"CUDA device name: {torch.cuda.get_device_name()}")
# Configuration parameters
LEN_ALPHA = 0.7 # Length correction factor (0=no correction, 1=full average logP)
# Sample data for testing
CAMPAIGN_LIST = [
"Operation Aurora",
"Dust Storm",
"ShadowHammer",
"NotPetya",
"SolarWinds",
]
ACTOR_LIST = ["APT1", "APT28", "APT33", "APT38", "FIN8"]
# Sample ATT&CK technique IDs with names
TECHNIQUE_LIST = [
"T1059 Command and Scripting Interpreter",
"T1566 Phishing",
"T1027 Obfuscated/Stored Files",
"T1036 Masquerading",
"T1105 Ingress Tool Transfer",
"T1018 Remote System Discovery",
"T1568 Dynamic Resolution",
]
@spaces.GPU(duration=120)
@torch.no_grad()
def phrase_log_prob(prompt, phrase):
"""Calculate log probability of a phrase given a prompt using the language model."""
try:
# Log GPU usage information
device_info = next(model.parameters()).device
print(f"Running phrase_log_prob on device: {device_info}")
ids_prompt = tok(prompt, return_tensors="pt").to(model.device)["input_ids"][0]
ids_phrase = tok(phrase, add_special_tokens=False)["input_ids"]
lp = 0.0
cur = ids_prompt.unsqueeze(0)
for tid in ids_phrase:
logits = model(cur).logits[0, -1].float()
lp += torch.log_softmax(logits, -1)[tid].item()
cur = torch.cat([cur, torch.tensor([[tid]], device=model.device)], 1)
return lp
except Exception as e:
print(f"Error in phrase_log_prob: {e}")
raise e
def binary_assoc_score(prompt: str, phrase: str, neg="does NOT use", prompt_template="typically uses") -> float:
"""
Calculate binary association score: p ≈ P(use) / (P(use)+P(not use))
Applies length normalization to correct for longer phrases.
Args:
prompt: Base prompt string
phrase: Phrase to evaluate
neg: Negative template to replace positive template
prompt_template: Positive template to be replaced
Returns:
Length-normalized association score between 0 and 1
"""
lp_pos = phrase_log_prob(prompt, phrase)
lp_neg = phrase_log_prob(prompt.replace(prompt_template, neg), phrase)
# Logistic transformation
prob = 1 / (1 + math.exp(lp_neg - lp_pos))
# Length normalization
n_tok = len(tok(phrase, add_special_tokens=False)["input_ids"])
return prob / (n_tok ** LEN_ALPHA)
def campaign_actor_associations(campaigns, actors):
"""Campaign × Actor の関連度を計算し、各CampaignごとにTop Actorを返す"""
results = {}
for camp in campaigns:
prompt_base = CAMPAIGN_ACTOR_PROMPT.format(campaign=camp)
actor_scores = {}
for actor in actors:
score = binary_assoc_score(prompt_base, actor, neg="is NOT associated with")
actor_scores[actor] = score
# スコア順でソート
sorted_actors = sorted(actor_scores.items(), key=lambda x: x[1], reverse=True)
results[camp] = sorted_actors
return results
def campaign_technique_matrix(campaigns, techniques, prompt_template="typically uses", neg_template="typically does NOT use"):
"""
Generate Campaign × Technique association matrix using binary scoring.
Args:
campaigns: List of campaign names
techniques: List of technique names
prompt_template: Template for positive association
neg_template: Template for negative association
Returns:
DataFrame with campaigns as rows, techniques as columns, scores as values
"""
rows = {}
for camp in campaigns:
prompt_base = f"{camp} {prompt_template}"
rows[camp] = {
tech: binary_assoc_score(prompt_base, tech, neg=neg_template, prompt_template=prompt_template)
for tech in techniques
}
return pd.DataFrame.from_dict(rows, orient="index")
def campaign_actor_matrix(campaigns, actors):
"""Campaign × Actor 行列を生成"""
rows = {}
for camp in campaigns:
prompt_base = CAMPAIGN_ACTOR_PROMPT.format(campaign=camp)
rows[camp] = {
actor: binary_assoc_score(prompt_base, actor, neg="is NOT associated with")
for actor in actors
}
return pd.DataFrame.from_dict(rows, orient="index")
def campaign_actor_probs(campaigns, actors, prompt_template="is conducted by"):
"""
Generate Campaign × Actor probability matrix using softmax normalization.
Args:
campaigns: List of campaign names
actors: List of actor names
prompt_template: Template for actor association prompt
Returns:
DataFrame with campaigns as rows, actors as columns, probabilities as values
"""
rows = {}
for camp in campaigns:
prompt = f"{camp} {prompt_template}"
logps = [phrase_log_prob(prompt, a) for a in actors]
# Softmax normalization (with max-shift for numerical stability)
m = max(logps)
ps = [math.exp(lp - m) for lp in logps]
s = sum(ps)
rows[camp] = {a: p/s for a, p in zip(actors, ps)}
return pd.DataFrame.from_dict(rows, orient="index")
def generate_actor_heatmap(c_list, a_list, actor_prompt_template):
"""Generate Campaign-Actor association heatmap with probability visualization."""
try:
campaigns = [c.strip() for c in c_list.split(",") if c.strip()]
actors = [a.strip() for a in a_list.split(",") if a.strip()]
if not campaigns or not actors:
fig, ax = plt.subplots(figsize=(8, 6))
ax.text(0.5, 0.5, 'Please enter both Campaigns and Actors',
ha='center', va='center', fontsize=16)
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.axis('off')
return fig
print(f"Processing {len(campaigns)} campaigns and {len(actors)} actors...")
print(f"Using prompt template: '{actor_prompt_template}'")
# Check GPU availability
if torch.cuda.is_available():
print(f"GPU computation enabled - Device: {torch.cuda.get_device_name()}")
else:
print("Running on CPU")
# Calculate probability matrix
df_ca = campaign_actor_probs(campaigns, actors, actor_prompt_template)
print(f"Actor probability matrix shape: {df_ca.shape}")
print("Actor probability matrix:")
print(df_ca.round(4))
# Create heatmap with matplotlib/seaborn
fig, ax = plt.subplots(figsize=(max(8, len(actors)*1.2), max(6, len(campaigns)*0.8)))
sns.heatmap(df_ca, annot=True, cmap='plasma', fmt='.3f',
cbar_kws={'label': 'P(actor)'}, ax=ax)
ax.set_title('Campaign-Actor Probabilities (softmax normalized)',
fontsize=14, pad=20)
ax.set_xlabel('Actor', fontsize=12)
ax.set_ylabel('Campaign', fontsize=12)
# Adjust label rotation
plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
plt.setp(ax.get_yticklabels(), rotation=0)
plt.tight_layout()
print("Actor heatmap generated successfully!")
return fig
except Exception as e:
print(f"Error in generate_actor_heatmap: {e}")
import traceback
traceback.print_exc()
fig, ax = plt.subplots(figsize=(8, 6))
ax.text(0.5, 0.5, f'Error occurred: {str(e)}',
ha='center', va='center', fontsize=12, color='red')
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.axis('off')
return fig
def generate_technique_heatmap(c_list, t_list, technique_prompt_template, technique_neg_template):
"""Generate Campaign-Technique association heatmap with binary scoring visualization."""
try:
campaigns = [c.strip() for c in c_list.split(",") if c.strip()]
techniques = [t.strip() for t in t_list.split(",") if t.strip()]
if not campaigns or not techniques:
fig, ax = plt.subplots(figsize=(8, 6))
ax.text(0.5, 0.5, 'Please enter both Campaigns and Techniques',
ha='center', va='center', fontsize=16)
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.axis('off')
return fig
print(f"Processing {len(campaigns)} campaigns and {len(techniques)} techniques...")
print(f"Using prompt templates: '{technique_prompt_template}' / '{technique_neg_template}'")
# Check GPU availability
if torch.cuda.is_available():
print(f"GPU computation enabled - Device: {torch.cuda.get_device_name()}")
else:
print("Running on CPU")
# Calculate score matrix
df_ct = campaign_technique_matrix(campaigns, techniques, technique_prompt_template, technique_neg_template)
print(f"Score matrix shape: {df_ct.shape}")
print("Score matrix:")
print(df_ct.round(4))
# Create heatmap with matplotlib/seaborn
fig, ax = plt.subplots(figsize=(max(8, len(techniques)*1.2), max(6, len(campaigns)*0.8)))
sns.heatmap(df_ct, annot=True, cmap='viridis', fmt='.3f',
cbar_kws={'label': 'Association Score'}, ax=ax)
ax.set_title('Campaign-Technique Associations (len-norm, independent)',
fontsize=14, pad=20)
ax.set_xlabel('Technique', fontsize=12)
ax.set_ylabel('Campaign', fontsize=12)
# Adjust label rotation
plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
plt.setp(ax.get_yticklabels(), rotation=0)
plt.tight_layout()
print("Technique heatmap generated successfully!")
return fig
except Exception as e:
print(f"Error in generate_technique_heatmap: {e}")
import traceback
traceback.print_exc()
fig, ax = plt.subplots(figsize=(8, 6))
ax.text(0.5, 0.5, f'Error occurred: {str(e)}',
ha='center', va='center', fontsize=12, color='red')
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.axis('off')
return fig
with gr.Blocks(title="LLM Threat Graph Demo") as demo:
gr.Markdown("# 🕸️ LLM Threat Association Analysis\n*Visualizing Campaign-Actor-Technique relationships using Language Models*")
# Common inputs
with gr.Row():
campaigns = gr.Textbox(
"Operation Aurora, Dust Storm, ShadowHammer, NotPetya, SolarWinds",
label="Campaigns (comma-separated)",
placeholder="e.g., Operation Aurora, NotPetya, Stuxnet"
)
# Campaign-Actor section (probabilistic)
gr.Markdown("## 👤 Campaign-Actor Associations")
gr.Markdown("Visualizing Campaign-Actor relationships with probabilistic heatmaps")
gr.Markdown("""
**Calculation Method**: `P(actor | "{campaign} is conducted by") (softmax normalized)`
1. Calculate `phrase_log_prob("{campaign} is conducted by", actor)` for each Actor
2. Apply softmax normalization to create probability distribution (probabilities sum to 1.0 per Campaign)
3. Result: Shows relative likelihood of each Actor conducting each Campaign
""")
with gr.Row():
actor_prompt_template = gr.Textbox(
"is conducted by",
label="Actor Prompt Template",
placeholder="e.g., is conducted by, is attributed to"
)
actors = gr.Textbox(
"APT1, APT28, APT33, APT38, FIN8",
label="Actors (comma-separated)",
placeholder="e.g., APT1, Lazarus Group, Cozy Bear"
)
btn_actor = gr.Button("Generate Actor Heatmap", variant="primary")
plot_actor = gr.Plot(label="Campaign-Actor Heatmap")
btn_actor.click(
fn=generate_actor_heatmap,
inputs=[campaigns, actors, actor_prompt_template],
outputs=plot_actor,
show_progress=True
)
# Campaign-Technique section (independent scoring)
gr.Markdown("## 🛠️ Campaign-Technique Associations")
gr.Markdown("Visualizing Campaign-Technique relationships with independent association scores")
gr.Markdown("""
**Calculation Method**: `Binary Association Score (length-normalized, independent)`
1. For each Technique, calculate:
- `lp_pos = phrase_log_prob("{campaign} typically uses", technique)`
- `lp_neg = phrase_log_prob("{campaign} typically does NOT use", technique)`
2. Apply logistic transformation: `prob = 1 / (1 + exp(lp_neg - lp_pos))`
3. Length normalization: `score = prob / (n_tokens^0.7)` (penalty for longer phrases)
4. Result: Independent association scores (0-1) for each Campaign-Technique pair
""")
with gr.Row():
technique_prompt_template = gr.Textbox(
"typically uses",
label="Technique Prompt Template (positive)",
placeholder="e.g., typically uses, commonly employs"
)
technique_neg_template = gr.Textbox(
"typically does NOT use",
label="Technique Prompt Template (negative)",
placeholder="e.g., typically does NOT use, never employs"
)
techniques = gr.Textbox(
"T1059 Command and Scripting Interpreter, T1566 Phishing, T1027 Obfuscated/Stored Files, T1036 Masquerading, T1105 Ingress Tool Transfer, T1018 Remote System Discovery, T1568 Dynamic Resolution",
label="Techniques (comma-separated)",
placeholder="e.g., T1059 Command and Scripting Interpreter, T1566 Phishing"
)
btn_technique = gr.Button("Generate Technique Heatmap", variant="primary")
plot_technique = gr.Plot(label="Campaign-Technique Heatmap")
btn_technique.click(
fn=generate_technique_heatmap,
inputs=[campaigns, techniques, technique_prompt_template, technique_neg_template],
outputs=plot_technique,
show_progress=True
)
demo.launch()